package main /** 招标信息判重 **/ import ( "encoding/json" "flag" "log" mu "mfw/util" "net" "qfw/common/src/qfw/util" qu "qfw/util" "regexp" "sync" "time" ) var ( Sysconfig map[string]interface{} //配置文件 mconf map[string]interface{} //mongodb配置信息 data_mgo *MongodbSim //mongodb操作对象 task_mgo *MongodbSim //mongodb操作对象 task_collName string extract string extract_back string udpclient mu.UdpClient //udp对象 nextNode []map[string]interface{} //下节点数组 dupdays = 7 //初始化判重范围 DM *datamap // Update *updateInfo //正则筛选相关 FilterRegTitle = regexp.MustCompile("^_$") FilterRegTitle_0 = regexp.MustCompile("^_$") FilterRegTitle_1 = regexp.MustCompile("^_$") FilterRegTitle_2 = regexp.MustCompile("^_$") threadNum int //线程数量 SiteMap map[string]map[string]interface{} //站点map LowHeavy bool //低质量数据判重 TimingTask bool //是否定时任务 timingSpanDay int64 //时间跨度 timingPubScope int64 //发布时间周期 gtid,lastid,sec_gtid,sec_lteid string //命令输入 lteid string //历史增量属性 IsFull bool //是否全量 updatelock sync.Mutex //锁4 numberlock sync.Mutex //锁4 userName,passWord string //mongo -用户密码 taskList []map[string]interface{} //任务池 ) func init() { flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量 flag.StringVar(>id, "gtid", "", "历史增量的起始id") //历史 flag.StringVar(&sec_gtid, "sec_gtid", "", "全量分段起始id") flag.StringVar(&sec_lteid, "sec_lteid", "", "全量分段结束id") flag.Parse() qu.ReadConfig(&Sysconfig) userName = qu.ObjToString(Sysconfig["userName"]) passWord = qu.ObjToString(Sysconfig["passWord"]) log.Println("集群用户密码:",userName,passWord) task_mconf := Sysconfig["task_mongodb"].(map[string]interface{}) task_mgo = &MongodbSim{ MongodbAddr: task_mconf["task_addrName"].(string), DbName: task_mconf["task_dbName"].(string), Size: util.IntAllDef(task_mconf["task_pool"], 10), UserName: userName, Password: passWord, } task_mgo.InitPool() task_collName = task_mconf["task_collName"].(string) nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{})) mconf = Sysconfig["mongodb"].(map[string]interface{}) data_mgo = &MongodbSim{ MongodbAddr: mconf["addr"].(string), DbName: mconf["db"].(string), Size: util.IntAllDef(mconf["pool"], 10), } data_mgo.InitPool() extract = mconf["extract"].(string) extract_back = mconf["extract_back"].(string) dupdays = util.IntAllDef(Sysconfig["dupdays"], 3) //加载数据 DM = NewDatamap(dupdays, lastid) //更新池 Update = newUpdatePool() go Update.updateData() FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"])) FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"])) FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"])) FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"])) threadNum = util.IntAllDef(Sysconfig["threads"], 1) LowHeavy = Sysconfig["lowHeavy"].(bool) TimingTask = Sysconfig["timingTask"].(bool) timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"]) timingPubScope = util.Int64All(Sysconfig["timingPubScope"]) //站点配置 site := mconf["site"].(map[string]interface{}) SiteMap = make(map[string]map[string]interface{}, 0) start := int(time.Now().Unix()) sess_site := data_mgo.GetMgoConn() defer data_mgo.DestoryMongoConn(sess_site) res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter() for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); { data_map := map[string]interface{}{ "area": util.ObjToString(site_dict["area"]), "city": util.ObjToString(site_dict["city"]), "district": util.ObjToString(site_dict["district"]), "sitetype": util.ObjToString(site_dict["sitetype"]), "level": util.ObjToString(site_dict["level"]), "weight": util.ObjToString(site_dict["weight"]), } SiteMap[util.ObjToString(site_dict["site"])] = data_map } log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap)) } //udp接收 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case mu.OP_TYPE_DATA: var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) if err != nil { udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra) } else if mapInfo != nil { key, _ := mapInfo["key"].(string) if key == "" { key = "udpok" } udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra) //插入任务-判断任务-是否存在 updatelock.Lock() taskList = append(taskList,mapInfo) log.Println("udp收到任务...数量:",len(taskList),"具体任务:",taskList) updatelock.Unlock() } case mu.OP_NOOP: //下个节点回应 ok := string(data) if ok != "" { log.Println("ok:", ok) udptaskmap.Delete(ok) } } } //监听-获取-分发判重任务 func getRepeatTask() { for { if len(taskList)>0 { updatelock.Lock() mapInfo := taskList[0] if mapInfo != nil { increaseRepeat(mapInfo) //判重方法 } taskList = taskList[1:] log.Println("此段落结束当前任务池...",len(taskList),taskList) updatelock.Unlock() }else { time.Sleep(15 * time.Second) } } } //主函数 func main() { go checkMapJob() updport := Sysconfig["udpport"].(string) udpclient = mu.UdpClient{Local: updport, BufSize: 1024} udpclient.Listen(processUdpMsg) log.Println("Udp服务监听", updport) if TimingTask { log.Println("正常历史部署") go historyRepeat() }else { //IsFull = true if !IsFull {//正常增量 log.Println("正常增量部署,监听任务") go getRepeatTask() //新增调试 //sid := "1fffffffffffffffffffffff" //eid := "9fffffffffffffffffffffff" //increaseRepeat(map[string]interface{}{ // "gtid":sid, // "lteid":eid, //}) }else { sid := "1fffffffffffffffffffffff" eid := "9fffffffffffffffffffffff" fullRepeat(sid,eid) } } time.Sleep(99999 * time.Hour) }