package main /** 招标信息判重 **/ import ( "encoding/json" "flag" "flow_repeat/nsqdata" "fmt" "log" "net" "regexp" "sync" "time" "jygit.jydev.jianyu360.cn/BP/jynats/jnats" qu "jygit.jydev.jianyu360.cn/data_processing/common_utils" mu "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp" ) var ( Sysconfig map[string]interface{} //配置文件 data_mgo, task_mgo, spider_mgo *MongodbSim task_coll, task_bidding, spider_coll string extract, extract_back, extract_log string udpclient mu.UdpClient nextNode []map[string]interface{} dupdays = 7 DM, FullDM *datamap Update *updateInfo AddGroupPool *addGroupInfo //正则筛选相关 FilterRegTitle = regexp.MustCompile("^_$") FilterRegTitle_0 = regexp.MustCompile("^_$") FilterRegTitle_1 = regexp.MustCompile("^_$") FilterRegTitle_2 = regexp.MustCompile("^_$") threadNum int SiteMap map[string]map[string]interface{} LowHeavy, TimingTask, IsFull, isUpdateSite bool timingSpanDay, timingPubScope int64 gtid, lastid, sec_gtid, sec_lteid, lteid string updatelock, datalock, numlock, cronlock sync.Mutex jyfb_data map[string]string taskList []map[string]interface{} nspdata_1, nspdata_2 *nsqdata.Producer responselock sync.Mutex lastNodeResponse int64 jn *jnats.Jnats ) // 初始化加载 func init() { flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量 flag.StringVar(>id, "gtid", "", "历史增量的起始id") //历史 flag.StringVar(&sec_gtid, "sec_gtid", "", "全量分段起始id") flag.StringVar(&sec_lteid, "sec_lteid", "", "全量分段结束id") flag.Parse() qu.ReadConfig(&Sysconfig) InitAllInfos() //加载所有信息... } func main() { if TimingTask { log.Println("正常历史部署...组装...") go historyRepeat() } else { log.Println("正常增量部署...流式...") jn = jnats.NewJnats("192.168.3.240:19092") // ////先消费,带zip压缩,用于跨网传输节省流量 //jn.SubZip("test", func(msg *nats.Msg) { // log.Println(string(msg.Data)) // //回执消息 // msg.Respond([]byte("receive msg:" + string(msg.Data))) //}) } time.Sleep(99999 * time.Hour) } func mainTest() { increaseRepeat(map[string]interface{}{ "gtid": "12ec61170ae152a3c2310f02", "lteid": "92ec61170ae152a3c2310f02", }) time.Sleep(99999 * time.Hour) } // 主函数 func mainTestTest() { go checkMailJob() lastNodeResponse = time.Now().Unix() updport := Sysconfig["udpport"].(string) udpclient = mu.UdpClient{Local: updport, BufSize: 1024} udpclient.Listen(processUdpMsg) log.Println("Udp服务监听", updport) if TimingTask { log.Println("正常历史部署...") go historyRepeat() } else { if !IsFull { log.Println("正常增量部署与监控机制...") go lastUdpJob() go getRepeatTask() } } time.Sleep(99999 * time.Hour) } // udp接收 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case mu.OP_TYPE_DATA: var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) if err != nil { udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra) } else if mapInfo != nil { sid, eid := qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"]) stype := qu.ObjToString(mapInfo["stype"]) if stype == "monitor" { log.Println("收到监测......") key := qu.ObjToString(mapInfo["key"]) udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra) return } if sid == "" || eid == "" { log.Println("接收id段异常-err ", "sid=", sid, ",eid=", eid) } else { lastNodeResponse = time.Now().Unix() key := sid + "-" + eid + "-" + qu.ObjToString(mapInfo["stype"]) udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra) //计算是否需要加载站点~每天加载一次 if isUpdateSite { initSite() } //插入任务-判断任务-是否存在 updatelock.Lock() taskList = append(taskList, mapInfo) log.Println("udp收到任务...数量:", len(taskList), "具体任务:", taskList) updatelock.Unlock() } } case mu.OP_NOOP: //下个节点回应 log.Println("下节点回应:", string(data)) udptaskmap.Delete(string(data)) } } // 监听-获取-分发判重任务 func getRepeatTask() { for { if len(taskList) > 0 { updatelock.Lock() len_list := len(taskList) if len_list > 1 { first_id := taskList[0]["gtid"] end_id := taskList[len_list-1]["lteid"] if first_id != "" && end_id != "" { log.Println("合并段落~正常~", first_id, "~", end_id) increaseRepeat(map[string]interface{}{ "gtid": first_id, "lteid": end_id, }) taskList = taskList[len_list:] log.Println("此段落结束当前任务池...", len(taskList), taskList) } else { log.Println("合并段落~错误~正常取段落~~~") mapInfo := taskList[0] if mapInfo != nil { increaseRepeat(mapInfo) //判重方法 } taskList = taskList[1:] log.Println("此段落结束当前任务池...", len(taskList), taskList) } } else { mapInfo := taskList[0] if mapInfo != nil { increaseRepeat(mapInfo) //判重方法 } taskList = taskList[1:] log.Println("此段落结束当前任务池...", len(taskList), taskList) } updatelock.Unlock() } else { time.Sleep(15 * time.Second) } } } func lastUdpJob() { for { responselock.Lock() if time.Now().Unix()-lastNodeResponse >= 1800 { lastNodeResponse = time.Now().Unix() //重置时间 sendErrMailApi("判重增量~发现处理流程超时~给予告警", fmt.Sprintf("半小时左右~无新段落数据进入判重增量流程...相关人员检查...")) } responselock.Unlock() time.Sleep(300 * time.Second) } }