// project_es.go (7.2 KB)

  1. package main
  2. import (
  3. "esindex/config"
  4. "go.uber.org/zap"
  5. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  8. "math"
  9. "reflect"
  10. "regexp"
  11. "strconv"
  12. )
  13. var (
  14. regLetter = regexp.MustCompile("[a-z]*")
  15. )
  16. func projectTask(data []byte, mapInfo map[string]interface{}) {
  17. defer util.Catch()
  18. q, _ := mapInfo["query"].(map[string]interface{})
  19. if q == nil {
  20. q = map[string]interface{}{
  21. "_id": map[string]interface{}{
  22. "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
  23. "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
  24. },
  25. }
  26. } else {
  27. if q["pici"] == nil {
  28. idMap, _ := q["_id"].(map[string]interface{})
  29. if idMap != nil {
  30. tmpQ := map[string]interface{}{}
  31. for c, id := range idMap {
  32. if idStr, ok := id.(string); ok && id != "" {
  33. tmpQ[c] = mongodb.StringTOBsonId(idStr)
  34. }
  35. }
  36. q["_id"] = tmpQ
  37. }
  38. }
  39. }
  40. conn := MgoP.GetMgoConn()
  41. defer MgoP.DestoryMongoConn(conn)
  42. count, _ := conn.DB(MgoP.DbName).C(config.Conf.DB.MongoP.Coll).Find(&q).Count()
  43. log.Info("projectTask", zap.String("coll", config.Conf.DB.MongoP.Coll), zap.Any("查询语句:", q), zap.Int64("同步总数:", count))
  44. query := conn.DB(MgoP.DbName).C(config.Conf.DB.MongoP.Coll).Find(q).Iter()
  45. n := 0
  46. for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
  47. if n%2000 == 0 {
  48. log.Info("current", zap.Int("count", n))
  49. log.Info("current", zap.Any("_id", tmp["_id"]))
  50. }
  51. newTmp := make(map[string]interface{})
  52. newTmp["s_projectname"] = tmp["projectname"]
  53. for f, ftype := range ProjectField {
  54. if tmp[f] != nil {
  55. if f == "package" {
  56. pp := map[string]map[string]interface{}{}
  57. if packages, ok := tmp["package"].(map[string]interface{}); ok {
  58. for _, pks := range packages {
  59. if pk, ok := pks.([]interface{}); ok {
  60. for _, v := range pk {
  61. if p, ok := v.(map[string]interface{}); ok {
  62. winner := util.ObjToString(p["winner"])
  63. bidamount := util.Float64All((p["bidamount"]))
  64. if len(winner) > 4 && bidamount > 0 {
  65. p := map[string]interface{}{
  66. "winner": winner,
  67. "bidamount": bidamount,
  68. }
  69. pp[winner] = p
  70. }
  71. }
  72. }
  73. }
  74. }
  75. } else {
  76. winner := util.ObjToString(tmp["winner"])
  77. bidamount := util.Float64All(tmp["bidamount"])
  78. if len(winner) > 4 && bidamount > 0 {
  79. p := map[string]interface{}{
  80. "winner": winner,
  81. "bidamount": bidamount,
  82. }
  83. pp[winner] = p
  84. }
  85. }
  86. pk1 := []map[string]interface{}{}
  87. for _, v := range pp {
  88. pk1 = append(pk1, v)
  89. }
  90. if len(pk1) > 0 {
  91. newTmp["package1"] = pk1
  92. }
  93. } else if f == "topscopeclass" {
  94. if topscopeclass, ok := tmp["topscopeclass"].([]interface{}); ok {
  95. tc := []string{}
  96. m2 := map[string]bool{}
  97. for _, v := range topscopeclass {
  98. str := util.ObjToString(v)
  99. str = regLetter.ReplaceAllString(str, "") // 去除字母
  100. if !m2[str] {
  101. m2[str] = true
  102. tc = append(tc, str)
  103. }
  104. }
  105. newTmp["topscopeclass"] = tc
  106. }
  107. } else if f == "list" {
  108. if list, ok := tmp[f].([]interface{}); ok {
  109. var newList []map[string]interface{}
  110. for _, item := range list {
  111. item1 := item.(map[string]interface{})
  112. listm := make(map[string]interface{})
  113. for f1, ftype1 := range ProjectListF {
  114. if item1[f1] != nil {
  115. if f == "topscopeclass" || f == "subscopeclass" {
  116. listm[f] = item1[f1]
  117. } else {
  118. if fieldval := item1[f1]; reflect.TypeOf(fieldval).String() != ftype1 {
  119. continue
  120. } else {
  121. if fieldval != "" {
  122. listm[f1] = fieldval
  123. }
  124. }
  125. }
  126. }
  127. }
  128. newList = append(newList, listm)
  129. }
  130. newTmp[f] = newList
  131. }
  132. } else if f == "budget" || f == "bidamount" || f == "sortprice" {
  133. if tmp[f] != nil && util.Float64All(tmp[f]) <= 1000000000 {
  134. newTmp[f] = tmp[f]
  135. }
  136. } else if f == "projectscope" {
  137. projectscopeRune := []rune(util.ObjToString(tmp[f]))
  138. if len(projectscopeRune) > 1000 {
  139. newTmp[f] = util.ObjToString(tmp[f])[:1000]
  140. } else {
  141. newTmp[f] = tmp[f]
  142. }
  143. } else if f == "ids" || f == "mpc" || f == "mpn" || f == "review_experts" || f == "winnerorder" ||
  144. f == "entidlist" || f == "first_cooperation" || f == "subscopeclass" {
  145. newTmp[f] = tmp[f]
  146. } else if f == "_id" {
  147. newTmp["_id"] = mongodb.BsonIdToSId(tmp["_id"])
  148. newTmp["id"] = mongodb.BsonIdToSId(tmp["_id"])
  149. } else {
  150. if fieldval := tmp[f]; reflect.TypeOf(fieldval).String() != ftype {
  151. continue
  152. } else {
  153. if fieldval != "" {
  154. newTmp[f] = fieldval
  155. }
  156. }
  157. }
  158. }
  159. }
  160. budget := util.Float64All(newTmp["budget"])
  161. bidamount := util.Float64All(newTmp["bidamount"])
  162. if float64(budget) > 0 && float64(bidamount) > 0 {
  163. rate := float64(1) - float64(bidamount)/float64(budget)
  164. f, _ := strconv.ParseFloat(strconv.FormatFloat(rate, 'f', 4, 64), 64)
  165. //不在0~0.6之间,不生成费率;只生成预算,中标金额舍弃,索引增加折扣率异常标识
  166. if f < 0 || f > 0.6 {
  167. delete(newTmp, "bidamount")
  168. newTmp["prate_flag"] = 1
  169. } else {
  170. newTmp["project_rate"] = f
  171. }
  172. }
  173. bidopentime := util.Int64All(tmp["bidopentime"]) //开标日期
  174. fzb_publishtime := int64(0) //记录第一个招标信息的publishtime
  175. bidcycle_flag := false //判断是否已计算出标书表编制周期
  176. list := tmp["list"].([]interface{})
  177. for _, m := range list {
  178. tmpM := m.(map[string]interface{})
  179. if bidamount, ok := tmpM["bidamount"].(string); ok && len(bidamount) > 0 { //bidamount为string类型,转成float
  180. tmpB := util.Float64All(tmpM["bidamount"])
  181. tmpM["bidamount"] = tmpB
  182. }
  183. //计算bidcycle标书表编制周期字段
  184. if !bidcycle_flag && bidopentime > 0 { //bidopentime>0证明list中有bidopentime,无则不用计算bidcycle
  185. if toptype := util.ObjToString(tmpM["toptype"]); toptype == "招标" {
  186. zb_bidopentime := util.Int64All(tmpM["bidopentime"])
  187. zb_publishtime := util.Int64All(tmpM["publishtime"])
  188. if zb_publishtime > 0 {
  189. if zb_bidopentime > 0 {
  190. if tmpTime := zb_bidopentime - zb_publishtime; tmpTime > 0 {
  191. f_day := float64(tmpTime) / float64(86400)
  192. day := math.Ceil(f_day)
  193. tmp["bidcycle"] = int(day)
  194. bidcycle_flag = true
  195. }
  196. }
  197. if fzb_publishtime == 0 { //仅赋值第一个招标信息的publishtime
  198. fzb_publishtime = zb_publishtime
  199. }
  200. }
  201. }
  202. }
  203. }
  204. //计算bidcycle标书表编制周期字段
  205. //list中招标信息中未能计算出bidcycle,用第一个招标信息的fzb_publishtime和外围bidopentime计算
  206. if !bidcycle_flag && bidopentime > 0 && fzb_publishtime > 0 {
  207. if tmpTime := bidopentime - fzb_publishtime; tmpTime > 0 {
  208. f_day := float64(tmpTime) / float64(86400)
  209. day := math.Ceil(f_day)
  210. newTmp["bidcycle"] = int(day)
  211. }
  212. }
  213. saveProjectEsPool <- newTmp
  214. tmp = make(map[string]interface{})
  215. }
  216. log.Info("create project index...over", zap.Any("mapInfo", mapInfo), zap.Int("count", n))
  217. }