biddingdata.go 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. package main
  2. import (
  3. "go.mongodb.org/mongo-driver/bson"
  4. util "utils"
  5. "utils/mongodb"
  6. )
  7. func (t *TaskInfo) biddingDataTask(data []byte, mapInfo map[string]interface{}) {
  8. defer util.Catch()
  9. q, _ := mapInfo["query"].(map[string]interface{})
  10. if q == nil {
  11. q = map[string]interface{}{
  12. "_id": bson.M{
  13. "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
  14. "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
  15. },
  16. }
  17. } else {
  18. idMap := q["_id"].(map[string]interface{})
  19. tmpQ := map[string]interface{}{}
  20. for c, id := range idMap {
  21. if idStr, ok := id.(string); ok && id != "" {
  22. tmpQ[c] = mongodb.StringTOBsonId(idStr)
  23. }
  24. }
  25. q["_id"] = tmpQ
  26. }
  27. //bidding库
  28. biddingConn := biddingMgo.GetMgoConn()
  29. defer biddingMgo.DestoryMongoConn(biddingConn)
  30. //连接信息
  31. c, _ := mapInfo["coll"].(string)
  32. if c == "" {
  33. c, _ = bidding["collect"].(string)
  34. } else {
  35. currentColl = c
  36. }
  37. count, _ := biddingConn.DB(biddingMgo.DbName).C(c).Find(&q).Count()
  38. util.Debug("查询语句:", q, "同步总数:", count, "elastic库:")
  39. //查询招标数据
  40. query := biddingConn.DB(biddingMgo.DbName).C(c).Find(q).Select(bson.M{
  41. "projectname": 1,
  42. "title": 1,
  43. "site": 1,
  44. "href": 1,
  45. "publishtime": 1,
  46. "spidercode": 1,
  47. "extracttype": 1,
  48. }).Sort("_id").Iter()
  49. n := 0
  50. //更新数组
  51. var mpool = make(chan bool, t.thread)
  52. for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
  53. if n%20000 == 0 {
  54. util.Debug("current:", n, tmp["_id"])
  55. }
  56. mpool <- true
  57. go func(tmp map[string]interface{}) {
  58. defer func() {
  59. <-mpool
  60. }()
  61. newTmp := GetEsField(tmp, nil, t.stype)
  62. newTmp["extracttype"] = util.IntAll(tmp["extracttype"])
  63. saveEsAllPool <- newTmp
  64. }(tmp)
  65. tmp = make(map[string]interface{})
  66. }
  67. util.Debug(mapInfo, "create biddingdata index...over", n)
  68. }