main.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/wcc4869/common_utils/log"
  6. "go.mongodb.org/mongo-driver/bson"
  7. "go.mongodb.org/mongo-driver/mongo/options"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  11. "time"
  12. )
  13. var (
  14. Mgo *mongodb.MongodbSim
  15. MgoT *mongodb.MongodbSim //测试环境链接
  16. MgoR *mongodb.MongodbSim
  17. saveSize = 50
  18. Es *elastic.Elastic
  19. EsNew *elastic.Elastic
  20. EsT *elastic.Elastic
  21. // 更新mongo
  22. updatePool = make(chan []map[string]interface{}, 5000)
  23. updateSp = make(chan bool, 5)
  24. //更新es
  25. updateEsPool = make(chan []map[string]interface{}, 5000)
  26. updateEsSp = make(chan bool, 2) //保存协程
  27. )
  28. func main() {
  29. //mongodb
  30. Mgo = &mongodb.MongodbSim{
  31. //MongodbAddr: "172.17.189.140:27080",
  32. MongodbAddr: "127.0.0.1:27083",
  33. DbName: "qfw",
  34. Size: 10,
  35. UserName: "SJZY_RWbid_ES",
  36. Password: "SJZY@B4i4D5e6S",
  37. Direct: true,
  38. }
  39. Mgo.InitPool()
  40. //85
  41. //MgoR = &mongodb.MongodbSim{
  42. // //MongodbAddr: "127.0.0.1:27080",
  43. // MongodbAddr: "172.17.4.85:27080",
  44. // DbName: "qfw",
  45. // Size: 10,
  46. // //Direct: true,
  47. //}
  48. //MgoR.InitPool()
  49. ////测试环境MongoDB
  50. //MgoT = &mongodb.MongodbSim{
  51. // //MongodbAddr: "172.17.189.140:27080",
  52. // MongodbAddr: "192.168.3.206:27002",
  53. // DbName: "qfw_data",
  54. // Size: 10,
  55. // UserName: "root",
  56. // Password: "root",
  57. // //Direct: true,
  58. //}
  59. //MgoT.InitPool()
  60. // 测试环境es
  61. //Es = &elastic.Elastic{
  62. // S_esurl: "http://192.168.3.149:9201",
  63. // //S_esurl: "http://172.17.4.184:19805",
  64. // I_size: 5,
  65. // Username: "",
  66. // Password: "",
  67. //}
  68. //Es.InitElasticSize()
  69. //es
  70. Es = &elastic.Elastic{
  71. S_esurl: "http://127.0.0.1:19908",
  72. //S_esurl: "http://172.17.4.184:19908",
  73. I_size: 5,
  74. Username: "jybid",
  75. Password: "Top2023_JEB01i@31",
  76. }
  77. Es.InitElasticSize()
  78. // es 新集群
  79. EsNew = &elastic.Elastic{
  80. S_esurl: "http://127.0.0.1:19905",
  81. //S_esurl: "http://172.17.4.184:19905",
  82. I_size: 5,
  83. Username: "jybid",
  84. Password: "Top2023_JEB01i@31",
  85. }
  86. EsNew.InitElasticSize()
  87. go updateMethod() //更新mongodb
  88. go updateEsMethod() //更新es
  89. //taskRunProject()
  90. taskRunBidding()
  91. //dealDataTest()// 测试环境数据处理
  92. fmt.Println(111)
  93. c := make(chan bool, 1)
  94. <-c
  95. }
  96. // taskRun 更新es 省市区三个字段
  97. func taskRunBidding() {
  98. defer util.Catch()
  99. sess := Mgo.GetMgoConn()
  100. defer Mgo.DestoryMongoConn(sess)
  101. //pool := make(chan bool, 2) //处理协程
  102. //wg := &sync.WaitGroup{}
  103. //查询条件
  104. //q := map[string]interface{}{
  105. // "_id": map[string]interface{}{
  106. // //"$gt": mongodb.StringTOBsonId("5a862f0640d2d9bbe88e3cea"),
  107. // //"$lte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  108. //
  109. // //"$gte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  110. // "$lte": mongodb.StringTOBsonId("661e347d66cf0db42aa1a52f"),
  111. // },
  112. // //"comeintime": map[string]interface{}{
  113. // // "$gt": 1669824000,
  114. // // //"$lte": 1669864950,
  115. // // "$lte": 1702265941,
  116. // //},
  117. // //"site": "国家能源e购",
  118. //}
  119. selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  120. it := sess.DB("qfw").C("wcc_subtype_err_0429").Find(nil).Select(selected).Sort("_id").Iter()
  121. fmt.Println("taskRun 开始")
  122. count := 0
  123. //realNum := 0
  124. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  125. if count%10000 == 0 {
  126. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  127. }
  128. //1.更新MongoDB
  129. update := map[string]interface{}{}
  130. //if area, ok := tmp["area"]; ok && area != nil {
  131. // update["area"] = area
  132. //} else {
  133. // update["area"] = ""
  134. //}
  135. //
  136. //if city, ok := tmp["city"]; ok && city != nil {
  137. // update["city"] = city
  138. //} else {
  139. // update["city"] = ""
  140. //}
  141. //
  142. //if district, ok := tmp["district"]; ok && district != nil {
  143. // update["district"] = district
  144. //} else {
  145. // update["district"] = ""
  146. //}
  147. //========//
  148. //toptype := ""
  149. if toptype, ok := tmp["toptype"]; ok && toptype != nil {
  150. update["toptype"] = toptype
  151. } else {
  152. update["toptype"] = ""
  153. }
  154. //
  155. if subtype, ok := tmp["subtype"]; ok && subtype != nil {
  156. if util.ObjToString(tmp["toptype"]) == "结果" && util.ObjToString(tmp["subtype"]) == "招标" {
  157. update["subtype"] = ""
  158. }
  159. } else {
  160. update["subtype"] = ""
  161. }
  162. if len(update) > 0 {
  163. //更新MongoDB
  164. //updatePool <- []map[string]interface{}{
  165. // {"_id": tmp["id"]},
  166. // //{"_id": tmp["_id"]},
  167. // {"$set": update},
  168. //}
  169. //====//
  170. biddingID := util.ObjToString(tmp["id"])
  171. Mgo.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  172. //2.es 项目 更新字段
  173. err := Es.UpdateDocument("bidding", biddingID, update)
  174. err = EsNew.UpdateDocument("bidding", biddingID, update)
  175. if err != nil {
  176. log.Info("bidding es update err", err, biddingID)
  177. }
  178. }
  179. //2.es 更新字段
  180. //esUpdate := make(map[string]interface{})
  181. //if subtitle_projectname, ok := tmp["subtitle_projectname"]; ok && subtitle_projectname != nil {
  182. // esUpdate["subtitle_projectname"] = subtitle_projectname
  183. //}
  184. //if len(update) > 0 {
  185. // // 更新es
  186. // updateEsPool <- []map[string]interface{}{
  187. // {"_id": tmp["id"]},
  188. // //{"_id": mongodb.BsonIdToSId(tmp["_id"])},
  189. // update,
  190. // }
  191. //}
  192. //if len(update) > 0 {
  193. // id := mongodb.BsonIdToSId(tmp["_id"])
  194. // //id := mongodb.BsonIdToSId(tmp["_id"])
  195. // err := Es.UpdateDocument("projectset", id, esUpdate)
  196. // if err != nil {
  197. // if strings.Contains(err.Error(), "Document not updated:") {
  198. // continue
  199. // } else {
  200. // fmt.Println(err, mongodb.BsonIdToSId(tmp["_id"]))
  201. // }
  202. // }
  203. //}
  204. //if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
  205. // tmp = make(map[string]interface{})
  206. // continue
  207. //}
  208. //// 针对存量数据,重复数据不进索引
  209. //if util.IntAll(tmp["extracttype"]) == -1 {
  210. // continue
  211. //}
  212. //
  213. ////针对产权数据,暂时不入es 索引库
  214. //if util.IntAll(tmp["infoformat"]) == 3 {
  215. // continue
  216. //}
  217. //只有 紧急直接零星采购公告 栏目的数据,需要改成 结果-成交
  218. //channel := util.ObjToString(tmp["channel"])
  219. //if channel != "紧急直接零星采购公告" {
  220. // continue
  221. //}
  222. //realNum++
  223. //pool <- true
  224. //wg.Add(1)
  225. //go func(tmp map[string]interface{}) {
  226. // defer func() {
  227. // <-pool
  228. // wg.Done()
  229. // }()
  230. ////2.es 更新字段
  231. //esUpdate := make(map[string]interface{})
  232. //if autoid, ok := tmp["autoid"]; ok && autoid != nil {
  233. // esUpdate["autoid"] = autoid
  234. //}
  235. //
  236. //if len(esUpdate) > 0 {
  237. // err := Es.UpdateDocument("bidding", mongodb.BsonIdToSId(tmp["_id"]), esUpdate)
  238. // if err != nil {
  239. // if strings.Contains(err.Error(), "Document not updated:") {
  240. // return
  241. // } else {
  242. // fmt.Println(err, mongodb.BsonIdToSId(tmp["_id"]))
  243. // }
  244. // }
  245. //}
  246. //if err != nil {
  247. // fmt.Println(err, mongodb.BsonIdToSId(tmp["_id"]))
  248. //}
  249. //if tag_set, ok := tmp["tag_set"]; ok && tag_set != nil {
  250. // esUpdate["tag_set"] = tag_set
  251. //}
  252. //
  253. //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
  254. // esUpdate["tag_topinformation"] = tag_topinformation
  255. //}
  256. //
  257. //if tag_subinformation, ok := tmp["tag_subinformation"]; ok && tag_subinformation != nil {
  258. // esUpdate["tag_subinformation"] = tag_subinformation
  259. //}
  260. //if len(esUpdate) > 0 {
  261. // // 更新es
  262. // updateEsPool <- []map[string]interface{}{
  263. // {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  264. // esUpdate,
  265. // }
  266. //}
  267. //}(tmp)
  268. //tmp = make(map[string]interface{})
  269. }
  270. //wg.Wait()
  271. log.Info("Run Over...Count:", log.Int("count", count))
  272. }
  273. // taskRunProject 更新项目表 省市区
  274. func taskRunProject() {
  275. defer util.Catch()
  276. sess := Mgo.GetMgoConn()
  277. defer Mgo.DestoryMongoConn(sess)
  278. // 项目数据
  279. MgoP := &mongodb.MongodbSim{
  280. MongodbAddr: "172.17.4.85:27080",
  281. //MongodbAddr: "127.0.0.1:27080",
  282. Size: 10,
  283. DbName: "qfw",
  284. //Direct: true,
  285. }
  286. MgoP.InitPool()
  287. selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  288. it := sess.DB("qfw").C("zktest_0423_info_new").Find(nil).Select(selected).Sort("_id").Iter()
  289. fmt.Println("taskRun 开始")
  290. count := 0
  291. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  292. if count%10000 == 0 {
  293. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  294. }
  295. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  296. where := map[string]interface{}{
  297. "ids": biddingID,
  298. }
  299. // 找到对应项目数据
  300. p, _ := MgoP.FindOne("projectset_20230904", where)
  301. projectId := mongodb.BsonIdToSId((*p)["_id"])
  302. //1.更新MongoDB
  303. update := map[string]interface{}{}
  304. if area, ok := tmp["area"]; ok && area != nil {
  305. update["area"] = area
  306. } else {
  307. update["area"] = ""
  308. }
  309. if city, ok := tmp["city"]; ok && city != nil {
  310. update["city"] = city
  311. } else {
  312. update["city"] = ""
  313. }
  314. if district, ok := tmp["district"]; ok && district != nil {
  315. update["district"] = district
  316. } else {
  317. update["district"] = ""
  318. }
  319. if len(update) > 0 {
  320. MgoP.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update})
  321. //2.es 项目 更新字段
  322. err := Es.UpdateDocument("projectset", projectId, update)
  323. if err != nil {
  324. log.Info("es update err", err, projectId)
  325. }
  326. }
  327. //2.es 项目 更新字段
  328. //if len(update) > 0 {
  329. // // 更新es
  330. // //updateEsPool <- []map[string]interface{}{
  331. // // {"_id": projectId},
  332. // // update,
  333. // //}
  334. //}
  335. }
  336. log.Info("Run Over...Count:", log.Int("count", count))
  337. }
  338. // dealData 正式环境,同步合同期限
  339. func dealData() {
  340. defer util.Catch()
  341. sess := Mgo.GetMgoConn()
  342. defer Mgo.DestoryMongoConn(sess)
  343. //where := map[string]interface{}{
  344. // "_id": mongodb.StringTOBsonId("65c5a36a66cf0db42ab9c1ef"),
  345. //}
  346. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  347. it := sess.DB("qfw").C("zktest_quanliang_0210_fbs").Find(nil).Select(&selected).Iter()
  348. count := 0
  349. realNum := 0
  350. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  351. if count%1000 == 0 {
  352. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  353. }
  354. idStr := mongodb.BsonIdToSId(tmp["_id"])
  355. update := make(map[string]interface{})
  356. if tmp["signaturedate"] != nil {
  357. update["signaturedate"] = tmp["signaturedate"]
  358. }
  359. if tmp["contractperiod"] != nil {
  360. update["contractperiod"] = tmp["contractperiod"]
  361. }
  362. if tmp["expiredate"] != nil {
  363. update["expiredate"] = tmp["expiredate"]
  364. }
  365. if len(update) == 0 {
  366. continue
  367. }
  368. //bidding 表
  369. if idStr > "5a862e7040d2d9bbe88e3b1f" {
  370. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  371. data := *bidding
  372. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  373. // 针对存量数据,重复数据不进索引
  374. if util.IntAll(data["extracttype"]) == -1 {
  375. tmp = make(map[string]interface{})
  376. continue
  377. }
  378. } else {
  379. //bidding_back
  380. bidding, _ := Mgo.FindById("bidding_back", idStr, map[string]interface{}{"extracttype": 1})
  381. data := *bidding
  382. Mgo.UpdateById("bidding_back", idStr, map[string]interface{}{"$set": update})
  383. // 针对存量数据,重复数据不进索引
  384. if util.IntAll(data["extracttype"]) == -1 {
  385. tmp = make(map[string]interface{})
  386. continue
  387. }
  388. }
  389. realNum++
  390. //2.es 更新字段
  391. esUpdate := update
  392. esUpdate["id"] = idStr
  393. if len(esUpdate) > 0 {
  394. // 更新es
  395. updateEsPool <- []map[string]interface{}{
  396. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  397. esUpdate,
  398. }
  399. }
  400. tmp = make(map[string]interface{})
  401. }
  402. log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
  403. }
  404. // dealResult 查询抽取表,更新合同周期字段;是dealData的后面遗漏数据
  405. func dealResult() {
  406. defer util.Catch()
  407. sess := MgoR.GetMgoConn()
  408. defer MgoR.DestoryMongoConn(sess)
  409. where := map[string]interface{}{
  410. "_id": map[string]interface{}{
  411. "$gte": mongodb.StringTOBsonId("5a4909cf40d2d9bbe8ab329c"),
  412. "$lte": mongodb.StringTOBsonId("5a4ad94d40d2d9bbe8ae0183"),
  413. },
  414. "subtype": "合同",
  415. }
  416. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  417. it := sess.DB("qfw").C("result_20220219").Find(where).Select(&selected).Iter()
  418. count := 0
  419. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  420. if count%1000 == 0 {
  421. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  422. }
  423. idStr := mongodb.BsonIdToSId(tmp["_id"])
  424. update := make(map[string]interface{})
  425. if tmp["signaturedate"] != nil {
  426. update["signaturedate"] = tmp["signaturedate"]
  427. }
  428. if tmp["contractperiod"] != nil {
  429. update["contractperiod"] = tmp["contractperiod"]
  430. }
  431. if tmp["expiredate"] != nil {
  432. update["expiredate"] = tmp["expiredate"]
  433. }
  434. if len(update) == 0 {
  435. continue
  436. }
  437. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  438. data := *bidding
  439. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  440. // 针对存量数据,重复数据不进索引
  441. if util.IntAll(data["extracttype"]) == -1 {
  442. tmp = make(map[string]interface{})
  443. continue
  444. }
  445. //2.es 更新字段
  446. esUpdate := update
  447. esUpdate["id"] = idStr
  448. if len(esUpdate) > 0 {
  449. // 更新es
  450. updateEsPool <- []map[string]interface{}{
  451. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  452. esUpdate,
  453. }
  454. }
  455. tmp = make(map[string]interface{})
  456. }
  457. log.Info("Run Over...Count:", log.Int("count", count))
  458. }
  459. // dealDataTest 处理测试环境数据
  460. func dealDataTest() {
  461. defer util.Catch()
  462. sess := MgoT.GetMgoConn()
  463. defer MgoT.DestoryMongoConn(sess)
  464. where := map[string]interface{}{
  465. "_id": map[string]interface{}{
  466. "$gte": mongodb.StringTOBsonId("635051528aea8786d196e24a"),
  467. "$lte": mongodb.StringTOBsonId("639356ca8aea8786d1995c4b"),
  468. },
  469. }
  470. //where := map[string]interface{}{
  471. // "_id": mongodb.StringTOBsonId("639356ca8aea8786d1995c4b"),
  472. //}
  473. ctx := context.Background()
  474. coll := sess.M.C.Database("qfw_data").Collection("bidding")
  475. find := options.Find().SetBatchSize(1000).SetSort(bson.D{bson.E{"_id", -1}}).SetProjection(bson.M{"_id": 1, "title": 1, "subtype": 1})
  476. cur, err := coll.Find(ctx, where, find)
  477. if err != nil {
  478. fmt.Println(err)
  479. }
  480. ///////
  481. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  482. //it := sess.DB("qfw_data").C("bidding").Find(where).Select(nil).Iter()
  483. count := 0
  484. realNum := 0
  485. for tmp := make(map[string]interface{}); cur.Next(ctx); count++ {
  486. if cur != nil {
  487. cur.Decode(&tmp)
  488. }
  489. if count%1000 == 0 {
  490. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  491. }
  492. idStr := mongodb.BsonIdToSId(tmp["_id"])
  493. data, _ := Mgo.FindById("zktest_quanliang_0210_fbs", idStr, selected)
  494. if len(*data) == 0 {
  495. continue
  496. }
  497. update := make(map[string]interface{})
  498. if (*data)["signaturedate"] != nil {
  499. update["signaturedate"] = (*data)["signaturedate"]
  500. }
  501. if (*data)["contractperiod"] != nil {
  502. update["contractperiod"] = (*data)["contractperiod"]
  503. }
  504. if (*data)["expiredate"] != nil {
  505. update["expiredate"] = (*data)["expiredate"]
  506. }
  507. if len(update) == 0 {
  508. continue
  509. }
  510. fmt.Println(idStr)
  511. MgoT.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  512. //bidding 表
  513. //if idStr > "5a862e7040d2d9bbe88e3b1f" {
  514. // bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  515. // data := *bidding
  516. // Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  517. //
  518. // // 针对存量数据,重复数据不进索引
  519. // if util.IntAll(data["extracttype"]) == -1 {
  520. // tmp = make(map[string]interface{})
  521. // continue
  522. // }
  523. //
  524. //} else {
  525. // //bidding_back
  526. // bidding, _ := Mgo.FindById("bidding_back", idStr, map[string]interface{}{"extracttype": 1})
  527. // data := *bidding
  528. // Mgo.UpdateById("bidding_back", idStr, map[string]interface{}{"$set": update})
  529. // // 针对存量数据,重复数据不进索引
  530. // if util.IntAll(data["extracttype"]) == -1 {
  531. // tmp = make(map[string]interface{})
  532. // continue
  533. // }
  534. //}
  535. realNum++
  536. //2.es 更新字段
  537. esUpdate := update
  538. esUpdate["id"] = idStr
  539. if len(esUpdate) > 0 {
  540. fmt.Println(idStr)
  541. // 更新es
  542. updateEsPool <- []map[string]interface{}{
  543. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  544. esUpdate,
  545. }
  546. }
  547. //err := Es.UpdateDocument("bidding", idStr, update)
  548. //if err != nil {
  549. // log.Error("es update", err)
  550. //}
  551. //
  552. //err = EsNew.UpdateDocument("bidding", idStr, update)
  553. //if err != nil {
  554. // log.Error("esNew update", err)
  555. //}
  556. tmp = make(map[string]interface{})
  557. }
  558. log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
  559. }
  560. // updateMethod 更新MongoDB
  561. func updateMethod() {
  562. arru := make([][]map[string]interface{}, saveSize)
  563. indexu := 0
  564. for {
  565. select {
  566. case v := <-updatePool:
  567. arru[indexu] = v
  568. indexu++
  569. if indexu == saveSize {
  570. updateSp <- true
  571. go func(arru [][]map[string]interface{}) {
  572. defer func() {
  573. <-updateSp
  574. }()
  575. Mgo.UpdateBulk("bidding", arru...)
  576. }(arru)
  577. arru = make([][]map[string]interface{}, saveSize)
  578. indexu = 0
  579. }
  580. case <-time.After(1000 * time.Millisecond):
  581. if indexu > 0 {
  582. updateSp <- true
  583. go func(arru [][]map[string]interface{}) {
  584. defer func() {
  585. <-updateSp
  586. }()
  587. Mgo.UpdateBulk("bidding", arru...)
  588. }(arru[:indexu])
  589. arru = make([][]map[string]interface{}, saveSize)
  590. indexu = 0
  591. }
  592. }
  593. }
  594. }
  595. // updateEsMethod 更新es
  596. func updateEsMethod() {
  597. arru := make([][]map[string]interface{}, 200)
  598. indexu := 0
  599. for {
  600. select {
  601. case v := <-updateEsPool:
  602. arru[indexu] = v
  603. indexu++
  604. if indexu == 200 {
  605. updateEsSp <- true
  606. go func(arru [][]map[string]interface{}) {
  607. defer func() {
  608. <-updateEsSp
  609. }()
  610. Es.UpdateBulk("projectset", arru...)
  611. //EsNew.UpdateBulk("bidding", arru...)
  612. }(arru)
  613. arru = make([][]map[string]interface{}, 200)
  614. indexu = 0
  615. }
  616. case <-time.After(1000 * time.Millisecond):
  617. if indexu > 0 {
  618. updateEsSp <- true
  619. go func(arru [][]map[string]interface{}) {
  620. defer func() {
  621. <-updateEsSp
  622. }()
  623. Es.UpdateBulk("projectset", arru...)
  624. //EsNew.UpdateBulk("bidding", arru...)
  625. }(arru[:indexu])
  626. arru = make([][]map[string]interface{}, 200)
  627. indexu = 0
  628. }
  629. }
  630. }
  631. }