// biddingindexback2.go
  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "mongodb"
  6. qutil "qfw/util"
  7. elastic "qfw/util/elastic"
  8. "strings"
  9. //elastic "qfw/util/elastic_v5"
  10. // "strings"
  11. "sync"
  12. "time"
  13. "gopkg.in/mgo.v2/bson"
  14. )
  15. func biddingBackTask2(data []byte, mapInfo map[string]interface{}) {
  16. defer qutil.Catch()
  17. q, _ := mapInfo["query"].(map[string]interface{})
  18. if q == nil {
  19. q = map[string]interface{}{
  20. "_id": bson.M{
  21. "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
  22. "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
  23. },
  24. }
  25. }
  26. c, _ := mapInfo["coll"].(string)
  27. if c == "" {
  28. c, _ = biddingback["collect"].(string)
  29. }
  30. cs := strings.Split(c, ",")
  31. for _, c := range cs {
  32. //bidding库
  33. session := mgo.GetMgoConn()
  34. defer mgo.DestoryMongoConn(session)
  35. //连接信息
  36. db, _ := biddingback["db"].(string)
  37. index, _ := biddingback["index"].(string)
  38. itype, _ := biddingback["type"].(string)
  39. count, _ := session.DB(db).C(c).Find(&q).Count()
  40. //线程池
  41. UpdatesLock := sync.Mutex{}
  42. log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
  43. query := session.DB(db).C(c).Find(q).Select(bson.M{
  44. "contenthtml": 0,
  45. "s_sha": 0,
  46. }).Sort("_id").Iter()
  47. //查询抽取结果
  48. n := 0
  49. //更新数组
  50. arrEs := []map[string]interface{}{}
  51. //对比两张表数据,减少查询次数
  52. thread := qutil.IntAll(mapInfo["thread"])
  53. //不传为0只生成招标索引,1生成招标+预览,2只生成预览
  54. if thread < 1 {
  55. thread = 3
  56. }
  57. log.Println("es线程数:", thread)
  58. espool := make(chan bool, thread)
  59. now1 := time.Now().Unix()
  60. for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
  61. if qutil.IntAll(tmp["extracttype"]) == -1 {
  62. tmp = make(map[string]interface{})
  63. continue
  64. }
  65. ct := qutil.Int64All(tmp["comeintime"])
  66. pt := qutil.Int64All(tmp["publishtime"])
  67. if pt > ct+86400 || pt > now1 { //时间问题,需要更新
  68. if ct > now1 {
  69. ct = now1
  70. }
  71. tmp["publishtime"] = ct
  72. }
  73. ps, _ := tmp["projectscope"].(string)
  74. if ps == "" {
  75. tmp["projectscope"] = "" //= tmp["detail"]
  76. }
  77. if len(ps) > ESLEN {
  78. tmp["projectscope"] = string(([]rune(ps))[:4000])
  79. }
  80. if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
  81. tmp["budget"] = nil
  82. } else if sbd, ok := tmp["budget"].(string); ok {
  83. tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
  84. }
  85. if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
  86. tmp["bidamount"] = nil
  87. } else if sbd, ok := tmp["bidamount"].(string); ok {
  88. tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
  89. }
  90. UpdatesLock.Lock()
  91. newTmp := map[string]interface{}{}
  92. for _, v := range biddingIndexFields {
  93. if tmp[v] != nil {
  94. if "projectinfo" == v {
  95. //处理附件 content
  96. mp, _ := tmp[v].(map[string]interface{})
  97. if mp != nil {
  98. newmap := map[string]interface{}{}
  99. for _, v1 := range projectinfoFields {
  100. if mp[v1] != nil {
  101. newmap[v1] = mp[v1]
  102. }
  103. }
  104. if len(newmap) > 0 {
  105. newTmp[v] = newmap
  106. }
  107. // attachments := mp["attachments"]
  108. // con := ""
  109. // if attachments != nil {
  110. // am, _ := attachments.(map[string]interface{})
  111. // if am != nil {
  112. // for _, v1 := range am {
  113. // vm, _ := v1.(map[string]interface{})
  114. // if vm != nil {
  115. // c, _ := vm["content"].(string)
  116. // con += c
  117. // }
  118. // }
  119. // }
  120. // }
  121. // if con != "" {
  122. // con = FilterDetailSpace(con)
  123. // newTmp["attachments"] = con
  124. // }
  125. }
  126. } else {
  127. if v == "detail" {
  128. detail, _ := tmp[v].(string)
  129. newTmp[v] = FilterDetail(detail)
  130. } else {
  131. newTmp[v] = tmp[v]
  132. }
  133. }
  134. } else if v == "budget" || v == "bidamount" {
  135. newTmp[v] = nil
  136. }
  137. }
  138. arrEs = append(arrEs, newTmp)
  139. if len(arrEs) >= BulkSizeBack {
  140. tmps := arrEs
  141. espool <- true
  142. go func(tmps []map[string]interface{}) {
  143. defer func() {
  144. <-espool
  145. }()
  146. elastic.BulkSave(index, itype, &tmps, true)
  147. }(tmps)
  148. arrEs = []map[string]interface{}{}
  149. }
  150. UpdatesLock.Unlock()
  151. if n%1000 == 0 {
  152. log.Println("current:", n, mongodb.BsonIdToSId(tmp["_id"]))
  153. }
  154. tmp = make(map[string]interface{})
  155. }
  156. UpdatesLock.Lock()
  157. if len(arrEs) > 0 {
  158. tmps := arrEs
  159. elastic.BulkSave(index, itype, &tmps, true)
  160. }
  161. UpdatesLock.Unlock()
  162. log.Println(mapInfo, "create biddingback2 index...over", c, n)
  163. }
  164. }