project.go 15 KB


  1. package main
  2. import (
  3. "context"
  4. "data_tidb/config"
  5. "encoding/json"
  6. "fmt"
  7. es "github.com/olivere/elastic/v7"
  8. "go.mongodb.org/mongo-driver/bson"
  9. "go.uber.org/zap"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  13. "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
  14. "strings"
  15. "sync"
  16. "time"
  17. )
  18. func taskR() {
  19. client := Es.GetEsConn()
  20. defer Es.DestoryEsConn(client)
  21. wg := &sync.WaitGroup{}
  22. query := es.NewBoolQuery()
  23. //Must(es.NewTermsQuery("id", "64e7685255d5406905c94a64"))
  24. //Must(es.NewRangeQuery("comeintime").Gte(1688140800).Lte(1690444635)).
  25. //Must(es.NewExistsQuery("yuceendtime"))
  26. util.Debug(Es.Count("bidding", query))
  27. countDocs := 0
  28. res, err := client.Scroll().Index("bidding").Query(query).Scroll("5m").Size(2000).Do(context.Background()) //查询一条获取游标
  29. if err == nil {
  30. taskInfoA(res, wg, &countDocs)
  31. scrollId := res.ScrollId
  32. for {
  33. searchResult, err := client.Scroll("1m").Index("bidding").ScrollId(scrollId).Size(2000).Do(context.TODO()) //查询
  34. if err != nil {
  35. util.Debug("Es Search Data Error:", err.Error())
  36. break
  37. }
  38. taskInfoA(searchResult, wg, &countDocs)
  39. scrollId = searchResult.ScrollId
  40. }
  41. wg.Wait()
  42. util.Debug("over---", countDocs)
  43. _, _ = client.ClearScroll().ScrollId(scrollId).Do(context.Background()) //清理游标
  44. } else {
  45. util.Debug(err)
  46. }
  47. }
  48. func taskInfoA(searchResult *es.SearchResult, wg *sync.WaitGroup, countDocs *int) {
  49. ch := make(chan bool, 1)
  50. for _, hit := range searchResult.Hits.Hits {
  51. //开始处理数据
  52. wg.Add(1)
  53. ch <- true
  54. go func(tmpHit *es.SearchHit) {
  55. defer func() {
  56. <-ch
  57. wg.Done()
  58. }()
  59. tmp := make(map[string]interface{})
  60. if json.Unmarshal(tmpHit.Source, &tmp) == nil {
  61. taskRelation(tmp)
  62. }
  63. }(hit)
  64. *countDocs += 1
  65. if *countDocs%10000 == 0 {
  66. util.Debug("Current:", *countDocs)
  67. }
  68. }
  69. }
  70. func taskPAdd(pici int64) {
  71. sess := MongoP.GetMgoConn()
  72. defer MongoP.DestoryMongoConn(sess)
  73. ch := make(chan bool, 20)
  74. wg := &sync.WaitGroup{}
  75. q := bson.M{"pici": bson.M{"$gt": pici}}
  76. query := sess.DB(config.Conf.DB.MongoB.Dbname).C("bidding_back").Find(q).Iter()
  77. count := 0
  78. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  79. if count%20000 == 0 {
  80. log.Info(fmt.Sprintf("current --- %d", count))
  81. }
  82. ch <- true
  83. wg.Add(1)
  84. go func(tmp map[string]interface{}) {
  85. defer func() {
  86. <-ch
  87. wg.Done()
  88. }()
  89. //taskPro(tmp)
  90. //taskBusiness(tmp)
  91. //taskProTag(tmp)
  92. taskRelation2(tmp)
  93. }(tmp)
  94. tmp = make(map[string]interface{})
  95. }
  96. wg.Wait()
  97. log.Info(fmt.Sprintf("over --- %d", count))
  98. }
  99. var BidStatus = map[string]int{
  100. "预告": 0,
  101. "拟建": 1,
  102. "招标": 2,
  103. "中标": 3,
  104. "成交": 4,
  105. "废标": 5,
  106. "流标": 6,
  107. "合同": 7,
  108. "其它": 8,
  109. }
  110. var BidType = map[string]int{
  111. "招标": 0,
  112. "邀标": 1,
  113. "单一": 2,
  114. "竞价": 3,
  115. "竞谈": 4,
  116. "询价": 5,
  117. }
  118. // @Description 基础信息
  119. // @Author J 2022/9/22 18:32
  120. func taskPro(tmp map[string]interface{}) {
  121. saveM := make(map[string]interface{})
  122. for _, f := range ProField {
  123. if f == "projectid" {
  124. saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
  125. } else if f == "area_code" {
  126. if tmp["area"] != nil {
  127. saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
  128. }
  129. } else if f == "city_code" {
  130. if tmp["area"] != nil && tmp["city"] != nil {
  131. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
  132. saveM[f] = AreaCode[c]
  133. }
  134. } else if f == "district_code" {
  135. if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil {
  136. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"])
  137. saveM[f] = AreaCode[c]
  138. }
  139. } else if f == "updatetime" {
  140. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  141. } else if f == "buyerclass_code" {
  142. if obj := util.ObjToString(tmp["buyerclass"]); obj != "" {
  143. saveM[f] = BuyerCode[obj]
  144. }
  145. } else if f == "firsttime" || f == "zbtime" || f == "jgtime" || f == "lasttime" || f == "bidopentime" || f == "createtime" {
  146. if tmp[f] != nil && util.Int64All(tmp[f]) > 0 {
  147. t := util.Int64All(tmp[f])
  148. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  149. }
  150. } else if f == "bidstatus" {
  151. if b := util.ObjToString(tmp[f]); b != "" {
  152. tmp[f] = BidStatus[b]
  153. }
  154. } else if f == "bidtype" {
  155. if b := util.ObjToString(tmp[f]); b != "" {
  156. tmp[f] = BidType[b]
  157. }
  158. } else if f == "multipackage" {
  159. if tmp[f] == nil {
  160. saveM[f] = 0
  161. } else {
  162. saveM[f] = tmp[f]
  163. }
  164. } else if f == "buyer_id" {
  165. if b := util.ObjToString(tmp["buyer"]); b != "" {
  166. if code := redis.GetStr("qyxy_id", b); code != "" {
  167. saveM[f] = code
  168. }
  169. }
  170. } else if f == "agency_id" {
  171. if b := util.ObjToString(tmp["agency"]); b != "" {
  172. if code := redis.GetStr("qyxy_id", b); code != "" {
  173. saveM[f] = code
  174. }
  175. }
  176. } else {
  177. if tmp[f] != nil {
  178. if ProVMap[f] != nil {
  179. saveM[f], _ = verifyF(f, tmp[f], ProVMap[f])
  180. } else {
  181. saveM[f] = tmp[f]
  182. }
  183. }
  184. }
  185. }
  186. saveProPool <- saveM
  187. }
  188. // @Description 项目业务表
  189. // @Author J 2022/9/30 13:40
  190. func taskBusiness(tmp map[string]interface{}) {
  191. warr := strings.Split(util.ObjToString(tmp["s_winner"]), ",")
  192. if BinarySearch(warr, util.ObjToString(tmp["winner"])) == -1 {
  193. warr = append(warr, util.ObjToString(tmp["winner"]))
  194. }
  195. for _, s := range warr {
  196. saveM := make(map[string]interface{})
  197. for _, f := range ProBusField {
  198. if f == "projectid" {
  199. saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
  200. } else if f == "area_code" {
  201. if tmp["area"] != nil {
  202. saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
  203. }
  204. } else if f == "city_code" {
  205. if tmp["area"] != nil && tmp["city"] != nil {
  206. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
  207. saveM[f] = AreaCode[c]
  208. }
  209. } else if f == "district_code" {
  210. if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil {
  211. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"])
  212. saveM[f] = AreaCode[c]
  213. }
  214. } else if f == "updatetime" {
  215. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  216. } else if f == "buyerclass_code" {
  217. if obj := util.ObjToString(tmp["buyerclass"]); obj != "" {
  218. saveM[f] = BuyerCode[obj]
  219. }
  220. } else if f == "firsttime" || f == "zbtime" || f == "jgtime" || f == "lasttime" || f == "bidopentime" || f == "createtime" {
  221. if tmp[f] != nil && util.Int64All(tmp[f]) > 0 {
  222. t := util.Int64All(tmp[f])
  223. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  224. }
  225. } else if f == "bidstatus" {
  226. if b := util.ObjToString(tmp[f]); b != "" {
  227. tmp[f] = BidStatus[b]
  228. }
  229. } else if f == "bidtype" {
  230. if b := util.ObjToString(tmp[f]); b != "" {
  231. tmp[f] = BidType[b]
  232. }
  233. } else if f == "buyer_id" {
  234. if b := util.ObjToString(tmp["buyer"]); b != "" {
  235. saveM["buyer"] = b
  236. if code := redis.GetStr("qyxy_id", b); code != "" {
  237. saveM[f] = code
  238. }
  239. }
  240. } else if f == "agency_id" {
  241. if b := util.ObjToString(tmp["agency"]); b != "" {
  242. saveM["agency"] = b
  243. if code := redis.GetStr("qyxy_id", b); code != "" {
  244. saveM[f] = code
  245. }
  246. }
  247. } else if f == "winner_id" {
  248. if s != "" {
  249. saveM["winner"] = s
  250. if code := redis.GetStr("qyxy_id", s); code != "" {
  251. saveM[f] = code
  252. }
  253. }
  254. } else {
  255. if tmp[f] != nil {
  256. if ProBusVMap[f] != nil {
  257. saveM[f], _ = verifyF(f, tmp[f], ProBusVMap[f])
  258. } else {
  259. saveM[f] = tmp[f]
  260. }
  261. }
  262. }
  263. }
  264. saveProbPool <- saveM
  265. }
  266. }
  267. // @Description 项目信息标签
  268. // @Author J 2022/9/30 13:54
  269. func taskProTag(tmp map[string]interface{}) {
  270. id := mongodb.BsonIdToSId(tmp["_id"])
  271. if topArr, ok := tmp["topscopeclass"].([]interface{}); ok {
  272. for _, i2 := range topArr {
  273. tclass := regLetter.ReplaceAllString(util.ObjToString(i2), "") // 去除字母
  274. code := TopScopeCode[tclass]
  275. saveProTagPool <- map[string]interface{}{"projectid": id, "labelcode": "1", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
  276. }
  277. }
  278. if subArr, ok := tmp["subscopeclass"].([]interface{}); ok {
  279. for _, i2 := range subArr {
  280. sc := strings.Split(util.ObjToString(i2), "_")
  281. code := SubScopeCode[sc[1]]
  282. saveProTagPool <- map[string]interface{}{"projectid": id, "labelcode": "2", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
  283. }
  284. }
  285. }
  286. // @Description 关系表
  287. // @Author J 2022/9/30 13:56
  288. func taskRelation(tmp map[string]interface{}) {
  289. id := util.ObjToString(tmp["id"])
  290. pid := ""
  291. if str := redis.GetStr("project_ids", id); str == "" {
  292. info, _ := MongoP.FindOneByField("projectset_20230407", bson.M{"ids": id}, bson.M{"ids": 1})
  293. if len(*info) > 0 {
  294. pid = mongodb.BsonIdToSId((*info)["_id"])
  295. for _, s := range util.ObjArrToStringArr((*info)["ids"].([]interface{})) {
  296. redis.PutCKV("project_ids", s, mongodb.BsonIdToSId((*info)["_id"]))
  297. }
  298. } else {
  299. //log.Info("taskRelation pid err", zap.Any("id", id))
  300. //return
  301. }
  302. } else {
  303. pid = str
  304. }
  305. if b := util.ObjToString(tmp["buyer"]); b != "" {
  306. saveM := make(map[string]interface{})
  307. saveM["projectid"] = pid
  308. saveM["infoid"] = id
  309. saveM["identity_type"] = 1
  310. saveM["createtime"] = time.Now().Format(util.Date_Full_Layout)
  311. if code := redis.GetStr("qyxy_id", b); code != "" {
  312. saveM["name_id"] = code
  313. if util.ObjToString(tmp["buyertel"]) != "" {
  314. q := make(map[string]interface{})
  315. q["name_id"] = code
  316. q["identity_type"] = 1
  317. q["contact_tel"] = util.ObjToString(tmp["buyertel"])
  318. if util.ObjToString(tmp["buyerperson"]) != "" {
  319. q["contact_name"] = util.ObjToString(tmp["buyerperson"])
  320. }
  321. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  322. if cinfo != nil && len(*cinfo) > 0 {
  323. saveM["contact_id"] = (*cinfo)["id"]
  324. saveRelationPool <- saveM
  325. }
  326. }
  327. }
  328. }
  329. if a := util.ObjToString(tmp["agency"]); a != "" {
  330. saveM := make(map[string]interface{})
  331. saveM["projectid"] = pid
  332. saveM["infoid"] = id
  333. saveM["identity_type"] = 4
  334. saveM["createtime"] = time.Now().Format(util.Date_Full_Layout)
  335. if code := redis.GetStr("qyxy_id", a); code != "" {
  336. saveM["name_id"] = code
  337. if util.ObjToString(tmp["agencytel"]) != "" {
  338. q := make(map[string]interface{})
  339. q["name_id"] = code
  340. q["identity_type"] = 4 // 100
  341. q["contact_tel"] = util.ObjToString(tmp["agencytel"])
  342. if util.ObjToString(tmp["agencyperson"]) != "" {
  343. q["contact_name"] = util.ObjToString(tmp["agencyperson"])
  344. }
  345. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  346. if cinfo != nil && len(*cinfo) > 0 {
  347. saveM["contact_id"] = (*cinfo)["id"]
  348. saveRelationPool <- saveM
  349. }
  350. }
  351. }
  352. }
  353. sw := util.ObjToString(tmp["s_winner"])
  354. if !strings.Contains(sw, ",") {
  355. if code := redis.GetStr("qyxy_id", sw); code != "" {
  356. saveM := make(map[string]interface{})
  357. saveM["projectid"] = pid
  358. saveM["infoid"] = id
  359. saveM["identity_type"] = 2
  360. saveM["createtime"] = time.Now().Format(util.Date_Full_Layout)
  361. saveM["name_id"] = code
  362. if util.ObjToString(tmp["winnertel"]) != "" {
  363. q := make(map[string]interface{})
  364. q["name_id"] = code
  365. q["identity_type"] = 2 // 010
  366. q["contact_tel"] = util.ObjToString(tmp["winnertel"])
  367. if util.ObjToString(tmp["winnerperson"]) != "" {
  368. q["contact_name"] = util.ObjToString(tmp["winnerperson"])
  369. }
  370. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  371. if cinfo != nil && len(*cinfo) > 0 {
  372. saveM["contact_id"] = (*cinfo)["id"]
  373. saveRelationPool <- saveM
  374. }
  375. }
  376. }
  377. }
  378. }
  379. func taskRelation2(tmp map[string]interface{}) {
  380. pid := mongodb.BsonIdToSId(tmp["_id"])
  381. if tmp["ids"] == nil {
  382. log.Info("taskRelation ids err", zap.Any("id", pid))
  383. return
  384. }
  385. info := MysqlTool.Find("dws_f_bpmc_relation", bson.M{"projectid": pid}, "", "", -1, -1)
  386. if len(*info) > 0 {
  387. } else {
  388. ids := util.ObjArrToStringArr(tmp["ids"].([]interface{}))
  389. lid := ids[len(ids)-1]
  390. if b := util.ObjToString(tmp["buyer"]); b != "" {
  391. saveM := make(map[string]interface{})
  392. for _, f := range RelationField {
  393. if f == "projectid" {
  394. saveM[f] = pid
  395. } else if f == "infoid" {
  396. saveM[f] = lid
  397. } else if f == "name_id" {
  398. if code := redis.GetStr("qyxy_id", b); code != "" {
  399. saveM[f] = code
  400. if util.ObjToString(tmp["buyertel"]) != "" {
  401. q := make(map[string]interface{})
  402. q["name_id"] = code
  403. q["identity_type"] = 1
  404. q["contact_tel"] = util.ObjToString(tmp["buyertel"])
  405. if util.ObjToString(tmp["buyerperson"]) != "" {
  406. q["contact_name"] = util.ObjToString(tmp["buyerperson"])
  407. }
  408. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  409. if cinfo != nil && len(*cinfo) > 0 {
  410. saveM["contact_id"] = (*cinfo)["id"]
  411. }
  412. }
  413. }
  414. } else if f == "identity_type" {
  415. saveM[f] = 1 // 001
  416. } else if f == "createtime" {
  417. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  418. }
  419. }
  420. saveRelationPool <- saveM
  421. }
  422. if a := util.ObjToString(tmp["agency"]); a != "" {
  423. saveM := make(map[string]interface{})
  424. for _, f := range RelationField {
  425. if f == "projectid" {
  426. saveM[f] = pid
  427. } else if f == "infoid" {
  428. saveM[f] = lid
  429. } else if f == "name_id" {
  430. if code := redis.GetStr("qyxy_id", a); code != "" {
  431. saveM[f] = code
  432. if util.ObjToString(tmp["buyertel"]) != "" {
  433. q := make(map[string]interface{})
  434. q["name_id"] = code
  435. q["identity_type"] = 4
  436. q["contact_tel"] = util.ObjToString(tmp["agencytel"])
  437. if util.ObjToString(tmp["agencyperson"]) != "" {
  438. q["contact_name"] = util.ObjToString(tmp["agencyperson"])
  439. }
  440. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  441. if cinfo != nil && len(*cinfo) > 0 {
  442. saveM["contact_id"] = (*cinfo)["id"]
  443. }
  444. }
  445. }
  446. } else if f == "identity_type" {
  447. saveM[f] = 4 // 100
  448. } else if f == "createtime" {
  449. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  450. }
  451. }
  452. saveRelationPool <- saveM
  453. }
  454. warr := strings.Split(util.ObjToString(tmp["s_winner"]), ",")
  455. if BinarySearch(warr, util.ObjToString(tmp["winner"])) == -1 {
  456. warr = append(warr, util.ObjToString(tmp["winner"]))
  457. }
  458. for _, ws := range warr {
  459. saveM := make(map[string]interface{})
  460. for _, f := range RelationField {
  461. if f == "projectid" {
  462. saveM[f] = pid
  463. } else if f == "infoid" {
  464. saveM[f] = lid
  465. } else if f == "name_id" {
  466. if code := redis.GetStr("qyxy_id", ws); code != "" {
  467. saveM[f] = code
  468. if util.ObjToString(tmp["buyertel"]) != "" {
  469. q := make(map[string]interface{})
  470. q["name_id"] = code
  471. q["identity_type"] = 2
  472. q["contact_tel"] = util.ObjToString(tmp["winnertel"])
  473. if util.ObjToString(tmp["winnerperson"]) != "" {
  474. q["contact_name"] = util.ObjToString(tmp["winnerperson"])
  475. }
  476. cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  477. if cinfo != nil && len(*cinfo) > 0 {
  478. saveM["contact_id"] = (*cinfo)["id"]
  479. }
  480. }
  481. }
  482. } else if f == "identity_type" {
  483. saveM[f] = 2 // 010
  484. } else if f == "createtime" {
  485. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  486. }
  487. }
  488. saveRelationPool <- saveM
  489. }
  490. }
  491. }