main.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. package main
  import (
  	"log"
  	"os"
  	"strings"
  	"time"

  	"go.mongodb.org/mongo-driver/bson"
  	"jygit.jydev.jianyu360.cn/data_processing/common_utils"
  	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  )
  10. var (
  11. MgoB *mongodb.MongodbSim
  12. updatePool = make(chan []map[string]interface{}, 5000)
  13. )
  14. func InitMgo() {
  15. MgoB = &mongodb.MongodbSim{
  16. //MongodbAddr: "172.17.189.140:27080",
  17. MongodbAddr: "127.0.0.1:27083",
  18. Size: 10,
  19. DbName: "qfw",
  20. UserName: "SJZY_RWbid_ES",
  21. Password: "SJZY@B4i4D5e6S",
  22. Direct: true,
  23. }
  24. MgoB.InitPool()
  25. }
  26. func main() {
  27. InitMgo()
  28. getBidding()
  29. //go updateMethod()
  30. //dealNavColumn()
  31. log.Println("over")
  32. //select {}
  33. }
  34. func dealNavColumn() {
  35. columns := make([]map[string]interface{}, 0)
  36. column := []string{"招标公告", "招标预告", "招标结果", "招标信用", "采购意向", "项目分包", "企业直采",
  37. "政府采购", "拟在建项目", "审批项目", "推荐项目", "业主委托项目", "热门项目", "新兴项目",
  38. "国家级项目", "省级项目"}
  39. for k, v := range column {
  40. column := map[string]interface{}{
  41. "name": v,
  42. "sort": k + 1,
  43. }
  44. columns = append(columns, column)
  45. }
  46. now := time.Now()
  47. where := map[string]interface{}{
  48. "comeintime": map[string]interface{}{
  49. "$gte": now.AddDate(-1, 0, 0).Unix(),
  50. },
  51. }
  52. sess := MgoB.GetMgoConn()
  53. defer MgoB.DestoryMongoConn(sess)
  54. //
  55. query := sess.DB("qfw").C("bidding").Find(&where).Select(map[string]interface{}{
  56. "contenthtml": 0}).Iter()
  57. count := 0
  58. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  59. if count%1000 == 0 {
  60. log.Println("current", count)
  61. }
  62. //针对产权数据,暂时不入es 索引库
  63. if util.IntAll(tmp["infoformat"]) == 3 {
  64. continue
  65. }
  66. if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
  67. tmp = make(map[string]interface{})
  68. continue
  69. }
  70. if util.IntAll(tmp["extracttype"]) != 1 {
  71. continue
  72. }
  73. title := util.ObjToString(tmp["title"])
  74. if !strings.Contains(title, "省级财政资金") {
  75. continue
  76. } else {
  77. rea := TagBidding(tmp)
  78. reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果
  79. insert := map[string]interface{}{
  80. "bidding_id": mongodb.BsonIdToSId(tmp["_id"]),
  81. }
  82. MgoB.Save("wcc_bidding_id", insert)
  83. log.Println("bidding_id", mongodb.BsonIdToSId(tmp["_id"]))
  84. updatePool <- []map[string]interface{}{
  85. {"_id": tmp["_id"]},
  86. {"$set": bson.M{
  87. "nav_column": reb,
  88. }},
  89. }
  90. }
  91. }
  92. }
  93. // updateMethod 更新MongoDB
  94. func updateMethod() {
  95. updateSp := make(chan bool, 2)
  96. arru := make([][]map[string]interface{}, 200)
  97. indexu := 0
  98. for {
  99. select {
  100. case v := <-updatePool:
  101. arru[indexu] = v
  102. indexu++
  103. if indexu == 200 {
  104. updateSp <- true
  105. go func(arru [][]map[string]interface{}) {
  106. defer func() {
  107. <-updateSp
  108. }()
  109. MgoB.UpdateBulk("bidding", arru...)
  110. }(arru)
  111. arru = make([][]map[string]interface{}, 200)
  112. indexu = 0
  113. }
  114. case <-time.After(1000 * time.Millisecond):
  115. if indexu > 0 {
  116. updateSp <- true
  117. go func(arru [][]map[string]interface{}) {
  118. defer func() {
  119. <-updateSp
  120. }()
  121. MgoB.UpdateBulk("bidding", arru...)
  122. }(arru[:indexu])
  123. arru = make([][]map[string]interface{}, 200)
  124. indexu = 0
  125. }
  126. }
  127. }
  128. }
  129. func hots() {
  130. exists := make(map[string]bool)
  131. res, _ := MgoB.Find("bidding_hots", nil, map[string]interface{}{"createtime": -1}, nil, false, -1, -1)
  132. for _, v := range *res {
  133. biddingID := util.ObjToString(v["bidding_id"])
  134. if !exists[biddingID] {
  135. exists[biddingID] = true
  136. }
  137. }
  138. data := make([]map[string]interface{}, 0)
  139. for _, v := range *res {
  140. biddingID := util.ObjToString(v["bidding_id"])
  141. if exists[biddingID] {
  142. data = append(data, v)
  143. exists[biddingID] = false
  144. }
  145. }
  146. MgoB.SaveBulk("wcc_hots", data...)
  147. }