full.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package extract
  2. import (
  3. "data_ai/ul"
  4. log "github.com/donnie4w/go-logger/logger"
  5. qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  6. "sync"
  7. )
  8. // 获取已存在数据···
  9. func getExistsInfo() map[string]interface{} {
  10. log.Debug("开始构建已存在数据···")
  11. sess := ul.SourceMgo.GetMgoConn()
  12. defer ul.SourceMgo.DestoryMongoConn(sess)
  13. dict := map[string]interface{}{}
  14. q, total := map[string]interface{}{}, 0
  15. it := sess.DB(ul.SourceMgo.DbName).C(ul.Bid_Name).Find(&q).Sort("_id").Iter()
  16. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  17. if total%100000 == 0 {
  18. log.Debug("cur ai index ", total, tmp["_id"])
  19. }
  20. tmpid := ul.BsonTOStringId(tmp["_id"])
  21. dict[tmpid] = ""
  22. tmp = make(map[string]interface{})
  23. }
  24. log.Debug("is exists ...", total, "~", len(dict))
  25. return dict
  26. }
  27. // 识别结构化字段
  28. func MovingFullInfo(sid string, eid string) {
  29. q := map[string]interface{}{
  30. "_id": map[string]interface{}{
  31. "$lt": ul.StringTOBsonId(eid),
  32. },
  33. }
  34. log.Debug("迁移语句:", q)
  35. ul.FlashModel = "glm-4-flash"
  36. pool_mgo := make(chan bool, ul.Reading)
  37. wg_mgo := &sync.WaitGroup{}
  38. sess := ul.BidMgo.GetMgoConn()
  39. defer ul.BidMgo.DestoryMongoConn(sess)
  40. total := 0
  41. it := sess.DB(ul.BidMgo.DbName).C(ul.Bid_Name).Find(&q).Sort("-_id").Iter()
  42. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  43. if total%1000 == 0 {
  44. log.Debug("cur ai index ", total, tmp["_id"])
  45. }
  46. tmpid := ul.BsonTOStringId(tmp["_id"])
  47. if tmpid == "" {
  48. tmp = make(map[string]interface{})
  49. continue
  50. }
  51. pool_mgo <- true
  52. wg_mgo.Add(1)
  53. go func(tmp map[string]interface{}) {
  54. defer func() {
  55. <-pool_mgo
  56. wg_mgo.Done()
  57. }()
  58. infoformat := qu.IntAll(tmp["infoformat"])
  59. if infoformat == 1 || infoformat == 0 { //正常数据处理···
  60. data := ResolveInfo(tmp)
  61. if len(data) > 0 {
  62. tmp["ai_zhipu"] = data
  63. update_check := make(map[string]interface{}, 0)
  64. is_unset := ul.ChooseCheckDataAI(tmp, &update_check)
  65. for k, v := range update_check {
  66. tmp[k] = v //覆盖值
  67. }
  68. if is_unset {
  69. for k, _ := range ul.Unset_Check {
  70. delete(tmp, k) //删除值
  71. }
  72. }
  73. }
  74. }
  75. //迁移数据···
  76. delete(tmp, "detail")
  77. delete(tmp, "contenthtml")
  78. ul.SourceMgo.Save(ul.Bid_Name, tmp)
  79. }(tmp)
  80. tmp = make(map[string]interface{})
  81. }
  82. wg_mgo.Wait()
  83. log.Debug("ai is over ...", total)
  84. }
  85. func MovingFullInfoCopy(sid string, eid string) {
  86. q := map[string]interface{}{
  87. "_id": map[string]interface{}{
  88. "$gte": ul.StringTOBsonId(sid),
  89. },
  90. }
  91. log.Debug("迁移语句:", q)
  92. pool_mgo := make(chan bool, ul.Reading)
  93. wg_mgo := &sync.WaitGroup{}
  94. sess := ul.SourceMgo.GetMgoConn()
  95. defer ul.SourceMgo.DestoryMongoConn(sess)
  96. total := 0
  97. it := sess.DB(ul.SourceMgo.DbName).C(ul.Bid_Name).Find(&q).Sort("_id").Iter()
  98. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  99. if total%1000 == 0 {
  100. log.Debug("cur move index ", total, tmp["_id"])
  101. }
  102. pool_mgo <- true
  103. wg_mgo.Add(1)
  104. go func(tmp map[string]interface{}) {
  105. defer func() {
  106. <-pool_mgo
  107. wg_mgo.Done()
  108. }()
  109. delete(tmp, "detail")
  110. delete(tmp, "contenthtml")
  111. ul.SourceMgo.Save("bidding_copy", tmp)
  112. }(tmp)
  113. tmp = make(map[string]interface{})
  114. }
  115. wg_mgo.Wait()
  116. log.Debug("move is over ...", total)
  117. }