biddingindexback.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. package main
  2. import (
  3. "log"
  4. qutil "qfw/util"
  5. elastic "qfw/util/elastic"
  6. //elastic "qfw/util/elastic_v5"
  7. "regexp"
  8. // "strings"
  9. "sync"
  10. "time"
  11. "gopkg.in/mgo.v2/bson"
  12. )
  // BulkSizeBack is the number of documents biddingBackTask accumulates
  // before handing the batch to a bulk-save goroutine.
  var BulkSizeBack = 400

  // ESLEN is the byte-length threshold above which a document's projectscope
  // is truncated before indexing.
  // NOTE(review): 32766 matches Lucene's maximum indexed term length in bytes,
  // which is presumably why this value was chosen — confirm.
  var ESLEN = 32766
  17. func biddingBackTask(data []byte, mapInfo map[string]interface{}) {
  18. defer qutil.Catch()
  19. q, _ := mapInfo["query"].(map[string]interface{})
  20. if q == nil {
  21. q = map[string]interface{}{
  22. "_id": bson.M{
  23. "$gt": qutil.StringTOBsonId(mapInfo["gtid"].(string)),
  24. "$lte": qutil.StringTOBsonId(mapInfo["lteid"].(string)),
  25. },
  26. }
  27. }
  28. //bidding库
  29. session := mgo.GetMgoConn(86400)
  30. defer mgo.DestoryMongoConn(session)
  31. //连接信息
  32. c, _ := mapInfo["coll"].(string)
  33. if c == "" {
  34. // c, _ = bidding["collect"].(string)
  35. c, _ = biddingback["collect"].(string)
  36. }
  37. db, _ := biddingback["db"].(string)
  38. index, _ := biddingback["index"].(string)
  39. itype, _ := biddingback["type"].(string)
  40. count, _ := session.DB(db).C(c).Find(&q).Count()
  41. //线程池
  42. UpdatesLock := sync.Mutex{}
  43. log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
  44. //查询招标数据
  45. // query := session.DB(db).C(c).Find(q).Select(bson.M{
  46. // //"projectinfo.attachment": 0,
  47. // "contenthtml": 0,
  48. // }).Sort("_id").Iter()
  49. query := session.DB(db).C(c).Find(q).Select(bson.M{
  50. "contenthtml": 0,
  51. "s_sha": 0,
  52. }).Sort("_id").Iter()
  53. //查询抽取结果
  54. n := 0
  55. //更新数组
  56. arrEs := []map[string]interface{}{}
  57. //对比两张表数据,减少查询次数
  58. thread := qutil.IntAll(mapInfo["thread"])
  59. //不传为0只生成招标索引,1生成招标+预览,2只生成预览
  60. _multiIndex := qutil.IntAll(mapInfo["multiIndex"])
  61. if thread < 1 {
  62. thread = 3
  63. }
  64. log.Println("es线程数:", thread)
  65. espool := make(chan bool, thread)
  66. now1 := time.Now().Unix()
  67. for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
  68. if qutil.IntAll(tmp["extracttype"]) == -1 {
  69. tmp = make(map[string]interface{})
  70. continue
  71. } else {
  72. tmp["extracttype"] = 1
  73. }
  74. ct := qutil.Int64All(tmp["comeintime"])
  75. pt := qutil.Int64All(tmp["publishtime"])
  76. if pt > ct+86400 || pt > now1 { //时间问题,需要更新
  77. if ct > now1 {
  78. ct = now1
  79. }
  80. tmp["publishtime"] = ct
  81. }
  82. ps, _ := tmp["projectscope"].(string)
  83. // if ps == "" {
  84. // tmp["projectscope"] = "" //= tmp["detail"]
  85. // }
  86. if len(ps) > ESLEN {
  87. tmp["projectscope"] = string(([]rune(ps))[:4000])
  88. }
  89. // if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
  90. // tmp["budget"] = nil
  91. // } else if sbd, ok := tmp["budget"].(string); ok {
  92. // tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
  93. // }
  94. // if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
  95. // tmp["bidamount"] = nil
  96. // } else if sbd, ok := tmp["bidamount"].(string); ok {
  97. // tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
  98. // }
  99. UpdatesLock.Lock()
  100. newTmp := map[string]interface{}{}
  101. for _, v := range biddingIndexFields {
  102. if tmp[v] != nil {
  103. if "projectinfo" == v {
  104. mp, _ := tmp[v].(map[string]interface{})
  105. if mp != nil {
  106. newmap := map[string]interface{}{}
  107. for _, v1 := range projectinfoFields {
  108. if mp[v1] != nil {
  109. newmap[v1] = mp[v1]
  110. }
  111. }
  112. newTmp[v] = newmap
  113. // attachments := mp["attachments"]
  114. // con := ""
  115. // if attachments != nil {
  116. // am, _ := attachments.(map[string]interface{})
  117. // if am != nil {
  118. // for _, v1 := range am {
  119. // vm, _ := v1.(map[string]interface{})
  120. // if vm != nil {
  121. // c, _ := vm["content"].(string)
  122. // con += c
  123. // }
  124. // }
  125. // }
  126. // }
  127. // con = FilterDetailSpace(con)
  128. // if con != "" {
  129. // newTmp["attachments"] = con
  130. // }
  131. }
  132. } else {
  133. if v == "detail" {
  134. detail, _ := tmp[v].(string)
  135. newTmp[v] = FilterDetail(detail)
  136. } else {
  137. newTmp[v] = tmp[v]
  138. }
  139. }
  140. } /* else if v == "budget" || v == "bidamount" {
  141. newTmp[v] = nil
  142. }*/
  143. }
  144. arrEs = append(arrEs, newTmp)
  145. if len(arrEs) >= BulkSizeBack {
  146. tmps := arrEs
  147. espool <- true
  148. go func(tmps []map[string]interface{}) {
  149. defer func() {
  150. <-espool
  151. }()
  152. if _multiIndex == 0 {
  153. elastic.BulkSave(index, itype, &tmps, true)
  154. } else if _multiIndex == 1 && len(multiIndex) == 2 {
  155. elastic.BulkSave(index, itype, &tmps, true)
  156. elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
  157. } else if _multiIndex == 2 && len(multiIndex) == 2 {
  158. elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
  159. }
  160. }(tmps)
  161. arrEs = []map[string]interface{}{}
  162. }
  163. UpdatesLock.Unlock()
  164. if n%1000 == 0 {
  165. log.Println("current:", n, qutil.BsonIdToSId(tmp["_id"]))
  166. }
  167. tmp = make(map[string]interface{})
  168. }
  169. UpdatesLock.Lock()
  170. if len(arrEs) > 0 {
  171. tmps := arrEs
  172. if _multiIndex == 0 {
  173. elastic.BulkSave(index, itype, &tmps, true)
  174. } else if _multiIndex == 1 && len(multiIndex) == 2 {
  175. elastic.BulkSave(index, itype, &tmps, true)
  176. elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
  177. } else if _multiIndex == 2 && len(multiIndex) == 2 {
  178. elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
  179. }
  180. }
  181. UpdatesLock.Unlock()
  182. log.Println(mapInfo, "create biddingback index...over", n)
  183. }
  184. var filterReg = regexp.MustCompile("<[^>]+>")
  185. var filterSpace = regexp.MustCompile("<[^>]*?>|[\\s\u3000\u2003\u00a0]")
  186. func FilterDetail(text string) string {
  187. return filterReg.ReplaceAllString(text, "")
  188. }
  189. func FilterDetailSpace(text string) string {
  190. return filterSpace.ReplaceAllString(text, "")
  191. }