main.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. es7 "github.com/olivere/elastic/v7"
  7. "github.com/wcc4869/common_utils/log"
  8. "go.uber.org/zap"
  9. "io"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  13. "reflect"
  14. "strings"
  15. "time"
  16. )
  17. var (
  18. Mgo *mongodb.MongodbSim
  19. MgoB *mongodb.MongodbSim
  20. MgoBAi *mongodb.MongodbSim
  21. MgoT *mongodb.MongodbSim //测试环境链接
  22. MgoR *mongodb.MongodbSim
  23. saveSize = 50
  24. Es *elastic.Elastic // 19908
  25. EsNew *elastic.Elastic //19905
  26. //EsT *elastic.Elastic
  27. // 更新mongo
  28. updatePool = make(chan []map[string]interface{}, 5000)
  29. updateSp = make(chan bool, 5)
  30. //更新es
  31. updateEsPool = make(chan []map[string]interface{}, 5000)
  32. updateEsSp = make(chan bool, 5) //保存协程
  33. updateProjectEsPool = make(chan []map[string]interface{}, 5000)
  34. updateProjectEsSp = make(chan bool, 5) //保存协程
  35. BiddingField = make(map[string]string, 200) //bidding_processing_field, level=1 最外层字段,
  36. BiddingLevelField = make(map[string]map[string]string) //level=2 的第二层字段
  37. )
  38. func Init() {
  39. //MgoB = &mongodb.MongodbSim{
  40. // MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  41. // //MongodbAddr: "127.0.0.1:27083",
  42. // DbName: "qfw",
  43. // Size: 10,
  44. // UserName: "SJZY_RWbid_ES",
  45. // Password: "SJZY@B4i4D5e6S",
  46. // //Direct: true,
  47. //}
  48. //MgoB.InitPool()
  49. //MgoBAi = &mongodb.MongodbSim{
  50. // //MongodbAddr: "172.17.189.140:27080",
  51. // MongodbAddr: "127.0.0.1:27083",
  52. // DbName: "qfw_ai",
  53. // Size: 10,
  54. // UserName: "SJZY_RWbid_ES",
  55. // Password: "SJZY@B4i4D5e6S",
  56. // Direct: true,
  57. //}
  58. //MgoBAi.InitPool()
  59. //mongodb 163
  60. //Mgo = &mongodb.MongodbSim{
  61. // //MongodbAddr: "172.17.189.140:27080",
  62. // MongodbAddr: "127.0.0.1:27083",
  63. // DbName: "qfw",
  64. // Size: 10,
  65. // UserName: "SJZY_RWbid_ES",
  66. // Password: "SJZY@B4i4D5e6S",
  67. // Direct: true,
  68. //}
  69. //Mgo.InitPool()
  70. //85
  71. MgoR = &mongodb.MongodbSim{
  72. //MongodbAddr: "127.0.0.1:27080",
  73. MongodbAddr: "172.17.4.85:27080",
  74. DbName: "qfw",
  75. Size: 10,
  76. //Direct: true,
  77. }
  78. MgoR.InitPool()
  79. //测试环境MongoDB
  80. //MgoT = &mongodb.MongodbSim{
  81. // //MongodbAddr: "172.17.189.140:27080",
  82. // MongodbAddr: "192.168.3.206:27002",
  83. // DbName: "qfw_data",
  84. // Size: 10,
  85. // UserName: "root",
  86. // Password: "root",
  87. // //Direct: true,
  88. //}
  89. //MgoT.InitPool()
  90. ////测试环境es
  91. //Es = &elastic.Elastic{
  92. // S_esurl: "http://192.168.3.149:9201",
  93. // //S_esurl: "http://172.17.4.184:19805",
  94. // I_size: 5,
  95. // Username: "",
  96. // Password: "",
  97. //}
  98. //Es.InitElasticSize()
  99. //es
  100. Es = &elastic.Elastic{
  101. //S_esurl: "http://127.0.0.1:19908",
  102. S_esurl: "http://172.17.4.184:19908",
  103. I_size: 5,
  104. Username: "jybid",
  105. Password: "Top2023_JEB01i@31",
  106. }
  107. Es.InitElasticSize()
  108. //es 新集群
  109. EsNew = &elastic.Elastic{
  110. //S_esurl: "http://127.0.0.1:19905",
  111. S_esurl: "http://172.17.4.184:19905",
  112. I_size: 5,
  113. Username: "jybid",
  114. Password: "Top2023_JEB01i@31",
  115. }
  116. EsNew.InitElasticSize()
  117. }
  118. func main() {
  119. Init()
  120. //InitEsBiddingField()
  121. //go updateMethod() //更新mongodb
  122. //go updateEsMethod() //更新es
  123. //go updateEsHrefMethod() //更新es href 字段
  124. //go updateProjectEsMethod()
  125. //taskRunProject()
  126. //taskRunBidding()
  127. //dealBidding() //正式环境bidding数据处理
  128. //dealBiddingAi() //正式环境bidding数据处理
  129. //dealBiddingTest() // 测试环境数据处理
  130. //dealBiddingEsHref() // 根据临时表,更新es href 字段
  131. //dealBiddingNiJian() //更新拟建数据中buyer = owner
  132. //updateBiddingBidamount()
  133. //updateProject()
  134. //-------------------------------//
  135. fixBiddingEs()
  136. log.Info("over")
  137. c := make(chan bool, 1)
  138. <-c
  139. }
  140. // fixBiddingEs 修复bidding 索引数据,
  141. func fixBiddingEs() {
  142. defer util.Catch()
  143. sess := MgoR.GetMgoConn()
  144. defer MgoR.DestoryMongoConn(sess)
  145. where := map[string]interface{}{
  146. "_id": map[string]interface{}{
  147. "$gte": mongodb.StringTOBsonId("6847fc265f834436f08ef4fe"),
  148. "$lte": mongodb.StringTOBsonId("6848e42b5f834436f092f645"),
  149. },
  150. }
  151. it := sess.DB("qfw").C("result_20220219").Find(where).Select(nil).Iter()
  152. count := 0
  153. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  154. if count%1000 == 0 {
  155. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  156. }
  157. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  158. repeat := util.IntAll(tmp["repeat"])
  159. repeat_reason := util.ObjToString(tmp["repeat_reason"])
  160. if repeat == 1 && strings.Contains(repeat_reason, "采集源重复") {
  161. Es.DeleteByID("bidding", biddingID)
  162. log.Info("fixBiddingEs", zap.String("biddingID", biddingID))
  163. EsNew.DeleteByID("bidding", biddingID)
  164. }
  165. }
  166. }
  167. func InitEsBiddingField() {
  168. now := time.Now()
  169. info, _ := MgoB.Find("bidding_processing_field", `{"stype": "bidding"}`, nil, nil, false, -1, -1)
  170. if len(*info) > 0 {
  171. for _, m := range *info {
  172. if util.IntAll(m["level"]) == 1 {
  173. BiddingField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  174. } else if util.IntAll(m["level"]) == 2 {
  175. pfield := util.ObjToString(m["pfield"])
  176. pfieldMap := BiddingLevelField[pfield]
  177. if pfieldMap == nil {
  178. pfieldMap = make(map[string]string, 0)
  179. }
  180. pfieldMap[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  181. BiddingLevelField[pfield] = pfieldMap
  182. }
  183. }
  184. }
  185. log.Info("InitEsBiddingField", zap.Int("BiddingField es 一级字段数量", len(BiddingField)))
  186. log.Info("InitEsBiddingField", zap.Int("BiddingLevelField es 二级字段数量", len(BiddingLevelField)))
  187. log.Info("InitEsBiddingField", zap.Any("duration", time.Since(now).Seconds()))
  188. }
  189. // taskRun 更新es 省市区三个字段
  190. func taskRunBidding() {
  191. defer util.Catch()
  192. sess := MgoB.GetMgoConn()
  193. defer MgoB.DestoryMongoConn(sess)
  194. //查询条件
  195. //q := map[string]interface{}{
  196. // //"_id": map[string]interface{}{
  197. // // //"$gt": mongodb.StringTOBsonId("5a862f0640d2d9bbe88e3cea"),
  198. // // //"$lte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  199. // //
  200. // // //"$gte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  201. // // "$lte": mongodb.StringTOBsonId("661e347d66cf0db42aa1a52f"),
  202. // //},
  203. // //"comeintime": map[string]interface{}{
  204. // // "$gt": 1669824000,
  205. // // //"$lte": 1669864950,
  206. // // "$lte": 1702265941,
  207. // //},
  208. // //"site": "国家能源e购",
  209. // "toptype": map[string]interface{}{"$exists": 0},
  210. //}
  211. //selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  212. it := sess.DB("qfw").C("bidding").Find(nil).Select(nil).Iter()
  213. fmt.Println("taskRun 开始")
  214. count := 0
  215. //realNum := 0
  216. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  217. if count%100 == 0 {
  218. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  219. }
  220. update := map[string]interface{}{}
  221. // 1.更新省市区
  222. //if area, ok := tmp["area"]; ok && area != nil {
  223. // update["area"] = area
  224. //} else {
  225. // update["area"] = ""
  226. //}
  227. //
  228. //if city, ok := tmp["city"]; ok && city != nil {
  229. // update["city"] = city
  230. //} else {
  231. // update["city"] = ""
  232. //}
  233. //
  234. //if district, ok := tmp["district"]; ok && district != nil {
  235. // if district == "乌拉盖管委会" {
  236. // update["district"] = "乌拉盖管理区管委会"
  237. // } else if district == "错那县" {
  238. // update["district"] = "错那市"
  239. // } else if district == "河南周口经济开发区" {
  240. // update["district"] = "周口临港开发区"
  241. // } else if district == "米林县" {
  242. // update["district"] = "米林市"
  243. // }
  244. //
  245. //}
  246. //-------------------------------------------//
  247. // 2.更新中标单位、采购单位、代理机构
  248. biddingID := util.ObjToString(tmp["id"])
  249. //biddingID := mongodb.BsonIdToSId(tmp["_id"])
  250. if _, ok := tmp["buyer"]; ok {
  251. update["buyer"] = tmp["buyer"]
  252. }
  253. if _, ok := tmp["agency"]; ok {
  254. update["agency"] = tmp["agency"]
  255. }
  256. if _, ok := tmp["s_winner"]; ok {
  257. update["s_winner"] = tmp["s_winner"]
  258. }
  259. if _, ok := tmp["winner"]; ok {
  260. update["winner"] = tmp["winner"]
  261. }
  262. //-------------------------------------------//
  263. //3. 更新中标金额
  264. //biddingID := util.ObjToString(tmp["id"])
  265. //if _, ok := tmp["nb"]; !ok {
  266. // continue
  267. //} else {
  268. // update["bidamount"] = tmp["nb"]
  269. //}
  270. //update["bidamount"] = tmp["bidamount"]
  271. //// 更新 MongoDB + ES
  272. if len(update) > 0 {
  273. MgoB.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  274. //2.es 项目 更新字段
  275. err := Es.UpdateDocument("bidding", biddingID, update)
  276. err = EsNew.UpdateDocument("bidding", biddingID, update)
  277. if err != nil && err.Error() != "Document not updated: noop" {
  278. log.Info("bidding es update err", err, biddingID)
  279. }
  280. }
  281. }
  282. log.Info("Run Over...Count:", log.Int("count", count))
  283. }
  284. // taskRunProject 更新项目表 省市区
  285. func taskRunProject() {
  286. defer util.Catch()
  287. sess := Mgo.GetMgoConn()
  288. defer Mgo.DestoryMongoConn(sess)
  289. // 项目数据
  290. MgoP := &mongodb.MongodbSim{
  291. MongodbAddr: "172.17.4.85:27080",
  292. //MongodbAddr: "127.0.0.1:27080",
  293. Size: 10,
  294. DbName: "qfw",
  295. //Direct: true,
  296. }
  297. MgoP.InitPool()
  298. selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  299. it := sess.DB("qfw").C("zktest_0423_info_new").Find(nil).Select(selected).Sort("_id").Iter()
  300. fmt.Println("taskRun 开始")
  301. count := 0
  302. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  303. if count%10000 == 0 {
  304. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  305. }
  306. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  307. where := map[string]interface{}{
  308. "ids": biddingID,
  309. }
  310. // 找到对应项目数据
  311. p, _ := MgoP.FindOne("projectset_20230904", where)
  312. projectId := mongodb.BsonIdToSId((*p)["_id"])
  313. //1.更新MongoDB
  314. update := map[string]interface{}{}
  315. if area, ok := tmp["area"]; ok && area != nil {
  316. update["area"] = area
  317. } else {
  318. update["area"] = ""
  319. }
  320. if city, ok := tmp["city"]; ok && city != nil {
  321. update["city"] = city
  322. } else {
  323. update["city"] = ""
  324. }
  325. if district, ok := tmp["district"]; ok && district != nil {
  326. update["district"] = district
  327. } else {
  328. update["district"] = ""
  329. }
  330. if len(update) > 0 {
  331. MgoP.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update})
  332. //2.es 项目 更新字段
  333. err := Es.UpdateDocument("projectset", projectId, update)
  334. if err != nil {
  335. log.Info("es update err", err, projectId)
  336. }
  337. }
  338. //2.es 项目 更新字段
  339. //if len(update) > 0 {
  340. // // 更新es
  341. // //updateEsPool <- []map[string]interface{}{
  342. // // {"_id": projectId},
  343. // // update,
  344. // //}
  345. //}
  346. }
  347. log.Info("Run Over...Count:", log.Int("count", count))
  348. }
  349. // dealData 正式环境,同步合同期限
  350. func dealData() {
  351. defer util.Catch()
  352. sess := Mgo.GetMgoConn()
  353. defer Mgo.DestoryMongoConn(sess)
  354. //where := map[string]interface{}{
  355. // "_id": mongodb.StringTOBsonId("65c5a36a66cf0db42ab9c1ef"),
  356. //}
  357. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  358. it := sess.DB("qfw").C("zktest_quanliang_0210_fbs").Find(nil).Select(&selected).Iter()
  359. count := 0
  360. realNum := 0
  361. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  362. if count%1000 == 0 {
  363. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  364. }
  365. idStr := mongodb.BsonIdToSId(tmp["_id"])
  366. update := make(map[string]interface{})
  367. if tmp["signaturedate"] != nil {
  368. update["signaturedate"] = tmp["signaturedate"]
  369. }
  370. if tmp["contractperiod"] != nil {
  371. update["contractperiod"] = tmp["contractperiod"]
  372. }
  373. if tmp["expiredate"] != nil {
  374. update["expiredate"] = tmp["expiredate"]
  375. }
  376. if len(update) == 0 {
  377. continue
  378. }
  379. //bidding 表
  380. if idStr > "5a862e7040d2d9bbe88e3b1f" {
  381. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  382. data := *bidding
  383. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  384. // 针对存量数据,重复数据不进索引
  385. if util.IntAll(data["extracttype"]) == -1 {
  386. tmp = make(map[string]interface{})
  387. continue
  388. }
  389. } else {
  390. //bidding_back
  391. bidding, _ := Mgo.FindById("bidding_back", idStr, map[string]interface{}{"extracttype": 1})
  392. data := *bidding
  393. Mgo.UpdateById("bidding_back", idStr, map[string]interface{}{"$set": update})
  394. // 针对存量数据,重复数据不进索引
  395. if util.IntAll(data["extracttype"]) == -1 {
  396. tmp = make(map[string]interface{})
  397. continue
  398. }
  399. }
  400. realNum++
  401. //2.es 更新字段
  402. esUpdate := update
  403. esUpdate["id"] = idStr
  404. if len(esUpdate) > 0 {
  405. // 更新es
  406. updateEsPool <- []map[string]interface{}{
  407. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  408. esUpdate,
  409. }
  410. }
  411. tmp = make(map[string]interface{})
  412. }
  413. log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
  414. }
  415. // dealResult 查询抽取表,更新合同周期字段;是dealData的后面遗漏数据
  416. func dealResult() {
  417. defer util.Catch()
  418. sess := MgoR.GetMgoConn()
  419. defer MgoR.DestoryMongoConn(sess)
  420. where := map[string]interface{}{
  421. "_id": map[string]interface{}{
  422. "$gte": mongodb.StringTOBsonId("5a4909cf40d2d9bbe8ab329c"),
  423. "$lte": mongodb.StringTOBsonId("5a4ad94d40d2d9bbe8ae0183"),
  424. },
  425. "subtype": "合同",
  426. }
  427. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  428. it := sess.DB("qfw").C("result_20220219").Find(where).Select(&selected).Iter()
  429. count := 0
  430. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  431. if count%1000 == 0 {
  432. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  433. }
  434. idStr := mongodb.BsonIdToSId(tmp["_id"])
  435. update := make(map[string]interface{})
  436. if tmp["signaturedate"] != nil {
  437. update["signaturedate"] = tmp["signaturedate"]
  438. }
  439. if tmp["contractperiod"] != nil {
  440. update["contractperiod"] = tmp["contractperiod"]
  441. }
  442. if tmp["expiredate"] != nil {
  443. update["expiredate"] = tmp["expiredate"]
  444. }
  445. if len(update) == 0 {
  446. continue
  447. }
  448. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  449. data := *bidding
  450. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  451. // 针对存量数据,重复数据不进索引
  452. if util.IntAll(data["extracttype"]) == -1 {
  453. tmp = make(map[string]interface{})
  454. continue
  455. }
  456. //2.es 更新字段
  457. esUpdate := update
  458. esUpdate["id"] = idStr
  459. if len(esUpdate) > 0 {
  460. // 更新es
  461. updateEsPool <- []map[string]interface{}{
  462. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  463. esUpdate,
  464. }
  465. }
  466. tmp = make(map[string]interface{})
  467. }
  468. log.Info("Run Over...Count:", log.Int("count", count))
  469. }
  470. // dealBidding 处理bidding数据
  471. func dealBidding() {
  472. defer util.Catch()
  473. sess := MgoB.GetMgoConn()
  474. defer MgoB.DestoryMongoConn(sess)
  475. //where := map[string]interface{}{
  476. // "comeintime": map[string]interface{}{
  477. // "$lt": 1722009600,
  478. // //"$lt": 1718812802,
  479. // "$gte": 1718899200,
  480. // },
  481. //}
  482. //where := map[string]interface{}{
  483. // "_id": map[string]interface{}{
  484. // "$gte": mongodb.StringTOBsonId("66aa067e66cf0db42a8ea71e"),
  485. // "$lt": mongodb.StringTOBsonId("66aa067e66cf0db42a8ea720"),
  486. // },
  487. //}
  488. it := sess.DB("qfw").C("bidding").Find(nil).Select(nil).Iter()
  489. fmt.Println("taskRun 开始")
  490. count := 0
  491. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  492. if count%10000 == 0 {
  493. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  494. }
  495. //update := map[string]interface{}{}
  496. esUpdate := map[string]interface{}{}
  497. if util.IntAll(tmp["extracttype"]) == -1 {
  498. continue
  499. }
  500. if util.ObjToString(tmp["purchasing"]) == "" {
  501. continue
  502. }
  503. esUpdate["purchasing"] = tmp["purchasing"]
  504. // 2.更新中标单位,中标金额
  505. //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
  506. // update["tag_topinformation"] = tag_topinformation
  507. //}
  508. //
  509. //if property_form, ok := tmp["property_form"]; ok && property_form != nil {
  510. // update["property_form"] = property_form
  511. //}
  512. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  513. //fmt.Println(biddingID)
  514. /**
  515. "s_subscopeclass" : "其它",
  516. "s_topscopeclass" : "其它",
  517. "subscopeclass" : [
  518. "其它"
  519. ],
  520. "topscopeclass" : [
  521. "其它"
  522. ],
  523. */
  524. //// 行业分类默认值
  525. //resultSubs := make([]string, 0)
  526. //resultTobs := make([]string, 0)
  527. //if topscopeclass, ok := tmp["topscopeclass"]; ok && topscopeclass != nil {
  528. // if topps, ok2 := topscopeclass.([]interface{}); ok2 {
  529. // for _, v := range topps {
  530. // top := util.ObjToString(v)
  531. // if top != "" {
  532. // resultTobs = append(resultTobs, top)
  533. // }
  534. // }
  535. // }
  536. // //1.一级分类是空数组或者 是 其它
  537. // if len(resultTobs) == 0 || resultTobs[0] == "其它" {
  538. // update["topscopeclass"] = []string{"其它"}
  539. // update["subscopeclass"] = []string{"其它"}
  540. // update["s_topscopeclass"] = "其它"
  541. // update["s_subscopeclass"] = "其它"
  542. // esUpdate["s_topscopeclass"] = "其它"
  543. // esUpdate["s_subscopeclass"] = "其它"
  544. // esUpdate["topscopeclass"] = []string{"其它"}
  545. // } else {
  546. // if subs, ok3 := tmp["subscopeclass"]; ok3 {
  547. // if subbs, ok4 := subs.([]interface{}); ok4 {
  548. // for _, v := range subbs {
  549. // sub := util.ObjToString(v)
  550. // if sub != "" && sub != "其它" {
  551. // resultSubs = append(resultSubs, sub)
  552. // }
  553. // }
  554. // }
  555. // }
  556. // newTops, newSubs, cleanedTops := ProcessTopscopeclass(resultTobs, resultSubs)
  557. // update["topscopeclass"] = newTops
  558. // update["subscopeclass"] = newSubs
  559. // update["s_topscopeclass"] = strings.Join(cleanedTops, ",")
  560. // update["s_subscopeclass"] = strings.Join(newSubs, ",")
  561. // esUpdate["s_topscopeclass"] = strings.Join(cleanedTops, ",")
  562. // esUpdate["s_subscopeclass"] = strings.Join(newSubs, ",")
  563. // esUpdate["topscopeclass"] = newTops
  564. // }
  565. //
  566. //} else {
  567. // update["topscopeclass"] = []string{"其它"}
  568. // update["subscopeclass"] = []string{"其它"}
  569. // update["s_topscopeclass"] = "其它"
  570. // update["s_subscopeclass"] = "其它"
  571. // esUpdate["s_topscopeclass"] = "其它"
  572. // esUpdate["s_subscopeclass"] = "其它"
  573. // esUpdate["topscopeclass"] = []string{"其它"}
  574. //}
  575. //
  576. ////procurementlist 处理预计采购时间
  577. ////if procurementlist, ok := tmp["procurementlist"]; ok && procurementlist != nil {
  578. //// field := "procurementlist"
  579. //// if tmp[field] != nil {
  580. //// if field == "procurementlist" {
  581. //// if tmp["procurementlist"] != nil {
  582. //// var arr []interface{}
  583. //// plist := tmp["procurementlist"].([]interface{})
  584. //// for _, p := range plist {
  585. //// p1 := p.(map[string]interface{})
  586. //// p2 := make(map[string]interface{})
  587. //// for k, v := range BiddingLevelField[field] {
  588. //// if k == "projectname" && util.ObjToString(p1[k]) == "" {
  589. //// p2[k] = util.ObjToString(tmp["projectname"])
  590. //// } else if k == "buyer" && util.ObjToString(p1[k]) == "" && util.ObjToString(tmp["buyer"]) != "" {
  591. //// p2[k] = util.ObjToString(tmp["buyer"])
  592. //// } else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" {
  593. //// res := getMethod(util.ObjToString(p1[k]))
  594. //// if res != 0 {
  595. //// p2[k] = res
  596. //// }
  597. //// } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v {
  598. //// p2[k] = p1[k]
  599. //// }
  600. ////
  601. //// }
  602. //// arr = append(arr, p2)
  603. //// }
  604. //// if len(arr) > 0 {
  605. //// esUpdate[field] = arr
  606. //// }
  607. //// }
  608. //// }
  609. //// }
  610. ////}
  611. //
  612. //if len(update) > 0 {
  613. // //fmt.Println("aaaaa", biddingID)
  614. // //更新mongo
  615. // //MgoT.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  616. // //更新MongoDB
  617. // updatePool <- []map[string]interface{}{
  618. // {"_id": tmp["_id"]},
  619. // {"$set": update},
  620. // }
  621. //
  622. // //2.es 项目 更新字段
  623. // //err := Es.UpdateDocument("bidding", biddingID, update)
  624. // //if err != nil && err.Error() != "Document not updated: noop" {
  625. // // log.Info("bidding es update err", err, biddingID)
  626. // //}
  627. // //// 更新es
  628. // //updateEsPool <- []map[string]interface{}{
  629. // // {"_id": biddingID},
  630. // // update,
  631. // //}
  632. //}
  633. // 更新Es 数据
  634. if len(esUpdate) > 0 {
  635. // 更新es
  636. updateEsPool <- []map[string]interface{}{
  637. {"_id": biddingID},
  638. esUpdate,
  639. }
  640. }
  641. }
  642. log.Info("Run Over...Count:", log.Int("count", count))
  643. }
  644. // dealBiddingAi 处理qfw_ai 数据库bidding 数据
  645. func dealBiddingAi() {
  646. defer util.Catch()
  647. sess := MgoBAi.GetMgoConn()
  648. defer MgoBAi.DestoryMongoConn(sess)
  649. it := sess.DB("qfw_ai").C("zxl_20240926").Find(nil).Select(nil).Iter()
  650. fmt.Println("taskRun 开始")
  651. count := 0
  652. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  653. if count%1000 == 0 {
  654. fmt.Println("current:", count)
  655. }
  656. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  657. update := map[string]interface{}{}
  658. //if budget, ok := tmp["budget"]; ok && budget != nil {
  659. // update["budget"] = budget
  660. //}
  661. if bidamount, ok := tmp["bidamount"]; ok && bidamount != nil {
  662. update["bidamount"] = bidamount
  663. } else {
  664. update["bidamount"] = 0.0
  665. }
  666. //if projectcode, ok := tmp["projectcode"]; ok && projectcode != nil {
  667. // update["projectcode"] = projectcode
  668. //}
  669. if len(update) > 0 {
  670. MgoBAi.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  671. //2.es 项目 更新字段
  672. err := Es.UpdateDocument("bidding_ai", biddingID, update)
  673. if err != nil && err.Error() != "Document not updated: noop" {
  674. log.Info("bidding es update err", err, biddingID)
  675. }
  676. }
  677. }
  678. fmt.Println("over ----------- over ")
  679. }
  680. func dealBiddingByEs() {
  681. //url := "http://172.17.4.184:19908"
  682. url := "http://127.0.0.1:19908"
  683. username := "jybid"
  684. password := "Top2023_JEB01i@31"
  685. index := "bidding" //索引名称
  686. //index := "projectset" //索引名称
  687. // 创建 Elasticsearch 客户端
  688. client, err := es7.NewClient(
  689. es7.SetURL(url),
  690. es7.SetBasicAuth(username, password),
  691. es7.SetSniff(false),
  692. )
  693. if err != nil {
  694. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  695. }
  696. query := es7.NewBoolQuery()
  697. query.Must(es7.NewRangeQuery("comeintime").Gt(1718812800))
  698. query.MustNot(es7.NewExistsQuery("s_topscopeclass"))
  699. ctx := context.Background()
  700. //开始滚动搜索
  701. scrollID := ""
  702. scroll := "10m"
  703. searchSource := es7.NewSearchSource().
  704. Query(query).
  705. Size(10000).
  706. Sort("_doc", true) //升序排序
  707. //Sort("_doc", false) //降序排序
  708. searchService := client.Scroll(index).
  709. Size(10000).
  710. Scroll(scroll).
  711. SearchSource(searchSource)
  712. res, err := searchService.Do(ctx)
  713. if err != nil {
  714. if err == io.EOF {
  715. fmt.Println("没有数据")
  716. } else {
  717. panic(err)
  718. }
  719. }
  720. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  721. fmt.Println("总数是:", res.TotalHits())
  722. total := 0
  723. for len(res.Hits.Hits) > 0 {
  724. for _, hit := range res.Hits.Hits {
  725. var doc map[string]interface{}
  726. err := json.Unmarshal(hit.Source, &doc)
  727. if err != nil {
  728. fmt.Printf("解析文档失败:%s", err)
  729. continue
  730. }
  731. //delete(doc, "filetext")
  732. //delete(doc, "detail")
  733. //
  734. ////存入新表
  735. //err = MgoB.InsertOrUpdate("qfw", "wcc_subtype_err_0429", doc)
  736. //if err != nil {
  737. // fmt.Println("error", doc["id"])
  738. //}
  739. }
  740. total = total + len(res.Hits.Hits)
  741. scrollID = res.ScrollId
  742. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  743. fmt.Println("current count:", total)
  744. if err != nil {
  745. if err == io.EOF {
  746. // 滚动到最后一批数据,退出循环
  747. break
  748. }
  749. fmt.Println("滚动搜索失败:", err, res)
  750. break // 处理错误时退出循环
  751. }
  752. }
  753. // 在循环外调用 ClearScroll
  754. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  755. if err != nil {
  756. fmt.Printf("清理滚动搜索失败:%s", err)
  757. }
  758. fmt.Println("结束~~~~~~~~~~~~~~~")
  759. }
  760. // dealBiddingTest 处理测试环境数据
  761. func dealBiddingTest() {
  762. defer util.Catch()
  763. sess := MgoT.GetMgoConn()
  764. defer MgoT.DestoryMongoConn(sess)
  765. it := sess.DB("qfw_data").C("bidding").Find(nil).Select(nil).Iter()
  766. fmt.Println("taskRun 开始")
  767. count := 0
  768. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  769. if count%10000 == 0 {
  770. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  771. }
  772. update := map[string]interface{}{}
  773. // 2.更新中标单位,中标金额
  774. //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
  775. // update["tag_topinformation"] = tag_topinformation
  776. //}
  777. //
  778. //if property_form, ok := tmp["property_form"]; ok && property_form != nil {
  779. // update["property_form"] = property_form
  780. //}
  781. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  782. /**
  783. "s_subscopeclass" : "其它",
  784. "s_topscopeclass" : "其它",
  785. "subscopeclass" : [
  786. "其它"
  787. ],
  788. "topscopeclass" : [
  789. "其它"
  790. ],
  791. */
  792. // 行业分类默认值
  793. if topscopeclass, ok := tmp["topscopeclass"]; !ok && topscopeclass == nil {
  794. update["topscopeclass"] = []string{"其它"}
  795. update["s_topscopeclass"] = "其它"
  796. }
  797. if subscopeclass, ok := tmp["subscopeclass"]; !ok && subscopeclass == nil {
  798. update["subscopeclass"] = []string{"其它"}
  799. update["s_subscopeclass"] = "其它"
  800. }
  801. if util.ObjToString(tmp["s_topscopeclass"]) == "其它" {
  802. update["topscopeclass"] = []string{"其它"}
  803. update["s_topscopeclass"] = "其它"
  804. }
  805. if util.ObjToString(tmp["s_subscopeclass"]) == "其它" {
  806. update["subscopeclass"] = []string{"其它"}
  807. update["s_subscopeclass"] = "其它"
  808. }
  809. //procurementlist 处理预计采购时间
  810. if procurementlist, ok := tmp["procurementlist"]; ok && procurementlist != nil {
  811. for field, _ := range BiddingField {
  812. if tmp[field] != nil {
  813. if field == "procurementlist" {
  814. if tmp["procurementlist"] != nil {
  815. var arr []interface{}
  816. plist := tmp["procurementlist"].([]interface{})
  817. for _, p := range plist {
  818. p1 := p.(map[string]interface{})
  819. p2 := make(map[string]interface{})
  820. for k, v := range BiddingLevelField[field] {
  821. if k == "projectname" && util.ObjToString(p1[k]) == "" {
  822. p2[k] = util.ObjToString(tmp["projectname"])
  823. } else if k == "buyer" && util.ObjToString(p1[k]) == "" && util.ObjToString(tmp["buyer"]) != "" {
  824. p2[k] = util.ObjToString(tmp["buyer"])
  825. } else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" {
  826. res := getMethod(util.ObjToString(p1[k]))
  827. if res != 0 {
  828. p2[k] = res
  829. }
  830. } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v {
  831. p2[k] = p1[k]
  832. }
  833. }
  834. arr = append(arr, p2)
  835. }
  836. if len(arr) > 0 {
  837. update[field] = arr
  838. }
  839. }
  840. }
  841. }
  842. }
  843. }
  844. if len(update) > 0 {
  845. fmt.Println("aaaaa", biddingID)
  846. //更新mongo
  847. //MgoT.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  848. //更新MongoDB
  849. //updatePool <- []map[string]interface{}{
  850. // {"_id": tmp["_id"]},
  851. // {"$set": update},
  852. //}
  853. //2.es 项目 更新字段
  854. //err := Es.UpdateDocument("bidding", biddingID, update)
  855. //if err != nil && err.Error() != "Document not updated: noop" {
  856. // log.Info("bidding es update err", err, biddingID)
  857. //}
  858. // 更新es
  859. //updateEsPool <- []map[string]interface{}{
  860. // {"_id": biddingID},
  861. // update,
  862. //}
  863. }
  864. }
  865. log.Info("Run Over...Count:", log.Int("count", count))
  866. }
  867. // updateMethod 更新MongoDB
  868. func updateMethod() {
  869. arru := make([][]map[string]interface{}, saveSize)
  870. indexu := 0
  871. for {
  872. select {
  873. case v := <-updatePool:
  874. arru[indexu] = v
  875. indexu++
  876. if indexu == saveSize {
  877. updateSp <- true
  878. go func(arru [][]map[string]interface{}) {
  879. defer func() {
  880. <-updateSp
  881. }()
  882. MgoB.UpdateBulk("bidding", arru...)
  883. }(arru)
  884. arru = make([][]map[string]interface{}, saveSize)
  885. indexu = 0
  886. }
  887. case <-time.After(1000 * time.Millisecond):
  888. if indexu > 0 {
  889. updateSp <- true
  890. go func(arru [][]map[string]interface{}) {
  891. defer func() {
  892. <-updateSp
  893. }()
  894. MgoB.UpdateBulk("bidding", arru...)
  895. }(arru[:indexu])
  896. arru = make([][]map[string]interface{}, saveSize)
  897. indexu = 0
  898. }
  899. }
  900. }
  901. }
  902. // updateEsMethod 更新es
  903. func updateEsMethod() {
  904. arru := make([][]map[string]interface{}, 200)
  905. indexu := 0
  906. for {
  907. select {
  908. case v := <-updateEsPool:
  909. arru[indexu] = v
  910. indexu++
  911. if indexu == 200 {
  912. updateEsSp <- true
  913. go func(arru [][]map[string]interface{}) {
  914. defer func() {
  915. <-updateEsSp
  916. }()
  917. Es.UpdateBulk("bidding", arru...)
  918. EsNew.UpdateBulk("bidding", arru...)
  919. }(arru)
  920. arru = make([][]map[string]interface{}, 200)
  921. indexu = 0
  922. }
  923. case <-time.After(1000 * time.Millisecond):
  924. if indexu > 0 {
  925. updateEsSp <- true
  926. go func(arru [][]map[string]interface{}) {
  927. defer func() {
  928. <-updateEsSp
  929. }()
  930. Es.UpdateBulk("bidding", arru...)
  931. EsNew.UpdateBulk("bidding", arru...)
  932. }(arru[:indexu])
  933. arru = make([][]map[string]interface{}, 200)
  934. indexu = 0
  935. }
  936. }
  937. }
  938. }
  939. // updateEsMethod 更新es href 字段
  940. func updateEsHrefMethod() {
  941. arru := make([][]map[string]interface{}, 200)
  942. indexu := 0
  943. for {
  944. select {
  945. case v := <-updateEsPool:
  946. arru[indexu] = v
  947. indexu++
  948. if indexu == 200 {
  949. updateEsSp <- true
  950. go func(arru [][]map[string]interface{}) {
  951. defer func() {
  952. <-updateEsSp
  953. }()
  954. Es.UpdateBulk("bidding", arru...)
  955. Es.UpdateBulk("bidding_ai", arru...)
  956. Es.UpdateBulk("bidding_temporary", arru...)
  957. EsNew.UpdateBulk("bidding", arru...)
  958. EsNew.UpdateBulk("bidding_customer", arru...)
  959. EsNew.UpdateBulk("bidding_free", arru...)
  960. EsNew.UpdateBulk("bidding_year", arru...)
  961. EsNew.UpdateBulk("bidding_all", arru...)
  962. EsNew.UpdateBulk("bidding_temporary", arru...)
  963. }(arru)
  964. arru = make([][]map[string]interface{}, 200)
  965. indexu = 0
  966. }
  967. case <-time.After(1000 * time.Millisecond):
  968. if indexu > 0 {
  969. updateEsSp <- true
  970. go func(arru [][]map[string]interface{}) {
  971. defer func() {
  972. <-updateEsSp
  973. }()
  974. Es.UpdateBulk("bidding", arru...)
  975. Es.UpdateBulk("bidding_ai", arru...)
  976. Es.UpdateBulk("bidding_temporary", arru...)
  977. EsNew.UpdateBulk("bidding", arru...)
  978. EsNew.UpdateBulk("bidding_customer", arru...)
  979. EsNew.UpdateBulk("bidding_free", arru...)
  980. EsNew.UpdateBulk("bidding_year", arru...)
  981. EsNew.UpdateBulk("bidding_all", arru...)
  982. EsNew.UpdateBulk("bidding_temporary", arru...)
  983. }(arru[:indexu])
  984. arru = make([][]map[string]interface{}, 200)
  985. indexu = 0
  986. }
  987. }
  988. }
  989. }
  990. // updateProjectEsMethod 更新项目索引
  991. func updateProjectEsMethod() {
  992. arru := make([][]map[string]interface{}, 200)
  993. indexu := 0
  994. for {
  995. select {
  996. case v := <-updateProjectEsPool:
  997. arru[indexu] = v
  998. indexu++
  999. if indexu == 200 {
  1000. updateProjectEsSp <- true
  1001. go func(arru [][]map[string]interface{}) {
  1002. defer func() {
  1003. <-updateProjectEsSp
  1004. }()
  1005. Es.UpdateBulk("projectset", arru...)
  1006. }(arru)
  1007. arru = make([][]map[string]interface{}, 200)
  1008. indexu = 0
  1009. }
  1010. case <-time.After(1000 * time.Millisecond):
  1011. if indexu > 0 {
  1012. updateProjectEsSp <- true
  1013. go func(arru [][]map[string]interface{}) {
  1014. defer func() {
  1015. <-updateProjectEsSp
  1016. }()
  1017. Es.UpdateBulk("projectset", arru...)
  1018. }(arru[:indexu])
  1019. arru = make([][]map[string]interface{}, 200)
  1020. indexu = 0
  1021. }
  1022. }
  1023. }
  1024. }