main.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "log"
  8. "sync"
  9. "github.com/gogf/gf/v2/util/gconv"
  10. "github.com/olivere/elastic/v7"
  11. // "github.com/robfig/cron"
  12. )
  13. func init() {
  14. ReadConfig(&Config) //初始化
  15. InitMgo() //mgo
  16. InitCkh() //clickhouse
  17. InitEs() //es
  18. InitOther()
  19. }
  20. func main() {
  21. // c := cron.New()
  22. //增量
  23. // c.AddFunc(Config.StartCron, IncTransactionDataFromBidAndPro) //增量bidding和项目数据
  24. // c.Start()
  25. IncTransactionDataFromBidAndPro()
  26. //历史
  27. //HisTransactionDataFromBid() //历史招标(bidding)数据,截止时间1713196800采购意向 TODO 待补充法人信息
  28. //HisTransactionDataFromBid2() //历史招标(bidding)数据,截止时间1713628800新增项目 TODO 待补充情报信息、法人信息
  29. //HisTransactionDataFromProject() //历史项目数据(projectset_20230904)TODO 待补充业态、情报信息、法人信息
  30. //临时处理(信息补充)
  31. //HisTransactionDataAddInformation() //历史信息补充法人库信息,项目信息补充业态property_form
  32. //IncTransactionDataMgoToCkhAndEs() //数据迁移
  33. ch := make(chan bool)
  34. <-ch
  35. }
  36. func tmp() {
  37. sess := MgoPro.GetMgoConn()
  38. defer MgoPro.DestoryMongoConn(sess)
  39. ch := make(chan bool, 5)
  40. wg := &sync.WaitGroup{}
  41. lock := &sync.Mutex{}
  42. query := map[string]interface{}{
  43. //"project_bidstatus": 4,
  44. //"_id": map[string]interface{}{
  45. // "$gte": mongodb.StringTOBsonId("66213b290f6ba3eb160617ad"),
  46. //},
  47. //"update_time": map[string]interface{}{
  48. // "$lt": 1714959573,
  49. //},
  50. //"_id": mongodb.StringTOBsonId("6630eae76f6c86a3962f3a07"),
  51. //"repeat": true,
  52. "update_time": map[string]interface{}{
  53. "$gte": 1714959573,
  54. "$lte": 1719795791,
  55. },
  56. }
  57. repeat := map[string]bool{}
  58. count := MgoPro.Count("projectset_wy_back", query)
  59. fmt.Println("count:", count)
  60. it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter()
  61. n := 0
  62. arr := [][]map[string]interface{}{}
  63. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  64. ch <- true
  65. wg.Add(1)
  66. go func(tmp map[string]interface{}) {
  67. defer func() {
  68. <-ch
  69. wg.Done()
  70. }()
  71. update := []map[string]interface{}{}
  72. project_id := gconv.String(tmp["project_id"])
  73. //lock.Lock()
  74. //if !repeat[project_id] {
  75. //Es.DelById(Config.Es.Index, project_id)
  76. //CkhTool.Exec(context.Background(), "ALTER TABLE information.transaction_info_copy DELETE WHERE project_id = ?", project_id)
  77. //repeat[project_id] = true
  78. //}
  79. //lock.Unlock()
  80. //err, result := Es.GetById(Config.Es.Index, project_id)
  81. //Es.DelById()
  82. //if err != nil || len(result) == 0 {
  83. // fmt.Println(project_id)
  84. // update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  85. // update = append(update, map[string]interface{}{"$set": map[string]interface{}{"es": false}})
  86. //} else {
  87. // if gconv.Int(result["project_bidstatus"]) != 0 {
  88. // fmt.Println("11", project_id)
  89. // }
  90. //}
  91. tt := map[string]bool{}
  92. err, result := Es.GetById(Config.Es.Index, gconv.String(tmp["_id"]))
  93. if err != nil || len(result) == 0 {
  94. tt["es"] = true
  95. }
  96. if FindClickHouseByProjectId(project_id) == 0 {
  97. tt["click"] = true
  98. }
  99. if len(tt) > 0 {
  100. update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  101. update = append(update, map[string]interface{}{"$set": map[string]interface{}{"repeat": tt}})
  102. }
  103. //if MgoPro.Count("projectset_wy_back", map[string]interface{}{"project_id": tmp["project_id"]}) > 1 {
  104. // fmt.Println("project_id")
  105. // update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  106. // update = append(update, map[string]interface{}{"$set": map[string]interface{}{"repeat": "project"}})
  107. //} else if MgoB.Count("projectset_wy", map[string]interface{}{"project_id": tmp["project_id"]}) > 0 {
  108. // update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  109. // update = append(update, map[string]interface{}{"$set": map[string]interface{}{"repeat": "bidding"}})
  110. //}
  111. if len(update) > 0 {
  112. lock.Lock()
  113. arr = append(arr, update)
  114. if len(arr) > 500 {
  115. MgoPro.UpdateBulk("projectset_wy_back", arr...)
  116. arr = [][]map[string]interface{}{}
  117. }
  118. lock.Unlock()
  119. }
  120. }(tmp)
  121. if n%1000 == 0 {
  122. fmt.Println("current:", n)
  123. }
  124. tmp = map[string]interface{}{}
  125. }
  126. wg.Wait()
  127. if len(arr) > 0 {
  128. MgoPro.UpdateBulk("projectset_wy_back", arr...)
  129. arr = [][]map[string]interface{}{}
  130. }
  131. fmt.Println("迁移结束...", len(repeat))
  132. }
  133. func getBiddingData() {
  134. url := "http://172.17.4.184:19908"
  135. //url := "http://127.0.0.1:19908"
  136. //url := "http://192.168.3.149:9200"
  137. username := "jybid"
  138. password := "Top2023_JEB01i@31"
  139. index := "transaction_info" //索引名称
  140. // 创建 Elasticsearch 客户端
  141. client, err := elastic.NewClient(
  142. elastic.SetURL(url),
  143. elastic.SetBasicAuth(username, password),
  144. elastic.SetSniff(false),
  145. )
  146. if err != nil {
  147. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  148. }
  149. //rangeQuery := elastic.NewRangeQuery("project_bidstatus").Lt(3)
  150. query := elastic.NewBoolQuery().
  151. Must(elastic.NewTermQuery("id", "64d7146cb44bf08751e3c133")) //
  152. //Must(rangeQuery)
  153. //Must(elastic.NewTermQuery("subtype", "招标"))
  154. //query := elastic.NewBoolQuery().
  155. // //北京,天津,河北,上海,江苏,浙江,安徽
  156. // //Must(elastic.NewTermQuery("area", "北京市")).
  157. // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  158. // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")).
  159. // Must(rangeQuery)
  160. ctx := context.Background()
  161. //开始滚动搜索
  162. scrollID := ""
  163. scroll := "10m"
  164. searchSource := elastic.NewSearchSource().
  165. Query(query).
  166. Size(500)
  167. //Sort("_doc", true) //升序排序
  168. //Sort("_doc", false) //降序排序
  169. searchService := client.Scroll(index).
  170. Size(500).
  171. Scroll(scroll).
  172. SearchSource(searchSource)
  173. res, err := searchService.Do(ctx)
  174. if err != nil {
  175. if err == io.EOF {
  176. fmt.Println("没有数据")
  177. } else {
  178. panic(err)
  179. }
  180. }
  181. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  182. fmt.Println("总数是:", res.TotalHits())
  183. total := 0
  184. for len(res.Hits.Hits) > 0 {
  185. for _, hit := range res.Hits.Hits {
  186. var doc map[string]interface{}
  187. err := json.Unmarshal(hit.Source, &doc)
  188. if err != nil {
  189. log.Printf("解析文档失败:%s", err)
  190. continue
  191. }
  192. set := map[string]interface{}{}
  193. id := gconv.String(doc["id"])
  194. //情报
  195. information_id := gconv.String(doc["information_id"])
  196. info_ids := gconv.Strings(doc["info_ids"])
  197. info := FindInfomationData(info_ids...)
  198. if information_id != info.Id {
  199. doc["information_id"] = info.Id
  200. doc["starttime"] = info.Starttime
  201. doc["endtime"] = info.Endtime
  202. set["information_id"] = info.Id
  203. set["starttime"] = info.Starttime
  204. set["endtime"] = info.Endtime
  205. }
  206. //法人
  207. buyer, agency := "", ""
  208. winners := []string{}
  209. if gconv.String(doc["buyer_id"]) == "" {
  210. buyer = gconv.String(doc["buyer"])
  211. }
  212. if gconv.String(doc["agency_id"]) == "" {
  213. agency = gconv.String(doc["agency"])
  214. }
  215. if len(gconv.Strings(doc["winner_id"])) == 0 {
  216. if winnersTmp := gconv.Strings(doc["winner"]); len(winnersTmp) > 0 {
  217. winners = winnersTmp
  218. }
  219. }
  220. buyer_id, agency_id, winner_ids := FindEntInfoData(id, buyer, agency, winners)
  221. if buyer_id != "" {
  222. doc["buyer_id"] = buyer_id
  223. set["buyer_id"] = buyer_id
  224. }
  225. if agency_id != "" {
  226. doc["agency_id"] = agency_id
  227. set["agency_id"] = agency_id
  228. }
  229. if len(winner_ids) > 0 {
  230. doc["winner_id"] = winner_ids
  231. set["winner_id"] = winner_ids
  232. }
  233. //
  234. coll := "projectset_wy_back"
  235. business_type := gconv.String(doc["business_type"])
  236. if business_type == "新增项目" {
  237. coll = "projectset_wy"
  238. doc["business_type"] = "新增物业项目"
  239. set["business_type"] = "新增物业项目"
  240. } else if business_type == "采购意向" {
  241. coll = "projectset_wy"
  242. }
  243. fmt.Println(set)
  244. if len(set) > 0 {
  245. //更新es
  246. client.Update().Index(index).Id(id).Doc(doc).Do(context.Background())
  247. //更新clickhouse
  248. err := UpdateDataToClickHouse(set, map[string]interface{}{"project_id": id})
  249. fmt.Println("11", err)
  250. //更新mgo
  251. MgoPro.Update(coll, map[string]interface{}{"project_id": id}, map[string]interface{}{"$set": set}, false, false)
  252. }
  253. }
  254. total = total + len(res.Hits.Hits)
  255. scrollID = res.ScrollId
  256. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  257. log.Println("current count:", total)
  258. if err != nil {
  259. if err == io.EOF {
  260. // 滚动到最后一批数据,退出循环
  261. break
  262. }
  263. log.Println("滚动搜索失败:", err, res)
  264. break // 处理错误时退出循环
  265. }
  266. }
  267. // 在循环外调用 ClearScroll
  268. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  269. if err != nil {
  270. log.Printf("清理滚动搜索失败:%s", err)
  271. }
  272. fmt.Println("结束~~~~~~~~~~~~~~~")
  273. }