123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193 |
- package main
- /**
- 招标信息判重
- **/
- import (
- "encoding/json"
- "flag"
- "log"
- mu "mfw/util"
- "net"
- "qfw/util"
- "qfw/util/mongodb"
- "regexp"
- "sync"
- "time"
- )
- var (
- Sysconfig map[string]interface{} //配置文件
- mconf map[string]interface{} //mongodb配置信息
- mgo *mongodb.MongodbSim //mongodb操作对象
- extract string
- bidding string
- udpclient mu.UdpClient //udp对象
- nextNode []map[string]interface{} //下节点数组
- dupdays = 3 //初始化判重范围
- DM *datamap //判重数据
- FilterRegexp = regexp.MustCompile("^_$")
- lastid = ""
- )
- func init() {
- flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
- flag.Parse()
- util.ReadConfig(&Sysconfig)
- nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
- mconf = Sysconfig["mongodb"].(map[string]interface{})
- mgo = &mongodb.MongodbSim{
- MongodbAddr: mconf["addr"].(string),
- DbName: mconf["db"].(string),
- Size: util.IntAllDef(mconf["pool"], 10),
- }
- extract = mconf["extract"].(string)
- //bidding = mconf["bidding"].(string)
- mgo.InitPool()
- dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
- //加载数据
- DM = NewDatamap(dupdays, lastid)
- sw := util.ObjToString(Sysconfig["specialwords"])
- if sw != "" {
- FilterRegexp = regexp.MustCompile(sw)
- }
- }
- func main() {
- go checkMapJob()
- updport := Sysconfig["udpport"].(string)
- udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
- udpclient.Listen(processUdpMsg)
- log.Println("Udp服务监听", updport)
- time.Sleep(99999 * time.Hour)
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case mu.OP_TYPE_DATA: //上个节点的数据
- //从表中开始处理
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- log.Println("err:", err, "mapInfo:", mapInfo)
- if err != nil {
- udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
- } else if mapInfo != nil {
- go task(data, mapInfo)
- key, _ := mapInfo["key"].(string)
- if key == "" {
- key = "udpok"
- }
- udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
- }
- case mu.OP_NOOP: //下个节点回应
- ok := string(data)
- if ok != "" {
- log.Println("ok:", ok)
- udptaskmap.Delete(ok)
- }
- }
- }
- //开始判重程序
- func task(data []byte, mapInfo map[string]interface{}) {
- defer util.Catch()
- //区间id
- sess := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess)
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
- updateExtract := [][]map[string]interface{}{}
- pool := make(chan bool, 16)
- wg := &sync.WaitGroup{}
- mapLock := &sync.Mutex{}
- n, repeateN := 0, 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
- if util.ObjToString(tmp["subtype"]) == "变更" {
- //go IS.Add("update")
- continue
- }
- if n%10000 == 0 {
- log.Println("current:", n, tmp["_id"])
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- info := NewInfo(tmp)
- b, id := DM.check(info)
- if b { //有重复,生成更新语句,更新抽取和更新招标
- //IS.Add("repeat")
- repeateN++
- mapLock.Lock()
- updateExtract = append(updateExtract, []map[string]interface{}{
- map[string]interface{}{
- "_id": tmp["_id"],
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "repeat": 1,
- "repeatid": id,
- },
- },
- })
- // updateBidding = append(updateBidding, []map[string]interface{}{
- // map[string]interface{}{
- // "_id": tmp["_id"],
- // },
- // map[string]interface{}{
- // "$set": map[string]interface{}{
- // "extracttype": -1,
- // "repeatid": id,
- // },
- // },
- // })
- if len(updateExtract) > 500 {
- mgo.UpdateBulk(extract, updateExtract...)
- //mgo.UpdateBulk(bidding, updateBidding...)
- //updateExtract, updateBidding = [][]map[string]interface{}{}, [][]map[string]interface{}{}
- updateExtract = [][]map[string]interface{}{}
- }
- mapLock.Unlock()
- } else {
- //IS.Add("new")
- }
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- if len(updateExtract) > 0 {
- mgo.UpdateBulk(extract, updateExtract...)
- //mgo.UpdateBulk(bidding, updateBidding...)
- }
- log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
- //任务完成,开始发送广播通知下面节点
- if n > repeateN && mapInfo["stop"] == nil {
- for _, to := range nextNode {
- sid, _ := mapInfo["gtid"].(string)
- eid, _ := mapInfo["lteid"].(string)
- key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
- by, _ := json.Marshal(map[string]interface{}{
- "gtid": sid,
- "lteid": eid,
- "stype": util.ObjToString(to["stype"]),
- "key": key,
- })
- addr := &net.UDPAddr{
- IP: net.ParseIP(to["addr"].(string)),
- Port: util.IntAll(to["port"]),
- }
- node := &udpNode{by, addr, time.Now().Unix(), 0}
- udptaskmap.Store(key, node)
- udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
- }
- }
- }
|