bidingpurchasing.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. package main
  2. import (
  3. "log"
  4. "qfw/util"
  5. elastic "qfw/util/elastic"
  6. "sync"
  7. "unicode/utf8"
  8. u "./util"
  9. "gopkg.in/mgo.v2/bson"
  10. )
  11. //定时查询bidding中extract_state为2的数据生成索引
  12. func biddingPurchaingTask(q map[string]interface{}) {
  13. defer util.Catch()
  14. //锁
  15. SaveUpdageLock := sync.Mutex{}
  16. //连接参数
  17. c, _ := bidding["collect"].(string) //bidding表
  18. db, _ := bidding["db"].(string) //库
  19. index, _ := bidding["index"].(string) //索引别名
  20. itype, _ := bidding["type"].(string)
  21. //
  22. session := mgo.GetMgoConn(86400)
  23. defer mgo.DestoryMongoConn(session)
  24. count, _ := session.DB(db).C(c).Find(&q).Count()
  25. log.Println("biddingPurchaingTask: ", db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
  26. query := session.DB(db).C(c).Find(q).Select(bson.M{
  27. "projectinfo.attachment": 0,
  28. "contenthtml": 0,
  29. }).Iter()
  30. arrEs := make([]map[string]interface{}, savesizei)
  31. arrMgo := [][]map[string]interface{}{}
  32. var n int
  33. i := 0
  34. for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
  35. n++
  36. if util.IntAll(tmp["extracttype"]) == -1 || util.IntAll(tmp["dataging"]) == 1 { //重复数据不生索引
  37. tmp = make(map[string]interface{})
  38. continue
  39. }
  40. newTmp := map[string]interface{}{} //最终生索引的数据
  41. saveArr := []map[string]interface{}{}
  42. //oss拼装filetext
  43. if filetext := getFileText(tmp); len(filetext) > 0 {
  44. if site, _ := tmp["site"].(string); site == "中国招标投标公共服务平台" { //site:中国招标投标公共服务平台 detail替换成filetext 并加入标记filedetail=1
  45. tmp["detail"] = filetext
  46. saveArr = append(saveArr, map[string]interface{}{"_id": tmp["_id"]})
  47. saveArr = append(saveArr, map[string]interface{}{
  48. "$set": map[string]interface{}{
  49. "filedetail": 1,
  50. "detail": filetext,
  51. },
  52. })
  53. }
  54. newTmp["filetext"] = filetext
  55. }
  56. //purchasing
  57. if purchasing, ok := tmp["purchasing"].(string); ok {
  58. if len(purchasing) > 0 {
  59. newTmp["purchasing"] = tmp["purchasing"]
  60. }
  61. }
  62. //purchasinglist
  63. if purchasinglist, ok := tmp["purchasinglist"].([]interface{}); ok {
  64. if len(purchasinglist) > 0 {
  65. purchasinglist_new := []map[string]interface{}{}
  66. for _, ls := range purchasinglist {
  67. lsm_new := make(map[string]interface{})
  68. lsm := ls.(map[string]interface{})
  69. for _, pf := range purchasinglistFields {
  70. if lsm[pf] != nil {
  71. lsm_new[pf] = lsm[pf]
  72. }
  73. }
  74. if lsm_new != nil && len(lsm_new) > 0 {
  75. purchasinglist_new = append(purchasinglist_new, lsm_new)
  76. }
  77. }
  78. if len(purchasinglist_new) > 0 {
  79. newTmp["purchasinglist"] = purchasinglist_new
  80. }
  81. }
  82. }
  83. //处理数据
  84. if tmp["supervisorrate"] != nil { //临时处理supervisorrate抽取类型为string不生索引
  85. if _, ok := tmp["supervisorrate"].(string); ok { //supervisorrate数据为string类型
  86. delete(tmp, "supervisorrate")
  87. }
  88. }
  89. //对projectscope字段的索引处理
  90. ps, _ := tmp["projectscope"].(string)
  91. if len(ps) > ESLEN {
  92. tmp["projectscope"] = string(([]rune(ps))[:4000])
  93. }
  94. SaveUpdageLock.Lock()
  95. for _, v := range biddingIndexFields { //索引字段
  96. if tmp[v] != nil {
  97. if "projectinfo" == v {
  98. mp, _ := tmp[v].(map[string]interface{})
  99. if mp != nil {
  100. newmap := map[string]interface{}{}
  101. for _, v1 := range projectinfoFields {
  102. if mp[v1] != nil {
  103. newmap[v1] = mp[v1]
  104. }
  105. }
  106. newTmp[v] = newmap
  107. // attachments := mp["attachments"]
  108. // con := ""
  109. // if attachments != nil {
  110. // am, _ := attachments.(map[string]interface{})
  111. // if am != nil {
  112. // for _, v1 := range am {
  113. // vm, _ := v1.(map[string]interface{})
  114. // if vm != nil {
  115. // c, _ := vm["content"].(string)
  116. // con += c
  117. // }
  118. // }
  119. // }
  120. // }
  121. // con = FilterDetailSpace(con)
  122. // if con != "" {
  123. // newTmp["attachments"] = con
  124. // }
  125. }
  126. } else {
  127. if v == "detail" {
  128. detail, _ := tmp[v].(string)
  129. newTmp[v] = FilterDetail(detail)
  130. } else {
  131. newTmp[v] = tmp[v]
  132. }
  133. }
  134. }
  135. }
  136. arrEs = append(arrEs, newTmp)
  137. if len(saveArr) > 0 {
  138. arrMgo = append(arrMgo, saveArr) //要更新数据
  139. }
  140. // arrMgo = append(arrMgo, []map[string]interface{}{ //要更新数据
  141. // map[string]interface{}{
  142. // "_id": tmp["_id"],
  143. // },
  144. // map[string]interface{}{
  145. // "$set": map[string]interface{}{
  146. // "extract_state": 4,
  147. // },
  148. // },
  149. // })
  150. //批量更新
  151. if len(arrMgo) >= savesizei-1 {
  152. mgo.UpdateBulkAll(db, c, arrMgo...)
  153. arrMgo = [][]map[string]interface{}{}
  154. }
  155. //生索引
  156. if len(arrEs) >= savesizei-1 {
  157. tmps := arrEs
  158. elastic.BulkSave(index, itype, &tmps, true)
  159. arrEs = []map[string]interface{}{}
  160. }
  161. SaveUpdageLock.Unlock()
  162. //计数
  163. if n%savesizei == 0 {
  164. log.Println("当前:", n)
  165. }
  166. tmp = make(map[string]interface{})
  167. }
  168. SaveUpdageLock.Lock()
  169. if len(arrMgo) > 0 {
  170. mgo.UpdateBulkAll(db, c, arrMgo...)
  171. }
  172. if len(arrEs) > 0 {
  173. tmps := arrEs
  174. elastic.BulkSave(index, itype, &tmps, true)
  175. }
  176. SaveUpdageLock.Unlock()
  177. log.Println("create filetext index...over", n)
  178. }
  179. func getFileText(tmp map[string]interface{}) (filetext string) {
  180. if attchMap, ok := tmp["attach_text"].(map[string]interface{}); attchMap != nil && ok {
  181. for _, tmpData1 := range attchMap {
  182. if tmpData2, ok := tmpData1.(map[string]interface{}); tmpData2 != nil && ok {
  183. for _, result := range tmpData2 {
  184. if resultMap, ok := result.(map[string]interface{}); resultMap != nil && ok {
  185. if attach_url := util.ObjToString(resultMap["attach_url"]); attach_url != "" {
  186. bs := u.OssGetObject(attach_url) //oss读数据
  187. if utf8.RuneCountInString(filetext+bs) < util.IntAllDef(Sysconfig["filelength"], 100000) {
  188. filetext += bs + "\n"
  189. } else {
  190. break
  191. }
  192. }
  193. }
  194. }
  195. }
  196. }
  197. }
  198. return
  199. }