main.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. es7 "github.com/olivere/elastic/v7"
  7. "github.com/wcc4869/common_utils/log"
  8. "go.uber.org/zap"
  9. "io"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  13. "reflect"
  14. "strings"
  15. "time"
  16. )
  17. var (
  18. Mgo *mongodb.MongodbSim
  19. MgoB *mongodb.MongodbSim
  20. MgoT *mongodb.MongodbSim //测试环境链接
  21. MgoR *mongodb.MongodbSim
  22. saveSize = 50
  23. Es *elastic.Elastic
  24. EsNew *elastic.Elastic
  25. //EsT *elastic.Elastic
  26. // 更新mongo
  27. updatePool = make(chan []map[string]interface{}, 5000)
  28. updateSp = make(chan bool, 5)
  29. //更新es
  30. updateEsPool = make(chan []map[string]interface{}, 5000)
  31. updateEsSp = make(chan bool, 5) //保存协程
  32. BiddingField = make(map[string]string, 200) //bidding_processing_field, level=1 最外层字段,
  33. BiddingLevelField = make(map[string]map[string]string) //level=2 的第二层字段
  34. )
  35. func Init() {
  36. MgoB = &mongodb.MongodbSim{
  37. MongodbAddr: "172.17.189.140:27080",
  38. //MongodbAddr: "127.0.0.1:27083",
  39. DbName: "qfw",
  40. Size: 10,
  41. UserName: "SJZY_RWbid_ES",
  42. Password: "SJZY@B4i4D5e6S",
  43. //Direct: true,
  44. }
  45. MgoB.InitPool()
  46. //mongodb 163
  47. //Mgo = &mongodb.MongodbSim{
  48. // //MongodbAddr: "172.17.189.140:27080",
  49. // MongodbAddr: "127.0.0.1:27083",
  50. // DbName: "qfw",
  51. // Size: 10,
  52. // UserName: "SJZY_RWbid_ES",
  53. // Password: "SJZY@B4i4D5e6S",
  54. // Direct: true,
  55. //}
  56. //Mgo.InitPool()
  57. //85
  58. //MgoR = &mongodb.MongodbSim{
  59. // //MongodbAddr: "127.0.0.1:27080",
  60. // MongodbAddr: "172.17.4.85:27080",
  61. // DbName: "qfw",
  62. // Size: 10,
  63. // //Direct: true,
  64. //}
  65. //MgoR.InitPool()
  66. //测试环境MongoDB
  67. //MgoT = &mongodb.MongodbSim{
  68. // //MongodbAddr: "172.17.189.140:27080",
  69. // MongodbAddr: "192.168.3.206:27002",
  70. // DbName: "qfw_data",
  71. // Size: 10,
  72. // UserName: "root",
  73. // Password: "root",
  74. // //Direct: true,
  75. //}
  76. //MgoT.InitPool()
  77. ////测试环境es
  78. //Es = &elastic.Elastic{
  79. // S_esurl: "http://192.168.3.149:9201",
  80. // //S_esurl: "http://172.17.4.184:19805",
  81. // I_size: 5,
  82. // Username: "",
  83. // Password: "",
  84. //}
  85. //Es.InitElasticSize()
  86. //es
  87. Es = &elastic.Elastic{
  88. //S_esurl: "http://127.0.0.1:19908",
  89. S_esurl: "http://172.17.4.184:19908",
  90. I_size: 5,
  91. Username: "jybid",
  92. Password: "Top2023_JEB01i@31",
  93. }
  94. Es.InitElasticSize()
  95. //es 新集群
  96. EsNew = &elastic.Elastic{
  97. //S_esurl: "http://127.0.0.1:19905",
  98. S_esurl: "http://172.17.4.184:19905",
  99. I_size: 5,
  100. Username: "jybid",
  101. Password: "Top2023_JEB01i@31",
  102. }
  103. EsNew.InitElasticSize()
  104. }
  105. func main() {
  106. Init()
  107. //InitEsBiddingField()
  108. go updateMethod() //更新mongodb
  109. go updateEsMethod() //更新es
  110. //go updateProjectEsMethod()
  111. //taskRunProject()
  112. //taskRunBidding()
  113. dealBidding() //正式环境bidding数据处理
  114. //dealBiddingTest() // 测试环境数据处理
  115. //updateProject()
  116. fmt.Println("over")
  117. c := make(chan bool, 1)
  118. <-c
  119. }
  120. func InitEsBiddingField() {
  121. now := time.Now()
  122. info, _ := MgoB.Find("bidding_processing_field", `{"stype": "bidding"}`, nil, nil, false, -1, -1)
  123. if len(*info) > 0 {
  124. for _, m := range *info {
  125. if util.IntAll(m["level"]) == 1 {
  126. BiddingField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  127. } else if util.IntAll(m["level"]) == 2 {
  128. pfield := util.ObjToString(m["pfield"])
  129. pfieldMap := BiddingLevelField[pfield]
  130. if pfieldMap == nil {
  131. pfieldMap = make(map[string]string, 0)
  132. }
  133. pfieldMap[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  134. BiddingLevelField[pfield] = pfieldMap
  135. }
  136. }
  137. }
  138. log.Info("InitEsBiddingField", zap.Int("BiddingField es 一级字段数量", len(BiddingField)))
  139. log.Info("InitEsBiddingField", zap.Int("BiddingLevelField es 二级字段数量", len(BiddingLevelField)))
  140. log.Info("InitEsBiddingField", zap.Any("duration", time.Since(now).Seconds()))
  141. }
  142. // taskRun 更新es 省市区三个字段
  143. func taskRunBidding() {
  144. defer util.Catch()
  145. sess := Mgo.GetMgoConn()
  146. defer Mgo.DestoryMongoConn(sess)
  147. //查询条件
  148. //q := map[string]interface{}{
  149. // //"_id": map[string]interface{}{
  150. // // //"$gt": mongodb.StringTOBsonId("5a862f0640d2d9bbe88e3cea"),
  151. // // //"$lte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  152. // //
  153. // // //"$gte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  154. // // "$lte": mongodb.StringTOBsonId("661e347d66cf0db42aa1a52f"),
  155. // //},
  156. // //"comeintime": map[string]interface{}{
  157. // // "$gt": 1669824000,
  158. // // //"$lte": 1669864950,
  159. // // "$lte": 1702265941,
  160. // //},
  161. // //"site": "国家能源e购",
  162. // "toptype": map[string]interface{}{"$exists": 0},
  163. //}
  164. //selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  165. it := sess.DB("qfw").C("zktest_bidding_0706").Find(nil).Select(nil).Sort("_id").Iter()
  166. fmt.Println("taskRun 开始")
  167. count := 0
  168. //realNum := 0
  169. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  170. if count%1000 == 0 {
  171. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  172. }
  173. update := map[string]interface{}{}
  174. // 1.更新省市区
  175. //if area, ok := tmp["area"]; ok && area != nil {
  176. // update["area"] = area
  177. //} else {
  178. // update["area"] = ""
  179. //}
  180. //
  181. //if city, ok := tmp["city"]; ok && city != nil {
  182. // update["city"] = city
  183. //} else {
  184. // update["city"] = ""
  185. //}
  186. //
  187. //if district, ok := tmp["district"]; ok && district != nil {
  188. // update["district"] = district
  189. //} else {
  190. // update["district"] = ""
  191. //}
  192. // 2.更新中标单位
  193. biddingID := util.ObjToString(tmp["id"])
  194. //biddingID := mongodb.BsonIdToSId(tmp["_id"])
  195. update["winner"] = ""
  196. update["s_winner"] = ""
  197. if len(update) > 0 {
  198. //Mgo.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  199. //2.es 项目 更新字段
  200. err := Es.UpdateDocument("bidding", biddingID, update)
  201. err = EsNew.UpdateDocument("bidding", biddingID, update)
  202. if err != nil && err.Error() != "Document not updated: noop" {
  203. log.Info("bidding es update err", err, biddingID)
  204. }
  205. }
  206. }
  207. log.Info("Run Over...Count:", log.Int("count", count))
  208. }
  209. // taskRunProject 更新项目表 省市区
  210. func taskRunProject() {
  211. defer util.Catch()
  212. sess := Mgo.GetMgoConn()
  213. defer Mgo.DestoryMongoConn(sess)
  214. // 项目数据
  215. MgoP := &mongodb.MongodbSim{
  216. MongodbAddr: "172.17.4.85:27080",
  217. //MongodbAddr: "127.0.0.1:27080",
  218. Size: 10,
  219. DbName: "qfw",
  220. //Direct: true,
  221. }
  222. MgoP.InitPool()
  223. selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  224. it := sess.DB("qfw").C("zktest_0423_info_new").Find(nil).Select(selected).Sort("_id").Iter()
  225. fmt.Println("taskRun 开始")
  226. count := 0
  227. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  228. if count%10000 == 0 {
  229. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  230. }
  231. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  232. where := map[string]interface{}{
  233. "ids": biddingID,
  234. }
  235. // 找到对应项目数据
  236. p, _ := MgoP.FindOne("projectset_20230904", where)
  237. projectId := mongodb.BsonIdToSId((*p)["_id"])
  238. //1.更新MongoDB
  239. update := map[string]interface{}{}
  240. if area, ok := tmp["area"]; ok && area != nil {
  241. update["area"] = area
  242. } else {
  243. update["area"] = ""
  244. }
  245. if city, ok := tmp["city"]; ok && city != nil {
  246. update["city"] = city
  247. } else {
  248. update["city"] = ""
  249. }
  250. if district, ok := tmp["district"]; ok && district != nil {
  251. update["district"] = district
  252. } else {
  253. update["district"] = ""
  254. }
  255. if len(update) > 0 {
  256. MgoP.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update})
  257. //2.es 项目 更新字段
  258. err := Es.UpdateDocument("projectset", projectId, update)
  259. if err != nil {
  260. log.Info("es update err", err, projectId)
  261. }
  262. }
  263. //2.es 项目 更新字段
  264. //if len(update) > 0 {
  265. // // 更新es
  266. // //updateEsPool <- []map[string]interface{}{
  267. // // {"_id": projectId},
  268. // // update,
  269. // //}
  270. //}
  271. }
  272. log.Info("Run Over...Count:", log.Int("count", count))
  273. }
  274. // dealData 正式环境,同步合同期限
  275. func dealData() {
  276. defer util.Catch()
  277. sess := Mgo.GetMgoConn()
  278. defer Mgo.DestoryMongoConn(sess)
  279. //where := map[string]interface{}{
  280. // "_id": mongodb.StringTOBsonId("65c5a36a66cf0db42ab9c1ef"),
  281. //}
  282. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  283. it := sess.DB("qfw").C("zktest_quanliang_0210_fbs").Find(nil).Select(&selected).Iter()
  284. count := 0
  285. realNum := 0
  286. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  287. if count%1000 == 0 {
  288. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  289. }
  290. idStr := mongodb.BsonIdToSId(tmp["_id"])
  291. update := make(map[string]interface{})
  292. if tmp["signaturedate"] != nil {
  293. update["signaturedate"] = tmp["signaturedate"]
  294. }
  295. if tmp["contractperiod"] != nil {
  296. update["contractperiod"] = tmp["contractperiod"]
  297. }
  298. if tmp["expiredate"] != nil {
  299. update["expiredate"] = tmp["expiredate"]
  300. }
  301. if len(update) == 0 {
  302. continue
  303. }
  304. //bidding 表
  305. if idStr > "5a862e7040d2d9bbe88e3b1f" {
  306. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  307. data := *bidding
  308. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  309. // 针对存量数据,重复数据不进索引
  310. if util.IntAll(data["extracttype"]) == -1 {
  311. tmp = make(map[string]interface{})
  312. continue
  313. }
  314. } else {
  315. //bidding_back
  316. bidding, _ := Mgo.FindById("bidding_back", idStr, map[string]interface{}{"extracttype": 1})
  317. data := *bidding
  318. Mgo.UpdateById("bidding_back", idStr, map[string]interface{}{"$set": update})
  319. // 针对存量数据,重复数据不进索引
  320. if util.IntAll(data["extracttype"]) == -1 {
  321. tmp = make(map[string]interface{})
  322. continue
  323. }
  324. }
  325. realNum++
  326. //2.es 更新字段
  327. esUpdate := update
  328. esUpdate["id"] = idStr
  329. if len(esUpdate) > 0 {
  330. // 更新es
  331. updateEsPool <- []map[string]interface{}{
  332. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  333. esUpdate,
  334. }
  335. }
  336. tmp = make(map[string]interface{})
  337. }
  338. log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
  339. }
  340. // dealResult 查询抽取表,更新合同周期字段;是dealData的后面遗漏数据
  341. func dealResult() {
  342. defer util.Catch()
  343. sess := MgoR.GetMgoConn()
  344. defer MgoR.DestoryMongoConn(sess)
  345. where := map[string]interface{}{
  346. "_id": map[string]interface{}{
  347. "$gte": mongodb.StringTOBsonId("5a4909cf40d2d9bbe8ab329c"),
  348. "$lte": mongodb.StringTOBsonId("5a4ad94d40d2d9bbe8ae0183"),
  349. },
  350. "subtype": "合同",
  351. }
  352. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  353. it := sess.DB("qfw").C("result_20220219").Find(where).Select(&selected).Iter()
  354. count := 0
  355. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  356. if count%1000 == 0 {
  357. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  358. }
  359. idStr := mongodb.BsonIdToSId(tmp["_id"])
  360. update := make(map[string]interface{})
  361. if tmp["signaturedate"] != nil {
  362. update["signaturedate"] = tmp["signaturedate"]
  363. }
  364. if tmp["contractperiod"] != nil {
  365. update["contractperiod"] = tmp["contractperiod"]
  366. }
  367. if tmp["expiredate"] != nil {
  368. update["expiredate"] = tmp["expiredate"]
  369. }
  370. if len(update) == 0 {
  371. continue
  372. }
  373. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  374. data := *bidding
  375. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  376. // 针对存量数据,重复数据不进索引
  377. if util.IntAll(data["extracttype"]) == -1 {
  378. tmp = make(map[string]interface{})
  379. continue
  380. }
  381. //2.es 更新字段
  382. esUpdate := update
  383. esUpdate["id"] = idStr
  384. if len(esUpdate) > 0 {
  385. // 更新es
  386. updateEsPool <- []map[string]interface{}{
  387. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  388. esUpdate,
  389. }
  390. }
  391. tmp = make(map[string]interface{})
  392. }
  393. log.Info("Run Over...Count:", log.Int("count", count))
  394. }
  395. // dealBidding 处理bidding数据
  396. func dealBidding() {
  397. defer util.Catch()
  398. sess := MgoB.GetMgoConn()
  399. defer MgoB.DestoryMongoConn(sess)
  400. where := map[string]interface{}{
  401. "comeintime": map[string]interface{}{
  402. "$lt": 1722418770,
  403. //"$lt": 1718812802,
  404. "$gte": 1722009600,
  405. },
  406. }
  407. //where := map[string]interface{}{
  408. // "_id": map[string]interface{}{
  409. // "$gte": mongodb.StringTOBsonId("66aa067e66cf0db42a8ea71e"),
  410. // "$lt": mongodb.StringTOBsonId("66aa067e66cf0db42a8ea720"),
  411. // },
  412. //}
  413. it := sess.DB("qfw").C("bidding").Find(where).Select(nil).Iter()
  414. fmt.Println("taskRun 开始")
  415. count := 0
  416. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  417. if count%10000 == 0 {
  418. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  419. }
  420. update := map[string]interface{}{}
  421. esUpdate := map[string]interface{}{}
  422. // 2.更新中标单位,中标金额
  423. //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
  424. // update["tag_topinformation"] = tag_topinformation
  425. //}
  426. //
  427. //if property_form, ok := tmp["property_form"]; ok && property_form != nil {
  428. // update["property_form"] = property_form
  429. //}
  430. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  431. //fmt.Println(biddingID)
  432. /**
  433. "s_subscopeclass" : "其它",
  434. "s_topscopeclass" : "其它",
  435. "subscopeclass" : [
  436. "其它"
  437. ],
  438. "topscopeclass" : [
  439. "其它"
  440. ],
  441. */
  442. // 行业分类默认值
  443. resultSubs := make([]string, 0)
  444. resultTobs := make([]string, 0)
  445. if topscopeclass, ok := tmp["topscopeclass"]; ok && topscopeclass != nil {
  446. if topps, ok2 := topscopeclass.([]interface{}); ok2 {
  447. for _, v := range topps {
  448. top := util.ObjToString(v)
  449. if top != "" {
  450. resultTobs = append(resultTobs, top)
  451. }
  452. }
  453. }
  454. //1.一级分类是空数组或者 是 其它
  455. if len(resultTobs) == 0 || resultTobs[0] == "其它" {
  456. update["topscopeclass"] = []string{"其它"}
  457. update["subscopeclass"] = []string{"其它"}
  458. update["s_topscopeclass"] = "其它"
  459. update["s_subscopeclass"] = "其它"
  460. esUpdate["s_topscopeclass"] = "其它"
  461. esUpdate["s_subscopeclass"] = "其它"
  462. esUpdate["topscopeclass"] = []string{"其它"}
  463. } else {
  464. if subs, ok3 := tmp["subscopeclass"]; ok3 {
  465. if subbs, ok4 := subs.([]interface{}); ok4 {
  466. for _, v := range subbs {
  467. sub := util.ObjToString(v)
  468. if sub != "" && sub != "其它" {
  469. resultSubs = append(resultSubs, sub)
  470. }
  471. }
  472. }
  473. }
  474. newTops, newSubs, cleanedTops := ProcessTopscopeclass(resultTobs, resultSubs)
  475. update["topscopeclass"] = newTops
  476. update["subscopeclass"] = newSubs
  477. update["s_topscopeclass"] = strings.Join(cleanedTops, ",")
  478. update["s_subscopeclass"] = strings.Join(newSubs, ",")
  479. esUpdate["s_topscopeclass"] = strings.Join(cleanedTops, ",")
  480. esUpdate["s_subscopeclass"] = strings.Join(newSubs, ",")
  481. esUpdate["topscopeclass"] = newTops
  482. }
  483. } else {
  484. update["topscopeclass"] = []string{"其它"}
  485. update["subscopeclass"] = []string{"其它"}
  486. update["s_topscopeclass"] = "其它"
  487. update["s_subscopeclass"] = "其它"
  488. esUpdate["s_topscopeclass"] = "其它"
  489. esUpdate["s_subscopeclass"] = "其它"
  490. esUpdate["topscopeclass"] = []string{"其它"}
  491. }
  492. //procurementlist 处理预计采购时间
  493. //if procurementlist, ok := tmp["procurementlist"]; ok && procurementlist != nil {
  494. // field := "procurementlist"
  495. // if tmp[field] != nil {
  496. // if field == "procurementlist" {
  497. // if tmp["procurementlist"] != nil {
  498. // var arr []interface{}
  499. // plist := tmp["procurementlist"].([]interface{})
  500. // for _, p := range plist {
  501. // p1 := p.(map[string]interface{})
  502. // p2 := make(map[string]interface{})
  503. // for k, v := range BiddingLevelField[field] {
  504. // if k == "projectname" && util.ObjToString(p1[k]) == "" {
  505. // p2[k] = util.ObjToString(tmp["projectname"])
  506. // } else if k == "buyer" && util.ObjToString(p1[k]) == "" && util.ObjToString(tmp["buyer"]) != "" {
  507. // p2[k] = util.ObjToString(tmp["buyer"])
  508. // } else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" {
  509. // res := getMethod(util.ObjToString(p1[k]))
  510. // if res != 0 {
  511. // p2[k] = res
  512. // }
  513. // } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v {
  514. // p2[k] = p1[k]
  515. // }
  516. //
  517. // }
  518. // arr = append(arr, p2)
  519. // }
  520. // if len(arr) > 0 {
  521. // esUpdate[field] = arr
  522. // }
  523. // }
  524. // }
  525. // }
  526. //}
  527. if len(update) > 0 {
  528. //fmt.Println("aaaaa", biddingID)
  529. //更新mongo
  530. //MgoT.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  531. //更新MongoDB
  532. updatePool <- []map[string]interface{}{
  533. {"_id": tmp["_id"]},
  534. {"$set": update},
  535. }
  536. //2.es 项目 更新字段
  537. //err := Es.UpdateDocument("bidding", biddingID, update)
  538. //if err != nil && err.Error() != "Document not updated: noop" {
  539. // log.Info("bidding es update err", err, biddingID)
  540. //}
  541. //// 更新es
  542. //updateEsPool <- []map[string]interface{}{
  543. // {"_id": biddingID},
  544. // update,
  545. //}
  546. }
  547. // 更新Es 数据
  548. if len(esUpdate) > 0 {
  549. // 更新es
  550. updateEsPool <- []map[string]interface{}{
  551. {"_id": biddingID},
  552. esUpdate,
  553. }
  554. }
  555. }
  556. log.Info("Run Over...Count:", log.Int("count", count))
  557. }
  558. func dealBiddingByEs() {
  559. //url := "http://172.17.4.184:19908"
  560. url := "http://127.0.0.1:19908"
  561. username := "jybid"
  562. password := "Top2023_JEB01i@31"
  563. index := "bidding" //索引名称
  564. //index := "projectset" //索引名称
  565. // 创建 Elasticsearch 客户端
  566. client, err := es7.NewClient(
  567. es7.SetURL(url),
  568. es7.SetBasicAuth(username, password),
  569. es7.SetSniff(false),
  570. )
  571. if err != nil {
  572. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  573. }
  574. query := es7.NewBoolQuery()
  575. query.Must(es7.NewRangeQuery("comeintime").Gt(1718812800))
  576. query.MustNot(es7.NewExistsQuery("s_topscopeclass"))
  577. ctx := context.Background()
  578. //开始滚动搜索
  579. scrollID := ""
  580. scroll := "10m"
  581. searchSource := es7.NewSearchSource().
  582. Query(query).
  583. Size(10000).
  584. Sort("_doc", true) //升序排序
  585. //Sort("_doc", false) //降序排序
  586. searchService := client.Scroll(index).
  587. Size(10000).
  588. Scroll(scroll).
  589. SearchSource(searchSource)
  590. res, err := searchService.Do(ctx)
  591. if err != nil {
  592. if err == io.EOF {
  593. fmt.Println("没有数据")
  594. } else {
  595. panic(err)
  596. }
  597. }
  598. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  599. fmt.Println("总数是:", res.TotalHits())
  600. total := 0
  601. for len(res.Hits.Hits) > 0 {
  602. for _, hit := range res.Hits.Hits {
  603. var doc map[string]interface{}
  604. err := json.Unmarshal(hit.Source, &doc)
  605. if err != nil {
  606. fmt.Printf("解析文档失败:%s", err)
  607. continue
  608. }
  609. //delete(doc, "filetext")
  610. //delete(doc, "detail")
  611. //
  612. ////存入新表
  613. //err = MgoB.InsertOrUpdate("qfw", "wcc_subtype_err_0429", doc)
  614. //if err != nil {
  615. // fmt.Println("error", doc["id"])
  616. //}
  617. }
  618. total = total + len(res.Hits.Hits)
  619. scrollID = res.ScrollId
  620. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  621. fmt.Println("current count:", total)
  622. if err != nil {
  623. if err == io.EOF {
  624. // 滚动到最后一批数据,退出循环
  625. break
  626. }
  627. fmt.Println("滚动搜索失败:", err, res)
  628. break // 处理错误时退出循环
  629. }
  630. }
  631. // 在循环外调用 ClearScroll
  632. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  633. if err != nil {
  634. fmt.Printf("清理滚动搜索失败:%s", err)
  635. }
  636. fmt.Println("结束~~~~~~~~~~~~~~~")
  637. }
  638. // dealBiddingTest 处理测试环境数据
  639. func dealBiddingTest() {
  640. defer util.Catch()
  641. sess := MgoT.GetMgoConn()
  642. defer MgoT.DestoryMongoConn(sess)
  643. it := sess.DB("qfw_data").C("bidding").Find(nil).Select(nil).Iter()
  644. fmt.Println("taskRun 开始")
  645. count := 0
  646. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  647. if count%10000 == 0 {
  648. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  649. }
  650. update := map[string]interface{}{}
  651. // 2.更新中标单位,中标金额
  652. //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
  653. // update["tag_topinformation"] = tag_topinformation
  654. //}
  655. //
  656. //if property_form, ok := tmp["property_form"]; ok && property_form != nil {
  657. // update["property_form"] = property_form
  658. //}
  659. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  660. /**
  661. "s_subscopeclass" : "其它",
  662. "s_topscopeclass" : "其它",
  663. "subscopeclass" : [
  664. "其它"
  665. ],
  666. "topscopeclass" : [
  667. "其它"
  668. ],
  669. */
  670. // 行业分类默认值
  671. if topscopeclass, ok := tmp["topscopeclass"]; !ok && topscopeclass == nil {
  672. update["topscopeclass"] = []string{"其它"}
  673. update["s_topscopeclass"] = "其它"
  674. }
  675. if subscopeclass, ok := tmp["subscopeclass"]; !ok && subscopeclass == nil {
  676. update["subscopeclass"] = []string{"其它"}
  677. update["s_subscopeclass"] = "其它"
  678. }
  679. if util.ObjToString(tmp["s_topscopeclass"]) == "其它" {
  680. update["topscopeclass"] = []string{"其它"}
  681. update["s_topscopeclass"] = "其它"
  682. }
  683. if util.ObjToString(tmp["s_subscopeclass"]) == "其它" {
  684. update["subscopeclass"] = []string{"其它"}
  685. update["s_subscopeclass"] = "其它"
  686. }
  687. //procurementlist 处理预计采购时间
  688. if procurementlist, ok := tmp["procurementlist"]; ok && procurementlist != nil {
  689. for field, _ := range BiddingField {
  690. if tmp[field] != nil {
  691. if field == "procurementlist" {
  692. if tmp["procurementlist"] != nil {
  693. var arr []interface{}
  694. plist := tmp["procurementlist"].([]interface{})
  695. for _, p := range plist {
  696. p1 := p.(map[string]interface{})
  697. p2 := make(map[string]interface{})
  698. for k, v := range BiddingLevelField[field] {
  699. if k == "projectname" && util.ObjToString(p1[k]) == "" {
  700. p2[k] = util.ObjToString(tmp["projectname"])
  701. } else if k == "buyer" && util.ObjToString(p1[k]) == "" && util.ObjToString(tmp["buyer"]) != "" {
  702. p2[k] = util.ObjToString(tmp["buyer"])
  703. } else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" {
  704. res := getMethod(util.ObjToString(p1[k]))
  705. if res != 0 {
  706. p2[k] = res
  707. }
  708. } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v {
  709. p2[k] = p1[k]
  710. }
  711. }
  712. arr = append(arr, p2)
  713. }
  714. if len(arr) > 0 {
  715. update[field] = arr
  716. }
  717. }
  718. }
  719. }
  720. }
  721. }
  722. if len(update) > 0 {
  723. fmt.Println("aaaaa", biddingID)
  724. //更新mongo
  725. //MgoT.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  726. //更新MongoDB
  727. //updatePool <- []map[string]interface{}{
  728. // {"_id": tmp["_id"]},
  729. // {"$set": update},
  730. //}
  731. //2.es 项目 更新字段
  732. //err := Es.UpdateDocument("bidding", biddingID, update)
  733. //if err != nil && err.Error() != "Document not updated: noop" {
  734. // log.Info("bidding es update err", err, biddingID)
  735. //}
  736. // 更新es
  737. //updateEsPool <- []map[string]interface{}{
  738. // {"_id": biddingID},
  739. // update,
  740. //}
  741. }
  742. }
  743. log.Info("Run Over...Count:", log.Int("count", count))
  744. }
  745. // updateMethod 更新MongoDB
  746. func updateMethod() {
  747. arru := make([][]map[string]interface{}, saveSize)
  748. indexu := 0
  749. for {
  750. select {
  751. case v := <-updatePool:
  752. arru[indexu] = v
  753. indexu++
  754. if indexu == saveSize {
  755. updateSp <- true
  756. go func(arru [][]map[string]interface{}) {
  757. defer func() {
  758. <-updateSp
  759. }()
  760. MgoB.UpdateBulk("bidding", arru...)
  761. }(arru)
  762. arru = make([][]map[string]interface{}, saveSize)
  763. indexu = 0
  764. }
  765. case <-time.After(1000 * time.Millisecond):
  766. if indexu > 0 {
  767. updateSp <- true
  768. go func(arru [][]map[string]interface{}) {
  769. defer func() {
  770. <-updateSp
  771. }()
  772. MgoB.UpdateBulk("bidding", arru...)
  773. }(arru[:indexu])
  774. arru = make([][]map[string]interface{}, saveSize)
  775. indexu = 0
  776. }
  777. }
  778. }
  779. }
  780. // updateEsMethod 更新es
  781. func updateEsMethod() {
  782. arru := make([][]map[string]interface{}, 200)
  783. indexu := 0
  784. for {
  785. select {
  786. case v := <-updateEsPool:
  787. arru[indexu] = v
  788. indexu++
  789. if indexu == 200 {
  790. updateEsSp <- true
  791. go func(arru [][]map[string]interface{}) {
  792. defer func() {
  793. <-updateEsSp
  794. }()
  795. Es.UpdateBulk("bidding", arru...)
  796. EsNew.UpdateBulk("bidding", arru...)
  797. }(arru)
  798. arru = make([][]map[string]interface{}, 200)
  799. indexu = 0
  800. }
  801. case <-time.After(1000 * time.Millisecond):
  802. if indexu > 0 {
  803. updateEsSp <- true
  804. go func(arru [][]map[string]interface{}) {
  805. defer func() {
  806. <-updateEsSp
  807. }()
  808. Es.UpdateBulk("bidding", arru...)
  809. EsNew.UpdateBulk("bidding", arru...)
  810. }(arru[:indexu])
  811. arru = make([][]map[string]interface{}, 200)
  812. indexu = 0
  813. }
  814. }
  815. }
  816. }
  817. // updateProjectEsMethod 更新项目索引
  818. func updateProjectEsMethod() {
  819. arru := make([][]map[string]interface{}, 200)
  820. indexu := 0
  821. for {
  822. select {
  823. case v := <-updateEsPool:
  824. arru[indexu] = v
  825. indexu++
  826. if indexu == 200 {
  827. updateEsSp <- true
  828. go func(arru [][]map[string]interface{}) {
  829. defer func() {
  830. <-updateEsSp
  831. }()
  832. Es.UpdateBulk("projectset", arru...)
  833. }(arru)
  834. arru = make([][]map[string]interface{}, 200)
  835. indexu = 0
  836. }
  837. case <-time.After(1000 * time.Millisecond):
  838. if indexu > 0 {
  839. updateEsSp <- true
  840. go func(arru [][]map[string]interface{}) {
  841. defer func() {
  842. <-updateEsSp
  843. }()
  844. Es.UpdateBulk("projectset", arru...)
  845. }(arru[:indexu])
  846. arru = make([][]map[string]interface{}, 200)
  847. indexu = 0
  848. }
  849. }
  850. }
  851. }