main.go 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. es7 "github.com/olivere/elastic/v7"
  7. "github.com/wcc4869/common_utils/log"
  8. "go.uber.org/zap"
  9. "io"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  13. "reflect"
  14. "time"
  15. )
  16. var (
  17. Mgo *mongodb.MongodbSim
  18. MgoB *mongodb.MongodbSim
  19. MgoBAi *mongodb.MongodbSim
  20. MgoT *mongodb.MongodbSim //测试环境链接
  21. MgoR *mongodb.MongodbSim
  22. saveSize = 50
  23. Es *elastic.Elastic // 19908
  24. EsNew *elastic.Elastic //19905
  25. //EsT *elastic.Elastic
  26. // 更新mongo
  27. updatePool = make(chan []map[string]interface{}, 5000)
  28. updateSp = make(chan bool, 5)
  29. //更新es
  30. updateEsPool = make(chan []map[string]interface{}, 5000)
  31. updateEsSp = make(chan bool, 5) //保存协程
  32. updateProjectEsPool = make(chan []map[string]interface{}, 5000)
  33. updateProjectEsSp = make(chan bool, 5) //保存协程
  34. BiddingField = make(map[string]string, 200) //bidding_processing_field, level=1 最外层字段,
  35. BiddingLevelField = make(map[string]map[string]string) //level=2 的第二层字段
  36. )
  37. func Init() {
  38. MgoB = &mongodb.MongodbSim{
  39. MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  40. //MongodbAddr: "127.0.0.1:27083",
  41. DbName: "qfw",
  42. Size: 10,
  43. UserName: "SJZY_RWbid_ES",
  44. Password: "SJZY@B4i4D5e6S",
  45. //Direct: true,
  46. }
  47. MgoB.InitPool()
  48. //MgoBAi = &mongodb.MongodbSim{
  49. // //MongodbAddr: "172.17.189.140:27080",
  50. // MongodbAddr: "127.0.0.1:27083",
  51. // DbName: "qfw_ai",
  52. // Size: 10,
  53. // UserName: "SJZY_RWbid_ES",
  54. // Password: "SJZY@B4i4D5e6S",
  55. // Direct: true,
  56. //}
  57. //MgoBAi.InitPool()
  58. //mongodb 163
  59. //Mgo = &mongodb.MongodbSim{
  60. // //MongodbAddr: "172.17.189.140:27080",
  61. // MongodbAddr: "127.0.0.1:27083",
  62. // DbName: "qfw",
  63. // Size: 10,
  64. // UserName: "SJZY_RWbid_ES",
  65. // Password: "SJZY@B4i4D5e6S",
  66. // Direct: true,
  67. //}
  68. //Mgo.InitPool()
  69. //85
  70. //MgoR = &mongodb.MongodbSim{
  71. // MongodbAddr: "127.0.0.1:27080",
  72. // //MongodbAddr: "172.17.4.85:27080",
  73. // DbName: "qfw",
  74. // Size: 10,
  75. // Direct: true,
  76. //}
  77. //MgoR.InitPool()
  78. //测试环境MongoDB
  79. //MgoT = &mongodb.MongodbSim{
  80. // //MongodbAddr: "172.17.189.140:27080",
  81. // MongodbAddr: "192.168.3.206:27002",
  82. // DbName: "qfw_data",
  83. // Size: 10,
  84. // UserName: "root",
  85. // Password: "root",
  86. // //Direct: true,
  87. //}
  88. //MgoT.InitPool()
  89. ////测试环境es
  90. //Es = &elastic.Elastic{
  91. // S_esurl: "http://192.168.3.149:9201",
  92. // //S_esurl: "http://172.17.4.184:19805",
  93. // I_size: 5,
  94. // Username: "",
  95. // Password: "",
  96. //}
  97. //Es.InitElasticSize()
  98. //es
  99. Es = &elastic.Elastic{
  100. //S_esurl: "http://127.0.0.1:19908",
  101. S_esurl: "http://172.17.4.184:19908",
  102. I_size: 5,
  103. Username: "jybid",
  104. Password: "Top2023_JEB01i@31",
  105. }
  106. Es.InitElasticSize()
  107. //es 新集群
  108. EsNew = &elastic.Elastic{
  109. //S_esurl: "http://127.0.0.1:19905",
  110. S_esurl: "http://172.17.4.184:19905",
  111. I_size: 5,
  112. Username: "jybid",
  113. Password: "Top2023_JEB01i@31",
  114. }
  115. EsNew.InitElasticSize()
  116. }
  117. func main() {
  118. Init()
  119. //InitEsBiddingField()
  120. //go updateMethod() //更新mongodb
  121. go updateEsMethod() //更新es
  122. //go updateEsHrefMethod() //更新es href 字段
  123. //go updateProjectEsMethod()
  124. //taskRunProject()
  125. //taskRunBidding()
  126. dealBidding() //正式环境bidding数据处理
  127. //dealBiddingAi() //正式环境bidding数据处理
  128. //dealBiddingTest() // 测试环境数据处理
  129. //dealBiddingEsHref() // 根据临时表,更新es href 字段
  130. //dealBiddingNiJian() //更新拟建数据中buyer = owner
  131. //updateBiddingBidamount()
  132. //updateProject()
  133. log.Info("over")
  134. c := make(chan bool, 1)
  135. <-c
  136. }
  137. func InitEsBiddingField() {
  138. now := time.Now()
  139. info, _ := MgoB.Find("bidding_processing_field", `{"stype": "bidding"}`, nil, nil, false, -1, -1)
  140. if len(*info) > 0 {
  141. for _, m := range *info {
  142. if util.IntAll(m["level"]) == 1 {
  143. BiddingField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  144. } else if util.IntAll(m["level"]) == 2 {
  145. pfield := util.ObjToString(m["pfield"])
  146. pfieldMap := BiddingLevelField[pfield]
  147. if pfieldMap == nil {
  148. pfieldMap = make(map[string]string, 0)
  149. }
  150. pfieldMap[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  151. BiddingLevelField[pfield] = pfieldMap
  152. }
  153. }
  154. }
  155. log.Info("InitEsBiddingField", zap.Int("BiddingField es 一级字段数量", len(BiddingField)))
  156. log.Info("InitEsBiddingField", zap.Int("BiddingLevelField es 二级字段数量", len(BiddingLevelField)))
  157. log.Info("InitEsBiddingField", zap.Any("duration", time.Since(now).Seconds()))
  158. }
  159. // taskRun 更新es 省市区三个字段
  160. func taskRunBidding() {
  161. defer util.Catch()
  162. sess := MgoB.GetMgoConn()
  163. defer MgoB.DestoryMongoConn(sess)
  164. //查询条件
  165. //q := map[string]interface{}{
  166. // //"_id": map[string]interface{}{
  167. // // //"$gt": mongodb.StringTOBsonId("5a862f0640d2d9bbe88e3cea"),
  168. // // //"$lte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  169. // //
  170. // // //"$gte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  171. // // "$lte": mongodb.StringTOBsonId("661e347d66cf0db42aa1a52f"),
  172. // //},
  173. // //"comeintime": map[string]interface{}{
  174. // // "$gt": 1669824000,
  175. // // //"$lte": 1669864950,
  176. // // "$lte": 1702265941,
  177. // //},
  178. // //"site": "国家能源e购",
  179. // "toptype": map[string]interface{}{"$exists": 0},
  180. //}
  181. //selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  182. it := sess.DB("qfw").C("bidding").Find(nil).Select(nil).Iter()
  183. fmt.Println("taskRun 开始")
  184. count := 0
  185. //realNum := 0
  186. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  187. if count%100 == 0 {
  188. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  189. }
  190. update := map[string]interface{}{}
  191. // 1.更新省市区
  192. //if area, ok := tmp["area"]; ok && area != nil {
  193. // update["area"] = area
  194. //} else {
  195. // update["area"] = ""
  196. //}
  197. //
  198. //if city, ok := tmp["city"]; ok && city != nil {
  199. // update["city"] = city
  200. //} else {
  201. // update["city"] = ""
  202. //}
  203. //
  204. //if district, ok := tmp["district"]; ok && district != nil {
  205. // if district == "乌拉盖管委会" {
  206. // update["district"] = "乌拉盖管理区管委会"
  207. // } else if district == "错那县" {
  208. // update["district"] = "错那市"
  209. // } else if district == "河南周口经济开发区" {
  210. // update["district"] = "周口临港开发区"
  211. // } else if district == "米林县" {
  212. // update["district"] = "米林市"
  213. // }
  214. //
  215. //}
  216. //-------------------------------------------//
  217. // 2.更新中标单位、采购单位、代理机构
  218. biddingID := util.ObjToString(tmp["id"])
  219. //biddingID := mongodb.BsonIdToSId(tmp["_id"])
  220. if _, ok := tmp["buyer"]; ok {
  221. update["buyer"] = tmp["buyer"]
  222. }
  223. if _, ok := tmp["agency"]; ok {
  224. update["agency"] = tmp["agency"]
  225. }
  226. if _, ok := tmp["s_winner"]; ok {
  227. update["s_winner"] = tmp["s_winner"]
  228. }
  229. if _, ok := tmp["winner"]; ok {
  230. update["winner"] = tmp["winner"]
  231. }
  232. //-------------------------------------------//
  233. //3. 更新中标金额
  234. //biddingID := util.ObjToString(tmp["id"])
  235. //if _, ok := tmp["nb"]; !ok {
  236. // continue
  237. //} else {
  238. // update["bidamount"] = tmp["nb"]
  239. //}
  240. //update["bidamount"] = tmp["bidamount"]
  241. //// 更新 MongoDB + ES
  242. if len(update) > 0 {
  243. MgoB.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  244. //2.es 项目 更新字段
  245. err := Es.UpdateDocument("bidding", biddingID, update)
  246. err = EsNew.UpdateDocument("bidding", biddingID, update)
  247. if err != nil && err.Error() != "Document not updated: noop" {
  248. log.Info("bidding es update err", err, biddingID)
  249. }
  250. }
  251. }
  252. log.Info("Run Over...Count:", log.Int("count", count))
  253. }
  254. // taskRunProject 更新项目表 省市区
  255. func taskRunProject() {
  256. defer util.Catch()
  257. sess := Mgo.GetMgoConn()
  258. defer Mgo.DestoryMongoConn(sess)
  259. // 项目数据
  260. MgoP := &mongodb.MongodbSim{
  261. MongodbAddr: "172.17.4.85:27080",
  262. //MongodbAddr: "127.0.0.1:27080",
  263. Size: 10,
  264. DbName: "qfw",
  265. //Direct: true,
  266. }
  267. MgoP.InitPool()
  268. selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  269. it := sess.DB("qfw").C("zktest_0423_info_new").Find(nil).Select(selected).Sort("_id").Iter()
  270. fmt.Println("taskRun 开始")
  271. count := 0
  272. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  273. if count%10000 == 0 {
  274. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  275. }
  276. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  277. where := map[string]interface{}{
  278. "ids": biddingID,
  279. }
  280. // 找到对应项目数据
  281. p, _ := MgoP.FindOne("projectset_20230904", where)
  282. projectId := mongodb.BsonIdToSId((*p)["_id"])
  283. //1.更新MongoDB
  284. update := map[string]interface{}{}
  285. if area, ok := tmp["area"]; ok && area != nil {
  286. update["area"] = area
  287. } else {
  288. update["area"] = ""
  289. }
  290. if city, ok := tmp["city"]; ok && city != nil {
  291. update["city"] = city
  292. } else {
  293. update["city"] = ""
  294. }
  295. if district, ok := tmp["district"]; ok && district != nil {
  296. update["district"] = district
  297. } else {
  298. update["district"] = ""
  299. }
  300. if len(update) > 0 {
  301. MgoP.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update})
  302. //2.es 项目 更新字段
  303. err := Es.UpdateDocument("projectset", projectId, update)
  304. if err != nil {
  305. log.Info("es update err", err, projectId)
  306. }
  307. }
  308. //2.es 项目 更新字段
  309. //if len(update) > 0 {
  310. // // 更新es
  311. // //updateEsPool <- []map[string]interface{}{
  312. // // {"_id": projectId},
  313. // // update,
  314. // //}
  315. //}
  316. }
  317. log.Info("Run Over...Count:", log.Int("count", count))
  318. }
  319. // dealData 正式环境,同步合同期限
  320. func dealData() {
  321. defer util.Catch()
  322. sess := Mgo.GetMgoConn()
  323. defer Mgo.DestoryMongoConn(sess)
  324. //where := map[string]interface{}{
  325. // "_id": mongodb.StringTOBsonId("65c5a36a66cf0db42ab9c1ef"),
  326. //}
  327. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  328. it := sess.DB("qfw").C("zktest_quanliang_0210_fbs").Find(nil).Select(&selected).Iter()
  329. count := 0
  330. realNum := 0
  331. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  332. if count%1000 == 0 {
  333. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  334. }
  335. idStr := mongodb.BsonIdToSId(tmp["_id"])
  336. update := make(map[string]interface{})
  337. if tmp["signaturedate"] != nil {
  338. update["signaturedate"] = tmp["signaturedate"]
  339. }
  340. if tmp["contractperiod"] != nil {
  341. update["contractperiod"] = tmp["contractperiod"]
  342. }
  343. if tmp["expiredate"] != nil {
  344. update["expiredate"] = tmp["expiredate"]
  345. }
  346. if len(update) == 0 {
  347. continue
  348. }
  349. //bidding 表
  350. if idStr > "5a862e7040d2d9bbe88e3b1f" {
  351. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  352. data := *bidding
  353. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  354. // 针对存量数据,重复数据不进索引
  355. if util.IntAll(data["extracttype"]) == -1 {
  356. tmp = make(map[string]interface{})
  357. continue
  358. }
  359. } else {
  360. //bidding_back
  361. bidding, _ := Mgo.FindById("bidding_back", idStr, map[string]interface{}{"extracttype": 1})
  362. data := *bidding
  363. Mgo.UpdateById("bidding_back", idStr, map[string]interface{}{"$set": update})
  364. // 针对存量数据,重复数据不进索引
  365. if util.IntAll(data["extracttype"]) == -1 {
  366. tmp = make(map[string]interface{})
  367. continue
  368. }
  369. }
  370. realNum++
  371. //2.es 更新字段
  372. esUpdate := update
  373. esUpdate["id"] = idStr
  374. if len(esUpdate) > 0 {
  375. // 更新es
  376. updateEsPool <- []map[string]interface{}{
  377. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  378. esUpdate,
  379. }
  380. }
  381. tmp = make(map[string]interface{})
  382. }
  383. log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
  384. }
  385. // dealResult 查询抽取表,更新合同周期字段;是dealData的后面遗漏数据
  386. func dealResult() {
  387. defer util.Catch()
  388. sess := MgoR.GetMgoConn()
  389. defer MgoR.DestoryMongoConn(sess)
  390. where := map[string]interface{}{
  391. "_id": map[string]interface{}{
  392. "$gte": mongodb.StringTOBsonId("5a4909cf40d2d9bbe8ab329c"),
  393. "$lte": mongodb.StringTOBsonId("5a4ad94d40d2d9bbe8ae0183"),
  394. },
  395. "subtype": "合同",
  396. }
  397. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  398. it := sess.DB("qfw").C("result_20220219").Find(where).Select(&selected).Iter()
  399. count := 0
  400. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  401. if count%1000 == 0 {
  402. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  403. }
  404. idStr := mongodb.BsonIdToSId(tmp["_id"])
  405. update := make(map[string]interface{})
  406. if tmp["signaturedate"] != nil {
  407. update["signaturedate"] = tmp["signaturedate"]
  408. }
  409. if tmp["contractperiod"] != nil {
  410. update["contractperiod"] = tmp["contractperiod"]
  411. }
  412. if tmp["expiredate"] != nil {
  413. update["expiredate"] = tmp["expiredate"]
  414. }
  415. if len(update) == 0 {
  416. continue
  417. }
  418. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  419. data := *bidding
  420. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  421. // 针对存量数据,重复数据不进索引
  422. if util.IntAll(data["extracttype"]) == -1 {
  423. tmp = make(map[string]interface{})
  424. continue
  425. }
  426. //2.es 更新字段
  427. esUpdate := update
  428. esUpdate["id"] = idStr
  429. if len(esUpdate) > 0 {
  430. // 更新es
  431. updateEsPool <- []map[string]interface{}{
  432. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  433. esUpdate,
  434. }
  435. }
  436. tmp = make(map[string]interface{})
  437. }
  438. log.Info("Run Over...Count:", log.Int("count", count))
  439. }
  440. // dealBidding 处理bidding数据
  441. func dealBidding() {
  442. defer util.Catch()
  443. sess := MgoB.GetMgoConn()
  444. defer MgoB.DestoryMongoConn(sess)
  445. //where := map[string]interface{}{
  446. // "comeintime": map[string]interface{}{
  447. // "$lt": 1722009600,
  448. // //"$lt": 1718812802,
  449. // "$gte": 1718899200,
  450. // },
  451. //}
  452. //where := map[string]interface{}{
  453. // "_id": map[string]interface{}{
  454. // "$gte": mongodb.StringTOBsonId("66aa067e66cf0db42a8ea71e"),
  455. // "$lt": mongodb.StringTOBsonId("66aa067e66cf0db42a8ea720"),
  456. // },
  457. //}
  458. it := sess.DB("qfw").C("bidding").Find(nil).Select(nil).Iter()
  459. fmt.Println("taskRun 开始")
  460. count := 0
  461. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  462. if count%10000 == 0 {
  463. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  464. }
  465. //update := map[string]interface{}{}
  466. esUpdate := map[string]interface{}{}
  467. if util.IntAll(tmp["extracttype"]) == -1 {
  468. continue
  469. }
  470. if util.ObjToString(tmp["purchasing"]) == "" {
  471. continue
  472. }
  473. esUpdate["purchasing"] = tmp["purchasing"]
  474. // 2.更新中标单位,中标金额
  475. //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
  476. // update["tag_topinformation"] = tag_topinformation
  477. //}
  478. //
  479. //if property_form, ok := tmp["property_form"]; ok && property_form != nil {
  480. // update["property_form"] = property_form
  481. //}
  482. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  483. //fmt.Println(biddingID)
  484. /**
  485. "s_subscopeclass" : "其它",
  486. "s_topscopeclass" : "其它",
  487. "subscopeclass" : [
  488. "其它"
  489. ],
  490. "topscopeclass" : [
  491. "其它"
  492. ],
  493. */
  494. //// 行业分类默认值
  495. //resultSubs := make([]string, 0)
  496. //resultTobs := make([]string, 0)
  497. //if topscopeclass, ok := tmp["topscopeclass"]; ok && topscopeclass != nil {
  498. // if topps, ok2 := topscopeclass.([]interface{}); ok2 {
  499. // for _, v := range topps {
  500. // top := util.ObjToString(v)
  501. // if top != "" {
  502. // resultTobs = append(resultTobs, top)
  503. // }
  504. // }
  505. // }
  506. // //1.一级分类是空数组或者 是 其它
  507. // if len(resultTobs) == 0 || resultTobs[0] == "其它" {
  508. // update["topscopeclass"] = []string{"其它"}
  509. // update["subscopeclass"] = []string{"其它"}
  510. // update["s_topscopeclass"] = "其它"
  511. // update["s_subscopeclass"] = "其它"
  512. // esUpdate["s_topscopeclass"] = "其它"
  513. // esUpdate["s_subscopeclass"] = "其它"
  514. // esUpdate["topscopeclass"] = []string{"其它"}
  515. // } else {
  516. // if subs, ok3 := tmp["subscopeclass"]; ok3 {
  517. // if subbs, ok4 := subs.([]interface{}); ok4 {
  518. // for _, v := range subbs {
  519. // sub := util.ObjToString(v)
  520. // if sub != "" && sub != "其它" {
  521. // resultSubs = append(resultSubs, sub)
  522. // }
  523. // }
  524. // }
  525. // }
  526. // newTops, newSubs, cleanedTops := ProcessTopscopeclass(resultTobs, resultSubs)
  527. // update["topscopeclass"] = newTops
  528. // update["subscopeclass"] = newSubs
  529. // update["s_topscopeclass"] = strings.Join(cleanedTops, ",")
  530. // update["s_subscopeclass"] = strings.Join(newSubs, ",")
  531. // esUpdate["s_topscopeclass"] = strings.Join(cleanedTops, ",")
  532. // esUpdate["s_subscopeclass"] = strings.Join(newSubs, ",")
  533. // esUpdate["topscopeclass"] = newTops
  534. // }
  535. //
  536. //} else {
  537. // update["topscopeclass"] = []string{"其它"}
  538. // update["subscopeclass"] = []string{"其它"}
  539. // update["s_topscopeclass"] = "其它"
  540. // update["s_subscopeclass"] = "其它"
  541. // esUpdate["s_topscopeclass"] = "其它"
  542. // esUpdate["s_subscopeclass"] = "其它"
  543. // esUpdate["topscopeclass"] = []string{"其它"}
  544. //}
  545. //
  546. ////procurementlist 处理预计采购时间
  547. ////if procurementlist, ok := tmp["procurementlist"]; ok && procurementlist != nil {
  548. //// field := "procurementlist"
  549. //// if tmp[field] != nil {
  550. //// if field == "procurementlist" {
  551. //// if tmp["procurementlist"] != nil {
  552. //// var arr []interface{}
  553. //// plist := tmp["procurementlist"].([]interface{})
  554. //// for _, p := range plist {
  555. //// p1 := p.(map[string]interface{})
  556. //// p2 := make(map[string]interface{})
  557. //// for k, v := range BiddingLevelField[field] {
  558. //// if k == "projectname" && util.ObjToString(p1[k]) == "" {
  559. //// p2[k] = util.ObjToString(tmp["projectname"])
  560. //// } else if k == "buyer" && util.ObjToString(p1[k]) == "" && util.ObjToString(tmp["buyer"]) != "" {
  561. //// p2[k] = util.ObjToString(tmp["buyer"])
  562. //// } else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" {
  563. //// res := getMethod(util.ObjToString(p1[k]))
  564. //// if res != 0 {
  565. //// p2[k] = res
  566. //// }
  567. //// } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v {
  568. //// p2[k] = p1[k]
  569. //// }
  570. ////
  571. //// }
  572. //// arr = append(arr, p2)
  573. //// }
  574. //// if len(arr) > 0 {
  575. //// esUpdate[field] = arr
  576. //// }
  577. //// }
  578. //// }
  579. //// }
  580. ////}
  581. //
  582. //if len(update) > 0 {
  583. // //fmt.Println("aaaaa", biddingID)
  584. // //更新mongo
  585. // //MgoT.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  586. // //更新MongoDB
  587. // updatePool <- []map[string]interface{}{
  588. // {"_id": tmp["_id"]},
  589. // {"$set": update},
  590. // }
  591. //
  592. // //2.es 项目 更新字段
  593. // //err := Es.UpdateDocument("bidding", biddingID, update)
  594. // //if err != nil && err.Error() != "Document not updated: noop" {
  595. // // log.Info("bidding es update err", err, biddingID)
  596. // //}
  597. // //// 更新es
  598. // //updateEsPool <- []map[string]interface{}{
  599. // // {"_id": biddingID},
  600. // // update,
  601. // //}
  602. //}
  603. // 更新Es 数据
  604. if len(esUpdate) > 0 {
  605. // 更新es
  606. updateEsPool <- []map[string]interface{}{
  607. {"_id": biddingID},
  608. esUpdate,
  609. }
  610. }
  611. }
  612. log.Info("Run Over...Count:", log.Int("count", count))
  613. }
  614. // dealBiddingAi 处理qfw_ai 数据库bidding 数据
  615. func dealBiddingAi() {
  616. defer util.Catch()
  617. sess := MgoBAi.GetMgoConn()
  618. defer MgoBAi.DestoryMongoConn(sess)
  619. it := sess.DB("qfw_ai").C("zxl_20240926").Find(nil).Select(nil).Iter()
  620. fmt.Println("taskRun 开始")
  621. count := 0
  622. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  623. if count%1000 == 0 {
  624. fmt.Println("current:", count)
  625. }
  626. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  627. update := map[string]interface{}{}
  628. //if budget, ok := tmp["budget"]; ok && budget != nil {
  629. // update["budget"] = budget
  630. //}
  631. if bidamount, ok := tmp["bidamount"]; ok && bidamount != nil {
  632. update["bidamount"] = bidamount
  633. } else {
  634. update["bidamount"] = 0.0
  635. }
  636. //if projectcode, ok := tmp["projectcode"]; ok && projectcode != nil {
  637. // update["projectcode"] = projectcode
  638. //}
  639. if len(update) > 0 {
  640. MgoBAi.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  641. //2.es 项目 更新字段
  642. err := Es.UpdateDocument("bidding_ai", biddingID, update)
  643. if err != nil && err.Error() != "Document not updated: noop" {
  644. log.Info("bidding es update err", err, biddingID)
  645. }
  646. }
  647. }
  648. fmt.Println("over ----------- over ")
  649. }
  650. func dealBiddingByEs() {
  651. //url := "http://172.17.4.184:19908"
  652. url := "http://127.0.0.1:19908"
  653. username := "jybid"
  654. password := "Top2023_JEB01i@31"
  655. index := "bidding" //索引名称
  656. //index := "projectset" //索引名称
  657. // 创建 Elasticsearch 客户端
  658. client, err := es7.NewClient(
  659. es7.SetURL(url),
  660. es7.SetBasicAuth(username, password),
  661. es7.SetSniff(false),
  662. )
  663. if err != nil {
  664. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  665. }
  666. query := es7.NewBoolQuery()
  667. query.Must(es7.NewRangeQuery("comeintime").Gt(1718812800))
  668. query.MustNot(es7.NewExistsQuery("s_topscopeclass"))
  669. ctx := context.Background()
  670. //开始滚动搜索
  671. scrollID := ""
  672. scroll := "10m"
  673. searchSource := es7.NewSearchSource().
  674. Query(query).
  675. Size(10000).
  676. Sort("_doc", true) //升序排序
  677. //Sort("_doc", false) //降序排序
  678. searchService := client.Scroll(index).
  679. Size(10000).
  680. Scroll(scroll).
  681. SearchSource(searchSource)
  682. res, err := searchService.Do(ctx)
  683. if err != nil {
  684. if err == io.EOF {
  685. fmt.Println("没有数据")
  686. } else {
  687. panic(err)
  688. }
  689. }
  690. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  691. fmt.Println("总数是:", res.TotalHits())
  692. total := 0
  693. for len(res.Hits.Hits) > 0 {
  694. for _, hit := range res.Hits.Hits {
  695. var doc map[string]interface{}
  696. err := json.Unmarshal(hit.Source, &doc)
  697. if err != nil {
  698. fmt.Printf("解析文档失败:%s", err)
  699. continue
  700. }
  701. //delete(doc, "filetext")
  702. //delete(doc, "detail")
  703. //
  704. ////存入新表
  705. //err = MgoB.InsertOrUpdate("qfw", "wcc_subtype_err_0429", doc)
  706. //if err != nil {
  707. // fmt.Println("error", doc["id"])
  708. //}
  709. }
  710. total = total + len(res.Hits.Hits)
  711. scrollID = res.ScrollId
  712. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  713. fmt.Println("current count:", total)
  714. if err != nil {
  715. if err == io.EOF {
  716. // 滚动到最后一批数据,退出循环
  717. break
  718. }
  719. fmt.Println("滚动搜索失败:", err, res)
  720. break // 处理错误时退出循环
  721. }
  722. }
  723. // 在循环外调用 ClearScroll
  724. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  725. if err != nil {
  726. fmt.Printf("清理滚动搜索失败:%s", err)
  727. }
  728. fmt.Println("结束~~~~~~~~~~~~~~~")
  729. }
  730. // dealBiddingTest 处理测试环境数据
  731. func dealBiddingTest() {
  732. defer util.Catch()
  733. sess := MgoT.GetMgoConn()
  734. defer MgoT.DestoryMongoConn(sess)
  735. it := sess.DB("qfw_data").C("bidding").Find(nil).Select(nil).Iter()
  736. fmt.Println("taskRun 开始")
  737. count := 0
  738. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  739. if count%10000 == 0 {
  740. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  741. }
  742. update := map[string]interface{}{}
  743. // 2.更新中标单位,中标金额
  744. //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
  745. // update["tag_topinformation"] = tag_topinformation
  746. //}
  747. //
  748. //if property_form, ok := tmp["property_form"]; ok && property_form != nil {
  749. // update["property_form"] = property_form
  750. //}
  751. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  752. /**
  753. "s_subscopeclass" : "其它",
  754. "s_topscopeclass" : "其它",
  755. "subscopeclass" : [
  756. "其它"
  757. ],
  758. "topscopeclass" : [
  759. "其它"
  760. ],
  761. */
  762. // 行业分类默认值
  763. if topscopeclass, ok := tmp["topscopeclass"]; !ok && topscopeclass == nil {
  764. update["topscopeclass"] = []string{"其它"}
  765. update["s_topscopeclass"] = "其它"
  766. }
  767. if subscopeclass, ok := tmp["subscopeclass"]; !ok && subscopeclass == nil {
  768. update["subscopeclass"] = []string{"其它"}
  769. update["s_subscopeclass"] = "其它"
  770. }
  771. if util.ObjToString(tmp["s_topscopeclass"]) == "其它" {
  772. update["topscopeclass"] = []string{"其它"}
  773. update["s_topscopeclass"] = "其它"
  774. }
  775. if util.ObjToString(tmp["s_subscopeclass"]) == "其它" {
  776. update["subscopeclass"] = []string{"其它"}
  777. update["s_subscopeclass"] = "其它"
  778. }
  779. //procurementlist 处理预计采购时间
  780. if procurementlist, ok := tmp["procurementlist"]; ok && procurementlist != nil {
  781. for field, _ := range BiddingField {
  782. if tmp[field] != nil {
  783. if field == "procurementlist" {
  784. if tmp["procurementlist"] != nil {
  785. var arr []interface{}
  786. plist := tmp["procurementlist"].([]interface{})
  787. for _, p := range plist {
  788. p1 := p.(map[string]interface{})
  789. p2 := make(map[string]interface{})
  790. for k, v := range BiddingLevelField[field] {
  791. if k == "projectname" && util.ObjToString(p1[k]) == "" {
  792. p2[k] = util.ObjToString(tmp["projectname"])
  793. } else if k == "buyer" && util.ObjToString(p1[k]) == "" && util.ObjToString(tmp["buyer"]) != "" {
  794. p2[k] = util.ObjToString(tmp["buyer"])
  795. } else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" {
  796. res := getMethod(util.ObjToString(p1[k]))
  797. if res != 0 {
  798. p2[k] = res
  799. }
  800. } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v {
  801. p2[k] = p1[k]
  802. }
  803. }
  804. arr = append(arr, p2)
  805. }
  806. if len(arr) > 0 {
  807. update[field] = arr
  808. }
  809. }
  810. }
  811. }
  812. }
  813. }
  814. if len(update) > 0 {
  815. fmt.Println("aaaaa", biddingID)
  816. //更新mongo
  817. //MgoT.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  818. //更新MongoDB
  819. //updatePool <- []map[string]interface{}{
  820. // {"_id": tmp["_id"]},
  821. // {"$set": update},
  822. //}
  823. //2.es 项目 更新字段
  824. //err := Es.UpdateDocument("bidding", biddingID, update)
  825. //if err != nil && err.Error() != "Document not updated: noop" {
  826. // log.Info("bidding es update err", err, biddingID)
  827. //}
  828. // 更新es
  829. //updateEsPool <- []map[string]interface{}{
  830. // {"_id": biddingID},
  831. // update,
  832. //}
  833. }
  834. }
  835. log.Info("Run Over...Count:", log.Int("count", count))
  836. }
  837. // updateMethod 更新MongoDB
  838. func updateMethod() {
  839. arru := make([][]map[string]interface{}, saveSize)
  840. indexu := 0
  841. for {
  842. select {
  843. case v := <-updatePool:
  844. arru[indexu] = v
  845. indexu++
  846. if indexu == saveSize {
  847. updateSp <- true
  848. go func(arru [][]map[string]interface{}) {
  849. defer func() {
  850. <-updateSp
  851. }()
  852. MgoB.UpdateBulk("bidding", arru...)
  853. }(arru)
  854. arru = make([][]map[string]interface{}, saveSize)
  855. indexu = 0
  856. }
  857. case <-time.After(1000 * time.Millisecond):
  858. if indexu > 0 {
  859. updateSp <- true
  860. go func(arru [][]map[string]interface{}) {
  861. defer func() {
  862. <-updateSp
  863. }()
  864. MgoB.UpdateBulk("bidding", arru...)
  865. }(arru[:indexu])
  866. arru = make([][]map[string]interface{}, saveSize)
  867. indexu = 0
  868. }
  869. }
  870. }
  871. }
  872. // updateEsMethod 更新es
  873. func updateEsMethod() {
  874. arru := make([][]map[string]interface{}, 200)
  875. indexu := 0
  876. for {
  877. select {
  878. case v := <-updateEsPool:
  879. arru[indexu] = v
  880. indexu++
  881. if indexu == 200 {
  882. updateEsSp <- true
  883. go func(arru [][]map[string]interface{}) {
  884. defer func() {
  885. <-updateEsSp
  886. }()
  887. Es.UpdateBulk("bidding", arru...)
  888. EsNew.UpdateBulk("bidding", arru...)
  889. }(arru)
  890. arru = make([][]map[string]interface{}, 200)
  891. indexu = 0
  892. }
  893. case <-time.After(1000 * time.Millisecond):
  894. if indexu > 0 {
  895. updateEsSp <- true
  896. go func(arru [][]map[string]interface{}) {
  897. defer func() {
  898. <-updateEsSp
  899. }()
  900. Es.UpdateBulk("bidding", arru...)
  901. EsNew.UpdateBulk("bidding", arru...)
  902. }(arru[:indexu])
  903. arru = make([][]map[string]interface{}, 200)
  904. indexu = 0
  905. }
  906. }
  907. }
  908. }
  909. // updateEsMethod 更新es href 字段
  910. func updateEsHrefMethod() {
  911. arru := make([][]map[string]interface{}, 200)
  912. indexu := 0
  913. for {
  914. select {
  915. case v := <-updateEsPool:
  916. arru[indexu] = v
  917. indexu++
  918. if indexu == 200 {
  919. updateEsSp <- true
  920. go func(arru [][]map[string]interface{}) {
  921. defer func() {
  922. <-updateEsSp
  923. }()
  924. Es.UpdateBulk("bidding", arru...)
  925. Es.UpdateBulk("bidding_ai", arru...)
  926. Es.UpdateBulk("bidding_temporary", arru...)
  927. EsNew.UpdateBulk("bidding", arru...)
  928. EsNew.UpdateBulk("bidding_customer", arru...)
  929. EsNew.UpdateBulk("bidding_free", arru...)
  930. EsNew.UpdateBulk("bidding_year", arru...)
  931. EsNew.UpdateBulk("bidding_all", arru...)
  932. EsNew.UpdateBulk("bidding_temporary", arru...)
  933. }(arru)
  934. arru = make([][]map[string]interface{}, 200)
  935. indexu = 0
  936. }
  937. case <-time.After(1000 * time.Millisecond):
  938. if indexu > 0 {
  939. updateEsSp <- true
  940. go func(arru [][]map[string]interface{}) {
  941. defer func() {
  942. <-updateEsSp
  943. }()
  944. Es.UpdateBulk("bidding", arru...)
  945. Es.UpdateBulk("bidding_ai", arru...)
  946. Es.UpdateBulk("bidding_temporary", arru...)
  947. EsNew.UpdateBulk("bidding", arru...)
  948. EsNew.UpdateBulk("bidding_customer", arru...)
  949. EsNew.UpdateBulk("bidding_free", arru...)
  950. EsNew.UpdateBulk("bidding_year", arru...)
  951. EsNew.UpdateBulk("bidding_all", arru...)
  952. EsNew.UpdateBulk("bidding_temporary", arru...)
  953. }(arru[:indexu])
  954. arru = make([][]map[string]interface{}, 200)
  955. indexu = 0
  956. }
  957. }
  958. }
  959. }
  960. // updateProjectEsMethod 更新项目索引
  961. func updateProjectEsMethod() {
  962. arru := make([][]map[string]interface{}, 200)
  963. indexu := 0
  964. for {
  965. select {
  966. case v := <-updateProjectEsPool:
  967. arru[indexu] = v
  968. indexu++
  969. if indexu == 200 {
  970. updateProjectEsSp <- true
  971. go func(arru [][]map[string]interface{}) {
  972. defer func() {
  973. <-updateProjectEsSp
  974. }()
  975. Es.UpdateBulk("projectset", arru...)
  976. }(arru)
  977. arru = make([][]map[string]interface{}, 200)
  978. indexu = 0
  979. }
  980. case <-time.After(1000 * time.Millisecond):
  981. if indexu > 0 {
  982. updateProjectEsSp <- true
  983. go func(arru [][]map[string]interface{}) {
  984. defer func() {
  985. <-updateProjectEsSp
  986. }()
  987. Es.UpdateBulk("projectset", arru...)
  988. }(arru[:indexu])
  989. arru = make([][]map[string]interface{}, 200)
  990. indexu = 0
  991. }
  992. }
  993. }
  994. }