123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287 |
- package main
- import (
- "encoding/json"
- "fmt"
- du "jy/util"
- "log"
- mu "mfw/util"
- "net"
- "qfw/util"
- "qfw/util/mongodb"
- "qfw/util/redis"
- "sync"
- "time"
- "gopkg.in/mgo.v2/bson"
- )
// Names passed as the first argument to the redis helpers, plus the expiry
// used when marking an info id as processed.
const (
	// REDISIDS is the "ids" redis name (not referenced in this part of the file).
	REDISIDS = "ids"
	// REDISKEYS is where the pn_*/pc_*/pb_* keys live; main loads their JSON
	// string-array values into PNKey/PCKey/PBKey.
	REDISKEYS = "keys"
	// INFOID is where task marks info ids that have already been merged.
	INFOID = "info"
	// INFOTIMEOUT is the expiry passed to redis.Put for INFOID markers:
	// 86400*30, i.e. 30 days if the helper takes seconds.
	INFOTIMEOUT = 30 * 24 * 60 * 60
)
- var (
- Sysconfig map[string]interface{}
- MQFW mongodb.MongodbSim
- extractColl, projectColl string
- lenprojectname int
- udpclient mu.UdpClient //udp对象
- nextNode []map[string]interface{} //下节点数组
- toaddr = []*net.UDPAddr{} //下节点对象
- MultiThread chan bool
- SingleThread = make(chan bool, 1) //udp调用
- IdLock = &sync.Mutex{}
- PncbMayLock = &sync.Mutex{}
- MegerFieldsLen *MegerFields
- //三组lock,对应的(PNKey)key为项目名称,值对应的是此项目名称对应的项目id数组
- PNKey, PCKey, PBKey = NewKeyMap(), NewKeyMap(), NewKeyMap()
- PNKeyMap, PCKeyMap, PBKeyMap = sync.Map{}, sync.Map{}, sync.Map{}
- )
- type MegerFields struct {
- ProjectNamelen int
- ProjectCodelen int
- }
- type KeyMap struct {
- Lock sync.Mutex
- Map map[string]*Key
- }
- type Key struct {
- Arr *[]string
- Lock *sync.Mutex
- }
- func NewKeyMap() *KeyMap {
- return &KeyMap{
- Map: map[string]*Key{},
- }
- }
// init sets up file logging ("./project.log", rolled daily, DEBUG level) and
// reads the configuration into Sysconfig. It then initialises the shared state
// used by main and task: the MultiThread worker semaphore, lenprojectname,
// MegerFieldsLen, the redis and MongoDB pools, the collection names, and the
// UDP addresses of the downstream nodes (toaddr).
//
// Required keys ("redisaddrs", "mongodbServers", "mongodbName", "extractColl",
// "projectColl", "nextNode" and each node's "addr") are read with unchecked
// type assertions, so a missing or mistyped value panics at startup.
func init() {
	initarea()
	du.SetConsole(false)
	du.SetRollingDaily("./", "project.log")
	du.SetLevel(du.DEBUG)
	util.ReadConfig(&Sysconfig)

	// task sends on MultiThread before it starts the worker that receives from
	// it. A zero capacity would therefore deadlock on the first record, and a
	// negative one makes make() panic, so clamp the capacity to at least 1.
	threads := util.IntAllDef(Sysconfig["thread"], 200)
	if threads < 1 {
		log.Println("invalid thread config", threads, "- using 1")
		threads = 1
	}
	MultiThread = make(chan bool, threads)
	// Stored as the configured value (default 20) minus one.
	lenprojectname = util.IntAllDef(Sysconfig["lenprojectname"], 20) - 1
	// A missing section leaves megerfields nil; lookups on a nil map yield nil,
	// so IntAllDef presumably falls back to the defaults below.
	megerfields, _ := Sysconfig["megerfields"].(map[string]interface{})
	MegerFieldsLen = &MegerFields{
		ProjectNamelen: util.IntAllDef(megerfields["projectlen"], 5),
		ProjectCodelen: util.IntAllDef(megerfields["projectcodelen"], 8),
	}
	redis.InitRedisBySize(Sysconfig["redisaddrs"].(string), util.IntAllDef(Sysconfig["redisPoolSize"], 100), 30, 300)
	MQFW = mongodb.MongodbSim{
		MongodbAddr: Sysconfig["mongodbServers"].(string),
		Size:        util.IntAll(Sysconfig["mongodbPoolSize"]),
		DbName:      Sysconfig["mongodbName"].(string),
	}
	MQFW.InitPool()
	extractColl = Sysconfig["extractColl"].(string)
	projectColl = Sysconfig["projectColl"].(string)
	nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
	for _, m := range nextNode {
		addr := m["addr"].(string)
		ip := net.ParseIP(addr)
		if ip == nil {
			// ParseIP only accepts IP literals; without this lookup a host
			// name would silently become a nil (unspecified) destination IP.
			if resolved, err := net.ResolveIPAddr("ip", addr); err == nil {
				ip = resolved.IP
			} else {
				log.Println("nextNode addr", addr, "cannot be resolved:", err)
			}
		}
		toaddr = append(toaddr, &net.UDPAddr{
			IP:   ip,
			Port: util.IntAll(m["port"]),
		})
	}
}
// main loads the pn_*/pc_*/pb_* entries from redis into PNKey/PCKey/PBKey.
// It then starts the UDP listener on the configured "udpport", with
// processUdpMsg as the callback. Finally it sleeps (effectively forever) while
// the callbacks do the work.
func main() {
	// NOTE(review): checkMapJob starts before the maps below are filled, and
	// the loader writes pk.Map without holding pk.Lock. If checkMapJob reads
	// these maps, this is a data race — confirm against checkMapJob.
	go checkMapJob()

	// Load all lock entries before accepting any UDP work.
	log.Println("loading data from redis...")
	const batchSize = 500 // keys fetched per MGET

	// storeBatch fetches the values of keys with a single MGET. Each value is
	// a JSON array of project ids and is stored in pk under its redis key.
	storeBatch := func(pk *KeyMap, keys []string) {
		values := redis.Mget(REDISKEYS, keys)
		for i, v := range values {
			raw, ok := v.([]uint8)
			if !ok {
				// nil for a key that vanished since KEYS; anything else is
				// unexpected and is skipped rather than panicking.
				continue
			}
			var ids []string
			if err := json.Unmarshal(raw, &ids); err != nil {
				log.Println("decode", keys[i], "err:", err)
			}
			pk.Map[keys[i]] = &Key{&ids, &sync.Mutex{}} // pn_<project name> -> id array
		}
	}

	n := 0 // number of keys matched across all patterns
	km := []*KeyMap{PNKey, PCKey, PBKey}
	for pos, pattern := range []string{"pn_*", "pc_*", "pb_*"} {
		pk := km[pos]
		batch := make([]string, 0, batchSize)
		for _, v := range redis.GetKeysByPattern(REDISKEYS, pattern) {
			n++
			batch = append(batch, string(v.([]uint8)))
			if len(batch) == batchSize {
				storeBatch(pk, batch)
				batch = batch[:0] // storeBatch copies the key strings out
			}
		}
		if len(batch) > 0 {
			storeBatch(pk, batch)
		}
	}
	log.Println("load data from redis finished.", n)
	// Redis cleanup is currently disabled:
	//clearedis()
	updport := Sysconfig["udpport"].(string)
	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
	udpclient.Listen(processUdpMsg)
	log.Println("Udp服务监听", updport)
	// Keep the process alive; all work happens in the UDP callbacks.
	time.Sleep(99999 * time.Hour)
}
// processUdpMsg is the UDP callback registered in main.
//
//   - mu.OP_TYPE_DATA: a work message from the upstream node, decoded as a
//     JSON object. A decode error is sent back as "err:<message>". Otherwise
//     the sender is acked with the message's "key" (or "udpok" when the key is
//     absent) and the message is handed to task. The send on SingleThread
//     (capacity 1) blocks this callback until the previous task has returned.
//   - mu.OP_NOOP: a reply from a downstream node. Its payload is the key of
//     the entry to remove from udptaskmap.
//
// Other op codes are ignored.
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
	if act != mu.OP_TYPE_DATA {
		if act == mu.OP_NOOP {
			if reply := string(data); len(reply) > 0 {
				log.Println("ok:", reply)
				udptaskmap.Delete(reply)
			}
		}
		return
	}

	var mapInfo map[string]interface{}
	err := json.Unmarshal(data, &mapInfo)
	log.Println("err:", err, "mapInfo:", mapInfo)
	switch {
	case err != nil:
		udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
	case mapInfo != nil:
		ackKey, _ := mapInfo["key"].(string)
		if ackKey == "" {
			ackKey = "udpok"
		}
		go udpclient.WriteUdp([]byte(ackKey), mu.OP_NOOP, ra)
		SingleThread <- true
		go task(data, mapInfo)
	}
}
// task merges the extract records selected by mapInfo into projects. It then
// asks every downstream node in toaddr to build the project index for this
// batch.
//
// mapInfo either carries a ready-made Mongo "query", or "gtid"/"lteid" hex ids
// that bound an _id range (gtid, lteid]. Records with repeat == 1 are skipped,
// as are ids already marked in redis under INFOID. Each remaining record is
// merged on its own goroutine, with concurrency bounded by MultiThread.
// Downstream messages are skipped when mapInfo["stop"] is set. data is unused.
//
// The caller must send on SingleThread before starting task; task releases
// that slot when it returns.
func task(data []byte, mapInfo map[string]interface{}) {
	defer func() {
		<-SingleThread
	}()
	defer util.Catch()

	q, _ := mapInfo["query"].(map[string]interface{})
	if q == nil {
		q = map[string]interface{}{
			"_id": map[string]interface{}{
				"$gt":  util.StringTOBsonId(mapInfo["gtid"].(string)),
				"$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
			},
		}
	}
	sess := MQFW.GetMgoConn()
	defer MQFW.DestoryMongoConn(sess)
	// Records are processed in natural order; sorting by publishtime is disabled.
	it := sess.DB(MQFW.DbName).C(extractColl).Find(&q).Iter() //.Sort("publishtime")
	count, index := 0, 0
	pici := time.Now().Unix() // batch id forwarded to the downstream nodes
	wg := &sync.WaitGroup{}
	idmap := &sync.Map{} // _id of every record whose worker is still running
	for tmp := make(map[string]interface{}); it.Next(tmp); tmp = make(map[string]interface{}) {
		if index%10000 == 0 {
			log.Println(index, tmp["_id"])
		}
		index++
		if util.IntAll(tmp["repeat"]) == 1 {
			continue
		}
		count++
		thisid := util.BsonIdToSId(tmp["_id"])
		b, err := redis.Exists(INFOID, thisid)
		if err != nil {
			log.Println("checkid err", err.Error())
		}
		if !b {
			wg.Add(1)
			idmap.Store(tmp["_id"], true)
			MultiThread <- true
			go func(tmp map[string]interface{}, thisid string) {
				defer func() {
					<-MultiThread
					wg.Done()
					idmap.Delete(tmp["_id"])
				}()
				// task's own util.Catch does not cover panics on this
				// goroutine, and those would otherwise terminate the process.
				defer util.Catch()
				info := PreThisInfo(tmp)
				if info == nil {
					return
				}
				lockPNCBMap(info)
				// Deferred so the locks are released even if the merge panics.
				defer unlockPNCBMap(info)
				startProjectMerge(info, tmp)
				redis.Put(INFOID, thisid, 1, INFOTIMEOUT)
			}(tmp, thisid)
		}
		if count%500 == 0 {
			log.Println("count:", count)
		}
	}
	// Release the cursor, and report a failed iteration instead of silently
	// treating it as a finished batch.
	if err := it.Close(); err != nil {
		log.Println("iter err:", err)
	}

	// Wait for all workers, logging the _ids still in flight every 5 seconds.
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	ticker := time.NewTicker(5 * time.Second)
wait:
	for {
		select {
		case <-done:
			break wait
		case <-ticker.C:
			idmap.Range(func(key interface{}, v interface{}) bool {
				log.Println(key, v)
				return true
			})
		}
	}
	ticker.Stop()
	log.Println("task over...", index, count)

	// Send over UDP to trigger project-index generation for this batch.
	mapInfo["stype"] = "project"
	if mapInfo["stop"] == nil && len(toaddr) > 0 {
		mapInfo["query"] = map[string]interface{}{
			"pici": pici,
		}
		for n, to := range toaddr {
			key := fmt.Sprintf("%d-%s-%d", pici, "project", n)
			mapInfo["key"] = key
			datas, err := json.Marshal(mapInfo)
			if err != nil {
				log.Println("marshal err:", err)
				continue
			}
			// Tracked in udptaskmap until the node replies with key (the
			// OP_NOOP case in processUdpMsg deletes it).
			node := &udpNode{datas, to, time.Now().Unix(), 0}
			udptaskmap.Store(key, node)
			udpclient.WriteUdp(datas, mu.OP_TYPE_DATA, to)
		}
	}
}
- func NewPushInfo(tmp map[string]interface{}) bson.M {
- return bson.M{
- "comeintime": tmp["comeintime"],
- "publishtime": tmp["publishtime"],
- "title": tmp["title"],
- "toptype": tmp["toptype"],
- "subtype": tmp["subtype"],
- "infoformat": tmp["infoformat"],
- "infoid": util.BsonIdToSId(tmp["_id"]),
- "href": tmp["href"],
- "area": tmp["area"],
- "city": tmp["city"],
- "cresult": tmp["cresult"],
- "score": tmp["score"],
- }
- }
|