// task.go (18 KB)
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gogf/gf/v2/util/gconv"
  6. "go.mongodb.org/mongo-driver/bson"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  8. "strings"
  9. "sync"
  10. "time"
  11. )
  // Transaction is the flattened record produced by DealTransactionForBid and
  // DealTransactionForPro. It is BSON-encoded (see the struct tags) before being
  // written to MongoDB and later copied to ClickHouse.
  type Transaction struct {
  	// Project identity and amounts.
  	Project_Id        string  `bson:"project_id"`        // hex string form of the source document _id
  	Project_Name      string  `bson:"project_name"`      // source field "projectname"
  	Project_Budget    float64 `bson:"project_budget"`    // source field "budget"
  	Project_Bidamount float64 `bson:"project_bidamount"` // source field "bidamount"
  	Project_Money     float64 `bson:"project_money"`     // bid amount, or the budget when the bid amount is <= 0

  	// Classification.
  	Business_Type     string `bson:"business_type"`     // "采购意向", "存量项目", "招标项目" or empty
  	Project_Bidstatus int    `bson:"project_bidstatus"` // 3 for bidding intentions; 0, 1 or 2 for projects
  	Info_Id           string `bson:"info_id"`           // bidding: the document id; project: "sourceinfoid"
  	Information_Id    string `bson:"information_id"`    // id of the matching information row; empty for bidding

  	// Participants. The *_Id fields are currently always empty (legal-entity lookup is still a TODO).
  	Buyer     string   `bson:"buyer"`
  	Buyer_Id  string   `bson:"buyer_id"`
  	Winner    []string `bson:"winner"` // "s_winner" split on ","
  	Winner_Id []string `bson:"winner_id"`
  	Agency    string   `bson:"agency"`
  	Agency_Id string   `bson:"agency_id"`

  	// Tags and packaging.
  	Property_Form []string `bson:"property_form"`
  	SubClass      []string `bson:"subclass"` // "tag_subinformation", else "tag_subinformation_ai"
  	MultiPackage  int      `bson:"multipackage"`

  	// Location.
  	Area     string `bson:"area"`
  	City     string `bson:"city"`
  	District string `bson:"district"`

  	// Timestamps (int64; Create_Time/Update_Time are Unix seconds).
  	ZbTime      int64 `bson:"zbtime"`    // bidding: "publishtime"; project: "zbtime" (may fall back to "firsttime")
  	JgTime      int64 `bson:"jgtime"`    // 0 for bidding records
  	StartTime   int64 `bson:"starttime"` // from the information row; 0 for bidding records
  	EndTime     int64 `bson:"endtime"`   // from the information row; 0 for bidding records
  	Create_Time int64 `bson:"create_time"`
  	Update_Time int64 `bson:"update_time"`

  	// From records the origin of the row: "bidding" or "project".
  	From string `bson:"from"`
  }
  43. func IncTransactionDataFromBidAndPro() {
  44. IncTransactionDataFromBid() //bidding
  45. IncTransactionDataFromPro() //project
  46. return
  47. IncTransactionDataMgoToCkh() //mongodb迁移至clickhouse
  48. }
  49. // IncTransactionDataFromBid 增量bidding
  50. func IncTransactionDataFromBid() {
  51. endTime := GetTime(-1) //前一天凌晨
  52. fmt.Println("开始执行增量采购意向信息", BidStartTime, endTime)
  53. if BidStartTime >= endTime {
  54. fmt.Println("增量bidding采购意向查询异常:", BidStartTime, endTime)
  55. return
  56. }
  57. query := map[string]interface{}{
  58. "comeintime": map[string]interface{}{
  59. "$gte": BidStartTime,
  60. "$lt": endTime,
  61. },
  62. }
  63. fmt.Println("增量bidding采购意向query:", query)
  64. sess := MgoB.GetMgoConn()
  65. defer MgoB.DestoryMongoConn(sess)
  66. ch := make(chan bool, 5)
  67. wg := &sync.WaitGroup{}
  68. lock := &sync.Mutex{}
  69. fields := map[string]interface{}{
  70. "projectname": 1,
  71. "budget": 1,
  72. "bidamount": 1,
  73. "buyer": 1,
  74. "s_winner": 1,
  75. "agency": 1,
  76. "property_form": 1,
  77. "multipackage": 1,
  78. "area": 1,
  79. "city": 1,
  80. "district": 1,
  81. //
  82. "publishtime": 1,
  83. "toptype": 1,
  84. "extracttype": 1,
  85. "tag_subinformation": 1,
  86. "tag_subinformation_ai": 1,
  87. "tag_topinformation": 1,
  88. "tag_topinformation_ai": 1,
  89. }
  90. arr := []map[string]interface{}{}
  91. it := sess.DB(MgoB.DbName).C("bidding").Find(&query).Select(&fields).Iter()
  92. n := 0
  93. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  94. ch <- true
  95. wg.Add(1)
  96. go func(tmp map[string]interface{}) {
  97. defer func() {
  98. <-ch
  99. wg.Done()
  100. }()
  101. if gconv.String(tmp["toptype"]) != "采购意向" { //非采购意向数据过滤
  102. return
  103. }
  104. if gconv.Int(tmp["extracttype"]) == -1 { //重复数据过滤
  105. return
  106. }
  107. if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { //无效数据过滤
  108. return
  109. }
  110. result := DealTransactionForBid(tmp)
  111. lock.Lock()
  112. if len(result) > 0 {
  113. arr = append(arr, result)
  114. }
  115. if len(arr) > 50 {
  116. MgoPro.SaveBulk("projectset_wy", arr...)
  117. arr = []map[string]interface{}{}
  118. }
  119. lock.Unlock()
  120. }(tmp)
  121. if n%1000 == 0 {
  122. fmt.Println("current:", n)
  123. }
  124. tmp = map[string]interface{}{}
  125. }
  126. wg.Wait()
  127. if len(arr) > 0 {
  128. MgoPro.SaveBulk("projectset_wy", arr...)
  129. arr = []map[string]interface{}{}
  130. }
  131. fmt.Println("执行增量采购意向信息完毕", BidStartTime, endTime)
  132. BidStartTime = endTime //替换
  133. }
  134. // DealTransactionForBid bidding采购意向数据处理
  135. func DealTransactionForBid(tmp map[string]interface{}) map[string]interface{} {
  136. //基本信息封装
  137. id := mongodb.BsonIdToSId(tmp["_id"])
  138. buyer := gconv.String(tmp["buyer"])
  139. winner := gconv.String(tmp["s_winner"])
  140. agency := gconv.String(tmp["agency"])
  141. property_form := []string{}
  142. if tmp["property_form"] != nil {
  143. property_form = gconv.Strings(tmp["property_form"])
  144. }
  145. bidamount := gconv.Float64(tmp["bidamount"])
  146. budget := gconv.Float64(tmp["budget"])
  147. money := bidamount
  148. if money <= 0 {
  149. money = budget
  150. }
  151. //物业分类
  152. subclass := []string{}
  153. if tag_subinformation := tmp["tag_subinformation"]; tag_subinformation != nil {
  154. subclass = gconv.Strings(tag_subinformation)
  155. } else if tag_subinformation_ai := tmp["tag_subinformation_ai"]; tag_subinformation_ai != nil {
  156. subclass = gconv.Strings(tag_subinformation_ai)
  157. }
  158. //TODO 查询法人库信息(待补充)
  159. winners := []string{}
  160. if winner != "" {
  161. winners = strings.Split(winner, ",")
  162. }
  163. buyer_id, agency_id := "", ""
  164. winner_ids := []string{}
  165. //buyer_id, agency_id, winner_ids := FindEntInfoData(id, buyer, agency, winners)
  166. //物业信息
  167. t := &Transaction{
  168. Project_Id: id,
  169. Project_Name: gconv.String(tmp["projectname"]),
  170. Project_Budget: budget,
  171. Project_Bidamount: bidamount,
  172. Project_Money: money,
  173. Business_Type: "采购意向",
  174. Project_Bidstatus: 3,
  175. Info_Id: id,
  176. Information_Id: "",
  177. Buyer: buyer,
  178. Winner: winners,
  179. Agency: agency,
  180. Buyer_Id: buyer_id,
  181. Winner_Id: winner_ids,
  182. Agency_Id: agency_id,
  183. Property_Form: property_form,
  184. SubClass: subclass,
  185. MultiPackage: gconv.Int(tmp["multipackage"]),
  186. Area: gconv.String(tmp["area"]),
  187. City: gconv.String(tmp["city"]),
  188. District: gconv.String(tmp["district"]),
  189. ZbTime: gconv.Int64(tmp["publishtime"]),
  190. JgTime: int64(0),
  191. StartTime: int64(0),
  192. EndTime: int64(0),
  193. Create_Time: time.Now().Unix(),
  194. Update_Time: time.Now().Unix(),
  195. //
  196. From: "bidding",
  197. }
  198. result := map[string]interface{}{}
  199. infomation, _ := bson.Marshal(t)
  200. bson.Unmarshal(infomation, &result)
  201. return result
  202. }
  203. // IncTransactionDataFromProject 增量project
  204. func IncTransactionDataFromPro() {
  205. endTime := GetTime(-1) //前一天凌晨
  206. fmt.Println("开始执行增量项目信息", ProStartTime, endTime)
  207. if ProStartTime >= endTime {
  208. fmt.Println("增量项目信息查询异常:", ProStartTime, endTime)
  209. return
  210. }
  211. query := map[string]interface{}{
  212. "pici": map[string]interface{}{
  213. "$gte": ProStartTime,
  214. "$lt": endTime,
  215. },
  216. }
  217. fmt.Println("增量项目查询query:", query)
  218. sess := MgoPro.GetMgoConn()
  219. defer MgoPro.DestoryMongoConn(sess)
  220. ch := make(chan bool, 5)
  221. wg := &sync.WaitGroup{}
  222. lock := &sync.Mutex{}
  223. fields := map[string]interface{}{
  224. "projectname": 1,
  225. "budget": 1,
  226. "bidamount": 1,
  227. "buyer": 1,
  228. "s_winner": 1,
  229. "agency": 1,
  230. "property_form": 1,
  231. "multipackage": 1,
  232. "area": 1,
  233. "city": 1,
  234. "district": 1,
  235. "zbtime": 1,
  236. "jgtime": 1,
  237. "bidstatus": 1,
  238. //
  239. "firsttime": 1,
  240. "pici": 1,
  241. "ids": 1,
  242. "sourceinfoid": 1,
  243. "tag_subinformation": 1,
  244. "tag_subinformation_ai": 1,
  245. "tag_topinformation": 1,
  246. "tag_topinformation_ai": 1,
  247. }
  248. arr := [][]map[string]interface{}{}
  249. it := sess.DB(MgoPro.DbName).C("projectset_20230904").Find(&query).Select(&fields).Iter()
  250. n := 0
  251. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  252. ch <- true
  253. wg.Add(1)
  254. go func(tmp map[string]interface{}) {
  255. defer func() {
  256. <-ch
  257. wg.Done()
  258. }()
  259. if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { //无效数据过滤
  260. return
  261. }
  262. result := DealTransactionForPro(tmp)
  263. lock.Lock()
  264. if len(result) > 0 {
  265. update := []map[string]interface{}{
  266. {"project_id": tmp["_id"]},
  267. {"$set": result},
  268. }
  269. arr = append(arr, update)
  270. }
  271. if len(arr) > 50 {
  272. MgoPro.UpSertBulk("projectset_wy_back", arr...)
  273. arr = [][]map[string]interface{}{}
  274. }
  275. lock.Unlock()
  276. }(tmp)
  277. if n%1000 == 0 {
  278. fmt.Println("current:", n)
  279. }
  280. tmp = map[string]interface{}{}
  281. }
  282. wg.Wait()
  283. if len(arr) > 0 {
  284. MgoPro.UpSertBulk("projectset_wy_back", arr...)
  285. arr = [][]map[string]interface{}{}
  286. }
  287. fmt.Println("执行增量项目信息完毕", ProStartTime, endTime)
  288. ProStartTime = endTime //替换
  289. }
  290. // DealTransactionForPro project数据处理
  291. func DealTransactionForPro(data map[string]interface{}) map[string]interface{} {
  292. //基本信息封装
  293. id := mongodb.BsonIdToSId(data["_id"])
  294. buyer := gconv.String(data["buyer"])
  295. winner := gconv.String(data["s_winner"])
  296. agency := gconv.String(data["agency"])
  297. zbtime := gconv.Int64(data["zbtime"])
  298. property_form := []string{}
  299. if data["property_form"] != nil {
  300. property_form = gconv.Strings(data["property_form"])
  301. }
  302. bidamount := gconv.Float64(data["bidamount"])
  303. budget := gconv.Float64(data["budget"])
  304. money := bidamount
  305. if money <= 0 {
  306. money = budget
  307. }
  308. //物业分类
  309. subclass := []string{}
  310. if tag_subinformation := data["tag_subinformation"]; tag_subinformation != nil {
  311. subclass = gconv.Strings(tag_subinformation)
  312. } else if tag_subinformation_ai := data["tag_subinformation_ai"]; tag_subinformation_ai != nil {
  313. subclass = gconv.Strings(tag_subinformation_ai)
  314. }
  315. //项目状态、商机类型
  316. business_type := ""
  317. project_bidstatus := 2
  318. bidstatus := gconv.String(data["bidstatus"])
  319. if bidstatus == "中标" || bidstatus == "成交" || bidstatus == "合同" {
  320. project_bidstatus = 1
  321. business_type = "存量项目"
  322. } else if bidstatus == "废标" || bidstatus == "流标" {
  323. project_bidstatus = 0
  324. } else if bidstatus == "招标" {
  325. business_type = "招标项目"
  326. if zbtime == 0 {
  327. zbtime = gconv.Int64(data["firsttime"])
  328. }
  329. }
  330. //查询情报信息
  331. //bidId = "65fbf3f566cf0db42a2a99d2"
  332. ids := gconv.Strings(data["ids"])
  333. info := FindInfomationData(ids...) //情报信息查询
  334. //TODO 查询法人库信息(待补充)
  335. winners := []string{}
  336. if winner != "" {
  337. winners = strings.Split(winner, ",")
  338. }
  339. buyer_id, agency_id := "", ""
  340. winner_ids := []string{}
  341. //buyer_id, agency_id, winner_ids = FindEntInfoData(id, buyer, agency, winners)
  342. //物业信息
  343. t := &Transaction{
  344. Project_Id: id,
  345. Project_Name: gconv.String(data["projectname"]),
  346. Project_Budget: budget,
  347. Project_Bidamount: bidamount,
  348. Project_Money: money,
  349. Business_Type: business_type,
  350. Project_Bidstatus: project_bidstatus,
  351. Info_Id: gconv.String(data["sourceinfoid"]),
  352. Information_Id: info.Id,
  353. Buyer: buyer,
  354. Winner: winners,
  355. Agency: agency,
  356. Buyer_Id: buyer_id,
  357. Winner_Id: winner_ids,
  358. Agency_Id: agency_id,
  359. Property_Form: property_form,
  360. SubClass: subclass,
  361. MultiPackage: gconv.Int(data["multipackage"]),
  362. Area: gconv.String(data["area"]),
  363. City: gconv.String(data["city"]),
  364. District: gconv.String(data["district"]),
  365. ZbTime: zbtime,
  366. JgTime: gconv.Int64(data["jgtime"]),
  367. StartTime: info.Starttime,
  368. EndTime: info.Endtime,
  369. Create_Time: time.Now().Unix(),
  370. Update_Time: time.Now().Unix(),
  371. //
  372. From: "project",
  373. }
  374. result := map[string]interface{}{}
  375. infomation, _ := bson.Marshal(t)
  376. bson.Unmarshal(infomation, &result)
  377. return result
  378. }
  379. // IncTransactionDataMgoToCkh 数据迁移
  380. func IncTransactionDataMgoToCkh() {
  381. /*
  382. 数据根据update_time查询
  383. 1、采购意向数据(from=bidding)只插入
  384. 2、项目信息先查,有则更新,无则插入
  385. */
  386. fmt.Println("开始执行迁移...")
  387. sess := MgoPro.GetMgoConn()
  388. defer MgoPro.DestoryMongoConn(sess)
  389. ch := make(chan bool, 10)
  390. wg := &sync.WaitGroup{}
  391. query := map[string]interface{}{
  392. "update_time": map[string]interface{}{
  393. "$gte": GetTime(0),
  394. },
  395. }
  396. it := sess.DB(MgoPro.DbName).C("projectset_wy").Find(&query).Iter()
  397. n := 0
  398. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  399. ch <- true
  400. wg.Add(1)
  401. go func(tmp map[string]interface{}) {
  402. defer func() {
  403. <-ch
  404. wg.Done()
  405. }()
  406. var err error
  407. from := gconv.String(tmp["from"])
  408. delete(tmp, "from") //无用字段删除
  409. delete(tmp, "_id") //无用字段删除
  410. if from == "bidding" { //采购意向,插入
  411. err = SaveDataToClickHouse(tmp)
  412. } else { //项目信息,更新,插入
  413. project_id := gconv.String(tmp["project_id"])
  414. err = UpdateOrSaveDataToClickHouse(project_id, tmp)
  415. }
  416. if err != nil {
  417. fmt.Println("数据迁移失败,数据类型", from, " 项目project_id", tmp["project_id"], err)
  418. }
  419. }(tmp)
  420. if n%100 == 0 {
  421. fmt.Println("current:", n)
  422. }
  423. tmp = map[string]interface{}{}
  424. }
  425. wg.Wait()
  426. fmt.Println("迁移结束...")
  427. }
  // Infomation holds the columns that FindInfomationData reads from the
  // ClickHouse "information" table: id, starttime and endtime.
  type Infomation struct {
  	Id                 string
  	Starttime, Endtime int64
  }
  433. // 情报信息查询
  434. func FindInfomationData(ids ...string) (info Infomation) {
  435. for _, id := range ids {
  436. query := fmt.Sprintf(`SELECT id,starttime,endtime FROM %s WHERE datajson_id = ?`, Config.ClickHouse.DataBase+".information")
  437. rows, err := CkhTool.Query(context.Background(), query, id)
  438. if err != nil {
  439. continue
  440. }
  441. for rows.Next() {
  442. info = Infomation{}
  443. if err := rows.Scan(&info.Id, &info.Starttime, &info.Endtime); err != nil {
  444. fmt.Println("查询情报信息异常:", id, err)
  445. }
  446. if info.Id != "" {
  447. return
  448. }
  449. //break //目前只有一条结果
  450. }
  451. }
  452. return
  453. }
  454. // 法人信息查询
  455. func FindEntInfoData(bid, buyer, agency string, winners []string) (buyer_id, agency_id string, winner_ids []string) {
  456. winner_ids = []string{}
  457. winnerMap := map[string]bool{} //记录所有中标单位
  458. values := []interface{}{}
  459. placeholders := []string{}
  460. if buyer != "" {
  461. placeholders = append(placeholders, "?")
  462. values = append(values, buyer)
  463. }
  464. if len(winners) > 0 {
  465. for _, w := range winners {
  466. winnerMap[w] = true
  467. placeholders = append(placeholders, "?")
  468. values = append(values, w)
  469. }
  470. }
  471. if agency != "" {
  472. placeholders = append(placeholders, "?")
  473. values = append(values, agency)
  474. }
  475. if len(values) == 0 {
  476. return
  477. }
  478. query := fmt.Sprintf(`SELECT id,company_name FROM %s WHERE company_name IN (%s)`, Config.ClickHouse.DataBase+".ent_info", strings.Join(placeholders, ","))
  479. rows, err := CkhTool.Query(context.Background(), query, values...)
  480. if err != nil {
  481. return
  482. }
  483. for rows.Next() {
  484. var id, company_name string
  485. if err := rows.Scan(&id, &company_name); err == nil {
  486. if company_name == buyer {
  487. buyer_id = id
  488. } else if company_name == agency {
  489. agency_id = id
  490. } else if winnerMap[company_name] {
  491. winner_ids = append(winner_ids, id)
  492. }
  493. } else {
  494. fmt.Println("查询情报信息异常:", err, bid)
  495. }
  496. }
  497. return
  498. }
  499. // UpdateOrSaveDataToClickHouse 判断clickhouse更新or保存
  500. func UpdateOrSaveDataToClickHouse(project_id string, data map[string]interface{}) (err error) {
  501. count := FindClickHouseByProjectId(project_id) //查询
  502. if count > 0 { //更新
  503. delete(data, "create_time") //不更新创建时间
  504. delete(data, "project_id") //不更新项目id(主键)
  505. err = UpdateDataToClickHouse(data, map[string]interface{}{"project_id": project_id})
  506. } else { //插入
  507. err = SaveDataToClickHouse(data)
  508. }
  509. return
  510. }
  511. // SaveDataToClickHouse 数据保存clickhouse
  512. func SaveDataToClickHouse(data map[string]interface{}) error {
  513. fields, placeholders := []string{}, []string{}
  514. values := []interface{}{}
  515. for k, v := range data {
  516. fields = append(fields, k)
  517. values = append(values, v)
  518. placeholders = append(placeholders, "?")
  519. }
  520. query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", Config.ClickHouse.DataBase+".transaction_info", strings.Join(fields, ","), strings.Join(placeholders, ","))
  521. return CkhTool.Exec(context.Background(), query, values...)
  522. }
  523. // FindClickHouseByProjectId 根据条件count clickhouse
  524. func FindClickHouseByProjectId(project_id string) int {
  525. query := fmt.Sprintf(`SELECT COUNT(1) FROM %s WHERE project_id = ?`, Config.ClickHouse.DataBase+".transaction_info")
  526. row := CkhTool.QueryRow(context.Background(), query, project_id)
  527. var count uint64
  528. row.Scan(&count)
  529. return gconv.Int(count)
  530. }
  531. // UpdateDataToClickHouse 数据更新clickhouse
  532. func UpdateDataToClickHouse(data, querys map[string]interface{}) error {
  533. sets := []string{}
  534. values := []interface{}{}
  535. for k, v := range data {
  536. sets = append(sets, fmt.Sprintf("%s=?", k))
  537. values = append(values, v)
  538. }
  539. qs := []string{}
  540. for k, v := range querys {
  541. qs = append(qs, fmt.Sprintf("%s=?", k))
  542. values = append(values, v)
  543. }
  544. query := fmt.Sprintf("ALTER TABLE %s UPDATE %s WHERE %s", Config.ClickHouse.DataBase+".transaction_info", strings.Join(sets, ","), strings.Join(qs, ","))
  545. //query := `ALTER TABLE information.transaction_info UPDATE update_time = ? WHERE project_id = '5c9ee78ca5cb26b9b7fd0b57'`
  546. return CkhTool.Exec(context.Background(), query, values...)
  547. }
  548. /*// SaveTransactionData 保存增量物业信息
  549. func SaveTransactionData() {
  550. fmt.Println("save projectset_wy...")
  551. savearr := make([]map[string]interface{}, 100)
  552. indexdb := 0
  553. for {
  554. select {
  555. case v := <-TransactionSaveCache:
  556. savearr[indexdb] = v
  557. indexdb++
  558. if indexdb == 100 {
  559. Transaction_Ch <- true
  560. go func(tmp []map[string]interface{}) {
  561. defer func() {
  562. <-Transaction_Ch
  563. }()
  564. MgoPro.SaveBulk("projectset_wy", tmp...)
  565. }(savearr)
  566. savearr = make([]map[string]interface{}, 100)
  567. indexdb = 0
  568. }
  569. case <-time.After(30 * time.Second):
  570. if indexdb > 0 {
  571. Transaction_Ch <- true
  572. go func(tmp []map[string]interface{}) {
  573. defer func() {
  574. <-Transaction_Ch
  575. }()
  576. MgoPro.SaveBulk("projectset_wy", tmp...)
  577. }(savearr[:indexdb])
  578. savearr = make([]map[string]interface{}, 100)
  579. indexdb = 0
  580. }
  581. }
  582. }
  583. }*/