package main import ( "encoding/json" "fmt" du "jy/util" "log" mu "mfw/util" "net" "qfw/util" "qfw/util/mongodb" "qfw/util/redis" "sync" "time" "gopkg.in/mgo.v2/bson" ) const ( REDISIDS = "ids" REDISKEYS = "keys" INFOID = "info" INFOTIMEOUT = 86400 * 30 ) var ( Sysconfig map[string]interface{} MQFW mongodb.MongodbSim extractColl, projectColl string lenprojectname int udpclient mu.UdpClient //udp对象 nextNode []map[string]interface{} //下节点数组 toaddr = []*net.UDPAddr{} //下节点对象 MultiThread chan bool SingleThread = make(chan bool, 1) //udp调用 IdLock = &sync.Mutex{} PncbMayLock = &sync.Mutex{} MegerFieldsLen *MegerFields //三组lock,对应的(PNKey)key为项目名称,值对应的是此项目名称对应的项目id数组 PNKey, PCKey, PBKey = NewKeyMap(), NewKeyMap(), NewKeyMap() PNKeyMap, PCKeyMap, PBKeyMap = sync.Map{}, sync.Map{}, sync.Map{} ) type MegerFields struct { ProjectNamelen int ProjectCodelen int } type KeyMap struct { Lock sync.Mutex Map map[string]*Key } type Key struct { Arr *[]string Lock *sync.Mutex } func NewKeyMap() *KeyMap { return &KeyMap{ Map: map[string]*Key{}, } } func init() { initarea() du.SetConsole(false) du.SetRollingDaily("./", "project.log") du.SetLevel(du.DEBUG) util.ReadConfig(&Sysconfig) MultiThread = make(chan bool, util.IntAllDef(Sysconfig["thread"], 200)) lenprojectname = util.IntAllDef(Sysconfig["lenprojectname"], 20) - 1 megerfields, _ := Sysconfig["megerfields"].(map[string]interface{}) MegerFieldsLen = &MegerFields{ ProjectNamelen: util.IntAllDef(megerfields["projectlen"], 5), ProjectCodelen: util.IntAllDef(megerfields["projectcodelen"], 8), } redis.InitRedisBySize(Sysconfig["redisaddrs"].(string), util.IntAllDef(Sysconfig["redisPoolSize"], 100), 30, 300) MQFW = mongodb.MongodbSim{ MongodbAddr: Sysconfig["mongodbServers"].(string), Size: util.IntAll(Sysconfig["mongodbPoolSize"]), DbName: Sysconfig["mongodbName"].(string), } MQFW.InitPool() extractColl = Sysconfig["extractColl"].(string) projectColl = Sysconfig["projectColl"].(string) nextNode = 
util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{})) for _, m := range nextNode { toaddr = append(toaddr, &net.UDPAddr{ IP: net.ParseIP(m["addr"].(string)), Port: util.IntAll(m["port"]), }) } } func main() { go checkMapJob() //先加载所有锁 log.Println("loading data from redis...") n := 0 km := []*KeyMap{PNKey, PCKey, PBKey} for pos, key := range []string{"pn_*", "pc_*", "pb_*"} { res := redis.GetKeysByPattern(REDISKEYS, key) pk := km[pos] if res != nil { //一次500条 num := 0 arr := []string{} for _, v := range res { n++ num++ k := string(v.([]uint8)) arr = append(arr, k) //根据正则找到key if num == 500 { num = 0 ret := redis.Mget(REDISKEYS, arr) //根据key批量取内容 if len(ret) > 0 { for k1, v1 := range ret { if v1 != nil { var a1 []string json.Unmarshal(v1.([]uint8), &a1) pk.Map[arr[k1]] = &Key{&a1, &sync.Mutex{}} //pn_项目名称 id数组 } } } arr = []string{} } } if num > 0 { ret := redis.Mget(REDISKEYS, arr) if len(ret) > 0 { for k1, v1 := range ret { if v1 != nil { var a1 []string json.Unmarshal(v1.([]uint8), &a1) pk.Map[arr[k1]] = &Key{&a1, &sync.Mutex{}} } } } arr = []string{} } } } log.Println("load data from redis finished.", n) //清理redis //clearedis() updport := Sysconfig["udpport"].(string) udpclient = mu.UdpClient{Local: updport, BufSize: 1024} udpclient.Listen(processUdpMsg) log.Println("Udp服务监听", updport) time.Sleep(99999 * time.Hour) } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case mu.OP_TYPE_DATA: //上个节点的数据 var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) log.Println("err:", err, "mapInfo:", mapInfo) if err != nil { udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra) } else if mapInfo != nil { key, _ := mapInfo["key"].(string) if key == "" { key = "udpok" } go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra) SingleThread <- true go task(data, mapInfo) } case mu.OP_NOOP: //下个节点回应 ok := string(data) if ok != "" { log.Println("ok:", ok) udptaskmap.Delete(ok) } } } func task(data []byte, mapInfo 
map[string]interface{}) { defer func() { <-SingleThread }() defer util.Catch() q, _ := mapInfo["query"].(map[string]interface{}) if q == nil { q = map[string]interface{}{ "_id": map[string]interface{}{ "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)), }, } } sess := MQFW.GetMgoConn() defer MQFW.DestoryMongoConn(sess) //数据正序处理 it := sess.DB(MQFW.DbName).C(extractColl).Find(&q).Iter() //.Sort("publishtime") count, index := 0, 0 pici := time.Now().Unix() wg := &sync.WaitGroup{} idmap := &sync.Map{} for tmp := make(map[string]interface{}); it.Next(tmp); { if index%10000 == 0 { log.Println(index, tmp["_id"]) } index++ if util.IntAll(tmp["repeat"]) == 1 { tmp = make(map[string]interface{}) continue } count++ thisid := util.BsonIdToSId(tmp["_id"]) b, err := redis.Exists(INFOID, thisid) if err != nil { log.Println("checkid err", err.Error()) } if !b { wg.Add(1) idmap.Store(tmp["_id"], true) //增加判重逻辑,重复id不再生成 MultiThread <- true go func(tmp map[string]interface{}, thisid string) { defer func() { <-MultiThread wg.Done() idmap.Delete(tmp["_id"]) }() info := PreThisInfo(tmp) if info != nil { lockPNCBMap(info) startProjectMerge(info, tmp) redis.Put(INFOID, thisid, 1, INFOTIMEOUT) unlockPNCBMap(info) } }(tmp, thisid) } if count%500 == 0 { log.Println("count:", count) } tmp = make(map[string]interface{}) } for { time.Sleep(5 * time.Second) n := 0 idmap.Range(func(key interface{}, v interface{}) bool { n++ log.Println(key, v) return true }) if n < 1 { break } } wg.Wait() log.Println("task over...", index, count) //发送udp,调用生成项目索引 mapInfo["stype"] = "project" if mapInfo["stop"] == nil && len(toaddr) > 0 { for n, to := range toaddr { key := fmt.Sprintf("%d-%s-%d", pici, "project", n) mapInfo["query"] = map[string]interface{}{ "pici": pici, } mapInfo["key"] = key datas, _ := json.Marshal(mapInfo) node := &udpNode{datas, to, time.Now().Unix(), 0} udptaskmap.Store(key, node) udpclient.WriteUdp(datas, mu.OP_TYPE_DATA, to) } } } 
// NewPushInfo builds the push document for one info record: a fixed set of
// fields is copied from tmp under the same names (absent fields are stored as
// nil) and "infoid" is set to the string form of tmp["_id"].
func NewPushInfo(tmp map[string]interface{}) bson.M {
	// Fields carried over unchanged from the source record.
	copied := []string{
		"comeintime",
		"publishtime",
		"title",
		"toptype",
		"subtype",
		"infoformat",
		"href",
		"area",
		"city",
		"cresult",
		"score",
	}
	push := make(bson.M, len(copied)+1)
	for _, field := range copied {
		push[field] = tmp[field]
	}
	push["infoid"] = util.BsonIdToSId(tmp["_id"])
	return push
}