project.go 16 KB


  1. package main
  2. import (
  3. "context"
  4. "data_tidb/config"
  5. "encoding/json"
  6. "fmt"
  7. es "github.com/olivere/elastic/v7"
  8. "go.mongodb.org/mongo-driver/bson"
  9. "go.uber.org/zap"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  13. "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
  14. "strings"
  15. "sync"
  16. "time"
  17. )
  18. func taskR() {
  19. client := Es.GetEsConn()
  20. defer Es.DestoryEsConn(client)
  21. wg := &sync.WaitGroup{}
  22. query := es.NewBoolQuery().
  23. //Must(es.NewTermsQuery("id", "64e7685255d5406905c94a64"))
  24. Must(es.NewRangeQuery("comeintime").Gt(Pici))
  25. //Must(es.NewExistsQuery("yuceendtime"))
  26. fsc := es.NewFetchSourceContext(true).Include("buyer", "buyertel", "buyerperson", "agency", "agencytel",
  27. "agencyperson", "s_winner", "winnertel", "winnerperson", "id", "pici")
  28. util.Debug(Es.Count("bidding", query))
  29. countDocs := 0
  30. res, err := client.Scroll().Index("bidding").Query(query).Sort("comeintime", true).FetchSourceContext(fsc).Scroll("5m").Size(2000).Do(context.Background()) //查询一条获取游标
  31. if err == nil {
  32. taskInfoA(res, wg, &countDocs)
  33. scrollId := res.ScrollId
  34. for {
  35. searchResult, err := client.Scroll("1m").Index("bidding").Sort("comeintime", true).ScrollId(scrollId).Size(2000).Do(context.TODO()) //查询
  36. if err != nil {
  37. util.Debug("Es Search Data Error:", err.Error())
  38. break
  39. }
  40. taskInfoA(searchResult, wg, &countDocs)
  41. scrollId = searchResult.ScrollId
  42. }
  43. wg.Wait()
  44. util.Debug("over---", countDocs, Pici)
  45. _, _ = client.ClearScroll().ScrollId(scrollId).Do(context.Background()) //清理游标
  46. } else {
  47. util.Debug(err)
  48. }
  49. }
  50. func taskInfoA(searchResult *es.SearchResult, wg *sync.WaitGroup, countDocs *int) {
  51. ch := make(chan bool, 2)
  52. for _, hit := range searchResult.Hits.Hits {
  53. //开始处理数据
  54. wg.Add(1)
  55. ch <- true
  56. go func(tmpHit *es.SearchHit) {
  57. defer func() {
  58. <-ch
  59. wg.Done()
  60. }()
  61. tmp := make(map[string]interface{})
  62. if json.Unmarshal(tmpHit.Source, &tmp) == nil {
  63. if t := util.Int64All(tmp["pici"]); t > Pici {
  64. piciLock.Lock()
  65. Pici = t
  66. piciLock.Unlock()
  67. }
  68. taskRelation(tmp)
  69. }
  70. }(hit)
  71. *countDocs += 1
  72. if *countDocs%10000 == 0 {
  73. util.Debug("Current:", *countDocs)
  74. }
  75. }
  76. }
  77. func taskPAdd() {
  78. sess := MongoP.GetMgoConn()
  79. defer MongoP.DestoryMongoConn(sess)
  80. ch := make(chan bool, 20)
  81. wg := &sync.WaitGroup{}
  82. q := bson.M{"pici": bson.M{"$gt": Pici}}
  83. query := sess.DB(config.Conf.DB.MongoB.Dbname).C("projectset_20230904").Find(q).Iter()
  84. count := 0
  85. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  86. if count%20000 == 0 {
  87. log.Info(fmt.Sprintf("current --- %d", count))
  88. }
  89. if t := util.Int64All(tmp["pici"]); t > Pici {
  90. Pici = t
  91. }
  92. ch <- true
  93. wg.Add(1)
  94. go func(tmp map[string]interface{}) {
  95. defer func() {
  96. <-ch
  97. wg.Done()
  98. }()
  99. //taskPro(tmp)
  100. //taskBusiness(tmp)
  101. //taskProTag(tmp)
  102. taskRelation2(tmp)
  103. }(tmp)
  104. tmp = make(map[string]interface{})
  105. }
  106. wg.Wait()
  107. log.Info(fmt.Sprintf("over --- %d", count))
  108. }
  109. var BidStatus = map[string]int{
  110. "预告": 0,
  111. "拟建": 1,
  112. "招标": 2,
  113. "中标": 3,
  114. "成交": 4,
  115. "废标": 5,
  116. "流标": 6,
  117. "合同": 7,
  118. "其它": 8,
  119. }
  120. var BidType = map[string]int{
  121. "招标": 0,
  122. "邀标": 1,
  123. "单一": 2,
  124. "竞价": 3,
  125. "竞谈": 4,
  126. "询价": 5,
  127. }
  128. // @Description 基础信息
  129. // @Author J 2022/9/22 18:32
  130. func taskPro(tmp map[string]interface{}) {
  131. saveM := make(map[string]interface{})
  132. for _, f := range ProField {
  133. if f == "projectid" {
  134. saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
  135. } else if f == "area_code" {
  136. if tmp["area"] != nil {
  137. saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
  138. }
  139. } else if f == "city_code" {
  140. if tmp["area"] != nil && tmp["city"] != nil {
  141. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
  142. saveM[f] = AreaCode[c]
  143. }
  144. } else if f == "district_code" {
  145. if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil {
  146. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"])
  147. saveM[f] = AreaCode[c]
  148. }
  149. } else if f == "updatetime" {
  150. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  151. } else if f == "buyerclass_code" {
  152. if obj := util.ObjToString(tmp["buyerclass"]); obj != "" {
  153. saveM[f] = BuyerCode[obj]
  154. }
  155. } else if f == "firsttime" || f == "zbtime" || f == "jgtime" || f == "lasttime" || f == "bidopentime" || f == "createtime" {
  156. if tmp[f] != nil && util.Int64All(tmp[f]) > 0 {
  157. t := util.Int64All(tmp[f])
  158. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  159. }
  160. } else if f == "bidstatus" {
  161. if b := util.ObjToString(tmp[f]); b != "" {
  162. tmp[f] = BidStatus[b]
  163. }
  164. } else if f == "bidtype" {
  165. if b := util.ObjToString(tmp[f]); b != "" {
  166. tmp[f] = BidType[b]
  167. }
  168. } else if f == "multipackage" {
  169. if tmp[f] == nil {
  170. saveM[f] = 0
  171. } else {
  172. saveM[f] = tmp[f]
  173. }
  174. } else if f == "buyer_id" {
  175. if b := util.ObjToString(tmp["buyer"]); b != "" {
  176. if code := redis.GetStr("qyxy_id", b); code != "" {
  177. saveM[f] = code
  178. }
  179. }
  180. } else if f == "agency_id" {
  181. if b := util.ObjToString(tmp["agency"]); b != "" {
  182. if code := redis.GetStr("qyxy_id", b); code != "" {
  183. saveM[f] = code
  184. }
  185. }
  186. } else {
  187. if tmp[f] != nil {
  188. if ProVMap[f] != nil {
  189. saveM[f], _ = verifyF(f, tmp[f], ProVMap[f])
  190. } else {
  191. saveM[f] = tmp[f]
  192. }
  193. }
  194. }
  195. }
  196. saveProPool <- saveM
  197. }
  198. // @Description 项目业务表
  199. // @Author J 2022/9/30 13:40
  200. func taskBusiness(tmp map[string]interface{}) {
  201. warr := strings.Split(util.ObjToString(tmp["s_winner"]), ",")
  202. if BinarySearch(warr, util.ObjToString(tmp["winner"])) == -1 {
  203. warr = append(warr, util.ObjToString(tmp["winner"]))
  204. }
  205. for _, s := range warr {
  206. saveM := make(map[string]interface{})
  207. for _, f := range ProBusField {
  208. if f == "projectid" {
  209. saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
  210. } else if f == "area_code" {
  211. if tmp["area"] != nil {
  212. saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
  213. }
  214. } else if f == "city_code" {
  215. if tmp["area"] != nil && tmp["city"] != nil {
  216. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
  217. saveM[f] = AreaCode[c]
  218. }
  219. } else if f == "district_code" {
  220. if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil {
  221. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"])
  222. saveM[f] = AreaCode[c]
  223. }
  224. } else if f == "updatetime" {
  225. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  226. } else if f == "buyerclass_code" {
  227. if obj := util.ObjToString(tmp["buyerclass"]); obj != "" {
  228. saveM[f] = BuyerCode[obj]
  229. }
  230. } else if f == "firsttime" || f == "zbtime" || f == "jgtime" || f == "lasttime" || f == "bidopentime" || f == "createtime" {
  231. if tmp[f] != nil && util.Int64All(tmp[f]) > 0 {
  232. t := util.Int64All(tmp[f])
  233. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  234. }
  235. } else if f == "bidstatus" {
  236. if b := util.ObjToString(tmp[f]); b != "" {
  237. tmp[f] = BidStatus[b]
  238. }
  239. } else if f == "bidtype" {
  240. if b := util.ObjToString(tmp[f]); b != "" {
  241. tmp[f] = BidType[b]
  242. }
  243. } else if f == "buyer_id" {
  244. if b := util.ObjToString(tmp["buyer"]); b != "" {
  245. saveM["buyer"] = b
  246. if code := redis.GetStr("qyxy_id", b); code != "" {
  247. saveM[f] = code
  248. }
  249. }
  250. } else if f == "agency_id" {
  251. if b := util.ObjToString(tmp["agency"]); b != "" {
  252. saveM["agency"] = b
  253. if code := redis.GetStr("qyxy_id", b); code != "" {
  254. saveM[f] = code
  255. }
  256. }
  257. } else if f == "winner_id" {
  258. if s != "" {
  259. saveM["winner"] = s
  260. if code := redis.GetStr("qyxy_id", s); code != "" {
  261. saveM[f] = code
  262. }
  263. }
  264. } else {
  265. if tmp[f] != nil {
  266. if ProBusVMap[f] != nil {
  267. saveM[f], _ = verifyF(f, tmp[f], ProBusVMap[f])
  268. } else {
  269. saveM[f] = tmp[f]
  270. }
  271. }
  272. }
  273. }
  274. saveProbPool <- saveM
  275. }
  276. }
  277. // @Description 项目信息标签
  278. // @Author J 2022/9/30 13:54
  279. func taskProTag(tmp map[string]interface{}) {
  280. id := mongodb.BsonIdToSId(tmp["_id"])
  281. if topArr, ok := tmp["topscopeclass"].([]interface{}); ok {
  282. for _, i2 := range topArr {
  283. tclass := regLetter.ReplaceAllString(util.ObjToString(i2), "") // 去除字母
  284. code := TopScopeCode[tclass]
  285. saveProTagPool <- map[string]interface{}{"projectid": id, "labelcode": "1", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
  286. }
  287. }
  288. if subArr, ok := tmp["subscopeclass"].([]interface{}); ok {
  289. for _, i2 := range subArr {
  290. sc := strings.Split(util.ObjToString(i2), "_")
  291. code := SubScopeCode[sc[1]]
  292. saveProTagPool <- map[string]interface{}{"projectid": id, "labelcode": "2", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
  293. }
  294. }
  295. }
  296. // @Description 关系表
  297. // @Author J 2022/9/30 13:56
  298. func taskRelation(tmp map[string]interface{}) {
  299. id := util.ObjToString(tmp["id"])
  300. pid := ""
  301. if str := redis.GetStr("project_ids", id); str == "" {
  302. info, _ := MongoP.FindOneByField("projectset_20230904", bson.M{"ids": id}, bson.M{"ids": 1})
  303. if len(*info) > 0 {
  304. pid = mongodb.BsonIdToSId((*info)["_id"])
  305. for _, s := range util.ObjArrToStringArr((*info)["ids"].([]interface{})) {
  306. redis.PutCKV("project_ids", s, mongodb.BsonIdToSId((*info)["_id"]))
  307. }
  308. } else {
  309. //log.Info("taskRelation pid err", zap.Any("id", id))
  310. //return
  311. }
  312. } else {
  313. pid = str
  314. }
  315. if b := util.ObjToString(tmp["buyer"]); b != "" {
  316. saveM := make(map[string]interface{})
  317. saveM["projectid"] = pid
  318. saveM["infoid"] = id
  319. saveM["identity_type"] = 1
  320. saveM["createtime"] = time.Now().Format(util.Date_Full_Layout)
  321. if code := redis.GetStr("qyxy_id", b); code != "" {
  322. saveM["name_id"] = code
  323. if util.ObjToString(tmp["buyertel"]) != "" {
  324. q := make(map[string]interface{})
  325. q["name_id"] = code
  326. q["identity_type"] = 1
  327. q["contact_tel"] = util.ObjToString(tmp["buyertel"])
  328. if util.ObjToString(tmp["buyerperson"]) != "" {
  329. q["contact_name"] = util.ObjToString(tmp["buyerperson"])
  330. }
  331. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  332. if cinfo != nil && len(*cinfo) > 0 {
  333. saveM["contact_id"] = (*cinfo)["id"]
  334. saveRelationPool <- saveM
  335. }
  336. }
  337. }
  338. }
  339. if a := util.ObjToString(tmp["agency"]); a != "" {
  340. saveM := make(map[string]interface{})
  341. saveM["projectid"] = pid
  342. saveM["infoid"] = id
  343. saveM["identity_type"] = 4
  344. saveM["createtime"] = time.Now().Format(util.Date_Full_Layout)
  345. if code := redis.GetStr("qyxy_id", a); code != "" {
  346. saveM["name_id"] = code
  347. if util.ObjToString(tmp["agencytel"]) != "" {
  348. q := make(map[string]interface{})
  349. q["name_id"] = code
  350. q["identity_type"] = 4 // 100
  351. q["contact_tel"] = util.ObjToString(tmp["agencytel"])
  352. if util.ObjToString(tmp["agencyperson"]) != "" {
  353. q["contact_name"] = util.ObjToString(tmp["agencyperson"])
  354. }
  355. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  356. if cinfo != nil && len(*cinfo) > 0 {
  357. saveM["contact_id"] = (*cinfo)["id"]
  358. saveRelationPool <- saveM
  359. }
  360. }
  361. }
  362. }
  363. sw := util.ObjToString(tmp["s_winner"])
  364. if !strings.Contains(sw, ",") {
  365. if code := redis.GetStr("qyxy_id", sw); code != "" {
  366. saveM := make(map[string]interface{})
  367. saveM["projectid"] = pid
  368. saveM["infoid"] = id
  369. saveM["identity_type"] = 2
  370. saveM["createtime"] = time.Now().Format(util.Date_Full_Layout)
  371. saveM["name_id"] = code
  372. if util.ObjToString(tmp["winnertel"]) != "" {
  373. q := make(map[string]interface{})
  374. q["name_id"] = code
  375. q["identity_type"] = 2 // 010
  376. q["contact_tel"] = util.ObjToString(tmp["winnertel"])
  377. if util.ObjToString(tmp["winnerperson"]) != "" {
  378. q["contact_name"] = util.ObjToString(tmp["winnerperson"])
  379. }
  380. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  381. if cinfo != nil && len(*cinfo) > 0 {
  382. saveM["contact_id"] = (*cinfo)["id"]
  383. saveRelationPool <- saveM
  384. }
  385. }
  386. }
  387. }
  388. }
  389. func taskRelation2(tmp map[string]interface{}) {
  390. pid := mongodb.BsonIdToSId(tmp["_id"])
  391. if tmp["ids"] == nil {
  392. log.Info("taskRelation ids err", zap.Any("id", pid))
  393. return
  394. }
  395. info := MysqlTool.Find("dws_f_bpmc_relation", bson.M{"projectid": pid}, "", "", -1, -1)
  396. if len(*info) > 0 {
  397. } else {
  398. ids := util.ObjArrToStringArr(tmp["ids"].([]interface{}))
  399. lid := ids[len(ids)-1]
  400. if b := util.ObjToString(tmp["buyer"]); b != "" {
  401. saveM := make(map[string]interface{})
  402. for _, f := range RelationField {
  403. if f == "projectid" {
  404. saveM[f] = pid
  405. } else if f == "infoid" {
  406. saveM[f] = lid
  407. } else if f == "name_id" {
  408. if code := redis.GetStr("qyxy_id", b); code != "" {
  409. saveM[f] = code
  410. if util.ObjToString(tmp["buyertel"]) != "" {
  411. q := make(map[string]interface{})
  412. q["name_id"] = code
  413. q["identity_type"] = 1
  414. q["contact_tel"] = util.ObjToString(tmp["buyertel"])
  415. if util.ObjToString(tmp["buyerperson"]) != "" {
  416. q["contact_name"] = util.ObjToString(tmp["buyerperson"])
  417. }
  418. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  419. if cinfo != nil && len(*cinfo) > 0 {
  420. saveM["contact_id"] = (*cinfo)["id"]
  421. }
  422. }
  423. }
  424. } else if f == "identity_type" {
  425. saveM[f] = 1 // 001
  426. } else if f == "createtime" {
  427. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  428. }
  429. }
  430. saveRelationPool <- saveM
  431. }
  432. if a := util.ObjToString(tmp["agency"]); a != "" {
  433. saveM := make(map[string]interface{})
  434. for _, f := range RelationField {
  435. if f == "projectid" {
  436. saveM[f] = pid
  437. } else if f == "infoid" {
  438. saveM[f] = lid
  439. } else if f == "name_id" {
  440. if code := redis.GetStr("qyxy_id", a); code != "" {
  441. saveM[f] = code
  442. if util.ObjToString(tmp["buyertel"]) != "" {
  443. q := make(map[string]interface{})
  444. q["name_id"] = code
  445. q["identity_type"] = 4
  446. q["contact_tel"] = util.ObjToString(tmp["agencytel"])
  447. if util.ObjToString(tmp["agencyperson"]) != "" {
  448. q["contact_name"] = util.ObjToString(tmp["agencyperson"])
  449. }
  450. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  451. if cinfo != nil && len(*cinfo) > 0 {
  452. saveM["contact_id"] = (*cinfo)["id"]
  453. }
  454. }
  455. }
  456. } else if f == "identity_type" {
  457. saveM[f] = 4 // 100
  458. } else if f == "createtime" {
  459. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  460. }
  461. }
  462. saveRelationPool <- saveM
  463. }
  464. warr := strings.Split(util.ObjToString(tmp["s_winner"]), ",")
  465. if BinarySearch(warr, util.ObjToString(tmp["winner"])) == -1 {
  466. warr = append(warr, util.ObjToString(tmp["winner"]))
  467. }
  468. for _, ws := range warr {
  469. saveM := make(map[string]interface{})
  470. for _, f := range RelationField {
  471. if f == "projectid" {
  472. saveM[f] = pid
  473. } else if f == "infoid" {
  474. saveM[f] = lid
  475. } else if f == "name_id" {
  476. if code := redis.GetStr("qyxy_id", ws); code != "" {
  477. saveM[f] = code
  478. if util.ObjToString(tmp["buyertel"]) != "" {
  479. q := make(map[string]interface{})
  480. q["name_id"] = code
  481. q["identity_type"] = 2
  482. q["contact_tel"] = util.ObjToString(tmp["winnertel"])
  483. if util.ObjToString(tmp["winnerperson"]) != "" {
  484. q["contact_name"] = util.ObjToString(tmp["winnerperson"])
  485. }
  486. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  487. if cinfo != nil && len(*cinfo) > 0 {
  488. saveM["contact_id"] = (*cinfo)["id"]
  489. }
  490. }
  491. }
  492. } else if f == "identity_type" {
  493. saveM[f] = 2 // 010
  494. } else if f == "createtime" {
  495. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  496. }
  497. }
  498. saveRelationPool <- saveM
  499. }
  500. }
  501. }
  502. func taskRedisPid() {
  503. sess := MongoP.GetMgoConn()
  504. defer MongoP.DestoryMongoConn(sess)
  505. ch := make(chan bool, 5)
  506. wg := &sync.WaitGroup{}
  507. f := bson.M{"ids": 1, "id": 1}
  508. query := sess.DB(config.Conf.DB.MongoB.Dbname).C("projectset_20230904").Find(nil).Select(f).Iter()
  509. count := 0
  510. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  511. if count%20000 == 0 {
  512. log.Info(fmt.Sprintf("current --- %d", count))
  513. }
  514. if t := util.Int64All(tmp["pici"]); t > Pici {
  515. Pici = t
  516. }
  517. ch <- true
  518. wg.Add(1)
  519. go func(tmp map[string]interface{}) {
  520. defer func() {
  521. <-ch
  522. wg.Done()
  523. }()
  524. pid := mongodb.BsonIdToSId(tmp["_id"])
  525. for _, i := range tmp["ids"].([]interface{}) {
  526. redis.PutCKV("project_ids", util.ObjToString(i), pid)
  527. }
  528. }(tmp)
  529. tmp = make(map[string]interface{})
  530. }
  531. wg.Wait()
  532. log.Info(fmt.Sprintf("over --- %d", count))
  533. }