// Package main implements tender/bidding notice deduplication (招标信息判重).
// It receives id-range tasks over UDP and dispatches them to the
// deduplication routines.
package main

import (
	"encoding/json"
	"flag"
	"log"
	"net"
	"regexp"
	"sync"
	"time"

	mu "mfw/util"
	"nsqdata"
	qu "qfw/util"
)

var (
	Sysconfig                                  map[string]interface{} // configuration loaded by qu.ReadConfig in init
	data_mgo, task_mgo, spider_mgo             *MongodbSim
	task_coll, task_bidding, spider_coll       string
	extract, extract_back, extract_log         string
	udpclient                                  mu.UdpClient
	nextNode                                   []map[string]interface{}
	dupdays                                    = 7 // NOTE(review): presumably the dedup look-back window in days — confirm
	DM, FullDM                                 *datamap
	Update                                     *updateInfo
	AddGroupPool                               *addGroupInfo

	// Regex filters. "^_$" only matches the literal "_"; these look like
	// placeholders expected to be replaced elsewhere (e.g. initOther) — confirm.
	FilterRegTitle   = regexp.MustCompile("^_$")
	FilterRegTitle_0 = regexp.MustCompile("^_$")
	FilterRegTitle_1 = regexp.MustCompile("^_$")
	FilterRegTitle_2 = regexp.MustCompile("^_$")

	threadNum                                  int
	SiteMap                                    map[string]map[string]interface{}
	LowHeavy, TimingTask, IsFull, isUpdateSite bool
	timingSpanDay, timingPubScope              int64
	gtid, lastid, sec_gtid, sec_lteid, lteid   string
	updatelock, datalock, numlock, cronlock    sync.Mutex // updatelock guards taskList
	jyfb_data                                  map[string]string
	taskList                                   []map[string]interface{} // pending UDP id-range tasks; guarded by updatelock
	nspdata_1, nspdata_2                       *nsqdata.Producer
)

// init parses command-line flags, reads the config file into Sysconfig and
// runs the Mongo, miscellaneous and site initialisers.
func init() {
	flag.StringVar(&lastid, "id", "", "增量加载的lastid")    // incremental mode
	flag.StringVar(&gtid, "gtid", "", "历史增量的起始id")     // historical mode
	flag.StringVar(&sec_gtid, "sec_gtid", "", "全量分段起始id")
	flag.StringVar(&sec_lteid, "sec_lteid", "", "全量分段结束id")
	flag.Parse()
	qu.ReadConfig(&Sysconfig)
	initMgo()
	initOther()
	initSite()
}

// mainT is a manual-testing entry point. It marks the run as full and runs
// increaseRepeat once over a hard-coded id range, then blocks forever.
// Alternative entry points are kept commented out for reference.
func mainT() {
	IsFull = true
	//AddGroupPool = newAddGroupPool()
	//go AddGroupPool.addGroupData()
	//fullDataRepeat() // full deduplication
	increaseRepeat(map[string]interface{}{
		"gtid":  "12ec61170ae152a3c2310f02",
		"lteid": "92ec61170ae152a3c2310f02",
	})
	//gtid = "62ec2dd00ae152a3c230c1a1"
	//lteid = "62ec2dd00ae152a3c230c1e1"
	//historyRepeat()
	time.Sleep(99999 * time.Hour)
}

// main starts checkMapJob and the UDP listener on Sysconfig["udpport"].
// If TimingTask is set it starts historyRepeat; otherwise, when not in full
// mode, it starts the getRepeatTask dispatcher. It then blocks forever.
func main() {
	go checkMapJob()
	udpPort := Sysconfig["udpport"].(string)
	udpclient = mu.UdpClient{Local: udpPort, BufSize: 1024}
	udpclient.Listen(processUdpMsg)
	log.Println("Udp服务监听", udpPort)
	if TimingTask {
		log.Println("正常历史部署")
		go historyRepeat()
	} else if !IsFull {
		log.Println("正常增量部署,监听任务")
		go getRepeatTask()
	}
	time.Sleep(99999 * time.Hour)
}

// processUdpMsg handles one UDP message.
//
// OP_TYPE_DATA: the payload is decoded as a JSON object. A decode error is
// reported back to the sender as "err:<msg>". If both "gtid" and "lteid" are
// non-empty, it does the following:
//   - acknowledges the sender with "<gtid>-<lteid>-<stype>";
//   - reloads sites when isUpdateSite is set;
//   - queues the message on taskList under updatelock.
//
// OP_NOOP: a non-empty payload is treated as a reply from the next node and
// its key is removed from udptaskmap.
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
	switch act {
	case mu.OP_TYPE_DATA:
		var mapInfo map[string]interface{}
		if err := json.Unmarshal(data, &mapInfo); err != nil {
			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
			return
		}
		if mapInfo == nil {
			return
		}
		sid, eid := qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"])
		if sid == "" || eid == "" {
			log.Println("接收id段异常-err ", "sid=", sid, ",eid=", eid)
			return
		}
		key := sid + "-" + eid + "-" + qu.ObjToString(mapInfo["stype"])
		udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
		// The original comment says site reload should happen about once a day;
		// isUpdateSite is presumably toggled elsewhere — confirm.
		if isUpdateSite {
			initSite()
		}
		updatelock.Lock()
		taskList = append(taskList, mapInfo)
		log.Println("udp收到任务...数量:", len(taskList), "具体任务:", taskList)
		updatelock.Unlock()
	case mu.OP_NOOP:
		if ok := string(data); ok != "" {
			log.Println("ok:", ok)
			udptaskmap.Delete(ok)
		}
	}
}

// dequeueRepeatTask removes the next unit of work from taskList while holding
// updatelock. ok is false when the queue is empty.
//
// When more than one segment is queued and the first segment's gtid and the
// last segment's lteid are both non-empty, all queued segments are merged
// into a single {"gtid","lteid"} range. Other keys of the queued maps (e.g.
// "stype") are not carried into the merged task. Otherwise only the head
// segment is taken.
func dequeueRepeatTask() (task map[string]interface{}, ok bool) {
	updatelock.Lock()
	defer updatelock.Unlock()
	n := len(taskList)
	if n == 0 {
		return nil, false
	}
	if n > 1 {
		// Normalise to strings: a missing key yields nil, which must not be
		// treated as a valid boundary.
		firstID := qu.ObjToString(taskList[0]["gtid"])
		endID := qu.ObjToString(taskList[n-1]["lteid"])
		if firstID != "" && endID != "" {
			log.Println("合并段落~正常~", firstID, "~", endID)
			taskList = nil // drop the drained backing array
			return map[string]interface{}{
				"gtid":  firstID,
				"lteid": endID,
			}, true
		}
		log.Println("合并段落~错误~正常取段落~~~")
	}
	task = taskList[0]
	taskList = taskList[1:]
	return task, true
}

// getRepeatTask is the long-running dispatcher for queued deduplication
// tasks. It polls taskList every 15 seconds when the list is empty. It calls
// increaseRepeat without holding updatelock, so UDP receipt is not blocked
// while a range is being processed.
func getRepeatTask() {
	for {
		task, ok := dequeueRepeatTask()
		if !ok {
			time.Sleep(15 * time.Second)
			continue
		}
		if task != nil {
			increaseRepeat(task) // deduplication routine
		}
		updatelock.Lock()
		log.Println("此段落结束当前任务池...", len(taskList), taskList)
		updatelock.Unlock()
	}
}