biddingindexback2.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package main
  import (
	"fmt"
	"log"
	"sort"
	"strings"
	// "strings"
	"sync"
	"time"

	qutil "qfw/util"
	elastic "qfw/util/elastic"
	//elastic "qfw/util/elastic_v5"

	"gopkg.in/mgo.v2/bson"
)
  14. func biddingBackTask2(data []byte, mapInfo map[string]interface{}) {
  15. defer qutil.Catch()
  16. q, _ := mapInfo["query"].(map[string]interface{})
  17. if q == nil {
  18. q = map[string]interface{}{
  19. "_id": bson.M{
  20. "$gt": qutil.StringTOBsonId(mapInfo["gtid"].(string)),
  21. "$lte": qutil.StringTOBsonId(mapInfo["lteid"].(string)),
  22. },
  23. }
  24. }
  25. c, _ := mapInfo["coll"].(string)
  26. if c == "" {
  27. c, _ = biddingback["collect"].(string)
  28. }
  29. cs := strings.Split(c, ",")
  30. for _, c := range cs {
  31. //bidding库
  32. session := mgo.GetMgoConn(86400)
  33. defer mgo.DestoryMongoConn(session)
  34. //连接信息
  35. db, _ := biddingback["db"].(string)
  36. index, _ := biddingback["index"].(string)
  37. itype, _ := biddingback["type"].(string)
  38. count, _ := session.DB(db).C(c).Find(&q).Count()
  39. //线程池
  40. UpdatesLock := sync.Mutex{}
  41. log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
  42. query := session.DB(db).C(c).Find(q).Select(bson.M{
  43. "contenthtml": 0,
  44. "s_sha": 0,
  45. }).Sort("_id").Iter()
  46. //查询抽取结果
  47. n := 0
  48. //更新数组
  49. arrEs := []map[string]interface{}{}
  50. //对比两张表数据,减少查询次数
  51. thread := qutil.IntAll(mapInfo["thread"])
  52. //不传为0只生成招标索引,1生成招标+预览,2只生成预览
  53. if thread < 1 {
  54. thread = 3
  55. }
  56. log.Println("es线程数:", thread)
  57. espool := make(chan bool, thread)
  58. now1 := time.Now().Unix()
  59. for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
  60. if qutil.IntAll(tmp["extracttype"]) == -1 {
  61. tmp = make(map[string]interface{})
  62. continue
  63. }
  64. ct := qutil.Int64All(tmp["comeintime"])
  65. pt := qutil.Int64All(tmp["publishtime"])
  66. if pt > ct+86400 || pt > now1 { //时间问题,需要更新
  67. if ct > now1 {
  68. ct = now1
  69. }
  70. tmp["publishtime"] = ct
  71. }
  72. ps, _ := tmp["projectscope"].(string)
  73. if ps == "" {
  74. tmp["projectscope"] = "" //= tmp["detail"]
  75. }
  76. if len(ps) > ESLEN {
  77. tmp["projectscope"] = string(([]rune(ps))[:4000])
  78. }
  79. if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
  80. tmp["budget"] = nil
  81. } else if sbd, ok := tmp["budget"].(string); ok {
  82. tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
  83. }
  84. if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
  85. tmp["bidamount"] = nil
  86. } else if sbd, ok := tmp["bidamount"].(string); ok {
  87. tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
  88. }
  89. UpdatesLock.Lock()
  90. newTmp := map[string]interface{}{}
  91. for _, v := range biddingIndexFields {
  92. if tmp[v] != nil {
  93. if "projectinfo" == v {
  94. //处理附件 content
  95. mp, _ := tmp[v].(map[string]interface{})
  96. if mp != nil {
  97. newmap := map[string]interface{}{}
  98. for _, v1 := range projectinfoFields {
  99. if mp[v1] != nil {
  100. newmap[v1] = mp[v1]
  101. }
  102. }
  103. if len(newmap) > 0 {
  104. newTmp[v] = newmap
  105. }
  106. attachments := mp["attachments"]
  107. con := ""
  108. if attachments != nil {
  109. am, _ := attachments.(map[string]interface{})
  110. if am != nil {
  111. for _, v1 := range am {
  112. vm, _ := v1.(map[string]interface{})
  113. if vm != nil {
  114. c, _ := vm["content"].(string)
  115. con += c
  116. }
  117. }
  118. }
  119. }
  120. if con != "" {
  121. con = FilterDetailSpace(con)
  122. newTmp["attachments"] = con
  123. }
  124. }
  125. } else {
  126. if v == "detail" {
  127. detail, _ := tmp[v].(string)
  128. newTmp[v] = FilterDetail(detail)
  129. } else {
  130. newTmp[v] = tmp[v]
  131. }
  132. }
  133. } else if v == "budget" || v == "bidamount" {
  134. newTmp[v] = nil
  135. }
  136. }
  137. arrEs = append(arrEs, newTmp)
  138. if len(arrEs) >= BulkSizeBack {
  139. tmps := arrEs
  140. espool <- true
  141. go func(tmps []map[string]interface{}) {
  142. defer func() {
  143. <-espool
  144. }()
  145. elastic.BulkSave(index, itype, &tmps, true)
  146. }(tmps)
  147. arrEs = []map[string]interface{}{}
  148. }
  149. UpdatesLock.Unlock()
  150. if n%1000 == 0 {
  151. log.Println("current:", n, qutil.BsonIdToSId(tmp["_id"]))
  152. }
  153. tmp = make(map[string]interface{})
  154. }
  155. UpdatesLock.Lock()
  156. if len(arrEs) > 0 {
  157. tmps := arrEs
  158. elastic.BulkSave(index, itype, &tmps, true)
  159. }
  160. UpdatesLock.Unlock()
  161. log.Println(mapInfo, "create biddingback2 index...over", c, n)
  162. }
  163. }