// This program performs duplicate detection on bidding (tender) information.
// It listens for UDP messages that each describe a range of document ids,
// checks the documents in that range for duplicates, flags the duplicates in
// MongoDB, and then notifies the configured downstream nodes.
package main

import (
	"encoding/json"
	"flag"
	"log"
	"net"
	"regexp"
	"sync"
	"time"

	mu "mfw/util"
	"qfw/util"
	"qfw/util/mongodb"
)

var (
	// Sysconfig holds the configuration filled in by util.ReadConfig.
	Sysconfig map[string]interface{}

	// mconf is the "mongodb" section of Sysconfig.
	mconf map[string]interface{}

	// mgo is the MongoDB access object.
	mgo *mongodb.MongodbSim

	// extract is the collection name taken from mconf["extract"]; it is the
	// collection that task reads from and bulk-updates.
	extract string

	// bidding is currently never assigned: the line that set it in init is
	// commented out, as are the bulk updates that used it.
	bidding string

	// udpclient is the UDP endpoint used both to receive and to send messages.
	udpclient mu.UdpClient

	// nextNode lists the downstream nodes notified when a task finishes.
	nextNode []map[string]interface{}

	// dupdays is the initial duplicate-detection range; init overrides it
	// from Sysconfig["dupdays"] when that is set.
	dupdays = 3

	// DM holds the data that incoming documents are checked against.
	DM *datamap

	// FilterRegexp is replaced in init when Sysconfig["specialwords"] is
	// non-empty.
	FilterRegexp = regexp.MustCompile("^_$")

	// lastid is the value of the -id command-line flag.
	lastid = ""
)

// init parses the command line, reads the configuration, sets up the MongoDB
// pool and loads the duplicate-detection data.
//
// The type assertions on configuration values are unchecked, so a missing or
// mistyped entry panics at start-up.
func init() {
	// Loading of the most recent days of data starts from ids less than or
	// equal to this id.
	flag.StringVar(&lastid, "id", "", "最后加载id")
	flag.Parse()

	util.ReadConfig(&Sysconfig)
	nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
	mconf = Sysconfig["mongodb"].(map[string]interface{})
	mgo = &mongodb.MongodbSim{
		MongodbAddr: mconf["addr"].(string),
		DbName:      mconf["db"].(string),
		Size:        util.IntAllDef(mconf["pool"], 10),
	}
	extract = mconf["extract"].(string)
	//bidding = mconf["bidding"].(string)
	mgo.InitPool()

	dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
	// Load the data.
	DM = NewDatamap(dupdays, lastid)

	if sw := util.ObjToString(Sysconfig["specialwords"]); sw != "" {
		FilterRegexp = regexp.MustCompile(sw)
	}
}

// main starts the background checkMapJob goroutine (defined elsewhere in the
// package), starts the UDP listener on Sysconfig["udpport"], and then sleeps
// so the process stays alive.
func main() {
	go checkMapJob()

	updport := Sysconfig["udpport"].(string)
	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
	udpclient.Listen(processUdpMsg)
	log.Println("Udp服务监听", updport)

	time.Sleep(99999 * time.Hour)
}

// processUdpMsg is the callback handed to udpclient.Listen.
//
// act is the message type, data is the payload and ra is the sender address.
//
//   - mu.OP_TYPE_DATA: data from the previous node. The payload is decoded as
//     a JSON object; on success a task goroutine is started for it and the
//     object's "key" (or "udpok" when the key is absent or empty) is sent back
//     to the sender. On a decode error the error text is sent back instead.
//   - mu.OP_NOOP: a reply from the next node. A non-empty payload is used as
//     the key to delete from udptaskmap.
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
	switch act {
	case mu.OP_TYPE_DATA: // data from the previous node
		// Start processing from the collection.
		var mapInfo map[string]interface{}
		err := json.Unmarshal(data, &mapInfo)
		log.Println("err:", err, "mapInfo:", mapInfo)
		if err != nil {
			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
		} else if mapInfo != nil {
			go task(data, mapInfo)
			key, _ := mapInfo["key"].(string)
			if key == "" {
				key = "udpok"
			}
			udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
		}
	case mu.OP_NOOP: // reply from the next node
		ok := string(data)
		if ok != "" {
			log.Println("ok:", ok)
			udptaskmap.Delete(ok)
		}
	}
}

// task runs the duplicate check over one id range.
//
// mapInfo must carry string "gtid" and "lteid" entries; documents of the
// extract collection with gtid < _id <= lteid are scanned. Documents whose
// "subtype" is "变更" are skipped. Each remaining document is passed to
// DM.check by one of at most 16 concurrent workers, and every document
// reported as a duplicate gets "repeat": 1 and "repeatid" set through
// mgo.UpdateBulk, flushed in batches.
//
// When the scan finishes, the downstream nodes in nextNode are notified over
// UDP, unless every scanned document was a duplicate or mapInfo contains a
// "stop" entry.
//
// data is the raw UDP payload; it is not used here.
//
// The unchecked type assertions below panic on malformed input; the deferred
// util.Catch presumably recovers from that (its behaviour is not visible in
// this file).
func task(data []byte, mapInfo map[string]interface{}) {
	defer util.Catch()

	// Id range of this task.
	sess := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess)
	q := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt":  util.StringTOBsonId(mapInfo["gtid"].(string)),
			"$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
		},
	}
	it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()

	var updateExtract [][]map[string]interface{}
	pool := make(chan bool, 16) // bounds the number of concurrent workers
	wg := &sync.WaitGroup{}
	mapLock := &sync.Mutex{} // guards updateExtract and repeateN
	n, repeateN := 0, 0

	// n counts every document returned by the iterator, including the
	// skipped ones, because the increment is the loop's post statement.
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		if util.ObjToString(tmp["subtype"]) == "变更" {
			//go IS.Add("update")
			continue
		}
		if n%10000 == 0 {
			log.Println("current:", n, tmp["_id"])
		}
		pool <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-pool
				wg.Done()
			}()
			info := NewInfo(tmp)
			b, id := DM.check(info)
			if !b {
				//IS.Add("new")
				return
			}

			// Duplicate found: build the update statement for the extract
			// collection (the matching update of the bidding collection is
			// disabled).
			//IS.Add("repeat")
			mapLock.Lock()
			defer mapLock.Unlock()
			// The counter is shared by all workers, so it must be updated
			// inside the critical section; incrementing it before taking
			// the lock was a data race that could lose counts.
			repeateN++
			updateExtract = append(updateExtract, []map[string]interface{}{
				{
					"_id": tmp["_id"],
				},
				{
					"$set": map[string]interface{}{
						"repeat":   1,
						"repeatid": id,
					},
				},
			})
			// updateBidding = append(updateBidding, []map[string]interface{}{
			// 	map[string]interface{}{
			// 		"_id": tmp["_id"],
			// 	},
			// 	map[string]interface{}{
			// 		"$set": map[string]interface{}{
			// 			"extracttype": -1,
			// 			"repeatid":    id,
			// 		},
			// 	},
			// })
			if len(updateExtract) > 500 {
				mgo.UpdateBulk(extract, updateExtract...)
				//mgo.UpdateBulk(bidding, updateBidding...)
				//updateExtract, updateBidding = [][]map[string]interface{}{}, [][]map[string]interface{}{}
				updateExtract = nil
			}
		}(tmp)
		// Give the next document its own map; the worker keeps the old one.
		tmp = make(map[string]interface{})
	}
	wg.Wait()

	// Flush whatever is left after the last full batch.
	if len(updateExtract) > 0 {
		mgo.UpdateBulk(extract, updateExtract...)
		//mgo.UpdateBulk(bidding, updateBidding...)
	}
	log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])

	// Task finished: broadcast a notification to the downstream nodes.
	if n > repeateN && mapInfo["stop"] == nil {
		sid, _ := mapInfo["gtid"].(string)
		eid, _ := mapInfo["lteid"].(string)
		for _, to := range nextNode {
			stype := util.ObjToString(to["stype"])
			key := sid + "-" + eid + "-" + stype
			// The marshal error is ignored: the payload is a map of strings.
			by, _ := json.Marshal(map[string]interface{}{
				"gtid":  sid,
				"lteid": eid,
				"stype": stype,
				"key":   key,
			})
			addr := &net.UDPAddr{
				IP:   net.ParseIP(to["addr"].(string)),
				Port: util.IntAll(to["port"]),
			}
			// Record the message under its key before sending; the OP_NOOP
			// branch of processUdpMsg deletes the entry when a reply carrying
			// that key arrives.
			node := &udpNode{by, addr, time.Now().Unix(), 0}
			udptaskmap.Store(key, node)
			udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
		}
	}
}