package main /** 生成中标企业库 redis去重后存入mongodb库,然后调用下一个节点生成索引 **/ import ( "encoding/json" "fmt" "log" mu "mfw/util" "net" "qfw/util" //"qfw/util/elastic" "qfw/util/mongodb" "qfw/util/redis" "sync" "time" ) var ( Sysconfig map[string]interface{} //配置文件 mconf map[string]interface{} //mongodb配置信息 mgo *mongodb.MongodbSim //mongodb操作对象 udpclient mu.UdpClient //udp对象 nextNode []map[string]interface{} //下节点数组 toaddr = []*net.UDPAddr{} //下节点对象 collect string //保存的库名 redisLock = &sync.Mutex{} ) func init() { util.ReadConfig(&Sysconfig) collect, _ = Sysconfig["collect"].(string) nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{})) for _, m := range nextNode { toaddr = append(toaddr, &net.UDPAddr{ IP: net.ParseIP(m["addr"].(string)), Port: util.IntAll(m["port"]), }) } mconf = Sysconfig["mongodb"].(map[string]interface{}) mgo = &mongodb.MongodbSim{ MongodbAddr: mconf["addr"].(string), DbName: mconf["db"].(string), Size: util.IntAllDef(mconf["pool"], 5), } mgo.InitPool() rconf := Sysconfig["redis"].(map[string]interface{}) redis.InitRedisBySize(rconf["addr"].(string), util.IntAllDef(rconf["pool"], 5), 5, 240) } func main() { go checkMapJob() updport := Sysconfig["udpport"].(string) udpclient = mu.UdpClient{Local: updport, BufSize: 1024} udpclient.Listen(processUdpMsg) log.Println("Udp服务监听", updport) time.Sleep(99999 * time.Hour) } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case mu.OP_TYPE_DATA: //上个节点的数据 //从表中开始处理生成企业数据 var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) log.Println("err:", err, "mapInfo:", mapInfo) if err != nil { udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra) } else if mapInfo != nil { go task(data, mapInfo) key, _ := mapInfo["key"].(string) if key == "" { key = "udpok" } udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra) } case mu.OP_NOOP: //下个节点回应 ok := string(data) if ok != "" { log.Println("ok:", ok) udptaskmap.Delete(ok) } } } //开始判重程序 func task(data []byte, mapInfo map[string]interface{}) { defer util.Catch() //区间id sess := mgo.GetMgoConn() defer mgo.DestoryMongoConn(sess) q := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)), }, } it := sess.DB(mgo.DbName).C(mconf["collect"].(string)).Find(&q).Iter() nameArr := []map[string]interface{}{} nameLock := &sync.Mutex{} pool := make(chan bool, 8) wg := &sync.WaitGroup{} n, newN := 0, 0 pici := time.Now().Unix() for tmp := make(map[string]interface{}); it.Next(&tmp); n++ { pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() //names := []string{} names := []map[string]string{} winner, _ := tmp["winner"].(string) if len(winner) > 9 { //names = append(names, winner) names = append(names, map[string]string{"name": winner, "id": util.BsonIdToSId(tmp["_id"]), "type": "winner"}) } winnerOrder, _ := tmp["winnerorder"].([]map[string]interface{}) if len(winnerOrder) > 0 { for _, m := range winnerOrder { wm := m["entname"].(string) if len(wm) > 9 { //names = append(names, wm) names = append(names, map[string]string{"name": wm, "id": util.BsonIdToSId(tmp["_id"]), "type": "winnerorder"}) } } } package1 := tmp["package"] if package1 != nil { packageM := package1.(map[string]interface{}) for _, p := range packageM { pm := p.(map[string]interface{}) pw, _ := pm["winner"].(string) if len(pw) > 9 { //names = append(names, pw) names = append(names, map[string]string{"name": pw, "id": util.BsonIdToSId(tmp["_id"]), "type": "package"}) } pwo, _ := pm["winnerorder"].([]map[string]interface{}) if len(pwo) > 0 { for _, m := range pwo { wm := m["entname"].(string) if len(wm) > 9 { //names = append(names, wm) names = append(names, map[string]string{"name": wm, "id": util.BsonIdToSId(tmp["_id"]), "type": "pwinnerorder"}) } } } } } if len(names) > 0 { for _, tmp := range names { redisLock.Lock() b, _ := redis.Exists("winner", tmp["name"]) if !b { go IS.Add("normal") newN++ redis.PutCKV("winner", tmp["name"], 1) nameLock.Lock() nameArr = append(nameArr, map[string]interface{}{ "name": tmp["name"], "sid": tmp["id"], "type": tmp["type"], "pici": pici, }) nameLock.Unlock() } else { go IS.Add("repeat") } redisLock.Unlock() } } if n%1000 == 0 { log.Println("current:", n) } nameLock.Lock() if len(nameArr) >= 800 { mgo.SaveBulk(collect, nameArr...) nameArr = []map[string]interface{}{} } nameLock.Unlock() }(tmp) tmp = make(map[string]interface{}) } wg.Wait() if len(nameArr) > 0 { mgo.SaveBulk(collect, nameArr...) } log.Println("this task over.", n, "newN:", newN) //任务完成,开始发送广播通知下面节点 if newN > 0 && mapInfo["stop"] == nil { mapInfo["stype"] = "winner" for n, to := range toaddr { key := fmt.Sprintf("%d-%s-%d", pici, "winner", n) mapInfo["query"] = map[string]interface{}{ "pici": pici, } mapInfo["key"] = key data, _ = json.Marshal(mapInfo) node := &udpNode{data, to, time.Now().Unix(), 0} udptaskmap.Store(key, node) udpclient.WriteUdp(data, mu.OP_TYPE_DATA, to) } } }