// Package main: incremental project-merge service. It preloads per-project
// key locks from redis, listens on UDP for batch tasks from the upstream
// node, merges extracted records into projects, and notifies downstream
// nodes to rebuild the project index.
package main

import (
	"encoding/json"
	"fmt"
	du "jy/util"
	"log"
	mu "mfw/util"
	"net"
	"qfw/util"
	"qfw/util/mongodb"
	"qfw/util/redis"
	"sync"
	"time"

	"gopkg.in/mgo.v2/bson"
)

const (
	REDISIDS    = "ids"        // redis db/key-space name (usage not visible in this file)
	REDISKEYS   = "keys"       // redis db holding pn_/pc_/pb_ project-key arrays
	INFOID      = "info"       // redis db used to de-duplicate processed info ids
	INFOTIMEOUT = 86400 * 30   // TTL (seconds) for processed-id markers: 30 days
)

var (
	// Sysconfig is the raw service configuration loaded by util.ReadConfig in init.
	Sysconfig map[string]interface{}
	// MQFW is the shared mongodb connection pool.
	MQFW                     mongodb.MongodbSim
	extractColl, projectColl string
	lenprojectname           int
	udpclient                mu.UdpClient             // udp client object
	nextNode                 []map[string]interface{} // downstream node configs
	toaddr                   = []*net.UDPAddr{}       // downstream node addresses
	MultiThread              chan bool                // bounds concurrent merge workers (size from config "thread")
	SingleThread             = make(chan bool, 1)     // serializes udp-triggered taskInc runs
	IdLock                   = &sync.Mutex{}
	PncbMayLock              = &sync.Mutex{}
	MegerFieldsLen           *MegerFields
	// Three groups of locks; for each KeyMap (e.g. PNKey) the map key is a
	// project name and the value is the array of project ids for that name.
	PNKey, PCKey, PBKey          = NewKeyMap(), NewKeyMap(), NewKeyMap()
	PNKeyMap, PCKeyMap, PBKeyMap = sync.Map{}, sync.Map{}, sync.Map{}
	// currentMegerTime is the merge time watermark, used to clean up projects
	// older than a few months.
	// NOTE(review): written both by taskInc's iterator loop and by its worker
	// goroutines without any synchronization — data race; confirm and guard.
	currentMegerTime int64
	// currentMegerCount counts merged records, used to trigger periodic cleanup.
	currentMegerCount int
)

// MegerFields holds minimum-length thresholds used when matching/merging
// by project name and project code.
type MegerFields struct {
	ProjectNamelen int
	ProjectCodelen int
}

// KeyMap is a lock-guarded map from a project key (name/code/bid key) to
// the Key holding that project's id array.
type KeyMap struct {
	Lock sync.Mutex
	Map  map[string]*Key
}

// Key pairs a project-id array with its own mutex.
type Key struct {
	Arr  *[]string
	Lock *sync.Mutex
}

// NewKeyMap returns a KeyMap with an initialized (empty) inner map.
func NewKeyMap() *KeyMap {
	return &KeyMap{
		Map: map[string]*Key{},
	}
}

// init loads configuration and initializes logging, redis, mongodb and the
// downstream UDP address list.
// NOTE(review): the unchecked type assertions on Sysconfig values
// (e.g. Sysconfig["redisaddrs"].(string)) panic if a config key is missing
// or mistyped — acceptable as fail-fast startup, but worth confirming.
func init() {
	initarea() // defined elsewhere in this package
	du.SetConsole(false)
	du.SetRollingDaily("./", "project.log")
	du.SetLevel(du.DEBUG)
	util.ReadConfig(&Sysconfig)
	MultiThread = make(chan bool, util.IntAllDef(Sysconfig["thread"], 200))
	lenprojectname = util.IntAllDef(Sysconfig["lenprojectname"], 20) - 1
	megerfields, _ := Sysconfig["megerfields"].(map[string]interface{})
	MegerFieldsLen = &MegerFields{
		ProjectNamelen: util.IntAllDef(megerfields["projectlen"], 5),
		ProjectCodelen: util.IntAllDef(megerfields["projectcodelen"], 8),
	}
	// insert-merge parameters (targets defined elsewhere in this package)
	if insertmeger, ok := Sysconfig["insertmeger"].(map[string]interface{}); ok {
		OmitNumMax = util.Int64All(insertmeger["omitmax"])
		DeviationDay = util.Int64All(insertmeger["deviationday"])
		HourInterval = util.Int64All(insertmeger["hourinterval"])
	}
	redis.InitRedisBySize(Sysconfig["redisaddrs"].(string), util.IntAllDef(Sysconfig["redisPoolSize"], 100),
		30, 300)
	MQFW = mongodb.MongodbSim{
		MongodbAddr: Sysconfig["mongodbServers"].(string),
		Size:        util.IntAll(Sysconfig["mongodbPoolSize"]),
		DbName:      Sysconfig["mongodbName"].(string),
	}
	MQFW.InitPool()
	extractColl = Sysconfig["extractColl"].(string)
	projectColl = Sysconfig["projectColl"].(string)
	nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
	for _, m := range nextNode {
		toaddr = append(toaddr, &net.UDPAddr{
			IP:   net.ParseIP(m["addr"].(string)),
			Port: util.IntAll(m["port"]),
		})
	}
}

// main preloads all project-key locks from redis into PNKey/PCKey/PBKey,
// optionally replays stock (historical) data, then listens on UDP forever.
func main() {
	go checkMapJob() // defined elsewhere in this package
	// Load all locks/key arrays from redis first.
	log.Println("loading data from redis...")
	n := 0
	km := []*KeyMap{PNKey, PCKey, PBKey}
	for pos, key := range []string{"pn_*", "pc_*", "pb_*"} {
		res := redis.GetKeysByPattern(REDISKEYS, key)
		pk := km[pos]
		// NOTE(review): pk.Map is populated below without taking pk.Lock;
		// safe only if checkMapJob does not touch these maps during startup
		// — confirm.
		if res != nil {
			// fetch values in batches of 500 keys
			num := 0
			arr := []string{}
			for _, v := range res {
				n++
				num++
				k := string(v.([]uint8))
				arr = append(arr, k) // keys matched by the pattern
				if num == 500 {
					num = 0
					ret := redis.Mget(REDISKEYS, arr) // batch-get values by key
					if len(ret) > 0 {
						for k1, v1 := range ret {
							if v1 != nil {
								var a1 []string
								// NOTE(review): Unmarshal error ignored; a
								// corrupt value yields an empty id array.
								json.Unmarshal(v1.([]uint8), &a1)
								pk.Map[arr[k1]] = &Key{&a1, &sync.Mutex{}} // pn_<project name> -> id array
							}
						}
					}
					arr = []string{}
				}
			}
			// flush the final partial batch
			if num > 0 {
				ret := redis.Mget(REDISKEYS, arr)
				if len(ret) > 0 {
					for k1, v1 := range ret {
						if v1 != nil {
							var a1 []string
							json.Unmarshal(v1.([]uint8), &a1)
							pk.Map[arr[k1]] = &Key{&a1, &sync.Mutex{}}
						}
					}
				}
				arr = []string{}
			}
		}
	}
	log.Println("load data from redis finished.", n)
	if taskstock, ok := Sysconfig["taskstock"].(map[string]interface{}); ok {
		// replay stock (historical) data when enabled
		if b, _ := taskstock["open"].(bool); b {
			RunFullData(util.Int64All(taskstock["startTime"]))
		}
	}
	updport := Sysconfig["udpport"].(string)
	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
	udpclient.Listen(processUdpMsg)
	log.Println("Udp服务监听", updport)
	// Block forever; Listen presumably runs in the background — confirm.
	time.Sleep(99999 * time.Hour)
}

// processUdpMsg is the UDP callback: OP_TYPE_DATA messages carry a task
// description from the upstream node (acknowledged, then run via taskInc);
// OP_NOOP messages are acks from downstream nodes and clear the matching
// retry entry in udptaskmap.
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
	switch act {
	case mu.OP_TYPE_DATA: // data from the upstream node
		var mapInfo map[string]interface{}
		err := json.Unmarshal(data, &mapInfo)
		// NOTE(review): logs every message, even on success — noisy.
		log.Println("err:", err, "mapInfo:", mapInfo)
		if err != nil {
			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
		} else if mapInfo != nil {
			key, _ := mapInfo["key"].(string)
			if key == "" {
				key = "udpok"
			}
			// ack immediately, then queue the task (SingleThread serializes).
			go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
			SingleThread <- true
			go taskInc(mapInfo)
		}
	case mu.OP_NOOP: // ack from the downstream node
		ok := string(data)
		if ok != "" {
			log.Println("ok:", ok)
			udptaskmap.Delete(ok)
		}
	}
}

// taskInc processes one incremental batch: it iterates the extract
// collection (by the given query, or by the gtid/lteid id window),
// skips repeats and stale records, merges each remaining record into a
// project via a bounded worker pool, then notifies downstream nodes over
// UDP to build the project index for this batch (pici).
func taskInc(mapInfo map[string]interface{}) {
	defer func() {
		<-SingleThread // release the single-task slot taken by processUdpMsg
	}()
	defer util.Catch()
	q, _ := mapInfo["query"].(map[string]interface{})
	if q == nil {
		// No explicit query: build an _id window (gtid, lteid].
		// NOTE(review): .(string) panics if gtid/lteid is absent — confirm
		// upstream always sends them when "query" is omitted.
		q = map[string]interface{}{
			"_id": map[string]interface{}{
				"$gt":  util.StringTOBsonId(mapInfo["gtid"].(string)),
				"$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
			},
		}
	}
	sess := MQFW.GetMgoConn()
	defer MQFW.DestoryMongoConn(sess)
	// process data in ascending publishtime order
	it := sess.DB(MQFW.DbName).C(extractColl).Find(q).Sort("publishtime").Iter()
	count, index := 0, 0
	pici := time.Now().Unix() // batch id propagated downstream
	wg := &sync.WaitGroup{}
	//idmap := &sync.Map{}
	for tmp := make(map[string]interface{}); it.Next(tmp); {
		if index%10000 == 0 {
			log.Println(index, tmp["_id"])
		}
		index++
		if util.IntAll(tmp["repeat"]) == 1 {
			tmp = make(map[string]interface{})
			continue
		}
		pt := util.Int64All(tmp["publishtime"])
		if time.Now().Unix()-DeviationDay*86400 > pt {
			// Data older than DeviationDay days is not merged here;
			// presumably handled by the insert path instead — confirm.
			// NOTE(review): this branch skips the tmp reset done elsewhere;
			// mgo's Next into a reused non-empty map may leave stale fields
			// — confirm.
			continue
		}
		if pt > currentMegerTime {
			currentMegerTime = pt
		}
		count++
		currentMegerCount++
		// Periodically pause and clean old project keys.
		if currentMegerCount > 300000 {
			time.Sleep(100 * time.Millisecond)
			clearPKey()
		}
		thisid := util.BsonIdToSId(tmp["_id"])
		b, err := redis.Exists(INFOID, thisid)
		if err != nil {
			log.Println("checkid err", err.Error())
		}
		if !b {
			wg.Add(1)
			//idmap.Store(tmp["_id"], true) // dedup: ids already seen are not generated again
			MultiThread <- true // acquire a worker slot (bounded pool)
			go func(tmp map[string]interface{}, thisid string) {
				defer func() {
					<-MultiThread
					wg.Done()
					//idmap.Delete(tmp["_id"])
				}()
				info := PreThisInfo(tmp)
				if info != nil {
					lockPNCBMap(info)
					storeLock(info)
					startProjectMerge(info, tmp)
					// Mark this id as processed (30-day TTL).
					redis.Put(INFOID, thisid, 1, INFOTIMEOUT)
					// NOTE(review): unsynchronized write to the shared
					// watermark from a worker goroutine; it can also move
					// the watermark backwards — data race, confirm intent.
					currentMegerTime = info.Publishtime
					unlockPNCBMap(info)
				}
			}(tmp, thisid)
		}
		if count%500 == 0 {
			log.Println("count:", count)
		}
		tmp = make(map[string]interface{})
	}
	// Disabled debug loop that waited for idmap to drain:
	// for {
	// 	time.Sleep(5 * time.Second)
	// 	n := 0
	// 	idmap.Range(func(key interface{}, v interface{}) bool {
	// 		n++
	// 		log.Println(key, v)
	// 		return true
	// 	})
	// 	if n < 1 {
	// 		break
	// 	}
	// }
	wg.Wait()
	log.Println("task over...", index, count)
	// Send UDP to downstream nodes to trigger project index generation.
	mapInfo["stype"] = "project"
	if mapInfo["stop"] == nil && len(toaddr) > 0 {
		for n, to := range toaddr {
			key := fmt.Sprintf("%d-%s-%d", pici, "project", n)
			mapInfo["query"] = map[string]interface{}{
				"pici": pici,
			}
			mapInfo["key"] = key
			datas, _ := json.Marshal(mapInfo)
			// udpNode/udptaskmap (defined elsewhere) track the message for
			// retry until the downstream ack deletes the key.
			node := &udpNode{datas, to, time.Now().Unix(), 0}
			udptaskmap.Store(key, node)
			udpclient.WriteUdp(datas, mu.OP_TYPE_DATA, to)
		}
	}
}

// NewPushInfo builds the bson document (summary of one extract record)
// that is pushed into a project's info list.
func NewPushInfo(tmp map[string]interface{}) bson.M {
	return bson.M{
		"comeintime":  tmp["comeintime"],
		"publishtime": tmp["publishtime"],
		"title":       tmp["title"],
		"toptype":     tmp["toptype"],
		"subtype":     tmp["subtype"],
		"infoformat":  tmp["infoformat"],
		"infoid":      util.BsonIdToSId(tmp["_id"]),
		"href":        tmp["href"],
		"area":        tmp["area"],
		"city":        tmp["city"],
		"cresult":     tmp["cresult"],
		"score":       tmp["score"],
	}
}