task.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/gogf/gf/v2/util/gconv"
  9. "go.mongodb.org/mongo-driver/bson"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  11. )
  12. type Transaction struct {
  13. Project_Id string `bson:"project_id"`
  14. Project_Name string `bson:"project_name"`
  15. Project_Budget float64 `bson:"project_budget"`
  16. Project_Bidamount float64 `bson:"project_bidamount"`
  17. Project_Money float64 `bson:"project_money"`
  18. Business_Type string `bson:"business_type"`
  19. Project_Bidstatus int `bson:"project_bidstatus"`
  20. Info_Id string `bson:"info_id"`
  21. Info_Ids []string `bson:"info_ids"`
  22. Information_Id string `bson:"information_id"`
  23. BuyerClass string `bson:"buyerclass"`
  24. Buyer string `bson:"buyer"`
  25. Buyer_Id string `bson:"buyer_id"`
  26. Winner []string `bson:"winner"`
  27. Winner_Id []string `bson:"winner_id"`
  28. Agency string `bson:"agency"`
  29. Agency_Id string `bson:"agency_id"`
  30. Property_Form []string `bson:"property_form"`
  31. SubClass []string `bson:"subclass"`
  32. MultiPackage int `bson:"multipackage"`
  33. Topscopeclass []string `bson:"topscopeclass"`
  34. Area string `bson:"area"`
  35. City string `bson:"city"`
  36. District string `bson:"district"`
  37. ZbTime int64 `bson:"zbtime"`
  38. JgTime int64 `bson:"jgtime"`
  39. StartTime int64 `bson:"starttime"`
  40. EndTime int64 `bson:"endtime"`
  41. Create_Time int64 `bson:"create_time"`
  42. Update_Time int64 `bson:"update_time"`
  43. //
  44. // From string `bson:"from"`
  45. }
  46. func IncTransactionDataFromBidAndPro() {
  47. // IncTransactionDataFromBid() //bidding
  48. IncTransactionDataFromPro() //project
  49. // IncTransactionDataMgoToCkhAndEs() //mongodb迁移至clickhouse
  50. }
  51. // IncTransactionDataFromBid 增量bidding
  52. func IncTransactionDataFromBid() {
  53. // endTime := GetTime(-1) //前一天凌晨
  54. // fmt.Println("开始执行增量采购意向、拟建信息", BidStartTime, endTime)
  55. // if BidStartTime >= endTime {
  56. // fmt.Println("增量bidding采购意向、拟建查询异常:", BidStartTime, endTime)
  57. // return
  58. // }
  59. query := map[string]interface{}{
  60. "pici": map[string]interface{}{
  61. // "$gte": BidStartTime,
  62. "$lt": 1729440000,
  63. },
  64. }
  65. fmt.Println("增量bidding采购意向query:", query)
  66. sess := MgoB.GetMgoConn()
  67. defer MgoB.DestoryMongoConn(sess)
  68. ch := make(chan bool, 10)
  69. wg := &sync.WaitGroup{}
  70. // lock := &sync.Mutex{}
  71. fields := map[string]interface{}{
  72. "projectname": 1,
  73. "budget": 1,
  74. "bidamount": 1,
  75. "buyer": 1,
  76. "s_winner": 1,
  77. "agency": 1,
  78. "property_form": 1,
  79. "multipackage": 1,
  80. "area": 1,
  81. "city": 1,
  82. "district": 1,
  83. "buyerclass": 1,
  84. //
  85. "owner": 1,
  86. "s_topscopeclass": 1,
  87. "publishtime": 1,
  88. "toptype": 1,
  89. "extracttype": 1,
  90. "tag_subinformation": 1,
  91. "tag_subinformation_ai": 1,
  92. "tag_topinformation": 1,
  93. "tag_topinformation_ai": 1,
  94. }
  95. // arr := []map[string]interface{}{}
  96. it := sess.DB(MgoB.DbName).C("bidding").Find(&query).Select(&fields).Sort("-_id").Iter()
  97. n := 0
  98. // count := 0
  99. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  100. ch <- true
  101. wg.Add(1)
  102. go func(tmp map[string]interface{}) {
  103. defer func() {
  104. <-ch
  105. wg.Done()
  106. }()
  107. if gconv.Int(tmp["extracttype"]) == -1 { //重复数据过滤
  108. return
  109. }
  110. toptype := gconv.String(tmp["toptype"])
  111. // tag_topinformation := gconv.String(tmp["tag_topinformation"])
  112. // tag_topinformation_ai := gconv.String(tmp["tag_topinformation_ai"])
  113. var business_type string
  114. var project_bidstatus int
  115. if toptype == "采购意向" { //采购意向数据
  116. // if !strings.Contains(tag_topinformation, "物业") && !strings.Contains(tag_topinformation_ai, "物业") {
  117. // return
  118. // }
  119. business_type = "采购意向"
  120. project_bidstatus = 3
  121. } else if toptype == "拟建" {
  122. s_topscopeclass := gconv.String(tmp["s_topscopeclass"])
  123. // if !strings.Contains(s_topscopeclass, "建筑工程") || strings.Contains(tag_topinformation, "物业") || strings.Contains(tag_topinformation_ai, "物业") {
  124. if !strings.Contains(s_topscopeclass, "建筑工程") {
  125. return
  126. }
  127. business_type = "新增物业项目"
  128. project_bidstatus = 4
  129. } else {
  130. return
  131. }
  132. result := DealTransactionForBid(tmp, business_type, project_bidstatus)
  133. if !SaveDataToEs(result) { //保存、更新es
  134. fmt.Println("数据保存es失败,项目project_id", result["project_id"])
  135. }
  136. SaveDataToClickHouse(result)
  137. // lock.Lock()
  138. // if len(result) > 0 {
  139. // arr = append(arr, result)
  140. // count++
  141. // }
  142. // if len(arr) > 50 {
  143. // MgoPro.SaveBulk("projectset_wy", arr...)
  144. // arr = []map[string]interface{}{}
  145. // }
  146. // lock.Unlock()
  147. }(tmp)
  148. if n%1000 == 0 {
  149. fmt.Println("current:", n)
  150. }
  151. tmp = map[string]interface{}{}
  152. }
  153. wg.Wait()
  154. // if len(arr) > 0 {
  155. // MgoPro.SaveBulk("projectset_wy", arr...)
  156. // arr = []map[string]interface{}{}
  157. // }
  158. // fmt.Println("执行增量采购意向、拟建信息完毕", BidStartTime, endTime, count)
  159. // BidStartTime = endTime //替换
  160. }
  161. // DealTransactionForBid bidding采购意向、拟建数据处理
  162. func DealTransactionForBid(tmp map[string]interface{}, business_type string, project_bidstatus int) map[string]interface{} {
  163. //基本信息封装
  164. id := mongodb.BsonIdToSId(tmp["_id"])
  165. buyerclass := gconv.String(tmp["buyerclass"])
  166. buyer := gconv.String(tmp["buyer"])
  167. if buyer == "" {
  168. buyer = gconv.String(tmp["owner"])
  169. }
  170. winner := gconv.String(tmp["s_winner"])
  171. agency := gconv.String(tmp["agency"])
  172. property_form := []string{}
  173. if tmp["property_form"] != nil {
  174. property_form = gconv.Strings(tmp["property_form"])
  175. }
  176. bidamount := gconv.Float64(tmp["bidamount"])
  177. budget := gconv.Float64(tmp["budget"])
  178. money := bidamount
  179. if money <= 0 {
  180. money = budget
  181. }
  182. //物业分类
  183. subclass := []string{}
  184. if tag_subinformation := tmp["tag_subinformation"]; tag_subinformation != nil {
  185. subclass = gconv.Strings(tag_subinformation)
  186. } else if tag_subinformation_ai := tmp["tag_subinformation_ai"]; tag_subinformation_ai != nil {
  187. subclass = gconv.Strings(tag_subinformation_ai)
  188. }
  189. //情报信息查询
  190. // info := FindInfomationData(id)
  191. topscopeclass := []string{}
  192. s_topscopeclass := gconv.String(tmp["s_topscopeclass"])
  193. if s_topscopeclass != "" {
  194. topscopeclass = strings.Split(s_topscopeclass, ",")
  195. }
  196. //法人信息
  197. winners := []string{}
  198. if winner != "" {
  199. winners = strings.Split(winner, ",")
  200. }
  201. buyer_id, agency_id, winner_ids := FindEntInfoData(id, buyer, agency, winners)
  202. //物业信息
  203. t := &Transaction{
  204. Project_Id: id,
  205. Project_Name: gconv.String(tmp["projectname"]),
  206. Project_Budget: budget,
  207. Project_Bidamount: bidamount,
  208. Project_Money: money,
  209. Business_Type: business_type,
  210. Project_Bidstatus: project_bidstatus,
  211. Info_Id: id,
  212. Info_Ids: []string{id},
  213. // Information_Id: info.Id,
  214. BuyerClass: buyerclass,
  215. Buyer: buyer,
  216. Topscopeclass: topscopeclass,
  217. Winner: winners,
  218. Agency: agency,
  219. Buyer_Id: buyer_id,
  220. Winner_Id: winner_ids,
  221. Agency_Id: agency_id,
  222. Property_Form: property_form,
  223. SubClass: subclass,
  224. MultiPackage: gconv.Int(tmp["multipackage"]),
  225. Area: gconv.String(tmp["area"]),
  226. City: gconv.String(tmp["city"]),
  227. District: gconv.String(tmp["district"]),
  228. ZbTime: gconv.Int64(tmp["publishtime"]),
  229. JgTime: int64(0),
  230. // StartTime: info.Starttime,
  231. // EndTime: info.Endtime,
  232. Create_Time: time.Now().Unix(),
  233. Update_Time: time.Now().Unix(),
  234. //
  235. // From: "bidding",
  236. }
  237. result := map[string]interface{}{}
  238. infomation, _ := bson.Marshal(t)
  239. bson.Unmarshal(infomation, &result)
  240. return result
  241. }
  242. // IncTransactionDataFromProject 增量project
  243. func IncTransactionDataFromPro() {
  244. // endTime := GetTime(-1) //前一天凌晨
  245. // fmt.Println("开始执行增量项目信息", ProStartTime, endTime)
  246. // if ProStartTime >= endTime {
  247. // fmt.Println("增量项目信息查询异常:", ProStartTime, endTime)
  248. // return
  249. // }
  250. query := map[string]interface{}{
  251. "pici": map[string]interface{}{
  252. // "$gte": ProStartTime,
  253. "$lt": 1729440000,
  254. },
  255. }
  256. fmt.Println("增量项目查询query:", query)
  257. sess := MgoPro.GetMgoConn()
  258. defer MgoPro.DestoryMongoConn(sess)
  259. ch := make(chan bool, 10)
  260. wg := &sync.WaitGroup{}
  261. // lock := &sync.Mutex{}
  262. fields := map[string]interface{}{
  263. "projectname": 1,
  264. "budget": 1,
  265. "bidamount": 1,
  266. "buyer": 1,
  267. "s_winner": 1,
  268. "agency": 1,
  269. "property_form": 1,
  270. "multipackage": 1,
  271. "area": 1,
  272. "city": 1,
  273. "district": 1,
  274. "zbtime": 1,
  275. "jgtime": 1,
  276. "bidstatus": 1,
  277. "buyerclass": 1,
  278. "s_topscopeclass": 1,
  279. //
  280. "firsttime": 1,
  281. "pici": 1,
  282. "ids": 1,
  283. "sourceinfoid": 1,
  284. "tag_subinformation": 1,
  285. "tag_subinformation_ai": 1,
  286. "tag_topinformation": 1,
  287. "tag_topinformation_ai": 1,
  288. }
  289. // arr := [][]map[string]interface{}{}
  290. it := sess.DB(MgoPro.DbName).C("projectset_20230904").Find(&query).Select(&fields).Sort("-_id").Iter()
  291. n := 0
  292. // count := 0
  293. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  294. ch <- true
  295. wg.Add(1)
  296. go func(tmp map[string]interface{}) {
  297. defer func() {
  298. <-ch
  299. wg.Done()
  300. }()
  301. // if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { //无效数据过滤
  302. // return
  303. // }
  304. bidstatus := gconv.String(tmp["bidstatus"])
  305. if bidstatus == "中标" || bidstatus == "成交" || bidstatus == "合同" || bidstatus == "招标" {
  306. result := DealTransactionForPro(tmp)
  307. project_id := gconv.String(result["project_id"])
  308. if !SaveDataToEs(result) { //保存、更新es
  309. fmt.Println("数据保存es失败,项目project_id", result["project_id"])
  310. }
  311. err := SaveDataToClickHouse(result)
  312. if err != nil {
  313. fmt.Println("clickhouse保存失败", project_id, result)
  314. }
  315. }
  316. // count := FindClickHouseByProjectId(project_id) //查询
  317. // if count > 0 { //更新
  318. // delete(result, "create_time") //不更新创建时间
  319. // delete(result, "project_id") //不更新项目id(主键)
  320. // err = UpdateDataToClickHouse(result, map[string]interface{}{"project_id": project_id})
  321. // if err != nil {
  322. // fmt.Println("clickhouse更新失败", project_id, data)
  323. // }
  324. // } else { //插入
  325. // }
  326. // lock.Lock()
  327. // if len(result) > 0 {
  328. // count++
  329. // update := []map[string]interface{}{
  330. // {"project_id": mongodb.BsonIdToSId(tmp["_id"])},
  331. // {"$set": result},
  332. // }
  333. // arr = append(arr, update)
  334. // }
  335. // if len(arr) > 50 {
  336. // MgoPro.UpSertBulk("projectset_wy_back", arr...)
  337. // arr = [][]map[string]interface{}{}
  338. // }
  339. // lock.Unlock()
  340. }(tmp)
  341. if n%1000 == 0 {
  342. fmt.Println("current:", n)
  343. }
  344. tmp = map[string]interface{}{}
  345. }
  346. wg.Wait()
  347. // if len(arr) > 0 {
  348. // MgoPro.UpSertBulk("projectset_wy_back", arr...)
  349. // arr = [][]map[string]interface{}{}
  350. // }
  351. // fmt.Println("执行增量项目信息完毕", ProStartTime, endTime, count)
  352. // ProStartTime = endTime //替换
  353. }
  354. // DealTransactionForPro project数据处理
  355. func DealTransactionForPro(data map[string]interface{}) map[string]interface{} {
  356. //基本信息封装
  357. id := mongodb.BsonIdToSId(data["_id"])
  358. buyerclass := gconv.String(data["buyerclass"])
  359. buyer := gconv.String(data["buyer"])
  360. winner := gconv.String(data["s_winner"])
  361. agency := gconv.String(data["agency"])
  362. zbtime := gconv.Int64(data["zbtime"])
  363. if zbtime == 0 {
  364. zbtime = gconv.Int64(data["firsttime"])
  365. }
  366. property_form := []string{}
  367. if data["property_form"] != nil {
  368. property_form = gconv.Strings(data["property_form"])
  369. }
  370. bidamount := gconv.Float64(data["bidamount"])
  371. budget := gconv.Float64(data["budget"])
  372. money := bidamount
  373. if money <= 0 {
  374. money = budget
  375. }
  376. //物业分类
  377. subclass := []string{}
  378. if tag_subinformation := data["tag_subinformation"]; tag_subinformation != nil {
  379. subclass = gconv.Strings(tag_subinformation)
  380. } else if tag_subinformation_ai := data["tag_subinformation_ai"]; tag_subinformation_ai != nil {
  381. subclass = gconv.Strings(tag_subinformation_ai)
  382. }
  383. //项目状态、商机类型
  384. business_type := ""
  385. project_bidstatus := 2
  386. bidstatus := gconv.String(data["bidstatus"])
  387. if bidstatus == "中标" || bidstatus == "成交" || bidstatus == "合同" {
  388. project_bidstatus = 1
  389. business_type = "合约到期项目"
  390. } else if bidstatus == "废标" || bidstatus == "流标" {
  391. project_bidstatus = 0
  392. } else if bidstatus == "拟建" {
  393. project_bidstatus = 4
  394. } else if bidstatus == "招标" {
  395. business_type = "招标项目"
  396. }
  397. //查询情报信息
  398. ids := gconv.Strings(data["ids"])
  399. // info := FindInfomationData(ids...) //情报信息查询
  400. topscopeclass := []string{}
  401. s_topscopeclass := gconv.String(data["s_topscopeclass"])
  402. if s_topscopeclass != "" {
  403. topscopeclass = strings.Split(s_topscopeclass, ",")
  404. }
  405. //查询法人信息
  406. winners := []string{}
  407. if winner != "" {
  408. winners = strings.Split(winner, ",")
  409. }
  410. buyer_id, agency_id, winner_ids := FindEntInfoData(id, buyer, agency, winners)
  411. //物业信息
  412. t := &Transaction{
  413. Project_Id: id,
  414. Project_Name: gconv.String(data["projectname"]),
  415. Project_Budget: budget,
  416. Project_Bidamount: bidamount,
  417. Project_Money: money,
  418. Business_Type: business_type,
  419. Project_Bidstatus: project_bidstatus,
  420. Info_Id: gconv.String(data["sourceinfoid"]),
  421. Info_Ids: ids,
  422. // Information_Id: info.Id,
  423. BuyerClass: buyerclass,
  424. Buyer: buyer,
  425. Topscopeclass: topscopeclass,
  426. Winner: winners,
  427. Agency: agency,
  428. Buyer_Id: buyer_id,
  429. Winner_Id: winner_ids,
  430. Agency_Id: agency_id,
  431. Property_Form: property_form,
  432. SubClass: subclass,
  433. MultiPackage: gconv.Int(data["multipackage"]),
  434. Area: gconv.String(data["area"]),
  435. City: gconv.String(data["city"]),
  436. District: gconv.String(data["district"]),
  437. ZbTime: zbtime,
  438. JgTime: gconv.Int64(data["jgtime"]),
  439. // StartTime: info.Starttime,
  440. // EndTime: info.Endtime,
  441. Create_Time: time.Now().Unix(),
  442. Update_Time: time.Now().Unix(),
  443. //
  444. // From: "project",
  445. }
  446. result := map[string]interface{}{}
  447. infomation, _ := bson.Marshal(t)
  448. bson.Unmarshal(infomation, &result)
  449. return result
  450. }
  451. // IncTransactionDataMgoToCkhAndEs 数据迁移
  452. func IncTransactionDataMgoToCkhAndEs() {
  453. /*
  454. 数据根据update_time查询
  455. 1、采购意向数据(from=bidding)只插入
  456. 2、项目信息先查,有则更新,无则插入
  457. */
  458. fmt.Println("开始执行迁移...")
  459. sess := MgoPro.GetMgoConn()
  460. defer MgoPro.DestoryMongoConn(sess)
  461. ch := make(chan bool, 1)
  462. wg := &sync.WaitGroup{}
  463. query := map[string]interface{}{
  464. "update_time": map[string]interface{}{
  465. "$gte": GetTime(0),
  466. },
  467. }
  468. it := sess.DB(MgoPro.DbName).C("projectset_wy").Find(&query).Iter()
  469. n := 0
  470. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  471. ch <- true
  472. wg.Add(1)
  473. go func(tmp map[string]interface{}) {
  474. defer func() {
  475. <-ch
  476. wg.Done()
  477. }()
  478. from := gconv.String(tmp["from"])
  479. delete(tmp, "from") //无用字段删除
  480. delete(tmp, "_id") //无用字段删除
  481. if !SaveDataToEs(tmp) { //保存、更新es
  482. fmt.Println("数据保存es失败,项目project_id", tmp["project_id"])
  483. }
  484. if from == "bidding" { //采购意向、拟建,插入
  485. SaveDataToClickHouse(tmp)
  486. } else { //项目信息,更新,插入
  487. UpdateOrSaveDataToClickHouse(tmp)
  488. }
  489. }(tmp)
  490. if n%100 == 0 {
  491. fmt.Println("current:", n)
  492. }
  493. tmp = map[string]interface{}{}
  494. }
  495. wg.Wait()
  496. fmt.Println("迁移结束...")
  497. }
  498. type Infomation struct {
  499. Id string
  500. Starttime int64
  501. Endtime int64
  502. }
  503. // FindInfomationData 情报信息查询
  504. func FindInfomationData(ids ...string) (info Infomation) {
  505. for _, id := range ids {
  506. query := fmt.Sprintf(`SELECT id,starttime,endtime FROM %s WHERE datajson_id = ?`, Config.ClickHouse.DataBase+".information")
  507. rows, err := CkhTool.Query(context.Background(), query, id)
  508. if err != nil {
  509. continue
  510. }
  511. for rows.Next() {
  512. info = Infomation{}
  513. if err := rows.Scan(&info.Id, &info.Starttime, &info.Endtime); err != nil {
  514. fmt.Println("查询情报信息异常:", id, err)
  515. }
  516. if info.Id != "" {
  517. return
  518. }
  519. //break //目前只有一条结果
  520. }
  521. }
  522. return
  523. }
  524. // FindEntInfoData 法人信息查询
  525. func FindEntInfoData(bid, buyer, agency string, winners []string) (buyer_id, agency_id string, winner_ids []string) {
  526. winner_ids = []string{}
  527. winnerMap := map[string]bool{} //记录所有中标单位
  528. values := []interface{}{}
  529. placeholders := []string{}
  530. if buyer != "" {
  531. placeholders = append(placeholders, "?")
  532. values = append(values, buyer)
  533. }
  534. if len(winners) > 0 {
  535. for _, w := range winners {
  536. winnerMap[w] = true
  537. placeholders = append(placeholders, "?")
  538. values = append(values, w)
  539. }
  540. }
  541. if agency != "" {
  542. placeholders = append(placeholders, "?")
  543. values = append(values, agency)
  544. }
  545. if len(values) == 0 {
  546. return
  547. }
  548. query := fmt.Sprintf(`SELECT id,company_name FROM %s WHERE company_name IN (%s)`, Config.ClickHouse.DataBase+".ent_info", strings.Join(placeholders, ","))
  549. rows, err := CkhTool.Query(context.Background(), query, values...)
  550. if err != nil {
  551. return
  552. }
  553. for rows.Next() {
  554. var id, company_name string
  555. if err := rows.Scan(&id, &company_name); err == nil {
  556. if company_name == buyer {
  557. buyer_id = id
  558. } else if company_name == agency {
  559. agency_id = id
  560. } else if winnerMap[company_name] {
  561. winner_ids = append(winner_ids, id)
  562. }
  563. } else {
  564. fmt.Println("查询法人信息异常:", err, bid)
  565. }
  566. }
  567. return
  568. }
  569. // UpdateOrSaveDataToClickHouse 判断clickhouse更新or保存
  570. func UpdateOrSaveDataToClickHouse(data map[string]interface{}) (err error) {
  571. project_id := gconv.String(data["project_id"])
  572. count := FindClickHouseByProjectId(project_id) //查询
  573. if count > 0 { //更新
  574. delete(data, "create_time") //不更新创建时间
  575. delete(data, "project_id") //不更新项目id(主键)
  576. err = UpdateDataToClickHouse(data, map[string]interface{}{"project_id": project_id})
  577. if err != nil {
  578. fmt.Println("clickhouse更新失败", project_id, data)
  579. }
  580. } else { //插入
  581. err = SaveDataToClickHouse(data)
  582. if err != nil {
  583. fmt.Println("clickhouse保存失败", project_id, data)
  584. }
  585. }
  586. return
  587. }
  588. // SaveDataToClickHouse 数据保存clickhouse
  589. func SaveDataToClickHouse(data map[string]interface{}) error {
  590. fields, placeholders := []string{}, []string{}
  591. values := []interface{}{}
  592. for k, v := range data {
  593. fields = append(fields, k)
  594. values = append(values, v)
  595. placeholders = append(placeholders, "?")
  596. }
  597. query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", Config.ClickHouse.DataBase+".transaction_info_all", strings.Join(fields, ","), strings.Join(placeholders, ","))
  598. return CkhTool.Exec(context.Background(), query, values...)
  599. }
  600. // FindClickHouseByProjectId 根据条件count clickhouse
  601. func FindClickHouseByProjectId(project_id string) int {
  602. query := fmt.Sprintf(`SELECT COUNT(1) FROM %s WHERE project_id = ?`, Config.ClickHouse.DataBase+".transaction_info_all")
  603. row := CkhTool.QueryRow(context.Background(), query, project_id)
  604. var count uint64
  605. row.Scan(&count)
  606. return gconv.Int(count)
  607. }
  608. // UpdateDataToClickHouse 数据更新clickhouse
  609. func UpdateDataToClickHouse(data, querys map[string]interface{}) error {
  610. sets := []string{}
  611. values := []interface{}{}
  612. for k, v := range data {
  613. sets = append(sets, fmt.Sprintf("%s=?", k))
  614. values = append(values, v)
  615. }
  616. qs := []string{}
  617. for k, v := range querys {
  618. qs = append(qs, fmt.Sprintf("%s=?", k))
  619. values = append(values, v)
  620. }
  621. query := fmt.Sprintf("ALTER TABLE %s UPDATE %s WHERE %s", Config.ClickHouse.DataBase+".transaction_info", strings.Join(sets, ","), strings.Join(qs, ","))
  622. //query := `ALTER TABLE information.transaction_info UPDATE update_time = ? WHERE project_id = '5c9ee78ca5cb26b9b7fd0b57'`
  623. return CkhTool.Exec(context.Background(), query, values...)
  624. }
  625. // SaveDataToEs es存储
  626. func SaveDataToEs(data map[string]interface{}) bool {
  627. tmp := map[string]interface{}{}
  628. for k, v := range data {
  629. if k == "project_id" {
  630. k = "_id"
  631. } else if k == "winner" || k == "winner_id" { //winner和winner_id无值不进es
  632. if len(gconv.Strings(v)) == 0 {
  633. continue
  634. }
  635. }
  636. tmp[k] = v
  637. }
  638. err, result := Es.GetById(Config.Es.Index, gconv.String(tmp["_id"]))
  639. if err == nil && len(result) > 0 { //存在,更新
  640. tmp["create_time"] = result["create_time"] //不更新create_time
  641. }
  642. return Es.Save(Config.Es.Index, tmp)
  643. }
  644. func FindEntInfoData2(bid, buyer, agency string, winners []string) (buyer_id, agency_id string, winner_ids []string) {
  645. query := fmt.Sprintf(`SELECT id FROM %s WHERE company_name = ?`, Config.ClickHouse.DataBase+".ent_info")
  646. if buyer != "" {
  647. buyer_id = GetClickHouseData(bid, query, buyer)
  648. }
  649. if agency != "" {
  650. agency_id = GetClickHouseData(bid, query, agency)
  651. }
  652. if len(winners) > 0 {
  653. for _, w := range winners {
  654. winner_id := GetClickHouseData(bid, query, w)
  655. if winner_id != "" {
  656. winner_ids = append(winner_ids, winner_id)
  657. }
  658. }
  659. }
  660. return
  661. }
  662. func GetClickHouseData(bid, query, value string) string {
  663. rows, err := CkhTool.Query(context.Background(), query, value)
  664. if err != nil {
  665. return ""
  666. }
  667. for rows.Next() {
  668. var id string
  669. if err := rows.Scan(&id); err == nil {
  670. return id
  671. } else {
  672. fmt.Println("查询情报信息异常:", err, bid)
  673. }
  674. }
  675. return ""
  676. }
  677. /*// SaveTransactionData 保存增量物业信息
  678. func SaveTransactionData() {
  679. fmt.Println("save projectset_wy...")
  680. savearr := make([]map[string]interface{}, 100)
  681. indexdb := 0
  682. for {
  683. select {
  684. case v := <-TransactionSaveCache:
  685. savearr[indexdb] = v
  686. indexdb++
  687. if indexdb == 100 {
  688. Transaction_Ch <- true
  689. go func(tmp []map[string]interface{}) {
  690. defer func() {
  691. <-Transaction_Ch
  692. }()
  693. MgoPro.SaveBulk("projectset_wy", tmp...)
  694. }(savearr)
  695. savearr = make([]map[string]interface{}, 100)
  696. indexdb = 0
  697. }
  698. case <-time.After(30 * time.Second):
  699. if indexdb > 0 {
  700. Transaction_Ch <- true
  701. go func(tmp []map[string]interface{}) {
  702. defer func() {
  703. <-Transaction_Ch
  704. }()
  705. MgoPro.SaveBulk("projectset_wy", tmp...)
  706. }(savearr[:indexdb])
  707. savearr = make([]map[string]interface{}, 100)
  708. indexdb = 0
  709. }
  710. }
  711. }
  712. }*/