// biddingall.go
  1. package main
  2. import (
  3. "fmt"
  4. "go.mongodb.org/mongo-driver/bson"
  5. "regexp"
  6. "strings"
  7. "sync"
  8. util "utils"
  9. "utils/mongodb"
  10. )
  11. func (t *TaskInfo) biddingAllTask(data []byte, mapInfo map[string]interface{}) {
  12. defer util.Catch()
  13. var mpool = make(chan bool, t.thread)
  14. q, _ := mapInfo["query"].(map[string]interface{})
  15. if q == nil {
  16. q = map[string]interface{}{
  17. "_id": bson.M{
  18. "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
  19. "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
  20. },
  21. }
  22. } else {
  23. idMap := q["_id"].(map[string]interface{})
  24. tmpQ := map[string]interface{}{}
  25. for c, id := range idMap {
  26. if idStr, ok := id.(string); ok && id != "" {
  27. tmpQ[c] = mongodb.StringTOBsonId(idStr)
  28. }
  29. }
  30. q["_id"] = tmpQ
  31. }
  32. //bidding库
  33. biddingConn := biddingMgo.GetMgoConn()
  34. defer biddingMgo.DestoryMongoConn(biddingConn)
  35. //extract库
  36. extractConn := extractMgo.GetMgoConn()
  37. defer extractMgo.DestoryMongoConn(extractConn)
  38. //连接信息
  39. c, _ := mapInfo["coll"].(string)
  40. if c == "" {
  41. c, _ = bidding["collect"].(string)
  42. } else {
  43. currentColl = c
  44. }
  45. extractc, _ := extract["collect"].(string)
  46. count, _ := biddingConn.DB(biddingMgo.DbName).C(c).Find(&q).Count()
  47. //线程池
  48. UpdatesLock := sync.Mutex{}
  49. util.Debug("查询语句:", q, "同步总数:", count)
  50. //查询招标数据
  51. query := biddingConn.DB(biddingMgo.DbName).C(c).Find(q).Select(bson.M{
  52. "projectinfo.attachment": 0,
  53. "contenthtml": 0,
  54. "publishdept": 0, // 6.30 迭代报错,字段值乱码
  55. }).Sort("_id").Iter()
  56. //查询抽取结果
  57. extractResult := extractConn.DB(extractMgo.DbName).C(extractc).Find(q).Sort("_id").Iter()
  58. n := 0
  59. //对比两张表数据,减少查询次数
  60. var compare map[string]interface{}
  61. bnil := false
  62. for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
  63. if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
  64. tmp = make(map[string]interface{})
  65. continue
  66. }
  67. update := map[string]interface{}{}
  68. del := map[string]interface{}{} //记录extract没有值而bidding中有值的字段
  69. //对比方法----------------
  70. for {
  71. if compare == nil {
  72. compare = make(map[string]interface{})
  73. if !extractResult.Next(compare) {
  74. break
  75. }
  76. }
  77. if compare != nil {
  78. //对比
  79. cid := mongodb.BsonIdToSId(compare["_id"])
  80. tid := mongodb.BsonIdToSId(tmp["_id"])
  81. if cid == tid {
  82. bnil = false
  83. //更新bidding表,生成索引;bidding表modifyinfo中的字段不更新
  84. modifyinfo := make(map[string]bool)
  85. if tmpmodifyinfo, ok := tmp["modifyinfo"].(map[string]interface{}); ok && tmpmodifyinfo != nil {
  86. for k, _ := range tmpmodifyinfo {
  87. modifyinfo[k] = true
  88. }
  89. }
  90. for _, k := range biddingMgoFields { //fields更新到mongo的字段
  91. v1 := compare[k] //extract
  92. v2 := tmp[k] //bidding
  93. if v2 == nil && v1 != nil {
  94. update[k] = v1
  95. } else if v2 != nil && v1 != nil && !modifyinfo[k] {
  96. update[k] = v1
  97. } else if v2 != nil && v1 == nil && !modifyinfo[k] { //
  98. if k == "s_subscopeclass" && del["subscopeclass"] == nil {
  99. continue
  100. } else if k == "s_topscopeclass" && del["topscopeclass"] == nil {
  101. continue
  102. }
  103. del[k] = 1
  104. //qutil.Debug("抽取结果没有值,bidding有值:field--", k, "val--", v2)
  105. }
  106. }
  107. if util.IntAll(compare["repeat"]) == 1 {
  108. update["extracttype"] = -1
  109. } else {
  110. update["extracttype"] = 1
  111. }
  112. break
  113. } else {
  114. if cid < tid {
  115. bnil = false
  116. compare = nil
  117. continue
  118. } else {
  119. bnil = true
  120. break
  121. }
  122. }
  123. } else {
  124. bnil = false
  125. break
  126. }
  127. }
  128. //下面可以多线程跑的--->
  129. //处理分类
  130. mpool <- true
  131. _id := tmp["_id"]
  132. go func(tmp, update, compare, del map[string]interface{}, bnil bool) {
  133. defer func() {
  134. <-mpool
  135. }()
  136. if !bnil && compare != nil && len(compare) > 0 {
  137. FieldMethod(compare, update)
  138. compare = nil
  139. } else {
  140. area := util.ObjToString(tmp["area"])
  141. city := util.ObjToString(tmp["city"])
  142. district := util.ObjToString(tmp["district"])
  143. UpdatesLock.Lock()
  144. rdata := standardCheckCity(area, city, district)
  145. UpdatesLock.Unlock()
  146. if len(rdata) > 0 {
  147. for k, v := range rdata {
  148. update[k] = v
  149. }
  150. }
  151. }
  152. //------------------对比结束
  153. //同时保存到elastic
  154. for tk, tv := range update {
  155. tmp[tk] = tv
  156. }
  157. if tmp["s_winner"] != "" {
  158. cid := FieldFun(tmp)
  159. if len(cid) > 0 {
  160. tmp["entidlist"] = cid
  161. update["entidlist"] = cid
  162. tmp_up := []map[string]interface{}{}
  163. tmp_up = append(tmp_up, map[string]interface{}{"_id": tmp["_id"]})
  164. tmp_up = append(tmp_up, map[string]interface{}{"$set": map[string]interface{}{"entidlist": cid}})
  165. updateExtractPool <- tmp_up
  166. }
  167. }
  168. clearMap(tmp)
  169. //go IS.Add("bidding")
  170. UpdatesLock.Lock()
  171. if util.IntAll(update["extracttype"]) != -1 {
  172. newTmp := GetEsField(tmp, update, t.stype)
  173. saveEsPool <- newTmp
  174. }
  175. if len(update) > 0 {
  176. delete(update, "winnerorder") //winnerorder不需要更新到bindding表,删除
  177. if len(del) > 0 { //删除的数据
  178. updateBiddingPool <- []map[string]interface{}{{
  179. "_id": tmp["_id"],
  180. },
  181. {"$set": update, "$unset": del},
  182. }
  183. } else {
  184. updateBiddingPool <- []map[string]interface{}{{
  185. "_id": tmp["_id"],
  186. },
  187. {"$set": update},
  188. }
  189. }
  190. }
  191. UpdatesLock.Unlock()
  192. }(tmp, update, compare, del, bnil)
  193. if n%20000 == 0 {
  194. util.Debug("current:", n, _id)
  195. }
  196. tmp = make(map[string]interface{})
  197. }
  198. util.Debug(mapInfo, "create bidding index...over", n)
  199. }
  200. //城市标准校验
  201. func standardCheckCity(area string, city string, district string) map[string]string {
  202. rdata := make(map[string]string)
  203. if area == "香港" || area == "澳门" || area == "台湾" || (area == "全国" && (city == "" && district == "")) {
  204. return rdata
  205. }
  206. //第一步:区校验
  207. if district != "" {
  208. districtArr := DistrictDict[district]
  209. if districtArr == nil { //涉及了 个别别名相关的数据
  210. trim_arr := aliasDataDistrict(district) //拆分后缀
  211. if len(trim_arr) > 0 {
  212. for _, alias_district := range trim_arr {
  213. alias_districtArr := DistrictDict[alias_district]
  214. for _, v := range alias_districtArr {
  215. if city == v.C_Name && area == v.P_Name {
  216. rdata["district"] = alias_district
  217. return rdata
  218. }
  219. }
  220. }
  221. }
  222. rdata["district"] = ""
  223. } else {
  224. isTrue := false
  225. for _, v := range districtArr {
  226. if city == v.C_Name && area == v.P_Name {
  227. isTrue = true
  228. break
  229. }
  230. }
  231. if isTrue { //完全匹配
  232. return rdata
  233. } else { //未完全匹配
  234. if len(districtArr) == 1 {
  235. rdata["area"] = districtArr[0].P_Name
  236. rdata["city"] = districtArr[0].C_Name
  237. rdata["district"] = districtArr[0].D_Name
  238. return rdata
  239. } else {
  240. rdata["district"] = ""
  241. }
  242. }
  243. }
  244. }
  245. //第二步:区校验-失败 市-校验
  246. if city != "" {
  247. cityArr := CityDict[city]
  248. if cityArr == nil {
  249. //把市当成区,匹配三级 - 存在优化空间- city:郑州 别名
  250. districtArr := DistrictDict[city]
  251. for _, v := range districtArr {
  252. if city == v.C_Name && area == v.P_Name {
  253. rdata["area"] = districtArr[0].P_Name
  254. rdata["city"] = districtArr[0].C_Name
  255. rdata["district"] = districtArr[0].D_Name
  256. return rdata
  257. }
  258. }
  259. rdata["city"] = ""
  260. } else {
  261. isTrue := false
  262. for _, v := range cityArr {
  263. if area == v.P_Name {
  264. isTrue = true
  265. break
  266. }
  267. }
  268. if isTrue { //完全匹配
  269. return rdata
  270. } else { //未完全匹配
  271. if len(cityArr) == 1 {
  272. rdata["area"] = cityArr[0].P_Name
  273. rdata["city"] = cityArr[0].C_Name
  274. rdata["district"] = ""
  275. return rdata
  276. } else {
  277. rdata["city"] = ""
  278. }
  279. }
  280. }
  281. }
  282. //第三步:省份校验
  283. if ProvinceDict[area] == nil {
  284. rdata["area"] = "全国"
  285. rdata["city"] = ""
  286. rdata["district"] = ""
  287. }
  288. return rdata
  289. }
  290. var cityEndReg = regexp.MustCompile("(区|县|市)$")
  291. //拆分三级县
  292. func aliasDataDistrict(district string) []string {
  293. arr := []string{}
  294. if cityEndReg.MatchString(district) {
  295. str := cityEndReg.FindString(district)
  296. strings.TrimRight(district, str)
  297. if str == "县" {
  298. arr = append(arr, fmt.Sprintf("%s区", strings.TrimRight(district, str)))
  299. arr = append(arr, fmt.Sprintf("%s市", strings.TrimRight(district, str)))
  300. } else if str == "区" {
  301. arr = append(arr, fmt.Sprintf("%s县", strings.TrimRight(district, str)))
  302. arr = append(arr, fmt.Sprintf("%s市", strings.TrimRight(district, str)))
  303. } else if str == "市" {
  304. arr = append(arr, fmt.Sprintf("%s县", strings.TrimRight(district, str)))
  305. arr = append(arr, fmt.Sprintf("%s区", strings.TrimRight(district, str)))
  306. } else {
  307. }
  308. } else { //未找到 district- 区县市 例: district : 金水
  309. arr = append(arr, fmt.Sprintf("%s区", district))
  310. arr = append(arr, fmt.Sprintf("%s县", district))
  311. arr = append(arr, fmt.Sprintf("%s市", district))
  312. }
  313. return arr
  314. }