main.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/wcc4869/common_utils/log"
  6. "go.mongodb.org/mongo-driver/bson"
  7. "go.mongodb.org/mongo-driver/mongo/options"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  11. "time"
  12. )
  13. var (
  14. Mgo *mongodb.MongodbSim
  15. MgoT *mongodb.MongodbSim //测试环境链接
  16. MgoR *mongodb.MongodbSim
  17. saveSize = 50
  18. Es *elastic.Elastic
  19. EsNew *elastic.Elastic
  20. EsT *elastic.Elastic
  21. // 更新mongo
  22. updatePool = make(chan []map[string]interface{}, 5000)
  23. updateSp = make(chan bool, 5)
  24. //更新es
  25. updateEsPool = make(chan []map[string]interface{}, 5000)
  26. updateEsSp = make(chan bool, 2) //保存协程
  27. )
  28. func main() {
  29. //mongodb
  30. Mgo = &mongodb.MongodbSim{
  31. MongodbAddr: "172.17.189.140:27080",
  32. //MongodbAddr: "127.0.0.1:27083",
  33. DbName: "qfw",
  34. Size: 10,
  35. UserName: "SJZY_RWbid_ES",
  36. Password: "SJZY@B4i4D5e6S",
  37. //Direct: true,
  38. }
  39. Mgo.InitPool()
  40. //85
  41. //MgoR = &mongodb.MongodbSim{
  42. // //MongodbAddr: "127.0.0.1:27080",
  43. // MongodbAddr: "172.17.4.85:27080",
  44. // DbName: "qfw",
  45. // Size: 10,
  46. // //Direct: true,
  47. //}
  48. //MgoR.InitPool()
  49. ////测试环境MongoDB
  50. //MgoT = &mongodb.MongodbSim{
  51. // //MongodbAddr: "172.17.189.140:27080",
  52. // MongodbAddr: "192.168.3.206:27002",
  53. // DbName: "qfw_data",
  54. // Size: 10,
  55. // UserName: "root",
  56. // Password: "root",
  57. // //Direct: true,
  58. //}
  59. //MgoT.InitPool()
  60. // 测试环境es
  61. //Es = &elastic.Elastic{
  62. // S_esurl: "http://192.168.3.149:9201",
  63. // //S_esurl: "http://172.17.4.184:19805",
  64. // I_size: 5,
  65. // Username: "",
  66. // Password: "",
  67. //}
  68. //Es.InitElasticSize()
  69. //es
  70. Es = &elastic.Elastic{
  71. //S_esurl: "http://127.0.0.1:19908",
  72. S_esurl: "http://172.17.4.184:19908",
  73. I_size: 5,
  74. Username: "jybid",
  75. Password: "Top2023_JEB01i@31",
  76. }
  77. Es.InitElasticSize()
  78. // es 新集群
  79. EsNew = &elastic.Elastic{
  80. //S_esurl: "http://127.0.0.1:19905",
  81. S_esurl: "http://172.17.4.184:19905",
  82. I_size: 5,
  83. Username: "jybid",
  84. Password: "Top2023_JEB01i@31",
  85. }
  86. EsNew.InitElasticSize()
  87. //go updateMethod() //更新mongodb
  88. go updateEsMethod() //更新es
  89. //taskRunProject()
  90. taskRunBidding()
  91. //dealDataTest()// 测试环境数据处理
  92. fmt.Println("over")
  93. c := make(chan bool, 1)
  94. <-c
  95. }
  96. // taskRun 更新es 省市区三个字段
  97. func taskRunBidding() {
  98. defer util.Catch()
  99. sess := Mgo.GetMgoConn()
  100. defer Mgo.DestoryMongoConn(sess)
  101. //pool := make(chan bool, 2) //处理协程
  102. //wg := &sync.WaitGroup{}
  103. //查询条件
  104. //q := map[string]interface{}{
  105. // //"_id": map[string]interface{}{
  106. // // //"$gt": mongodb.StringTOBsonId("5a862f0640d2d9bbe88e3cea"),
  107. // // //"$lte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  108. // //
  109. // // //"$gte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  110. // // "$lte": mongodb.StringTOBsonId("661e347d66cf0db42aa1a52f"),
  111. // //},
  112. // //"comeintime": map[string]interface{}{
  113. // // "$gt": 1669824000,
  114. // // //"$lte": 1669864950,
  115. // // "$lte": 1702265941,
  116. // //},
  117. // //"site": "国家能源e购",
  118. // "toptype": map[string]interface{}{"$exists": 0},
  119. //}
  120. selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  121. it := sess.DB("qfw").C("zktest_0520_id").Find(nil).Select(selected).Sort("_id").Iter()
  122. fmt.Println("taskRun 开始")
  123. count := 0
  124. //realNum := 0
  125. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  126. if count%1000 == 0 {
  127. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  128. }
  129. //1.更新MongoDB
  130. update := map[string]interface{}{}
  131. //if area, ok := tmp["area"]; ok && area != nil {
  132. // update["area"] = area
  133. //} else {
  134. // update["area"] = ""
  135. //}
  136. //
  137. //if city, ok := tmp["city"]; ok && city != nil {
  138. // update["city"] = city
  139. //} else {
  140. // update["city"] = ""
  141. //}
  142. //
  143. //if district, ok := tmp["district"]; ok && district != nil {
  144. // update["district"] = district
  145. //} else {
  146. // update["district"] = ""
  147. //}
  148. //========//
  149. //toptype := ""
  150. //if toptype, ok := tmp["toptype"]; ok && toptype != nil {
  151. // update["toptype"] = toptype
  152. //} else {
  153. // update["toptype"] = ""
  154. //}
  155. ////
  156. //if subtype, ok := tmp["subtype"]; ok && subtype != nil {
  157. // if util.ObjToString(tmp["toptype"]) == "结果" && util.ObjToString(tmp["subtype"]) == "招标" {
  158. // update["subtype"] = ""
  159. // }
  160. //} else {
  161. // update["subtype"] = ""
  162. //}
  163. //update["toptype"] = "其它"
  164. //update["subtype"] = "其它"
  165. //if len(update) > 0 {
  166. // //更新MongoDB
  167. // updatePool <- []map[string]interface{}{
  168. // //{"_id": tmp["id"]},
  169. // {"_id": tmp["_id"]},
  170. // {"$set": update},
  171. // }
  172. // //====//
  173. // //biddingID := util.ObjToString(tmp["id"])
  174. // //biddingID := mongodb.BsonIdToSId(tmp["_id"])
  175. // //Mgo.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  176. // ////2.es 项目 更新字段
  177. // //Es.UpdateDocument("bidding", biddingID, update)
  178. // //EsNew.UpdateDocument("bidding", biddingID, update)
  179. // //if err != nil {
  180. // // log.Info("bidding es update err", err, biddingID)
  181. // //}
  182. //}
  183. //2.es 更新字段
  184. //esUpdate := make(map[string]interface{})
  185. //if subtitle_projectname, ok := tmp["subtitle_projectname"]; ok && subtitle_projectname != nil {
  186. // esUpdate["subtitle_projectname"] = subtitle_projectname
  187. //}
  188. biddingID := util.ObjToString(tmp["id"])
  189. bidamount := util.Float64All(tmp["bidamount"])
  190. _, ok := tmp["bidamount"]
  191. if ok && bidamount > 0 {
  192. if biddingID != "" {
  193. update["bidamount"] = bidamount
  194. }
  195. }
  196. if len(update) > 0 {
  197. // 更新es
  198. updateEsPool <- []map[string]interface{}{
  199. {"_id": biddingID},
  200. //{"_id": mongodb.BsonIdToSId(tmp["_id"])},
  201. update,
  202. }
  203. }
  204. //if len(update) > 0 {
  205. // id := mongodb.BsonIdToSId(tmp["_id"])
  206. // //id := mongodb.BsonIdToSId(tmp["_id"])
  207. // err := Es.UpdateDocument("projectset", id, esUpdate)
  208. // if err != nil {
  209. // if strings.Contains(err.Error(), "Document not updated:") {
  210. // continue
  211. // } else {
  212. // fmt.Println(err, mongodb.BsonIdToSId(tmp["_id"]))
  213. // }
  214. // }
  215. //}
  216. //if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
  217. // tmp = make(map[string]interface{})
  218. // continue
  219. //}
  220. //// 针对存量数据,重复数据不进索引
  221. //if util.IntAll(tmp["extracttype"]) == -1 {
  222. // continue
  223. //}
  224. //
  225. ////针对产权数据,暂时不入es 索引库
  226. //if util.IntAll(tmp["infoformat"]) == 3 {
  227. // continue
  228. //}
  229. //只有 紧急直接零星采购公告 栏目的数据,需要改成 结果-成交
  230. //channel := util.ObjToString(tmp["channel"])
  231. //if channel != "紧急直接零星采购公告" {
  232. // continue
  233. //}
  234. //realNum++
  235. //pool <- true
  236. //wg.Add(1)
  237. //go func(tmp map[string]interface{}) {
  238. // defer func() {
  239. // <-pool
  240. // wg.Done()
  241. // }()
  242. ////2.es 更新字段
  243. //esUpdate := make(map[string]interface{})
  244. //if autoid, ok := tmp["autoid"]; ok && autoid != nil {
  245. // esUpdate["autoid"] = autoid
  246. //}
  247. //
  248. //if len(esUpdate) > 0 {
  249. // err := Es.UpdateDocument("bidding", mongodb.BsonIdToSId(tmp["_id"]), esUpdate)
  250. // if err != nil {
  251. // if strings.Contains(err.Error(), "Document not updated:") {
  252. // return
  253. // } else {
  254. // fmt.Println(err, mongodb.BsonIdToSId(tmp["_id"]))
  255. // }
  256. // }
  257. //}
  258. //if err != nil {
  259. // fmt.Println(err, mongodb.BsonIdToSId(tmp["_id"]))
  260. //}
  261. //if tag_set, ok := tmp["tag_set"]; ok && tag_set != nil {
  262. // esUpdate["tag_set"] = tag_set
  263. //}
  264. //
  265. //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
  266. // esUpdate["tag_topinformation"] = tag_topinformation
  267. //}
  268. //
  269. //if tag_subinformation, ok := tmp["tag_subinformation"]; ok && tag_subinformation != nil {
  270. // esUpdate["tag_subinformation"] = tag_subinformation
  271. //}
  272. //if len(esUpdate) > 0 {
  273. // // 更新es
  274. // updateEsPool <- []map[string]interface{}{
  275. // {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  276. // esUpdate,
  277. // }
  278. //}
  279. //}(tmp)
  280. //tmp = make(map[string]interface{})
  281. }
  282. //wg.Wait()
  283. log.Info("Run Over...Count:", log.Int("count", count))
  284. }
  285. // taskRunProject 更新项目表 省市区
  286. func taskRunProject() {
  287. defer util.Catch()
  288. sess := Mgo.GetMgoConn()
  289. defer Mgo.DestoryMongoConn(sess)
  290. // 项目数据
  291. MgoP := &mongodb.MongodbSim{
  292. MongodbAddr: "172.17.4.85:27080",
  293. //MongodbAddr: "127.0.0.1:27080",
  294. Size: 10,
  295. DbName: "qfw",
  296. //Direct: true,
  297. }
  298. MgoP.InitPool()
  299. selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  300. it := sess.DB("qfw").C("zktest_0423_info_new").Find(nil).Select(selected).Sort("_id").Iter()
  301. fmt.Println("taskRun 开始")
  302. count := 0
  303. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  304. if count%10000 == 0 {
  305. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  306. }
  307. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  308. where := map[string]interface{}{
  309. "ids": biddingID,
  310. }
  311. // 找到对应项目数据
  312. p, _ := MgoP.FindOne("projectset_20230904", where)
  313. projectId := mongodb.BsonIdToSId((*p)["_id"])
  314. //1.更新MongoDB
  315. update := map[string]interface{}{}
  316. if area, ok := tmp["area"]; ok && area != nil {
  317. update["area"] = area
  318. } else {
  319. update["area"] = ""
  320. }
  321. if city, ok := tmp["city"]; ok && city != nil {
  322. update["city"] = city
  323. } else {
  324. update["city"] = ""
  325. }
  326. if district, ok := tmp["district"]; ok && district != nil {
  327. update["district"] = district
  328. } else {
  329. update["district"] = ""
  330. }
  331. if len(update) > 0 {
  332. MgoP.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update})
  333. //2.es 项目 更新字段
  334. err := Es.UpdateDocument("projectset", projectId, update)
  335. if err != nil {
  336. log.Info("es update err", err, projectId)
  337. }
  338. }
  339. //2.es 项目 更新字段
  340. //if len(update) > 0 {
  341. // // 更新es
  342. // //updateEsPool <- []map[string]interface{}{
  343. // // {"_id": projectId},
  344. // // update,
  345. // //}
  346. //}
  347. }
  348. log.Info("Run Over...Count:", log.Int("count", count))
  349. }
  350. // dealData 正式环境,同步合同期限
  351. func dealData() {
  352. defer util.Catch()
  353. sess := Mgo.GetMgoConn()
  354. defer Mgo.DestoryMongoConn(sess)
  355. //where := map[string]interface{}{
  356. // "_id": mongodb.StringTOBsonId("65c5a36a66cf0db42ab9c1ef"),
  357. //}
  358. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  359. it := sess.DB("qfw").C("zktest_quanliang_0210_fbs").Find(nil).Select(&selected).Iter()
  360. count := 0
  361. realNum := 0
  362. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  363. if count%1000 == 0 {
  364. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  365. }
  366. idStr := mongodb.BsonIdToSId(tmp["_id"])
  367. update := make(map[string]interface{})
  368. if tmp["signaturedate"] != nil {
  369. update["signaturedate"] = tmp["signaturedate"]
  370. }
  371. if tmp["contractperiod"] != nil {
  372. update["contractperiod"] = tmp["contractperiod"]
  373. }
  374. if tmp["expiredate"] != nil {
  375. update["expiredate"] = tmp["expiredate"]
  376. }
  377. if len(update) == 0 {
  378. continue
  379. }
  380. //bidding 表
  381. if idStr > "5a862e7040d2d9bbe88e3b1f" {
  382. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  383. data := *bidding
  384. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  385. // 针对存量数据,重复数据不进索引
  386. if util.IntAll(data["extracttype"]) == -1 {
  387. tmp = make(map[string]interface{})
  388. continue
  389. }
  390. } else {
  391. //bidding_back
  392. bidding, _ := Mgo.FindById("bidding_back", idStr, map[string]interface{}{"extracttype": 1})
  393. data := *bidding
  394. Mgo.UpdateById("bidding_back", idStr, map[string]interface{}{"$set": update})
  395. // 针对存量数据,重复数据不进索引
  396. if util.IntAll(data["extracttype"]) == -1 {
  397. tmp = make(map[string]interface{})
  398. continue
  399. }
  400. }
  401. realNum++
  402. //2.es 更新字段
  403. esUpdate := update
  404. esUpdate["id"] = idStr
  405. if len(esUpdate) > 0 {
  406. // 更新es
  407. updateEsPool <- []map[string]interface{}{
  408. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  409. esUpdate,
  410. }
  411. }
  412. tmp = make(map[string]interface{})
  413. }
  414. log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
  415. }
  416. // dealResult 查询抽取表,更新合同周期字段;是dealData的后面遗漏数据
  417. func dealResult() {
  418. defer util.Catch()
  419. sess := MgoR.GetMgoConn()
  420. defer MgoR.DestoryMongoConn(sess)
  421. where := map[string]interface{}{
  422. "_id": map[string]interface{}{
  423. "$gte": mongodb.StringTOBsonId("5a4909cf40d2d9bbe8ab329c"),
  424. "$lte": mongodb.StringTOBsonId("5a4ad94d40d2d9bbe8ae0183"),
  425. },
  426. "subtype": "合同",
  427. }
  428. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  429. it := sess.DB("qfw").C("result_20220219").Find(where).Select(&selected).Iter()
  430. count := 0
  431. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  432. if count%1000 == 0 {
  433. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  434. }
  435. idStr := mongodb.BsonIdToSId(tmp["_id"])
  436. update := make(map[string]interface{})
  437. if tmp["signaturedate"] != nil {
  438. update["signaturedate"] = tmp["signaturedate"]
  439. }
  440. if tmp["contractperiod"] != nil {
  441. update["contractperiod"] = tmp["contractperiod"]
  442. }
  443. if tmp["expiredate"] != nil {
  444. update["expiredate"] = tmp["expiredate"]
  445. }
  446. if len(update) == 0 {
  447. continue
  448. }
  449. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  450. data := *bidding
  451. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  452. // 针对存量数据,重复数据不进索引
  453. if util.IntAll(data["extracttype"]) == -1 {
  454. tmp = make(map[string]interface{})
  455. continue
  456. }
  457. //2.es 更新字段
  458. esUpdate := update
  459. esUpdate["id"] = idStr
  460. if len(esUpdate) > 0 {
  461. // 更新es
  462. updateEsPool <- []map[string]interface{}{
  463. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  464. esUpdate,
  465. }
  466. }
  467. tmp = make(map[string]interface{})
  468. }
  469. log.Info("Run Over...Count:", log.Int("count", count))
  470. }
  471. // dealDataTest 处理测试环境数据
  472. func dealDataTest() {
  473. defer util.Catch()
  474. sess := MgoT.GetMgoConn()
  475. defer MgoT.DestoryMongoConn(sess)
  476. where := map[string]interface{}{
  477. "_id": map[string]interface{}{
  478. "$gte": mongodb.StringTOBsonId("635051528aea8786d196e24a"),
  479. "$lte": mongodb.StringTOBsonId("639356ca8aea8786d1995c4b"),
  480. },
  481. }
  482. //where := map[string]interface{}{
  483. // "_id": mongodb.StringTOBsonId("639356ca8aea8786d1995c4b"),
  484. //}
  485. ctx := context.Background()
  486. coll := sess.M.C.Database("qfw_data").Collection("bidding")
  487. find := options.Find().SetBatchSize(1000).SetSort(bson.D{bson.E{"_id", -1}}).SetProjection(bson.M{"_id": 1, "title": 1, "subtype": 1})
  488. cur, err := coll.Find(ctx, where, find)
  489. if err != nil {
  490. fmt.Println(err)
  491. }
  492. ///////
  493. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  494. //it := sess.DB("qfw_data").C("bidding").Find(where).Select(nil).Iter()
  495. count := 0
  496. realNum := 0
  497. for tmp := make(map[string]interface{}); cur.Next(ctx); count++ {
  498. if cur != nil {
  499. cur.Decode(&tmp)
  500. }
  501. if count%1000 == 0 {
  502. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  503. }
  504. idStr := mongodb.BsonIdToSId(tmp["_id"])
  505. data, _ := Mgo.FindById("zktest_quanliang_0210_fbs", idStr, selected)
  506. if len(*data) == 0 {
  507. continue
  508. }
  509. update := make(map[string]interface{})
  510. if (*data)["signaturedate"] != nil {
  511. update["signaturedate"] = (*data)["signaturedate"]
  512. }
  513. if (*data)["contractperiod"] != nil {
  514. update["contractperiod"] = (*data)["contractperiod"]
  515. }
  516. if (*data)["expiredate"] != nil {
  517. update["expiredate"] = (*data)["expiredate"]
  518. }
  519. if len(update) == 0 {
  520. continue
  521. }
  522. fmt.Println(idStr)
  523. MgoT.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  524. //bidding 表
  525. //if idStr > "5a862e7040d2d9bbe88e3b1f" {
  526. // bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  527. // data := *bidding
  528. // Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  529. //
  530. // // 针对存量数据,重复数据不进索引
  531. // if util.IntAll(data["extracttype"]) == -1 {
  532. // tmp = make(map[string]interface{})
  533. // continue
  534. // }
  535. //
  536. //} else {
  537. // //bidding_back
  538. // bidding, _ := Mgo.FindById("bidding_back", idStr, map[string]interface{}{"extracttype": 1})
  539. // data := *bidding
  540. // Mgo.UpdateById("bidding_back", idStr, map[string]interface{}{"$set": update})
  541. // // 针对存量数据,重复数据不进索引
  542. // if util.IntAll(data["extracttype"]) == -1 {
  543. // tmp = make(map[string]interface{})
  544. // continue
  545. // }
  546. //}
  547. realNum++
  548. //2.es 更新字段
  549. esUpdate := update
  550. esUpdate["id"] = idStr
  551. if len(esUpdate) > 0 {
  552. fmt.Println(idStr)
  553. // 更新es
  554. updateEsPool <- []map[string]interface{}{
  555. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  556. esUpdate,
  557. }
  558. }
  559. //err := Es.UpdateDocument("bidding", idStr, update)
  560. //if err != nil {
  561. // log.Error("es update", err)
  562. //}
  563. //
  564. //err = EsNew.UpdateDocument("bidding", idStr, update)
  565. //if err != nil {
  566. // log.Error("esNew update", err)
  567. //}
  568. tmp = make(map[string]interface{})
  569. }
  570. log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
  571. }
  572. // updateMethod 更新MongoDB
  573. func updateMethod() {
  574. arru := make([][]map[string]interface{}, saveSize)
  575. indexu := 0
  576. for {
  577. select {
  578. case v := <-updatePool:
  579. arru[indexu] = v
  580. indexu++
  581. if indexu == saveSize {
  582. updateSp <- true
  583. go func(arru [][]map[string]interface{}) {
  584. defer func() {
  585. <-updateSp
  586. }()
  587. Mgo.UpdateBulk("bidding", arru...)
  588. }(arru)
  589. arru = make([][]map[string]interface{}, saveSize)
  590. indexu = 0
  591. }
  592. case <-time.After(1000 * time.Millisecond):
  593. if indexu > 0 {
  594. updateSp <- true
  595. go func(arru [][]map[string]interface{}) {
  596. defer func() {
  597. <-updateSp
  598. }()
  599. Mgo.UpdateBulk("bidding", arru...)
  600. }(arru[:indexu])
  601. arru = make([][]map[string]interface{}, saveSize)
  602. indexu = 0
  603. }
  604. }
  605. }
  606. }
  607. // updateEsMethod 更新es
  608. func updateEsMethod() {
  609. arru := make([][]map[string]interface{}, 200)
  610. indexu := 0
  611. for {
  612. select {
  613. case v := <-updateEsPool:
  614. arru[indexu] = v
  615. indexu++
  616. if indexu == 200 {
  617. updateEsSp <- true
  618. go func(arru [][]map[string]interface{}) {
  619. defer func() {
  620. <-updateEsSp
  621. }()
  622. Es.UpdateBulk("bidding", arru...)
  623. EsNew.UpdateBulk("bidding", arru...)
  624. }(arru)
  625. arru = make([][]map[string]interface{}, 200)
  626. indexu = 0
  627. }
  628. case <-time.After(1000 * time.Millisecond):
  629. if indexu > 0 {
  630. updateEsSp <- true
  631. go func(arru [][]map[string]interface{}) {
  632. defer func() {
  633. <-updateEsSp
  634. }()
  635. Es.UpdateBulk("bidding", arru...)
  636. EsNew.UpdateBulk("bidding", arru...)
  637. }(arru[:indexu])
  638. arru = make([][]map[string]interface{}, 200)
  639. indexu = 0
  640. }
  641. }
  642. }
  643. }