main.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/wcc4869/common_utils/log"
  6. "go.mongodb.org/mongo-driver/bson"
  7. "go.mongodb.org/mongo-driver/mongo/options"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  11. "time"
  12. )
  13. var (
  14. Mgo *mongodb.MongodbSim
  15. MgoT *mongodb.MongodbSim //测试环境链接
  16. MgoR *mongodb.MongodbSim
  17. saveSize = 50
  18. Es *elastic.Elastic
  19. EsNew *elastic.Elastic
  20. EsT *elastic.Elastic
  21. // 更新mongo
  22. updatePool = make(chan []map[string]interface{}, 5000)
  23. updateSp = make(chan bool, 5)
  24. //更新es
  25. updateEsPool = make(chan []map[string]interface{}, 5000)
  26. updateEsSp = make(chan bool, 5) //保存协程
  27. )
  28. func main() {
  29. //mongodb 163
  30. //Mgo = &mongodb.MongodbSim{
  31. // MongodbAddr: "172.17.189.140:27080",
  32. // //MongodbAddr: "127.0.0.1:27083",
  33. // DbName: "qfw",
  34. // Size: 10,
  35. // UserName: "SJZY_RWbid_ES",
  36. // Password: "SJZY@B4i4D5e6S",
  37. // //Direct: true,
  38. //}
  39. //Mgo.InitPool()
  40. //85
  41. //MgoR = &mongodb.MongodbSim{
  42. // //MongodbAddr: "127.0.0.1:27080",
  43. // MongodbAddr: "172.17.4.85:27080",
  44. // DbName: "qfw",
  45. // Size: 10,
  46. // //Direct: true,
  47. //}
  48. //MgoR.InitPool()
  49. ////测试环境MongoDB
  50. //MgoT = &mongodb.MongodbSim{
  51. // //MongodbAddr: "172.17.189.140:27080",
  52. // MongodbAddr: "192.168.3.206:27002",
  53. // DbName: "qfw_data",
  54. // Size: 10,
  55. // UserName: "root",
  56. // Password: "root",
  57. // //Direct: true,
  58. //}
  59. //MgoT.InitPool()
  60. // 测试环境es
  61. //Es = &elastic.Elastic{
  62. // S_esurl: "http://192.168.3.149:9201",
  63. // //S_esurl: "http://172.17.4.184:19805",
  64. // I_size: 5,
  65. // Username: "",
  66. // Password: "",
  67. //}
  68. //Es.InitElasticSize()
  69. //es
  70. Es = &elastic.Elastic{
  71. //S_esurl: "http://127.0.0.1:19908",
  72. S_esurl: "http://172.17.4.184:19908",
  73. I_size: 5,
  74. Username: "jybid",
  75. Password: "Top2023_JEB01i@31",
  76. }
  77. Es.InitElasticSize()
  78. // es 新集群
  79. //EsNew = &elastic.Elastic{
  80. // //S_esurl: "http://127.0.0.1:19905",
  81. // S_esurl: "http://172.17.4.184:19905",
  82. // I_size: 5,
  83. // Username: "jybid",
  84. // Password: "Top2023_JEB01i@31",
  85. //}
  86. //EsNew.InitElasticSize()
  87. //go updateMethod() //更新mongodb
  88. //go updateEsMethod() //更新es
  89. go updateProjectEsMethod()
  90. //taskRunProject()
  91. //taskRunBidding()
  92. //dealDataTest()// 测试环境数据处理
  93. updateProject()
  94. fmt.Println("over")
  95. c := make(chan bool, 1)
  96. <-c
  97. }
  98. // taskRun 更新es 省市区三个字段
  99. func taskRunBidding() {
  100. defer util.Catch()
  101. sess := Mgo.GetMgoConn()
  102. defer Mgo.DestoryMongoConn(sess)
  103. //pool := make(chan bool, 2) //处理协程
  104. //wg := &sync.WaitGroup{}
  105. //查询条件
  106. //q := map[string]interface{}{
  107. // //"_id": map[string]interface{}{
  108. // // //"$gt": mongodb.StringTOBsonId("5a862f0640d2d9bbe88e3cea"),
  109. // // //"$lte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  110. // //
  111. // // //"$gte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  112. // // "$lte": mongodb.StringTOBsonId("661e347d66cf0db42aa1a52f"),
  113. // //},
  114. // //"comeintime": map[string]interface{}{
  115. // // "$gt": 1669824000,
  116. // // //"$lte": 1669864950,
  117. // // "$lte": 1702265941,
  118. // //},
  119. // //"site": "国家能源e购",
  120. // "toptype": map[string]interface{}{"$exists": 0},
  121. //}
  122. selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  123. it := sess.DB("qfw").C("zktest_0520_id").Find(nil).Select(selected).Sort("_id").Iter()
  124. fmt.Println("taskRun 开始")
  125. count := 0
  126. //realNum := 0
  127. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  128. if count%1000 == 0 {
  129. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  130. }
  131. //1.更新MongoDB
  132. update := map[string]interface{}{}
  133. //if area, ok := tmp["area"]; ok && area != nil {
  134. // update["area"] = area
  135. //} else {
  136. // update["area"] = ""
  137. //}
  138. //
  139. //if city, ok := tmp["city"]; ok && city != nil {
  140. // update["city"] = city
  141. //} else {
  142. // update["city"] = ""
  143. //}
  144. //
  145. //if district, ok := tmp["district"]; ok && district != nil {
  146. // update["district"] = district
  147. //} else {
  148. // update["district"] = ""
  149. //}
  150. //========//
  151. //toptype := ""
  152. //if toptype, ok := tmp["toptype"]; ok && toptype != nil {
  153. // update["toptype"] = toptype
  154. //} else {
  155. // update["toptype"] = ""
  156. //}
  157. ////
  158. //if subtype, ok := tmp["subtype"]; ok && subtype != nil {
  159. // if util.ObjToString(tmp["toptype"]) == "结果" && util.ObjToString(tmp["subtype"]) == "招标" {
  160. // update["subtype"] = ""
  161. // }
  162. //} else {
  163. // update["subtype"] = ""
  164. //}
  165. //update["toptype"] = "其它"
  166. //update["subtype"] = "其它"
  167. //if len(update) > 0 {
  168. // //更新MongoDB
  169. // updatePool <- []map[string]interface{}{
  170. // //{"_id": tmp["id"]},
  171. // {"_id": tmp["_id"]},
  172. // {"$set": update},
  173. // }
  174. // //====//
  175. // //biddingID := util.ObjToString(tmp["id"])
  176. // //biddingID := mongodb.BsonIdToSId(tmp["_id"])
  177. // //Mgo.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  178. // ////2.es 项目 更新字段
  179. // //Es.UpdateDocument("bidding", biddingID, update)
  180. // //EsNew.UpdateDocument("bidding", biddingID, update)
  181. // //if err != nil {
  182. // // log.Info("bidding es update err", err, biddingID)
  183. // //}
  184. //}
  185. //2.es 更新字段
  186. //esUpdate := make(map[string]interface{})
  187. //if subtitle_projectname, ok := tmp["subtitle_projectname"]; ok && subtitle_projectname != nil {
  188. // esUpdate["subtitle_projectname"] = subtitle_projectname
  189. //}
  190. biddingID := util.ObjToString(tmp["id"])
  191. bidamount := util.Float64All(tmp["bidamount"])
  192. _, ok := tmp["bidamount"]
  193. if ok && bidamount > 0 {
  194. if biddingID != "" {
  195. update["bidamount"] = bidamount
  196. }
  197. }
  198. if len(update) > 0 {
  199. // 更新es
  200. updateEsPool <- []map[string]interface{}{
  201. {"_id": biddingID},
  202. //{"_id": mongodb.BsonIdToSId(tmp["_id"])},
  203. update,
  204. }
  205. }
  206. //if len(update) > 0 {
  207. // id := mongodb.BsonIdToSId(tmp["_id"])
  208. // //id := mongodb.BsonIdToSId(tmp["_id"])
  209. // err := Es.UpdateDocument("projectset", id, esUpdate)
  210. // if err != nil {
  211. // if strings.Contains(err.Error(), "Document not updated:") {
  212. // continue
  213. // } else {
  214. // fmt.Println(err, mongodb.BsonIdToSId(tmp["_id"]))
  215. // }
  216. // }
  217. //}
  218. //if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
  219. // tmp = make(map[string]interface{})
  220. // continue
  221. //}
  222. //// 针对存量数据,重复数据不进索引
  223. //if util.IntAll(tmp["extracttype"]) == -1 {
  224. // continue
  225. //}
  226. //
  227. ////针对产权数据,暂时不入es 索引库
  228. //if util.IntAll(tmp["infoformat"]) == 3 {
  229. // continue
  230. //}
  231. //只有 紧急直接零星采购公告 栏目的数据,需要改成 结果-成交
  232. //channel := util.ObjToString(tmp["channel"])
  233. //if channel != "紧急直接零星采购公告" {
  234. // continue
  235. //}
  236. //realNum++
  237. //pool <- true
  238. //wg.Add(1)
  239. //go func(tmp map[string]interface{}) {
  240. // defer func() {
  241. // <-pool
  242. // wg.Done()
  243. // }()
  244. ////2.es 更新字段
  245. //esUpdate := make(map[string]interface{})
  246. //if autoid, ok := tmp["autoid"]; ok && autoid != nil {
  247. // esUpdate["autoid"] = autoid
  248. //}
  249. //
  250. //if len(esUpdate) > 0 {
  251. // err := Es.UpdateDocument("bidding", mongodb.BsonIdToSId(tmp["_id"]), esUpdate)
  252. // if err != nil {
  253. // if strings.Contains(err.Error(), "Document not updated:") {
  254. // return
  255. // } else {
  256. // fmt.Println(err, mongodb.BsonIdToSId(tmp["_id"]))
  257. // }
  258. // }
  259. //}
  260. //if err != nil {
  261. // fmt.Println(err, mongodb.BsonIdToSId(tmp["_id"]))
  262. //}
  263. //if tag_set, ok := tmp["tag_set"]; ok && tag_set != nil {
  264. // esUpdate["tag_set"] = tag_set
  265. //}
  266. //
  267. //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
  268. // esUpdate["tag_topinformation"] = tag_topinformation
  269. //}
  270. //
  271. //if tag_subinformation, ok := tmp["tag_subinformation"]; ok && tag_subinformation != nil {
  272. // esUpdate["tag_subinformation"] = tag_subinformation
  273. //}
  274. //if len(esUpdate) > 0 {
  275. // // 更新es
  276. // updateEsPool <- []map[string]interface{}{
  277. // {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  278. // esUpdate,
  279. // }
  280. //}
  281. //}(tmp)
  282. //tmp = make(map[string]interface{})
  283. }
  284. //wg.Wait()
  285. log.Info("Run Over...Count:", log.Int("count", count))
  286. }
  287. // taskRunProject 更新项目表 省市区
  288. func taskRunProject() {
  289. defer util.Catch()
  290. sess := Mgo.GetMgoConn()
  291. defer Mgo.DestoryMongoConn(sess)
  292. // 项目数据
  293. MgoP := &mongodb.MongodbSim{
  294. MongodbAddr: "172.17.4.85:27080",
  295. //MongodbAddr: "127.0.0.1:27080",
  296. Size: 10,
  297. DbName: "qfw",
  298. //Direct: true,
  299. }
  300. MgoP.InitPool()
  301. selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  302. it := sess.DB("qfw").C("zktest_0423_info_new").Find(nil).Select(selected).Sort("_id").Iter()
  303. fmt.Println("taskRun 开始")
  304. count := 0
  305. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  306. if count%10000 == 0 {
  307. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  308. }
  309. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  310. where := map[string]interface{}{
  311. "ids": biddingID,
  312. }
  313. // 找到对应项目数据
  314. p, _ := MgoP.FindOne("projectset_20230904", where)
  315. projectId := mongodb.BsonIdToSId((*p)["_id"])
  316. //1.更新MongoDB
  317. update := map[string]interface{}{}
  318. if area, ok := tmp["area"]; ok && area != nil {
  319. update["area"] = area
  320. } else {
  321. update["area"] = ""
  322. }
  323. if city, ok := tmp["city"]; ok && city != nil {
  324. update["city"] = city
  325. } else {
  326. update["city"] = ""
  327. }
  328. if district, ok := tmp["district"]; ok && district != nil {
  329. update["district"] = district
  330. } else {
  331. update["district"] = ""
  332. }
  333. if len(update) > 0 {
  334. MgoP.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update})
  335. //2.es 项目 更新字段
  336. err := Es.UpdateDocument("projectset", projectId, update)
  337. if err != nil {
  338. log.Info("es update err", err, projectId)
  339. }
  340. }
  341. //2.es 项目 更新字段
  342. //if len(update) > 0 {
  343. // // 更新es
  344. // //updateEsPool <- []map[string]interface{}{
  345. // // {"_id": projectId},
  346. // // update,
  347. // //}
  348. //}
  349. }
  350. log.Info("Run Over...Count:", log.Int("count", count))
  351. }
  352. // dealData 正式环境,同步合同期限
  353. func dealData() {
  354. defer util.Catch()
  355. sess := Mgo.GetMgoConn()
  356. defer Mgo.DestoryMongoConn(sess)
  357. //where := map[string]interface{}{
  358. // "_id": mongodb.StringTOBsonId("65c5a36a66cf0db42ab9c1ef"),
  359. //}
  360. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  361. it := sess.DB("qfw").C("zktest_quanliang_0210_fbs").Find(nil).Select(&selected).Iter()
  362. count := 0
  363. realNum := 0
  364. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  365. if count%1000 == 0 {
  366. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  367. }
  368. idStr := mongodb.BsonIdToSId(tmp["_id"])
  369. update := make(map[string]interface{})
  370. if tmp["signaturedate"] != nil {
  371. update["signaturedate"] = tmp["signaturedate"]
  372. }
  373. if tmp["contractperiod"] != nil {
  374. update["contractperiod"] = tmp["contractperiod"]
  375. }
  376. if tmp["expiredate"] != nil {
  377. update["expiredate"] = tmp["expiredate"]
  378. }
  379. if len(update) == 0 {
  380. continue
  381. }
  382. //bidding 表
  383. if idStr > "5a862e7040d2d9bbe88e3b1f" {
  384. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  385. data := *bidding
  386. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  387. // 针对存量数据,重复数据不进索引
  388. if util.IntAll(data["extracttype"]) == -1 {
  389. tmp = make(map[string]interface{})
  390. continue
  391. }
  392. } else {
  393. //bidding_back
  394. bidding, _ := Mgo.FindById("bidding_back", idStr, map[string]interface{}{"extracttype": 1})
  395. data := *bidding
  396. Mgo.UpdateById("bidding_back", idStr, map[string]interface{}{"$set": update})
  397. // 针对存量数据,重复数据不进索引
  398. if util.IntAll(data["extracttype"]) == -1 {
  399. tmp = make(map[string]interface{})
  400. continue
  401. }
  402. }
  403. realNum++
  404. //2.es 更新字段
  405. esUpdate := update
  406. esUpdate["id"] = idStr
  407. if len(esUpdate) > 0 {
  408. // 更新es
  409. updateEsPool <- []map[string]interface{}{
  410. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  411. esUpdate,
  412. }
  413. }
  414. tmp = make(map[string]interface{})
  415. }
  416. log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
  417. }
  418. // dealResult 查询抽取表,更新合同周期字段;是dealData的后面遗漏数据
  419. func dealResult() {
  420. defer util.Catch()
  421. sess := MgoR.GetMgoConn()
  422. defer MgoR.DestoryMongoConn(sess)
  423. where := map[string]interface{}{
  424. "_id": map[string]interface{}{
  425. "$gte": mongodb.StringTOBsonId("5a4909cf40d2d9bbe8ab329c"),
  426. "$lte": mongodb.StringTOBsonId("5a4ad94d40d2d9bbe8ae0183"),
  427. },
  428. "subtype": "合同",
  429. }
  430. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  431. it := sess.DB("qfw").C("result_20220219").Find(where).Select(&selected).Iter()
  432. count := 0
  433. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  434. if count%1000 == 0 {
  435. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  436. }
  437. idStr := mongodb.BsonIdToSId(tmp["_id"])
  438. update := make(map[string]interface{})
  439. if tmp["signaturedate"] != nil {
  440. update["signaturedate"] = tmp["signaturedate"]
  441. }
  442. if tmp["contractperiod"] != nil {
  443. update["contractperiod"] = tmp["contractperiod"]
  444. }
  445. if tmp["expiredate"] != nil {
  446. update["expiredate"] = tmp["expiredate"]
  447. }
  448. if len(update) == 0 {
  449. continue
  450. }
  451. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  452. data := *bidding
  453. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  454. // 针对存量数据,重复数据不进索引
  455. if util.IntAll(data["extracttype"]) == -1 {
  456. tmp = make(map[string]interface{})
  457. continue
  458. }
  459. //2.es 更新字段
  460. esUpdate := update
  461. esUpdate["id"] = idStr
  462. if len(esUpdate) > 0 {
  463. // 更新es
  464. updateEsPool <- []map[string]interface{}{
  465. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  466. esUpdate,
  467. }
  468. }
  469. tmp = make(map[string]interface{})
  470. }
  471. log.Info("Run Over...Count:", log.Int("count", count))
  472. }
  473. // dealDataTest 处理测试环境数据
  474. func dealDataTest() {
  475. defer util.Catch()
  476. sess := MgoT.GetMgoConn()
  477. defer MgoT.DestoryMongoConn(sess)
  478. where := map[string]interface{}{
  479. "_id": map[string]interface{}{
  480. "$gte": mongodb.StringTOBsonId("635051528aea8786d196e24a"),
  481. "$lte": mongodb.StringTOBsonId("639356ca8aea8786d1995c4b"),
  482. },
  483. }
  484. //where := map[string]interface{}{
  485. // "_id": mongodb.StringTOBsonId("639356ca8aea8786d1995c4b"),
  486. //}
  487. ctx := context.Background()
  488. coll := sess.M.C.Database("qfw_data").Collection("bidding")
  489. find := options.Find().SetBatchSize(1000).SetSort(bson.D{bson.E{"_id", -1}}).SetProjection(bson.M{"_id": 1, "title": 1, "subtype": 1})
  490. cur, err := coll.Find(ctx, where, find)
  491. if err != nil {
  492. fmt.Println(err)
  493. }
  494. ///////
  495. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  496. //it := sess.DB("qfw_data").C("bidding").Find(where).Select(nil).Iter()
  497. count := 0
  498. realNum := 0
  499. for tmp := make(map[string]interface{}); cur.Next(ctx); count++ {
  500. if cur != nil {
  501. cur.Decode(&tmp)
  502. }
  503. if count%1000 == 0 {
  504. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  505. }
  506. idStr := mongodb.BsonIdToSId(tmp["_id"])
  507. data, _ := Mgo.FindById("zktest_quanliang_0210_fbs", idStr, selected)
  508. if len(*data) == 0 {
  509. continue
  510. }
  511. update := make(map[string]interface{})
  512. if (*data)["signaturedate"] != nil {
  513. update["signaturedate"] = (*data)["signaturedate"]
  514. }
  515. if (*data)["contractperiod"] != nil {
  516. update["contractperiod"] = (*data)["contractperiod"]
  517. }
  518. if (*data)["expiredate"] != nil {
  519. update["expiredate"] = (*data)["expiredate"]
  520. }
  521. if len(update) == 0 {
  522. continue
  523. }
  524. fmt.Println(idStr)
  525. MgoT.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  526. //bidding 表
  527. //if idStr > "5a862e7040d2d9bbe88e3b1f" {
  528. // bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  529. // data := *bidding
  530. // Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  531. //
  532. // // 针对存量数据,重复数据不进索引
  533. // if util.IntAll(data["extracttype"]) == -1 {
  534. // tmp = make(map[string]interface{})
  535. // continue
  536. // }
  537. //
  538. //} else {
  539. // //bidding_back
  540. // bidding, _ := Mgo.FindById("bidding_back", idStr, map[string]interface{}{"extracttype": 1})
  541. // data := *bidding
  542. // Mgo.UpdateById("bidding_back", idStr, map[string]interface{}{"$set": update})
  543. // // 针对存量数据,重复数据不进索引
  544. // if util.IntAll(data["extracttype"]) == -1 {
  545. // tmp = make(map[string]interface{})
  546. // continue
  547. // }
  548. //}
  549. realNum++
  550. //2.es 更新字段
  551. esUpdate := update
  552. esUpdate["id"] = idStr
  553. if len(esUpdate) > 0 {
  554. fmt.Println(idStr)
  555. // 更新es
  556. updateEsPool <- []map[string]interface{}{
  557. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  558. esUpdate,
  559. }
  560. }
  561. //err := Es.UpdateDocument("bidding", idStr, update)
  562. //if err != nil {
  563. // log.Error("es update", err)
  564. //}
  565. //
  566. //err = EsNew.UpdateDocument("bidding", idStr, update)
  567. //if err != nil {
  568. // log.Error("esNew update", err)
  569. //}
  570. tmp = make(map[string]interface{})
  571. }
  572. log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
  573. }
  574. // updateMethod 更新MongoDB
  575. func updateMethod() {
  576. arru := make([][]map[string]interface{}, saveSize)
  577. indexu := 0
  578. for {
  579. select {
  580. case v := <-updatePool:
  581. arru[indexu] = v
  582. indexu++
  583. if indexu == saveSize {
  584. updateSp <- true
  585. go func(arru [][]map[string]interface{}) {
  586. defer func() {
  587. <-updateSp
  588. }()
  589. Mgo.UpdateBulk("bidding", arru...)
  590. }(arru)
  591. arru = make([][]map[string]interface{}, saveSize)
  592. indexu = 0
  593. }
  594. case <-time.After(1000 * time.Millisecond):
  595. if indexu > 0 {
  596. updateSp <- true
  597. go func(arru [][]map[string]interface{}) {
  598. defer func() {
  599. <-updateSp
  600. }()
  601. Mgo.UpdateBulk("bidding", arru...)
  602. }(arru[:indexu])
  603. arru = make([][]map[string]interface{}, saveSize)
  604. indexu = 0
  605. }
  606. }
  607. }
  608. }
  609. // updateEsMethod 更新es
  610. func updateEsMethod() {
  611. arru := make([][]map[string]interface{}, 200)
  612. indexu := 0
  613. for {
  614. select {
  615. case v := <-updateEsPool:
  616. arru[indexu] = v
  617. indexu++
  618. if indexu == 200 {
  619. updateEsSp <- true
  620. go func(arru [][]map[string]interface{}) {
  621. defer func() {
  622. <-updateEsSp
  623. }()
  624. Es.UpdateBulk("bidding", arru...)
  625. EsNew.UpdateBulk("bidding", arru...)
  626. }(arru)
  627. arru = make([][]map[string]interface{}, 200)
  628. indexu = 0
  629. }
  630. case <-time.After(1000 * time.Millisecond):
  631. if indexu > 0 {
  632. updateEsSp <- true
  633. go func(arru [][]map[string]interface{}) {
  634. defer func() {
  635. <-updateEsSp
  636. }()
  637. Es.UpdateBulk("bidding", arru...)
  638. EsNew.UpdateBulk("bidding", arru...)
  639. }(arru[:indexu])
  640. arru = make([][]map[string]interface{}, 200)
  641. indexu = 0
  642. }
  643. }
  644. }
  645. }
  646. // updateProjectEsMethod 更新项目索引
  647. func updateProjectEsMethod() {
  648. arru := make([][]map[string]interface{}, 200)
  649. indexu := 0
  650. for {
  651. select {
  652. case v := <-updateEsPool:
  653. arru[indexu] = v
  654. indexu++
  655. if indexu == 200 {
  656. updateEsSp <- true
  657. go func(arru [][]map[string]interface{}) {
  658. defer func() {
  659. <-updateEsSp
  660. }()
  661. Es.UpdateBulk("projectset", arru...)
  662. }(arru)
  663. arru = make([][]map[string]interface{}, 200)
  664. indexu = 0
  665. }
  666. case <-time.After(1000 * time.Millisecond):
  667. if indexu > 0 {
  668. updateEsSp <- true
  669. go func(arru [][]map[string]interface{}) {
  670. defer func() {
  671. <-updateEsSp
  672. }()
  673. Es.UpdateBulk("projectset", arru...)
  674. }(arru[:indexu])
  675. arru = make([][]map[string]interface{}, 200)
  676. indexu = 0
  677. }
  678. }
  679. }
  680. }