main.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/wcc4869/common_utils/log"
  5. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  8. "time"
  9. )
  10. var (
  11. Mgo *mongodb.MongodbSim
  12. MgoT *mongodb.MongodbSim //测试环境链接
  13. MgoR *mongodb.MongodbSim
  14. saveSize = 50
  15. Es *elastic.Elastic
  16. EsNew *elastic.Elastic
  17. EsT *elastic.Elastic
  18. // 更新mongo
  19. updatePool = make(chan []map[string]interface{}, 5000)
  20. updateSp = make(chan bool, 5)
  21. //更新es
  22. updateEsPool = make(chan []map[string]interface{}, 5000)
  23. updateEsSp = make(chan bool, 5) //保存协程
  24. )
  25. func main() {
  26. //mongodb 163
  27. //Mgo = &mongodb.MongodbSim{
  28. // //MongodbAddr: "172.17.189.140:27080",
  29. // MongodbAddr: "127.0.0.1:27083",
  30. // DbName: "qfw",
  31. // Size: 10,
  32. // UserName: "SJZY_RWbid_ES",
  33. // Password: "SJZY@B4i4D5e6S",
  34. // Direct: true,
  35. //}
  36. //Mgo.InitPool()
  37. //85
  38. //MgoR = &mongodb.MongodbSim{
  39. // //MongodbAddr: "127.0.0.1:27080",
  40. // MongodbAddr: "172.17.4.85:27080",
  41. // DbName: "qfw",
  42. // Size: 10,
  43. // //Direct: true,
  44. //}
  45. //MgoR.InitPool()
  46. ////测试环境MongoDB
  47. MgoT = &mongodb.MongodbSim{
  48. //MongodbAddr: "172.17.189.140:27080",
  49. MongodbAddr: "192.168.3.206:27002",
  50. DbName: "qfw_data",
  51. Size: 10,
  52. UserName: "root",
  53. Password: "root",
  54. //Direct: true,
  55. }
  56. MgoT.InitPool()
  57. // 测试环境es
  58. Es = &elastic.Elastic{
  59. S_esurl: "http://192.168.3.149:9201",
  60. //S_esurl: "http://172.17.4.184:19805",
  61. I_size: 5,
  62. Username: "",
  63. Password: "",
  64. }
  65. Es.InitElasticSize()
  66. //es
  67. //Es = &elastic.Elastic{
  68. // S_esurl: "http://127.0.0.1:19908",
  69. // //S_esurl: "http://172.17.4.184:19908",
  70. // I_size: 5,
  71. // Username: "jybid",
  72. // Password: "Top2023_JEB01i@31",
  73. //}
  74. //Es.InitElasticSize()
  75. // es 新集群
  76. //EsNew = &elastic.Elastic{
  77. // S_esurl: "http://127.0.0.1:19905",
  78. // //S_esurl: "http://172.17.4.184:19905",
  79. // I_size: 5,
  80. // Username: "jybid",
  81. // Password: "Top2023_JEB01i@31",
  82. //}
  83. //EsNew.InitElasticSize()
  84. //go updateMethod() //更新mongodb
  85. go updateEsMethod() //更新es
  86. //go updateProjectEsMethod()
  87. //taskRunProject()
  88. //taskRunBidding()
  89. dealBiddingTest() // 测试环境数据处理
  90. //updateProject()
  91. fmt.Println("over")
  92. c := make(chan bool, 1)
  93. <-c
  94. }
  95. // taskRun 更新es 省市区三个字段
  96. func taskRunBidding() {
  97. defer util.Catch()
  98. sess := Mgo.GetMgoConn()
  99. defer Mgo.DestoryMongoConn(sess)
  100. //查询条件
  101. //q := map[string]interface{}{
  102. // //"_id": map[string]interface{}{
  103. // // //"$gt": mongodb.StringTOBsonId("5a862f0640d2d9bbe88e3cea"),
  104. // // //"$lte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  105. // //
  106. // // //"$gte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  107. // // "$lte": mongodb.StringTOBsonId("661e347d66cf0db42aa1a52f"),
  108. // //},
  109. // //"comeintime": map[string]interface{}{
  110. // // "$gt": 1669824000,
  111. // // //"$lte": 1669864950,
  112. // // "$lte": 1702265941,
  113. // //},
  114. // //"site": "国家能源e购",
  115. // "toptype": map[string]interface{}{"$exists": 0},
  116. //}
  117. //selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  118. it := sess.DB("qfw").C("zktest_bidding_0619_compare").Find(nil).Select(nil).Sort("_id").Iter()
  119. fmt.Println("taskRun 开始")
  120. count := 0
  121. //realNum := 0
  122. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  123. if count%1000 == 0 {
  124. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  125. }
  126. update := map[string]interface{}{}
  127. // 1.更新省市区
  128. //if area, ok := tmp["area"]; ok && area != nil {
  129. // update["area"] = area
  130. //} else {
  131. // update["area"] = ""
  132. //}
  133. //
  134. //if city, ok := tmp["city"]; ok && city != nil {
  135. // update["city"] = city
  136. //} else {
  137. // update["city"] = ""
  138. //}
  139. //
  140. //if district, ok := tmp["district"]; ok && district != nil {
  141. // update["district"] = district
  142. //} else {
  143. // update["district"] = ""
  144. //}
  145. // 2.更新中标单位,中标金额
  146. if winner, ok := tmp["winner"]; ok && winner != nil {
  147. update["winner"] = winner
  148. } else {
  149. update["winner"] = ""
  150. }
  151. if s_winner, ok := tmp["s_winner"]; ok && s_winner != nil {
  152. update["s_winner"] = s_winner
  153. } else {
  154. update["s_winner"] = ""
  155. }
  156. if bidamount, ok := tmp["bidamount"]; ok && bidamount != nil {
  157. update["bidamount"] = bidamount
  158. } else {
  159. update["bidamount"] = nil
  160. }
  161. //biddingID := util.ObjToString(tmp["id"])
  162. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  163. if len(update) > 0 {
  164. //Mgo.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  165. //2.es 项目 更新字段
  166. err := Es.UpdateDocument("bidding", biddingID, update)
  167. err = EsNew.UpdateDocument("bidding", biddingID, update)
  168. if err != nil && err.Error() != "Document not updated: noop" {
  169. log.Info("bidding es update err", err, biddingID)
  170. }
  171. }
  172. }
  173. log.Info("Run Over...Count:", log.Int("count", count))
  174. }
  175. // taskRunProject 更新项目表 省市区
  176. func taskRunProject() {
  177. defer util.Catch()
  178. sess := Mgo.GetMgoConn()
  179. defer Mgo.DestoryMongoConn(sess)
  180. // 项目数据
  181. MgoP := &mongodb.MongodbSim{
  182. MongodbAddr: "172.17.4.85:27080",
  183. //MongodbAddr: "127.0.0.1:27080",
  184. Size: 10,
  185. DbName: "qfw",
  186. //Direct: true,
  187. }
  188. MgoP.InitPool()
  189. selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  190. it := sess.DB("qfw").C("zktest_0423_info_new").Find(nil).Select(selected).Sort("_id").Iter()
  191. fmt.Println("taskRun 开始")
  192. count := 0
  193. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  194. if count%10000 == 0 {
  195. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  196. }
  197. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  198. where := map[string]interface{}{
  199. "ids": biddingID,
  200. }
  201. // 找到对应项目数据
  202. p, _ := MgoP.FindOne("projectset_20230904", where)
  203. projectId := mongodb.BsonIdToSId((*p)["_id"])
  204. //1.更新MongoDB
  205. update := map[string]interface{}{}
  206. if area, ok := tmp["area"]; ok && area != nil {
  207. update["area"] = area
  208. } else {
  209. update["area"] = ""
  210. }
  211. if city, ok := tmp["city"]; ok && city != nil {
  212. update["city"] = city
  213. } else {
  214. update["city"] = ""
  215. }
  216. if district, ok := tmp["district"]; ok && district != nil {
  217. update["district"] = district
  218. } else {
  219. update["district"] = ""
  220. }
  221. if len(update) > 0 {
  222. MgoP.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update})
  223. //2.es 项目 更新字段
  224. err := Es.UpdateDocument("projectset", projectId, update)
  225. if err != nil {
  226. log.Info("es update err", err, projectId)
  227. }
  228. }
  229. //2.es 项目 更新字段
  230. //if len(update) > 0 {
  231. // // 更新es
  232. // //updateEsPool <- []map[string]interface{}{
  233. // // {"_id": projectId},
  234. // // update,
  235. // //}
  236. //}
  237. }
  238. log.Info("Run Over...Count:", log.Int("count", count))
  239. }
  240. // dealData 正式环境,同步合同期限
  241. func dealData() {
  242. defer util.Catch()
  243. sess := Mgo.GetMgoConn()
  244. defer Mgo.DestoryMongoConn(sess)
  245. //where := map[string]interface{}{
  246. // "_id": mongodb.StringTOBsonId("65c5a36a66cf0db42ab9c1ef"),
  247. //}
  248. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  249. it := sess.DB("qfw").C("zktest_quanliang_0210_fbs").Find(nil).Select(&selected).Iter()
  250. count := 0
  251. realNum := 0
  252. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  253. if count%1000 == 0 {
  254. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  255. }
  256. idStr := mongodb.BsonIdToSId(tmp["_id"])
  257. update := make(map[string]interface{})
  258. if tmp["signaturedate"] != nil {
  259. update["signaturedate"] = tmp["signaturedate"]
  260. }
  261. if tmp["contractperiod"] != nil {
  262. update["contractperiod"] = tmp["contractperiod"]
  263. }
  264. if tmp["expiredate"] != nil {
  265. update["expiredate"] = tmp["expiredate"]
  266. }
  267. if len(update) == 0 {
  268. continue
  269. }
  270. //bidding 表
  271. if idStr > "5a862e7040d2d9bbe88e3b1f" {
  272. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  273. data := *bidding
  274. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  275. // 针对存量数据,重复数据不进索引
  276. if util.IntAll(data["extracttype"]) == -1 {
  277. tmp = make(map[string]interface{})
  278. continue
  279. }
  280. } else {
  281. //bidding_back
  282. bidding, _ := Mgo.FindById("bidding_back", idStr, map[string]interface{}{"extracttype": 1})
  283. data := *bidding
  284. Mgo.UpdateById("bidding_back", idStr, map[string]interface{}{"$set": update})
  285. // 针对存量数据,重复数据不进索引
  286. if util.IntAll(data["extracttype"]) == -1 {
  287. tmp = make(map[string]interface{})
  288. continue
  289. }
  290. }
  291. realNum++
  292. //2.es 更新字段
  293. esUpdate := update
  294. esUpdate["id"] = idStr
  295. if len(esUpdate) > 0 {
  296. // 更新es
  297. updateEsPool <- []map[string]interface{}{
  298. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  299. esUpdate,
  300. }
  301. }
  302. tmp = make(map[string]interface{})
  303. }
  304. log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
  305. }
  306. // dealResult 查询抽取表,更新合同周期字段;是dealData的后面遗漏数据
  307. func dealResult() {
  308. defer util.Catch()
  309. sess := MgoR.GetMgoConn()
  310. defer MgoR.DestoryMongoConn(sess)
  311. where := map[string]interface{}{
  312. "_id": map[string]interface{}{
  313. "$gte": mongodb.StringTOBsonId("5a4909cf40d2d9bbe8ab329c"),
  314. "$lte": mongodb.StringTOBsonId("5a4ad94d40d2d9bbe8ae0183"),
  315. },
  316. "subtype": "合同",
  317. }
  318. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  319. it := sess.DB("qfw").C("result_20220219").Find(where).Select(&selected).Iter()
  320. count := 0
  321. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  322. if count%1000 == 0 {
  323. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  324. }
  325. idStr := mongodb.BsonIdToSId(tmp["_id"])
  326. update := make(map[string]interface{})
  327. if tmp["signaturedate"] != nil {
  328. update["signaturedate"] = tmp["signaturedate"]
  329. }
  330. if tmp["contractperiod"] != nil {
  331. update["contractperiod"] = tmp["contractperiod"]
  332. }
  333. if tmp["expiredate"] != nil {
  334. update["expiredate"] = tmp["expiredate"]
  335. }
  336. if len(update) == 0 {
  337. continue
  338. }
  339. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  340. data := *bidding
  341. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  342. // 针对存量数据,重复数据不进索引
  343. if util.IntAll(data["extracttype"]) == -1 {
  344. tmp = make(map[string]interface{})
  345. continue
  346. }
  347. //2.es 更新字段
  348. esUpdate := update
  349. esUpdate["id"] = idStr
  350. if len(esUpdate) > 0 {
  351. // 更新es
  352. updateEsPool <- []map[string]interface{}{
  353. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  354. esUpdate,
  355. }
  356. }
  357. tmp = make(map[string]interface{})
  358. }
  359. log.Info("Run Over...Count:", log.Int("count", count))
  360. }
  361. // dealBiddingTest 处理测试环境数据
  362. func dealBiddingTest() {
  363. defer util.Catch()
  364. sess := MgoT.GetMgoConn()
  365. defer MgoT.DestoryMongoConn(sess)
  366. it := sess.DB("qfw_data").C("bidding").Find(nil).Select(nil).Iter()
  367. fmt.Println("taskRun 开始")
  368. count := 0
  369. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  370. if count%10000 == 0 {
  371. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  372. }
  373. update := map[string]interface{}{}
  374. // 2.更新中标单位,中标金额
  375. if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
  376. update["tag_topinformation"] = tag_topinformation
  377. }
  378. if property_form, ok := tmp["property_form"]; ok && property_form != nil {
  379. update["property_form"] = property_form
  380. }
  381. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  382. if len(update) > 0 {
  383. //Mgo.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  384. //2.es 项目 更新字段
  385. //err := Es.UpdateDocument("bidding", biddingID, update)
  386. //if err != nil && err.Error() != "Document not updated: noop" {
  387. // log.Info("bidding es update err", err, biddingID)
  388. //}
  389. // 更新es
  390. updateEsPool <- []map[string]interface{}{
  391. {"_id": biddingID},
  392. update,
  393. }
  394. }
  395. }
  396. log.Info("Run Over...Count:", log.Int("count", count))
  397. }
  398. // updateMethod 更新MongoDB
  399. func updateMethod() {
  400. arru := make([][]map[string]interface{}, saveSize)
  401. indexu := 0
  402. for {
  403. select {
  404. case v := <-updatePool:
  405. arru[indexu] = v
  406. indexu++
  407. if indexu == saveSize {
  408. updateSp <- true
  409. go func(arru [][]map[string]interface{}) {
  410. defer func() {
  411. <-updateSp
  412. }()
  413. Mgo.UpdateBulk("bidding", arru...)
  414. }(arru)
  415. arru = make([][]map[string]interface{}, saveSize)
  416. indexu = 0
  417. }
  418. case <-time.After(1000 * time.Millisecond):
  419. if indexu > 0 {
  420. updateSp <- true
  421. go func(arru [][]map[string]interface{}) {
  422. defer func() {
  423. <-updateSp
  424. }()
  425. Mgo.UpdateBulk("bidding", arru...)
  426. }(arru[:indexu])
  427. arru = make([][]map[string]interface{}, saveSize)
  428. indexu = 0
  429. }
  430. }
  431. }
  432. }
  433. // updateEsMethod 更新es
  434. func updateEsMethod() {
  435. arru := make([][]map[string]interface{}, 200)
  436. indexu := 0
  437. for {
  438. select {
  439. case v := <-updateEsPool:
  440. arru[indexu] = v
  441. indexu++
  442. if indexu == 200 {
  443. updateEsSp <- true
  444. go func(arru [][]map[string]interface{}) {
  445. defer func() {
  446. <-updateEsSp
  447. }()
  448. Es.UpdateBulk("bidding", arru...)
  449. //EsNew.UpdateBulk("bidding", arru...)
  450. }(arru)
  451. arru = make([][]map[string]interface{}, 200)
  452. indexu = 0
  453. }
  454. case <-time.After(1000 * time.Millisecond):
  455. if indexu > 0 {
  456. updateEsSp <- true
  457. go func(arru [][]map[string]interface{}) {
  458. defer func() {
  459. <-updateEsSp
  460. }()
  461. Es.UpdateBulk("bidding", arru...)
  462. //EsNew.UpdateBulk("bidding", arru...)
  463. }(arru[:indexu])
  464. arru = make([][]map[string]interface{}, 200)
  465. indexu = 0
  466. }
  467. }
  468. }
  469. }
  470. // updateProjectEsMethod 更新项目索引
  471. func updateProjectEsMethod() {
  472. arru := make([][]map[string]interface{}, 200)
  473. indexu := 0
  474. for {
  475. select {
  476. case v := <-updateEsPool:
  477. arru[indexu] = v
  478. indexu++
  479. if indexu == 200 {
  480. updateEsSp <- true
  481. go func(arru [][]map[string]interface{}) {
  482. defer func() {
  483. <-updateEsSp
  484. }()
  485. Es.UpdateBulk("projectset", arru...)
  486. }(arru)
  487. arru = make([][]map[string]interface{}, 200)
  488. indexu = 0
  489. }
  490. case <-time.After(1000 * time.Millisecond):
  491. if indexu > 0 {
  492. updateEsSp <- true
  493. go func(arru [][]map[string]interface{}) {
  494. defer func() {
  495. <-updateEsSp
  496. }()
  497. Es.UpdateBulk("projectset", arru...)
  498. }(arru[:indexu])
  499. arru = make([][]map[string]interface{}, 200)
  500. indexu = 0
  501. }
  502. }
  503. }
  504. }