project.go 15 KB


  1. package main
  2. import (
  3. "context"
  4. "data_tidb/config"
  5. "encoding/json"
  6. "fmt"
  7. es "github.com/olivere/elastic/v7"
  8. "go.mongodb.org/mongo-driver/bson"
  9. "go.uber.org/zap"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  13. "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
  14. "strings"
  15. "sync"
  16. "time"
  17. )
  18. func taskR() {
  19. client := Es.GetEsConn()
  20. defer Es.DestoryEsConn(client)
  21. wg := &sync.WaitGroup{}
  22. query := es.NewBoolQuery()
  23. //Must(es.NewTermsQuery("id", "64e7685255d5406905c94a64"))
  24. //Must(es.NewRangeQuery("comeintime").Gte(1688140800).Lte(1690444635)).
  25. //Must(es.NewExistsQuery("yuceendtime"))
  26. util.Debug(Es.Count("bidding", query))
  27. countDocs := 0
  28. res, err := client.Scroll().Index("bidding").Query(query).Scroll("5m").Size(2000).Do(context.Background()) //查询一条获取游标
  29. if err == nil {
  30. taskInfoA(res, wg, &countDocs)
  31. scrollId := res.ScrollId
  32. for {
  33. searchResult, err := client.Scroll("1m").Index("bidding").ScrollId(scrollId).Size(2000).Do(context.TODO()) //查询
  34. if err != nil {
  35. util.Debug("Es Search Data Error:", err.Error())
  36. break
  37. }
  38. taskInfoA(searchResult, wg, &countDocs)
  39. scrollId = searchResult.ScrollId
  40. }
  41. wg.Wait()
  42. util.Debug("over---", countDocs)
  43. _, _ = client.ClearScroll().ScrollId(scrollId).Do(context.Background()) //清理游标
  44. } else {
  45. util.Debug(err)
  46. }
  47. }
  48. func taskInfoA(searchResult *es.SearchResult, wg *sync.WaitGroup, countDocs *int) {
  49. ch := make(chan bool, 1)
  50. for _, hit := range searchResult.Hits.Hits {
  51. //开始处理数据
  52. wg.Add(1)
  53. ch <- true
  54. go func(tmpHit *es.SearchHit) {
  55. defer func() {
  56. <-ch
  57. wg.Done()
  58. }()
  59. tmp := make(map[string]interface{})
  60. if json.Unmarshal(tmpHit.Source, &tmp) == nil {
  61. taskRelation(tmp)
  62. }
  63. }(hit)
  64. *countDocs += 1
  65. if *countDocs%10000 == 0 {
  66. util.Debug("Current:", *countDocs)
  67. }
  68. }
  69. }
  70. func taskPAdd() {
  71. sess := MongoP.GetMgoConn()
  72. defer MongoP.DestoryMongoConn(sess)
  73. ch := make(chan bool, 20)
  74. wg := &sync.WaitGroup{}
  75. q := bson.M{"pici": bson.M{"$gt": Pici}}
  76. query := sess.DB(config.Conf.DB.MongoB.Dbname).C("projectset_20230904").Find(q).Iter()
  77. count := 0
  78. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  79. if count%20000 == 0 {
  80. log.Info(fmt.Sprintf("current --- %d", count))
  81. }
  82. if t := util.Int64All(tmp["pici"]); t > Pici {
  83. Pici = t
  84. }
  85. ch <- true
  86. wg.Add(1)
  87. go func(tmp map[string]interface{}) {
  88. defer func() {
  89. <-ch
  90. wg.Done()
  91. }()
  92. //taskPro(tmp)
  93. //taskBusiness(tmp)
  94. //taskProTag(tmp)
  95. taskRelation2(tmp)
  96. }(tmp)
  97. tmp = make(map[string]interface{})
  98. }
  99. wg.Wait()
  100. log.Info(fmt.Sprintf("over --- %d", count))
  101. }
  // BidStatus maps the textual bid status of a project to its numeric code.
  // The trailing comments are English glosses of the keys.
  // A plain lookup of an unknown label yields 0, the code of the first entry;
  // use the two-value form when the distinction matters.
  var BidStatus = map[string]int{
  	"预告": 0, // advance notice
  	"拟建": 1, // proposed construction
  	"招标": 2, // tender
  	"中标": 3, // bid awarded
  	"成交": 4, // deal concluded
  	"废标": 5, // bid invalidated
  	"流标": 6, // failed tender
  	"合同": 7, // contract
  	"其它": 8, // other
  }
  // BidType maps the textual bid type of a project to its numeric code.
  // The trailing comments are English glosses of the keys.
  // A plain lookup of an unknown label yields 0, the code of the first entry;
  // use the two-value form when the distinction matters.
  var BidType = map[string]int{
  	"招标": 0, // open tender
  	"邀标": 1, // invited tender
  	"单一": 2, // single source
  	"竞价": 3, // competitive bidding
  	"竞谈": 4, // competitive negotiation
  	"询价": 5, // request for quotation
  }
  121. // @Description 基础信息
  122. // @Author J 2022/9/22 18:32
  123. func taskPro(tmp map[string]interface{}) {
  124. saveM := make(map[string]interface{})
  125. for _, f := range ProField {
  126. if f == "projectid" {
  127. saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
  128. } else if f == "area_code" {
  129. if tmp["area"] != nil {
  130. saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
  131. }
  132. } else if f == "city_code" {
  133. if tmp["area"] != nil && tmp["city"] != nil {
  134. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
  135. saveM[f] = AreaCode[c]
  136. }
  137. } else if f == "district_code" {
  138. if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil {
  139. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"])
  140. saveM[f] = AreaCode[c]
  141. }
  142. } else if f == "updatetime" {
  143. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  144. } else if f == "buyerclass_code" {
  145. if obj := util.ObjToString(tmp["buyerclass"]); obj != "" {
  146. saveM[f] = BuyerCode[obj]
  147. }
  148. } else if f == "firsttime" || f == "zbtime" || f == "jgtime" || f == "lasttime" || f == "bidopentime" || f == "createtime" {
  149. if tmp[f] != nil && util.Int64All(tmp[f]) > 0 {
  150. t := util.Int64All(tmp[f])
  151. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  152. }
  153. } else if f == "bidstatus" {
  154. if b := util.ObjToString(tmp[f]); b != "" {
  155. tmp[f] = BidStatus[b]
  156. }
  157. } else if f == "bidtype" {
  158. if b := util.ObjToString(tmp[f]); b != "" {
  159. tmp[f] = BidType[b]
  160. }
  161. } else if f == "multipackage" {
  162. if tmp[f] == nil {
  163. saveM[f] = 0
  164. } else {
  165. saveM[f] = tmp[f]
  166. }
  167. } else if f == "buyer_id" {
  168. if b := util.ObjToString(tmp["buyer"]); b != "" {
  169. if code := redis.GetStr("qyxy_id", b); code != "" {
  170. saveM[f] = code
  171. }
  172. }
  173. } else if f == "agency_id" {
  174. if b := util.ObjToString(tmp["agency"]); b != "" {
  175. if code := redis.GetStr("qyxy_id", b); code != "" {
  176. saveM[f] = code
  177. }
  178. }
  179. } else {
  180. if tmp[f] != nil {
  181. if ProVMap[f] != nil {
  182. saveM[f], _ = verifyF(f, tmp[f], ProVMap[f])
  183. } else {
  184. saveM[f] = tmp[f]
  185. }
  186. }
  187. }
  188. }
  189. saveProPool <- saveM
  190. }
  191. // @Description 项目业务表
  192. // @Author J 2022/9/30 13:40
  193. func taskBusiness(tmp map[string]interface{}) {
  194. warr := strings.Split(util.ObjToString(tmp["s_winner"]), ",")
  195. if BinarySearch(warr, util.ObjToString(tmp["winner"])) == -1 {
  196. warr = append(warr, util.ObjToString(tmp["winner"]))
  197. }
  198. for _, s := range warr {
  199. saveM := make(map[string]interface{})
  200. for _, f := range ProBusField {
  201. if f == "projectid" {
  202. saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
  203. } else if f == "area_code" {
  204. if tmp["area"] != nil {
  205. saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
  206. }
  207. } else if f == "city_code" {
  208. if tmp["area"] != nil && tmp["city"] != nil {
  209. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
  210. saveM[f] = AreaCode[c]
  211. }
  212. } else if f == "district_code" {
  213. if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil {
  214. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"])
  215. saveM[f] = AreaCode[c]
  216. }
  217. } else if f == "updatetime" {
  218. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  219. } else if f == "buyerclass_code" {
  220. if obj := util.ObjToString(tmp["buyerclass"]); obj != "" {
  221. saveM[f] = BuyerCode[obj]
  222. }
  223. } else if f == "firsttime" || f == "zbtime" || f == "jgtime" || f == "lasttime" || f == "bidopentime" || f == "createtime" {
  224. if tmp[f] != nil && util.Int64All(tmp[f]) > 0 {
  225. t := util.Int64All(tmp[f])
  226. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  227. }
  228. } else if f == "bidstatus" {
  229. if b := util.ObjToString(tmp[f]); b != "" {
  230. tmp[f] = BidStatus[b]
  231. }
  232. } else if f == "bidtype" {
  233. if b := util.ObjToString(tmp[f]); b != "" {
  234. tmp[f] = BidType[b]
  235. }
  236. } else if f == "buyer_id" {
  237. if b := util.ObjToString(tmp["buyer"]); b != "" {
  238. saveM["buyer"] = b
  239. if code := redis.GetStr("qyxy_id", b); code != "" {
  240. saveM[f] = code
  241. }
  242. }
  243. } else if f == "agency_id" {
  244. if b := util.ObjToString(tmp["agency"]); b != "" {
  245. saveM["agency"] = b
  246. if code := redis.GetStr("qyxy_id", b); code != "" {
  247. saveM[f] = code
  248. }
  249. }
  250. } else if f == "winner_id" {
  251. if s != "" {
  252. saveM["winner"] = s
  253. if code := redis.GetStr("qyxy_id", s); code != "" {
  254. saveM[f] = code
  255. }
  256. }
  257. } else {
  258. if tmp[f] != nil {
  259. if ProBusVMap[f] != nil {
  260. saveM[f], _ = verifyF(f, tmp[f], ProBusVMap[f])
  261. } else {
  262. saveM[f] = tmp[f]
  263. }
  264. }
  265. }
  266. }
  267. saveProbPool <- saveM
  268. }
  269. }
  270. // @Description 项目信息标签
  271. // @Author J 2022/9/30 13:54
  272. func taskProTag(tmp map[string]interface{}) {
  273. id := mongodb.BsonIdToSId(tmp["_id"])
  274. if topArr, ok := tmp["topscopeclass"].([]interface{}); ok {
  275. for _, i2 := range topArr {
  276. tclass := regLetter.ReplaceAllString(util.ObjToString(i2), "") // 去除字母
  277. code := TopScopeCode[tclass]
  278. saveProTagPool <- map[string]interface{}{"projectid": id, "labelcode": "1", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
  279. }
  280. }
  281. if subArr, ok := tmp["subscopeclass"].([]interface{}); ok {
  282. for _, i2 := range subArr {
  283. sc := strings.Split(util.ObjToString(i2), "_")
  284. code := SubScopeCode[sc[1]]
  285. saveProTagPool <- map[string]interface{}{"projectid": id, "labelcode": "2", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
  286. }
  287. }
  288. }
  289. // @Description 关系表
  290. // @Author J 2022/9/30 13:56
  291. func taskRelation(tmp map[string]interface{}) {
  292. id := util.ObjToString(tmp["id"])
  293. pid := ""
  294. if str := redis.GetStr("project_ids", id); str == "" {
  295. info, _ := MongoP.FindOneByField("projectset_20230407", bson.M{"ids": id}, bson.M{"ids": 1})
  296. if len(*info) > 0 {
  297. pid = mongodb.BsonIdToSId((*info)["_id"])
  298. for _, s := range util.ObjArrToStringArr((*info)["ids"].([]interface{})) {
  299. redis.PutCKV("project_ids", s, mongodb.BsonIdToSId((*info)["_id"]))
  300. }
  301. } else {
  302. //log.Info("taskRelation pid err", zap.Any("id", id))
  303. //return
  304. }
  305. } else {
  306. pid = str
  307. }
  308. if b := util.ObjToString(tmp["buyer"]); b != "" {
  309. saveM := make(map[string]interface{})
  310. saveM["projectid"] = pid
  311. saveM["infoid"] = id
  312. saveM["identity_type"] = 1
  313. saveM["createtime"] = time.Now().Format(util.Date_Full_Layout)
  314. if code := redis.GetStr("qyxy_id", b); code != "" {
  315. saveM["name_id"] = code
  316. if util.ObjToString(tmp["buyertel"]) != "" {
  317. q := make(map[string]interface{})
  318. q["name_id"] = code
  319. q["identity_type"] = 1
  320. q["contact_tel"] = util.ObjToString(tmp["buyertel"])
  321. if util.ObjToString(tmp["buyerperson"]) != "" {
  322. q["contact_name"] = util.ObjToString(tmp["buyerperson"])
  323. }
  324. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  325. if cinfo != nil && len(*cinfo) > 0 {
  326. saveM["contact_id"] = (*cinfo)["id"]
  327. saveRelationPool <- saveM
  328. }
  329. }
  330. }
  331. }
  332. if a := util.ObjToString(tmp["agency"]); a != "" {
  333. saveM := make(map[string]interface{})
  334. saveM["projectid"] = pid
  335. saveM["infoid"] = id
  336. saveM["identity_type"] = 4
  337. saveM["createtime"] = time.Now().Format(util.Date_Full_Layout)
  338. if code := redis.GetStr("qyxy_id", a); code != "" {
  339. saveM["name_id"] = code
  340. if util.ObjToString(tmp["agencytel"]) != "" {
  341. q := make(map[string]interface{})
  342. q["name_id"] = code
  343. q["identity_type"] = 4 // 100
  344. q["contact_tel"] = util.ObjToString(tmp["agencytel"])
  345. if util.ObjToString(tmp["agencyperson"]) != "" {
  346. q["contact_name"] = util.ObjToString(tmp["agencyperson"])
  347. }
  348. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  349. if cinfo != nil && len(*cinfo) > 0 {
  350. saveM["contact_id"] = (*cinfo)["id"]
  351. saveRelationPool <- saveM
  352. }
  353. }
  354. }
  355. }
  356. sw := util.ObjToString(tmp["s_winner"])
  357. if !strings.Contains(sw, ",") {
  358. if code := redis.GetStr("qyxy_id", sw); code != "" {
  359. saveM := make(map[string]interface{})
  360. saveM["projectid"] = pid
  361. saveM["infoid"] = id
  362. saveM["identity_type"] = 2
  363. saveM["createtime"] = time.Now().Format(util.Date_Full_Layout)
  364. saveM["name_id"] = code
  365. if util.ObjToString(tmp["winnertel"]) != "" {
  366. q := make(map[string]interface{})
  367. q["name_id"] = code
  368. q["identity_type"] = 2 // 010
  369. q["contact_tel"] = util.ObjToString(tmp["winnertel"])
  370. if util.ObjToString(tmp["winnerperson"]) != "" {
  371. q["contact_name"] = util.ObjToString(tmp["winnerperson"])
  372. }
  373. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  374. if cinfo != nil && len(*cinfo) > 0 {
  375. saveM["contact_id"] = (*cinfo)["id"]
  376. saveRelationPool <- saveM
  377. }
  378. }
  379. }
  380. }
  381. }
  382. func taskRelation2(tmp map[string]interface{}) {
  383. pid := mongodb.BsonIdToSId(tmp["_id"])
  384. if tmp["ids"] == nil {
  385. log.Info("taskRelation ids err", zap.Any("id", pid))
  386. return
  387. }
  388. info := MysqlTool.Find("dws_f_bpmc_relation", bson.M{"projectid": pid}, "", "", -1, -1)
  389. if len(*info) > 0 {
  390. } else {
  391. ids := util.ObjArrToStringArr(tmp["ids"].([]interface{}))
  392. lid := ids[len(ids)-1]
  393. if b := util.ObjToString(tmp["buyer"]); b != "" {
  394. saveM := make(map[string]interface{})
  395. for _, f := range RelationField {
  396. if f == "projectid" {
  397. saveM[f] = pid
  398. } else if f == "infoid" {
  399. saveM[f] = lid
  400. } else if f == "name_id" {
  401. if code := redis.GetStr("qyxy_id", b); code != "" {
  402. saveM[f] = code
  403. if util.ObjToString(tmp["buyertel"]) != "" {
  404. q := make(map[string]interface{})
  405. q["name_id"] = code
  406. q["identity_type"] = 1
  407. q["contact_tel"] = util.ObjToString(tmp["buyertel"])
  408. if util.ObjToString(tmp["buyerperson"]) != "" {
  409. q["contact_name"] = util.ObjToString(tmp["buyerperson"])
  410. }
  411. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  412. if cinfo != nil && len(*cinfo) > 0 {
  413. saveM["contact_id"] = (*cinfo)["id"]
  414. }
  415. }
  416. }
  417. } else if f == "identity_type" {
  418. saveM[f] = 1 // 001
  419. } else if f == "createtime" {
  420. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  421. }
  422. }
  423. saveRelationPool <- saveM
  424. }
  425. if a := util.ObjToString(tmp["agency"]); a != "" {
  426. saveM := make(map[string]interface{})
  427. for _, f := range RelationField {
  428. if f == "projectid" {
  429. saveM[f] = pid
  430. } else if f == "infoid" {
  431. saveM[f] = lid
  432. } else if f == "name_id" {
  433. if code := redis.GetStr("qyxy_id", a); code != "" {
  434. saveM[f] = code
  435. if util.ObjToString(tmp["buyertel"]) != "" {
  436. q := make(map[string]interface{})
  437. q["name_id"] = code
  438. q["identity_type"] = 4
  439. q["contact_tel"] = util.ObjToString(tmp["agencytel"])
  440. if util.ObjToString(tmp["agencyperson"]) != "" {
  441. q["contact_name"] = util.ObjToString(tmp["agencyperson"])
  442. }
  443. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  444. if cinfo != nil && len(*cinfo) > 0 {
  445. saveM["contact_id"] = (*cinfo)["id"]
  446. }
  447. }
  448. }
  449. } else if f == "identity_type" {
  450. saveM[f] = 4 // 100
  451. } else if f == "createtime" {
  452. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  453. }
  454. }
  455. saveRelationPool <- saveM
  456. }
  457. warr := strings.Split(util.ObjToString(tmp["s_winner"]), ",")
  458. if BinarySearch(warr, util.ObjToString(tmp["winner"])) == -1 {
  459. warr = append(warr, util.ObjToString(tmp["winner"]))
  460. }
  461. for _, ws := range warr {
  462. saveM := make(map[string]interface{})
  463. for _, f := range RelationField {
  464. if f == "projectid" {
  465. saveM[f] = pid
  466. } else if f == "infoid" {
  467. saveM[f] = lid
  468. } else if f == "name_id" {
  469. if code := redis.GetStr("qyxy_id", ws); code != "" {
  470. saveM[f] = code
  471. if util.ObjToString(tmp["buyertel"]) != "" {
  472. q := make(map[string]interface{})
  473. q["name_id"] = code
  474. q["identity_type"] = 2
  475. q["contact_tel"] = util.ObjToString(tmp["winnertel"])
  476. if util.ObjToString(tmp["winnerperson"]) != "" {
  477. q["contact_name"] = util.ObjToString(tmp["winnerperson"])
  478. }
  479. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  480. if cinfo != nil && len(*cinfo) > 0 {
  481. saveM["contact_id"] = (*cinfo)["id"]
  482. }
  483. }
  484. }
  485. } else if f == "identity_type" {
  486. saveM[f] = 2 // 010
  487. } else if f == "createtime" {
  488. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  489. }
  490. }
  491. saveRelationPool <- saveM
  492. }
  493. }
  494. }