main.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/wcc4869/common_utils/log"
  5. "go.uber.org/zap"
  6. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  9. "reflect"
  10. "time"
  11. )
  12. var (
  13. Mgo *mongodb.MongodbSim
  14. MgoB *mongodb.MongodbSim
  15. MgoT *mongodb.MongodbSim //测试环境链接
  16. MgoR *mongodb.MongodbSim
  17. saveSize = 50
  18. Es *elastic.Elastic
  19. EsNew *elastic.Elastic
  20. //EsT *elastic.Elastic
  21. // 更新mongo
  22. updatePool = make(chan []map[string]interface{}, 5000)
  23. updateSp = make(chan bool, 5)
  24. //更新es
  25. updateEsPool = make(chan []map[string]interface{}, 5000)
  26. updateEsSp = make(chan bool, 5) //保存协程
  27. BiddingField = make(map[string]string, 200) //bidding_processing_field, level=1 最外层字段,
  28. BiddingLevelField = make(map[string]map[string]string) //level=2 的第二层字段
  29. )
  30. func Init() {
  31. MgoB = &mongodb.MongodbSim{
  32. MongodbAddr: "172.17.189.140:27080",
  33. //MongodbAddr: "127.0.0.1:27083",
  34. DbName: "qfw",
  35. Size: 10,
  36. UserName: "SJZY_RWbid_ES",
  37. Password: "SJZY@B4i4D5e6S",
  38. //Direct: true,
  39. }
  40. MgoB.InitPool()
  41. //mongodb 163
  42. //Mgo = &mongodb.MongodbSim{
  43. // //MongodbAddr: "172.17.189.140:27080",
  44. // MongodbAddr: "127.0.0.1:27083",
  45. // DbName: "qfw",
  46. // Size: 10,
  47. // UserName: "SJZY_RWbid_ES",
  48. // Password: "SJZY@B4i4D5e6S",
  49. // Direct: true,
  50. //}
  51. //Mgo.InitPool()
  52. //85
  53. //MgoR = &mongodb.MongodbSim{
  54. // //MongodbAddr: "127.0.0.1:27080",
  55. // MongodbAddr: "172.17.4.85:27080",
  56. // DbName: "qfw",
  57. // Size: 10,
  58. // //Direct: true,
  59. //}
  60. //MgoR.InitPool()
  61. //测试环境MongoDB
  62. //MgoT = &mongodb.MongodbSim{
  63. // //MongodbAddr: "172.17.189.140:27080",
  64. // MongodbAddr: "192.168.3.206:27002",
  65. // DbName: "qfw_data",
  66. // Size: 10,
  67. // UserName: "root",
  68. // Password: "root",
  69. // //Direct: true,
  70. //}
  71. //MgoT.InitPool()
  72. ////测试环境es
  73. //Es = &elastic.Elastic{
  74. // S_esurl: "http://192.168.3.149:9201",
  75. // //S_esurl: "http://172.17.4.184:19805",
  76. // I_size: 5,
  77. // Username: "",
  78. // Password: "",
  79. //}
  80. //Es.InitElasticSize()
  81. //es
  82. Es = &elastic.Elastic{
  83. //S_esurl: "http://127.0.0.1:19908",
  84. S_esurl: "http://172.17.4.184:19908",
  85. I_size: 5,
  86. Username: "jybid",
  87. Password: "Top2023_JEB01i@31",
  88. }
  89. Es.InitElasticSize()
  90. //es 新集群
  91. EsNew = &elastic.Elastic{
  92. //S_esurl: "http://127.0.0.1:19905",
  93. S_esurl: "http://172.17.4.184:19905",
  94. I_size: 5,
  95. Username: "jybid",
  96. Password: "Top2023_JEB01i@31",
  97. }
  98. EsNew.InitElasticSize()
  99. }
  100. func main() {
  101. Init()
  102. InitEsBiddingField()
  103. go updateMethod() //更新mongodb
  104. go updateEsMethod() //更新es
  105. //go updateProjectEsMethod()
  106. //taskRunProject()
  107. //taskRunBidding()
  108. dealBidding() //正式环境bidding数据处理
  109. //dealBiddingTest() // 测试环境数据处理
  110. //updateProject()
  111. fmt.Println("over")
  112. c := make(chan bool, 1)
  113. <-c
  114. }
  115. func InitEsBiddingField() {
  116. now := time.Now()
  117. info, _ := MgoB.Find("bidding_processing_field", `{"stype": "bidding"}`, nil, nil, false, -1, -1)
  118. if len(*info) > 0 {
  119. for _, m := range *info {
  120. if util.IntAll(m["level"]) == 1 {
  121. BiddingField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  122. } else if util.IntAll(m["level"]) == 2 {
  123. pfield := util.ObjToString(m["pfield"])
  124. pfieldMap := BiddingLevelField[pfield]
  125. if pfieldMap == nil {
  126. pfieldMap = make(map[string]string, 0)
  127. }
  128. pfieldMap[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  129. BiddingLevelField[pfield] = pfieldMap
  130. }
  131. }
  132. }
  133. log.Info("InitEsBiddingField", zap.Int("BiddingField es 一级字段数量", len(BiddingField)))
  134. log.Info("InitEsBiddingField", zap.Int("BiddingLevelField es 二级字段数量", len(BiddingLevelField)))
  135. log.Info("InitEsBiddingField", zap.Any("duration", time.Since(now).Seconds()))
  136. }
  137. // taskRun 更新es 省市区三个字段
  138. func taskRunBidding() {
  139. defer util.Catch()
  140. sess := Mgo.GetMgoConn()
  141. defer Mgo.DestoryMongoConn(sess)
  142. //查询条件
  143. //q := map[string]interface{}{
  144. // //"_id": map[string]interface{}{
  145. // // //"$gt": mongodb.StringTOBsonId("5a862f0640d2d9bbe88e3cea"),
  146. // // //"$lte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  147. // //
  148. // // //"$gte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
  149. // // "$lte": mongodb.StringTOBsonId("661e347d66cf0db42aa1a52f"),
  150. // //},
  151. // //"comeintime": map[string]interface{}{
  152. // // "$gt": 1669824000,
  153. // // //"$lte": 1669864950,
  154. // // "$lte": 1702265941,
  155. // //},
  156. // //"site": "国家能源e购",
  157. // "toptype": map[string]interface{}{"$exists": 0},
  158. //}
  159. //selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  160. it := sess.DB("qfw").C("zktest_bidding_0706").Find(nil).Select(nil).Sort("_id").Iter()
  161. fmt.Println("taskRun 开始")
  162. count := 0
  163. //realNum := 0
  164. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  165. if count%1000 == 0 {
  166. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  167. }
  168. update := map[string]interface{}{}
  169. // 1.更新省市区
  170. //if area, ok := tmp["area"]; ok && area != nil {
  171. // update["area"] = area
  172. //} else {
  173. // update["area"] = ""
  174. //}
  175. //
  176. //if city, ok := tmp["city"]; ok && city != nil {
  177. // update["city"] = city
  178. //} else {
  179. // update["city"] = ""
  180. //}
  181. //
  182. //if district, ok := tmp["district"]; ok && district != nil {
  183. // update["district"] = district
  184. //} else {
  185. // update["district"] = ""
  186. //}
  187. // 2.更新中标单位
  188. biddingID := util.ObjToString(tmp["id"])
  189. //biddingID := mongodb.BsonIdToSId(tmp["_id"])
  190. update["winner"] = ""
  191. update["s_winner"] = ""
  192. if len(update) > 0 {
  193. //Mgo.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  194. //2.es 项目 更新字段
  195. err := Es.UpdateDocument("bidding", biddingID, update)
  196. err = EsNew.UpdateDocument("bidding", biddingID, update)
  197. if err != nil && err.Error() != "Document not updated: noop" {
  198. log.Info("bidding es update err", err, biddingID)
  199. }
  200. }
  201. }
  202. log.Info("Run Over...Count:", log.Int("count", count))
  203. }
  204. // taskRunProject 更新项目表 省市区
  205. func taskRunProject() {
  206. defer util.Catch()
  207. sess := Mgo.GetMgoConn()
  208. defer Mgo.DestoryMongoConn(sess)
  209. // 项目数据
  210. MgoP := &mongodb.MongodbSim{
  211. MongodbAddr: "172.17.4.85:27080",
  212. //MongodbAddr: "127.0.0.1:27080",
  213. Size: 10,
  214. DbName: "qfw",
  215. //Direct: true,
  216. }
  217. MgoP.InitPool()
  218. selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
  219. it := sess.DB("qfw").C("zktest_0423_info_new").Find(nil).Select(selected).Sort("_id").Iter()
  220. fmt.Println("taskRun 开始")
  221. count := 0
  222. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  223. if count%10000 == 0 {
  224. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  225. }
  226. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  227. where := map[string]interface{}{
  228. "ids": biddingID,
  229. }
  230. // 找到对应项目数据
  231. p, _ := MgoP.FindOne("projectset_20230904", where)
  232. projectId := mongodb.BsonIdToSId((*p)["_id"])
  233. //1.更新MongoDB
  234. update := map[string]interface{}{}
  235. if area, ok := tmp["area"]; ok && area != nil {
  236. update["area"] = area
  237. } else {
  238. update["area"] = ""
  239. }
  240. if city, ok := tmp["city"]; ok && city != nil {
  241. update["city"] = city
  242. } else {
  243. update["city"] = ""
  244. }
  245. if district, ok := tmp["district"]; ok && district != nil {
  246. update["district"] = district
  247. } else {
  248. update["district"] = ""
  249. }
  250. if len(update) > 0 {
  251. MgoP.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update})
  252. //2.es 项目 更新字段
  253. err := Es.UpdateDocument("projectset", projectId, update)
  254. if err != nil {
  255. log.Info("es update err", err, projectId)
  256. }
  257. }
  258. //2.es 项目 更新字段
  259. //if len(update) > 0 {
  260. // // 更新es
  261. // //updateEsPool <- []map[string]interface{}{
  262. // // {"_id": projectId},
  263. // // update,
  264. // //}
  265. //}
  266. }
  267. log.Info("Run Over...Count:", log.Int("count", count))
  268. }
  269. // dealData 正式环境,同步合同期限
  270. func dealData() {
  271. defer util.Catch()
  272. sess := Mgo.GetMgoConn()
  273. defer Mgo.DestoryMongoConn(sess)
  274. //where := map[string]interface{}{
  275. // "_id": mongodb.StringTOBsonId("65c5a36a66cf0db42ab9c1ef"),
  276. //}
  277. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  278. it := sess.DB("qfw").C("zktest_quanliang_0210_fbs").Find(nil).Select(&selected).Iter()
  279. count := 0
  280. realNum := 0
  281. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  282. if count%1000 == 0 {
  283. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  284. }
  285. idStr := mongodb.BsonIdToSId(tmp["_id"])
  286. update := make(map[string]interface{})
  287. if tmp["signaturedate"] != nil {
  288. update["signaturedate"] = tmp["signaturedate"]
  289. }
  290. if tmp["contractperiod"] != nil {
  291. update["contractperiod"] = tmp["contractperiod"]
  292. }
  293. if tmp["expiredate"] != nil {
  294. update["expiredate"] = tmp["expiredate"]
  295. }
  296. if len(update) == 0 {
  297. continue
  298. }
  299. //bidding 表
  300. if idStr > "5a862e7040d2d9bbe88e3b1f" {
  301. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  302. data := *bidding
  303. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  304. // 针对存量数据,重复数据不进索引
  305. if util.IntAll(data["extracttype"]) == -1 {
  306. tmp = make(map[string]interface{})
  307. continue
  308. }
  309. } else {
  310. //bidding_back
  311. bidding, _ := Mgo.FindById("bidding_back", idStr, map[string]interface{}{"extracttype": 1})
  312. data := *bidding
  313. Mgo.UpdateById("bidding_back", idStr, map[string]interface{}{"$set": update})
  314. // 针对存量数据,重复数据不进索引
  315. if util.IntAll(data["extracttype"]) == -1 {
  316. tmp = make(map[string]interface{})
  317. continue
  318. }
  319. }
  320. realNum++
  321. //2.es 更新字段
  322. esUpdate := update
  323. esUpdate["id"] = idStr
  324. if len(esUpdate) > 0 {
  325. // 更新es
  326. updateEsPool <- []map[string]interface{}{
  327. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  328. esUpdate,
  329. }
  330. }
  331. tmp = make(map[string]interface{})
  332. }
  333. log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
  334. }
  335. // dealResult 查询抽取表,更新合同周期字段;是dealData的后面遗漏数据
  336. func dealResult() {
  337. defer util.Catch()
  338. sess := MgoR.GetMgoConn()
  339. defer MgoR.DestoryMongoConn(sess)
  340. where := map[string]interface{}{
  341. "_id": map[string]interface{}{
  342. "$gte": mongodb.StringTOBsonId("5a4909cf40d2d9bbe8ab329c"),
  343. "$lte": mongodb.StringTOBsonId("5a4ad94d40d2d9bbe8ae0183"),
  344. },
  345. "subtype": "合同",
  346. }
  347. selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
  348. it := sess.DB("qfw").C("result_20220219").Find(where).Select(&selected).Iter()
  349. count := 0
  350. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  351. if count%1000 == 0 {
  352. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  353. }
  354. idStr := mongodb.BsonIdToSId(tmp["_id"])
  355. update := make(map[string]interface{})
  356. if tmp["signaturedate"] != nil {
  357. update["signaturedate"] = tmp["signaturedate"]
  358. }
  359. if tmp["contractperiod"] != nil {
  360. update["contractperiod"] = tmp["contractperiod"]
  361. }
  362. if tmp["expiredate"] != nil {
  363. update["expiredate"] = tmp["expiredate"]
  364. }
  365. if len(update) == 0 {
  366. continue
  367. }
  368. bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
  369. data := *bidding
  370. Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
  371. // 针对存量数据,重复数据不进索引
  372. if util.IntAll(data["extracttype"]) == -1 {
  373. tmp = make(map[string]interface{})
  374. continue
  375. }
  376. //2.es 更新字段
  377. esUpdate := update
  378. esUpdate["id"] = idStr
  379. if len(esUpdate) > 0 {
  380. // 更新es
  381. updateEsPool <- []map[string]interface{}{
  382. {"_id": mongodb.BsonIdToSId(tmp["_id"])},
  383. esUpdate,
  384. }
  385. }
  386. tmp = make(map[string]interface{})
  387. }
  388. log.Info("Run Over...Count:", log.Int("count", count))
  389. }
  390. // dealBidding 处理bidding数据
  391. func dealBidding() {
  392. defer util.Catch()
  393. sess := MgoB.GetMgoConn()
  394. defer MgoB.DestoryMongoConn(sess)
  395. //where := map[string]interface{}{
  396. // "title": "2020年12月采购意向项目-3",
  397. //}
  398. it := sess.DB("qfw").C("bidding").Find(nil).Select(nil).Iter()
  399. fmt.Println("taskRun 开始")
  400. count := 0
  401. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  402. if count%10000 == 0 {
  403. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  404. }
  405. update := map[string]interface{}{}
  406. esUpdate := map[string]interface{}{}
  407. // 2.更新中标单位,中标金额
  408. //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
  409. // update["tag_topinformation"] = tag_topinformation
  410. //}
  411. //
  412. //if property_form, ok := tmp["property_form"]; ok && property_form != nil {
  413. // update["property_form"] = property_form
  414. //}
  415. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  416. //fmt.Println(biddingID)
  417. /**
  418. "s_subscopeclass" : "其它",
  419. "s_topscopeclass" : "其它",
  420. "subscopeclass" : [
  421. "其它"
  422. ],
  423. "topscopeclass" : [
  424. "其它"
  425. ],
  426. */
  427. // 行业分类默认值
  428. if topscopeclass, ok := tmp["topscopeclass"]; !ok && topscopeclass == nil {
  429. update["topscopeclass"] = []string{"其它"}
  430. update["s_topscopeclass"] = "其它"
  431. esUpdate["topscopeclass"] = []string{"其它"}
  432. esUpdate["s_topscopeclass"] = "其它"
  433. }
  434. if subscopeclass, ok := tmp["subscopeclass"]; !ok && subscopeclass == nil {
  435. update["subscopeclass"] = []string{"其它"}
  436. update["s_subscopeclass"] = "其它"
  437. esUpdate["subscopeclass"] = []string{"其它"}
  438. esUpdate["s_subscopeclass"] = "其它"
  439. }
  440. //procurementlist 处理预计采购时间
  441. if procurementlist, ok := tmp["procurementlist"]; ok && procurementlist != nil {
  442. field := "procurementlist"
  443. if tmp[field] != nil {
  444. if field == "procurementlist" {
  445. if tmp["procurementlist"] != nil {
  446. var arr []interface{}
  447. plist := tmp["procurementlist"].([]interface{})
  448. for _, p := range plist {
  449. p1 := p.(map[string]interface{})
  450. p2 := make(map[string]interface{})
  451. for k, v := range BiddingLevelField[field] {
  452. if k == "projectname" && util.ObjToString(p1[k]) == "" {
  453. p2[k] = util.ObjToString(tmp["projectname"])
  454. } else if k == "buyer" && util.ObjToString(p1[k]) == "" && util.ObjToString(tmp["buyer"]) != "" {
  455. p2[k] = util.ObjToString(tmp["buyer"])
  456. } else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" {
  457. res := getMethod(util.ObjToString(p1[k]))
  458. if res != 0 {
  459. p2[k] = res
  460. }
  461. } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v {
  462. p2[k] = p1[k]
  463. }
  464. }
  465. arr = append(arr, p2)
  466. }
  467. if len(arr) > 0 {
  468. esUpdate[field] = arr
  469. }
  470. }
  471. }
  472. }
  473. }
  474. if len(update) > 0 {
  475. //fmt.Println("aaaaa", biddingID)
  476. //更新mongo
  477. //MgoT.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  478. //更新MongoDB
  479. updatePool <- []map[string]interface{}{
  480. {"_id": tmp["_id"]},
  481. {"$set": update},
  482. }
  483. //2.es 项目 更新字段
  484. //err := Es.UpdateDocument("bidding", biddingID, update)
  485. //if err != nil && err.Error() != "Document not updated: noop" {
  486. // log.Info("bidding es update err", err, biddingID)
  487. //}
  488. //// 更新es
  489. //updateEsPool <- []map[string]interface{}{
  490. // {"_id": biddingID},
  491. // update,
  492. //}
  493. }
  494. // 更新Es 数据
  495. if len(esUpdate) > 0 {
  496. // 更新es
  497. updateEsPool <- []map[string]interface{}{
  498. {"_id": biddingID},
  499. esUpdate,
  500. }
  501. }
  502. }
  503. log.Info("Run Over...Count:", log.Int("count", count))
  504. }
  505. // dealBiddingTest 处理测试环境数据
  506. func dealBiddingTest() {
  507. defer util.Catch()
  508. sess := MgoT.GetMgoConn()
  509. defer MgoT.DestoryMongoConn(sess)
  510. it := sess.DB("qfw_data").C("bidding").Find(nil).Select(nil).Iter()
  511. fmt.Println("taskRun 开始")
  512. count := 0
  513. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  514. if count%10000 == 0 {
  515. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  516. }
  517. update := map[string]interface{}{}
  518. // 2.更新中标单位,中标金额
  519. //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
  520. // update["tag_topinformation"] = tag_topinformation
  521. //}
  522. //
  523. //if property_form, ok := tmp["property_form"]; ok && property_form != nil {
  524. // update["property_form"] = property_form
  525. //}
  526. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  527. /**
  528. "s_subscopeclass" : "其它",
  529. "s_topscopeclass" : "其它",
  530. "subscopeclass" : [
  531. "其它"
  532. ],
  533. "topscopeclass" : [
  534. "其它"
  535. ],
  536. */
  537. // 行业分类默认值
  538. if topscopeclass, ok := tmp["topscopeclass"]; !ok && topscopeclass == nil {
  539. update["topscopeclass"] = []string{"其它"}
  540. update["s_topscopeclass"] = "其它"
  541. }
  542. if subscopeclass, ok := tmp["subscopeclass"]; !ok && subscopeclass == nil {
  543. update["subscopeclass"] = []string{"其它"}
  544. update["s_subscopeclass"] = "其它"
  545. }
  546. if util.ObjToString(tmp["s_topscopeclass"]) == "其它" {
  547. update["topscopeclass"] = []string{"其它"}
  548. update["s_topscopeclass"] = "其它"
  549. }
  550. if util.ObjToString(tmp["s_subscopeclass"]) == "其它" {
  551. update["subscopeclass"] = []string{"其它"}
  552. update["s_subscopeclass"] = "其它"
  553. }
  554. //procurementlist 处理预计采购时间
  555. if procurementlist, ok := tmp["procurementlist"]; ok && procurementlist != nil {
  556. for field, _ := range BiddingField {
  557. if tmp[field] != nil {
  558. if field == "procurementlist" {
  559. if tmp["procurementlist"] != nil {
  560. var arr []interface{}
  561. plist := tmp["procurementlist"].([]interface{})
  562. for _, p := range plist {
  563. p1 := p.(map[string]interface{})
  564. p2 := make(map[string]interface{})
  565. for k, v := range BiddingLevelField[field] {
  566. if k == "projectname" && util.ObjToString(p1[k]) == "" {
  567. p2[k] = util.ObjToString(tmp["projectname"])
  568. } else if k == "buyer" && util.ObjToString(p1[k]) == "" && util.ObjToString(tmp["buyer"]) != "" {
  569. p2[k] = util.ObjToString(tmp["buyer"])
  570. } else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" {
  571. res := getMethod(util.ObjToString(p1[k]))
  572. if res != 0 {
  573. p2[k] = res
  574. }
  575. } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v {
  576. p2[k] = p1[k]
  577. }
  578. }
  579. arr = append(arr, p2)
  580. }
  581. if len(arr) > 0 {
  582. update[field] = arr
  583. }
  584. }
  585. }
  586. }
  587. }
  588. }
  589. if len(update) > 0 {
  590. fmt.Println("aaaaa", biddingID)
  591. //更新mongo
  592. //MgoT.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
  593. //更新MongoDB
  594. //updatePool <- []map[string]interface{}{
  595. // {"_id": tmp["_id"]},
  596. // {"$set": update},
  597. //}
  598. //2.es 项目 更新字段
  599. //err := Es.UpdateDocument("bidding", biddingID, update)
  600. //if err != nil && err.Error() != "Document not updated: noop" {
  601. // log.Info("bidding es update err", err, biddingID)
  602. //}
  603. // 更新es
  604. //updateEsPool <- []map[string]interface{}{
  605. // {"_id": biddingID},
  606. // update,
  607. //}
  608. }
  609. }
  610. log.Info("Run Over...Count:", log.Int("count", count))
  611. }
  612. // updateMethod 更新MongoDB
  613. func updateMethod() {
  614. arru := make([][]map[string]interface{}, saveSize)
  615. indexu := 0
  616. for {
  617. select {
  618. case v := <-updatePool:
  619. arru[indexu] = v
  620. indexu++
  621. if indexu == saveSize {
  622. updateSp <- true
  623. go func(arru [][]map[string]interface{}) {
  624. defer func() {
  625. <-updateSp
  626. }()
  627. MgoB.UpdateBulk("bidding", arru...)
  628. }(arru)
  629. arru = make([][]map[string]interface{}, saveSize)
  630. indexu = 0
  631. }
  632. case <-time.After(1000 * time.Millisecond):
  633. if indexu > 0 {
  634. updateSp <- true
  635. go func(arru [][]map[string]interface{}) {
  636. defer func() {
  637. <-updateSp
  638. }()
  639. MgoB.UpdateBulk("bidding", arru...)
  640. }(arru[:indexu])
  641. arru = make([][]map[string]interface{}, saveSize)
  642. indexu = 0
  643. }
  644. }
  645. }
  646. }
  647. // updateEsMethod 更新es
  648. func updateEsMethod() {
  649. arru := make([][]map[string]interface{}, 200)
  650. indexu := 0
  651. for {
  652. select {
  653. case v := <-updateEsPool:
  654. arru[indexu] = v
  655. indexu++
  656. if indexu == 200 {
  657. updateEsSp <- true
  658. go func(arru [][]map[string]interface{}) {
  659. defer func() {
  660. <-updateEsSp
  661. }()
  662. Es.UpdateBulk("bidding", arru...)
  663. EsNew.UpdateBulk("bidding", arru...)
  664. }(arru)
  665. arru = make([][]map[string]interface{}, 200)
  666. indexu = 0
  667. }
  668. case <-time.After(1000 * time.Millisecond):
  669. if indexu > 0 {
  670. updateEsSp <- true
  671. go func(arru [][]map[string]interface{}) {
  672. defer func() {
  673. <-updateEsSp
  674. }()
  675. Es.UpdateBulk("bidding", arru...)
  676. EsNew.UpdateBulk("bidding", arru...)
  677. }(arru[:indexu])
  678. arru = make([][]map[string]interface{}, 200)
  679. indexu = 0
  680. }
  681. }
  682. }
  683. }
  684. // updateProjectEsMethod 更新项目索引
  685. func updateProjectEsMethod() {
  686. arru := make([][]map[string]interface{}, 200)
  687. indexu := 0
  688. for {
  689. select {
  690. case v := <-updateEsPool:
  691. arru[indexu] = v
  692. indexu++
  693. if indexu == 200 {
  694. updateEsSp <- true
  695. go func(arru [][]map[string]interface{}) {
  696. defer func() {
  697. <-updateEsSp
  698. }()
  699. Es.UpdateBulk("projectset", arru...)
  700. }(arru)
  701. arru = make([][]map[string]interface{}, 200)
  702. indexu = 0
  703. }
  704. case <-time.After(1000 * time.Millisecond):
  705. if indexu > 0 {
  706. updateEsSp <- true
  707. go func(arru [][]map[string]interface{}) {
  708. defer func() {
  709. <-updateEsSp
  710. }()
  711. Es.UpdateBulk("projectset", arru...)
  712. }(arru[:indexu])
  713. arru = make([][]map[string]interface{}, 200)
  714. indexu = 0
  715. }
  716. }
  717. }
  718. }