main.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. es7 "github.com/olivere/elastic/v7"
  7. "github.com/wcc4869/common_utils/log"
  8. "go.uber.org/zap"
  9. "io"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  13. "reflect"
  14. "strings"
  15. "time"
  16. )
  17. var (
  18. Mgo *mongodb.MongodbSim
  19. MgoB *mongodb.MongodbSim
  20. MgoBAi *mongodb.MongodbSim
  21. MgoT *mongodb.MongodbSim //测试环境链接
  22. MgoR *mongodb.MongodbSim
  23. saveSize = 50
  24. Es *elastic.Elastic // 19908
  25. EsNew *elastic.Elastic //19905
  26. EsT *elastic.Elastic
  27. // 更新mongo
  28. updatePool = make(chan []map[string]interface{}, 5000)
  29. updateSp = make(chan bool, 5)
  30. //更新es
  31. updateEsPool = make(chan []map[string]interface{}, 5000)
  32. updateEsSp = make(chan bool, 5) //保存协程
  33. updateProjectEsPool = make(chan []map[string]interface{}, 5000)
  34. updateProjectEsSp = make(chan bool, 5) //保存协程
  35. BiddingField = make(map[string]string, 200) //bidding_processing_field, level=1 最外层字段,
  36. BiddingLevelField = make(map[string]map[string]string) //level=2 的第二层字段
  37. )
  38. func Init() {
  39. MgoB = &mongodb.MongodbSim{
  40. MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  41. //MongodbAddr: "127.0.0.1:27083",
  42. DbName: "qfw",
  43. Size: 10,
  44. UserName: "SJZY_RWbid_ES",
  45. Password: "SJZY@B4i4D5e6S",
  46. //Direct: true,
  47. }
  48. MgoB.InitPool()
  49. //MgoBAi = &mongodb.MongodbSim{
  50. // //MongodbAddr: "172.17.189.140:27080",
  51. // MongodbAddr: "127.0.0.1:27083",
  52. // DbName: "qfw_ai",
  53. // Size: 10,
  54. // UserName: "SJZY_RWbid_ES",
  55. // Password: "SJZY@B4i4D5e6S",
  56. // Direct: true,
  57. //}
  58. //MgoBAi.InitPool()
  59. //mongodb 163
  60. //Mgo = &mongodb.MongodbSim{
  61. // //MongodbAddr: "172.17.189.140:27080",
  62. // MongodbAddr: "127.0.0.1:27083",
  63. // DbName: "qfw",
  64. // Size: 10,
  65. // UserName: "SJZY_RWbid_ES",
  66. // Password: "SJZY@B4i4D5e6S",
  67. // Direct: true,
  68. //}
  69. //Mgo.InitPool()
  70. //85
  71. //MgoR = &mongodb.MongodbSim{
  72. // //MongodbAddr: "127.0.0.1:27080",
  73. // MongodbAddr: "172.17.4.85:27080",
  74. // DbName: "qfw",
  75. // Size: 10,
  76. // //Direct: true,
  77. //}
  78. //MgoR.InitPool()
  79. ////测试环境MongoDB
  80. //MgoT = &mongodb.MongodbSim{
  81. // MongodbAddr: "172.20.45.129:27002",
  82. // DbName: "qfw_data",
  83. // Size: 10,
  84. // UserName: "",
  85. // Password: "",
  86. // //Direct: true,
  87. //}
  88. //MgoT.InitPool()
  89. //
  90. //////测试环境es
  91. //EsT = &elastic.Elastic{
  92. // S_esurl: "http://172.20.45.129:9206",
  93. // I_size: 5,
  94. // Username: "",
  95. // Password: "",
  96. //}
  97. //EsT.InitElasticSize()
  98. //es
  99. Es = &elastic.Elastic{
  100. //S_esurl: "http://127.0.0.1:19908",
  101. S_esurl: "http://172.17.4.184:19908",
  102. I_size: 5,
  103. Username: "jybid",
  104. Password: "Top2023_JEB01i@31",
  105. }
  106. Es.InitElasticSize()
  107. //es 新集群
  108. EsNew = &elastic.Elastic{
  109. //S_esurl: "http://127.0.0.1:19905",
  110. S_esurl: "http://172.17.4.184:19905",
  111. I_size: 5,
  112. Username: "jybid",
  113. Password: "Top2023_JEB01i@31",
  114. }
  115. EsNew.InitElasticSize()
  116. }
  117. func main() {
  118. //updatePing()
  119. //return
  120. Init()
  121. deleteBiddingES()
  122. //updateBiddingBuyer()
  123. return
  124. //InitEsBiddingField()
  125. go updateMethod() //更新mongodb
  126. go updateEsMethod() //更新es
  127. //go updateEsMethodTest() // 更新测试环境ES
  128. //go updateEsHrefMethod() //更新es href 字段
  129. //go updateProjectEsMethod()
  130. //taskRunProject()
  131. //taskRunBidding()
  132. //dealBidding() //正式环境bidding数据处理
  133. //dealBiddingAi() //正式环境bidding数据处理
  134. //dealBiddingTest() // 测试环境数据处理
  135. //dealBiddingEsHref() // 根据临时表,更新es href 字段
  136. //dealBiddingNiJian() //更新拟建数据中buyer = owner
  137. //updateBiddingBidamount()
  138. //updateProject()
  139. //-------------------------------//
  140. //fixBiddingEs()
  141. //updateBiddingType() //更新标讯分类
  142. //updateBiddingisValidFile() //更新bidding isValidFile字段
  143. //updateBiddingTypeBySpidecode() //更新bidding ;根据spidecode 字段
  144. //updateBiddingBasicClass() //更新 存量数据 basicClass 字段
  145. //updateBiddingBasicClassTest() //更新测试环境 basicClass 字段
  146. //updateBiddingToptype() // 更新招标分类结果
  147. log.Info("over")
  148. //c := make(chan bool, 1)
  149. //<-c
  150. }
  151. // fixBiddingEs 修复bidding 索引数据,
  152. func fixBiddingEs() {
  153. defer util.Catch()
  154. sess := MgoR.GetMgoConn()
  155. defer MgoR.DestoryMongoConn(sess)
  156. where := map[string]interface{}{
  157. "_id": map[string]interface{}{
  158. "$gte": mongodb.StringTOBsonId("6847fc265f834436f08ef4fe"),
  159. "$lte": mongodb.StringTOBsonId("6848e42b5f834436f092f645"),
  160. },
  161. }
  162. it := sess.DB("qfw").C("result_20220219").Find(where).Select(nil).Iter()
  163. count := 0
  164. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  165. if count%1000 == 0 {
  166. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  167. }
  168. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  169. repeat := util.IntAll(tmp["repeat"])
  170. repeat_reason := util.ObjToString(tmp["repeat_reason"])
  171. if repeat == 1 && strings.Contains(repeat_reason, "采集源重复") {
  172. Es.DeleteByID("bidding", biddingID)
  173. log.Info("fixBiddingEs", zap.String("biddingID", biddingID))
  174. EsNew.DeleteByID("bidding", biddingID)
  175. }
  176. }
  177. }
  178. func InitEsBiddingField() {
  179. now := time.Now()
  180. info, _ := MgoB.Find("bidding_processing_field", `{"stype": "bidding"}`, nil, nil, false, -1, -1)
  181. if len(*info) > 0 {
  182. for _, m := range *info {
  183. if util.IntAll(m["level"]) == 1 {
  184. BiddingField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  185. } else if util.IntAll(m["level"]) == 2 {
  186. pfield := util.ObjToString(m["pfield"])
  187. pfieldMap := BiddingLevelField[pfield]
  188. if pfieldMap == nil {
  189. pfieldMap = make(map[string]string, 0)
  190. }
  191. pfieldMap[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  192. BiddingLevelField[pfield] = pfieldMap
  193. }
  194. }
  195. }
  196. log.Info("InitEsBiddingField", zap.Int("BiddingField es 一级字段数量", len(BiddingField)))
  197. log.Info("InitEsBiddingField", zap.Int("BiddingLevelField es 二级字段数量", len(BiddingLevelField)))
  198. log.Info("InitEsBiddingField", zap.Any("duration", time.Since(now).Seconds()))
  199. }
  200. // taskRun 更新es 省市区三个字段
  201. func taskRunBidding() {
  202. defer util.Catch()
  203. sess := MgoB.GetMgoConn()
  204. defer MgoB.DestoryMongoConn(sess)
  205. //查询条件
  206. //q := map[string]interface{}{
  207. // //"_id": map[string]interface{}{
  208. // // //"$gt": mongodb.StringTOBsonId("5a862f0640d2d9bbe88e3cea"),
  209. // // //"$lte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  210. // //
  211. // // //"$gte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  212. // // "$lte": mongodb.StringTOBsonId("661e347d66cf0db42aa1a52f"),
  213. // //},
  214. // //"comeintime": map[string]interface{}{
  215. // // "$gt": 1669824000,
  216. // // //"$lte": 1669864950,
  217. // // "$lte": 1702265941,
  218. // //},
  219. // //"site": "国家能源e购",
  220. // "toptype": map[string]interface{}{"$exists": 0},
  221. //}
  222. //selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  223. it := sess.DB("qfw").C("bidding").Find(nil).Select(nil).Iter()
  224. fmt.Println("taskRun 开始")
  225. count := 0
  226. //realNum := 0
  227. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  228. if count%100 == 0 {
  229. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  230. }
  231. update := map[string]interface{}{}
  232. // 1.更新省市区
  233. //if area, ok := tmp["area"]; ok && area != nil {
  234. // update["area"] = area
  235. //} else {
  236. // update["area"] = ""
  237. //}
  238. //
  239. //if city, ok := tmp["city"]; ok && city != nil {
  240. // update["city"] = city
  241. //} else {
  242. // update["city"] = ""
  243. //}
  244. //
  245. //if district, ok := tmp["district"]; ok && district != nil {
  246. // if district == "乌拉盖管委会" {
  247. // update["district"] = "乌拉盖管理区管委会"
  248. // } else if district == "错那县" {
  249. // update["district"] = "错那市"
  250. // } else if district == "河南周口经济开发区" {
  251. // update["district"] = "周口临港开发区"
  252. // } else if district == "米林县" {
  253. // update["district"] = "米林市"
  254. // }
  255. //
  256. //}
  257. //-------------------------------------------//
  258. // 2.更新中标单位、采购单位、代理机构
  259. biddingID := util.ObjToString(tmp["id"])
  260. //biddingID := mongodb.BsonIdToSId(tmp["_id"])
  261. if _, ok := tmp["buyer"]; ok {
  262. update["buyer"] = tmp["buyer"]
  263. }
  264. if _, ok := tmp["agency"]; ok {
  265. update["agency"] = tmp["agency"]
  266. }
  267. if _, ok := tmp["s_winner"]; ok {
  268. update["s_winner"] = tmp["s_winner"]
  269. }
  270. if _, ok := tmp["winner"]; ok {
  271. update["winner"] = tmp["winner"]
  272. }
  273. //-------------------------------------------//
  274. //3. 更新中标金额
  275. //biddingID := util.ObjToString(tmp["id"])
  276. //if _, ok := tmp["nb"]; !ok {
  277. // continue
  278. //} else {
  279. // update["bidamount"] = tmp["nb"]
  280. //}
  281. //update["bidamount"] = tmp["bidamount"]
  282. //// 更新 MongoDB + ES
  283. if len(update) > 0 {
  284. MgoB.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  285. //2.es 项目 更新字段
  286. err := Es.UpdateDocument("bidding", biddingID, update)
  287. err = EsNew.UpdateDocument("bidding", biddingID, update)
  288. if err != nil && err.Error() != "Document not updated: noop" {
  289. log.Info("bidding es update err", err, biddingID)
  290. }
  291. }
  292. }
  293. log.Info("Run Over...Count:", log.Int("count", count))
  294. }
  295. // taskRunProject 更新项目表 省市区
  296. func taskRunProject() {
  297. defer util.Catch()
  298. sess := Mgo.GetMgoConn()
  299. defer Mgo.DestoryMongoConn(sess)
  300. // 项目数据
  301. MgoP := &mongodb.MongodbSim{
  302. MongodbAddr: "172.17.4.85:27080",
  303. //MongodbAddr: "127.0.0.1:27080",
  304. Size: 10,
  305. DbName: "qfw",
  306. //Direct: true,
  307. }
  308. MgoP.InitPool()
  309. selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  310. it := sess.DB("qfw").C("zktest_0423_info_new").Find(nil).Select(selected).Sort("_id").Iter()
  311. fmt.Println("taskRun 开始")
  312. count := 0
  313. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  314. if count%10000 == 0 {
  315. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  316. }
  317. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  318. where := map[string]interface{}{
  319. "ids": biddingID,
  320. }
  321. // 找到对应项目数据
  322. p, _ := MgoP.FindOne("projectset_20230904", where)
  323. projectId := mongodb.BsonIdToSId((*p)["_id"])
  324. //1.更新MongoDB
  325. update := map[string]interface{}{}
  326. if area, ok := tmp["area"]; ok && area != nil {
  327. update["area"] = area
  328. } else {
  329. update["area"] = ""
  330. }
  331. if city, ok := tmp["city"]; ok && city != nil {
  332. update["city"] = city
  333. } else {
  334. update["city"] = ""
  335. }
  336. if district, ok := tmp["district"]; ok && district != nil {
  337. update["district"] = district
  338. } else {
  339. update["district"] = ""
  340. }
  341. if len(update) > 0 {
  342. MgoP.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update})
  343. //2.es 项目 更新字段
  344. err := Es.UpdateDocument("projectset", projectId, update)
  345. if err != nil {
  346. log.Info("es update err", err, projectId)
  347. }
  348. }
  349. //2.es 项目 更新字段
  350. //if len(update) > 0 {
  351. // // 更新es
  352. // //updateEsPool <- []map[string]interface{}{
  353. // // {"_id": projectId},
  354. // // update,
  355. // //}
  356. //}
  357. }
  358. log.Info("Run Over...Count:", log.Int("count", count))
  359. }
  360. // dealData 正式环境,同步合同期限
  361. func dealData() {
  362. defer util.Catch()
  363. sess := Mgo.GetMgoConn()
  364. defer Mgo.DestoryMongoConn(sess)
  365. //where := map[string]interface{}{
  366. // "_id": mongodb.StringTOBsonId("65c5a36a66cf0db42ab9c1ef"),
  367. //}
  368. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  369. it := sess.DB("qfw").C("zktest_quanliang_0210_fbs").Find(nil).Select(&selected).Iter()
  370. count := 0
  371. realNum := 0
  372. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  373. if count%1000 == 0 {
  374. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  375. }
  376. idStr := mongodb.BsonIdToSId(tmp["_id"])
  377. update := make(map[string]interface{})
  378. if tmp["signaturedate"] != nil {
  379. update["signaturedate"] = tmp["signaturedate"]
  380. }
  381. if tmp["contractperiod"] != nil {
  382. update["contractperiod"] = tmp["contractperiod"]
  383. }
  384. if tmp["expiredate"] != nil {
  385. update["expiredate"] = tmp["expiredate"]
  386. }
  387. if len(update) == 0 {
  388. continue
  389. }
  390. //bidding 表
  391. if idStr > "5a862e7040d2d9bbe88e3b1f" {
  392. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  393. data := *bidding
  394. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  395. // 针对存量数据,重复数据不进索引
  396. if util.IntAll(data["extracttype"]) == -1 {
  397. tmp = make(map[string]interface{})
  398. continue
  399. }
  400. } else {
  401. //bidding_back
  402. bidding, _ := Mgo.FindById("bidding_back", idStr, map[string]interface{}{"extracttype": 1})
  403. data := *bidding
  404. Mgo.UpdateById("bidding_back", idStr, map[string]interface{}{"$set": update})
  405. // 针对存量数据,重复数据不进索引
  406. if util.IntAll(data["extracttype"]) == -1 {
  407. tmp = make(map[string]interface{})
  408. continue
  409. }
  410. }
  411. realNum++
  412. //2.es 更新字段
  413. esUpdate := update
  414. esUpdate["id"] = idStr
  415. if len(esUpdate) > 0 {
  416. // 更新es
  417. updateEsPool <- []map[string]interface{}{
  418. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  419. esUpdate,
  420. }
  421. }
  422. tmp = make(map[string]interface{})
  423. }
  424. log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
  425. }
  426. // dealResult 查询抽取表,更新合同周期字段;是dealData的后面遗漏数据
  427. func dealResult() {
  428. defer util.Catch()
  429. sess := MgoR.GetMgoConn()
  430. defer MgoR.DestoryMongoConn(sess)
  431. where := map[string]interface{}{
  432. "_id": map[string]interface{}{
  433. "$gte": mongodb.StringTOBsonId("5a4909cf40d2d9bbe8ab329c"),
  434. "$lte": mongodb.StringTOBsonId("5a4ad94d40d2d9bbe8ae0183"),
  435. },
  436. "subtype": "合同",
  437. }
  438. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  439. it := sess.DB("qfw").C("result_20220219").Find(where).Select(&selected).Iter()
  440. count := 0
  441. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  442. if count%1000 == 0 {
  443. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  444. }
  445. idStr := mongodb.BsonIdToSId(tmp["_id"])
  446. update := make(map[string]interface{})
  447. if tmp["signaturedate"] != nil {
  448. update["signaturedate"] = tmp["signaturedate"]
  449. }
  450. if tmp["contractperiod"] != nil {
  451. update["contractperiod"] = tmp["contractperiod"]
  452. }
  453. if tmp["expiredate"] != nil {
  454. update["expiredate"] = tmp["expiredate"]
  455. }
  456. if len(update) == 0 {
  457. continue
  458. }
  459. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  460. data := *bidding
  461. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  462. // 针对存量数据,重复数据不进索引
  463. if util.IntAll(data["extracttype"]) == -1 {
  464. tmp = make(map[string]interface{})
  465. continue
  466. }
  467. //2.es 更新字段
  468. esUpdate := update
  469. esUpdate["id"] = idStr
  470. if len(esUpdate) > 0 {
  471. // 更新es
  472. updateEsPool <- []map[string]interface{}{
  473. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  474. esUpdate,
  475. }
  476. }
  477. tmp = make(map[string]interface{})
  478. }
  479. log.Info("Run Over...Count:", log.Int("count", count))
  480. }
  481. // dealBidding 处理bidding数据
  482. func dealBidding() {
  483. defer util.Catch()
  484. sess := MgoB.GetMgoConn()
  485. defer MgoB.DestoryMongoConn(sess)
  486. //where := map[string]interface{}{
  487. // "comeintime": map[string]interface{}{
  488. // "$lt": 1722009600,
  489. // //"$lt": 1718812802,
  490. // "$gte": 1718899200,
  491. // },
  492. //}
  493. //where := map[string]interface{}{
  494. // "_id": map[string]interface{}{
  495. // "$gte": mongodb.StringTOBsonId("66aa067e66cf0db42a8ea71e"),
  496. // "$lt": mongodb.StringTOBsonId("66aa067e66cf0db42a8ea720"),
  497. // },
  498. //}
  499. it := sess.DB("qfw").C("bidding").Find(nil).Select(nil).Iter()
  500. fmt.Println("taskRun 开始")
  501. count := 0
  502. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  503. if count%10000 == 0 {
  504. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  505. }
  506. //update := map[string]interface{}{}
  507. esUpdate := map[string]interface{}{}
  508. if util.IntAll(tmp["extracttype"]) == -1 {
  509. continue
  510. }
  511. if util.ObjToString(tmp["purchasing"]) == "" {
  512. continue
  513. }
  514. esUpdate["purchasing"] = tmp["purchasing"]
  515. // 2.更新中标单位,中标金额
  516. //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
  517. // update["tag_topinformation"] = tag_topinformation
  518. //}
  519. //
  520. //if property_form, ok := tmp["property_form"]; ok && property_form != nil {
  521. // update["property_form"] = property_form
  522. //}
  523. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  524. //fmt.Println(biddingID)
  525. /**
  526. "s_subscopeclass" : "其它",
  527. "s_topscopeclass" : "其它",
  528. "subscopeclass" : [
  529. "其它"
  530. ],
  531. "topscopeclass" : [
  532. "其它"
  533. ],
  534. */
  535. //// 行业分类默认值
  536. //resultSubs := make([]string, 0)
  537. //resultTobs := make([]string, 0)
  538. //if topscopeclass, ok := tmp["topscopeclass"]; ok && topscopeclass != nil {
  539. // if topps, ok2 := topscopeclass.([]interface{}); ok2 {
  540. // for _, v := range topps {
  541. // top := util.ObjToString(v)
  542. // if top != "" {
  543. // resultTobs = append(resultTobs, top)
  544. // }
  545. // }
  546. // }
  547. // //1.一级分类是空数组或者 是 其它
  548. // if len(resultTobs) == 0 || resultTobs[0] == "其它" {
  549. // update["topscopeclass"] = []string{"其它"}
  550. // update["subscopeclass"] = []string{"其它"}
  551. // update["s_topscopeclass"] = "其它"
  552. // update["s_subscopeclass"] = "其它"
  553. // esUpdate["s_topscopeclass"] = "其它"
  554. // esUpdate["s_subscopeclass"] = "其它"
  555. // esUpdate["topscopeclass"] = []string{"其它"}
  556. // } else {
  557. // if subs, ok3 := tmp["subscopeclass"]; ok3 {
  558. // if subbs, ok4 := subs.([]interface{}); ok4 {
  559. // for _, v := range subbs {
  560. // sub := util.ObjToString(v)
  561. // if sub != "" && sub != "其它" {
  562. // resultSubs = append(resultSubs, sub)
  563. // }
  564. // }
  565. // }
  566. // }
  567. // newTops, newSubs, cleanedTops := ProcessTopscopeclass(resultTobs, resultSubs)
  568. // update["topscopeclass"] = newTops
  569. // update["subscopeclass"] = newSubs
  570. // update["s_topscopeclass"] = strings.Join(cleanedTops, ",")
  571. // update["s_subscopeclass"] = strings.Join(newSubs, ",")
  572. // esUpdate["s_topscopeclass"] = strings.Join(cleanedTops, ",")
  573. // esUpdate["s_subscopeclass"] = strings.Join(newSubs, ",")
  574. // esUpdate["topscopeclass"] = newTops
  575. // }
  576. //
  577. //} else {
  578. // update["topscopeclass"] = []string{"其它"}
  579. // update["subscopeclass"] = []string{"其它"}
  580. // update["s_topscopeclass"] = "其它"
  581. // update["s_subscopeclass"] = "其它"
  582. // esUpdate["s_topscopeclass"] = "其它"
  583. // esUpdate["s_subscopeclass"] = "其它"
  584. // esUpdate["topscopeclass"] = []string{"其它"}
  585. //}
  586. //
  587. ////procurementlist 处理预计采购时间
  588. ////if procurementlist, ok := tmp["procurementlist"]; ok && procurementlist != nil {
  589. //// field := "procurementlist"
  590. //// if tmp[field] != nil {
  591. //// if field == "procurementlist" {
  592. //// if tmp["procurementlist"] != nil {
  593. //// var arr []interface{}
  594. //// plist := tmp["procurementlist"].([]interface{})
  595. //// for _, p := range plist {
  596. //// p1 := p.(map[string]interface{})
  597. //// p2 := make(map[string]interface{})
  598. //// for k, v := range BiddingLevelField[field] {
  599. //// if k == "projectname" && util.ObjToString(p1[k]) == "" {
  600. //// p2[k] = util.ObjToString(tmp["projectname"])
  601. //// } else if k == "buyer" && util.ObjToString(p1[k]) == "" && util.ObjToString(tmp["buyer"]) != "" {
  602. //// p2[k] = util.ObjToString(tmp["buyer"])
  603. //// } else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" {
  604. //// res := getMethod(util.ObjToString(p1[k]))
  605. //// if res != 0 {
  606. //// p2[k] = res
  607. //// }
  608. //// } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v {
  609. //// p2[k] = p1[k]
  610. //// }
  611. ////
  612. //// }
  613. //// arr = append(arr, p2)
  614. //// }
  615. //// if len(arr) > 0 {
  616. //// esUpdate[field] = arr
  617. //// }
  618. //// }
  619. //// }
  620. //// }
  621. ////}
  622. //
  623. //if len(update) > 0 {
  624. // //fmt.Println("aaaaa", biddingID)
  625. // //更新mongo
  626. // //MgoT.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  627. // //更新MongoDB
  628. // updatePool <- []map[string]interface{}{
  629. // {"_id": tmp["_id"]},
  630. // {"$set": update},
  631. // }
  632. //
  633. // //2.es 项目 更新字段
  634. // //err := Es.UpdateDocument("bidding", biddingID, update)
  635. // //if err != nil && err.Error() != "Document not updated: noop" {
  636. // // log.Info("bidding es update err", err, biddingID)
  637. // //}
  638. // //// 更新es
  639. // //updateEsPool <- []map[string]interface{}{
  640. // // {"_id": biddingID},
  641. // // update,
  642. // //}
  643. //}
  644. // 更新Es 数据
  645. if len(esUpdate) > 0 {
  646. // 更新es
  647. updateEsPool <- []map[string]interface{}{
  648. {"_id": biddingID},
  649. esUpdate,
  650. }
  651. }
  652. }
  653. log.Info("Run Over...Count:", log.Int("count", count))
  654. }
  655. // dealBiddingAi 处理qfw_ai 数据库bidding 数据
  656. func dealBiddingAi() {
  657. defer util.Catch()
  658. sess := MgoBAi.GetMgoConn()
  659. defer MgoBAi.DestoryMongoConn(sess)
  660. it := sess.DB("qfw_ai").C("zxl_20240926").Find(nil).Select(nil).Iter()
  661. fmt.Println("taskRun 开始")
  662. count := 0
  663. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  664. if count%1000 == 0 {
  665. fmt.Println("current:", count)
  666. }
  667. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  668. update := map[string]interface{}{}
  669. //if budget, ok := tmp["budget"]; ok && budget != nil {
  670. // update["budget"] = budget
  671. //}
  672. if bidamount, ok := tmp["bidamount"]; ok && bidamount != nil {
  673. update["bidamount"] = bidamount
  674. } else {
  675. update["bidamount"] = 0.0
  676. }
  677. //if projectcode, ok := tmp["projectcode"]; ok && projectcode != nil {
  678. // update["projectcode"] = projectcode
  679. //}
  680. if len(update) > 0 {
  681. MgoBAi.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  682. //2.es 项目 更新字段
  683. err := Es.UpdateDocument("bidding_ai", biddingID, update)
  684. if err != nil && err.Error() != "Document not updated: noop" {
  685. log.Info("bidding es update err", err, biddingID)
  686. }
  687. }
  688. }
  689. fmt.Println("over ----------- over ")
  690. }
  691. func dealBiddingByEs() {
  692. //url := "http://172.17.4.184:19908"
  693. url := "http://127.0.0.1:19908"
  694. username := "jybid"
  695. password := "Top2023_JEB01i@31"
  696. index := "bidding" //索引名称
  697. //index := "projectset" //索引名称
  698. // 创建 Elasticsearch 客户端
  699. client, err := es7.NewClient(
  700. es7.SetURL(url),
  701. es7.SetBasicAuth(username, password),
  702. es7.SetSniff(false),
  703. )
  704. if err != nil {
  705. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  706. }
  707. query := es7.NewBoolQuery()
  708. query.Must(es7.NewRangeQuery("comeintime").Gt(1718812800))
  709. query.MustNot(es7.NewExistsQuery("s_topscopeclass"))
  710. ctx := context.Background()
  711. //开始滚动搜索
  712. scrollID := ""
  713. scroll := "10m"
  714. searchSource := es7.NewSearchSource().
  715. Query(query).
  716. Size(10000).
  717. Sort("_doc", true) //升序排序
  718. //Sort("_doc", false) //降序排序
  719. searchService := client.Scroll(index).
  720. Size(10000).
  721. Scroll(scroll).
  722. SearchSource(searchSource)
  723. res, err := searchService.Do(ctx)
  724. if err != nil {
  725. if err == io.EOF {
  726. fmt.Println("没有数据")
  727. } else {
  728. panic(err)
  729. }
  730. }
  731. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  732. fmt.Println("总数是:", res.TotalHits())
  733. total := 0
  734. for len(res.Hits.Hits) > 0 {
  735. for _, hit := range res.Hits.Hits {
  736. var doc map[string]interface{}
  737. err := json.Unmarshal(hit.Source, &doc)
  738. if err != nil {
  739. fmt.Printf("解析文档失败:%s", err)
  740. continue
  741. }
  742. //delete(doc, "filetext")
  743. //delete(doc, "detail")
  744. //
  745. ////存入新表
  746. //err = MgoB.InsertOrUpdate("qfw", "wcc_subtype_err_0429", doc)
  747. //if err != nil {
  748. // fmt.Println("error", doc["id"])
  749. //}
  750. }
  751. total = total + len(res.Hits.Hits)
  752. scrollID = res.ScrollId
  753. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  754. fmt.Println("current count:", total)
  755. if err != nil {
  756. if err == io.EOF {
  757. // 滚动到最后一批数据,退出循环
  758. break
  759. }
  760. fmt.Println("滚动搜索失败:", err, res)
  761. break // 处理错误时退出循环
  762. }
  763. }
  764. // 在循环外调用 ClearScroll
  765. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  766. if err != nil {
  767. fmt.Printf("清理滚动搜索失败:%s", err)
  768. }
  769. fmt.Println("结束~~~~~~~~~~~~~~~")
  770. }
  771. // dealBiddingTest 处理测试环境数据
  772. func dealBiddingTest() {
  773. defer util.Catch()
  774. sess := MgoT.GetMgoConn()
  775. defer MgoT.DestoryMongoConn(sess)
  776. it := sess.DB("qfw_data").C("bidding").Find(nil).Select(nil).Iter()
  777. fmt.Println("taskRun 开始")
  778. count := 0
  779. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  780. if count%10000 == 0 {
  781. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  782. }
  783. update := map[string]interface{}{}
  784. // 2.更新中标单位,中标金额
  785. //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
  786. // update["tag_topinformation"] = tag_topinformation
  787. //}
  788. //
  789. //if property_form, ok := tmp["property_form"]; ok && property_form != nil {
  790. // update["property_form"] = property_form
  791. //}
  792. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  793. /**
  794. "s_subscopeclass" : "其它",
  795. "s_topscopeclass" : "其它",
  796. "subscopeclass" : [
  797. "其它"
  798. ],
  799. "topscopeclass" : [
  800. "其它"
  801. ],
  802. */
  803. // 行业分类默认值
  804. if topscopeclass, ok := tmp["topscopeclass"]; !ok && topscopeclass == nil {
  805. update["topscopeclass"] = []string{"其它"}
  806. update["s_topscopeclass"] = "其它"
  807. }
  808. if subscopeclass, ok := tmp["subscopeclass"]; !ok && subscopeclass == nil {
  809. update["subscopeclass"] = []string{"其它"}
  810. update["s_subscopeclass"] = "其它"
  811. }
  812. if util.ObjToString(tmp["s_topscopeclass"]) == "其它" {
  813. update["topscopeclass"] = []string{"其它"}
  814. update["s_topscopeclass"] = "其它"
  815. }
  816. if util.ObjToString(tmp["s_subscopeclass"]) == "其它" {
  817. update["subscopeclass"] = []string{"其它"}
  818. update["s_subscopeclass"] = "其它"
  819. }
  820. //procurementlist 处理预计采购时间
  821. if procurementlist, ok := tmp["procurementlist"]; ok && procurementlist != nil {
  822. for field, _ := range BiddingField {
  823. if tmp[field] != nil {
  824. if field == "procurementlist" {
  825. if tmp["procurementlist"] != nil {
  826. var arr []interface{}
  827. plist := tmp["procurementlist"].([]interface{})
  828. for _, p := range plist {
  829. p1 := p.(map[string]interface{})
  830. p2 := make(map[string]interface{})
  831. for k, v := range BiddingLevelField[field] {
  832. if k == "projectname" && util.ObjToString(p1[k]) == "" {
  833. p2[k] = util.ObjToString(tmp["projectname"])
  834. } else if k == "buyer" && util.ObjToString(p1[k]) == "" && util.ObjToString(tmp["buyer"]) != "" {
  835. p2[k] = util.ObjToString(tmp["buyer"])
  836. } else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" {
  837. res := getMethod(util.ObjToString(p1[k]))
  838. if res != 0 {
  839. p2[k] = res
  840. }
  841. } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v {
  842. p2[k] = p1[k]
  843. }
  844. }
  845. arr = append(arr, p2)
  846. }
  847. if len(arr) > 0 {
  848. update[field] = arr
  849. }
  850. }
  851. }
  852. }
  853. }
  854. }
  855. if len(update) > 0 {
  856. fmt.Println("aaaaa", biddingID)
  857. //更新mongo
  858. //MgoT.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  859. //更新MongoDB
  860. //updatePool <- []map[string]interface{}{
  861. // {"_id": tmp["_id"]},
  862. // {"$set": update},
  863. //}
  864. //2.es 项目 更新字段
  865. //err := Es.UpdateDocument("bidding", biddingID, update)
  866. //if err != nil && err.Error() != "Document not updated: noop" {
  867. // log.Info("bidding es update err", err, biddingID)
  868. //}
  869. // 更新es
  870. //updateEsPool <- []map[string]interface{}{
  871. // {"_id": biddingID},
  872. // update,
  873. //}
  874. }
  875. }
  876. log.Info("Run Over...Count:", log.Int("count", count))
  877. }
  878. // updateMethod 更新MongoDB
  879. func updateMethod() {
  880. arru := make([][]map[string]interface{}, saveSize)
  881. indexu := 0
  882. for {
  883. select {
  884. case v := <-updatePool:
  885. arru[indexu] = v
  886. indexu++
  887. if indexu == saveSize {
  888. updateSp <- true
  889. go func(arru [][]map[string]interface{}) {
  890. defer func() {
  891. <-updateSp
  892. }()
  893. MgoB.UpdateBulk("bidding", arru...)
  894. }(arru)
  895. arru = make([][]map[string]interface{}, saveSize)
  896. indexu = 0
  897. }
  898. case <-time.After(1000 * time.Millisecond):
  899. if indexu > 0 {
  900. updateSp <- true
  901. go func(arru [][]map[string]interface{}) {
  902. defer func() {
  903. <-updateSp
  904. }()
  905. MgoB.UpdateBulk("bidding", arru...)
  906. }(arru[:indexu])
  907. arru = make([][]map[string]interface{}, saveSize)
  908. indexu = 0
  909. }
  910. }
  911. }
  912. }
  913. // updateEsMethod 更新es
  914. func updateEsMethod() {
  915. arru := make([][]map[string]interface{}, 200)
  916. indexu := 0
  917. for {
  918. select {
  919. case v := <-updateEsPool:
  920. arru[indexu] = v
  921. indexu++
  922. if indexu == 200 {
  923. updateEsSp <- true
  924. go func(arru [][]map[string]interface{}) {
  925. defer func() {
  926. <-updateEsSp
  927. }()
  928. Es.UpdateBulk("bidding", arru...)
  929. EsNew.UpdateBulk("bidding", arru...)
  930. }(arru)
  931. arru = make([][]map[string]interface{}, 200)
  932. indexu = 0
  933. }
  934. case <-time.After(1000 * time.Millisecond):
  935. if indexu > 0 {
  936. updateEsSp <- true
  937. go func(arru [][]map[string]interface{}) {
  938. defer func() {
  939. <-updateEsSp
  940. }()
  941. Es.UpdateBulk("bidding", arru...)
  942. EsNew.UpdateBulk("bidding", arru...)
  943. }(arru[:indexu])
  944. arru = make([][]map[string]interface{}, 200)
  945. indexu = 0
  946. }
  947. }
  948. }
  949. }
  950. // updateEsMethodTest 更新测试环境ES
  951. func updateEsMethodTest() {
  952. arru := make([][]map[string]interface{}, 200)
  953. indexu := 0
  954. for {
  955. select {
  956. case v := <-updateEsPool:
  957. arru[indexu] = v
  958. indexu++
  959. if indexu == 200 {
  960. updateEsSp <- true
  961. go func(arru [][]map[string]interface{}) {
  962. defer func() {
  963. <-updateEsSp
  964. }()
  965. EsT.UpdateBulk("bidding", arru...)
  966. }(arru)
  967. arru = make([][]map[string]interface{}, 200)
  968. indexu = 0
  969. }
  970. case <-time.After(1000 * time.Millisecond):
  971. if indexu > 0 {
  972. updateEsSp <- true
  973. go func(arru [][]map[string]interface{}) {
  974. defer func() {
  975. <-updateEsSp
  976. }()
  977. EsT.UpdateBulk("bidding", arru...)
  978. }(arru[:indexu])
  979. arru = make([][]map[string]interface{}, 200)
  980. indexu = 0
  981. }
  982. }
  983. }
  984. }
  985. // updateEsMethod 更新es href 字段
  986. func updateEsHrefMethod() {
  987. arru := make([][]map[string]interface{}, 200)
  988. indexu := 0
  989. for {
  990. select {
  991. case v := <-updateEsPool:
  992. arru[indexu] = v
  993. indexu++
  994. if indexu == 200 {
  995. updateEsSp <- true
  996. go func(arru [][]map[string]interface{}) {
  997. defer func() {
  998. <-updateEsSp
  999. }()
  1000. Es.UpdateBulk("bidding", arru...)
  1001. Es.UpdateBulk("bidding_ai", arru...)
  1002. Es.UpdateBulk("bidding_temporary", arru...)
  1003. EsNew.UpdateBulk("bidding", arru...)
  1004. EsNew.UpdateBulk("bidding_customer", arru...)
  1005. EsNew.UpdateBulk("bidding_free", arru...)
  1006. EsNew.UpdateBulk("bidding_year", arru...)
  1007. EsNew.UpdateBulk("bidding_all", arru...)
  1008. EsNew.UpdateBulk("bidding_temporary", arru...)
  1009. }(arru)
  1010. arru = make([][]map[string]interface{}, 200)
  1011. indexu = 0
  1012. }
  1013. case <-time.After(1000 * time.Millisecond):
  1014. if indexu > 0 {
  1015. updateEsSp <- true
  1016. go func(arru [][]map[string]interface{}) {
  1017. defer func() {
  1018. <-updateEsSp
  1019. }()
  1020. Es.UpdateBulk("bidding", arru...)
  1021. Es.UpdateBulk("bidding_ai", arru...)
  1022. Es.UpdateBulk("bidding_temporary", arru...)
  1023. EsNew.UpdateBulk("bidding", arru...)
  1024. EsNew.UpdateBulk("bidding_customer", arru...)
  1025. EsNew.UpdateBulk("bidding_free", arru...)
  1026. EsNew.UpdateBulk("bidding_year", arru...)
  1027. EsNew.UpdateBulk("bidding_all", arru...)
  1028. EsNew.UpdateBulk("bidding_temporary", arru...)
  1029. }(arru[:indexu])
  1030. arru = make([][]map[string]interface{}, 200)
  1031. indexu = 0
  1032. }
  1033. }
  1034. }
  1035. }
  1036. // updateProjectEsMethod 更新项目索引
  1037. func updateProjectEsMethod() {
  1038. arru := make([][]map[string]interface{}, 200)
  1039. indexu := 0
  1040. for {
  1041. select {
  1042. case v := <-updateProjectEsPool:
  1043. arru[indexu] = v
  1044. indexu++
  1045. if indexu == 200 {
  1046. updateProjectEsSp <- true
  1047. go func(arru [][]map[string]interface{}) {
  1048. defer func() {
  1049. <-updateProjectEsSp
  1050. }()
  1051. Es.UpdateBulk("projectset", arru...)
  1052. }(arru)
  1053. arru = make([][]map[string]interface{}, 200)
  1054. indexu = 0
  1055. }
  1056. case <-time.After(1000 * time.Millisecond):
  1057. if indexu > 0 {
  1058. updateProjectEsSp <- true
  1059. go func(arru [][]map[string]interface{}) {
  1060. defer func() {
  1061. <-updateProjectEsSp
  1062. }()
  1063. Es.UpdateBulk("projectset", arru...)
  1064. }(arru[:indexu])
  1065. arru = make([][]map[string]interface{}, 200)
  1066. indexu = 0
  1067. }
  1068. }
  1069. }
  1070. }