full.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package extract
  2. import (
  3. "data_ai/ul"
  4. log "github.com/donnie4w/go-logger/logger"
  5. qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  6. "sync"
  7. )
  8. // 获取已存在数据···
  9. func getExistsInfo() map[string]interface{} {
  10. log.Debug("开始构建已存在数据···")
  11. sess := ul.SourceMgo.GetMgoConn()
  12. defer ul.SourceMgo.DestoryMongoConn(sess)
  13. dict := map[string]interface{}{}
  14. q, total := map[string]interface{}{}, 0
  15. it := sess.DB(ul.SourceMgo.DbName).C(ul.Bid_Name).Find(&q).Sort("_id").Iter()
  16. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  17. if total%100000 == 0 {
  18. log.Debug("cur ai index ", total, tmp["_id"])
  19. }
  20. tmpid := ul.BsonTOStringId(tmp["_id"])
  21. dict[tmpid] = ""
  22. tmp = make(map[string]interface{})
  23. }
  24. log.Debug("is exists ...", total, "~", len(dict))
  25. return dict
  26. }
  27. // 识别结构化字段
  28. func MovingFullInfo(sid string, eid string) {
  29. dict := getExistsInfo()
  30. q := map[string]interface{}{
  31. "_id": map[string]interface{}{
  32. "$lt": ul.StringTOBsonId(eid),
  33. },
  34. }
  35. ul.FlashModel = "glm-4-flash"
  36. pool_mgo := make(chan bool, ul.Reading)
  37. wg_mgo := &sync.WaitGroup{}
  38. sess := ul.BidMgo.GetMgoConn()
  39. defer ul.BidMgo.DestoryMongoConn(sess)
  40. total := 0
  41. it := sess.DB(ul.BidMgo.DbName).C(ul.Bid_Name).Find(&q).Sort("-_id").Iter()
  42. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  43. if total%1000 == 0 {
  44. log.Debug("cur ai index ", total, tmp["_id"])
  45. }
  46. tmpid := ul.BsonTOStringId(tmp["_id"])
  47. if tmpid == "" || dict[tmpid] != nil { //已存在数据···不迁移
  48. tmp = make(map[string]interface{})
  49. continue
  50. }
  51. pool_mgo <- true
  52. wg_mgo.Add(1)
  53. go func(tmp map[string]interface{}) {
  54. defer func() {
  55. <-pool_mgo
  56. wg_mgo.Done()
  57. }()
  58. infoformat := qu.IntAll(tmp["infoformat"])
  59. if infoformat == 1 { //正常数据处理···
  60. data := ResolveInfo(tmp)
  61. if len(data) > 0 {
  62. tmp["ai_zhipu"] = data
  63. update_check := make(map[string]interface{}, 0)
  64. is_unset := ul.ChooseCheckDataAI(tmp, &update_check)
  65. for k, v := range update_check {
  66. tmp[k] = v //覆盖值
  67. }
  68. if is_unset {
  69. for k, _ := range ul.Unset_Check {
  70. delete(tmp, k) //删除值
  71. }
  72. }
  73. }
  74. }
  75. //迁移数据···
  76. ul.SourceMgo.Save(ul.Bid_Name, tmp)
  77. }(tmp)
  78. tmp = make(map[string]interface{})
  79. }
  80. wg_mgo.Wait()
  81. log.Debug("ai is over ...", total)
  82. }