main.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. es7 "github.com/olivere/elastic/v7"
  7. "github.com/wcc4869/common_utils/log"
  8. "go.uber.org/zap"
  9. "io"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  13. "reflect"
  14. "strings"
  15. "time"
  16. )
  17. var (
  18. Mgo *mongodb.MongodbSim
  19. MgoB *mongodb.MongodbSim
  20. MgoBAi *mongodb.MongodbSim
  21. MgoT *mongodb.MongodbSim //测试环境链接
  22. MgoR *mongodb.MongodbSim
  23. saveSize = 50
  24. Es *elastic.Elastic
  25. EsNew *elastic.Elastic
  26. //EsT *elastic.Elastic
  27. // 更新mongo
  28. updatePool = make(chan []map[string]interface{}, 5000)
  29. updateSp = make(chan bool, 5)
  30. //更新es
  31. updateEsPool = make(chan []map[string]interface{}, 5000)
  32. updateEsSp = make(chan bool, 5) //保存协程
  33. BiddingField = make(map[string]string, 200) //bidding_processing_field, level=1 最外层字段,
  34. BiddingLevelField = make(map[string]map[string]string) //level=2 的第二层字段
  35. )
  36. func Init() {
  37. //MgoB = &mongodb.MongodbSim{
  38. // MongodbAddr: "172.17.189.140:27080",
  39. // //MongodbAddr: "127.0.0.1:27083",
  40. // DbName: "qfw",
  41. // Size: 10,
  42. // UserName: "SJZY_RWbid_ES",
  43. // Password: "SJZY@B4i4D5e6S",
  44. // //Direct: true,
  45. //}
  46. //MgoB.InitPool()
  47. MgoBAi = &mongodb.MongodbSim{
  48. MongodbAddr: "172.17.189.140:27080",
  49. //MongodbAddr: "127.0.0.1:27083",
  50. DbName: "qfw_ai",
  51. Size: 10,
  52. UserName: "SJZY_RWbid_ES",
  53. Password: "SJZY@B4i4D5e6S",
  54. //Direct: true,
  55. }
  56. MgoBAi.InitPool()
  57. //mongodb 163
  58. //Mgo = &mongodb.MongodbSim{
  59. // //MongodbAddr: "172.17.189.140:27080",
  60. // MongodbAddr: "127.0.0.1:27083",
  61. // DbName: "qfw",
  62. // Size: 10,
  63. // UserName: "SJZY_RWbid_ES",
  64. // Password: "SJZY@B4i4D5e6S",
  65. // Direct: true,
  66. //}
  67. //Mgo.InitPool()
  68. //85
  69. //MgoR = &mongodb.MongodbSim{
  70. // //MongodbAddr: "127.0.0.1:27080",
  71. // MongodbAddr: "172.17.4.85:27080",
  72. // DbName: "qfw",
  73. // Size: 10,
  74. // //Direct: true,
  75. //}
  76. //MgoR.InitPool()
  77. //测试环境MongoDB
  78. //MgoT = &mongodb.MongodbSim{
  79. // //MongodbAddr: "172.17.189.140:27080",
  80. // MongodbAddr: "192.168.3.206:27002",
  81. // DbName: "qfw_data",
  82. // Size: 10,
  83. // UserName: "root",
  84. // Password: "root",
  85. // //Direct: true,
  86. //}
  87. //MgoT.InitPool()
  88. ////测试环境es
  89. //Es = &elastic.Elastic{
  90. // S_esurl: "http://192.168.3.149:9201",
  91. // //S_esurl: "http://172.17.4.184:19805",
  92. // I_size: 5,
  93. // Username: "",
  94. // Password: "",
  95. //}
  96. //Es.InitElasticSize()
  97. //es
  98. Es = &elastic.Elastic{
  99. //S_esurl: "http://127.0.0.1:19908",
  100. S_esurl: "http://172.17.4.184:19908",
  101. I_size: 5,
  102. Username: "jybid",
  103. Password: "Top2023_JEB01i@31",
  104. }
  105. Es.InitElasticSize()
  106. //es 新集群
  107. //EsNew = &elastic.Elastic{
  108. // //S_esurl: "http://127.0.0.1:19905",
  109. // S_esurl: "http://172.17.4.184:19905",
  110. // I_size: 5,
  111. // Username: "jybid",
  112. // Password: "Top2023_JEB01i@31",
  113. //}
  114. //EsNew.InitElasticSize()
  115. }
  116. func main() {
  117. Init()
  118. //InitEsBiddingField()
  119. //go updateMethod() //更新mongodb
  120. //go updateEsMethod() //更新es
  121. //go updateProjectEsMethod()
  122. //taskRunProject()
  123. //taskRunBidding()
  124. //dealBidding() //正式环境bidding数据处理
  125. dealBiddingAi() //正式环境bidding数据处理
  126. //dealBiddingTest() // 测试环境数据处理
  127. //updateProject()
  128. log.Info("over")
  129. //c := make(chan bool, 1)
  130. //<-c
  131. }
  132. func InitEsBiddingField() {
  133. now := time.Now()
  134. info, _ := MgoB.Find("bidding_processing_field", `{"stype": "bidding"}`, nil, nil, false, -1, -1)
  135. if len(*info) > 0 {
  136. for _, m := range *info {
  137. if util.IntAll(m["level"]) == 1 {
  138. BiddingField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  139. } else if util.IntAll(m["level"]) == 2 {
  140. pfield := util.ObjToString(m["pfield"])
  141. pfieldMap := BiddingLevelField[pfield]
  142. if pfieldMap == nil {
  143. pfieldMap = make(map[string]string, 0)
  144. }
  145. pfieldMap[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  146. BiddingLevelField[pfield] = pfieldMap
  147. }
  148. }
  149. }
  150. log.Info("InitEsBiddingField", zap.Int("BiddingField es 一级字段数量", len(BiddingField)))
  151. log.Info("InitEsBiddingField", zap.Int("BiddingLevelField es 二级字段数量", len(BiddingLevelField)))
  152. log.Info("InitEsBiddingField", zap.Any("duration", time.Since(now).Seconds()))
  153. }
  154. // taskRun 更新es 省市区三个字段
  155. func taskRunBidding() {
  156. defer util.Catch()
  157. sess := Mgo.GetMgoConn()
  158. defer Mgo.DestoryMongoConn(sess)
  159. //查询条件
  160. //q := map[string]interface{}{
  161. // //"_id": map[string]interface{}{
  162. // // //"$gt": mongodb.StringTOBsonId("5a862f0640d2d9bbe88e3cea"),
  163. // // //"$lte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  164. // //
  165. // // //"$gte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  166. // // "$lte": mongodb.StringTOBsonId("661e347d66cf0db42aa1a52f"),
  167. // //},
  168. // //"comeintime": map[string]interface{}{
  169. // // "$gt": 1669824000,
  170. // // //"$lte": 1669864950,
  171. // // "$lte": 1702265941,
  172. // //},
  173. // //"site": "国家能源e购",
  174. // "toptype": map[string]interface{}{"$exists": 0},
  175. //}
  176. //selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  177. it := sess.DB("qfw").C("zktest_bidding_0706").Find(nil).Select(nil).Sort("_id").Iter()
  178. fmt.Println("taskRun 开始")
  179. count := 0
  180. //realNum := 0
  181. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  182. if count%1000 == 0 {
  183. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  184. }
  185. update := map[string]interface{}{}
  186. // 1.更新省市区
  187. //if area, ok := tmp["area"]; ok && area != nil {
  188. // update["area"] = area
  189. //} else {
  190. // update["area"] = ""
  191. //}
  192. //
  193. //if city, ok := tmp["city"]; ok && city != nil {
  194. // update["city"] = city
  195. //} else {
  196. // update["city"] = ""
  197. //}
  198. //
  199. //if district, ok := tmp["district"]; ok && district != nil {
  200. // update["district"] = district
  201. //} else {
  202. // update["district"] = ""
  203. //}
  204. // 2.更新中标单位
  205. biddingID := util.ObjToString(tmp["id"])
  206. //biddingID := mongodb.BsonIdToSId(tmp["_id"])
  207. update["winner"] = ""
  208. update["s_winner"] = ""
  209. if len(update) > 0 {
  210. //Mgo.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  211. //2.es 项目 更新字段
  212. err := Es.UpdateDocument("bidding", biddingID, update)
  213. err = EsNew.UpdateDocument("bidding", biddingID, update)
  214. if err != nil && err.Error() != "Document not updated: noop" {
  215. log.Info("bidding es update err", err, biddingID)
  216. }
  217. }
  218. }
  219. log.Info("Run Over...Count:", log.Int("count", count))
  220. }
  221. // taskRunProject 更新项目表 省市区
  222. func taskRunProject() {
  223. defer util.Catch()
  224. sess := Mgo.GetMgoConn()
  225. defer Mgo.DestoryMongoConn(sess)
  226. // 项目数据
  227. MgoP := &mongodb.MongodbSim{
  228. MongodbAddr: "172.17.4.85:27080",
  229. //MongodbAddr: "127.0.0.1:27080",
  230. Size: 10,
  231. DbName: "qfw",
  232. //Direct: true,
  233. }
  234. MgoP.InitPool()
  235. selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  236. it := sess.DB("qfw").C("zktest_0423_info_new").Find(nil).Select(selected).Sort("_id").Iter()
  237. fmt.Println("taskRun 开始")
  238. count := 0
  239. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  240. if count%10000 == 0 {
  241. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  242. }
  243. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  244. where := map[string]interface{}{
  245. "ids": biddingID,
  246. }
  247. // 找到对应项目数据
  248. p, _ := MgoP.FindOne("projectset_20230904", where)
  249. projectId := mongodb.BsonIdToSId((*p)["_id"])
  250. //1.更新MongoDB
  251. update := map[string]interface{}{}
  252. if area, ok := tmp["area"]; ok && area != nil {
  253. update["area"] = area
  254. } else {
  255. update["area"] = ""
  256. }
  257. if city, ok := tmp["city"]; ok && city != nil {
  258. update["city"] = city
  259. } else {
  260. update["city"] = ""
  261. }
  262. if district, ok := tmp["district"]; ok && district != nil {
  263. update["district"] = district
  264. } else {
  265. update["district"] = ""
  266. }
  267. if len(update) > 0 {
  268. MgoP.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update})
  269. //2.es 项目 更新字段
  270. err := Es.UpdateDocument("projectset", projectId, update)
  271. if err != nil {
  272. log.Info("es update err", err, projectId)
  273. }
  274. }
  275. //2.es 项目 更新字段
  276. //if len(update) > 0 {
  277. // // 更新es
  278. // //updateEsPool <- []map[string]interface{}{
  279. // // {"_id": projectId},
  280. // // update,
  281. // //}
  282. //}
  283. }
  284. log.Info("Run Over...Count:", log.Int("count", count))
  285. }
  286. // dealData 正式环境,同步合同期限
  287. func dealData() {
  288. defer util.Catch()
  289. sess := Mgo.GetMgoConn()
  290. defer Mgo.DestoryMongoConn(sess)
  291. //where := map[string]interface{}{
  292. // "_id": mongodb.StringTOBsonId("65c5a36a66cf0db42ab9c1ef"),
  293. //}
  294. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  295. it := sess.DB("qfw").C("zktest_quanliang_0210_fbs").Find(nil).Select(&selected).Iter()
  296. count := 0
  297. realNum := 0
  298. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  299. if count%1000 == 0 {
  300. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  301. }
  302. idStr := mongodb.BsonIdToSId(tmp["_id"])
  303. update := make(map[string]interface{})
  304. if tmp["signaturedate"] != nil {
  305. update["signaturedate"] = tmp["signaturedate"]
  306. }
  307. if tmp["contractperiod"] != nil {
  308. update["contractperiod"] = tmp["contractperiod"]
  309. }
  310. if tmp["expiredate"] != nil {
  311. update["expiredate"] = tmp["expiredate"]
  312. }
  313. if len(update) == 0 {
  314. continue
  315. }
  316. //bidding 表
  317. if idStr > "5a862e7040d2d9bbe88e3b1f" {
  318. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  319. data := *bidding
  320. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  321. // 针对存量数据,重复数据不进索引
  322. if util.IntAll(data["extracttype"]) == -1 {
  323. tmp = make(map[string]interface{})
  324. continue
  325. }
  326. } else {
  327. //bidding_back
  328. bidding, _ := Mgo.FindById("bidding_back", idStr, map[string]interface{}{"extracttype": 1})
  329. data := *bidding
  330. Mgo.UpdateById("bidding_back", idStr, map[string]interface{}{"$set": update})
  331. // 针对存量数据,重复数据不进索引
  332. if util.IntAll(data["extracttype"]) == -1 {
  333. tmp = make(map[string]interface{})
  334. continue
  335. }
  336. }
  337. realNum++
  338. //2.es 更新字段
  339. esUpdate := update
  340. esUpdate["id"] = idStr
  341. if len(esUpdate) > 0 {
  342. // 更新es
  343. updateEsPool <- []map[string]interface{}{
  344. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  345. esUpdate,
  346. }
  347. }
  348. tmp = make(map[string]interface{})
  349. }
  350. log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
  351. }
  352. // dealResult 查询抽取表,更新合同周期字段;是dealData的后面遗漏数据
  353. func dealResult() {
  354. defer util.Catch()
  355. sess := MgoR.GetMgoConn()
  356. defer MgoR.DestoryMongoConn(sess)
  357. where := map[string]interface{}{
  358. "_id": map[string]interface{}{
  359. "$gte": mongodb.StringTOBsonId("5a4909cf40d2d9bbe8ab329c"),
  360. "$lte": mongodb.StringTOBsonId("5a4ad94d40d2d9bbe8ae0183"),
  361. },
  362. "subtype": "合同",
  363. }
  364. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  365. it := sess.DB("qfw").C("result_20220219").Find(where).Select(&selected).Iter()
  366. count := 0
  367. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  368. if count%1000 == 0 {
  369. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  370. }
  371. idStr := mongodb.BsonIdToSId(tmp["_id"])
  372. update := make(map[string]interface{})
  373. if tmp["signaturedate"] != nil {
  374. update["signaturedate"] = tmp["signaturedate"]
  375. }
  376. if tmp["contractperiod"] != nil {
  377. update["contractperiod"] = tmp["contractperiod"]
  378. }
  379. if tmp["expiredate"] != nil {
  380. update["expiredate"] = tmp["expiredate"]
  381. }
  382. if len(update) == 0 {
  383. continue
  384. }
  385. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  386. data := *bidding
  387. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  388. // 针对存量数据,重复数据不进索引
  389. if util.IntAll(data["extracttype"]) == -1 {
  390. tmp = make(map[string]interface{})
  391. continue
  392. }
  393. //2.es 更新字段
  394. esUpdate := update
  395. esUpdate["id"] = idStr
  396. if len(esUpdate) > 0 {
  397. // 更新es
  398. updateEsPool <- []map[string]interface{}{
  399. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  400. esUpdate,
  401. }
  402. }
  403. tmp = make(map[string]interface{})
  404. }
  405. log.Info("Run Over...Count:", log.Int("count", count))
  406. }
  407. // dealBidding 处理bidding数据
  408. func dealBidding() {
  409. defer util.Catch()
  410. sess := MgoB.GetMgoConn()
  411. defer MgoB.DestoryMongoConn(sess)
  412. where := map[string]interface{}{
  413. "comeintime": map[string]interface{}{
  414. "$lt": 1722009600,
  415. //"$lt": 1718812802,
  416. "$gte": 1718899200,
  417. },
  418. }
  419. //where := map[string]interface{}{
  420. // "_id": map[string]interface{}{
  421. // "$gte": mongodb.StringTOBsonId("66aa067e66cf0db42a8ea71e"),
  422. // "$lt": mongodb.StringTOBsonId("66aa067e66cf0db42a8ea720"),
  423. // },
  424. //}
  425. it := sess.DB("qfw").C("bidding").Find(where).Select(nil).Iter()
  426. fmt.Println("taskRun 开始")
  427. count := 0
  428. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  429. if count%10000 == 0 {
  430. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  431. }
  432. update := map[string]interface{}{}
  433. esUpdate := map[string]interface{}{}
  434. // 2.更新中标单位,中标金额
  435. //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
  436. // update["tag_topinformation"] = tag_topinformation
  437. //}
  438. //
  439. //if property_form, ok := tmp["property_form"]; ok && property_form != nil {
  440. // update["property_form"] = property_form
  441. //}
  442. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  443. //fmt.Println(biddingID)
  444. /**
  445. "s_subscopeclass" : "其它",
  446. "s_topscopeclass" : "其它",
  447. "subscopeclass" : [
  448. "其它"
  449. ],
  450. "topscopeclass" : [
  451. "其它"
  452. ],
  453. */
  454. // 行业分类默认值
  455. resultSubs := make([]string, 0)
  456. resultTobs := make([]string, 0)
  457. if topscopeclass, ok := tmp["topscopeclass"]; ok && topscopeclass != nil {
  458. if topps, ok2 := topscopeclass.([]interface{}); ok2 {
  459. for _, v := range topps {
  460. top := util.ObjToString(v)
  461. if top != "" {
  462. resultTobs = append(resultTobs, top)
  463. }
  464. }
  465. }
  466. //1.一级分类是空数组或者 是 其它
  467. if len(resultTobs) == 0 || resultTobs[0] == "其它" {
  468. update["topscopeclass"] = []string{"其它"}
  469. update["subscopeclass"] = []string{"其它"}
  470. update["s_topscopeclass"] = "其它"
  471. update["s_subscopeclass"] = "其它"
  472. esUpdate["s_topscopeclass"] = "其它"
  473. esUpdate["s_subscopeclass"] = "其它"
  474. esUpdate["topscopeclass"] = []string{"其它"}
  475. } else {
  476. if subs, ok3 := tmp["subscopeclass"]; ok3 {
  477. if subbs, ok4 := subs.([]interface{}); ok4 {
  478. for _, v := range subbs {
  479. sub := util.ObjToString(v)
  480. if sub != "" && sub != "其它" {
  481. resultSubs = append(resultSubs, sub)
  482. }
  483. }
  484. }
  485. }
  486. newTops, newSubs, cleanedTops := ProcessTopscopeclass(resultTobs, resultSubs)
  487. update["topscopeclass"] = newTops
  488. update["subscopeclass"] = newSubs
  489. update["s_topscopeclass"] = strings.Join(cleanedTops, ",")
  490. update["s_subscopeclass"] = strings.Join(newSubs, ",")
  491. esUpdate["s_topscopeclass"] = strings.Join(cleanedTops, ",")
  492. esUpdate["s_subscopeclass"] = strings.Join(newSubs, ",")
  493. esUpdate["topscopeclass"] = newTops
  494. }
  495. } else {
  496. update["topscopeclass"] = []string{"其它"}
  497. update["subscopeclass"] = []string{"其它"}
  498. update["s_topscopeclass"] = "其它"
  499. update["s_subscopeclass"] = "其它"
  500. esUpdate["s_topscopeclass"] = "其它"
  501. esUpdate["s_subscopeclass"] = "其它"
  502. esUpdate["topscopeclass"] = []string{"其它"}
  503. }
  504. //procurementlist 处理预计采购时间
  505. //if procurementlist, ok := tmp["procurementlist"]; ok && procurementlist != nil {
  506. // field := "procurementlist"
  507. // if tmp[field] != nil {
  508. // if field == "procurementlist" {
  509. // if tmp["procurementlist"] != nil {
  510. // var arr []interface{}
  511. // plist := tmp["procurementlist"].([]interface{})
  512. // for _, p := range plist {
  513. // p1 := p.(map[string]interface{})
  514. // p2 := make(map[string]interface{})
  515. // for k, v := range BiddingLevelField[field] {
  516. // if k == "projectname" && util.ObjToString(p1[k]) == "" {
  517. // p2[k] = util.ObjToString(tmp["projectname"])
  518. // } else if k == "buyer" && util.ObjToString(p1[k]) == "" && util.ObjToString(tmp["buyer"]) != "" {
  519. // p2[k] = util.ObjToString(tmp["buyer"])
  520. // } else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" {
  521. // res := getMethod(util.ObjToString(p1[k]))
  522. // if res != 0 {
  523. // p2[k] = res
  524. // }
  525. // } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v {
  526. // p2[k] = p1[k]
  527. // }
  528. //
  529. // }
  530. // arr = append(arr, p2)
  531. // }
  532. // if len(arr) > 0 {
  533. // esUpdate[field] = arr
  534. // }
  535. // }
  536. // }
  537. // }
  538. //}
  539. if len(update) > 0 {
  540. //fmt.Println("aaaaa", biddingID)
  541. //更新mongo
  542. //MgoT.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  543. //更新MongoDB
  544. updatePool <- []map[string]interface{}{
  545. {"_id": tmp["_id"]},
  546. {"$set": update},
  547. }
  548. //2.es 项目 更新字段
  549. //err := Es.UpdateDocument("bidding", biddingID, update)
  550. //if err != nil && err.Error() != "Document not updated: noop" {
  551. // log.Info("bidding es update err", err, biddingID)
  552. //}
  553. //// 更新es
  554. //updateEsPool <- []map[string]interface{}{
  555. // {"_id": biddingID},
  556. // update,
  557. //}
  558. }
  559. // 更新Es 数据
  560. if len(esUpdate) > 0 {
  561. // 更新es
  562. updateEsPool <- []map[string]interface{}{
  563. {"_id": biddingID},
  564. esUpdate,
  565. }
  566. }
  567. }
  568. log.Info("Run Over...Count:", log.Int("count", count))
  569. }
  570. // dealBiddingAi 处理qfw_ai 数据库bidding 数据
  571. func dealBiddingAi() {
  572. defer util.Catch()
  573. sess := MgoBAi.GetMgoConn()
  574. defer MgoBAi.DestoryMongoConn(sess)
  575. it := sess.DB("qfw_ai").C("zzzzz_kkk_uc_0907").Find(nil).Select(nil).Iter()
  576. fmt.Println("taskRun 开始")
  577. count := 0
  578. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  579. if count%1000 == 0 {
  580. fmt.Println("current:", count)
  581. }
  582. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  583. update := map[string]interface{}{}
  584. if budget, ok := tmp["budget"]; ok && budget != nil {
  585. update["budget"] = budget
  586. }
  587. if bidamount, ok := tmp["bidamount"]; ok && bidamount != nil {
  588. update["bidamount"] = bidamount
  589. }
  590. if projectcode, ok := tmp["projectcode"]; ok && projectcode != nil {
  591. update["projectcode"] = projectcode
  592. }
  593. if len(update) > 0 {
  594. MgoBAi.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  595. //2.es 项目 更新字段
  596. err := Es.UpdateDocument("bidding_ai", biddingID, update)
  597. if err != nil && err.Error() != "Document not updated: noop" {
  598. log.Info("bidding es update err", err, biddingID)
  599. }
  600. }
  601. }
  602. fmt.Println("over ----------- over ")
  603. }
  604. func dealBiddingByEs() {
  605. //url := "http://172.17.4.184:19908"
  606. url := "http://127.0.0.1:19908"
  607. username := "jybid"
  608. password := "Top2023_JEB01i@31"
  609. index := "bidding" //索引名称
  610. //index := "projectset" //索引名称
  611. // 创建 Elasticsearch 客户端
  612. client, err := es7.NewClient(
  613. es7.SetURL(url),
  614. es7.SetBasicAuth(username, password),
  615. es7.SetSniff(false),
  616. )
  617. if err != nil {
  618. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  619. }
  620. query := es7.NewBoolQuery()
  621. query.Must(es7.NewRangeQuery("comeintime").Gt(1718812800))
  622. query.MustNot(es7.NewExistsQuery("s_topscopeclass"))
  623. ctx := context.Background()
  624. //开始滚动搜索
  625. scrollID := ""
  626. scroll := "10m"
  627. searchSource := es7.NewSearchSource().
  628. Query(query).
  629. Size(10000).
  630. Sort("_doc", true) //升序排序
  631. //Sort("_doc", false) //降序排序
  632. searchService := client.Scroll(index).
  633. Size(10000).
  634. Scroll(scroll).
  635. SearchSource(searchSource)
  636. res, err := searchService.Do(ctx)
  637. if err != nil {
  638. if err == io.EOF {
  639. fmt.Println("没有数据")
  640. } else {
  641. panic(err)
  642. }
  643. }
  644. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  645. fmt.Println("总数是:", res.TotalHits())
  646. total := 0
  647. for len(res.Hits.Hits) > 0 {
  648. for _, hit := range res.Hits.Hits {
  649. var doc map[string]interface{}
  650. err := json.Unmarshal(hit.Source, &doc)
  651. if err != nil {
  652. fmt.Printf("解析文档失败:%s", err)
  653. continue
  654. }
  655. //delete(doc, "filetext")
  656. //delete(doc, "detail")
  657. //
  658. ////存入新表
  659. //err = MgoB.InsertOrUpdate("qfw", "wcc_subtype_err_0429", doc)
  660. //if err != nil {
  661. // fmt.Println("error", doc["id"])
  662. //}
  663. }
  664. total = total + len(res.Hits.Hits)
  665. scrollID = res.ScrollId
  666. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  667. fmt.Println("current count:", total)
  668. if err != nil {
  669. if err == io.EOF {
  670. // 滚动到最后一批数据,退出循环
  671. break
  672. }
  673. fmt.Println("滚动搜索失败:", err, res)
  674. break // 处理错误时退出循环
  675. }
  676. }
  677. // 在循环外调用 ClearScroll
  678. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  679. if err != nil {
  680. fmt.Printf("清理滚动搜索失败:%s", err)
  681. }
  682. fmt.Println("结束~~~~~~~~~~~~~~~")
  683. }
  684. // dealBiddingTest 处理测试环境数据
  685. func dealBiddingTest() {
  686. defer util.Catch()
  687. sess := MgoT.GetMgoConn()
  688. defer MgoT.DestoryMongoConn(sess)
  689. it := sess.DB("qfw_data").C("bidding").Find(nil).Select(nil).Iter()
  690. fmt.Println("taskRun 开始")
  691. count := 0
  692. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  693. if count%10000 == 0 {
  694. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  695. }
  696. update := map[string]interface{}{}
  697. // 2.更新中标单位,中标金额
  698. //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
  699. // update["tag_topinformation"] = tag_topinformation
  700. //}
  701. //
  702. //if property_form, ok := tmp["property_form"]; ok && property_form != nil {
  703. // update["property_form"] = property_form
  704. //}
  705. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  706. /**
  707. "s_subscopeclass" : "其它",
  708. "s_topscopeclass" : "其它",
  709. "subscopeclass" : [
  710. "其它"
  711. ],
  712. "topscopeclass" : [
  713. "其它"
  714. ],
  715. */
  716. // 行业分类默认值
  717. if topscopeclass, ok := tmp["topscopeclass"]; !ok && topscopeclass == nil {
  718. update["topscopeclass"] = []string{"其它"}
  719. update["s_topscopeclass"] = "其它"
  720. }
  721. if subscopeclass, ok := tmp["subscopeclass"]; !ok && subscopeclass == nil {
  722. update["subscopeclass"] = []string{"其它"}
  723. update["s_subscopeclass"] = "其它"
  724. }
  725. if util.ObjToString(tmp["s_topscopeclass"]) == "其它" {
  726. update["topscopeclass"] = []string{"其它"}
  727. update["s_topscopeclass"] = "其它"
  728. }
  729. if util.ObjToString(tmp["s_subscopeclass"]) == "其它" {
  730. update["subscopeclass"] = []string{"其它"}
  731. update["s_subscopeclass"] = "其它"
  732. }
  733. //procurementlist 处理预计采购时间
  734. if procurementlist, ok := tmp["procurementlist"]; ok && procurementlist != nil {
  735. for field, _ := range BiddingField {
  736. if tmp[field] != nil {
  737. if field == "procurementlist" {
  738. if tmp["procurementlist"] != nil {
  739. var arr []interface{}
  740. plist := tmp["procurementlist"].([]interface{})
  741. for _, p := range plist {
  742. p1 := p.(map[string]interface{})
  743. p2 := make(map[string]interface{})
  744. for k, v := range BiddingLevelField[field] {
  745. if k == "projectname" && util.ObjToString(p1[k]) == "" {
  746. p2[k] = util.ObjToString(tmp["projectname"])
  747. } else if k == "buyer" && util.ObjToString(p1[k]) == "" && util.ObjToString(tmp["buyer"]) != "" {
  748. p2[k] = util.ObjToString(tmp["buyer"])
  749. } else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" {
  750. res := getMethod(util.ObjToString(p1[k]))
  751. if res != 0 {
  752. p2[k] = res
  753. }
  754. } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v {
  755. p2[k] = p1[k]
  756. }
  757. }
  758. arr = append(arr, p2)
  759. }
  760. if len(arr) > 0 {
  761. update[field] = arr
  762. }
  763. }
  764. }
  765. }
  766. }
  767. }
  768. if len(update) > 0 {
  769. fmt.Println("aaaaa", biddingID)
  770. //更新mongo
  771. //MgoT.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  772. //更新MongoDB
  773. //updatePool <- []map[string]interface{}{
  774. // {"_id": tmp["_id"]},
  775. // {"$set": update},
  776. //}
  777. //2.es 项目 更新字段
  778. //err := Es.UpdateDocument("bidding", biddingID, update)
  779. //if err != nil && err.Error() != "Document not updated: noop" {
  780. // log.Info("bidding es update err", err, biddingID)
  781. //}
  782. // 更新es
  783. //updateEsPool <- []map[string]interface{}{
  784. // {"_id": biddingID},
  785. // update,
  786. //}
  787. }
  788. }
  789. log.Info("Run Over...Count:", log.Int("count", count))
  790. }
  791. // updateMethod 更新MongoDB
  792. func updateMethod() {
  793. arru := make([][]map[string]interface{}, saveSize)
  794. indexu := 0
  795. for {
  796. select {
  797. case v := <-updatePool:
  798. arru[indexu] = v
  799. indexu++
  800. if indexu == saveSize {
  801. updateSp <- true
  802. go func(arru [][]map[string]interface{}) {
  803. defer func() {
  804. <-updateSp
  805. }()
  806. MgoB.UpdateBulk("bidding", arru...)
  807. }(arru)
  808. arru = make([][]map[string]interface{}, saveSize)
  809. indexu = 0
  810. }
  811. case <-time.After(1000 * time.Millisecond):
  812. if indexu > 0 {
  813. updateSp <- true
  814. go func(arru [][]map[string]interface{}) {
  815. defer func() {
  816. <-updateSp
  817. }()
  818. MgoB.UpdateBulk("bidding", arru...)
  819. }(arru[:indexu])
  820. arru = make([][]map[string]interface{}, saveSize)
  821. indexu = 0
  822. }
  823. }
  824. }
  825. }
  826. // updateEsMethod 更新es
  827. func updateEsMethod() {
  828. arru := make([][]map[string]interface{}, 200)
  829. indexu := 0
  830. for {
  831. select {
  832. case v := <-updateEsPool:
  833. arru[indexu] = v
  834. indexu++
  835. if indexu == 200 {
  836. updateEsSp <- true
  837. go func(arru [][]map[string]interface{}) {
  838. defer func() {
  839. <-updateEsSp
  840. }()
  841. Es.UpdateBulk("bidding", arru...)
  842. EsNew.UpdateBulk("bidding", arru...)
  843. }(arru)
  844. arru = make([][]map[string]interface{}, 200)
  845. indexu = 0
  846. }
  847. case <-time.After(1000 * time.Millisecond):
  848. if indexu > 0 {
  849. updateEsSp <- true
  850. go func(arru [][]map[string]interface{}) {
  851. defer func() {
  852. <-updateEsSp
  853. }()
  854. Es.UpdateBulk("bidding", arru...)
  855. EsNew.UpdateBulk("bidding", arru...)
  856. }(arru[:indexu])
  857. arru = make([][]map[string]interface{}, 200)
  858. indexu = 0
  859. }
  860. }
  861. }
  862. }
  863. // updateProjectEsMethod 更新项目索引
  864. func updateProjectEsMethod() {
  865. arru := make([][]map[string]interface{}, 200)
  866. indexu := 0
  867. for {
  868. select {
  869. case v := <-updateEsPool:
  870. arru[indexu] = v
  871. indexu++
  872. if indexu == 200 {
  873. updateEsSp <- true
  874. go func(arru [][]map[string]interface{}) {
  875. defer func() {
  876. <-updateEsSp
  877. }()
  878. Es.UpdateBulk("projectset", arru...)
  879. }(arru)
  880. arru = make([][]map[string]interface{}, 200)
  881. indexu = 0
  882. }
  883. case <-time.After(1000 * time.Millisecond):
  884. if indexu > 0 {
  885. updateEsSp <- true
  886. go func(arru [][]map[string]interface{}) {
  887. defer func() {
  888. <-updateEsSp
  889. }()
  890. Es.UpdateBulk("projectset", arru...)
  891. }(arru[:indexu])
  892. arru = make([][]map[string]interface{}, 200)
  893. indexu = 0
  894. }
  895. }
  896. }
  897. }