package main import ( "fmt" "go.mongodb.org/mongo-driver/bson" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "log" "regexp" "strings" "sync" "time" ) var MgoSaveCache = make(chan map[string]interface{}, 5000) var SP = make(chan bool, 5) var updatePool = make(chan []map[string]interface{}, 5000) var updateSp = make(chan bool, 5) var mutex sync.Mutex //IncData 增量处理数据 func IncData() { defer util.Catch() sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) q := bson.M{"jy_updatetime": bson.M{"$gt": jyUpdatetime}} var zid int it := sess.DB("mixdata").C("company_change").Find(q).Select(nil).Iter() count := 0 ch := make(chan bool, 16) wg := &sync.WaitGroup{} for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%20000 == 0 { log.Println("current:", count) } if util.ObjToString(tmp["_operation_type"]) == "update" { continue } //废弃 if util.Int64All(tmp["use_flag"]) > 8 { continue } zid = util.IntAll(tmp["_id"]) ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() mutex.Lock() defer mutex.Unlock() // currentTime := time.Now().Unix() query := bson.M{"company_id": tmp["company_id"]} info, b := MgoMix.FindOneByField(CollSave, query, bson.M{"changes": 1}) //原来数据有changes 字段就更新,追加数据 if b && len(*info) > 0 { if util.ObjToString(tmp["_operation_type"]) == "insert" { update := make(map[string]interface{}) item := make(map[string]interface{}) item["change_field"] = tmp["change_field"] item["content_before"] = tmp["content_before"] item["content_after"] = tmp["content_after"] item["change_date"] = tmp["change_date"] setMark(item) //change_name_new //update["changes"] = changes update["update_time"] = currentTime saveInfo := map[string]interface{}{"$set": update, "$push": map[string]interface{}{"changes": item}} MgoMix.Update("qyxy_change", map[string]interface{}{"company_id": util.ObjToString(tmp["company_id"])}, saveInfo, true, false) } } else { //没有的直接写入 query := bson.M{"_id": tmp["company_id"]} qyxy, b1 := MgoMix.FindOne("qyxy_std", query) if b1 && len(*qyxy) > 0 { save := make(map[string]interface{}) var changes []map[string]interface{} item := make(map[string]interface{}) item["change_field"] = tmp["change_field"] item["content_before"] = tmp["content_before"] item["content_after"] = tmp["content_after"] item["change_date"] = tmp["change_date"] setMark(item) //change_name_new changes = append(changes, item) //save["company_name"] = (*qyxy)["company_name"] save["company_id"] = (*qyxy)["_id"] save["changes"] = changes save["create_time"] = currentTime save["update_time"] = currentTime //saveInfo := map[string]interface{}{"$set": save} MgoMix.Save(CollSave, save) } } }(tmp) tmp = map[string]interface{}{} } wg.Wait() util.Debug("over---", count, zid) } //IncByID 通过传入id 更新数据 func IncByID(mapinfo map[string]interface{}) { defer util.Catch() sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) ch := make(chan bool, 16) wg := &sync.WaitGroup{} q := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": util.Int64All(mapinfo["gtid"]), }, } var zid int it := sess.DB("mixdata").C("company_change").Find(q).Select(nil).Iter() count := 0 total := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%20000 == 0 { log.Println("current:", count) log.Println("current _id:", tmp["_id"], "total", total) } //表更时间 if util.ObjToString(tmp["change_date"]) < "2023-01-01" { continue } if util.ObjToString(tmp["_operation_type"]) == "update" { continue } //废弃 if util.Int64All(tmp["use_flag"]) > 8 { continue } zid = util.IntAll(tmp["_id"]) ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() mutex.Lock() defer mutex.Unlock() currentTime := time.Now().Unix() query := bson.M{"company_id": tmp["company_id"]} info, b := MgoMix.FindOneByField(CollSave, query, bson.M{"changes": 1}) //原来数据有changes 字段就更新,追加数据 if b && len(*info) > 0 { if util.ObjToString(tmp["_operation_type"]) == "insert" { update := make(map[string]interface{}) item := make(map[string]interface{}) item["change_field"] = tmp["change_field"] item["content_before"] = tmp["content_before"] item["content_after"] = tmp["content_after"] item["change_date"] = tmp["change_date"] setMark(item) //change_name_new //update["changes"] = changes update["update_time"] = currentTime saveInfo := map[string]interface{}{"$set": update, "$push": map[string]interface{}{"changes": item}} MgoMix.Update("qyxy_change", map[string]interface{}{"company_id": util.ObjToString(tmp["company_id"])}, saveInfo, true, false) total++ } } else { total++ //没有的直接写入 query := bson.M{"_id": tmp["company_id"]} qyxy, b1 := MgoMix.FindOne("qyxy_std", query) if b1 && len(*qyxy) > 0 { save := make(map[string]interface{}) var changes []map[string]interface{} item := make(map[string]interface{}) item["change_field"] = tmp["change_field"] item["content_before"] = tmp["content_before"] item["content_after"] = tmp["content_after"] item["change_date"] = tmp["change_date"] setMark(item) //change_name_new changes = append(changes, item) //save["company_name"] = (*qyxy)["company_name"] save["company_id"] = (*qyxy)["_id"] save["changes"] = changes save["create_time"] = currentTime save["update_time"] = currentTime //saveInfo := map[string]interface{}{"$set": save} MgoMix.Save(CollSave, save) } } }(tmp) tmp = map[string]interface{}{} } wg.Wait() util.Debug("over---", count, zid) } func setMark(tmp map[string]interface{}) { for _, v := range ChangeMap { str := util.ObjToString(tmp["change_field"]) regArr := v["change_key_reg"].([]string) for _, v1 := range regArr { matched, _ := regexp.MatchString(v1, str) if matched { tmp["change_name_new"] = v["change_name"] return } } } } func clearRepeat(list []interface{}) []interface{} { var tmp []interface{} if len(list) > 1 { for k, v := range list { if k < len(list)-1 { if fmt.Sprint(list[k]) != fmt.Sprint(list[k+1]) { tmp = append(tmp, v) } } else { tmp = append(tmp, v) } } return tmp } else { return list } } // TaskAll 存量数据 func TaskAll() { defer util.Catch() sess := MgoMix.GetMgoConn() defer MgoMix.DestoryMongoConn(sess) pool := make(chan bool, 10) wg := &sync.WaitGroup{} field := bson.M{"company_name": 1} it := sess.DB("mixdata").C(CollQy).Find(nil).Select(field).Iter() count := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%20000 == 0 { log.Println("current:", count, tmp["_id"]) } if strings.Contains(util.ObjToString(tmp["company_type"]), "个体") { continue } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() tmp["company_id"] = tmp["_id"] delete(tmp, "_id") query := bson.M{"company_id": tmp["company_id"]} info, b := Mgo.Find("company_change", query, nil, nil, false, -1, -1) if b && len(*info) > 0 { disposeFuc(*info, tmp) } }(tmp) tmp = map[string]interface{}{} } wg.Wait() } func taskInfo(id string) { update := make(map[string]interface{}) q := map[string]interface{}{"company_id": id} info, _ := Mgo.Find("company_change", q, nil, nil, false, -1, -1) if len(*info) > 0 { var changes []map[string]interface{} currentTime := time.Now().Unix() for _, v := range *info { item := make(map[string]interface{}) item["change_field"] = v["change_field"] item["content_before"] = v["content_before"] item["content_after"] = v["content_after"] item["change_date"] = v["change_date"] setMark(item) //change_name_new changes = append(changes, item) } update["changes"] = changes update["update_time"] = currentTime } util.Debug(update) MgoMix.Update("qyxy_change", q, map[string]interface{}{"$set": update}, false, false) } func disposeFuc(maps []map[string]interface{}, tmp map[string]interface{}) { var changes []map[string]interface{} currentTime := time.Now().Unix() for _, v := range maps { item := make(map[string]interface{}) item["change_field"] = v["change_field"] item["content_before"] = v["content_before"] item["content_after"] = v["content_after"] item["change_date"] = v["change_date"] setMark(item) //change_name_new changes = append(changes, item) } tmp["changes"] = changes tmp["create_time"] = currentTime tmp["update_time"] = currentTime MgoSaveCache <- tmp } //SaveData 存量保存 func SaveData() { log.Println("Mgo Save...") arru := make([]map[string]interface{}, 200) indexu := 0 for { select { case v := <-MgoSaveCache: arru[indexu] = v indexu++ if indexu == 200 { SP <- true go func(arru []map[string]interface{}) { defer func() { <-SP }() MgoMix.SaveBulk(CollSave, arru...) }(arru) arru = make([]map[string]interface{}, 200) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { SP <- true go func(arru []map[string]interface{}) { defer func() { <-SP }() MgoMix.SaveBulk(CollSave, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, 200) indexu = 0 } } } } func updateMethod() { arru := make([][]map[string]interface{}, 200) indexu := 0 for { select { case v := <-updatePool: arru[indexu] = v indexu++ if indexu == 200 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MgoMix.UpSertBulk(CollSave, arru...) }(arru) arru = make([][]map[string]interface{}, 200) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MgoMix.UpSertBulk(CollSave, arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 200) indexu = 0 } } } }