bidingpurchasing.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package main
  2. import (
  3. "log"
  4. "qfw/util"
  5. elastic "qfw/util/elastic"
  6. "sync"
  7. "unicode/utf8"
  8. u "./util"
  9. "gopkg.in/mgo.v2/bson"
  10. )
  11. //定时查询bidding中extract_state为2的数据生成索引
  12. func biddingPurchaingTask(q map[string]interface{}) {
  13. defer util.Catch()
  14. //线程池
  15. SaveUpdageLock := sync.Mutex{}
  16. //连接参数
  17. c, _ := bidding["collect"].(string) //bidding表
  18. db, _ := bidding["db"].(string) //库
  19. index, _ := bidding["index"].(string) //索引别名
  20. itype, _ := bidding["type"].(string)
  21. //
  22. session := mgo.GetMgoConn(86400)
  23. defer mgo.DestoryMongoConn(session)
  24. count, _ := session.DB(db).C(c).Find(&q).Count()
  25. log.Println("biddingPurchaingTask: ", db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
  26. query := session.DB(db).C(c).Find(q).Select(bson.M{
  27. "projectinfo.attachment": 0,
  28. "contenthtml": 0,
  29. }).Iter()
  30. arrEs := make([]map[string]interface{}, savesizei)
  31. arrMgo := [][]map[string]interface{}{}
  32. var n int
  33. i := 0
  34. for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
  35. n++
  36. if util.IntAll(tmp["extracttype"]) == -1 { //重复数据不生索引
  37. tmp = make(map[string]interface{})
  38. continue
  39. }
  40. newTmp := map[string]interface{}{} //最终生索引的数据
  41. //oss拼装filetext
  42. filetext := getFileText(tmp)
  43. newTmp["filetext"] = filetext
  44. //purchasing
  45. newTmp["purchasing"] = tmp["purchasing"]
  46. //purchasinglist
  47. newTmp["purchasinglist"] = tmp["purchasinglist"]
  48. //处理数据
  49. if tmp["supervisorrate"] != nil { //临时处理supervisorrate抽取类型为string不生索引
  50. if _, ok := tmp["supervisorrate"].(string); ok { //supervisorrate数据为string类型
  51. delete(tmp, "supervisorrate")
  52. }
  53. }
  54. //对projectscope字段的索引处理
  55. ps, _ := tmp["projectscope"].(string)
  56. if len(ps) > ESLEN {
  57. tmp["projectscope"] = string(([]rune(ps))[:4000])
  58. }
  59. SaveUpdageLock.Lock()
  60. for _, v := range biddingIndexFields { //索引字段
  61. if tmp[v] != nil {
  62. if "projectinfo" == v {
  63. mp, _ := tmp[v].(map[string]interface{})
  64. if mp != nil {
  65. newmap := map[string]interface{}{}
  66. for _, v1 := range projectinfoFields {
  67. if mp[v1] != nil {
  68. newmap[v1] = mp[v1]
  69. }
  70. }
  71. newTmp[v] = newmap
  72. attachments := mp["attachments"]
  73. con := ""
  74. if attachments != nil {
  75. am, _ := attachments.(map[string]interface{})
  76. if am != nil {
  77. for _, v1 := range am {
  78. vm, _ := v1.(map[string]interface{})
  79. if vm != nil {
  80. c, _ := vm["content"].(string)
  81. con += c
  82. }
  83. }
  84. }
  85. }
  86. con = FilterDetailSpace(con)
  87. if con != "" {
  88. newTmp["attachments"] = con
  89. }
  90. }
  91. } else {
  92. if v == "detail" {
  93. detail, _ := tmp[v].(string)
  94. newTmp[v] = FilterDetail(detail)
  95. } else {
  96. newTmp[v] = tmp[v]
  97. }
  98. }
  99. }
  100. }
  101. arrEs = append(arrEs, newTmp)
  102. arrMgo = append(arrMgo, []map[string]interface{}{ //要更新数据
  103. map[string]interface{}{
  104. "_id": tmp["_id"],
  105. },
  106. map[string]interface{}{
  107. "$set": map[string]interface{}{
  108. "extract_state": 4,
  109. },
  110. },
  111. })
  112. //批量更新
  113. if len(arrMgo) >= savesizei-1 {
  114. mgo.UpdateBulkAll(db, c, arrMgo...)
  115. arrMgo = [][]map[string]interface{}{}
  116. }
  117. //生索引
  118. if len(arrEs) >= savesizei-1 {
  119. tmps := arrEs
  120. elastic.BulkSave(index, itype, &tmps, true)
  121. arrEs = []map[string]interface{}{}
  122. }
  123. SaveUpdageLock.Unlock()
  124. //计数
  125. if n%savesizei == 0 {
  126. log.Println("当前:", n)
  127. }
  128. tmp = make(map[string]interface{})
  129. }
  130. SaveUpdageLock.Lock()
  131. if len(arrMgo) > 0 {
  132. mgo.UpdateBulkAll(db, c, arrMgo...)
  133. }
  134. if len(arrEs) > 0 {
  135. tmps := arrEs
  136. elastic.BulkSave(index, itype, &tmps, true)
  137. }
  138. SaveUpdageLock.Unlock()
  139. log.Println("create filetext index...over", n)
  140. }
  141. func getFileText(tmp map[string]interface{}) (filetext string) {
  142. if attchMap, ok := tmp["attach_text"].(map[string]interface{}); attchMap != nil && ok {
  143. for _, tmpData1 := range attchMap {
  144. if tmpData2, ok := tmpData1.(map[string]interface{}); tmpData2 != nil && ok {
  145. for _, result := range tmpData2 {
  146. if resultMap, ok := result.(map[string]interface{}); resultMap != nil && ok {
  147. if attach_url := util.ObjToString(resultMap["attach_url"]); attach_url != "" {
  148. bs := u.OssGetObject(attach_url) //oss读数据
  149. if utf8.RuneCountInString(filetext+bs) < util.IntAllDef(Sysconfig["filelength"], 100000) {
  150. filetext += bs + "\n"
  151. } else {
  152. break
  153. }
  154. }
  155. }
  156. }
  157. }
  158. }
  159. }
  160. return
  161. }