main.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/gogf/gf/v2/util/gconv"
  7. "github.com/olivere/elastic/v7"
  8. "github.com/robfig/cron"
  9. "io"
  10. "log"
  11. "sync"
  12. )
  13. func init() {
  14. ReadConfig(&Config) //初始化
  15. InitMgo() //mgo
  16. InitCkh() //clickhouse
  17. InitEs() //es
  18. InitOther()
  19. }
  20. func main() {
  21. c := cron.New()
  22. //增量
  23. c.AddFunc(Config.StartCron, IncTransactionDataFromBidAndPro) //增量bidding和项目数据
  24. c.Start()
  25. //历史
  26. //HisTransactionDataFromBid() //历史招标(bidding)数据,截止时间1713196800采购意向 TODO 待补充法人信息
  27. //HisTransactionDataFromBid2() //历史招标(bidding)数据,截止时间1713628800新增项目 TODO 待补充情报信息、法人信息
  28. //HisTransactionDataFromProject() //历史项目数据(projectset_20230904)TODO 待补充业态、情报信息、法人信息
  29. //临时处理(信息补充)
  30. //HisTransactionDataAddInformation() //历史信息补充法人库信息,项目信息补充业态property_form
  31. //IncTransactionDataMgoToCkhAndEs() //数据迁移
  32. ch := make(chan bool)
  33. <-ch
  34. }
  35. func tmp() {
  36. sess := MgoPro.GetMgoConn()
  37. defer MgoPro.DestoryMongoConn(sess)
  38. ch := make(chan bool, 1)
  39. wg := &sync.WaitGroup{}
  40. //lock := &sync.Mutex{}
  41. query := map[string]interface{}{
  42. //"project_bidstatus": 4,
  43. //"_id": map[string]interface{}{
  44. // "$gte": mongodb.StringTOBsonId("66213b290f6ba3eb160617ad"),
  45. //},
  46. //"update_time": map[string]interface{}{
  47. // "$lt": 1714959573,
  48. //},
  49. //"_id": mongodb.StringTOBsonId("6630eae76f6c86a3962f3a07"),
  50. "repeat": true,
  51. }
  52. repeat := map[string]bool{}
  53. count := MgoPro.Count("projectset_wy_back", query)
  54. fmt.Println("count:", count)
  55. it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter()
  56. n := 0
  57. //arr := []map[string]interface{}{}
  58. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  59. ch <- true
  60. wg.Add(1)
  61. go func(tmp map[string]interface{}) {
  62. defer func() {
  63. <-ch
  64. wg.Done()
  65. }()
  66. //update := []map[string]interface{}{}
  67. //project_id := gconv.String(tmp["project_id"])
  68. //lock.Lock()
  69. //if !repeat[project_id] {
  70. // Es.DelById(Config.Es.Index, project_id)
  71. // CkhTool.Exec(context.Background(), "ALTER TABLE information.transaction_info_copy DELETE WHERE project_id = ?", project_id)
  72. // repeat[project_id] = true
  73. //}
  74. //lock.Unlock()
  75. //err, result := Es.GetById(Config.Es.Index, project_id)
  76. //Es.DelById()
  77. //if err != nil || len(result) == 0 {
  78. // fmt.Println(project_id)
  79. // update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  80. // update = append(update, map[string]interface{}{"$set": map[string]interface{}{"es": false}})
  81. //} else {
  82. // if gconv.Int(result["project_bidstatus"]) != 0 {
  83. // fmt.Println("11", project_id)
  84. // }
  85. //}
  86. if MgoPro.Count("projectset_wy_back", map[string]interface{}{"project_id": tmp["project_id"]}) > 1 {
  87. fmt.Println("project_id")
  88. //update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  89. //update = append(update, map[string]interface{}{"$set": map[string]interface{}{"repeat": true}})
  90. }
  91. //if len(update) > 0 {
  92. // lock.Lock()
  93. // arr = append(arr, update)
  94. // if len(arr) > 500 {
  95. // MgoPro.UpdateBulk("projectset_wy_back", arr...)
  96. // arr = [][]map[string]interface{}{}
  97. // }
  98. // lock.Unlock()
  99. //}
  100. }(tmp)
  101. if n%1000 == 0 {
  102. fmt.Println("current:", n)
  103. }
  104. tmp = map[string]interface{}{}
  105. }
  106. wg.Wait()
  107. //if len(arr) > 0 {
  108. // MgoPro.SaveBulk("projectset_wy_tmp2", arr...)
  109. // arr = []map[string]interface{}{}
  110. //}
  111. fmt.Println("迁移结束...", len(repeat))
  112. }
  113. func getBiddingData() {
  114. url := "http://172.17.4.184:19908"
  115. //url := "http://192.168.3.149:9200"
  116. username := "jybid"
  117. password := "Top2023_JEB01i@31"
  118. index := "transaction_info" //索引名称
  119. // 创建 Elasticsearch 客户端
  120. client, err := elastic.NewClient(
  121. elastic.SetURL(url),
  122. elastic.SetBasicAuth(username, password),
  123. elastic.SetSniff(false),
  124. )
  125. if err != nil {
  126. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  127. }
  128. //rangeQuery := elastic.NewRangeQuery("project_bidstatus").Lt(3)
  129. //query := elastic.NewBoolQuery().
  130. // Must(elastic.NewTermQuery("id", "652d2f432ccb08936fe2ed60")) //
  131. //Must(rangeQuery)
  132. //Must(elastic.NewTermQuery("subtype", "招标"))
  133. //query := elastic.NewBoolQuery().
  134. // //北京,天津,河北,上海,江苏,浙江,安徽
  135. // //Must(elastic.NewTermQuery("area", "北京市")).
  136. // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  137. // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")).
  138. // Must(rangeQuery)
  139. ctx := context.Background()
  140. //开始滚动搜索
  141. scrollID := ""
  142. scroll := "10m"
  143. searchSource := elastic.NewSearchSource().
  144. //Query(query).
  145. Size(500)
  146. //Sort("_doc", true) //升序排序
  147. //Sort("_doc", false) //降序排序
  148. searchService := client.Scroll(index).
  149. Size(500).
  150. Scroll(scroll).
  151. SearchSource(searchSource)
  152. res, err := searchService.Do(ctx)
  153. if err != nil {
  154. if err == io.EOF {
  155. fmt.Println("没有数据")
  156. } else {
  157. panic(err)
  158. }
  159. }
  160. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  161. fmt.Println("总数是:", res.TotalHits())
  162. total := 0
  163. for len(res.Hits.Hits) > 0 {
  164. for _, hit := range res.Hits.Hits {
  165. var doc map[string]interface{}
  166. err := json.Unmarshal(hit.Source, &doc)
  167. if err != nil {
  168. log.Printf("解析文档失败:%s", err)
  169. continue
  170. }
  171. id := gconv.String(doc["id"])
  172. //情报
  173. info_ids := gconv.Strings(doc["info_ids"])
  174. info := FindInfomationData(info_ids...)
  175. doc["information_id"] = info.Id
  176. doc["starttime"] = info.Starttime
  177. doc["endtime"] = info.Endtime
  178. set := map[string]interface{}{
  179. "information_id": info.Id,
  180. "starttime": info.Starttime,
  181. "endtime": info.Endtime,
  182. }
  183. //法人
  184. buyer, agency := "", ""
  185. winners := []string{}
  186. if gconv.String(doc["buyer_id"]) == "" {
  187. buyer = gconv.String(doc["buyer"])
  188. }
  189. if gconv.String(doc["agency_id"]) == "" {
  190. agency = gconv.String(doc["agency"])
  191. }
  192. if len(gconv.Strings(doc["winner_id"])) == 0 {
  193. if winnersTmp := gconv.Strings(doc["winner"]); len(winnersTmp) > 0 {
  194. winners = winnersTmp
  195. }
  196. }
  197. buyer_id, agency_id, winner_ids := FindEntInfoData(id, buyer, agency, winners)
  198. if buyer_id != "" {
  199. doc["buyer_id"] = buyer_id
  200. set["buyer_id"] = buyer_id
  201. }
  202. if agency_id != "" {
  203. doc["agency_id"] = agency_id
  204. set["agency_id"] = agency_id
  205. }
  206. if len(winner_ids) > 0 {
  207. doc["winner_id"] = winner_ids
  208. set["winner_id"] = winner_ids
  209. }
  210. //
  211. coll := "projectset_wy_back"
  212. business_type := gconv.String(doc["business_type"])
  213. if business_type == "新增项目" {
  214. coll = "projectset_wy"
  215. doc["business_type"] = "新增物业项目"
  216. set["business_type"] = "新增物业项目"
  217. } else if business_type == "采购意向" {
  218. coll = "projectset_wy"
  219. }
  220. //更新es
  221. client.Update().Index(index).Id(id).Doc(doc).Do(context.Background())
  222. //更新clickhouse
  223. UpdateDataToClickHouse(set, map[string]interface{}{"project_id": id})
  224. //更新mgo
  225. MgoPro.Update(coll, map[string]interface{}{"project_id": id}, map[string]interface{}{"$set": set}, false, false)
  226. }
  227. total = total + len(res.Hits.Hits)
  228. scrollID = res.ScrollId
  229. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  230. log.Println("current count:", total)
  231. if err != nil {
  232. if err == io.EOF {
  233. // 滚动到最后一批数据,退出循环
  234. break
  235. }
  236. log.Println("滚动搜索失败:", err, res)
  237. break // 处理错误时退出循环
  238. }
  239. }
  240. // 在循环外调用 ClearScroll
  241. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  242. if err != nil {
  243. log.Printf("清理滚动搜索失败:%s", err)
  244. }
  245. fmt.Println("结束~~~~~~~~~~~~~~~")
  246. }