package main import ( "encoding/json" "go.mongodb.org/mongo-driver/bson/primitive" "log" mu "mfw/util" "net" "os" "qfw/common/src/qfw/util" qu "qfw/util" "strconv" "sync" "time" ) var ( sysconfig map[string]interface{} //配置文件 mgo *MongodbSim //mongodb操作对象 udpclient mu.UdpClient //udp对象 nextNode []map[string]interface{} //下节点数组 coll_name string fusion_coll_name string record_coll_name string //表名 NoNeedFusionKey map[string]interface{} //不需要融合的key UpdateFusion *updateFusionInfo UpdateRecord *updateRecordInfo //更新池 ) func initMgo() { mconf := sysconfig["mongodb"].(map[string]interface{}) log.Println(mconf) mgo = &MongodbSim{ MongodbAddr: mconf["addrName"].(string), DbName: mconf["dbName"].(string), Size: qu.IntAllDef(mconf["pool"], 10), } mgo.InitPool() coll_name = mconf["collName"].(string) fusion_coll_name = sysconfig["fusion_coll_name"].(string) record_coll_name = sysconfig["record_coll_name"].(string) NoNeedFusionKey = sysconfig["notFusionKey"].(map[string]interface{}) } func init() { //加载配置文件 qu.ReadConfig(&sysconfig) initMgo() //更新池 UpdateFusion = newUpdateFusionPool() go UpdateFusion.updateFusionData() UpdateRecord = newUpdateRecordPool() go UpdateRecord.updateRecordData() log.Println("采用udp模式") } func mainT() { go checkMapJob() updport := sysconfig["udpport"].(string) udpclient = mu.UdpClient{Local: updport, BufSize: 1024} udpclient.Listen(processUdpMsg) log.Println("Udp服务监听", updport) time.Sleep(99999 * time.Hour) } //快速测试使用 func main() { sid := "100000000000000000000000" eid := "900000000000000000000000" //log.Println(sid, "---", eid) mapinfo := map[string]interface{}{} if sid == "" || eid == "" { log.Println("sid,eid参数不能为空") os.Exit(0) } mapinfo["gtid"] = sid mapinfo["lteid"] = eid startTask([]byte{}, mapinfo) time.Sleep(99999 * time.Hour) } //融合具体方法 func startTask(data []byte, mapInfo map[string]interface{}) { log.Println("开始融合流程") defer qu.Catch() //区间id q := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": StringTOBsonId(mapInfo["gtid"].(string)), "$lte": StringTOBsonId(mapInfo["lteid"].(string)), }, } log.Println("查询条件:",q) sess := mgo.GetMgoConn() defer mgo.DestoryMongoConn(sess) it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter() //编译不同的融合组,如何划分组 /***********************/ /***********************/ /***y ********************/ /***********************/ fusionDataGroupArr := make([][]string,0) //待融合组 addOrUpdateArr := make([]bool,0) //新增-bool-记录-组新增,组更新 infoFusionArr := make([]map[string]interface{},0) //记录取融合表的数据 repeatArr,sourceArr,index := make([]string,0),make([]string,0),0 //重复数据组 for tmp := make(map[string]interface{}); it.Next(&tmp); index++ { if index%1000 == 0 { log.Println("current index",index,tmp["_id"]) } tmpId:=BsonTOStringId(tmp["_id"]) repeat:=qu.IntAll(tmp["repeat"]) sourceid:=qu.ObjToString(tmp["repeat_id"]) if repeat==1 { repeatArr = append(repeatArr,tmpId) sourceArr = append(sourceArr,sourceid) }else { fusionDataGroupArr = append(fusionDataGroupArr,[]string{tmpId}) addOrUpdateArr = append(addOrUpdateArr,false) infoFusionArr = append(infoFusionArr, map[string]interface{}{}) } tmp = make(map[string]interface{}) } log.Println("task first:",index,len(fusionDataGroupArr),"+",len(repeatArr)) //根据重复组,重新划分新的组别 for i:=0;igt_time&&cur_time<=lte_time { return true } return false }