123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211 |
- package main
- /**
- 生成中标企业库
- redis去重后存入mongodb库,然后调用下一个节点生成索引
- **/
- import (
- "encoding/json"
- "fmt"
- "log"
- mu "mfw/util"
- "net"
- "qfw/util"
- //"qfw/util/elastic"
- "qfw/util/mongodb"
- "qfw/util/redis"
- "sync"
- "time"
- )
- var (
- Sysconfig map[string]interface{} //配置文件
- mconf map[string]interface{} //mongodb配置信息
- mgo *mongodb.MongodbSim //mongodb操作对象
- udpclient mu.UdpClient //udp对象
- nextNode []map[string]interface{} //下节点数组
- toaddr = []*net.UDPAddr{} //下节点对象
- collect string //保存的库名
- redisLock = &sync.Mutex{}
- )
- func init() {
- util.ReadConfig(&Sysconfig)
- collect, _ = Sysconfig["collect"].(string)
- nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
- for _, m := range nextNode {
- toaddr = append(toaddr, &net.UDPAddr{
- IP: net.ParseIP(m["addr"].(string)),
- Port: util.IntAll(m["port"]),
- })
- }
- mconf = Sysconfig["mongodb"].(map[string]interface{})
- mgo = &mongodb.MongodbSim{
- MongodbAddr: mconf["addr"].(string),
- DbName: mconf["db"].(string),
- Size: util.IntAllDef(mconf["pool"], 5),
- }
- mgo.InitPool()
- rconf := Sysconfig["redis"].(map[string]interface{})
- redis.InitRedisBySize(rconf["addr"].(string), util.IntAllDef(rconf["pool"], 5), 5, 240)
- }
- func main() {
- go checkMapJob()
- updport := Sysconfig["udpport"].(string)
- udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
- udpclient.Listen(processUdpMsg)
- log.Println("Udp服务监听", updport)
- time.Sleep(99999 * time.Hour)
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case mu.OP_TYPE_DATA: //上个节点的数据
- //从表中开始处理生成企业数据
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- log.Println("err:", err, "mapInfo:", mapInfo)
- if err != nil {
- udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
- } else if mapInfo != nil {
- go task(data, mapInfo)
- key, _ := mapInfo["key"].(string)
- if key == "" {
- key = "udpok"
- }
- udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
- }
- case mu.OP_NOOP: //下个节点回应
- ok := string(data)
- if ok != "" {
- log.Println("ok:", ok)
- udptaskmap.Delete(ok)
- }
- }
- }
- //开始判重程序
- func task(data []byte, mapInfo map[string]interface{}) {
- defer util.Catch()
- //区间id
- sess := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess)
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- it := sess.DB(mgo.DbName).C(mconf["collect"].(string)).Find(&q).Iter()
- nameArr := []map[string]interface{}{}
- nameLock := &sync.Mutex{}
- pool := make(chan bool, 8)
- wg := &sync.WaitGroup{}
- n, newN := 0, 0
- pici := time.Now().Unix()
- for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- //names := []string{}
- names := []map[string]string{}
- winner, _ := tmp["winner"].(string)
- if len(winner) > 9 {
- //names = append(names, winner)
- names = append(names, map[string]string{"name": winner, "id": util.BsonIdToSId(tmp["_id"]), "type": "winner"})
- }
- winnerOrder, _ := tmp["winnerorder"].([]map[string]interface{})
- if len(winnerOrder) > 0 {
- for _, m := range winnerOrder {
- wm := m["entname"].(string)
- if len(wm) > 9 {
- //names = append(names, wm)
- names = append(names, map[string]string{"name": wm, "id": util.BsonIdToSId(tmp["_id"]), "type": "winnerorder"})
- }
- }
- }
- package1 := tmp["package"]
- if package1 != nil {
- packageM := package1.(map[string]interface{})
- for _, p := range packageM {
- pm := p.(map[string]interface{})
- pw, _ := pm["winner"].(string)
- if len(pw) > 9 {
- //names = append(names, pw)
- names = append(names, map[string]string{"name": pw, "id": util.BsonIdToSId(tmp["_id"]), "type": "package"})
- }
- pwo, _ := pm["winnerorder"].([]map[string]interface{})
- if len(pwo) > 0 {
- for _, m := range pwo {
- wm := m["entname"].(string)
- if len(wm) > 9 {
- //names = append(names, wm)
- names = append(names, map[string]string{"name": wm, "id": util.BsonIdToSId(tmp["_id"]), "type": "pwinnerorder"})
- }
- }
- }
- }
- }
- if len(names) > 0 {
- for _, tmp := range names {
- redisLock.Lock()
- b, _ := redis.Exists("winner", tmp["name"])
- if !b {
- go IS.Add("normal")
- newN++
- redis.PutCKV("winner", tmp["name"], 1)
- nameLock.Lock()
- nameArr = append(nameArr, map[string]interface{}{
- "name": tmp["name"],
- "sid": tmp["id"],
- "type": tmp["type"],
- "pici": pici,
- })
- nameLock.Unlock()
- } else {
- go IS.Add("repeat")
- }
- redisLock.Unlock()
- }
- }
- if n%1000 == 0 {
- log.Println("current:", n)
- }
- nameLock.Lock()
- if len(nameArr) >= 800 {
- mgo.SaveBulk(collect, nameArr...)
- nameArr = []map[string]interface{}{}
- }
- nameLock.Unlock()
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- if len(nameArr) > 0 {
- mgo.SaveBulk(collect, nameArr...)
- }
- log.Println("this task over.", n, "newN:", newN)
- //任务完成,开始发送广播通知下面节点
- if newN > 0 && mapInfo["stop"] == nil {
- mapInfo["stype"] = "winner"
- for n, to := range toaddr {
- key := fmt.Sprintf("%d-%s-%d", pici, "winner", n)
- mapInfo["query"] = map[string]interface{}{
- "pici": pici,
- }
- mapInfo["key"] = key
- data, _ = json.Marshal(mapInfo)
- node := &udpNode{data, to, time.Now().Unix(), 0}
- udptaskmap.Store(key, node)
- udpclient.WriteUdp(data, mu.OP_TYPE_DATA, to)
- }
- }
- }
|