biddingindexback.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. qutil "qfw/util"
  6. elastic "qfw/util/elastic"
  7. //elastic "qfw/util/elastic_v5"
  8. "regexp"
  9. // "strings"
  10. "sync"
  11. "time"
  12. "gopkg.in/mgo.v2/bson"
  13. )
  14. var (
  15. BulkSizeBack = 400
  16. ESLEN = 32766
  17. )
  18. func biddingBackTask(data []byte, mapInfo map[string]interface{}) {
  19. defer qutil.Catch()
  20. q, _ := mapInfo["query"].(map[string]interface{})
  21. if q == nil {
  22. q = map[string]interface{}{
  23. "_id": bson.M{
  24. "$gt": qutil.StringTOBsonId(mapInfo["gtid"].(string)),
  25. "$lte": qutil.StringTOBsonId(mapInfo["lteid"].(string)),
  26. },
  27. }
  28. }
  29. //bidding库
  30. session := mgo.GetMgoConn(86400)
  31. defer mgo.DestoryMongoConn(session)
  32. //连接信息
  33. c, _ := mapInfo["coll"].(string)
  34. if c == "" {
  35. // c, _ = bidding["collect"].(string)
  36. c, _ = biddingback["collect"].(string)
  37. }
  38. db, _ := biddingback["db"].(string)
  39. index, _ := biddingback["index"].(string)
  40. itype, _ := biddingback["type"].(string)
  41. count, _ := session.DB(db).C(c).Find(&q).Count()
  42. //线程池
  43. UpdatesLock := sync.Mutex{}
  44. log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
  45. //查询招标数据
  46. // query := session.DB(db).C(c).Find(q).Select(bson.M{
  47. // //"projectinfo.attachment": 0,
  48. // "contenthtml": 0,
  49. // }).Sort("_id").Iter()
  50. query := session.DB(db).C(c).Find(q).Select(bson.M{
  51. "contenthtml": 0,
  52. "s_sha": 0,
  53. }).Sort("_id").Iter()
  54. //查询抽取结果
  55. n := 0
  56. //更新数组
  57. arrEs := []map[string]interface{}{}
  58. //对比两张表数据,减少查询次数
  59. thread := qutil.IntAll(mapInfo["thread"])
  60. //不传为0只生成招标索引,1生成招标+预览,2只生成预览
  61. _multiIndex := qutil.IntAll(mapInfo["multiIndex"])
  62. if thread < 1 {
  63. thread = 3
  64. }
  65. log.Println("es线程数:", thread)
  66. espool := make(chan bool, thread)
  67. now1 := time.Now().Unix()
  68. for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
  69. if qutil.IntAll(tmp["extracttype"]) == -1 {
  70. tmp = make(map[string]interface{})
  71. continue
  72. } else {
  73. tmp["extracttype"] = 1
  74. }
  75. ct := qutil.Int64All(tmp["comeintime"])
  76. pt := qutil.Int64All(tmp["publishtime"])
  77. if pt > ct+86400 || pt > now1 { //时间问题,需要更新
  78. if ct > now1 {
  79. ct = now1
  80. }
  81. tmp["publishtime"] = ct
  82. }
  83. ps, _ := tmp["projectscope"].(string)
  84. if ps == "" {
  85. tmp["projectscope"] = "" //= tmp["detail"]
  86. }
  87. if len(ps) > ESLEN {
  88. tmp["projectscope"] = string(([]rune(ps))[:4000])
  89. }
  90. if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
  91. tmp["budget"] = nil
  92. } else if sbd, ok := tmp["budget"].(string); ok {
  93. tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
  94. }
  95. if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
  96. tmp["bidamount"] = nil
  97. } else if sbd, ok := tmp["bidamount"].(string); ok {
  98. tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
  99. }
  100. UpdatesLock.Lock()
  101. newTmp := map[string]interface{}{}
  102. for _, v := range biddingIndexFields {
  103. if tmp[v] != nil {
  104. if "projectinfo" == v {
  105. mp, _ := tmp[v].(map[string]interface{})
  106. if mp != nil {
  107. newmap := map[string]interface{}{}
  108. for _, v1 := range projectinfoFields {
  109. if mp[v1] != nil {
  110. newmap[v1] = mp[v1]
  111. }
  112. }
  113. newTmp[v] = newmap
  114. }
  115. } else {
  116. if v == "detail" {
  117. detail, _ := tmp[v].(string)
  118. newTmp[v] = FilterDetail(detail)
  119. } else {
  120. newTmp[v] = tmp[v]
  121. }
  122. }
  123. } else if v == "budget" || v == "bidamount" {
  124. newTmp[v] = nil
  125. }
  126. }
  127. arrEs = append(arrEs, newTmp)
  128. if len(arrEs) >= BulkSizeBack {
  129. tmps := arrEs
  130. espool <- true
  131. go func(tmps []map[string]interface{}) {
  132. defer func() {
  133. <-espool
  134. }()
  135. if _multiIndex == 0 {
  136. elastic.BulkSave(index, itype, &tmps, true)
  137. } else if _multiIndex == 1 && len(multiIndex) == 2 {
  138. elastic.BulkSave(index, itype, &tmps, true)
  139. elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
  140. } else if _multiIndex == 2 && len(multiIndex) == 2 {
  141. elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
  142. }
  143. }(tmps)
  144. arrEs = []map[string]interface{}{}
  145. }
  146. UpdatesLock.Unlock()
  147. if n%1000 == 0 {
  148. log.Println("current:", n, qutil.BsonIdToSId(tmp["_id"]))
  149. }
  150. tmp = make(map[string]interface{})
  151. }
  152. UpdatesLock.Lock()
  153. if len(arrEs) > 0 {
  154. tmps := arrEs
  155. if _multiIndex == 0 {
  156. elastic.BulkSave(index, itype, &tmps, true)
  157. } else if _multiIndex == 1 && len(multiIndex) == 2 {
  158. elastic.BulkSave(index, itype, &tmps, true)
  159. elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
  160. } else if _multiIndex == 2 && len(multiIndex) == 2 {
  161. elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
  162. }
  163. }
  164. UpdatesLock.Unlock()
  165. log.Println(mapInfo, "create biddingback index...over", n)
  166. }
  167. var filterReg = regexp.MustCompile("<[^>]+>")
  168. var filterSpace = regexp.MustCompile("<[^>]*?>|[\\s\u3000\u2003\u00a0]")
  169. func FilterDetail(text string) string {
  170. return filterReg.ReplaceAllString(text, "")
  171. }
  172. func FilterDetailSpace(text string) string {
  173. return filterSpace.ReplaceAllString(text, "")
  174. }