projectindex.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. package main
  2. import (
  3. "math"
  4. "strconv"
  5. //"fmt"
  6. "log"
  7. "qfw/util"
  8. elastic "qfw/util/elastic"
  9. mgov "gopkg.in/mgo.v2"
  10. "gopkg.in/mgo.v2/bson"
  11. )
  12. func projectTask(data []byte, project, mapInfo map[string]interface{}) {
  13. defer util.Catch()
  14. q, _ := mapInfo["query"].(map[string]interface{})
  15. if q == nil {
  16. q = map[string]interface{}{
  17. "_id": bson.M{
  18. "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
  19. "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
  20. },
  21. }
  22. } else {
  23. if q["pici"] == nil {
  24. idMap, _ := q["_id"].(map[string]interface{})
  25. if idMap != nil {
  26. tmpQ := map[string]interface{}{}
  27. for c, id := range idMap {
  28. if idStr, ok := id.(string); ok && id != "" {
  29. tmpQ[c] = util.StringTOBsonId(idStr)
  30. }
  31. }
  32. q["_id"] = tmpQ
  33. }
  34. }
  35. }
  36. var session *mgov.Session
  37. if project["addr"] != nil {
  38. session = project2db.GetMgoConn(3600)
  39. defer project2db.DestoryMongoConn(session)
  40. } else {
  41. session = extractmgo.GetMgoConn(3600)
  42. defer extractmgo.DestoryMongoConn(session)
  43. }
  44. c, _ := project["collect"].(string)
  45. db, _ := project["db"].(string)
  46. index, _ := project["index"].(string)
  47. itype, _ := project["type"].(string)
  48. count, _ := session.DB(db).C(c).Find(&q).Count()
  49. savepool := make(chan bool, 10)
  50. log.Println(db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
  51. query := session.DB(db).C(c).Find(q).Iter()
  52. arr := make([]map[string]interface{}, savesizei)
  53. var n int
  54. i := 0
  55. for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
  56. pp := map[string]map[string]interface{}{}
  57. if packages, ok := tmp["package"].(map[string]interface{}); ok {
  58. for _, pks := range packages {
  59. if pk, ok := pks.([]interface{}); ok {
  60. for _, v := range pk {
  61. if p, ok := v.(map[string]interface{}); ok {
  62. winner := util.ObjToString(p["winner"])
  63. bidamount := util.Float64All((p["bidamount"]))
  64. if len(winner) > 4 && bidamount > 0 {
  65. p := map[string]interface{}{
  66. "winner": winner,
  67. "bidamount": bidamount,
  68. }
  69. pp[winner] = p
  70. }
  71. }
  72. }
  73. }
  74. }
  75. } else {
  76. winner := util.ObjToString(tmp["winner"])
  77. bidamount := util.Float64All(tmp["bidamount"])
  78. if len(winner) > 4 && bidamount > 0 {
  79. p := map[string]interface{}{
  80. "winner": winner,
  81. "bidamount": bidamount,
  82. }
  83. pp[winner] = p
  84. }
  85. }
  86. pk1 := []map[string]interface{}{}
  87. for _, v := range pp {
  88. pk1 = append(pk1, v)
  89. }
  90. if len(pk1) > 0 {
  91. tmp["package1"] = pk1
  92. }
  93. budget := util.Float64All(tmp["budget"])
  94. bidamount := util.Float64All(tmp["bidamount"])
  95. if float64(budget) > 0 && float64(bidamount) > 0 {
  96. rate := float64(1) - float64(bidamount)/float64(budget)
  97. f, _ := strconv.ParseFloat(strconv.FormatFloat(rate, 'f', 4, 64), 64)
  98. //不在0~0.6之间,不生成费率;只生成预算,中标金额舍弃,索引增加折扣率异常标识
  99. if f < 0 || f > 0.6 {
  100. delete(tmp, "bidamount")
  101. tmp["prate_flag"] = 1
  102. } else {
  103. tmp["project_rate"] = f
  104. }
  105. }
  106. if topscopeclass, ok := tmp["topscopeclass"].([]interface{}); ok {
  107. tc := []string{}
  108. m2 := map[string]bool{}
  109. for _, v := range topscopeclass {
  110. str := util.ObjToString(v)
  111. str = reg_letter.ReplaceAllString(str, "") // 去除字母
  112. if !m2[str] {
  113. m2[str] = true
  114. tc = append(tc, str)
  115. }
  116. }
  117. tmp["topscopeclass"] = tc
  118. }
  119. //不生索引字段
  120. delete(tmp, "package")
  121. delete(tmp, "infofield")
  122. bidopentime := util.Int64All(tmp["bidopentime"]) //开标日期
  123. fzb_publishtime := int64(0) //记录第一个招标信息的publishtime
  124. bidcycle_flag := false //判断是否已计算出标书表编制周期
  125. list := tmp["list"].([]interface{})
  126. for _, m := range list {
  127. tmpM := m.(map[string]interface{})
  128. //删除purchasing,review_experts
  129. delete(tmpM, "purchasing")
  130. delete(tmpM, "review_experts")
  131. if bidamount, ok := tmpM["bidamount"].(string); ok && len(bidamount) > 0 { //bidamount为string类型,转成float
  132. tmpB := util.Float64All(tmpM["bidamount"])
  133. tmpM["bidamount"] = tmpB
  134. }
  135. //projectscope截断
  136. listProjectscopeRune := []rune(util.ObjToString(tmpM["projectscope"]))
  137. if len(listProjectscopeRune) > 1000 {
  138. tmpM["projectscope"] = string(listProjectscopeRune[:1000])
  139. }
  140. //计算bidcycle标书表编制周期字段
  141. if !bidcycle_flag && bidopentime > 0 { //bidopentime>0证明list中有bidopentime,无则不用计算bidcycle
  142. if toptype := util.ObjToString(tmpM["toptype"]); toptype == "招标" {
  143. zb_bidopentime := util.Int64All(tmpM["bidopentime"])
  144. zb_publishtime := util.Int64All(tmpM["publishtime"])
  145. if zb_publishtime > 0 {
  146. if zb_bidopentime > 0 {
  147. if tmpTime := zb_bidopentime - zb_publishtime; tmpTime > 0 {
  148. f_day := float64(tmpTime) / float64(86400)
  149. day := math.Ceil(f_day)
  150. tmp["bidcycle"] = int(day)
  151. bidcycle_flag = true
  152. }
  153. }
  154. if fzb_publishtime == 0 { //仅赋值第一个招标信息的publishtime
  155. fzb_publishtime = zb_publishtime
  156. }
  157. }
  158. }
  159. }
  160. }
  161. //计算bidcycle标书表编制周期字段
  162. //list中招标信息中未能计算出bidcycle,用第一个招标信息的fzb_publishtime和外围bidopentime计算
  163. if !bidcycle_flag && bidopentime > 0 && fzb_publishtime > 0 {
  164. if tmpTime := bidopentime - fzb_publishtime; tmpTime > 0 {
  165. f_day := float64(tmpTime) / float64(86400)
  166. day := math.Ceil(f_day)
  167. tmp["bidcycle"] = int(day)
  168. }
  169. }
  170. //projectscope截断
  171. projectscopeRune := []rune(util.ObjToString(tmp["projectscope"]))
  172. if len(projectscopeRune) > 1000 {
  173. tmp["projectscope"] = string(projectscopeRune[:1000])
  174. }
  175. // if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
  176. // tmp["budget"] = nil
  177. // }
  178. // if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
  179. // tmp["bidamount"] = nil
  180. // }
  181. //go IS.Add("project")
  182. tmp["s_projectname"] = tmp["projectname"]
  183. arr[i] = tmp
  184. n++
  185. if i == savesizei-1 {
  186. savepool <- true
  187. tmps := arr
  188. go func(tmpn *[]map[string]interface{}) {
  189. defer func() {
  190. <-savepool
  191. }()
  192. elastic.BulkSave(index, itype, tmpn, true)
  193. }(&tmps)
  194. i = 0
  195. arr = make([]map[string]interface{}, savesizei)
  196. }
  197. if n%savesizei == 0 {
  198. log.Println("当前:", n)
  199. }
  200. tmp = make(map[string]interface{})
  201. }
  202. if i > 0 {
  203. //util.Debug(arr)
  204. elastic.BulkSave(index, itype, &arr, true)
  205. }
  206. log.Println(mapInfo, "create project index...over", n)
  207. }