task.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "app.yhyue.com/data_processing/common_utils/redis"
  7. "app.yhyue.com/data_processing/common_utils/udp"
  8. "encoding/json"
  9. "field_sync/config"
  10. "field_sync/oss"
  11. "fmt"
  12. "go.mongodb.org/mongo-driver/bson"
  13. "go.uber.org/zap"
  14. "net"
  15. "reflect"
  16. "regexp"
  17. "strings"
  18. "time"
  19. )
  20. var (
  21. regLetter = regexp.MustCompile("[a-z]*")
  22. cityEndReg = regexp.MustCompile("(区|县|市)$")
  23. ProvinceDict map[string][]Province //省份-map
  24. CityDict map[string][]City //城市-map
  25. DistrictDict map[string][]District //区县-map
  26. )
  27. func biddingTask(data []byte, mapInfo map[string]interface{}) {
  28. defer util.Catch()
  29. stype := util.ObjToString(mapInfo["stype"])
  30. if stype == "bidding" {
  31. uq := bson.M{"gtid": bson.M{"$gte": util.ObjToString(mapInfo["gtid"])},
  32. "lteid": bson.M{"$lte": util.ObjToString(mapInfo["lteid"])}}
  33. MgoB.Update("bidding_processing_ids", uq, bson.M{"$set": bson.M{"dataprocess": 7, "updatetime": time.Now().Unix()}}, false, true)
  34. }
  35. // 领域标签处理的数据 id段
  36. if stype == "bidding_history" {
  37. MgoB.Save("field_data_record", map[string]interface{}{"gtid": mapInfo["gtid"], "lteid": mapInfo["lteid"], "status": 0})
  38. }
  39. q, _ := mapInfo["query"].(map[string]interface{})
  40. bkey, _ := mapInfo["bkey"].(string)
  41. if q == nil {
  42. q = map[string]interface{}{
  43. "_id": map[string]interface{}{
  44. "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
  45. "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
  46. },
  47. }
  48. }
  49. //extract库
  50. extractConn := MgoE.GetMgoConn()
  51. defer MgoE.DestoryMongoConn(extractConn)
  52. extractResult := extractConn.DB(MgoE.DbName).C(config.Conf.DB.MongoE.Coll).Find(q).Select(map[string]interface{}{
  53. "field_source": 0,
  54. "kvtext": 0,
  55. }).Sort("_id").Iter()
  56. eMap := map[string]map[string]interface{}{}
  57. extCount, repeatCount := 0, 0
  58. for tmp := make(map[string]interface{}); extractResult.Next(tmp); extCount++ {
  59. if util.IntAll(tmp["repeat"]) == 1 {
  60. repeatCount++
  61. }
  62. tid := mongodb.BsonIdToSId(tmp["_id"])
  63. eMap[tid] = tmp
  64. tmp = make(map[string]interface{})
  65. }
  66. log.Info("抽取表", zap.Int("数据量", extCount), zap.Int("重复数据量", repeatCount))
  67. //bidding库
  68. biddingConn := MgoB.GetMgoConn()
  69. count, _ := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(&q).Count()
  70. log.Info("bidding表", zap.Int64("同步总数:", count))
  71. c := 0
  72. if count < 500000 {
  73. var res []map[string]interface{}
  74. result := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(q).Select(map[string]interface{}{
  75. "contenthtml": 0,
  76. }).Iter()
  77. for tmp := make(map[string]interface{}); result.Next(tmp); {
  78. res = append(res, tmp)
  79. tmp = make(map[string]interface{})
  80. }
  81. MgoB.DestoryMongoConn(biddingConn)
  82. log.Info("查询结果", zap.Int64("bidding", count), zap.Int("抽取:", extCount))
  83. c = doIndex(res, eMap, bkey, stype)
  84. } else {
  85. log.Info("查询结果", zap.Int64("数据量太大,放弃", count))
  86. MgoB.DestoryMongoConn(biddingConn)
  87. }
  88. log.Info("bidding sync...over", zap.Int64("all", count), zap.Int("extract sync", c))
  89. NextNode(mapInfo, stype)
  90. NextNodePro(mapInfo, stype)
  91. NextNodeTidb(mapInfo, stype)
  92. if stype == "bidding_history" {
  93. NextNodeBidData(mapInfo) // bidding-data数据
  94. NextNodeTidbQyxy(mapInfo) // tidb-企业数据
  95. NextNodeHn(mapInfo)
  96. }
  97. }
  98. func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
  99. defer util.Catch()
  100. q, _ := mapInfo["query"].(map[string]interface{})
  101. if q == nil {
  102. q = map[string]interface{}{
  103. "_id": map[string]interface{}{
  104. "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
  105. "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
  106. },
  107. }
  108. }
  109. //extract库
  110. extractConn := MgoE.GetMgoConn()
  111. defer MgoE.DestoryMongoConn(extractConn)
  112. extractResult := extractConn.DB(MgoE.DbName).C(config.Conf.DB.MongoE.Coll).Find(q).Select(map[string]interface{}{
  113. "field_source": 0,
  114. "kvtext": 0,
  115. }).Sort("-_id").Iter()
  116. //bidding库
  117. biddingConn := MgoB.GetMgoConn()
  118. defer MgoB.DestoryMongoConn(biddingConn)
  119. count := 0
  120. var compare map[string]interface{}
  121. result := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(q).Select(map[string]interface{}{
  122. "contenthtml": 0,
  123. "field_source": 0,
  124. }).Sort("-_id").Iter()
  125. for tmp := make(map[string]interface{}); result.Next(tmp); count++ {
  126. update := map[string]interface{}{}
  127. del := map[string]interface{}{} //记录extract没有值而bidding中有值的字段
  128. //对比方法----------------
  129. for {
  130. if compare == nil {
  131. compare = make(map[string]interface{})
  132. if !extractResult.Next(compare) {
  133. break
  134. }
  135. }
  136. if compare != nil {
  137. cid := mongodb.BsonIdToSId(compare["_id"])
  138. tid := mongodb.BsonIdToSId(tmp["_id"])
  139. if cid == tid {
  140. //更新bidding表;bidding表modifyinfo中的字段不更新
  141. modifyinfo := make(map[string]bool)
  142. if tmpmodifyinfo, ok := tmp["modifyinfo"].(map[string]interface{}); ok && tmpmodifyinfo != nil {
  143. for k := range tmpmodifyinfo {
  144. modifyinfo[k] = true
  145. }
  146. }
  147. for _, k := range config.Conf.Serve.FieldS {
  148. v1 := compare[k] //extract
  149. v2 := tmp[k] //bidding
  150. if v2 == nil && v1 != nil {
  151. update[k] = v1
  152. } else if v2 != nil && v1 != nil && !modifyinfo[k] {
  153. update[k] = v1
  154. } else if v2 != nil && v1 == nil && !modifyinfo[k] {
  155. if k == "s_subscopeclass" && del["subscopeclass"] == nil {
  156. continue
  157. } else if k == "s_topscopeclass" && del["topscopeclass"] == nil {
  158. continue
  159. }
  160. del[k] = 1
  161. //util.Debug("抽取结果没有值,bidding有值:field--", k, "val--", v2)
  162. }
  163. }
  164. //if util.IntAll(compare["repeat"]) == 1 {
  165. // update["extracttype"] = -1
  166. // update["dataprocess"] = 7
  167. //} else {
  168. // update["extracttype"] = 1
  169. // update["dataprocess"] = 8
  170. //}
  171. break
  172. } else {
  173. if cid < tid {
  174. compare = nil
  175. continue
  176. } else {
  177. break
  178. }
  179. }
  180. } else {
  181. break
  182. }
  183. }
  184. //------------------对比结束
  185. //处理分类
  186. //if compare != nil { //extract
  187. // fieldFun(compare, update)
  188. // compare = nil
  189. //}
  190. // 城市标准化
  191. if update["area"] != nil || update["city"] != nil || update["district"] != nil {
  192. rdata := standardCheckCity(util.ObjToString(tmp["area"]), util.ObjToString(tmp["city"]), util.ObjToString(tmp["district"]))
  193. if len(rdata) > 0 {
  194. for k, v := range rdata {
  195. if v != "" {
  196. delete(update, v)
  197. del[v] = 1
  198. } else {
  199. delete(del, k)
  200. update[k] = v
  201. }
  202. }
  203. }
  204. }
  205. // entidlist
  206. extractMap := make(map[string]interface{})
  207. if update["s_winner"] != "" {
  208. cid := companyFun(update)
  209. if len(cid) > 0 {
  210. update["entidlist"] = cid
  211. extractMap["entidlist"] = cid
  212. }
  213. }
  214. //if len(extractMap) > 0 {
  215. // updateExtPool <- []map[string]interface{}{
  216. // {"_id": tmp["_id"]},
  217. // {"$set": extractMap},
  218. // }
  219. //}
  220. // 附件有效字段
  221. //if i := validFile(tmp); i != 0 {
  222. // if i == -1 {
  223. // update["isValidFile"] = false
  224. // } else {
  225. // update["isValidFile"] = true
  226. // }
  227. //}
  228. if len(update) > 0 {
  229. if len(del) > 0 { //删除的字段
  230. updateBidPool <- []map[string]interface{}{{
  231. "_id": tmp["_id"],
  232. },
  233. {"$set": update, "$unset": del},
  234. }
  235. } else {
  236. updateBidPool <- []map[string]interface{}{{
  237. "_id": tmp["_id"],
  238. },
  239. {"$set": update},
  240. }
  241. }
  242. }
  243. if count%50000 == 0 {
  244. log.Info("biddingTask", zap.Int("current", count))
  245. }
  246. tmp = make(map[string]interface{})
  247. }
  248. log.Info("biddingAll sync...over", zap.Int("all", count))
  249. }
  250. func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interface{}, bkey, stype string) int {
  251. syncNo := 0 //抽取表数据同步数量
  252. //对比两张表数据,减少查询次数
  253. var compare map[string]interface{}
  254. var bidUpdate [][]map[string]interface{}
  255. var extUpdate [][]map[string]interface{}
  256. //SaveEsLock := &sync.Mutex{}
  257. log.Info("start ...")
  258. for n, tmp := range infos {
  259. tid := mongodb.BsonIdToSId(tmp["_id"])
  260. update := map[string]interface{}{} //要更新的mongo数据
  261. //对比方法----------------
  262. if eMap[tid] != nil {
  263. compare = eMap[tid]
  264. if stype == "bidding" {
  265. // 增量id段 正常数据
  266. if dg := util.IntAll(compare["dataging"]); dg == 1 { //extract中dataging=1跳过
  267. tmp = make(map[string]interface{})
  268. compare = nil
  269. continue
  270. }
  271. delete(eMap, tid)
  272. }
  273. if stype == "bidding_history" {
  274. //增量id段 历史数据
  275. if compare["history_updatetime"] == nil { //extract中history_updatetime不存在跳过
  276. tmp = make(map[string]interface{})
  277. compare = nil
  278. continue
  279. }
  280. delete(eMap, tid)
  281. }
  282. syncNo++
  283. for _, k := range config.Conf.Serve.FieldS {
  284. v1 := compare[k] //extract
  285. v2 := tmp[k] //bidding
  286. if v2 == nil && v1 != nil {
  287. update[k] = v1
  288. } else if v2 != nil && v1 != nil {
  289. update[k] = v1
  290. } else if v2 != nil && v1 == nil {
  291. if k == "city" || k == "district" {
  292. update[k] = ""
  293. }
  294. }
  295. }
  296. if util.IntAll(compare["repeat"]) == 1 {
  297. update["extracttype"] = -1
  298. update["dataprocess"] = 7
  299. } else {
  300. update["extracttype"] = 1
  301. update["dataprocess"] = 8
  302. }
  303. } else {
  304. compare = nil
  305. if util.IntAll(tmp["dataging"]) == 1 { //修改未抽取的bidding数据的dataging
  306. update["dataging"] = 0
  307. }
  308. update["dataprocess"] = 8
  309. }
  310. //下面可以多线程跑的--->
  311. //处理分类
  312. if compare != nil { //extract
  313. fieldFun(compare, update)
  314. compare = nil
  315. }
  316. //------------------对比结束
  317. //处理key descript
  318. if bkey == "" {
  319. DealInfo(&tmp, &update)
  320. }
  321. // entidlist
  322. extractMap := make(map[string]interface{})
  323. if update["s_winner"] != "" {
  324. cid := companyFun(update)
  325. if len(cid) > 0 {
  326. tmp["entidlist"] = cid
  327. update["entidlist"] = cid
  328. extractMap["entidlist"] = cid
  329. }
  330. }
  331. // 6.10 剑鱼发布信息分类处理, 写在这里是为了修改抽取表
  332. typeFunc(tmp, update, extractMap)
  333. if len(extractMap) > 0 {
  334. if extractMap["toptype"] != nil && extractMap["subtype"] == nil {
  335. //updateExtPool <- []map[string]interface{}{
  336. // {"_id": tmp["_id"]},
  337. // {"$set": extractMap, "$unset": map[string]interface{}{"subtype": ""}},
  338. //}
  339. extUpdate = append(extUpdate, []map[string]interface{}{
  340. {"_id": tmp["_id"]},
  341. {"$set": extractMap, "$unset": map[string]interface{}{"subtype": ""}},
  342. })
  343. } else {
  344. //updateExtPool <- []map[string]interface{}{
  345. // {"_id": tmp["_id"]},
  346. // {"$set": extractMap},
  347. //}
  348. extUpdate = append(extUpdate, []map[string]interface{}{
  349. {"_id": tmp["_id"]},
  350. {"$set": extractMap},
  351. })
  352. }
  353. if len(extUpdate) >= MgoBulkSize {
  354. tmps := extUpdate
  355. MgoE.UpdateBulk(config.Conf.DB.MongoE.Coll, tmps...)
  356. extUpdate = [][]map[string]interface{}{}
  357. }
  358. }
  359. // 附件有效字段
  360. if i := validFile(tmp); i != 0 {
  361. if i == -1 {
  362. tmp["isValidFile"] = false
  363. update["isValidFile"] = false
  364. } else {
  365. tmp["isValidFile"] = true
  366. update["isValidFile"] = true
  367. }
  368. }
  369. if len(update) > 0 {
  370. //SaveEsLock.Lock()
  371. bidUpdate = append(bidUpdate, []map[string]interface{}{{
  372. "_id": tmp["_id"],
  373. },
  374. {"$set": update},
  375. })
  376. if len(bidUpdate) >= MgoBulkSize {
  377. tmps := bidUpdate
  378. MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, tmps...)
  379. bidUpdate = [][]map[string]interface{}{}
  380. }
  381. //SaveEsLock.Unlock()
  382. //updateBidPool <- []map[string]interface{}{{
  383. // "_id": tmp["_id"],
  384. //},
  385. // {"$set": update},
  386. //}
  387. }
  388. if n%500 == 0 {
  389. log.Info("biddingTask", zap.Int("current", n))
  390. }
  391. tmp = make(map[string]interface{})
  392. }
  393. //SaveEsLock.Lock()
  394. if len(bidUpdate) > 0 {
  395. tmps := bidUpdate
  396. MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, tmps...)
  397. bidUpdate = [][]map[string]interface{}{}
  398. }
  399. if len(extUpdate) > 0 {
  400. tmps := extUpdate
  401. MgoE.UpdateBulk(config.Conf.DB.MongoE.Coll, tmps...)
  402. extUpdate = [][]map[string]interface{}{}
  403. }
  404. //SaveEsLock.Unlock()
  405. return syncNo
  406. }
  407. // @Description subscopeclass、topscopeclass、package
  408. // @Author J 2022/6/7 5:54 PM
  409. func fieldFun(compare, update map[string]interface{}) {
  410. subscopeclass, _ := compare["subscopeclass"].([]interface{}) //subscopeclass
  411. if subscopeclass != nil {
  412. m1 := map[string]bool{}
  413. newclass := []string{}
  414. for _, sc := range subscopeclass {
  415. sclass, _ := sc.(string)
  416. if !m1[sclass] {
  417. m1[sclass] = true
  418. newclass = append(newclass, sclass)
  419. }
  420. }
  421. update["s_subscopeclass"] = strings.Join(newclass, ",")
  422. update["subscopeclass"] = newclass
  423. }
  424. topscopeclass, _ := compare["topscopeclass"].([]interface{}) //topscopeclass
  425. if topscopeclass != nil {
  426. m2 := map[string]bool{}
  427. newclass := []string{}
  428. for _, tc := range topscopeclass {
  429. tclass, _ := tc.(string)
  430. tclass = regLetter.ReplaceAllString(tclass, "") // 去除字母
  431. if !m2[tclass] {
  432. m2[tclass] = true
  433. newclass = append(newclass, tclass)
  434. }
  435. }
  436. update["topscopeclass"] = topscopeclass
  437. update["s_topscopeclass"] = strings.Join(newclass, ",")
  438. }
  439. if package1 := compare["package"]; package1 != nil {
  440. packageM, _ := package1.(map[string]interface{})
  441. update["package"] = packageM
  442. for _, p := range packageM {
  443. pm, _ := p.(map[string]interface{})
  444. if util.ObjToString(pm["winner"]) != "" || util.Float64All(pm["budget"]) > 0 ||
  445. util.Float64All(pm["bidamount"]) > 0 {
  446. update["multipackage"] = 1
  447. break
  448. }
  449. }
  450. } else {
  451. update["multipackage"] = 0
  452. }
  453. }
  454. // @Description entidlist
  455. // @Author J 2022/6/7 2:36 PM
  456. func companyFun(tmp map[string]interface{}) (cid []string) {
  457. sWinnerarr := strings.Split(util.ObjToString(tmp["s_winner"]), ",")
  458. for _, w := range sWinnerarr {
  459. if w != "" {
  460. id := redis.GetStr("qyxy_id", w)
  461. if id == "" {
  462. ents, _ := MgoQ.Find(config.Conf.DB.MongoQ.Coll, map[string]interface{}{"company_name": w}, map[string]interface{}{"updatetime": -1}, map[string]interface{}{"company_name": 1}, false, -1, -1)
  463. if len(*ents) > 0 {
  464. id = util.ObjToString((*ents)[0]["_id"])
  465. redis.PutCKV("qyxy_id", w, id)
  466. } else {
  467. ent, _ := MgoP.FindOne(config.Conf.DB.MongoP.Coll, map[string]interface{}{"history_name": w})
  468. if len(*ent) > 0 {
  469. id = util.ObjToString((*ent)["company_id"])
  470. redis.PutCKV("qyxy_id", w, id)
  471. }
  472. }
  473. }
  474. if id == "" {
  475. id = "-"
  476. }
  477. cid = append(cid, id)
  478. }
  479. }
  480. return cid
  481. }
  482. // @Description update 修改bidding表,extractM修改抽取表
  483. // @Author J 2022/6/10 10:29 AM
  484. func typeFunc(tmp, update, extractM map[string]interface{}) {
  485. if jyData, ok := tmp["jyfb_data"].(map[string]interface{}); ok {
  486. if t := util.ObjToString(jyData["type"]); t != "" {
  487. switch t {
  488. //case "采购信息":
  489. case "招标公告":
  490. if util.ObjToString(tmp["toptype"]) != "招标" {
  491. update["toptype"] = "招标"
  492. extractM["toptype"] = "招标"
  493. delete(update, "subtype")
  494. }
  495. case "采购意向":
  496. if util.ObjToString(tmp["toptype"]) != "采购意向" {
  497. update["toptype"] = "采购意向"
  498. update["subtype"] = "采购意向"
  499. extractM["toptype"] = "采购意向"
  500. extractM["subtype"] = "采购意向"
  501. }
  502. case "招标预告":
  503. if util.ObjToString(tmp["toptype"]) != "预告" {
  504. update["toptype"] = "预告"
  505. extractM["toptype"] = "预告"
  506. delete(update, "subtype")
  507. }
  508. case "招标结果":
  509. if util.ObjToString(tmp["toptype"]) != "结果" {
  510. update["toptype"] = "结果"
  511. extractM["toptype"] = "结果"
  512. delete(update, "subtype")
  513. }
  514. }
  515. }
  516. }
  517. }
  518. // @Description 附件有效字段(isValidFile)
  519. // @Author J 2022/7/8 14:41
  520. func validFile(tmp map[string]interface{}) int {
  521. isContinue := false
  522. if pinfo, o := tmp["projectinfo"].(map[string]interface{}); o {
  523. if atts, o1 := pinfo["attachments"].(map[string]interface{}); o1 {
  524. for _, att := range atts {
  525. if att == nil {
  526. continue
  527. }
  528. if reflect.TypeOf(att).String() == "string" {
  529. continue
  530. }
  531. att1 := att.(map[string]interface{})
  532. if fid := util.ObjToString(att1["fid"]); fid != "" {
  533. isContinue = true
  534. break
  535. }
  536. }
  537. if isContinue {
  538. if attachTxt, o := tmp["attach_text"].(map[string]interface{}); o {
  539. if len(attachTxt) > 0 {
  540. for _, at := range attachTxt {
  541. at1 := at.(map[string]interface{})
  542. if len(at1) > 0 {
  543. for k, _ := range at1 {
  544. if reflect.TypeOf(at1[k]).String() == "string" {
  545. continue
  546. }
  547. at2 := at1[k].(map[string]interface{})
  548. s := strings.ToLower(util.ObjToString(at2["file_name"]))
  549. if !strings.Contains(s, "jpg") || !strings.Contains(s, "jpeg") != strings.Contains(s, "png") ||
  550. strings.Contains(s, "pdf") {
  551. if strings.Contains(s, "swf") || strings.Contains(s, "html") {
  552. return -1
  553. } else if AnalysisFile(oss.OssGetObject(util.ObjToString(at2["attach_url"]))) {
  554. return 1
  555. }
  556. }
  557. }
  558. break
  559. } else {
  560. break
  561. }
  562. }
  563. }
  564. }
  565. flag := false
  566. for _, att := range atts {
  567. if att == nil {
  568. continue
  569. }
  570. if reflect.TypeOf(att).String() == "string" {
  571. continue
  572. }
  573. att1 := att.(map[string]interface{})
  574. if fid := util.ObjToString(att1["fid"]); fid != "" {
  575. ftype := strings.ToLower(util.ObjToString(tmp["ftype"]))
  576. if ftype != "swf" && ftype != "html" && oss.OssObjExists("jy-datafile", fid) {
  577. return 1
  578. } else {
  579. flag = true
  580. }
  581. }
  582. }
  583. if flag {
  584. return -1
  585. }
  586. }
  587. }
  588. }
  589. return 0
  590. }
  591. // @Description id不变,内容变化 重新索引数据
  592. // @Author J 2022/8/10 13:29
  593. func taskinfo(id string) {
  594. tmp, _ := MgoB.FindById("bidding", id, nil)
  595. if tmp == nil || len(*tmp) == 0 {
  596. log.Info(fmt.Sprintf("taskinfo bidding id=%s 未查询到数据", id))
  597. return
  598. }
  599. extractM, _ := MgoE.FindById(config.Conf.DB.MongoE.Coll, id, nil)
  600. if extractM == nil || len(*extractM) == 0 {
  601. extractM, _ = MgoE.FindById(config.Conf.DB.MongoE.Coll1, id, nil)
  602. if extractM == nil || len(*extractM) == 0 {
  603. log.Info(fmt.Sprintf("taskinfo extract id=%s 未查询到数据", id))
  604. return
  605. }
  606. }
  607. update := map[string]interface{}{} //要更新的mongo数据
  608. //更新bidding表字段
  609. for _, k := range config.Conf.Serve.FieldS {
  610. v1 := (*extractM)[k] //extract
  611. v2 := (*tmp)[k] //bidding
  612. if v2 == nil && v1 != nil {
  613. update[k] = v1
  614. } else if v2 != nil && v1 != nil {
  615. update[k] = v1
  616. } else if v2 != nil && v1 == nil {
  617. if k == "city" || k == "district" {
  618. update[k] = ""
  619. }
  620. }
  621. }
  622. if util.IntAll((*extractM)["repeat"]) == 1 {
  623. update["extracttype"] = -1
  624. update["dataprocess"] = 7
  625. } else {
  626. update["extracttype"] = 1
  627. update["dataprocess"] = 8
  628. }
  629. //处理分类
  630. fieldFun(*extractM, update)
  631. extractMap := make(map[string]interface{})
  632. if util.ObjToString((*tmp)["s_winner"]) != "" {
  633. cid := companyFun(*tmp)
  634. if len(cid) > 0 {
  635. update["entidlist"] = cid
  636. extractMap["entidlist"] = cid
  637. }
  638. MgoE.UpdateById(config.Conf.DB.MongoE.Coll, id, map[string]interface{}{"$set": extractMap})
  639. }
  640. // 附件有效字段
  641. if i := validFile(*tmp); i != 0 {
  642. if i == -1 {
  643. update["isValidFile"] = false
  644. } else {
  645. update["isValidFile"] = true
  646. }
  647. }
  648. if len(update) > 0 {
  649. MgoB.UpdateById(config.Conf.DB.MongoB.Coll, id, map[string]interface{}{"$set": update})
  650. }
  651. mapinfo := map[string]interface{}{
  652. "infoid": id,
  653. "stype": "index-by-id",
  654. }
  655. datas, _ := json.Marshal(mapinfo)
  656. var next = &net.UDPAddr{
  657. IP: net.ParseIP(config.Conf.Udp.Next.Addr),
  658. Port: util.IntAll(config.Conf.Udp.Next.Port),
  659. }
  660. log.Info("nsq data over", zap.Any("es", next), zap.String("mapinfo", string(datas)))
  661. _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
  662. }
  663. //城市标准校验
  664. func standardCheckCity(area string, city string, district string) map[string]string {
  665. rdata := make(map[string]string)
  666. if area == "香港" || area == "澳门" || area == "台湾" || (area == "全国" && (city == "" && district == "")) {
  667. return rdata
  668. }
  669. //第一步:区校验
  670. if district != "" {
  671. districtArr := DistrictDict[district]
  672. if districtArr == nil { //涉及了 个别别名相关的数据
  673. trim_arr := aliasDataDistrict(district) //拆分后缀
  674. if len(trim_arr) > 0 {
  675. for _, alias_district := range trim_arr {
  676. alias_districtArr := DistrictDict[alias_district]
  677. for _, v := range alias_districtArr {
  678. if city == v.C_Name && area == v.P_Name {
  679. rdata["district"] = alias_district
  680. return rdata
  681. }
  682. }
  683. }
  684. }
  685. rdata["district"] = ""
  686. } else {
  687. isTrue := false
  688. for _, v := range districtArr {
  689. if city == v.C_Name && area == v.P_Name {
  690. isTrue = true
  691. break
  692. }
  693. }
  694. if isTrue { //完全匹配
  695. return rdata
  696. } else { //未完全匹配
  697. if len(districtArr) == 1 {
  698. rdata["area"] = districtArr[0].P_Name
  699. rdata["city"] = districtArr[0].C_Name
  700. rdata["district"] = districtArr[0].D_Name
  701. return rdata
  702. } else {
  703. rdata["district"] = ""
  704. }
  705. }
  706. }
  707. }
  708. //第二步:区校验-失败 市-校验
  709. if city != "" {
  710. cityArr := CityDict[city]
  711. if cityArr == nil {
  712. //把市当成区,匹配三级 - 存在优化空间- city:郑州 别名
  713. districtArr := DistrictDict[city]
  714. for _, v := range districtArr {
  715. if city == v.C_Name && area == v.P_Name {
  716. rdata["area"] = districtArr[0].P_Name
  717. rdata["city"] = districtArr[0].C_Name
  718. rdata["district"] = districtArr[0].D_Name
  719. return rdata
  720. }
  721. }
  722. rdata["city"] = ""
  723. } else {
  724. isTrue := false
  725. for _, v := range cityArr {
  726. if area == v.P_Name {
  727. isTrue = true
  728. break
  729. }
  730. }
  731. if isTrue { //完全匹配
  732. return rdata
  733. } else { //未完全匹配
  734. if len(cityArr) == 1 {
  735. rdata["area"] = cityArr[0].P_Name
  736. rdata["city"] = cityArr[0].C_Name
  737. rdata["district"] = ""
  738. return rdata
  739. } else {
  740. rdata["city"] = ""
  741. }
  742. }
  743. }
  744. }
  745. //第三步:省份校验
  746. if ProvinceDict[area] == nil {
  747. rdata["area"] = "全国"
  748. rdata["city"] = ""
  749. rdata["district"] = ""
  750. }
  751. return rdata
  752. }
  753. //拆分三级县
  754. func aliasDataDistrict(district string) []string {
  755. arr := []string{}
  756. if cityEndReg.MatchString(district) {
  757. str := cityEndReg.FindString(district)
  758. strings.TrimRight(district, str)
  759. if str == "县" {
  760. arr = append(arr, fmt.Sprintf("%s区", strings.TrimRight(district, str)))
  761. arr = append(arr, fmt.Sprintf("%s市", strings.TrimRight(district, str)))
  762. } else if str == "区" {
  763. arr = append(arr, fmt.Sprintf("%s县", strings.TrimRight(district, str)))
  764. arr = append(arr, fmt.Sprintf("%s市", strings.TrimRight(district, str)))
  765. } else if str == "市" {
  766. arr = append(arr, fmt.Sprintf("%s县", strings.TrimRight(district, str)))
  767. arr = append(arr, fmt.Sprintf("%s区", strings.TrimRight(district, str)))
  768. } else {
  769. }
  770. } else { //未找到 district- 区县市 例: district : 金水
  771. arr = append(arr, fmt.Sprintf("%s区", district))
  772. arr = append(arr, fmt.Sprintf("%s县", district))
  773. arr = append(arr, fmt.Sprintf("%s市", district))
  774. }
  775. return arr
  776. }