main.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. es7 "github.com/olivere/elastic/v7"
  7. "github.com/wcc4869/common_utils/log"
  8. "go.uber.org/zap"
  9. "io"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  13. "reflect"
  14. "strings"
  15. "time"
  16. )
  17. var (
  18. Mgo *mongodb.MongodbSim
  19. MgoB *mongodb.MongodbSim
  20. MgoBAi *mongodb.MongodbSim
  21. MgoT *mongodb.MongodbSim //测试环境链接
  22. MgoR *mongodb.MongodbSim
  23. saveSize = 50
  24. Es *elastic.Elastic // 19908
  25. EsNew *elastic.Elastic //19905
  26. EsT *elastic.Elastic
  27. // 更新mongo
  28. updatePool = make(chan []map[string]interface{}, 5000)
  29. updateSp = make(chan bool, 5)
  30. //更新es
  31. updateEsPool = make(chan []map[string]interface{}, 5000)
  32. updateEsSp = make(chan bool, 5) //保存协程
  33. updateProjectEsPool = make(chan []map[string]interface{}, 5000)
  34. updateProjectEsSp = make(chan bool, 5) //保存协程
  35. BiddingField = make(map[string]string, 200) //bidding_processing_field, level=1 最外层字段,
  36. BiddingLevelField = make(map[string]map[string]string) //level=2 的第二层字段
  37. )
  38. func Init() {
  39. MgoB = &mongodb.MongodbSim{
  40. MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  41. //MongodbAddr: "127.0.0.1:27083",
  42. DbName: "qfw",
  43. Size: 10,
  44. UserName: "SJZY_RWbid_ES",
  45. Password: "SJZY@B4i4D5e6S",
  46. //Direct: true,
  47. }
  48. MgoB.InitPool()
  49. //MgoBAi = &mongodb.MongodbSim{
  50. // //MongodbAddr: "172.17.189.140:27080",
  51. // MongodbAddr: "127.0.0.1:27083",
  52. // DbName: "qfw_ai",
  53. // Size: 10,
  54. // UserName: "SJZY_RWbid_ES",
  55. // Password: "SJZY@B4i4D5e6S",
  56. // Direct: true,
  57. //}
  58. //MgoBAi.InitPool()
  59. //mongodb 163
  60. //Mgo = &mongodb.MongodbSim{
  61. // //MongodbAddr: "172.17.189.140:27080",
  62. // MongodbAddr: "127.0.0.1:27083",
  63. // DbName: "qfw",
  64. // Size: 10,
  65. // UserName: "SJZY_RWbid_ES",
  66. // Password: "SJZY@B4i4D5e6S",
  67. // Direct: true,
  68. //}
  69. //Mgo.InitPool()
  70. //85
  71. //MgoR = &mongodb.MongodbSim{
  72. // //MongodbAddr: "127.0.0.1:27080",
  73. // MongodbAddr: "172.17.4.85:27080",
  74. // DbName: "qfw",
  75. // Size: 10,
  76. // //Direct: true,
  77. //}
  78. //MgoR.InitPool()
  79. ////测试环境MongoDB
  80. //MgoT = &mongodb.MongodbSim{
  81. // MongodbAddr: "172.20.45.129:27002",
  82. // DbName: "qfw_data",
  83. // Size: 10,
  84. // UserName: "",
  85. // Password: "",
  86. // //Direct: true,
  87. //}
  88. //MgoT.InitPool()
  89. //
  90. //////测试环境es
  91. //EsT = &elastic.Elastic{
  92. // S_esurl: "http://172.20.45.129:9206",
  93. // I_size: 5,
  94. // Username: "",
  95. // Password: "",
  96. //}
  97. //EsT.InitElasticSize()
  98. //es
  99. Es = &elastic.Elastic{
  100. //S_esurl: "http://127.0.0.1:19908",
  101. S_esurl: "http://172.17.4.184:19908",
  102. I_size: 5,
  103. Username: "jybid",
  104. Password: "Top2023_JEB01i@31",
  105. }
  106. Es.InitElasticSize()
  107. //es 新集群
  108. EsNew = &elastic.Elastic{
  109. //S_esurl: "http://127.0.0.1:19905",
  110. S_esurl: "http://172.17.4.184:19905",
  111. I_size: 5,
  112. Username: "jybid",
  113. Password: "Top2023_JEB01i@31",
  114. }
  115. EsNew.InitElasticSize()
  116. }
// main initialises the connections via Init, starts the MongoDB and ES update
// workers as goroutines, runs the single task that is currently enabled
// (updateBiddingToptype) and then logs "over".
//
// The commented-out calls are alternative one-off tasks that can be enabled
// in place of the active one.
//
// NOTE(review): main returns as soon as updateBiddingToptype returns, which
// ends the process and the worker goroutines with it. Confirm that the active
// task waits for the update queues to drain (the blocking channel at the
// bottom is disabled).
func main() {
	//updatePing()
	//return
	Init()
	//InitEsBiddingField()
	go updateMethod()   // update MongoDB
	go updateEsMethod() // update ES
	//go updateEsMethodTest() // update the test-environment ES
	//go updateEsHrefMethod() // update the ES href field
	//go updateProjectEsMethod()
	//taskRunProject()
	//taskRunBidding()
	//dealBidding() // process production bidding data
	//dealBiddingAi() // process production bidding data
	//dealBiddingTest() // process test-environment data
	//dealBiddingEsHref() // update the ES href field based on a temporary table
	//dealBiddingNiJian() // for proposed-construction data, set buyer = owner
	//updateBiddingBidamount()
	//updateProject()
	//-------------------------------//
	//fixBiddingEs()
	//updateBiddingType() // update the bidding-notice classification
	//updateBiddingisValidFile() // update the bidding isValidFile field
	//updateBiddingTypeBySpidecode() // update bidding based on the spidecode field
	//updateBiddingBasicClass() // update the basicClass field on existing data
	//updateBiddingBasicClassTest() // update the basicClass field in the test environment
	updateBiddingToptype() // update the tender classification results
	log.Info("over")
	//c := make(chan bool, 1)
	//<-c
}
  148. // fixBiddingEs 修复bidding 索引数据,
  149. func fixBiddingEs() {
  150. defer util.Catch()
  151. sess := MgoR.GetMgoConn()
  152. defer MgoR.DestoryMongoConn(sess)
  153. where := map[string]interface{}{
  154. "_id": map[string]interface{}{
  155. "$gte": mongodb.StringTOBsonId("6847fc265f834436f08ef4fe"),
  156. "$lte": mongodb.StringTOBsonId("6848e42b5f834436f092f645"),
  157. },
  158. }
  159. it := sess.DB("qfw").C("result_20220219").Find(where).Select(nil).Iter()
  160. count := 0
  161. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  162. if count%1000 == 0 {
  163. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  164. }
  165. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  166. repeat := util.IntAll(tmp["repeat"])
  167. repeat_reason := util.ObjToString(tmp["repeat_reason"])
  168. if repeat == 1 && strings.Contains(repeat_reason, "采集源重复") {
  169. Es.DeleteByID("bidding", biddingID)
  170. log.Info("fixBiddingEs", zap.String("biddingID", biddingID))
  171. EsNew.DeleteByID("bidding", biddingID)
  172. }
  173. }
  174. }
  175. func InitEsBiddingField() {
  176. now := time.Now()
  177. info, _ := MgoB.Find("bidding_processing_field", `{"stype": "bidding"}`, nil, nil, false, -1, -1)
  178. if len(*info) > 0 {
  179. for _, m := range *info {
  180. if util.IntAll(m["level"]) == 1 {
  181. BiddingField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  182. } else if util.IntAll(m["level"]) == 2 {
  183. pfield := util.ObjToString(m["pfield"])
  184. pfieldMap := BiddingLevelField[pfield]
  185. if pfieldMap == nil {
  186. pfieldMap = make(map[string]string, 0)
  187. }
  188. pfieldMap[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  189. BiddingLevelField[pfield] = pfieldMap
  190. }
  191. }
  192. }
  193. log.Info("InitEsBiddingField", zap.Int("BiddingField es 一级字段数量", len(BiddingField)))
  194. log.Info("InitEsBiddingField", zap.Int("BiddingLevelField es 二级字段数量", len(BiddingLevelField)))
  195. log.Info("InitEsBiddingField", zap.Any("duration", time.Since(now).Seconds()))
  196. }
  197. // taskRun 更新es 省市区三个字段
  198. func taskRunBidding() {
  199. defer util.Catch()
  200. sess := MgoB.GetMgoConn()
  201. defer MgoB.DestoryMongoConn(sess)
  202. //查询条件
  203. //q := map[string]interface{}{
  204. // //"_id": map[string]interface{}{
  205. // // //"$gt": mongodb.StringTOBsonId("5a862f0640d2d9bbe88e3cea"),
  206. // // //"$lte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  207. // //
  208. // // //"$gte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  209. // // "$lte": mongodb.StringTOBsonId("661e347d66cf0db42aa1a52f"),
  210. // //},
  211. // //"comeintime": map[string]interface{}{
  212. // // "$gt": 1669824000,
  213. // // //"$lte": 1669864950,
  214. // // "$lte": 1702265941,
  215. // //},
  216. // //"site": "国家能源e购",
  217. // "toptype": map[string]interface{}{"$exists": 0},
  218. //}
  219. //selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  220. it := sess.DB("qfw").C("bidding").Find(nil).Select(nil).Iter()
  221. fmt.Println("taskRun 开始")
  222. count := 0
  223. //realNum := 0
  224. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  225. if count%100 == 0 {
  226. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  227. }
  228. update := map[string]interface{}{}
  229. // 1.更新省市区
  230. //if area, ok := tmp["area"]; ok && area != nil {
  231. // update["area"] = area
  232. //} else {
  233. // update["area"] = ""
  234. //}
  235. //
  236. //if city, ok := tmp["city"]; ok && city != nil {
  237. // update["city"] = city
  238. //} else {
  239. // update["city"] = ""
  240. //}
  241. //
  242. //if district, ok := tmp["district"]; ok && district != nil {
  243. // if district == "乌拉盖管委会" {
  244. // update["district"] = "乌拉盖管理区管委会"
  245. // } else if district == "错那县" {
  246. // update["district"] = "错那市"
  247. // } else if district == "河南周口经济开发区" {
  248. // update["district"] = "周口临港开发区"
  249. // } else if district == "米林县" {
  250. // update["district"] = "米林市"
  251. // }
  252. //
  253. //}
  254. //-------------------------------------------//
  255. // 2.更新中标单位、采购单位、代理机构
  256. biddingID := util.ObjToString(tmp["id"])
  257. //biddingID := mongodb.BsonIdToSId(tmp["_id"])
  258. if _, ok := tmp["buyer"]; ok {
  259. update["buyer"] = tmp["buyer"]
  260. }
  261. if _, ok := tmp["agency"]; ok {
  262. update["agency"] = tmp["agency"]
  263. }
  264. if _, ok := tmp["s_winner"]; ok {
  265. update["s_winner"] = tmp["s_winner"]
  266. }
  267. if _, ok := tmp["winner"]; ok {
  268. update["winner"] = tmp["winner"]
  269. }
  270. //-------------------------------------------//
  271. //3. 更新中标金额
  272. //biddingID := util.ObjToString(tmp["id"])
  273. //if _, ok := tmp["nb"]; !ok {
  274. // continue
  275. //} else {
  276. // update["bidamount"] = tmp["nb"]
  277. //}
  278. //update["bidamount"] = tmp["bidamount"]
  279. //// 更新 MongoDB + ES
  280. if len(update) > 0 {
  281. MgoB.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  282. //2.es 项目 更新字段
  283. err := Es.UpdateDocument("bidding", biddingID, update)
  284. err = EsNew.UpdateDocument("bidding", biddingID, update)
  285. if err != nil && err.Error() != "Document not updated: noop" {
  286. log.Info("bidding es update err", err, biddingID)
  287. }
  288. }
  289. }
  290. log.Info("Run Over...Count:", log.Int("count", count))
  291. }
  292. // taskRunProject 更新项目表 省市区
  293. func taskRunProject() {
  294. defer util.Catch()
  295. sess := Mgo.GetMgoConn()
  296. defer Mgo.DestoryMongoConn(sess)
  297. // 项目数据
  298. MgoP := &mongodb.MongodbSim{
  299. MongodbAddr: "172.17.4.85:27080",
  300. //MongodbAddr: "127.0.0.1:27080",
  301. Size: 10,
  302. DbName: "qfw",
  303. //Direct: true,
  304. }
  305. MgoP.InitPool()
  306. selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  307. it := sess.DB("qfw").C("zktest_0423_info_new").Find(nil).Select(selected).Sort("_id").Iter()
  308. fmt.Println("taskRun 开始")
  309. count := 0
  310. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  311. if count%10000 == 0 {
  312. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  313. }
  314. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  315. where := map[string]interface{}{
  316. "ids": biddingID,
  317. }
  318. // 找到对应项目数据
  319. p, _ := MgoP.FindOne("projectset_20230904", where)
  320. projectId := mongodb.BsonIdToSId((*p)["_id"])
  321. //1.更新MongoDB
  322. update := map[string]interface{}{}
  323. if area, ok := tmp["area"]; ok && area != nil {
  324. update["area"] = area
  325. } else {
  326. update["area"] = ""
  327. }
  328. if city, ok := tmp["city"]; ok && city != nil {
  329. update["city"] = city
  330. } else {
  331. update["city"] = ""
  332. }
  333. if district, ok := tmp["district"]; ok && district != nil {
  334. update["district"] = district
  335. } else {
  336. update["district"] = ""
  337. }
  338. if len(update) > 0 {
  339. MgoP.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update})
  340. //2.es 项目 更新字段
  341. err := Es.UpdateDocument("projectset", projectId, update)
  342. if err != nil {
  343. log.Info("es update err", err, projectId)
  344. }
  345. }
  346. //2.es 项目 更新字段
  347. //if len(update) > 0 {
  348. // // 更新es
  349. // //updateEsPool <- []map[string]interface{}{
  350. // // {"_id": projectId},
  351. // // update,
  352. // //}
  353. //}
  354. }
  355. log.Info("Run Over...Count:", log.Int("count", count))
  356. }
  357. // dealData 正式环境,同步合同期限
  358. func dealData() {
  359. defer util.Catch()
  360. sess := Mgo.GetMgoConn()
  361. defer Mgo.DestoryMongoConn(sess)
  362. //where := map[string]interface{}{
  363. // "_id": mongodb.StringTOBsonId("65c5a36a66cf0db42ab9c1ef"),
  364. //}
  365. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  366. it := sess.DB("qfw").C("zktest_quanliang_0210_fbs").Find(nil).Select(&selected).Iter()
  367. count := 0
  368. realNum := 0
  369. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  370. if count%1000 == 0 {
  371. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  372. }
  373. idStr := mongodb.BsonIdToSId(tmp["_id"])
  374. update := make(map[string]interface{})
  375. if tmp["signaturedate"] != nil {
  376. update["signaturedate"] = tmp["signaturedate"]
  377. }
  378. if tmp["contractperiod"] != nil {
  379. update["contractperiod"] = tmp["contractperiod"]
  380. }
  381. if tmp["expiredate"] != nil {
  382. update["expiredate"] = tmp["expiredate"]
  383. }
  384. if len(update) == 0 {
  385. continue
  386. }
  387. //bidding 表
  388. if idStr > "5a862e7040d2d9bbe88e3b1f" {
  389. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  390. data := *bidding
  391. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  392. // 针对存量数据,重复数据不进索引
  393. if util.IntAll(data["extracttype"]) == -1 {
  394. tmp = make(map[string]interface{})
  395. continue
  396. }
  397. } else {
  398. //bidding_back
  399. bidding, _ := Mgo.FindById("bidding_back", idStr, map[string]interface{}{"extracttype": 1})
  400. data := *bidding
  401. Mgo.UpdateById("bidding_back", idStr, map[string]interface{}{"$set": update})
  402. // 针对存量数据,重复数据不进索引
  403. if util.IntAll(data["extracttype"]) == -1 {
  404. tmp = make(map[string]interface{})
  405. continue
  406. }
  407. }
  408. realNum++
  409. //2.es 更新字段
  410. esUpdate := update
  411. esUpdate["id"] = idStr
  412. if len(esUpdate) > 0 {
  413. // 更新es
  414. updateEsPool <- []map[string]interface{}{
  415. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  416. esUpdate,
  417. }
  418. }
  419. tmp = make(map[string]interface{})
  420. }
  421. log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
  422. }
  423. // dealResult 查询抽取表,更新合同周期字段;是dealData的后面遗漏数据
  424. func dealResult() {
  425. defer util.Catch()
  426. sess := MgoR.GetMgoConn()
  427. defer MgoR.DestoryMongoConn(sess)
  428. where := map[string]interface{}{
  429. "_id": map[string]interface{}{
  430. "$gte": mongodb.StringTOBsonId("5a4909cf40d2d9bbe8ab329c"),
  431. "$lte": mongodb.StringTOBsonId("5a4ad94d40d2d9bbe8ae0183"),
  432. },
  433. "subtype": "合同",
  434. }
  435. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  436. it := sess.DB("qfw").C("result_20220219").Find(where).Select(&selected).Iter()
  437. count := 0
  438. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  439. if count%1000 == 0 {
  440. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  441. }
  442. idStr := mongodb.BsonIdToSId(tmp["_id"])
  443. update := make(map[string]interface{})
  444. if tmp["signaturedate"] != nil {
  445. update["signaturedate"] = tmp["signaturedate"]
  446. }
  447. if tmp["contractperiod"] != nil {
  448. update["contractperiod"] = tmp["contractperiod"]
  449. }
  450. if tmp["expiredate"] != nil {
  451. update["expiredate"] = tmp["expiredate"]
  452. }
  453. if len(update) == 0 {
  454. continue
  455. }
  456. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  457. data := *bidding
  458. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  459. // 针对存量数据,重复数据不进索引
  460. if util.IntAll(data["extracttype"]) == -1 {
  461. tmp = make(map[string]interface{})
  462. continue
  463. }
  464. //2.es 更新字段
  465. esUpdate := update
  466. esUpdate["id"] = idStr
  467. if len(esUpdate) > 0 {
  468. // 更新es
  469. updateEsPool <- []map[string]interface{}{
  470. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  471. esUpdate,
  472. }
  473. }
  474. tmp = make(map[string]interface{})
  475. }
  476. log.Info("Run Over...Count:", log.Int("count", count))
  477. }
  478. // dealBidding 处理bidding数据
  479. func dealBidding() {
  480. defer util.Catch()
  481. sess := MgoB.GetMgoConn()
  482. defer MgoB.DestoryMongoConn(sess)
  483. //where := map[string]interface{}{
  484. // "comeintime": map[string]interface{}{
  485. // "$lt": 1722009600,
  486. // //"$lt": 1718812802,
  487. // "$gte": 1718899200,
  488. // },
  489. //}
  490. //where := map[string]interface{}{
  491. // "_id": map[string]interface{}{
  492. // "$gte": mongodb.StringTOBsonId("66aa067e66cf0db42a8ea71e"),
  493. // "$lt": mongodb.StringTOBsonId("66aa067e66cf0db42a8ea720"),
  494. // },
  495. //}
  496. it := sess.DB("qfw").C("bidding").Find(nil).Select(nil).Iter()
  497. fmt.Println("taskRun 开始")
  498. count := 0
  499. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  500. if count%10000 == 0 {
  501. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  502. }
  503. //update := map[string]interface{}{}
  504. esUpdate := map[string]interface{}{}
  505. if util.IntAll(tmp["extracttype"]) == -1 {
  506. continue
  507. }
  508. if util.ObjToString(tmp["purchasing"]) == "" {
  509. continue
  510. }
  511. esUpdate["purchasing"] = tmp["purchasing"]
  512. // 2.更新中标单位,中标金额
  513. //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
  514. // update["tag_topinformation"] = tag_topinformation
  515. //}
  516. //
  517. //if property_form, ok := tmp["property_form"]; ok && property_form != nil {
  518. // update["property_form"] = property_form
  519. //}
  520. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  521. //fmt.Println(biddingID)
  522. /**
  523. "s_subscopeclass" : "其它",
  524. "s_topscopeclass" : "其它",
  525. "subscopeclass" : [
  526. "其它"
  527. ],
  528. "topscopeclass" : [
  529. "其它"
  530. ],
  531. */
  532. //// 行业分类默认值
  533. //resultSubs := make([]string, 0)
  534. //resultTobs := make([]string, 0)
  535. //if topscopeclass, ok := tmp["topscopeclass"]; ok && topscopeclass != nil {
  536. // if topps, ok2 := topscopeclass.([]interface{}); ok2 {
  537. // for _, v := range topps {
  538. // top := util.ObjToString(v)
  539. // if top != "" {
  540. // resultTobs = append(resultTobs, top)
  541. // }
  542. // }
  543. // }
  544. // //1.一级分类是空数组或者 是 其它
  545. // if len(resultTobs) == 0 || resultTobs[0] == "其它" {
  546. // update["topscopeclass"] = []string{"其它"}
  547. // update["subscopeclass"] = []string{"其它"}
  548. // update["s_topscopeclass"] = "其它"
  549. // update["s_subscopeclass"] = "其它"
  550. // esUpdate["s_topscopeclass"] = "其它"
  551. // esUpdate["s_subscopeclass"] = "其它"
  552. // esUpdate["topscopeclass"] = []string{"其它"}
  553. // } else {
  554. // if subs, ok3 := tmp["subscopeclass"]; ok3 {
  555. // if subbs, ok4 := subs.([]interface{}); ok4 {
  556. // for _, v := range subbs {
  557. // sub := util.ObjToString(v)
  558. // if sub != "" && sub != "其它" {
  559. // resultSubs = append(resultSubs, sub)
  560. // }
  561. // }
  562. // }
  563. // }
  564. // newTops, newSubs, cleanedTops := ProcessTopscopeclass(resultTobs, resultSubs)
  565. // update["topscopeclass"] = newTops
  566. // update["subscopeclass"] = newSubs
  567. // update["s_topscopeclass"] = strings.Join(cleanedTops, ",")
  568. // update["s_subscopeclass"] = strings.Join(newSubs, ",")
  569. // esUpdate["s_topscopeclass"] = strings.Join(cleanedTops, ",")
  570. // esUpdate["s_subscopeclass"] = strings.Join(newSubs, ",")
  571. // esUpdate["topscopeclass"] = newTops
  572. // }
  573. //
  574. //} else {
  575. // update["topscopeclass"] = []string{"其它"}
  576. // update["subscopeclass"] = []string{"其它"}
  577. // update["s_topscopeclass"] = "其它"
  578. // update["s_subscopeclass"] = "其它"
  579. // esUpdate["s_topscopeclass"] = "其它"
  580. // esUpdate["s_subscopeclass"] = "其它"
  581. // esUpdate["topscopeclass"] = []string{"其它"}
  582. //}
  583. //
  584. ////procurementlist 处理预计采购时间
  585. ////if procurementlist, ok := tmp["procurementlist"]; ok && procurementlist != nil {
  586. //// field := "procurementlist"
  587. //// if tmp[field] != nil {
  588. //// if field == "procurementlist" {
  589. //// if tmp["procurementlist"] != nil {
  590. //// var arr []interface{}
  591. //// plist := tmp["procurementlist"].([]interface{})
  592. //// for _, p := range plist {
  593. //// p1 := p.(map[string]interface{})
  594. //// p2 := make(map[string]interface{})
  595. //// for k, v := range BiddingLevelField[field] {
  596. //// if k == "projectname" && util.ObjToString(p1[k]) == "" {
  597. //// p2[k] = util.ObjToString(tmp["projectname"])
  598. //// } else if k == "buyer" && util.ObjToString(p1[k]) == "" && util.ObjToString(tmp["buyer"]) != "" {
  599. //// p2[k] = util.ObjToString(tmp["buyer"])
  600. //// } else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" {
  601. //// res := getMethod(util.ObjToString(p1[k]))
  602. //// if res != 0 {
  603. //// p2[k] = res
  604. //// }
  605. //// } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v {
  606. //// p2[k] = p1[k]
  607. //// }
  608. ////
  609. //// }
  610. //// arr = append(arr, p2)
  611. //// }
  612. //// if len(arr) > 0 {
  613. //// esUpdate[field] = arr
  614. //// }
  615. //// }
  616. //// }
  617. //// }
  618. ////}
  619. //
  620. //if len(update) > 0 {
  621. // //fmt.Println("aaaaa", biddingID)
  622. // //更新mongo
  623. // //MgoT.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  624. // //更新MongoDB
  625. // updatePool <- []map[string]interface{}{
  626. // {"_id": tmp["_id"]},
  627. // {"$set": update},
  628. // }
  629. //
  630. // //2.es 项目 更新字段
  631. // //err := Es.UpdateDocument("bidding", biddingID, update)
  632. // //if err != nil && err.Error() != "Document not updated: noop" {
  633. // // log.Info("bidding es update err", err, biddingID)
  634. // //}
  635. // //// 更新es
  636. // //updateEsPool <- []map[string]interface{}{
  637. // // {"_id": biddingID},
  638. // // update,
  639. // //}
  640. //}
  641. // 更新Es 数据
  642. if len(esUpdate) > 0 {
  643. // 更新es
  644. updateEsPool <- []map[string]interface{}{
  645. {"_id": biddingID},
  646. esUpdate,
  647. }
  648. }
  649. }
  650. log.Info("Run Over...Count:", log.Int("count", count))
  651. }
  652. // dealBiddingAi 处理qfw_ai 数据库bidding 数据
  653. func dealBiddingAi() {
  654. defer util.Catch()
  655. sess := MgoBAi.GetMgoConn()
  656. defer MgoBAi.DestoryMongoConn(sess)
  657. it := sess.DB("qfw_ai").C("zxl_20240926").Find(nil).Select(nil).Iter()
  658. fmt.Println("taskRun 开始")
  659. count := 0
  660. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  661. if count%1000 == 0 {
  662. fmt.Println("current:", count)
  663. }
  664. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  665. update := map[string]interface{}{}
  666. //if budget, ok := tmp["budget"]; ok && budget != nil {
  667. // update["budget"] = budget
  668. //}
  669. if bidamount, ok := tmp["bidamount"]; ok && bidamount != nil {
  670. update["bidamount"] = bidamount
  671. } else {
  672. update["bidamount"] = 0.0
  673. }
  674. //if projectcode, ok := tmp["projectcode"]; ok && projectcode != nil {
  675. // update["projectcode"] = projectcode
  676. //}
  677. if len(update) > 0 {
  678. MgoBAi.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  679. //2.es 项目 更新字段
  680. err := Es.UpdateDocument("bidding_ai", biddingID, update)
  681. if err != nil && err.Error() != "Document not updated: noop" {
  682. log.Info("bidding es update err", err, biddingID)
  683. }
  684. }
  685. }
  686. fmt.Println("over ----------- over ")
  687. }
  688. func dealBiddingByEs() {
  689. //url := "http://172.17.4.184:19908"
  690. url := "http://127.0.0.1:19908"
  691. username := "jybid"
  692. password := "Top2023_JEB01i@31"
  693. index := "bidding" //索引名称
  694. //index := "projectset" //索引名称
  695. // 创建 Elasticsearch 客户端
  696. client, err := es7.NewClient(
  697. es7.SetURL(url),
  698. es7.SetBasicAuth(username, password),
  699. es7.SetSniff(false),
  700. )
  701. if err != nil {
  702. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  703. }
  704. query := es7.NewBoolQuery()
  705. query.Must(es7.NewRangeQuery("comeintime").Gt(1718812800))
  706. query.MustNot(es7.NewExistsQuery("s_topscopeclass"))
  707. ctx := context.Background()
  708. //开始滚动搜索
  709. scrollID := ""
  710. scroll := "10m"
  711. searchSource := es7.NewSearchSource().
  712. Query(query).
  713. Size(10000).
  714. Sort("_doc", true) //升序排序
  715. //Sort("_doc", false) //降序排序
  716. searchService := client.Scroll(index).
  717. Size(10000).
  718. Scroll(scroll).
  719. SearchSource(searchSource)
  720. res, err := searchService.Do(ctx)
  721. if err != nil {
  722. if err == io.EOF {
  723. fmt.Println("没有数据")
  724. } else {
  725. panic(err)
  726. }
  727. }
  728. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  729. fmt.Println("总数是:", res.TotalHits())
  730. total := 0
  731. for len(res.Hits.Hits) > 0 {
  732. for _, hit := range res.Hits.Hits {
  733. var doc map[string]interface{}
  734. err := json.Unmarshal(hit.Source, &doc)
  735. if err != nil {
  736. fmt.Printf("解析文档失败:%s", err)
  737. continue
  738. }
  739. //delete(doc, "filetext")
  740. //delete(doc, "detail")
  741. //
  742. ////存入新表
  743. //err = MgoB.InsertOrUpdate("qfw", "wcc_subtype_err_0429", doc)
  744. //if err != nil {
  745. // fmt.Println("error", doc["id"])
  746. //}
  747. }
  748. total = total + len(res.Hits.Hits)
  749. scrollID = res.ScrollId
  750. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  751. fmt.Println("current count:", total)
  752. if err != nil {
  753. if err == io.EOF {
  754. // 滚动到最后一批数据,退出循环
  755. break
  756. }
  757. fmt.Println("滚动搜索失败:", err, res)
  758. break // 处理错误时退出循环
  759. }
  760. }
  761. // 在循环外调用 ClearScroll
  762. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  763. if err != nil {
  764. fmt.Printf("清理滚动搜索失败:%s", err)
  765. }
  766. fmt.Println("结束~~~~~~~~~~~~~~~")
  767. }
  768. // dealBiddingTest 处理测试环境数据
  769. func dealBiddingTest() {
  770. defer util.Catch()
  771. sess := MgoT.GetMgoConn()
  772. defer MgoT.DestoryMongoConn(sess)
  773. it := sess.DB("qfw_data").C("bidding").Find(nil).Select(nil).Iter()
  774. fmt.Println("taskRun 开始")
  775. count := 0
  776. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  777. if count%10000 == 0 {
  778. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  779. }
  780. update := map[string]interface{}{}
  781. // 2.更新中标单位,中标金额
  782. //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
  783. // update["tag_topinformation"] = tag_topinformation
  784. //}
  785. //
  786. //if property_form, ok := tmp["property_form"]; ok && property_form != nil {
  787. // update["property_form"] = property_form
  788. //}
  789. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  790. /**
  791. "s_subscopeclass" : "其它",
  792. "s_topscopeclass" : "其它",
  793. "subscopeclass" : [
  794. "其它"
  795. ],
  796. "topscopeclass" : [
  797. "其它"
  798. ],
  799. */
  800. // 行业分类默认值
  801. if topscopeclass, ok := tmp["topscopeclass"]; !ok && topscopeclass == nil {
  802. update["topscopeclass"] = []string{"其它"}
  803. update["s_topscopeclass"] = "其它"
  804. }
  805. if subscopeclass, ok := tmp["subscopeclass"]; !ok && subscopeclass == nil {
  806. update["subscopeclass"] = []string{"其它"}
  807. update["s_subscopeclass"] = "其它"
  808. }
  809. if util.ObjToString(tmp["s_topscopeclass"]) == "其它" {
  810. update["topscopeclass"] = []string{"其它"}
  811. update["s_topscopeclass"] = "其它"
  812. }
  813. if util.ObjToString(tmp["s_subscopeclass"]) == "其它" {
  814. update["subscopeclass"] = []string{"其它"}
  815. update["s_subscopeclass"] = "其它"
  816. }
  817. //procurementlist 处理预计采购时间
  818. if procurementlist, ok := tmp["procurementlist"]; ok && procurementlist != nil {
  819. for field, _ := range BiddingField {
  820. if tmp[field] != nil {
  821. if field == "procurementlist" {
  822. if tmp["procurementlist"] != nil {
  823. var arr []interface{}
  824. plist := tmp["procurementlist"].([]interface{})
  825. for _, p := range plist {
  826. p1 := p.(map[string]interface{})
  827. p2 := make(map[string]interface{})
  828. for k, v := range BiddingLevelField[field] {
  829. if k == "projectname" && util.ObjToString(p1[k]) == "" {
  830. p2[k] = util.ObjToString(tmp["projectname"])
  831. } else if k == "buyer" && util.ObjToString(p1[k]) == "" && util.ObjToString(tmp["buyer"]) != "" {
  832. p2[k] = util.ObjToString(tmp["buyer"])
  833. } else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" {
  834. res := getMethod(util.ObjToString(p1[k]))
  835. if res != 0 {
  836. p2[k] = res
  837. }
  838. } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v {
  839. p2[k] = p1[k]
  840. }
  841. }
  842. arr = append(arr, p2)
  843. }
  844. if len(arr) > 0 {
  845. update[field] = arr
  846. }
  847. }
  848. }
  849. }
  850. }
  851. }
  852. if len(update) > 0 {
  853. fmt.Println("aaaaa", biddingID)
  854. //更新mongo
  855. //MgoT.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  856. //更新MongoDB
  857. //updatePool <- []map[string]interface{}{
  858. // {"_id": tmp["_id"]},
  859. // {"$set": update},
  860. //}
  861. //2.es 项目 更新字段
  862. //err := Es.UpdateDocument("bidding", biddingID, update)
  863. //if err != nil && err.Error() != "Document not updated: noop" {
  864. // log.Info("bidding es update err", err, biddingID)
  865. //}
  866. // 更新es
  867. //updateEsPool <- []map[string]interface{}{
  868. // {"_id": biddingID},
  869. // update,
  870. //}
  871. }
  872. }
  873. log.Info("Run Over...Count:", log.Int("count", count))
  874. }
  875. // updateMethod 更新MongoDB
  876. func updateMethod() {
  877. arru := make([][]map[string]interface{}, saveSize)
  878. indexu := 0
  879. for {
  880. select {
  881. case v := <-updatePool:
  882. arru[indexu] = v
  883. indexu++
  884. if indexu == saveSize {
  885. updateSp <- true
  886. go func(arru [][]map[string]interface{}) {
  887. defer func() {
  888. <-updateSp
  889. }()
  890. MgoB.UpdateBulk("bidding", arru...)
  891. }(arru)
  892. arru = make([][]map[string]interface{}, saveSize)
  893. indexu = 0
  894. }
  895. case <-time.After(1000 * time.Millisecond):
  896. if indexu > 0 {
  897. updateSp <- true
  898. go func(arru [][]map[string]interface{}) {
  899. defer func() {
  900. <-updateSp
  901. }()
  902. MgoB.UpdateBulk("bidding", arru...)
  903. }(arru[:indexu])
  904. arru = make([][]map[string]interface{}, saveSize)
  905. indexu = 0
  906. }
  907. }
  908. }
  909. }
  910. // updateEsMethod 更新es
  911. func updateEsMethod() {
  912. arru := make([][]map[string]interface{}, 200)
  913. indexu := 0
  914. for {
  915. select {
  916. case v := <-updateEsPool:
  917. arru[indexu] = v
  918. indexu++
  919. if indexu == 200 {
  920. updateEsSp <- true
  921. go func(arru [][]map[string]interface{}) {
  922. defer func() {
  923. <-updateEsSp
  924. }()
  925. Es.UpdateBulk("bidding", arru...)
  926. EsNew.UpdateBulk("bidding", arru...)
  927. }(arru)
  928. arru = make([][]map[string]interface{}, 200)
  929. indexu = 0
  930. }
  931. case <-time.After(1000 * time.Millisecond):
  932. if indexu > 0 {
  933. updateEsSp <- true
  934. go func(arru [][]map[string]interface{}) {
  935. defer func() {
  936. <-updateEsSp
  937. }()
  938. Es.UpdateBulk("bidding", arru...)
  939. EsNew.UpdateBulk("bidding", arru...)
  940. }(arru[:indexu])
  941. arru = make([][]map[string]interface{}, 200)
  942. indexu = 0
  943. }
  944. }
  945. }
  946. }
  947. // updateEsMethodTest 更新测试环境ES
  948. func updateEsMethodTest() {
  949. arru := make([][]map[string]interface{}, 200)
  950. indexu := 0
  951. for {
  952. select {
  953. case v := <-updateEsPool:
  954. arru[indexu] = v
  955. indexu++
  956. if indexu == 200 {
  957. updateEsSp <- true
  958. go func(arru [][]map[string]interface{}) {
  959. defer func() {
  960. <-updateEsSp
  961. }()
  962. EsT.UpdateBulk("bidding", arru...)
  963. }(arru)
  964. arru = make([][]map[string]interface{}, 200)
  965. indexu = 0
  966. }
  967. case <-time.After(1000 * time.Millisecond):
  968. if indexu > 0 {
  969. updateEsSp <- true
  970. go func(arru [][]map[string]interface{}) {
  971. defer func() {
  972. <-updateEsSp
  973. }()
  974. EsT.UpdateBulk("bidding", arru...)
  975. }(arru[:indexu])
  976. arru = make([][]map[string]interface{}, 200)
  977. indexu = 0
  978. }
  979. }
  980. }
  981. }
  982. // updateEsMethod 更新es href 字段
  983. func updateEsHrefMethod() {
  984. arru := make([][]map[string]interface{}, 200)
  985. indexu := 0
  986. for {
  987. select {
  988. case v := <-updateEsPool:
  989. arru[indexu] = v
  990. indexu++
  991. if indexu == 200 {
  992. updateEsSp <- true
  993. go func(arru [][]map[string]interface{}) {
  994. defer func() {
  995. <-updateEsSp
  996. }()
  997. Es.UpdateBulk("bidding", arru...)
  998. Es.UpdateBulk("bidding_ai", arru...)
  999. Es.UpdateBulk("bidding_temporary", arru...)
  1000. EsNew.UpdateBulk("bidding", arru...)
  1001. EsNew.UpdateBulk("bidding_customer", arru...)
  1002. EsNew.UpdateBulk("bidding_free", arru...)
  1003. EsNew.UpdateBulk("bidding_year", arru...)
  1004. EsNew.UpdateBulk("bidding_all", arru...)
  1005. EsNew.UpdateBulk("bidding_temporary", arru...)
  1006. }(arru)
  1007. arru = make([][]map[string]interface{}, 200)
  1008. indexu = 0
  1009. }
  1010. case <-time.After(1000 * time.Millisecond):
  1011. if indexu > 0 {
  1012. updateEsSp <- true
  1013. go func(arru [][]map[string]interface{}) {
  1014. defer func() {
  1015. <-updateEsSp
  1016. }()
  1017. Es.UpdateBulk("bidding", arru...)
  1018. Es.UpdateBulk("bidding_ai", arru...)
  1019. Es.UpdateBulk("bidding_temporary", arru...)
  1020. EsNew.UpdateBulk("bidding", arru...)
  1021. EsNew.UpdateBulk("bidding_customer", arru...)
  1022. EsNew.UpdateBulk("bidding_free", arru...)
  1023. EsNew.UpdateBulk("bidding_year", arru...)
  1024. EsNew.UpdateBulk("bidding_all", arru...)
  1025. EsNew.UpdateBulk("bidding_temporary", arru...)
  1026. }(arru[:indexu])
  1027. arru = make([][]map[string]interface{}, 200)
  1028. indexu = 0
  1029. }
  1030. }
  1031. }
  1032. }
  1033. // updateProjectEsMethod 更新项目索引
  1034. func updateProjectEsMethod() {
  1035. arru := make([][]map[string]interface{}, 200)
  1036. indexu := 0
  1037. for {
  1038. select {
  1039. case v := <-updateProjectEsPool:
  1040. arru[indexu] = v
  1041. indexu++
  1042. if indexu == 200 {
  1043. updateProjectEsSp <- true
  1044. go func(arru [][]map[string]interface{}) {
  1045. defer func() {
  1046. <-updateProjectEsSp
  1047. }()
  1048. Es.UpdateBulk("projectset", arru...)
  1049. }(arru)
  1050. arru = make([][]map[string]interface{}, 200)
  1051. indexu = 0
  1052. }
  1053. case <-time.After(1000 * time.Millisecond):
  1054. if indexu > 0 {
  1055. updateProjectEsSp <- true
  1056. go func(arru [][]map[string]interface{}) {
  1057. defer func() {
  1058. <-updateProjectEsSp
  1059. }()
  1060. Es.UpdateBulk("projectset", arru...)
  1061. }(arru[:indexu])
  1062. arru = make([][]map[string]interface{}, 200)
  1063. indexu = 0
  1064. }
  1065. }
  1066. }
  1067. }