full.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. package extract
  2. import (
  3. "data_ai/ul"
  4. log "github.com/donnie4w/go-logger/logger"
  5. qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  6. "sync"
  7. )
  8. // 获取已存在数据···
  9. func getExistsInfo() map[string]interface{} {
  10. log.Debug("开始构建已存在数据···")
  11. sess := ul.SourceMgo.GetMgoConn()
  12. defer ul.SourceMgo.DestoryMongoConn(sess)
  13. dict := map[string]interface{}{}
  14. q, total := map[string]interface{}{}, 0
  15. it := sess.DB(ul.SourceMgo.DbName).C(ul.Bid_Name).Find(&q).Sort("_id").Iter()
  16. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  17. if total%100000 == 0 {
  18. log.Debug("cur ai index ", total, tmp["_id"])
  19. }
  20. tmpid := ul.BsonTOStringId(tmp["_id"])
  21. dict[tmpid] = ""
  22. tmp = make(map[string]interface{})
  23. }
  24. log.Debug("is exists ...", total, "~", len(dict))
  25. return dict
  26. }
  27. // 识别结构化字段
  28. func ExtractFullInfo(eid string) {
  29. q := map[string]interface{}{
  30. "_id": map[string]interface{}{
  31. "$lt": ul.StringTOBsonId(eid),
  32. },
  33. }
  34. //6776b8000000000000000000 , 1月3日
  35. log.Debug("刷历史存量数据语句:", q)
  36. ul.FlashModel = "glm-4-flash"
  37. pool_mgo := make(chan bool, ul.Reading)
  38. wg_mgo := &sync.WaitGroup{}
  39. sess := ul.BidMgo.GetMgoConn()
  40. defer ul.BidMgo.DestoryMongoConn(sess)
  41. total := 0
  42. it := sess.DB(ul.BidMgo.DbName).C(ul.Bid_Name).Find(&q).Sort("-_id").Iter()
  43. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  44. if total%5000 == 0 {
  45. log.Debug("cur ai index ", total, tmp["_id"])
  46. }
  47. tmpid := ul.BsonTOStringId(tmp["_id"])
  48. if tmpid == "" {
  49. tmp = make(map[string]interface{})
  50. continue
  51. }
  52. pool_mgo <- true
  53. wg_mgo.Add(1)
  54. go func(tmp map[string]interface{}) {
  55. defer func() {
  56. <-pool_mgo
  57. wg_mgo.Done()
  58. }()
  59. infoformat := qu.IntAll(tmp["infoformat"])
  60. if infoformat == 1 || infoformat == 0 { //正常数据处理···
  61. u_id := ul.BsonTOStringId(tmp["_id"])
  62. data := ResolveInfo(tmp, u_id)
  63. if len(data) > 0 && u_id != "" {
  64. tmp["ai_zhipu"] = data
  65. update_info := make(map[string]interface{}, 0)
  66. is_unset := ul.ChooseCheckDataAI(tmp, &update_info)
  67. if update_info["com_package"] == nil { //构建单包信息···
  68. com_package := ul.CreatSingleFieldInfo(tmp, update_info)
  69. update_info["com_package"] = com_package
  70. }
  71. update_info["ai_zhipu"] = data
  72. //清洗与记录
  73. if len(update_info) > 0 {
  74. //$set
  75. ul.SourceMgo.UpdateById(ul.Ext_Name, u_id, map[string]interface{}{
  76. "$set": update_info,
  77. })
  78. }
  79. if is_unset {
  80. //"$unset"
  81. ul.SourceMgo.UpdateById(ul.Ext_Name, u_id, map[string]interface{}{
  82. "$unset": ul.Unset_Check,
  83. })
  84. }
  85. }
  86. }
  87. }(tmp)
  88. tmp = make(map[string]interface{})
  89. }
  90. wg_mgo.Wait()
  91. log.Debug("ai is over ...", total)
  92. }
  93. func MovingFullInfoCopy(sid string, eid string) {
  94. q := map[string]interface{}{
  95. "_id": map[string]interface{}{
  96. "$gte": ul.StringTOBsonId(sid),
  97. },
  98. }
  99. log.Debug("迁移语句:", q)
  100. pool_mgo := make(chan bool, ul.Reading)
  101. wg_mgo := &sync.WaitGroup{}
  102. sess := ul.SourceMgo.GetMgoConn()
  103. defer ul.SourceMgo.DestoryMongoConn(sess)
  104. total := 0
  105. it := sess.DB(ul.SourceMgo.DbName).C(ul.Bid_Name).Find(&q).Sort("_id").Iter()
  106. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  107. if total%1000 == 0 {
  108. log.Debug("cur move index ", total, tmp["_id"])
  109. }
  110. pool_mgo <- true
  111. wg_mgo.Add(1)
  112. go func(tmp map[string]interface{}) {
  113. defer func() {
  114. <-pool_mgo
  115. wg_mgo.Done()
  116. }()
  117. delete(tmp, "detail")
  118. delete(tmp, "contenthtml")
  119. ul.SourceMgo.Save("bidding_copy", tmp)
  120. }(tmp)
  121. tmp = make(map[string]interface{})
  122. }
  123. wg_mgo.Wait()
  124. log.Debug("move is over ...", total)
  125. }