main.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/gogf/gf/v2/util/gconv"
  7. "github.com/olivere/elastic/v7"
  8. "github.com/robfig/cron"
  9. "io"
  10. "log"
  11. "sync"
  12. )
  13. func init() {
  14. ReadConfig(&Config) //初始化
  15. InitMgo() //mgo
  16. InitCkh() //clickhouse
  17. InitEs() //es
  18. InitOther()
  19. }
  20. func main() {
  21. c := cron.New()
  22. //增量
  23. c.AddFunc(Config.StartCron, IncTransactionDataFromBidAndPro) //增量bidding和项目数据
  24. c.Start()
  25. //历史
  26. //HisTransactionDataFromBid() //历史招标(bidding)数据,截止时间1713196800采购意向 TODO 待补充法人信息
  27. //HisTransactionDataFromBid2() //历史招标(bidding)数据,截止时间1713628800新增项目 TODO 待补充情报信息、法人信息
  28. //HisTransactionDataFromProject() //历史项目数据(projectset_20230904)TODO 待补充业态、情报信息、法人信息
  29. //临时处理(信息补充)
  30. //HisTransactionDataAddInformation() //历史信息补充法人库信息,项目信息补充业态property_form
  31. //IncTransactionDataMgoToCkhAndEs() //数据迁移
  32. ch := make(chan bool)
  33. <-ch
  34. }
  35. func tmp() {
  36. sess := MgoPro.GetMgoConn()
  37. defer MgoPro.DestoryMongoConn(sess)
  38. ch := make(chan bool, 5)
  39. wg := &sync.WaitGroup{}
  40. lock := &sync.Mutex{}
  41. query := map[string]interface{}{
  42. //"project_bidstatus": 4,
  43. //"_id": map[string]interface{}{
  44. // "$gte": mongodb.StringTOBsonId("66213b290f6ba3eb160617ad"),
  45. //},
  46. //"update_time": map[string]interface{}{
  47. // "$lt": 1714959573,
  48. //},
  49. //"_id": mongodb.StringTOBsonId("6630eae76f6c86a3962f3a07"),
  50. //"repeat": true,
  51. "update_time": map[string]interface{}{
  52. "$gte": 1714959573,
  53. "$lte": 1719795791,
  54. },
  55. }
  56. repeat := map[string]bool{}
  57. count := MgoPro.Count("projectset_wy_back", query)
  58. fmt.Println("count:", count)
  59. it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter()
  60. n := 0
  61. arr := [][]map[string]interface{}{}
  62. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  63. ch <- true
  64. wg.Add(1)
  65. go func(tmp map[string]interface{}) {
  66. defer func() {
  67. <-ch
  68. wg.Done()
  69. }()
  70. update := []map[string]interface{}{}
  71. project_id := gconv.String(tmp["project_id"])
  72. //lock.Lock()
  73. //if !repeat[project_id] {
  74. //Es.DelById(Config.Es.Index, project_id)
  75. //CkhTool.Exec(context.Background(), "ALTER TABLE information.transaction_info_copy DELETE WHERE project_id = ?", project_id)
  76. //repeat[project_id] = true
  77. //}
  78. //lock.Unlock()
  79. //err, result := Es.GetById(Config.Es.Index, project_id)
  80. //Es.DelById()
  81. //if err != nil || len(result) == 0 {
  82. // fmt.Println(project_id)
  83. // update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  84. // update = append(update, map[string]interface{}{"$set": map[string]interface{}{"es": false}})
  85. //} else {
  86. // if gconv.Int(result["project_bidstatus"]) != 0 {
  87. // fmt.Println("11", project_id)
  88. // }
  89. //}
  90. tt := map[string]bool{}
  91. err, result := Es.GetById(Config.Es.Index, gconv.String(tmp["_id"]))
  92. if err != nil || len(result) == 0 {
  93. tt["es"] = true
  94. }
  95. if FindClickHouseByProjectId(project_id) == 0 {
  96. tt["click"] = true
  97. }
  98. if len(tt) > 0 {
  99. update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  100. update = append(update, map[string]interface{}{"$set": map[string]interface{}{"repeat": tt}})
  101. }
  102. //if MgoPro.Count("projectset_wy_back", map[string]interface{}{"project_id": tmp["project_id"]}) > 1 {
  103. // fmt.Println("project_id")
  104. // update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  105. // update = append(update, map[string]interface{}{"$set": map[string]interface{}{"repeat": "project"}})
  106. //} else if MgoB.Count("projectset_wy", map[string]interface{}{"project_id": tmp["project_id"]}) > 0 {
  107. // update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  108. // update = append(update, map[string]interface{}{"$set": map[string]interface{}{"repeat": "bidding"}})
  109. //}
  110. if len(update) > 0 {
  111. lock.Lock()
  112. arr = append(arr, update)
  113. if len(arr) > 500 {
  114. MgoPro.UpdateBulk("projectset_wy_back", arr...)
  115. arr = [][]map[string]interface{}{}
  116. }
  117. lock.Unlock()
  118. }
  119. }(tmp)
  120. if n%1000 == 0 {
  121. fmt.Println("current:", n)
  122. }
  123. tmp = map[string]interface{}{}
  124. }
  125. wg.Wait()
  126. if len(arr) > 0 {
  127. MgoPro.UpdateBulk("projectset_wy_back", arr...)
  128. arr = [][]map[string]interface{}{}
  129. }
  130. fmt.Println("迁移结束...", len(repeat))
  131. }
  132. func getBiddingData() {
  133. url := "http://172.17.4.184:19908"
  134. //url := "http://127.0.0.1:19908"
  135. //url := "http://192.168.3.149:9200"
  136. username := "jybid"
  137. password := "Top2023_JEB01i@31"
  138. index := "transaction_info" //索引名称
  139. // 创建 Elasticsearch 客户端
  140. client, err := elastic.NewClient(
  141. elastic.SetURL(url),
  142. elastic.SetBasicAuth(username, password),
  143. elastic.SetSniff(false),
  144. )
  145. if err != nil {
  146. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  147. }
  148. //rangeQuery := elastic.NewRangeQuery("project_bidstatus").Lt(3)
  149. query := elastic.NewBoolQuery().
  150. Must(elastic.NewTermQuery("id", "64d7146cb44bf08751e3c133")) //
  151. //Must(rangeQuery)
  152. //Must(elastic.NewTermQuery("subtype", "招标"))
  153. //query := elastic.NewBoolQuery().
  154. // //北京,天津,河北,上海,江苏,浙江,安徽
  155. // //Must(elastic.NewTermQuery("area", "北京市")).
  156. // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  157. // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")).
  158. // Must(rangeQuery)
  159. ctx := context.Background()
  160. //开始滚动搜索
  161. scrollID := ""
  162. scroll := "10m"
  163. searchSource := elastic.NewSearchSource().
  164. Query(query).
  165. Size(500)
  166. //Sort("_doc", true) //升序排序
  167. //Sort("_doc", false) //降序排序
  168. searchService := client.Scroll(index).
  169. Size(500).
  170. Scroll(scroll).
  171. SearchSource(searchSource)
  172. res, err := searchService.Do(ctx)
  173. if err != nil {
  174. if err == io.EOF {
  175. fmt.Println("没有数据")
  176. } else {
  177. panic(err)
  178. }
  179. }
  180. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  181. fmt.Println("总数是:", res.TotalHits())
  182. total := 0
  183. for len(res.Hits.Hits) > 0 {
  184. for _, hit := range res.Hits.Hits {
  185. var doc map[string]interface{}
  186. err := json.Unmarshal(hit.Source, &doc)
  187. if err != nil {
  188. log.Printf("解析文档失败:%s", err)
  189. continue
  190. }
  191. set := map[string]interface{}{}
  192. id := gconv.String(doc["id"])
  193. //情报
  194. information_id := gconv.String(doc["information_id"])
  195. info_ids := gconv.Strings(doc["info_ids"])
  196. info := FindInfomationData(info_ids...)
  197. if information_id != info.Id {
  198. doc["information_id"] = info.Id
  199. doc["starttime"] = info.Starttime
  200. doc["endtime"] = info.Endtime
  201. set["information_id"] = info.Id
  202. set["starttime"] = info.Starttime
  203. set["endtime"] = info.Endtime
  204. }
  205. //法人
  206. buyer, agency := "", ""
  207. winners := []string{}
  208. if gconv.String(doc["buyer_id"]) == "" {
  209. buyer = gconv.String(doc["buyer"])
  210. }
  211. if gconv.String(doc["agency_id"]) == "" {
  212. agency = gconv.String(doc["agency"])
  213. }
  214. if len(gconv.Strings(doc["winner_id"])) == 0 {
  215. if winnersTmp := gconv.Strings(doc["winner"]); len(winnersTmp) > 0 {
  216. winners = winnersTmp
  217. }
  218. }
  219. buyer_id, agency_id, winner_ids := FindEntInfoData(id, buyer, agency, winners)
  220. if buyer_id != "" {
  221. doc["buyer_id"] = buyer_id
  222. set["buyer_id"] = buyer_id
  223. }
  224. if agency_id != "" {
  225. doc["agency_id"] = agency_id
  226. set["agency_id"] = agency_id
  227. }
  228. if len(winner_ids) > 0 {
  229. doc["winner_id"] = winner_ids
  230. set["winner_id"] = winner_ids
  231. }
  232. //
  233. coll := "projectset_wy_back"
  234. business_type := gconv.String(doc["business_type"])
  235. if business_type == "新增项目" {
  236. coll = "projectset_wy"
  237. doc["business_type"] = "新增物业项目"
  238. set["business_type"] = "新增物业项目"
  239. } else if business_type == "采购意向" {
  240. coll = "projectset_wy"
  241. }
  242. fmt.Println(set)
  243. if len(set) > 0 {
  244. //更新es
  245. client.Update().Index(index).Id(id).Doc(doc).Do(context.Background())
  246. //更新clickhouse
  247. err := UpdateDataToClickHouse(set, map[string]interface{}{"project_id": id})
  248. fmt.Println("11", err)
  249. //更新mgo
  250. MgoPro.Update(coll, map[string]interface{}{"project_id": id}, map[string]interface{}{"$set": set}, false, false)
  251. }
  252. }
  253. total = total + len(res.Hits.Hits)
  254. scrollID = res.ScrollId
  255. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  256. log.Println("current count:", total)
  257. if err != nil {
  258. if err == io.EOF {
  259. // 滚动到最后一批数据,退出循环
  260. break
  261. }
  262. log.Println("滚动搜索失败:", err, res)
  263. break // 处理错误时退出循环
  264. }
  265. }
  266. // 在循环外调用 ClearScroll
  267. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  268. if err != nil {
  269. log.Printf("清理滚动搜索失败:%s", err)
  270. }
  271. fmt.Println("结束~~~~~~~~~~~~~~~")
  272. }