project_es.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "esindex/config"
  7. "go.uber.org/zap"
  8. "math"
  9. "regexp"
  10. "strconv"
  11. )
  12. var (
  13. regLetter = regexp.MustCompile("[a-z]*")
  14. )
  15. func projectTask(data []byte, mapInfo map[string]interface{}) {
  16. defer util.Catch()
  17. q, _ := mapInfo["query"].(map[string]interface{})
  18. if q == nil {
  19. q = map[string]interface{}{
  20. "_id": map[string]interface{}{
  21. "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
  22. "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
  23. },
  24. }
  25. } else {
  26. if q["pici"] == nil {
  27. idMap, _ := q["_id"].(map[string]interface{})
  28. if idMap != nil {
  29. tmpQ := map[string]interface{}{}
  30. for c, id := range idMap {
  31. if idStr, ok := id.(string); ok && id != "" {
  32. tmpQ[c] = mongodb.StringTOBsonId(idStr)
  33. }
  34. }
  35. q["_id"] = tmpQ
  36. }
  37. }
  38. }
  39. conn := MgoP.GetMgoConn()
  40. defer MgoP.DestoryMongoConn(conn)
  41. count, _ := conn.DB(MgoP.DbName).C(config.Conf.DB.MongoP.Coll).Find(&q).Count()
  42. log.Info("projectTask", zap.String("coll", config.Conf.DB.MongoP.Coll), zap.Any("查询语句:", q), zap.Int64("同步总数:", count))
  43. query := conn.DB(MgoP.DbName).C(config.Conf.DB.MongoP.Coll).Find(q).Iter()
  44. n := 0
  45. for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
  46. if n%2000 == 0 {
  47. log.Info("current", zap.Int("count", n))
  48. }
  49. pp := map[string]map[string]interface{}{}
  50. if packages, ok := tmp["package"].(map[string]interface{}); ok {
  51. for _, pks := range packages {
  52. if pk, ok := pks.([]interface{}); ok {
  53. for _, v := range pk {
  54. if p, ok := v.(map[string]interface{}); ok {
  55. winner := util.ObjToString(p["winner"])
  56. bidamount := util.Float64All((p["bidamount"]))
  57. if len(winner) > 4 && bidamount > 0 {
  58. p := map[string]interface{}{
  59. "winner": winner,
  60. "bidamount": bidamount,
  61. }
  62. pp[winner] = p
  63. }
  64. }
  65. }
  66. }
  67. }
  68. } else {
  69. winner := util.ObjToString(tmp["winner"])
  70. bidamount := util.Float64All(tmp["bidamount"])
  71. if len(winner) > 4 && bidamount > 0 {
  72. p := map[string]interface{}{
  73. "winner": winner,
  74. "bidamount": bidamount,
  75. }
  76. pp[winner] = p
  77. }
  78. }
  79. pk1 := []map[string]interface{}{}
  80. for _, v := range pp {
  81. pk1 = append(pk1, v)
  82. }
  83. if len(pk1) > 0 {
  84. tmp["package1"] = pk1
  85. }
  86. budget := util.Float64All(tmp["budget"])
  87. bidamount := util.Float64All(tmp["bidamount"])
  88. if float64(budget) > 0 && float64(bidamount) > 0 {
  89. rate := float64(1) - float64(bidamount)/float64(budget)
  90. f, _ := strconv.ParseFloat(strconv.FormatFloat(rate, 'f', 4, 64), 64)
  91. //不在0~0.6之间,不生成费率;只生成预算,中标金额舍弃,索引增加折扣率异常标识
  92. if f < 0 || f > 0.6 {
  93. delete(tmp, "bidamount")
  94. tmp["prate_flag"] = 1
  95. } else {
  96. tmp["project_rate"] = f
  97. }
  98. }
  99. if topscopeclass, ok := tmp["topscopeclass"].([]interface{}); ok {
  100. tc := []string{}
  101. m2 := map[string]bool{}
  102. for _, v := range topscopeclass {
  103. str := util.ObjToString(v)
  104. str = regLetter.ReplaceAllString(str, "") // 去除字母
  105. if !m2[str] {
  106. m2[str] = true
  107. tc = append(tc, str)
  108. }
  109. }
  110. tmp["topscopeclass"] = tc
  111. }
  112. //不生索引字段
  113. delete(tmp, "package")
  114. delete(tmp, "infofield")
  115. bidopentime := util.Int64All(tmp["bidopentime"]) //开标日期
  116. fzb_publishtime := int64(0) //记录第一个招标信息的publishtime
  117. bidcycle_flag := false //判断是否已计算出标书表编制周期
  118. list := tmp["list"].([]interface{})
  119. for _, m := range list {
  120. tmpM := m.(map[string]interface{})
  121. //删除purchasing,review_experts
  122. delete(tmpM, "purchasing")
  123. delete(tmpM, "review_experts")
  124. if bidamount, ok := tmpM["bidamount"].(string); ok && len(bidamount) > 0 { //bidamount为string类型,转成float
  125. tmpB := util.Float64All(tmpM["bidamount"])
  126. tmpM["bidamount"] = tmpB
  127. }
  128. //projectscope截断
  129. listProjectscopeRune := []rune(util.ObjToString(tmpM["projectscope"]))
  130. if len(listProjectscopeRune) > 1000 {
  131. tmpM["projectscope"] = string(listProjectscopeRune[:1000])
  132. }
  133. //计算bidcycle标书表编制周期字段
  134. if !bidcycle_flag && bidopentime > 0 { //bidopentime>0证明list中有bidopentime,无则不用计算bidcycle
  135. if toptype := util.ObjToString(tmpM["toptype"]); toptype == "招标" {
  136. zb_bidopentime := util.Int64All(tmpM["bidopentime"])
  137. zb_publishtime := util.Int64All(tmpM["publishtime"])
  138. if zb_publishtime > 0 {
  139. if zb_bidopentime > 0 {
  140. if tmpTime := zb_bidopentime - zb_publishtime; tmpTime > 0 {
  141. f_day := float64(tmpTime) / float64(86400)
  142. day := math.Ceil(f_day)
  143. tmp["bidcycle"] = int(day)
  144. bidcycle_flag = true
  145. }
  146. }
  147. if fzb_publishtime == 0 { //仅赋值第一个招标信息的publishtime
  148. fzb_publishtime = zb_publishtime
  149. }
  150. }
  151. }
  152. }
  153. }
  154. //计算bidcycle标书表编制周期字段
  155. //list中招标信息中未能计算出bidcycle,用第一个招标信息的fzb_publishtime和外围bidopentime计算
  156. if !bidcycle_flag && bidopentime > 0 && fzb_publishtime > 0 {
  157. if tmpTime := bidopentime - fzb_publishtime; tmpTime > 0 {
  158. f_day := float64(tmpTime) / float64(86400)
  159. day := math.Ceil(f_day)
  160. tmp["bidcycle"] = int(day)
  161. }
  162. }
  163. //projectscope截断
  164. projectscopeRune := []rune(util.ObjToString(tmp["projectscope"]))
  165. if len(projectscopeRune) > 1000 {
  166. tmp["projectscope"] = string(projectscopeRune[:1000])
  167. }
  168. // if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
  169. // tmp["budget"] = nil
  170. // }
  171. // if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
  172. // tmp["bidamount"] = nil
  173. // }
  174. //go IS.Add("project")
  175. if tmp["bidamount"] != nil && util.Float64All(tmp["bidamount"]) > 1000000000 {
  176. delete(tmp, "bidamount")
  177. }
  178. if tmp["budget"] != nil && util.Float64All(tmp["budget"]) > 1000000000 {
  179. delete(tmp, "budget")
  180. }
  181. tmp["s_projectname"] = tmp["projectname"]
  182. saveProjectEsPool <- tmp
  183. tmp = make(map[string]interface{})
  184. }
  185. log.Info("create project index...over", zap.Any("mapInfo", mapInfo), zap.Int("count", n))
  186. }