bidingpurchasing.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. package main
  2. import (
  3. "log"
  4. "qfw/util"
  5. elastic "qfw/util/elastic"
  6. "sync"
  7. "unicode/utf8"
  8. u "./util"
  9. "gopkg.in/mgo.v2/bson"
  10. )
  11. //定时查询bidding中extract_state为2的数据生成索引
  12. func biddingPurchaingTask(q map[string]interface{}) {
  13. defer util.Catch()
  14. //线程池
  15. SaveUpdageLock := sync.Mutex{}
  16. //连接参数
  17. c, _ := bidding["collect"].(string) //bidding表
  18. db, _ := bidding["db"].(string) //库
  19. index, _ := bidding["index"].(string) //索引别名
  20. itype, _ := bidding["type"].(string)
  21. //
  22. session := mgo.GetMgoConn(86400)
  23. count, _ := session.DB(db).C(c).Find(&q).Count()
  24. log.Println("biddingPurchaingTask: ", db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
  25. query := session.DB(db).C(c).Find(q).Select(bson.M{
  26. "projectinfo.attachment": 0,
  27. "contenthtml": 0,
  28. }).Iter()
  29. arrEs := make([]map[string]interface{}, savesizei)
  30. arrMgo := [][]map[string]interface{}{}
  31. var n int
  32. i := 0
  33. for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
  34. n++
  35. if util.IntAll(tmp["extracttype"]) == -1 { //重复数据不生索引
  36. continue
  37. }
  38. newTmp := map[string]interface{}{} //最终生索引的数据
  39. //oss拼装filetext
  40. filetext := getFileText(tmp)
  41. newTmp["filetext"] = filetext
  42. //purchasing
  43. newTmp["purchasing"] = tmp["purchasing"]
  44. //purchasinglist
  45. newTmp["purchasinglist"] = tmp["purchasinglist"]
  46. //处理数据
  47. if tmp["supervisorrate"] != nil { //临时处理supervisorrate抽取类型为string不生索引
  48. if _, ok := tmp["supervisorrate"].(string); ok { //supervisorrate数据为string类型
  49. delete(tmp, "supervisorrate")
  50. }
  51. }
  52. //对projectscope字段的索引处理
  53. ps, _ := tmp["projectscope"].(string)
  54. if len(ps) > ESLEN {
  55. tmp["projectscope"] = string(([]rune(ps))[:4000])
  56. }
  57. SaveUpdageLock.Lock()
  58. for _, v := range biddingIndexFields { //索引字段
  59. if tmp[v] != nil {
  60. if "projectinfo" == v {
  61. mp, _ := tmp[v].(map[string]interface{})
  62. if mp != nil {
  63. newmap := map[string]interface{}{}
  64. for _, v1 := range projectinfoFields {
  65. if mp[v1] != nil {
  66. newmap[v1] = mp[v1]
  67. }
  68. }
  69. newTmp[v] = newmap
  70. attachments := mp["attachments"]
  71. con := ""
  72. if attachments != nil {
  73. am, _ := attachments.(map[string]interface{})
  74. if am != nil {
  75. for _, v1 := range am {
  76. vm, _ := v1.(map[string]interface{})
  77. if vm != nil {
  78. c, _ := vm["content"].(string)
  79. con += c
  80. }
  81. }
  82. }
  83. }
  84. con = FilterDetailSpace(con)
  85. if con != "" {
  86. newTmp["attachments"] = con
  87. }
  88. }
  89. } else {
  90. if v == "detail" {
  91. detail, _ := tmp[v].(string)
  92. newTmp[v] = FilterDetail(detail)
  93. } else {
  94. newTmp[v] = tmp[v]
  95. }
  96. }
  97. }
  98. }
  99. arrEs = append(arrEs, newTmp)
  100. arrMgo = append(arrMgo, []map[string]interface{}{ //要更新数据
  101. map[string]interface{}{
  102. "_id": tmp["_id"],
  103. },
  104. map[string]interface{}{
  105. "$set": map[string]interface{}{
  106. "extract_state": 4,
  107. },
  108. },
  109. })
  110. //批量更新
  111. if len(arrMgo) >= savesizei-1 {
  112. mgo.UpdateBulkAll(db, c, arrMgo...)
  113. arrMgo = [][]map[string]interface{}{}
  114. }
  115. //生索引
  116. if len(arrEs) >= savesizei-1 {
  117. tmps := arrEs
  118. elastic.BulkSave(index, itype, &tmps, true)
  119. arrEs = []map[string]interface{}{}
  120. }
  121. SaveUpdageLock.Unlock()
  122. //计数
  123. if n%savesizei == 0 {
  124. log.Println("当前:", n)
  125. }
  126. tmp = make(map[string]interface{})
  127. }
  128. SaveUpdageLock.Lock()
  129. if len(arrMgo) > 0 {
  130. mgo.UpdateBulkAll(db, c, arrMgo...)
  131. }
  132. if len(arrEs) > 0 {
  133. tmps := arrEs
  134. elastic.BulkSave(index, itype, &tmps, true)
  135. }
  136. SaveUpdageLock.Unlock()
  137. log.Println("create filetext index...over", n)
  138. }
  139. func getFileText(tmp map[string]interface{}) (filetext string) {
  140. if attchMap, ok := tmp["attach_text"].(map[string]interface{}); attchMap != nil && ok {
  141. for _, tmpData1 := range attchMap {
  142. if tmpData2, ok := tmpData1.(map[string]interface{}); tmpData2 != nil && ok {
  143. for _, result := range tmpData2 {
  144. if resultMap, ok := result.(map[string]interface{}); resultMap != nil && ok {
  145. if attach_url := util.ObjToString(resultMap["attach_url"]); attach_url != "" {
  146. bs := u.OssGetObject(attach_url) //oss读数据
  147. if utf8.RuneCountInString(filetext+bs) < util.IntAllDef(Sysconfig["filelength"], 100000) {
  148. filetext += bs + "\n"
  149. } else {
  150. break
  151. }
  152. }
  153. }
  154. }
  155. }
  156. }
  157. }
  158. return
  159. }