main.go 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. es7 "github.com/olivere/elastic/v7"
  7. "github.com/wcc4869/common_utils/log"
  8. "go.uber.org/zap"
  9. "io"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  13. "reflect"
  14. "strings"
  15. "time"
  16. )
  17. var (
  18. Mgo *mongodb.MongodbSim
  19. MgoB *mongodb.MongodbSim
  20. MgoBAi *mongodb.MongodbSim
  21. MgoT *mongodb.MongodbSim //测试环境链接
  22. MgoR *mongodb.MongodbSim
  23. saveSize = 50
  24. Es *elastic.Elastic // 19908
  25. EsNew *elastic.Elastic //19905
  26. //EsT *elastic.Elastic
  27. // 更新mongo
  28. updatePool = make(chan []map[string]interface{}, 5000)
  29. updateSp = make(chan bool, 5)
  30. //更新es
  31. updateEsPool = make(chan []map[string]interface{}, 5000)
  32. updateEsSp = make(chan bool, 5) //保存协程
  33. updateProjectEsPool = make(chan []map[string]interface{}, 5000)
  34. updateProjectEsSp = make(chan bool, 5) //保存协程
  35. BiddingField = make(map[string]string, 200) //bidding_processing_field, level=1 最外层字段,
  36. BiddingLevelField = make(map[string]map[string]string) //level=2 的第二层字段
  37. )
  38. func Init() {
  39. MgoB = &mongodb.MongodbSim{
  40. //MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  41. MongodbAddr: "127.0.0.1:27083",
  42. DbName: "qfw",
  43. Size: 10,
  44. UserName: "SJZY_RWbid_ES",
  45. Password: "SJZY@B4i4D5e6S",
  46. Direct: true,
  47. }
  48. MgoB.InitPool()
  49. //MgoBAi = &mongodb.MongodbSim{
  50. // //MongodbAddr: "172.17.189.140:27080",
  51. // MongodbAddr: "127.0.0.1:27083",
  52. // DbName: "qfw_ai",
  53. // Size: 10,
  54. // UserName: "SJZY_RWbid_ES",
  55. // Password: "SJZY@B4i4D5e6S",
  56. // Direct: true,
  57. //}
  58. //MgoBAi.InitPool()
  59. //mongodb 163
  60. //Mgo = &mongodb.MongodbSim{
  61. // //MongodbAddr: "172.17.189.140:27080",
  62. // MongodbAddr: "127.0.0.1:27083",
  63. // DbName: "qfw",
  64. // Size: 10,
  65. // UserName: "SJZY_RWbid_ES",
  66. // Password: "SJZY@B4i4D5e6S",
  67. // Direct: true,
  68. //}
  69. //Mgo.InitPool()
  70. //85
  71. MgoR = &mongodb.MongodbSim{
  72. MongodbAddr: "127.0.0.1:27080",
  73. //MongodbAddr: "172.17.4.85:27080",
  74. DbName: "qfw",
  75. Size: 10,
  76. Direct: true,
  77. }
  78. MgoR.InitPool()
  79. //测试环境MongoDB
  80. //MgoT = &mongodb.MongodbSim{
  81. // //MongodbAddr: "172.17.189.140:27080",
  82. // MongodbAddr: "192.168.3.206:27002",
  83. // DbName: "qfw_data",
  84. // Size: 10,
  85. // UserName: "root",
  86. // Password: "root",
  87. // //Direct: true,
  88. //}
  89. //MgoT.InitPool()
  90. ////测试环境es
  91. //Es = &elastic.Elastic{
  92. // S_esurl: "http://192.168.3.149:9201",
  93. // //S_esurl: "http://172.17.4.184:19805",
  94. // I_size: 5,
  95. // Username: "",
  96. // Password: "",
  97. //}
  98. //Es.InitElasticSize()
  99. //es
  100. Es = &elastic.Elastic{
  101. S_esurl: "http://127.0.0.1:19908",
  102. //S_esurl: "http://172.17.4.184:19908",
  103. I_size: 5,
  104. Username: "jybid",
  105. Password: "Top2023_JEB01i@31",
  106. }
  107. Es.InitElasticSize()
  108. //es 新集群
  109. //EsNew = &elastic.Elastic{
  110. // S_esurl: "http://127.0.0.1:19905",
  111. // //S_esurl: "http://172.17.4.184:19905",
  112. // I_size: 5,
  113. // Username: "jybid",
  114. // Password: "Top2023_JEB01i@31",
  115. //}
  116. //EsNew.InitElasticSize()
  117. }
  118. func main() {
  119. Init()
  120. //InitEsBiddingField()
  121. //go updateMethod() //更新mongodb
  122. //go updateEsMethod() //更新es
  123. //go updateEsHrefMethod() //更新es href 字段
  124. //go updateProjectEsMethod()
  125. //taskRunProject()
  126. //taskRunBidding()
  127. //dealBidding() //正式环境bidding数据处理
  128. //dealBiddingAi() //正式环境bidding数据处理
  129. //dealBiddingTest() // 测试环境数据处理
  130. //dealBiddingEsHref() // 根据临时表,更新es href 字段
  131. //dealBiddingNiJian() //更新拟建数据中buyer = owner
  132. //updateBiddingBidamount()
  133. //updateProject()
  134. log.Info("over")
  135. c := make(chan bool, 1)
  136. <-c
  137. }
  138. func InitEsBiddingField() {
  139. now := time.Now()
  140. info, _ := MgoB.Find("bidding_processing_field", `{"stype": "bidding"}`, nil, nil, false, -1, -1)
  141. if len(*info) > 0 {
  142. for _, m := range *info {
  143. if util.IntAll(m["level"]) == 1 {
  144. BiddingField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  145. } else if util.IntAll(m["level"]) == 2 {
  146. pfield := util.ObjToString(m["pfield"])
  147. pfieldMap := BiddingLevelField[pfield]
  148. if pfieldMap == nil {
  149. pfieldMap = make(map[string]string, 0)
  150. }
  151. pfieldMap[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  152. BiddingLevelField[pfield] = pfieldMap
  153. }
  154. }
  155. }
  156. log.Info("InitEsBiddingField", zap.Int("BiddingField es 一级字段数量", len(BiddingField)))
  157. log.Info("InitEsBiddingField", zap.Int("BiddingLevelField es 二级字段数量", len(BiddingLevelField)))
  158. log.Info("InitEsBiddingField", zap.Any("duration", time.Since(now).Seconds()))
  159. }
  160. // taskRun 更新es 省市区三个字段
  161. func taskRunBidding() {
  162. defer util.Catch()
  163. sess := Mgo.GetMgoConn()
  164. defer Mgo.DestoryMongoConn(sess)
  165. //查询条件
  166. //q := map[string]interface{}{
  167. // //"_id": map[string]interface{}{
  168. // // //"$gt": mongodb.StringTOBsonId("5a862f0640d2d9bbe88e3cea"),
  169. // // //"$lte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  170. // //
  171. // // //"$gte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  172. // // "$lte": mongodb.StringTOBsonId("661e347d66cf0db42aa1a52f"),
  173. // //},
  174. // //"comeintime": map[string]interface{}{
  175. // // "$gt": 1669824000,
  176. // // //"$lte": 1669864950,
  177. // // "$lte": 1702265941,
  178. // //},
  179. // //"site": "国家能源e购",
  180. // "toptype": map[string]interface{}{"$exists": 0},
  181. //}
  182. //selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  183. it := sess.DB("qfw").C("zktest_bidding_0706").Find(nil).Select(nil).Sort("_id").Iter()
  184. fmt.Println("taskRun 开始")
  185. count := 0
  186. //realNum := 0
  187. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  188. if count%1000 == 0 {
  189. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  190. }
  191. update := map[string]interface{}{}
  192. // 1.更新省市区
  193. //if area, ok := tmp["area"]; ok && area != nil {
  194. // update["area"] = area
  195. //} else {
  196. // update["area"] = ""
  197. //}
  198. //
  199. //if city, ok := tmp["city"]; ok && city != nil {
  200. // update["city"] = city
  201. //} else {
  202. // update["city"] = ""
  203. //}
  204. //
  205. //if district, ok := tmp["district"]; ok && district != nil {
  206. // update["district"] = district
  207. //} else {
  208. // update["district"] = ""
  209. //}
  210. // 2.更新中标单位
  211. biddingID := util.ObjToString(tmp["id"])
  212. //biddingID := mongodb.BsonIdToSId(tmp["_id"])
  213. update["winner"] = ""
  214. update["s_winner"] = ""
  215. if len(update) > 0 {
  216. //Mgo.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  217. //2.es 项目 更新字段
  218. err := Es.UpdateDocument("bidding", biddingID, update)
  219. err = EsNew.UpdateDocument("bidding", biddingID, update)
  220. if err != nil && err.Error() != "Document not updated: noop" {
  221. log.Info("bidding es update err", err, biddingID)
  222. }
  223. }
  224. }
  225. log.Info("Run Over...Count:", log.Int("count", count))
  226. }
  227. // taskRunProject 更新项目表 省市区
  228. func taskRunProject() {
  229. defer util.Catch()
  230. sess := Mgo.GetMgoConn()
  231. defer Mgo.DestoryMongoConn(sess)
  232. // 项目数据
  233. MgoP := &mongodb.MongodbSim{
  234. MongodbAddr: "172.17.4.85:27080",
  235. //MongodbAddr: "127.0.0.1:27080",
  236. Size: 10,
  237. DbName: "qfw",
  238. //Direct: true,
  239. }
  240. MgoP.InitPool()
  241. selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  242. it := sess.DB("qfw").C("zktest_0423_info_new").Find(nil).Select(selected).Sort("_id").Iter()
  243. fmt.Println("taskRun 开始")
  244. count := 0
  245. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  246. if count%10000 == 0 {
  247. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  248. }
  249. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  250. where := map[string]interface{}{
  251. "ids": biddingID,
  252. }
  253. // 找到对应项目数据
  254. p, _ := MgoP.FindOne("projectset_20230904", where)
  255. projectId := mongodb.BsonIdToSId((*p)["_id"])
  256. //1.更新MongoDB
  257. update := map[string]interface{}{}
  258. if area, ok := tmp["area"]; ok && area != nil {
  259. update["area"] = area
  260. } else {
  261. update["area"] = ""
  262. }
  263. if city, ok := tmp["city"]; ok && city != nil {
  264. update["city"] = city
  265. } else {
  266. update["city"] = ""
  267. }
  268. if district, ok := tmp["district"]; ok && district != nil {
  269. update["district"] = district
  270. } else {
  271. update["district"] = ""
  272. }
  273. if len(update) > 0 {
  274. MgoP.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update})
  275. //2.es 项目 更新字段
  276. err := Es.UpdateDocument("projectset", projectId, update)
  277. if err != nil {
  278. log.Info("es update err", err, projectId)
  279. }
  280. }
  281. //2.es 项目 更新字段
  282. //if len(update) > 0 {
  283. // // 更新es
  284. // //updateEsPool <- []map[string]interface{}{
  285. // // {"_id": projectId},
  286. // // update,
  287. // //}
  288. //}
  289. }
  290. log.Info("Run Over...Count:", log.Int("count", count))
  291. }
  292. // dealData 正式环境,同步合同期限
  293. func dealData() {
  294. defer util.Catch()
  295. sess := Mgo.GetMgoConn()
  296. defer Mgo.DestoryMongoConn(sess)
  297. //where := map[string]interface{}{
  298. // "_id": mongodb.StringTOBsonId("65c5a36a66cf0db42ab9c1ef"),
  299. //}
  300. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  301. it := sess.DB("qfw").C("zktest_quanliang_0210_fbs").Find(nil).Select(&selected).Iter()
  302. count := 0
  303. realNum := 0
  304. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  305. if count%1000 == 0 {
  306. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  307. }
  308. idStr := mongodb.BsonIdToSId(tmp["_id"])
  309. update := make(map[string]interface{})
  310. if tmp["signaturedate"] != nil {
  311. update["signaturedate"] = tmp["signaturedate"]
  312. }
  313. if tmp["contractperiod"] != nil {
  314. update["contractperiod"] = tmp["contractperiod"]
  315. }
  316. if tmp["expiredate"] != nil {
  317. update["expiredate"] = tmp["expiredate"]
  318. }
  319. if len(update) == 0 {
  320. continue
  321. }
  322. //bidding 表
  323. if idStr > "5a862e7040d2d9bbe88e3b1f" {
  324. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  325. data := *bidding
  326. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  327. // 针对存量数据,重复数据不进索引
  328. if util.IntAll(data["extracttype"]) == -1 {
  329. tmp = make(map[string]interface{})
  330. continue
  331. }
  332. } else {
  333. //bidding_back
  334. bidding, _ := Mgo.FindById("bidding_back", idStr, map[string]interface{}{"extracttype": 1})
  335. data := *bidding
  336. Mgo.UpdateById("bidding_back", idStr, map[string]interface{}{"$set": update})
  337. // 针对存量数据,重复数据不进索引
  338. if util.IntAll(data["extracttype"]) == -1 {
  339. tmp = make(map[string]interface{})
  340. continue
  341. }
  342. }
  343. realNum++
  344. //2.es 更新字段
  345. esUpdate := update
  346. esUpdate["id"] = idStr
  347. if len(esUpdate) > 0 {
  348. // 更新es
  349. updateEsPool <- []map[string]interface{}{
  350. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  351. esUpdate,
  352. }
  353. }
  354. tmp = make(map[string]interface{})
  355. }
  356. log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
  357. }
  358. // dealResult 查询抽取表,更新合同周期字段;是dealData的后面遗漏数据
  359. func dealResult() {
  360. defer util.Catch()
  361. sess := MgoR.GetMgoConn()
  362. defer MgoR.DestoryMongoConn(sess)
  363. where := map[string]interface{}{
  364. "_id": map[string]interface{}{
  365. "$gte": mongodb.StringTOBsonId("5a4909cf40d2d9bbe8ab329c"),
  366. "$lte": mongodb.StringTOBsonId("5a4ad94d40d2d9bbe8ae0183"),
  367. },
  368. "subtype": "合同",
  369. }
  370. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  371. it := sess.DB("qfw").C("result_20220219").Find(where).Select(&selected).Iter()
  372. count := 0
  373. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  374. if count%1000 == 0 {
  375. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  376. }
  377. idStr := mongodb.BsonIdToSId(tmp["_id"])
  378. update := make(map[string]interface{})
  379. if tmp["signaturedate"] != nil {
  380. update["signaturedate"] = tmp["signaturedate"]
  381. }
  382. if tmp["contractperiod"] != nil {
  383. update["contractperiod"] = tmp["contractperiod"]
  384. }
  385. if tmp["expiredate"] != nil {
  386. update["expiredate"] = tmp["expiredate"]
  387. }
  388. if len(update) == 0 {
  389. continue
  390. }
  391. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  392. data := *bidding
  393. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  394. // 针对存量数据,重复数据不进索引
  395. if util.IntAll(data["extracttype"]) == -1 {
  396. tmp = make(map[string]interface{})
  397. continue
  398. }
  399. //2.es 更新字段
  400. esUpdate := update
  401. esUpdate["id"] = idStr
  402. if len(esUpdate) > 0 {
  403. // 更新es
  404. updateEsPool <- []map[string]interface{}{
  405. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  406. esUpdate,
  407. }
  408. }
  409. tmp = make(map[string]interface{})
  410. }
  411. log.Info("Run Over...Count:", log.Int("count", count))
  412. }
  413. // dealBidding 处理bidding数据
  414. func dealBidding() {
  415. defer util.Catch()
  416. sess := MgoB.GetMgoConn()
  417. defer MgoB.DestoryMongoConn(sess)
  418. where := map[string]interface{}{
  419. "comeintime": map[string]interface{}{
  420. "$lt": 1722009600,
  421. //"$lt": 1718812802,
  422. "$gte": 1718899200,
  423. },
  424. }
  425. //where := map[string]interface{}{
  426. // "_id": map[string]interface{}{
  427. // "$gte": mongodb.StringTOBsonId("66aa067e66cf0db42a8ea71e"),
  428. // "$lt": mongodb.StringTOBsonId("66aa067e66cf0db42a8ea720"),
  429. // },
  430. //}
  431. it := sess.DB("qfw").C("bidding").Find(where).Select(nil).Iter()
  432. fmt.Println("taskRun 开始")
  433. count := 0
  434. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  435. if count%10000 == 0 {
  436. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  437. }
  438. update := map[string]interface{}{}
  439. esUpdate := map[string]interface{}{}
  440. // 2.更新中标单位,中标金额
  441. //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
  442. // update["tag_topinformation"] = tag_topinformation
  443. //}
  444. //
  445. //if property_form, ok := tmp["property_form"]; ok && property_form != nil {
  446. // update["property_form"] = property_form
  447. //}
  448. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  449. //fmt.Println(biddingID)
  450. /**
  451. "s_subscopeclass" : "其它",
  452. "s_topscopeclass" : "其它",
  453. "subscopeclass" : [
  454. "其它"
  455. ],
  456. "topscopeclass" : [
  457. "其它"
  458. ],
  459. */
  460. // 行业分类默认值
  461. resultSubs := make([]string, 0)
  462. resultTobs := make([]string, 0)
  463. if topscopeclass, ok := tmp["topscopeclass"]; ok && topscopeclass != nil {
  464. if topps, ok2 := topscopeclass.([]interface{}); ok2 {
  465. for _, v := range topps {
  466. top := util.ObjToString(v)
  467. if top != "" {
  468. resultTobs = append(resultTobs, top)
  469. }
  470. }
  471. }
  472. //1.一级分类是空数组或者 是 其它
  473. if len(resultTobs) == 0 || resultTobs[0] == "其它" {
  474. update["topscopeclass"] = []string{"其它"}
  475. update["subscopeclass"] = []string{"其它"}
  476. update["s_topscopeclass"] = "其它"
  477. update["s_subscopeclass"] = "其它"
  478. esUpdate["s_topscopeclass"] = "其它"
  479. esUpdate["s_subscopeclass"] = "其它"
  480. esUpdate["topscopeclass"] = []string{"其它"}
  481. } else {
  482. if subs, ok3 := tmp["subscopeclass"]; ok3 {
  483. if subbs, ok4 := subs.([]interface{}); ok4 {
  484. for _, v := range subbs {
  485. sub := util.ObjToString(v)
  486. if sub != "" && sub != "其它" {
  487. resultSubs = append(resultSubs, sub)
  488. }
  489. }
  490. }
  491. }
  492. newTops, newSubs, cleanedTops := ProcessTopscopeclass(resultTobs, resultSubs)
  493. update["topscopeclass"] = newTops
  494. update["subscopeclass"] = newSubs
  495. update["s_topscopeclass"] = strings.Join(cleanedTops, ",")
  496. update["s_subscopeclass"] = strings.Join(newSubs, ",")
  497. esUpdate["s_topscopeclass"] = strings.Join(cleanedTops, ",")
  498. esUpdate["s_subscopeclass"] = strings.Join(newSubs, ",")
  499. esUpdate["topscopeclass"] = newTops
  500. }
  501. } else {
  502. update["topscopeclass"] = []string{"其它"}
  503. update["subscopeclass"] = []string{"其它"}
  504. update["s_topscopeclass"] = "其它"
  505. update["s_subscopeclass"] = "其它"
  506. esUpdate["s_topscopeclass"] = "其它"
  507. esUpdate["s_subscopeclass"] = "其它"
  508. esUpdate["topscopeclass"] = []string{"其它"}
  509. }
  510. //procurementlist 处理预计采购时间
  511. //if procurementlist, ok := tmp["procurementlist"]; ok && procurementlist != nil {
  512. // field := "procurementlist"
  513. // if tmp[field] != nil {
  514. // if field == "procurementlist" {
  515. // if tmp["procurementlist"] != nil {
  516. // var arr []interface{}
  517. // plist := tmp["procurementlist"].([]interface{})
  518. // for _, p := range plist {
  519. // p1 := p.(map[string]interface{})
  520. // p2 := make(map[string]interface{})
  521. // for k, v := range BiddingLevelField[field] {
  522. // if k == "projectname" && util.ObjToString(p1[k]) == "" {
  523. // p2[k] = util.ObjToString(tmp["projectname"])
  524. // } else if k == "buyer" && util.ObjToString(p1[k]) == "" && util.ObjToString(tmp["buyer"]) != "" {
  525. // p2[k] = util.ObjToString(tmp["buyer"])
  526. // } else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" {
  527. // res := getMethod(util.ObjToString(p1[k]))
  528. // if res != 0 {
  529. // p2[k] = res
  530. // }
  531. // } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v {
  532. // p2[k] = p1[k]
  533. // }
  534. //
  535. // }
  536. // arr = append(arr, p2)
  537. // }
  538. // if len(arr) > 0 {
  539. // esUpdate[field] = arr
  540. // }
  541. // }
  542. // }
  543. // }
  544. //}
  545. if len(update) > 0 {
  546. //fmt.Println("aaaaa", biddingID)
  547. //更新mongo
  548. //MgoT.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  549. //更新MongoDB
  550. updatePool <- []map[string]interface{}{
  551. {"_id": tmp["_id"]},
  552. {"$set": update},
  553. }
  554. //2.es 项目 更新字段
  555. //err := Es.UpdateDocument("bidding", biddingID, update)
  556. //if err != nil && err.Error() != "Document not updated: noop" {
  557. // log.Info("bidding es update err", err, biddingID)
  558. //}
  559. //// 更新es
  560. //updateEsPool <- []map[string]interface{}{
  561. // {"_id": biddingID},
  562. // update,
  563. //}
  564. }
  565. // 更新Es 数据
  566. if len(esUpdate) > 0 {
  567. // 更新es
  568. updateEsPool <- []map[string]interface{}{
  569. {"_id": biddingID},
  570. esUpdate,
  571. }
  572. }
  573. }
  574. log.Info("Run Over...Count:", log.Int("count", count))
  575. }
  576. // dealBiddingAi 处理qfw_ai 数据库bidding 数据
  577. func dealBiddingAi() {
  578. defer util.Catch()
  579. sess := MgoBAi.GetMgoConn()
  580. defer MgoBAi.DestoryMongoConn(sess)
  581. it := sess.DB("qfw_ai").C("zxl_20240926").Find(nil).Select(nil).Iter()
  582. fmt.Println("taskRun 开始")
  583. count := 0
  584. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  585. if count%1000 == 0 {
  586. fmt.Println("current:", count)
  587. }
  588. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  589. update := map[string]interface{}{}
  590. //if budget, ok := tmp["budget"]; ok && budget != nil {
  591. // update["budget"] = budget
  592. //}
  593. if bidamount, ok := tmp["bidamount"]; ok && bidamount != nil {
  594. update["bidamount"] = bidamount
  595. } else {
  596. update["bidamount"] = 0.0
  597. }
  598. //if projectcode, ok := tmp["projectcode"]; ok && projectcode != nil {
  599. // update["projectcode"] = projectcode
  600. //}
  601. if len(update) > 0 {
  602. MgoBAi.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  603. //2.es 项目 更新字段
  604. err := Es.UpdateDocument("bidding_ai", biddingID, update)
  605. if err != nil && err.Error() != "Document not updated: noop" {
  606. log.Info("bidding es update err", err, biddingID)
  607. }
  608. }
  609. }
  610. fmt.Println("over ----------- over ")
  611. }
  612. func dealBiddingByEs() {
  613. //url := "http://172.17.4.184:19908"
  614. url := "http://127.0.0.1:19908"
  615. username := "jybid"
  616. password := "Top2023_JEB01i@31"
  617. index := "bidding" //索引名称
  618. //index := "projectset" //索引名称
  619. // 创建 Elasticsearch 客户端
  620. client, err := es7.NewClient(
  621. es7.SetURL(url),
  622. es7.SetBasicAuth(username, password),
  623. es7.SetSniff(false),
  624. )
  625. if err != nil {
  626. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  627. }
  628. query := es7.NewBoolQuery()
  629. query.Must(es7.NewRangeQuery("comeintime").Gt(1718812800))
  630. query.MustNot(es7.NewExistsQuery("s_topscopeclass"))
  631. ctx := context.Background()
  632. //开始滚动搜索
  633. scrollID := ""
  634. scroll := "10m"
  635. searchSource := es7.NewSearchSource().
  636. Query(query).
  637. Size(10000).
  638. Sort("_doc", true) //升序排序
  639. //Sort("_doc", false) //降序排序
  640. searchService := client.Scroll(index).
  641. Size(10000).
  642. Scroll(scroll).
  643. SearchSource(searchSource)
  644. res, err := searchService.Do(ctx)
  645. if err != nil {
  646. if err == io.EOF {
  647. fmt.Println("没有数据")
  648. } else {
  649. panic(err)
  650. }
  651. }
  652. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  653. fmt.Println("总数是:", res.TotalHits())
  654. total := 0
  655. for len(res.Hits.Hits) > 0 {
  656. for _, hit := range res.Hits.Hits {
  657. var doc map[string]interface{}
  658. err := json.Unmarshal(hit.Source, &doc)
  659. if err != nil {
  660. fmt.Printf("解析文档失败:%s", err)
  661. continue
  662. }
  663. //delete(doc, "filetext")
  664. //delete(doc, "detail")
  665. //
  666. ////存入新表
  667. //err = MgoB.InsertOrUpdate("qfw", "wcc_subtype_err_0429", doc)
  668. //if err != nil {
  669. // fmt.Println("error", doc["id"])
  670. //}
  671. }
  672. total = total + len(res.Hits.Hits)
  673. scrollID = res.ScrollId
  674. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  675. fmt.Println("current count:", total)
  676. if err != nil {
  677. if err == io.EOF {
  678. // 滚动到最后一批数据,退出循环
  679. break
  680. }
  681. fmt.Println("滚动搜索失败:", err, res)
  682. break // 处理错误时退出循环
  683. }
  684. }
  685. // 在循环外调用 ClearScroll
  686. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  687. if err != nil {
  688. fmt.Printf("清理滚动搜索失败:%s", err)
  689. }
  690. fmt.Println("结束~~~~~~~~~~~~~~~")
  691. }
  692. // dealBiddingTest 处理测试环境数据
  693. func dealBiddingTest() {
  694. defer util.Catch()
  695. sess := MgoT.GetMgoConn()
  696. defer MgoT.DestoryMongoConn(sess)
  697. it := sess.DB("qfw_data").C("bidding").Find(nil).Select(nil).Iter()
  698. fmt.Println("taskRun 开始")
  699. count := 0
  700. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  701. if count%10000 == 0 {
  702. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  703. }
  704. update := map[string]interface{}{}
  705. // 2.更新中标单位,中标金额
  706. //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
  707. // update["tag_topinformation"] = tag_topinformation
  708. //}
  709. //
  710. //if property_form, ok := tmp["property_form"]; ok && property_form != nil {
  711. // update["property_form"] = property_form
  712. //}
  713. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  714. /**
  715. "s_subscopeclass" : "其它",
  716. "s_topscopeclass" : "其它",
  717. "subscopeclass" : [
  718. "其它"
  719. ],
  720. "topscopeclass" : [
  721. "其它"
  722. ],
  723. */
  724. // 行业分类默认值
  725. if topscopeclass, ok := tmp["topscopeclass"]; !ok && topscopeclass == nil {
  726. update["topscopeclass"] = []string{"其它"}
  727. update["s_topscopeclass"] = "其它"
  728. }
  729. if subscopeclass, ok := tmp["subscopeclass"]; !ok && subscopeclass == nil {
  730. update["subscopeclass"] = []string{"其它"}
  731. update["s_subscopeclass"] = "其它"
  732. }
  733. if util.ObjToString(tmp["s_topscopeclass"]) == "其它" {
  734. update["topscopeclass"] = []string{"其它"}
  735. update["s_topscopeclass"] = "其它"
  736. }
  737. if util.ObjToString(tmp["s_subscopeclass"]) == "其它" {
  738. update["subscopeclass"] = []string{"其它"}
  739. update["s_subscopeclass"] = "其它"
  740. }
  741. //procurementlist 处理预计采购时间
  742. if procurementlist, ok := tmp["procurementlist"]; ok && procurementlist != nil {
  743. for field, _ := range BiddingField {
  744. if tmp[field] != nil {
  745. if field == "procurementlist" {
  746. if tmp["procurementlist"] != nil {
  747. var arr []interface{}
  748. plist := tmp["procurementlist"].([]interface{})
  749. for _, p := range plist {
  750. p1 := p.(map[string]interface{})
  751. p2 := make(map[string]interface{})
  752. for k, v := range BiddingLevelField[field] {
  753. if k == "projectname" && util.ObjToString(p1[k]) == "" {
  754. p2[k] = util.ObjToString(tmp["projectname"])
  755. } else if k == "buyer" && util.ObjToString(p1[k]) == "" && util.ObjToString(tmp["buyer"]) != "" {
  756. p2[k] = util.ObjToString(tmp["buyer"])
  757. } else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" {
  758. res := getMethod(util.ObjToString(p1[k]))
  759. if res != 0 {
  760. p2[k] = res
  761. }
  762. } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v {
  763. p2[k] = p1[k]
  764. }
  765. }
  766. arr = append(arr, p2)
  767. }
  768. if len(arr) > 0 {
  769. update[field] = arr
  770. }
  771. }
  772. }
  773. }
  774. }
  775. }
  776. if len(update) > 0 {
  777. fmt.Println("aaaaa", biddingID)
  778. //更新mongo
  779. //MgoT.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  780. //更新MongoDB
  781. //updatePool <- []map[string]interface{}{
  782. // {"_id": tmp["_id"]},
  783. // {"$set": update},
  784. //}
  785. //2.es 项目 更新字段
  786. //err := Es.UpdateDocument("bidding", biddingID, update)
  787. //if err != nil && err.Error() != "Document not updated: noop" {
  788. // log.Info("bidding es update err", err, biddingID)
  789. //}
  790. // 更新es
  791. //updateEsPool <- []map[string]interface{}{
  792. // {"_id": biddingID},
  793. // update,
  794. //}
  795. }
  796. }
  797. log.Info("Run Over...Count:", log.Int("count", count))
  798. }
  799. // updateMethod 更新MongoDB
  800. func updateMethod() {
  801. arru := make([][]map[string]interface{}, saveSize)
  802. indexu := 0
  803. for {
  804. select {
  805. case v := <-updatePool:
  806. arru[indexu] = v
  807. indexu++
  808. if indexu == saveSize {
  809. updateSp <- true
  810. go func(arru [][]map[string]interface{}) {
  811. defer func() {
  812. <-updateSp
  813. }()
  814. MgoB.UpdateBulk("bidding", arru...)
  815. }(arru)
  816. arru = make([][]map[string]interface{}, saveSize)
  817. indexu = 0
  818. }
  819. case <-time.After(1000 * time.Millisecond):
  820. if indexu > 0 {
  821. updateSp <- true
  822. go func(arru [][]map[string]interface{}) {
  823. defer func() {
  824. <-updateSp
  825. }()
  826. MgoB.UpdateBulk("bidding", arru...)
  827. }(arru[:indexu])
  828. arru = make([][]map[string]interface{}, saveSize)
  829. indexu = 0
  830. }
  831. }
  832. }
  833. }
  834. // updateEsMethod 更新es
  835. func updateEsMethod() {
  836. arru := make([][]map[string]interface{}, 200)
  837. indexu := 0
  838. for {
  839. select {
  840. case v := <-updateEsPool:
  841. arru[indexu] = v
  842. indexu++
  843. if indexu == 200 {
  844. updateEsSp <- true
  845. go func(arru [][]map[string]interface{}) {
  846. defer func() {
  847. <-updateEsSp
  848. }()
  849. Es.UpdateBulk("bidding", arru...)
  850. EsNew.UpdateBulk("bidding", arru...)
  851. }(arru)
  852. arru = make([][]map[string]interface{}, 200)
  853. indexu = 0
  854. }
  855. case <-time.After(1000 * time.Millisecond):
  856. if indexu > 0 {
  857. updateEsSp <- true
  858. go func(arru [][]map[string]interface{}) {
  859. defer func() {
  860. <-updateEsSp
  861. }()
  862. Es.UpdateBulk("bidding", arru...)
  863. EsNew.UpdateBulk("bidding", arru...)
  864. }(arru[:indexu])
  865. arru = make([][]map[string]interface{}, 200)
  866. indexu = 0
  867. }
  868. }
  869. }
  870. }
  871. // updateEsMethod 更新es href 字段
  872. func updateEsHrefMethod() {
  873. arru := make([][]map[string]interface{}, 200)
  874. indexu := 0
  875. for {
  876. select {
  877. case v := <-updateEsPool:
  878. arru[indexu] = v
  879. indexu++
  880. if indexu == 200 {
  881. updateEsSp <- true
  882. go func(arru [][]map[string]interface{}) {
  883. defer func() {
  884. <-updateEsSp
  885. }()
  886. Es.UpdateBulk("bidding", arru...)
  887. Es.UpdateBulk("bidding_ai", arru...)
  888. Es.UpdateBulk("bidding_temporary", arru...)
  889. EsNew.UpdateBulk("bidding", arru...)
  890. EsNew.UpdateBulk("bidding_customer", arru...)
  891. EsNew.UpdateBulk("bidding_free", arru...)
  892. EsNew.UpdateBulk("bidding_year", arru...)
  893. EsNew.UpdateBulk("bidding_all", arru...)
  894. EsNew.UpdateBulk("bidding_temporary", arru...)
  895. }(arru)
  896. arru = make([][]map[string]interface{}, 200)
  897. indexu = 0
  898. }
  899. case <-time.After(1000 * time.Millisecond):
  900. if indexu > 0 {
  901. updateEsSp <- true
  902. go func(arru [][]map[string]interface{}) {
  903. defer func() {
  904. <-updateEsSp
  905. }()
  906. Es.UpdateBulk("bidding", arru...)
  907. Es.UpdateBulk("bidding_ai", arru...)
  908. Es.UpdateBulk("bidding_temporary", arru...)
  909. EsNew.UpdateBulk("bidding", arru...)
  910. EsNew.UpdateBulk("bidding_customer", arru...)
  911. EsNew.UpdateBulk("bidding_free", arru...)
  912. EsNew.UpdateBulk("bidding_year", arru...)
  913. EsNew.UpdateBulk("bidding_all", arru...)
  914. EsNew.UpdateBulk("bidding_temporary", arru...)
  915. }(arru[:indexu])
  916. arru = make([][]map[string]interface{}, 200)
  917. indexu = 0
  918. }
  919. }
  920. }
  921. }
  922. // updateProjectEsMethod 更新项目索引
  923. func updateProjectEsMethod() {
  924. arru := make([][]map[string]interface{}, 200)
  925. indexu := 0
  926. for {
  927. select {
  928. case v := <-updateProjectEsPool:
  929. arru[indexu] = v
  930. indexu++
  931. if indexu == 200 {
  932. updateProjectEsSp <- true
  933. go func(arru [][]map[string]interface{}) {
  934. defer func() {
  935. <-updateProjectEsSp
  936. }()
  937. Es.UpdateBulk("projectset", arru...)
  938. }(arru)
  939. arru = make([][]map[string]interface{}, 200)
  940. indexu = 0
  941. }
  942. case <-time.After(1000 * time.Millisecond):
  943. if indexu > 0 {
  944. updateProjectEsSp <- true
  945. go func(arru [][]map[string]interface{}) {
  946. defer func() {
  947. <-updateProjectEsSp
  948. }()
  949. Es.UpdateBulk("projectset", arru...)
  950. }(arru[:indexu])
  951. arru = make([][]map[string]interface{}, 200)
  952. indexu = 0
  953. }
  954. }
  955. }
  956. }