main.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "log"
  8. "sync"
  9. "github.com/gogf/gf/v2/util/gconv"
  10. "github.com/olivere/elastic/v7"
  11. "github.com/robfig/cron"
  12. )
// init performs one-time process setup before main runs.
// ReadConfig is called first, presumably because the Init* calls below read
// the package-level Config — NOTE(review): confirm in their definitions.
func init() {
	ReadConfig(&Config) // load configuration into Config (initialisation)
	InitMgo()           // MongoDB
	InitCkh()           // ClickHouse
	InitEs()            // Elasticsearch
	InitOther()
}
  20. func main() {
  21. c := cron.New()
  22. c.AddFunc("0 0 4 ? * *", IncTransactionDataFromBid) //增量bidding和项目数据
  23. c.Start()
  24. d := cron.New()
  25. d.AddFunc("0 0 5 ? * *", IncTransactionDataFromPro) //增量bidding和项目数据
  26. d.Start()
  27. //历史
  28. //HisTransactionDataFromBid() //历史招标(bidding)数据,截止时间1713196800采购意向 TODO 待补充法人信息
  29. //HisTransactionDataFromBid2() //历史招标(bidding)数据,截止时间1713628800新增项目 TODO 待补充情报信息、法人信息
  30. //HisTransactionDataFromProject() //历史项目数据(projectset_20230904)TODO 待补充业态、情报信息、法人信息
  31. //临时处理(信息补充)
  32. //HisTransactionDataAddInformation() //历史信息补充法人库信息,项目信息补充业态property_form
  33. //IncTransactionDataMgoToCkhAndEs() //数据迁移
  34. ch := make(chan bool)
  35. <-ch
  36. }
  37. func tmp() {
  38. sess := MgoPro.GetMgoConn()
  39. defer MgoPro.DestoryMongoConn(sess)
  40. ch := make(chan bool, 5)
  41. wg := &sync.WaitGroup{}
  42. lock := &sync.Mutex{}
  43. query := map[string]interface{}{
  44. //"project_bidstatus": 4,
  45. //"_id": map[string]interface{}{
  46. // "$gte": mongodb.StringTOBsonId("66213b290f6ba3eb160617ad"),
  47. //},
  48. //"update_time": map[string]interface{}{
  49. // "$lt": 1714959573,
  50. //},
  51. //"_id": mongodb.StringTOBsonId("6630eae76f6c86a3962f3a07"),
  52. //"repeat": true,
  53. "update_time": map[string]interface{}{
  54. "$gte": 1714959573,
  55. "$lte": 1719795791,
  56. },
  57. }
  58. repeat := map[string]bool{}
  59. count := MgoPro.Count("projectset_wy_back", query)
  60. fmt.Println("count:", count)
  61. it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter()
  62. n := 0
  63. arr := [][]map[string]interface{}{}
  64. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  65. ch <- true
  66. wg.Add(1)
  67. go func(tmp map[string]interface{}) {
  68. defer func() {
  69. <-ch
  70. wg.Done()
  71. }()
  72. update := []map[string]interface{}{}
  73. project_id := gconv.String(tmp["project_id"])
  74. //lock.Lock()
  75. //if !repeat[project_id] {
  76. //Es.DelById(Config.Es.Index, project_id)
  77. //CkhTool.Exec(context.Background(), "ALTER TABLE information.transaction_info_copy DELETE WHERE project_id = ?", project_id)
  78. //repeat[project_id] = true
  79. //}
  80. //lock.Unlock()
  81. //err, result := Es.GetById(Config.Es.Index, project_id)
  82. //Es.DelById()
  83. //if err != nil || len(result) == 0 {
  84. // fmt.Println(project_id)
  85. // update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  86. // update = append(update, map[string]interface{}{"$set": map[string]interface{}{"es": false}})
  87. //} else {
  88. // if gconv.Int(result["project_bidstatus"]) != 0 {
  89. // fmt.Println("11", project_id)
  90. // }
  91. //}
  92. tt := map[string]bool{}
  93. err, result := Es.GetById(Config.Es.Index, gconv.String(tmp["_id"]))
  94. if err != nil || len(result) == 0 {
  95. tt["es"] = true
  96. }
  97. if FindClickHouseByProjectId(project_id) == 0 {
  98. tt["click"] = true
  99. }
  100. if len(tt) > 0 {
  101. update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  102. update = append(update, map[string]interface{}{"$set": map[string]interface{}{"repeat": tt}})
  103. }
  104. //if MgoPro.Count("projectset_wy_back", map[string]interface{}{"project_id": tmp["project_id"]}) > 1 {
  105. // fmt.Println("project_id")
  106. // update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  107. // update = append(update, map[string]interface{}{"$set": map[string]interface{}{"repeat": "project"}})
  108. //} else if MgoB.Count("projectset_wy", map[string]interface{}{"project_id": tmp["project_id"]}) > 0 {
  109. // update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  110. // update = append(update, map[string]interface{}{"$set": map[string]interface{}{"repeat": "bidding"}})
  111. //}
  112. if len(update) > 0 {
  113. lock.Lock()
  114. arr = append(arr, update)
  115. if len(arr) > 500 {
  116. MgoPro.UpdateBulk("projectset_wy_back", arr...)
  117. arr = [][]map[string]interface{}{}
  118. }
  119. lock.Unlock()
  120. }
  121. }(tmp)
  122. if n%1000 == 0 {
  123. fmt.Println("current:", n)
  124. }
  125. tmp = map[string]interface{}{}
  126. }
  127. wg.Wait()
  128. if len(arr) > 0 {
  129. MgoPro.UpdateBulk("projectset_wy_back", arr...)
  130. arr = [][]map[string]interface{}{}
  131. }
  132. fmt.Println("迁移结束...", len(repeat))
  133. }
  134. func getBiddingData() {
  135. url := "http://172.17.4.184:19908"
  136. //url := "http://127.0.0.1:19908"
  137. //url := "http://192.168.3.149:9200"
  138. username := "jybid"
  139. password := "Top2023_JEB01i@31"
  140. index := "transaction_info" //索引名称
  141. // 创建 Elasticsearch 客户端
  142. client, err := elastic.NewClient(
  143. elastic.SetURL(url),
  144. elastic.SetBasicAuth(username, password),
  145. elastic.SetSniff(false),
  146. )
  147. if err != nil {
  148. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  149. }
  150. //rangeQuery := elastic.NewRangeQuery("project_bidstatus").Lt(3)
  151. query := elastic.NewBoolQuery().
  152. Must(elastic.NewTermQuery("id", "64d7146cb44bf08751e3c133")) //
  153. //Must(rangeQuery)
  154. //Must(elastic.NewTermQuery("subtype", "招标"))
  155. //query := elastic.NewBoolQuery().
  156. // //北京,天津,河北,上海,江苏,浙江,安徽
  157. // //Must(elastic.NewTermQuery("area", "北京市")).
  158. // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  159. // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")).
  160. // Must(rangeQuery)
  161. ctx := context.Background()
  162. //开始滚动搜索
  163. scrollID := ""
  164. scroll := "10m"
  165. searchSource := elastic.NewSearchSource().
  166. Query(query).
  167. Size(500)
  168. //Sort("_doc", true) //升序排序
  169. //Sort("_doc", false) //降序排序
  170. searchService := client.Scroll(index).
  171. Size(500).
  172. Scroll(scroll).
  173. SearchSource(searchSource)
  174. res, err := searchService.Do(ctx)
  175. if err != nil {
  176. if err == io.EOF {
  177. fmt.Println("没有数据")
  178. } else {
  179. panic(err)
  180. }
  181. }
  182. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  183. fmt.Println("总数是:", res.TotalHits())
  184. total := 0
  185. for len(res.Hits.Hits) > 0 {
  186. for _, hit := range res.Hits.Hits {
  187. var doc map[string]interface{}
  188. err := json.Unmarshal(hit.Source, &doc)
  189. if err != nil {
  190. log.Printf("解析文档失败:%s", err)
  191. continue
  192. }
  193. set := map[string]interface{}{}
  194. id := gconv.String(doc["id"])
  195. //情报
  196. information_id := gconv.String(doc["information_id"])
  197. info_ids := gconv.Strings(doc["info_ids"])
  198. info := FindInfomationData(info_ids...)
  199. if information_id != info.Id {
  200. doc["information_id"] = info.Id
  201. doc["starttime"] = info.Starttime
  202. doc["endtime"] = info.Endtime
  203. set["information_id"] = info.Id
  204. set["starttime"] = info.Starttime
  205. set["endtime"] = info.Endtime
  206. }
  207. //法人
  208. buyer, agency := "", ""
  209. winners := []string{}
  210. if gconv.String(doc["buyer_id"]) == "" {
  211. buyer = gconv.String(doc["buyer"])
  212. }
  213. if gconv.String(doc["agency_id"]) == "" {
  214. agency = gconv.String(doc["agency"])
  215. }
  216. if len(gconv.Strings(doc["winner_id"])) == 0 {
  217. if winnersTmp := gconv.Strings(doc["winner"]); len(winnersTmp) > 0 {
  218. winners = winnersTmp
  219. }
  220. }
  221. buyer_id, agency_id, winner_ids := FindEntInfoData(id, buyer, agency, winners)
  222. if buyer_id != "" {
  223. doc["buyer_id"] = buyer_id
  224. set["buyer_id"] = buyer_id
  225. }
  226. if agency_id != "" {
  227. doc["agency_id"] = agency_id
  228. set["agency_id"] = agency_id
  229. }
  230. if len(winner_ids) > 0 {
  231. doc["winner_id"] = winner_ids
  232. set["winner_id"] = winner_ids
  233. }
  234. //
  235. coll := "projectset_wy_back"
  236. business_type := gconv.String(doc["business_type"])
  237. if business_type == "新增项目" {
  238. coll = "projectset_wy"
  239. doc["business_type"] = "新增物业项目"
  240. set["business_type"] = "新增物业项目"
  241. } else if business_type == "采购意向" {
  242. coll = "projectset_wy"
  243. }
  244. fmt.Println(set)
  245. if len(set) > 0 {
  246. //更新es
  247. client.Update().Index(index).Id(id).Doc(doc).Do(context.Background())
  248. //更新clickhouse
  249. err := UpdateDataToClickHouse(set, map[string]interface{}{"project_id": id})
  250. fmt.Println("11", err)
  251. //更新mgo
  252. MgoPro.Update(coll, map[string]interface{}{"project_id": id}, map[string]interface{}{"$set": set}, false, false)
  253. }
  254. }
  255. total = total + len(res.Hits.Hits)
  256. scrollID = res.ScrollId
  257. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  258. log.Println("current count:", total)
  259. if err != nil {
  260. if err == io.EOF {
  261. // 滚动到最后一批数据,退出循环
  262. break
  263. }
  264. log.Println("滚动搜索失败:", err, res)
  265. break // 处理错误时退出循环
  266. }
  267. }
  268. // 在循环外调用 ClearScroll
  269. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  270. if err != nil {
  271. log.Printf("清理滚动搜索失败:%s", err)
  272. }
  273. fmt.Println("结束~~~~~~~~~~~~~~~")
  274. }