main.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/wcc4869/common_utils/log"
  6. "go.mongodb.org/mongo-driver/bson"
  7. "go.mongodb.org/mongo-driver/mongo/options"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  11. "time"
  12. )
  13. var (
  14. Mgo *mongodb.MongodbSim
  15. MgoT *mongodb.MongodbSim //测试环境链接
  16. MgoR *mongodb.MongodbSim
  17. saveSize = 50
  18. Es *elastic.Elastic
  19. EsNew *elastic.Elastic
  20. EsT *elastic.Elastic
  21. // 更新mongo
  22. updatePool = make(chan []map[string]interface{}, 5000)
  23. updateSp = make(chan bool, 5)
  24. //更新es
  25. updateEsPool = make(chan []map[string]interface{}, 5000)
  26. updateEsSp = make(chan bool, 2) //保存协程
  27. )
  28. func main() {
  29. //mongodb
  30. //Mgo = &mongodb.MongodbSim{
  31. // //MongodbAddr: "172.17.189.140:27080",
  32. // MongodbAddr: "127.0.0.1:27083",
  33. // DbName: "qfw",
  34. // Size: 10,
  35. // UserName: "SJZY_RWbid_ES",
  36. // Password: "SJZY@B4i4D5e6S",
  37. // Direct: true,
  38. //}
  39. //Mgo.InitPool()
  40. //85
  41. MgoR = &mongodb.MongodbSim{
  42. //MongodbAddr: "127.0.0.1:27080",
  43. MongodbAddr: "172.17.4.85:27080",
  44. DbName: "qfw",
  45. Size: 10,
  46. //Direct: true,
  47. }
  48. MgoR.InitPool()
  49. ////测试环境MongoDB
  50. //MgoT = &mongodb.MongodbSim{
  51. // //MongodbAddr: "172.17.189.140:27080",
  52. // MongodbAddr: "192.168.3.206:27002",
  53. // DbName: "qfw_data",
  54. // Size: 10,
  55. // UserName: "root",
  56. // Password: "root",
  57. // //Direct: true,
  58. //}
  59. //MgoT.InitPool()
  60. // 测试环境es
  61. //Es = &elastic.Elastic{
  62. // S_esurl: "http://192.168.3.149:9201",
  63. // //S_esurl: "http://172.17.4.184:19805",
  64. // I_size: 5,
  65. // Username: "",
  66. // Password: "",
  67. //}
  68. //Es.InitElasticSize()
  69. //es
  70. Es = &elastic.Elastic{
  71. //S_esurl: "http://127.0.0.1:19805",
  72. S_esurl: "http://172.17.4.184:19805",
  73. I_size: 5,
  74. Username: "es_all",
  75. Password: "TopJkO2E_d1x",
  76. }
  77. Es.InitElasticSize()
  78. // es 新集群
  79. //EsNew = &elastic.Elastic{
  80. // S_esurl: "http://127.0.0.1:19905",
  81. // //S_esurl: "http://172.17.4.184:19905",
  82. // I_size: 5,
  83. // Username: "jybid",
  84. // Password: "Top2023_JEB01i@31",
  85. //}
  86. //EsNew.InitElasticSize()
  87. //go updateMethod() //更新mongodb
  88. go updateEsMethod() //更新es
  89. taskRun()
  90. //dealDataTest()// 测试环境数据处理
  91. fmt.Println(111)
  92. c := make(chan bool, 1)
  93. <-c
  94. }
  95. // taskRun 更新es 省市区三个字段
  96. func taskRun() {
  97. defer util.Catch()
  98. sess := MgoR.GetMgoConn()
  99. defer MgoR.DestoryMongoConn(sess)
  100. //pool := make(chan bool, 2) //处理协程
  101. //wg := &sync.WaitGroup{}
  102. //查询条件
  103. q := map[string]interface{}{
  104. "_id": map[string]interface{}{
  105. //"$gt": mongodb.StringTOBsonId("5a862f0640d2d9bbe88e3cea"),
  106. //"$lte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  107. //"$gte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  108. "$lte": mongodb.StringTOBsonId("624949c64f7bde5444ed7c6c"),
  109. },
  110. //"comeintime": map[string]interface{}{
  111. // "$gt": 1669824000,
  112. // //"$lte": 1669864950,
  113. // "$lte": 1702265941,
  114. //},
  115. //"site": "国家能源e购",
  116. }
  117. //selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  118. it := sess.DB("qfw").C("projectset_20230904").Find(q).Select(nil).Iter()
  119. fmt.Println("taskRun 开始")
  120. count := 0
  121. //realNum := 0
  122. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  123. if count%10000 == 0 {
  124. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  125. }
  126. esUpdate := make(map[string]interface{})
  127. if subtitle_projectname, ok := tmp["subtitle_projectname"]; ok && subtitle_projectname != nil {
  128. esUpdate["subtitle_projectname"] = subtitle_projectname
  129. }
  130. //if len(esUpdate) > 0 {
  131. // id := mongodb.BsonIdToSId(tmp["_id"])
  132. // err := Es.UpdateDocument("projectset", id, esUpdate)
  133. // if err != nil {
  134. // if strings.Contains(err.Error(), "Document not updated:") {
  135. // continue
  136. // } else {
  137. // fmt.Println(err, mongodb.BsonIdToSId(tmp["_id"]))
  138. // }
  139. // }
  140. //}
  141. if len(esUpdate) > 0 {
  142. // 更新es
  143. updateEsPool <- []map[string]interface{}{
  144. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  145. esUpdate,
  146. }
  147. }
  148. //if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
  149. // tmp = make(map[string]interface{})
  150. // continue
  151. //}
  152. //// 针对存量数据,重复数据不进索引
  153. //if util.IntAll(tmp["extracttype"]) == -1 {
  154. // continue
  155. //}
  156. //
  157. ////针对产权数据,暂时不入es 索引库
  158. //if util.IntAll(tmp["infoformat"]) == 3 {
  159. // continue
  160. //}
  161. //只有 紧急直接零星采购公告 栏目的数据,需要改成 结果-成交
  162. //channel := util.ObjToString(tmp["channel"])
  163. //if channel != "紧急直接零星采购公告" {
  164. // continue
  165. //}
  166. //realNum++
  167. //pool <- true
  168. //wg.Add(1)
  169. //go func(tmp map[string]interface{}) {
  170. // defer func() {
  171. // <-pool
  172. // wg.Done()
  173. // }()
  174. ////1.更新MongoDB
  175. //update := map[string]interface{}{
  176. // "tag_set": tmp["tag_set"],
  177. // "tag_topinformation": tmp["tag_topinformation"],
  178. // "tag_subinformation": tmp["tag_subinformation"],
  179. //}
  180. //if len(update) > 0 {
  181. // //更新MongoDB
  182. // updatePool <- []map[string]interface{}{
  183. // {"_id": tmp["_id"]},
  184. // {"$set": update},
  185. // }
  186. //}
  187. //2.es 更新字段
  188. //esUpdate := make(map[string]interface{})
  189. //if autoid, ok := tmp["autoid"]; ok && autoid != nil {
  190. // esUpdate["autoid"] = autoid
  191. //}
  192. //
  193. //if len(esUpdate) > 0 {
  194. // err := Es.UpdateDocument("bidding", mongodb.BsonIdToSId(tmp["_id"]), esUpdate)
  195. // if err != nil {
  196. // if strings.Contains(err.Error(), "Document not updated:") {
  197. // return
  198. // } else {
  199. // fmt.Println(err, mongodb.BsonIdToSId(tmp["_id"]))
  200. // }
  201. // }
  202. //}
  203. //if err != nil {
  204. // fmt.Println(err, mongodb.BsonIdToSId(tmp["_id"]))
  205. //}
  206. //if tag_set, ok := tmp["tag_set"]; ok && tag_set != nil {
  207. // esUpdate["tag_set"] = tag_set
  208. //}
  209. //
  210. //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
  211. // esUpdate["tag_topinformation"] = tag_topinformation
  212. //}
  213. //
  214. //if tag_subinformation, ok := tmp["tag_subinformation"]; ok && tag_subinformation != nil {
  215. // esUpdate["tag_subinformation"] = tag_subinformation
  216. //}
  217. //if len(esUpdate) > 0 {
  218. // // 更新es
  219. // updateEsPool <- []map[string]interface{}{
  220. // {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  221. // esUpdate,
  222. // }
  223. //}
  224. //}(tmp)
  225. //tmp = make(map[string]interface{})
  226. }
  227. //wg.Wait()
  228. log.Info("Run Over...Count:", log.Int("count", count))
  229. }
  230. // dealData 正式环境,同步合同期限
  231. func dealData() {
  232. defer util.Catch()
  233. sess := Mgo.GetMgoConn()
  234. defer Mgo.DestoryMongoConn(sess)
  235. //where := map[string]interface{}{
  236. // "_id": mongodb.StringTOBsonId("65c5a36a66cf0db42ab9c1ef"),
  237. //}
  238. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  239. it := sess.DB("qfw").C("zktest_quanliang_0210_fbs").Find(nil).Select(&selected).Iter()
  240. count := 0
  241. realNum := 0
  242. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  243. if count%1000 == 0 {
  244. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  245. }
  246. idStr := mongodb.BsonIdToSId(tmp["_id"])
  247. update := make(map[string]interface{})
  248. if tmp["signaturedate"] != nil {
  249. update["signaturedate"] = tmp["signaturedate"]
  250. }
  251. if tmp["contractperiod"] != nil {
  252. update["contractperiod"] = tmp["contractperiod"]
  253. }
  254. if tmp["expiredate"] != nil {
  255. update["expiredate"] = tmp["expiredate"]
  256. }
  257. if len(update) == 0 {
  258. continue
  259. }
  260. //bidding 表
  261. if idStr > "5a862e7040d2d9bbe88e3b1f" {
  262. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  263. data := *bidding
  264. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  265. // 针对存量数据,重复数据不进索引
  266. if util.IntAll(data["extracttype"]) == -1 {
  267. tmp = make(map[string]interface{})
  268. continue
  269. }
  270. } else {
  271. //bidding_back
  272. bidding, _ := Mgo.FindById("bidding_back", idStr, map[string]interface{}{"extracttype": 1})
  273. data := *bidding
  274. Mgo.UpdateById("bidding_back", idStr, map[string]interface{}{"$set": update})
  275. // 针对存量数据,重复数据不进索引
  276. if util.IntAll(data["extracttype"]) == -1 {
  277. tmp = make(map[string]interface{})
  278. continue
  279. }
  280. }
  281. realNum++
  282. //2.es 更新字段
  283. esUpdate := update
  284. esUpdate["id"] = idStr
  285. if len(esUpdate) > 0 {
  286. // 更新es
  287. updateEsPool <- []map[string]interface{}{
  288. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  289. esUpdate,
  290. }
  291. }
  292. tmp = make(map[string]interface{})
  293. }
  294. log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
  295. }
  296. // dealResult 查询抽取表,更新合同周期字段;是dealData的后面遗漏数据
  297. func dealResult() {
  298. defer util.Catch()
  299. sess := MgoR.GetMgoConn()
  300. defer MgoR.DestoryMongoConn(sess)
  301. where := map[string]interface{}{
  302. "_id": map[string]interface{}{
  303. "$gte": mongodb.StringTOBsonId("5a4909cf40d2d9bbe8ab329c"),
  304. "$lte": mongodb.StringTOBsonId("5a4ad94d40d2d9bbe8ae0183"),
  305. },
  306. "subtype": "合同",
  307. }
  308. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  309. it := sess.DB("qfw").C("result_20220219").Find(where).Select(&selected).Iter()
  310. count := 0
  311. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  312. if count%1000 == 0 {
  313. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  314. }
  315. idStr := mongodb.BsonIdToSId(tmp["_id"])
  316. update := make(map[string]interface{})
  317. if tmp["signaturedate"] != nil {
  318. update["signaturedate"] = tmp["signaturedate"]
  319. }
  320. if tmp["contractperiod"] != nil {
  321. update["contractperiod"] = tmp["contractperiod"]
  322. }
  323. if tmp["expiredate"] != nil {
  324. update["expiredate"] = tmp["expiredate"]
  325. }
  326. if len(update) == 0 {
  327. continue
  328. }
  329. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  330. data := *bidding
  331. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  332. // 针对存量数据,重复数据不进索引
  333. if util.IntAll(data["extracttype"]) == -1 {
  334. tmp = make(map[string]interface{})
  335. continue
  336. }
  337. //2.es 更新字段
  338. esUpdate := update
  339. esUpdate["id"] = idStr
  340. if len(esUpdate) > 0 {
  341. // 更新es
  342. updateEsPool <- []map[string]interface{}{
  343. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  344. esUpdate,
  345. }
  346. }
  347. tmp = make(map[string]interface{})
  348. }
  349. log.Info("Run Over...Count:", log.Int("count", count))
  350. }
  351. // dealDataTest 处理测试环境数据
  352. func dealDataTest() {
  353. defer util.Catch()
  354. sess := MgoT.GetMgoConn()
  355. defer MgoT.DestoryMongoConn(sess)
  356. where := map[string]interface{}{
  357. "_id": map[string]interface{}{
  358. "$gte": mongodb.StringTOBsonId("635051528aea8786d196e24a"),
  359. "$lte": mongodb.StringTOBsonId("639356ca8aea8786d1995c4b"),
  360. },
  361. }
  362. //where := map[string]interface{}{
  363. // "_id": mongodb.StringTOBsonId("639356ca8aea8786d1995c4b"),
  364. //}
  365. ctx := context.Background()
  366. coll := sess.M.C.Database("qfw_data").Collection("bidding")
  367. find := options.Find().SetBatchSize(1000).SetSort(bson.D{bson.E{"_id", -1}}).SetProjection(bson.M{"_id": 1, "title": 1, "subtype": 1})
  368. cur, err := coll.Find(ctx, where, find)
  369. if err != nil {
  370. fmt.Println(err)
  371. }
  372. ///////
  373. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  374. //it := sess.DB("qfw_data").C("bidding").Find(where).Select(nil).Iter()
  375. count := 0
  376. realNum := 0
  377. for tmp := make(map[string]interface{}); cur.Next(ctx); count++ {
  378. if cur != nil {
  379. cur.Decode(&tmp)
  380. }
  381. if count%1000 == 0 {
  382. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  383. }
  384. idStr := mongodb.BsonIdToSId(tmp["_id"])
  385. data, _ := Mgo.FindById("zktest_quanliang_0210_fbs", idStr, selected)
  386. if len(*data) == 0 {
  387. continue
  388. }
  389. update := make(map[string]interface{})
  390. if (*data)["signaturedate"] != nil {
  391. update["signaturedate"] = (*data)["signaturedate"]
  392. }
  393. if (*data)["contractperiod"] != nil {
  394. update["contractperiod"] = (*data)["contractperiod"]
  395. }
  396. if (*data)["expiredate"] != nil {
  397. update["expiredate"] = (*data)["expiredate"]
  398. }
  399. if len(update) == 0 {
  400. continue
  401. }
  402. fmt.Println(idStr)
  403. MgoT.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  404. //bidding 表
  405. //if idStr > "5a862e7040d2d9bbe88e3b1f" {
  406. // bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  407. // data := *bidding
  408. // Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  409. //
  410. // // 针对存量数据,重复数据不进索引
  411. // if util.IntAll(data["extracttype"]) == -1 {
  412. // tmp = make(map[string]interface{})
  413. // continue
  414. // }
  415. //
  416. //} else {
  417. // //bidding_back
  418. // bidding, _ := Mgo.FindById("bidding_back", idStr, map[string]interface{}{"extracttype": 1})
  419. // data := *bidding
  420. // Mgo.UpdateById("bidding_back", idStr, map[string]interface{}{"$set": update})
  421. // // 针对存量数据,重复数据不进索引
  422. // if util.IntAll(data["extracttype"]) == -1 {
  423. // tmp = make(map[string]interface{})
  424. // continue
  425. // }
  426. //}
  427. realNum++
  428. //2.es 更新字段
  429. esUpdate := update
  430. esUpdate["id"] = idStr
  431. if len(esUpdate) > 0 {
  432. fmt.Println(idStr)
  433. // 更新es
  434. updateEsPool <- []map[string]interface{}{
  435. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  436. esUpdate,
  437. }
  438. }
  439. //err := Es.UpdateDocument("bidding", idStr, update)
  440. //if err != nil {
  441. // log.Error("es update", err)
  442. //}
  443. //
  444. //err = EsNew.UpdateDocument("bidding", idStr, update)
  445. //if err != nil {
  446. // log.Error("esNew update", err)
  447. //}
  448. tmp = make(map[string]interface{})
  449. }
  450. log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
  451. }
  452. // updateMethod 更新MongoDB
  453. func updateMethod() {
  454. arru := make([][]map[string]interface{}, saveSize)
  455. indexu := 0
  456. for {
  457. select {
  458. case v := <-updatePool:
  459. arru[indexu] = v
  460. indexu++
  461. if indexu == saveSize {
  462. updateSp <- true
  463. go func(arru [][]map[string]interface{}) {
  464. defer func() {
  465. <-updateSp
  466. }()
  467. Mgo.UpdateBulk("bidding", arru...)
  468. }(arru)
  469. arru = make([][]map[string]interface{}, saveSize)
  470. indexu = 0
  471. }
  472. case <-time.After(1000 * time.Millisecond):
  473. if indexu > 0 {
  474. updateSp <- true
  475. go func(arru [][]map[string]interface{}) {
  476. defer func() {
  477. <-updateSp
  478. }()
  479. Mgo.UpdateBulk("bidding", arru...)
  480. }(arru[:indexu])
  481. arru = make([][]map[string]interface{}, saveSize)
  482. indexu = 0
  483. }
  484. }
  485. }
  486. }
  487. // updateEsMethod 更新es
  488. func updateEsMethod() {
  489. arru := make([][]map[string]interface{}, 200)
  490. indexu := 0
  491. for {
  492. select {
  493. case v := <-updateEsPool:
  494. arru[indexu] = v
  495. indexu++
  496. if indexu == 200 {
  497. updateEsSp <- true
  498. go func(arru [][]map[string]interface{}) {
  499. defer func() {
  500. <-updateEsSp
  501. }()
  502. Es.UpdateBulk("projectset", arru...)
  503. //EsNew.UpdateBulk("bidding", arru...)
  504. }(arru)
  505. arru = make([][]map[string]interface{}, 200)
  506. indexu = 0
  507. }
  508. case <-time.After(1000 * time.Millisecond):
  509. if indexu > 0 {
  510. updateEsSp <- true
  511. go func(arru [][]map[string]interface{}) {
  512. defer func() {
  513. <-updateEsSp
  514. }()
  515. Es.UpdateBulk("projectset", arru...)
  516. //EsNew.UpdateBulk("bidding", arru...)
  517. }(arru[:indexu])
  518. arru = make([][]map[string]interface{}, 200)
  519. indexu = 0
  520. }
  521. }
  522. }
  523. }