|
@@ -85,7 +85,6 @@ func startTaskAddData(data []byte, mapInfo map[string]interface{}) {
|
|
|
func stareFusionData(tmpFusionMap map[string]string,isUpdateMap map[string]string) {
|
|
|
//根据重复组,重新划分新的组别
|
|
|
log.Println("开始融合操作......")
|
|
|
-
|
|
|
index,start :=0, int(time.Now().Unix())
|
|
|
//多线程保存数据
|
|
|
pool_mgo := make(chan bool, mgo_pool)
|
|
@@ -108,37 +107,41 @@ func stareFusionData(tmpFusionMap map[string]string,isUpdateMap map[string]strin
|
|
|
|
|
|
saveFusionData, saveRecordData:= map[string]interface{}{},map[string]interface{}{}
|
|
|
if fusionid!="" {//更新
|
|
|
- saveFusionData, saveRecordData:=weight.dealWithMultipleUpdateFusionStruct(fusionid)
|
|
|
- //更新融合表
|
|
|
- UpdateFusionPool.pool <- []map[string]interface{}{
|
|
|
- map[string]interface{}{
|
|
|
- "_id": StringTOBsonId(fusionid),
|
|
|
- },
|
|
|
- saveFusionData,
|
|
|
- }
|
|
|
- //更新日志表
|
|
|
- UpdateRecordPool.pool <- []map[string]interface{}{
|
|
|
- map[string]interface{}{
|
|
|
- "_id": StringTOBsonId(fusionid),
|
|
|
- },
|
|
|
- map[string]interface{}{
|
|
|
- "$set": saveRecordData,
|
|
|
- },
|
|
|
- }
|
|
|
- //更新分组表
|
|
|
- UpdateGroupPool.pool <- []map[string]interface{}{
|
|
|
- map[string]interface{}{
|
|
|
- "_id": StringTOBsonId(sourceid),
|
|
|
- },
|
|
|
- map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "allids":strings.Join(fusionArr, ","),
|
|
|
+ //去历史-当前未更新的融合数据
|
|
|
+ his_data := mgo.FindById(fusion_coll_name,fusionid)
|
|
|
+ if len(his_data)>2 && his_data!=nil {
|
|
|
+ saveFusionData, saveRecordData:=weight.dealWithMultipleUpdateFusionStruct(his_data)
|
|
|
+ //更新融合表
|
|
|
+ UpdateFusionPool.pool <- []map[string]interface{}{
|
|
|
+ map[string]interface{}{
|
|
|
+ "_id": StringTOBsonId(fusionid),
|
|
|
},
|
|
|
- },
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
+ saveFusionData,
|
|
|
+ }
|
|
|
+ //更新日志表
|
|
|
+ UpdateRecordPool.pool <- []map[string]interface{}{
|
|
|
+ map[string]interface{}{
|
|
|
+ "_id": StringTOBsonId(fusionid),
|
|
|
+ },
|
|
|
+ map[string]interface{}{
|
|
|
+ "$set": saveRecordData,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ //更新分组表
|
|
|
+ UpdateGroupPool.pool <- []map[string]interface{}{
|
|
|
+ map[string]interface{}{
|
|
|
+ "_id": StringTOBsonId(sourceid),
|
|
|
+ },
|
|
|
+ map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "allids":strings.Join(fusionArr, ","),
|
|
|
+ },
|
|
|
+ },
|
|
|
+ }
|
|
|
+ }else {
|
|
|
+ log.Println("问题很严重...未找到历史融合数据...",sourceid,"~",fusionid,"~",fusionArr)
|
|
|
|
|
|
+ }
|
|
|
}else {
|
|
|
if len(fusionArr) <= 1 {
|
|
|
saveFusionData, saveRecordData = weight.dealWithAddFusionStruct()
|