project_es.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527
  1. package main
  2. import (
  3. "esindex/config"
  4. "go.uber.org/zap"
  5. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  8. "math"
  9. "reflect"
  10. "regexp"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. )
  15. var (
  16. regLetter = regexp.MustCompile("[a-z]*")
  17. )
  18. func projectTask(data []byte, mapInfo map[string]interface{}) {
  19. defer util.Catch()
  20. q, _ := mapInfo["query"].(map[string]interface{})
  21. if q == nil {
  22. q = map[string]interface{}{
  23. "_id": map[string]interface{}{
  24. "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
  25. "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
  26. },
  27. }
  28. } else {
  29. if q["pici"] == nil {
  30. idMap, _ := q["_id"].(map[string]interface{})
  31. if idMap != nil {
  32. tmpQ := map[string]interface{}{}
  33. for c, id := range idMap {
  34. if idStr, ok := id.(string); ok && id != "" {
  35. tmpQ[c] = mongodb.StringTOBsonId(idStr)
  36. }
  37. }
  38. q["_id"] = tmpQ
  39. }
  40. }
  41. }
  42. conn := MgoP.GetMgoConn()
  43. defer MgoP.DestoryMongoConn(conn)
  44. count, _ := conn.DB(MgoP.DbName).C(config.Conf.DB.MongoP.Coll).Find(&q).Count()
  45. log.Info("projectTask", zap.String("coll", config.Conf.DB.MongoP.Coll), zap.Any("查询语句:", q), zap.Int64("同步总数:", count))
  46. query := conn.DB(MgoP.DbName).C(config.Conf.DB.MongoP.Coll).Find(q).Iter()
  47. n := 0
  48. for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
  49. if n%2000 == 0 {
  50. log.Info("current", zap.Int("count", n))
  51. log.Info("current", zap.Any("_id", tmp["_id"]))
  52. }
  53. newTmp := make(map[string]interface{})
  54. newTmp["s_projectname"] = tmp["projectname"]
  55. for f, ftype := range ProjectField {
  56. if tmp[f] != nil {
  57. if f == "package" {
  58. pp := map[string]map[string]interface{}{}
  59. if packages, ok := tmp["package"].(map[string]interface{}); ok {
  60. for _, pks := range packages {
  61. if pk, ok := pks.([]interface{}); ok {
  62. for _, v := range pk {
  63. if p, ok := v.(map[string]interface{}); ok {
  64. winner := util.ObjToString(p["winner"])
  65. bidamount := util.Float64All((p["bidamount"]))
  66. if len(winner) > 4 && bidamount > 0 {
  67. p := map[string]interface{}{
  68. "winner": winner,
  69. "bidamount": bidamount,
  70. }
  71. pp[winner] = p
  72. }
  73. }
  74. }
  75. }
  76. }
  77. } else {
  78. winner := util.ObjToString(tmp["winner"])
  79. bidamount := util.Float64All(tmp["bidamount"])
  80. if len(winner) > 4 && bidamount > 0 {
  81. p := map[string]interface{}{
  82. "winner": winner,
  83. "bidamount": bidamount,
  84. }
  85. pp[winner] = p
  86. }
  87. }
  88. pk1 := []map[string]interface{}{}
  89. for _, v := range pp {
  90. pk1 = append(pk1, v)
  91. }
  92. if len(pk1) > 0 {
  93. newTmp["package1"] = pk1
  94. }
  95. } else if f == "topscopeclass" {
  96. if topscopeclass, ok := tmp["topscopeclass"].([]interface{}); ok {
  97. tc := []string{}
  98. m2 := map[string]bool{}
  99. for _, v := range topscopeclass {
  100. str := util.ObjToString(v)
  101. str = regLetter.ReplaceAllString(str, "") // 去除字母
  102. if !m2[str] {
  103. m2[str] = true
  104. tc = append(tc, str)
  105. }
  106. }
  107. newTmp["topscopeclass"] = tc
  108. }
  109. } else if f == "list" {
  110. if list, ok := tmp[f].([]interface{}); ok {
  111. var newList []map[string]interface{}
  112. for _, item := range list {
  113. item1 := item.(map[string]interface{})
  114. listm := make(map[string]interface{})
  115. for f1, ftype1 := range ProjectListF {
  116. if item1[f1] != nil {
  117. if f == "topscopeclass" || f == "subscopeclass" {
  118. listm[f] = item1[f1]
  119. } else {
  120. if fieldval := item1[f1]; reflect.TypeOf(fieldval).String() != ftype1 {
  121. continue
  122. } else {
  123. if fieldval != "" {
  124. listm[f1] = fieldval
  125. }
  126. }
  127. }
  128. }
  129. }
  130. newList = append(newList, listm)
  131. }
  132. newTmp[f] = newList
  133. }
  134. } else if f == "budget" || f == "bidamount" || f == "sortprice" {
  135. if tmp[f] != nil && util.Float64All(tmp[f]) <= 1000000000 {
  136. newTmp[f] = tmp[f]
  137. }
  138. } else if f == "projectscope" {
  139. projectscopeRune := []rune(util.ObjToString(tmp[f]))
  140. if len(projectscopeRune) > 1000 {
  141. newTmp[f] = util.ObjToString(tmp[f])[:1000]
  142. } else {
  143. newTmp[f] = tmp[f]
  144. }
  145. } else if f == "ids" || f == "mpc" || f == "mpn" || f == "review_experts" || f == "winnerorder" ||
  146. f == "entidlist" || f == "first_cooperation" || f == "subscopeclass" || f == "jgtime" {
  147. newTmp[f] = tmp[f]
  148. } else if f == "_id" {
  149. newTmp["_id"] = mongodb.BsonIdToSId(tmp["_id"])
  150. newTmp["id"] = mongodb.BsonIdToSId(tmp["_id"])
  151. } else {
  152. if fieldval := tmp[f]; reflect.TypeOf(fieldval).String() != ftype && ftype != "" {
  153. continue
  154. } else {
  155. if fieldval != "" {
  156. newTmp[f] = fieldval
  157. }
  158. }
  159. }
  160. }
  161. }
  162. budget := util.Float64All(newTmp["budget"])
  163. bidamount := util.Float64All(newTmp["bidamount"])
  164. if float64(budget) > 0 && float64(bidamount) > 0 {
  165. rate := float64(1) - float64(bidamount)/float64(budget)
  166. f, _ := strconv.ParseFloat(strconv.FormatFloat(rate, 'f', 4, 64), 64)
  167. //不在0~0.6之间,不生成费率;只生成预算,中标金额舍弃,索引增加折扣率异常标识
  168. if f < 0 || f > 0.6 {
  169. delete(newTmp, "bidamount")
  170. newTmp["prate_flag"] = 1
  171. } else {
  172. newTmp["project_rate"] = f
  173. }
  174. }
  175. bidopentime := util.Int64All(tmp["bidopentime"]) //开标日期
  176. fzb_publishtime := int64(0) //记录第一个招标信息的publishtime
  177. bidcycle_flag := false //判断是否已计算出标书表编制周期
  178. list := tmp["list"].([]interface{})
  179. for _, m := range list {
  180. tmpM := m.(map[string]interface{})
  181. if bidamount, ok := tmpM["bidamount"].(string); ok && len(bidamount) > 0 { //bidamount为string类型,转成float
  182. tmpB := util.Float64All(tmpM["bidamount"])
  183. tmpM["bidamount"] = tmpB
  184. }
  185. //计算bidcycle标书表编制周期字段
  186. if !bidcycle_flag && bidopentime > 0 { //bidopentime>0证明list中有bidopentime,无则不用计算bidcycle
  187. if toptype := util.ObjToString(tmpM["toptype"]); toptype == "招标" {
  188. zb_bidopentime := util.Int64All(tmpM["bidopentime"])
  189. zb_publishtime := util.Int64All(tmpM["publishtime"])
  190. if zb_publishtime > 0 {
  191. if zb_bidopentime > 0 {
  192. if tmpTime := zb_bidopentime - zb_publishtime; tmpTime > 0 {
  193. f_day := float64(tmpTime) / float64(86400)
  194. day := math.Ceil(f_day)
  195. tmp["bidcycle"] = int(day)
  196. bidcycle_flag = true
  197. }
  198. }
  199. if fzb_publishtime == 0 { //仅赋值第一个招标信息的publishtime
  200. fzb_publishtime = zb_publishtime
  201. }
  202. }
  203. }
  204. }
  205. }
  206. //计算bidcycle标书表编制周期字段
  207. //list中招标信息中未能计算出bidcycle,用第一个招标信息的fzb_publishtime和外围bidopentime计算
  208. if !bidcycle_flag && bidopentime > 0 && fzb_publishtime > 0 {
  209. if tmpTime := bidopentime - fzb_publishtime; tmpTime > 0 {
  210. f_day := float64(tmpTime) / float64(86400)
  211. day := math.Ceil(f_day)
  212. newTmp["bidcycle"] = int(day)
  213. }
  214. }
  215. //项目名称副标题
  216. subtitleProjectname := util.ObjToString(tmp["subtitle_projectname"])
  217. if subtitleProjectname != "" {
  218. newTmp["subtitle_projectname"] = subtitleProjectname
  219. if !cache.Contains(uint32(hash(subtitleProjectname))) {
  220. cache.Add(uint32(hash(subtitleProjectname)))
  221. cacheModify = true
  222. }
  223. } else {
  224. name := getNewName(tmp)
  225. if name != "" {
  226. newTmp["subtitle_projectname"] = name
  227. update := make(map[string]interface{})
  228. update["subtitle_projectname"] = name
  229. res := MgoP.UpdateById(config.Conf.DB.MongoP.Coll, mongodb.BsonIdToSId(tmp["_id"]), map[string]interface{}{"$set": update})
  230. if !res {
  231. log.Info("项目数据", zap.Any(mongodb.BsonIdToSId(tmp["_id"]), "项目名称副标题更新失败"))
  232. }
  233. }
  234. }
  235. saveProjectEsPool <- newTmp
  236. tmp = make(map[string]interface{})
  237. }
  238. log.Info("create project index...over", zap.Any("mapInfo", mapInfo), zap.Int("count", n))
  239. }
  240. // projectDetailTask 项目索引,添加详情字段
  241. func projectDetailTask(data []byte, mapInfo map[string]interface{}) {
  242. defer util.Catch()
  243. q, _ := mapInfo["query"].(map[string]interface{})
  244. if q == nil {
  245. q = map[string]interface{}{
  246. "_id": map[string]interface{}{
  247. "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
  248. "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
  249. },
  250. }
  251. } else {
  252. if q["pici"] == nil {
  253. idMap, _ := q["_id"].(map[string]interface{})
  254. if idMap != nil {
  255. tmpQ := map[string]interface{}{}
  256. for c, id := range idMap {
  257. if idStr, ok := id.(string); ok && id != "" {
  258. tmpQ[c] = mongodb.StringTOBsonId(idStr)
  259. }
  260. }
  261. q["_id"] = tmpQ
  262. }
  263. }
  264. }
  265. conn := MgoP.GetMgoConn()
  266. defer MgoP.DestoryMongoConn(conn)
  267. count, _ := conn.DB(MgoP.DbName).C(config.Conf.DB.MongoP.Coll).Find(&q).Count()
  268. log.Info("projectDetailTask", zap.String("coll", config.Conf.DB.MongoP.Coll), zap.Any("查询语句:", q), zap.Int64("同步总数:", count))
  269. query := conn.DB(MgoP.DbName).C(config.Conf.DB.MongoP.Coll).Find(q).Iter()
  270. n := 0
  271. //
  272. ch := make(chan bool, 10)
  273. wg := &sync.WaitGroup{}
  274. for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
  275. if n%2000 == 0 {
  276. log.Info("current", zap.Int("count", n))
  277. log.Info("current", zap.Any("_id", tmp["_id"]))
  278. }
  279. ch <- true
  280. wg.Add(1)
  281. go func(tmp map[string]interface{}) {
  282. defer func() {
  283. <-ch
  284. wg.Done()
  285. }()
  286. newTmp := make(map[string]interface{})
  287. newTmp["s_projectname"] = tmp["projectname"]
  288. for f, ftype := range ProjectField {
  289. if tmp[f] != nil {
  290. if f == "package" {
  291. pp := map[string]map[string]interface{}{}
  292. if packages, ok := tmp["package"].(map[string]interface{}); ok {
  293. for _, pks := range packages {
  294. if pk, ok := pks.([]interface{}); ok {
  295. for _, v := range pk {
  296. if p, ok := v.(map[string]interface{}); ok {
  297. winner := util.ObjToString(p["winner"])
  298. bidamount := util.Float64All((p["bidamount"]))
  299. if len(winner) > 4 && bidamount > 0 {
  300. p := map[string]interface{}{
  301. "winner": winner,
  302. "bidamount": bidamount,
  303. }
  304. pp[winner] = p
  305. }
  306. }
  307. }
  308. }
  309. }
  310. } else {
  311. winner := util.ObjToString(tmp["winner"])
  312. bidamount := util.Float64All(tmp["bidamount"])
  313. if len(winner) > 4 && bidamount > 0 {
  314. p := map[string]interface{}{
  315. "winner": winner,
  316. "bidamount": bidamount,
  317. }
  318. pp[winner] = p
  319. }
  320. }
  321. pk1 := []map[string]interface{}{}
  322. for _, v := range pp {
  323. pk1 = append(pk1, v)
  324. }
  325. if len(pk1) > 0 {
  326. newTmp["package1"] = pk1
  327. }
  328. } else if f == "topscopeclass" {
  329. if topscopeclass, ok := tmp["topscopeclass"].([]interface{}); ok {
  330. tc := []string{}
  331. m2 := map[string]bool{}
  332. for _, v := range topscopeclass {
  333. str := util.ObjToString(v)
  334. str = regLetter.ReplaceAllString(str, "") // 去除字母
  335. if !m2[str] {
  336. m2[str] = true
  337. tc = append(tc, str)
  338. }
  339. }
  340. newTmp["topscopeclass"] = tc
  341. }
  342. } else if f == "list" {
  343. if list, ok := tmp[f].([]interface{}); ok {
  344. var newList []map[string]interface{}
  345. for _, item := range list {
  346. item1 := item.(map[string]interface{})
  347. listm := make(map[string]interface{})
  348. for f1, ftype1 := range ProjectListF {
  349. if item1[f1] != nil {
  350. if f == "topscopeclass" || f == "subscopeclass" {
  351. listm[f] = item1[f1]
  352. } else {
  353. if fieldval := item1[f1]; reflect.TypeOf(fieldval).String() != ftype1 {
  354. continue
  355. } else {
  356. if fieldval != "" {
  357. listm[f1] = fieldval
  358. }
  359. }
  360. }
  361. }
  362. }
  363. newList = append(newList, listm)
  364. }
  365. newTmp[f] = newList
  366. }
  367. } else if f == "budget" || f == "bidamount" || f == "sortprice" {
  368. if tmp[f] != nil && util.Float64All(tmp[f]) <= 1000000000 {
  369. newTmp[f] = tmp[f]
  370. }
  371. } else if f == "projectscope" {
  372. projectscopeRune := []rune(util.ObjToString(tmp[f]))
  373. if len(projectscopeRune) > 1000 {
  374. newTmp[f] = util.ObjToString(tmp[f])[:1000]
  375. } else {
  376. newTmp[f] = tmp[f]
  377. }
  378. } else if f == "ids" || f == "mpc" || f == "mpn" || f == "review_experts" || f == "winnerorder" ||
  379. f == "entidlist" || f == "first_cooperation" || f == "subscopeclass" || f == "jgtime" {
  380. newTmp[f] = tmp[f]
  381. } else if f == "_id" {
  382. newTmp["_id"] = mongodb.BsonIdToSId(tmp["_id"])
  383. newTmp["id"] = mongodb.BsonIdToSId(tmp["_id"])
  384. } else {
  385. if fieldval := tmp[f]; reflect.TypeOf(fieldval).String() != ftype && ftype != "" {
  386. continue
  387. } else {
  388. if fieldval != "" {
  389. newTmp[f] = fieldval
  390. }
  391. }
  392. }
  393. }
  394. }
  395. budget := util.Float64All(newTmp["budget"])
  396. bidamount := util.Float64All(newTmp["bidamount"])
  397. if float64(budget) > 0 && float64(bidamount) > 0 {
  398. rate := float64(1) - float64(bidamount)/float64(budget)
  399. f, _ := strconv.ParseFloat(strconv.FormatFloat(rate, 'f', 4, 64), 64)
  400. //不在0~0.6之间,不生成费率;只生成预算,中标金额舍弃,索引增加折扣率异常标识
  401. if f < 0 || f > 0.6 {
  402. delete(newTmp, "bidamount")
  403. newTmp["prate_flag"] = 1
  404. } else {
  405. newTmp["project_rate"] = f
  406. }
  407. }
  408. bidopentime := util.Int64All(tmp["bidopentime"]) //开标日期
  409. fzb_publishtime := int64(0) //记录第一个招标信息的publishtime
  410. bidcycle_flag := false //判断是否已计算出标书表编制周期
  411. //项目详情,辅助字段,处理过的list里面id,英文逗号拼接
  412. detail_ids := util.ObjToString(tmp["detail_ids"]) //1,2,3
  413. detailIds := make([]string, 0)
  414. if detail_ids != "" {
  415. detailIds = strings.Split(detail_ids, ",") //[1,2,3]
  416. }
  417. detail := make([]string, 0) //最终的详情字段
  418. //todo 回去原索引 详情字段
  419. projectID := mongodb.BsonIdToSId(tmp["_id"])
  420. _, oldProject := Es.GetById(config.Conf.DB.Es.IndexPD, projectID)
  421. if oldProject != nil {
  422. oldDetail := util.ObjToString(oldProject["detail"])
  423. if oldDetail != "" {
  424. old_details := strings.Split(oldDetail, " ")
  425. detail = append(detail, old_details...)
  426. }
  427. }
  428. //统计现有字符串长度
  429. totalNumCount := CountChineseCharacters(detail)
  430. list := tmp["list"].([]interface{})
  431. for _, m := range list {
  432. tmpM := m.(map[string]interface{})
  433. if bidamount, ok := tmpM["bidamount"].(string); ok && len(bidamount) > 0 { //bidamount为string类型,转成float
  434. tmpB := util.Float64All(tmpM["bidamount"])
  435. tmpM["bidamount"] = tmpB
  436. }
  437. //计算bidcycle标书表编制周期字段
  438. if !bidcycle_flag && bidopentime > 0 { //bidopentime>0证明list中有bidopentime,无则不用计算bidcycle
  439. if toptype := util.ObjToString(tmpM["toptype"]); toptype == "招标" {
  440. zb_bidopentime := util.Int64All(tmpM["bidopentime"])
  441. zb_publishtime := util.Int64All(tmpM["publishtime"])
  442. if zb_publishtime > 0 {
  443. if zb_bidopentime > 0 {
  444. if tmpTime := zb_bidopentime - zb_publishtime; tmpTime > 0 {
  445. f_day := float64(tmpTime) / float64(86400)
  446. day := math.Ceil(f_day)
  447. tmp["bidcycle"] = int(day)
  448. bidcycle_flag = true
  449. }
  450. }
  451. if fzb_publishtime == 0 { //仅赋值第一个招标信息的publishtime
  452. fzb_publishtime = zb_publishtime
  453. }
  454. }
  455. }
  456. }
  457. //todo 处理项目详情 新字段;获取es 已有数据,判断是否需要更新detail
  458. infoid := util.ObjToString(tmpM["infoid"])
  459. if infoid != "" && !IsInStringArray(infoid, detailIds) && (totalNumCount < config.Conf.DB.Es.DetailCount) {
  460. detailIds = append(detailIds, infoid)
  461. if infoid > "5a862e7040d2d9bbe88e3b1f" {
  462. biddingData, _ := MgoB.FindById("bidding", infoid, nil)
  463. biddingDetail := util.ObjToString((*biddingData)["detail"])
  464. da, _ := CleanHTMLTags(biddingDetail)
  465. characterArray := SplitTextByChinesePunctuation(da)
  466. detail = append(detail, RemoveDuplicates(characterArray)...)
  467. } else {
  468. biddingData, _ := MgoB.FindById("bidding_back", infoid, nil)
  469. biddingDetail := util.ObjToString((*biddingData)["detail"])
  470. da, _ := CleanHTMLTags(biddingDetail)
  471. characterArray := SplitTextByChinesePunctuation(da)
  472. detail = append(detail, RemoveDuplicates(characterArray)...)
  473. }
  474. }
  475. }
  476. if len(detail) > 0 {
  477. detailNew := RemoveDuplicates(detail)
  478. newTmp["detail"] = strings.Join(detailNew, " ")
  479. }
  480. //计算bidcycle标书表编制周期字段
  481. //list中招标信息中未能计算出bidcycle,用第一个招标信息的fzb_publishtime和外围bidopentime计算
  482. if !bidcycle_flag && bidopentime > 0 && fzb_publishtime > 0 {
  483. if tmpTime := bidopentime - fzb_publishtime; tmpTime > 0 {
  484. f_day := float64(tmpTime) / float64(86400)
  485. day := math.Ceil(f_day)
  486. newTmp["bidcycle"] = int(day)
  487. }
  488. }
  489. //todo 这里和上面正常项目索引做了区别,不在单独处理,直接使用数据库数据
  490. //项目名称副标题
  491. subtitleProjectname := util.ObjToString(tmp["subtitle_projectname"])
  492. if subtitleProjectname != "" {
  493. newTmp["subtitle_projectname"] = subtitleProjectname
  494. }
  495. //更新项目表,已经处理过的 标讯id
  496. if len(detailIds) > 0 {
  497. new_bidding_ids := strings.Join(detailIds, ",")
  498. update := map[string]interface{}{
  499. "detail_ids": new_bidding_ids,
  500. }
  501. MgoP.UpdateById(config.Conf.DB.MongoP.Coll, mongodb.BsonIdToSId(tmp["_id"]), map[string]interface{}{"$set": update})
  502. }
  503. saveProjectDetailEsPool <- newTmp
  504. }(tmp)
  505. tmp = map[string]interface{}{}
  506. }
  507. wg.Wait()
  508. log.Info("create projectDetailTask index...over", zap.Any("mapInfo", mapInfo), zap.Int("count", n))
  509. }