// biddingindex.go (9.1 KB)

  1. package main
  2. import (
  3. "go.mongodb.org/mongo-driver/bson"
  4. "log"
  5. qutil "qfw/util"
  6. elastic "qfw/util/elastic"
  7. "reflect"
  8. "strings"
  9. "unicode/utf8"
  10. //elastic "qfw/util/elastic_v5"
  11. "regexp"
  12. // "strings"
  13. "sync"
  14. )
  15. var (
  16. BulkSizeBack = 400
  17. ESLEN = 32766
  18. )
  19. var reg_letter = regexp.MustCompile("[a-z]*")
  20. func biddingTask() {
  21. defer qutil.Catch()
  22. //bidding库
  23. session := mgo.GetMgoConn()
  24. defer mgo.DestoryMongoConn(session)
  25. db := qutil.ObjToString(bidding["db"])
  26. coll := qutil.ObjToString(bidding["collect"])
  27. //q := map[string]interface{}{"updatetime": map[string]interface{}{"$gt": 1643262000}}
  28. //q := map[string]interface{}{"_id": mongodb.StringTOBsonId("5db7c324a5cb26b9b78b0f09")}
  29. count, _ := session.DB(db).C(coll).Find(nil).Count()
  30. index := qutil.ObjToString(bidding["index"])
  31. stype := qutil.ObjToString(bidding["type"])
  32. //线程池
  33. UpdatesLock := sync.Mutex{}
  34. qutil.Debug("查询语句:", nil, "同步总数:", count, "elastic库:")
  35. //查询招标数据
  36. query := session.DB(db).C(coll).Find(nil).Select(bson.M{
  37. "projectinfo.attachment": 0,
  38. "contenthtml": 0,
  39. "publishdept": 0,
  40. }).Sort("_id").Iter()
  41. //查询抽取结果
  42. n := 0
  43. //更新数组
  44. arrEs := []map[string]interface{}{}
  45. thread := 10
  46. espool := make(chan bool, 5)
  47. var mpool = make(chan bool, thread)
  48. for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
  49. if n%2000 == 0 {
  50. log.Println("current:", n, tmp["_id"])
  51. }
  52. if qutil.ObjToString(tmp["useable"]) == "0" {
  53. continue
  54. }
  55. mpool <- true
  56. go func(tmp map[string]interface{}) {
  57. defer func() {
  58. <-mpool
  59. }()
  60. subscopeclass, _ := tmp["subscopeclass"].([]interface{}) //subscopeclass
  61. if subscopeclass != nil {
  62. m1 := map[string]bool{}
  63. newclass := []string{}
  64. for _, sc := range subscopeclass {
  65. sclass, _ := sc.(string)
  66. if !m1[sclass] {
  67. m1[sclass] = true
  68. newclass = append(newclass, sclass)
  69. }
  70. }
  71. tmp["s_subscopeclass"] = strings.Join(newclass, ",")
  72. tmp["subscopeclass"] = newclass
  73. }
  74. topscopeclass, _ := tmp["topscopeclass"].([]interface{}) //topscopeclass
  75. if topscopeclass != nil {
  76. m2 := map[string]bool{}
  77. newclass := []string{}
  78. for _, tc := range topscopeclass {
  79. tclass, _ := tc.(string)
  80. tclass = reg_letter.ReplaceAllString(tclass, "") // 去除字母
  81. if !m2[tclass] {
  82. m2[tclass] = true
  83. newclass = append(newclass, tclass)
  84. }
  85. }
  86. tmp["s_topscopeclass"] = strings.Join(newclass, ",")
  87. }
  88. //对projectscope字段的索引处理
  89. ps, _ := tmp["projectscope"].(string)
  90. if len(ps) > ESLEN {
  91. tmp["projectscope"] = string(([]rune(ps))[:4000])
  92. }
  93. //对标的物为空处理
  94. filetext := getFileText(tmp)
  95. filetextS := filetext
  96. if len([]rune(filetextS)) > 10 { //attach_text
  97. tmp["filetext"] = filetext
  98. }
  99. if purchasing, ok := tmp["purchasing"].(string); ok && purchasing == "" {
  100. delete(tmp, "purchasing")
  101. }
  102. if purchasinglist, ok := tmp["purchasinglist"].([]interface{}); ok && len(purchasinglist) == 0 {
  103. delete(tmp, "purchasinglist")
  104. }
  105. //数据为空处理
  106. for _, f := range []string{"bidstatus", "city", "district", "channel"} {
  107. if fVal, ok := tmp[f].(string); ok && fVal == "" {
  108. delete(tmp, f)
  109. }
  110. }
  111. UpdatesLock.Lock()
  112. newTmp := map[string]interface{}{}
  113. for field, ftype := range biddingIndexFieldsMap {
  114. if tmp[field] != nil { //
  115. if field == "projectinfo" {
  116. mp, _ := tmp[field].(map[string]interface{})
  117. if mp != nil {
  118. newmap := map[string]interface{}{}
  119. for k, ktype := range projectinfoFieldsMap {
  120. mpv := mp[k]
  121. if mpv != nil && reflect.TypeOf(mpv).String() == ktype {
  122. newmap[k] = mp[k]
  123. }
  124. }
  125. if len(newmap) > 0 {
  126. newTmp[field] = newmap
  127. }
  128. }
  129. } else if field == "purchasinglist" { //标的物处理
  130. purchasinglist_new := []map[string]interface{}{}
  131. if pcl, _ := tmp[field].([]interface{}); len(pcl) > 0 {
  132. for _, ls := range pcl {
  133. lsm_new := make(map[string]interface{})
  134. lsm := ls.(map[string]interface{})
  135. for pf, pftype := range purchasinglistFieldsMap {
  136. lsmv := lsm[pf]
  137. if lsmv != nil && reflect.TypeOf(lsmv).String() == pftype {
  138. lsm_new[pf] = lsm[pf]
  139. }
  140. }
  141. if lsm_new != nil && len(lsm_new) > 0 {
  142. purchasinglist_new = append(purchasinglist_new, lsm_new)
  143. }
  144. }
  145. }
  146. if len(purchasinglist_new) > 0 {
  147. newTmp[field] = purchasinglist_new
  148. }
  149. } else if field == "winnerorder" { //中标候选
  150. winnerorder_new := []map[string]interface{}{}
  151. if winnerorder, _ := tmp[field].([]interface{}); len(winnerorder) > 0 {
  152. for _, win := range winnerorder {
  153. winMap_new := make(map[string]interface{})
  154. winMap := win.(map[string]interface{})
  155. for wf, wftype := range winnerorderlistFieldsMap {
  156. wfv := winMap[wf]
  157. if wfv != nil && reflect.TypeOf(wfv).String() == wftype {
  158. if wf == "sort" && qutil.Int64All(wfv) > 100 {
  159. continue
  160. }
  161. winMap_new[wf] = winMap[wf]
  162. }
  163. }
  164. if winMap_new != nil && len(winMap_new) > 0 {
  165. winnerorder_new = append(winnerorder_new, winMap_new)
  166. }
  167. }
  168. }
  169. if len(winnerorder_new) > 0 {
  170. newTmp[field] = winnerorder_new
  171. }
  172. } else if field == "qualifies" {
  173. //项目资质
  174. qs := []string{}
  175. if q, _ := tmp[field].([]interface{}); len(q) > 0 {
  176. for _, v := range q {
  177. v1 := v.(map[string]interface{})
  178. qs = append(qs, qutil.ObjToString(v1["key"]))
  179. }
  180. }
  181. if len(qs) > 0 {
  182. newTmp[field] = strings.Join(qs, ",")
  183. }
  184. } else if field == "review_experts" {
  185. // 评审专家
  186. if arr, ok := tmp["review_experts"].([]interface{}); ok && len(arr) > 0 {
  187. arr1 := qutil.ObjArrToStringArr(arr)
  188. newTmp[field] = strings.Join(arr1, ",")
  189. }
  190. } else if field == "entidlist" {
  191. newTmp[field] = tmp[field]
  192. } else if field == "bidopentime" {
  193. if tmp[field] != nil && tmp["bidendtime"] == nil {
  194. newTmp["bidendtime"] = tmp[field]
  195. }
  196. if tmp[field] == nil && tmp["bidendtime"] != nil {
  197. newTmp[field] = tmp["bidendtime"]
  198. }
  199. } else if field == "detail" { //过滤
  200. detail, _ := tmp[field].(string)
  201. if len([]rune(detail)) > detailLength {
  202. detail = detail[:detailLength]
  203. }
  204. if strings.Contains(detail, qutil.ObjToString(tmp["title"])) {
  205. newTmp[field] = FilterDetail(detail)
  206. } else {
  207. newTmp[field] = qutil.ObjToString(tmp["title"]) + " " + FilterDetail(detail)
  208. }
  209. } else if field == "_id" || field == "topscopeclass" { //不做处理
  210. newTmp[field] = tmp[field]
  211. } else if field == "publishtime" || field == "comeintime" {
  212. //字段类型不正确,特别处理
  213. if tmp[field] != nil && qutil.Int64All(tmp[field]) > 0 {
  214. newTmp[field] = qutil.Int64All(tmp[field])
  215. }
  216. } else if field == "s" {
  217. newTmp[field] = tmp[field]
  218. } else { //其它字段判断数据类型,不正确舍弃
  219. if fieldval := tmp[field]; reflect.TypeOf(fieldval).String() != ftype {
  220. continue
  221. } else {
  222. if fieldval != "" {
  223. newTmp[field] = fieldval
  224. }
  225. }
  226. }
  227. }
  228. }
  229. arrEs = append(arrEs, newTmp)
  230. if len(arrEs) >= BulkSizeBack {
  231. tmps := arrEs
  232. espool <- true
  233. go func(tmps []map[string]interface{}) {
  234. defer func() {
  235. <-espool
  236. }()
  237. elastic.BulkSave(index, stype, &tmps, true)
  238. }(tmps)
  239. arrEs = []map[string]interface{}{}
  240. }
  241. UpdatesLock.Unlock()
  242. }(tmp)
  243. tmp = make(map[string]interface{})
  244. }
  245. for i := 0; i < thread; i++ {
  246. mpool <- true
  247. }
  248. UpdatesLock.Lock()
  249. if len(arrEs) > 0 {
  250. tmps := arrEs
  251. elastic.BulkSave(index, stype, &tmps, true)
  252. }
  253. UpdatesLock.Unlock()
  254. log.Println("create biddingback index...over", n)
  255. }
  256. var filterReg = regexp.MustCompile("<[^>]+>")
  257. var filterSpace = regexp.MustCompile("<[^>]*?>|[\\s\u3000\u2003\u00a0]")
  258. func FilterDetail(text string) string {
  259. return filterReg.ReplaceAllString(text, "")
  260. }
  261. func FilterDetailSpace(text string) string {
  262. return filterSpace.ReplaceAllString(text, "")
  263. }
  264. // 正则判断是否包含
  265. func checkContains(s, sub string) bool {
  266. reg := regexp.MustCompile(`(?i)(^|([\s\t\n]+))(` + sub + `)($|([\s\t\n]+))`)
  267. return reg.MatchString(s)
  268. }
  269. func getFileText(tmp map[string]interface{}) (filetext string) {
  270. if attchMap, ok := tmp["attach_text"].(map[string]interface{}); attchMap != nil && ok {
  271. for _, tmpData1 := range attchMap {
  272. if tmpData2, ok := tmpData1.(map[string]interface{}); tmpData2 != nil && ok {
  273. for _, result := range tmpData2 {
  274. if resultMap, ok := result.(map[string]interface{}); resultMap != nil && ok {
  275. if attach_url := qutil.ObjToString(resultMap["attach_url"]); attach_url != "" {
  276. bs := OssGetObject(attach_url) //oss读数据
  277. if utf8.RuneCountInString(filetext+bs) < fileLength {
  278. filetext += bs + "\n"
  279. } else {
  280. if utf8.RuneCountInString(bs) > fileLength {
  281. filetext = bs[0:fileLength]
  282. } else {
  283. filetext = bs
  284. }
  285. break
  286. }
  287. }
  288. }
  289. }
  290. }
  291. }
  292. }
  293. return
  294. }