biddingindexback.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. qutil "qfw/util"
  6. elastic "qfw/util/elastic"
  7. //elastic "qfw/util/elastic_v5"
  8. "regexp"
  9. // "strings"
  10. "sync"
  11. "time"
  12. "gopkg.in/mgo.v2/bson"
  13. )
  var (
	// ESLEN is the projectscope length, in bytes, above which biddingBackTask
	// truncates that field before indexing.
	// NOTE(review): presumably chosen to match Lucene's 32766-byte maximum
	// term length — confirm.
	ESLEN int = 32766

	// BulkSizeBack is how many projected documents biddingBackTask buffers
	// before handing them to one Elasticsearch bulk write.
	BulkSizeBack int = 400
)
  18. func biddingBackTask(data []byte, mapInfo map[string]interface{}) {
  19. defer qutil.Catch()
  20. q, _ := mapInfo["query"].(map[string]interface{})
  21. if q == nil {
  22. q = map[string]interface{}{
  23. "_id": bson.M{
  24. "$gt": qutil.StringTOBsonId(mapInfo["gtid"].(string)),
  25. "$lte": qutil.StringTOBsonId(mapInfo["lteid"].(string)),
  26. },
  27. }
  28. }
  29. //bidding库
  30. session := mgo.GetMgoConn(86400)
  31. defer mgo.DestoryMongoConn(session)
  32. //连接信息
  33. c, _ := mapInfo["coll"].(string)
  34. if c == "" {
  35. // c, _ = bidding["collect"].(string)
  36. c, _ = biddingback["collect"].(string)
  37. }
  38. db, _ := biddingback["db"].(string)
  39. index, _ := biddingback["index"].(string)
  40. itype, _ := biddingback["type"].(string)
  41. count, _ := session.DB(db).C(c).Find(&q).Count()
  42. //线程池
  43. UpdatesLock := sync.Mutex{}
  44. log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
  45. //查询招标数据
  46. // query := session.DB(db).C(c).Find(q).Select(bson.M{
  47. // //"projectinfo.attachment": 0,
  48. // "contenthtml": 0,
  49. // }).Sort("_id").Iter()
  50. query := session.DB(db).C(c).Find(q).Select(bson.M{
  51. "contenthtml": 0,
  52. "s_sha": 0,
  53. }).Sort("_id").Iter()
  54. //查询抽取结果
  55. n := 0
  56. //更新数组
  57. arrEs := []map[string]interface{}{}
  58. //对比两张表数据,减少查询次数
  59. thread := qutil.IntAll(mapInfo["thread"])
  60. //不传为0只生成招标索引,1生成招标+预览,2只生成预览
  61. _multiIndex := qutil.IntAll(mapInfo["multiIndex"])
  62. if thread < 1 {
  63. thread = 3
  64. }
  65. log.Println("es线程数:", thread)
  66. espool := make(chan bool, thread)
  67. now1 := time.Now().Unix()
  68. for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
  69. if qutil.IntAll(tmp["extracttype"]) == -1 {
  70. tmp = make(map[string]interface{})
  71. continue
  72. }
  73. ct := qutil.Int64All(tmp["comeintime"])
  74. pt := qutil.Int64All(tmp["publishtime"])
  75. if pt > ct+86400 || pt > now1 { //时间问题,需要更新
  76. if ct > now1 {
  77. ct = now1
  78. }
  79. tmp["publishtime"] = ct
  80. }
  81. ps, _ := tmp["projectscope"].(string)
  82. if ps == "" {
  83. tmp["projectscope"] = "" //= tmp["detail"]
  84. }
  85. if len(ps) > ESLEN {
  86. tmp["projectscope"] = string(([]rune(ps))[:4000])
  87. }
  88. if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
  89. tmp["budget"] = nil
  90. } else if sbd, ok := tmp["budget"].(string); ok {
  91. tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
  92. }
  93. if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
  94. tmp["bidamount"] = nil
  95. } else if sbd, ok := tmp["bidamount"].(string); ok {
  96. tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
  97. }
  98. UpdatesLock.Lock()
  99. newTmp := map[string]interface{}{}
  100. for _, v := range biddingIndexFields {
  101. if tmp[v] != nil {
  102. if "projectinfo" == v {
  103. mp, _ := tmp[v].(map[string]interface{})
  104. if mp != nil {
  105. newmap := map[string]interface{}{}
  106. for _, v1 := range projectinfoFields {
  107. if mp[v1] != nil {
  108. newmap[v1] = mp[v1]
  109. }
  110. }
  111. newTmp[v] = newmap
  112. }
  113. } else {
  114. if v == "detail" {
  115. detail, _ := tmp[v].(string)
  116. newTmp[v] = FilterDetail(detail)
  117. } else {
  118. newTmp[v] = tmp[v]
  119. }
  120. }
  121. } else if v == "budget" || v == "bidamount" {
  122. newTmp[v] = nil
  123. }
  124. }
  125. arrEs = append(arrEs, newTmp)
  126. if len(arrEs) >= BulkSizeBack {
  127. tmps := arrEs
  128. espool <- true
  129. go func(tmps []map[string]interface{}) {
  130. defer func() {
  131. <-espool
  132. }()
  133. if _multiIndex == 0 {
  134. elastic.BulkSave(index, itype, &tmps, true)
  135. } else if _multiIndex == 1 && len(multiIndex) == 2 {
  136. elastic.BulkSave(index, itype, &tmps, true)
  137. elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
  138. } else if _multiIndex == 2 && len(multiIndex) == 2 {
  139. elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
  140. }
  141. }(tmps)
  142. arrEs = []map[string]interface{}{}
  143. }
  144. UpdatesLock.Unlock()
  145. if n%1000 == 0 {
  146. log.Println("current:", n, qutil.BsonIdToSId(tmp["_id"]))
  147. }
  148. tmp = make(map[string]interface{})
  149. }
  150. UpdatesLock.Lock()
  151. if len(arrEs) > 0 {
  152. tmps := arrEs
  153. if _multiIndex == 0 {
  154. elastic.BulkSave(index, itype, &tmps, true)
  155. } else if _multiIndex == 1 && len(multiIndex) == 2 {
  156. elastic.BulkSave(index, itype, &tmps, true)
  157. elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
  158. } else if _multiIndex == 2 && len(multiIndex) == 2 {
  159. elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
  160. }
  161. }
  162. UpdatesLock.Unlock()
  163. log.Println(mapInfo, "create biddingback index...over", n)
  164. }
  // filterReg matches one tag-like run: a '<', one or more characters other
// than '>', then a '>'. A bare "<" or an empty "<>" is not matched.
var filterReg = regexp.MustCompile("<[^>]+>")

// FilterDetail removes every span matched by filterReg from text and returns
// what remains. Text outside the angle brackets, including entities such as
// "&nbsp;", is left as-is.
func FilterDetail(text string) string {
	// An empty replacement has no $-expansions, so the literal variant is equivalent.
	return filterReg.ReplaceAllLiteralString(text, "")
}