123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- package main
/*
Duplicate detection for bid/tender announcements.
*/
- import (
- "encoding/json"
- "flag"
- "flow_repeat/nsqdata"
- "fmt"
- "log"
- "net"
- "regexp"
- "sync"
- "time"
- "jygit.jydev.jianyu360.cn/BP/jynats/jnats"
- qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- mu "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- )
- var (
- Sysconfig map[string]interface{} //配置文件
- data_mgo, task_mgo, spider_mgo *MongodbSim
- task_coll, task_bidding, spider_coll string
- extract, extract_back, extract_log string
- udpclient mu.UdpClient
- nextNode []map[string]interface{}
- dupdays = 7
- DM, FullDM *datamap
- Update *updateInfo
- AddGroupPool *addGroupInfo
- //正则筛选相关
- FilterRegTitle = regexp.MustCompile("^_$")
- FilterRegTitle_0 = regexp.MustCompile("^_$")
- FilterRegTitle_1 = regexp.MustCompile("^_$")
- FilterRegTitle_2 = regexp.MustCompile("^_$")
- threadNum int
- SiteMap map[string]map[string]interface{}
- LowHeavy, TimingTask, IsFull, isUpdateSite bool
- timingSpanDay, timingPubScope int64
- gtid, lastid, sec_gtid, sec_lteid, lteid string
- updatelock, datalock, numlock, cronlock sync.Mutex
- jyfb_data map[string]string
- taskList []map[string]interface{}
- nspdata_1, nspdata_2 *nsqdata.Producer
- responselock sync.Mutex
- lastNodeResponse int64
- jn *jnats.Jnats
- )
- // 初始化加载
- func init() {
- flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
- flag.StringVar(>id, "gtid", "", "历史增量的起始id") //历史
- flag.StringVar(&sec_gtid, "sec_gtid", "", "全量分段起始id")
- flag.StringVar(&sec_lteid, "sec_lteid", "", "全量分段结束id")
- flag.Parse()
- qu.ReadConfig(&Sysconfig)
- InitAllInfos() //加载所有信息...
- }
- func main() {
- if TimingTask {
- log.Println("正常历史部署...组装...")
- go historyRepeat()
- } else {
- log.Println("正常增量部署...流式...")
- jn = jnats.NewJnats("192.168.3.240:19092")
- //
- ////先消费,带zip压缩,用于跨网传输节省流量
- //jn.SubZip("test", func(msg *nats.Msg) {
- // log.Println(string(msg.Data))
- // //回执消息
- // msg.Respond([]byte("receive msg:" + string(msg.Data)))
- //})
- }
- time.Sleep(99999 * time.Hour)
- }
- func mainTest() {
- increaseRepeat(map[string]interface{}{
- "gtid": "12ec61170ae152a3c2310f02",
- "lteid": "92ec61170ae152a3c2310f02",
- })
- time.Sleep(99999 * time.Hour)
- }
- // 主函数
- func mainTestTest() {
- go checkMailJob()
- lastNodeResponse = time.Now().Unix()
- updport := Sysconfig["udpport"].(string)
- udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
- udpclient.Listen(processUdpMsg)
- log.Println("Udp服务监听", updport)
- if TimingTask {
- log.Println("正常历史部署...")
- go historyRepeat()
- } else {
- if !IsFull {
- log.Println("正常增量部署与监控机制...")
- go lastUdpJob()
- go getRepeatTask()
- }
- }
- time.Sleep(99999 * time.Hour)
- }
- // udp接收
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case mu.OP_TYPE_DATA:
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- if err != nil {
- udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
- } else if mapInfo != nil {
- sid, eid := qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"])
- stype := qu.ObjToString(mapInfo["stype"])
- if stype == "monitor" {
- log.Println("收到监测......")
- key := qu.ObjToString(mapInfo["key"])
- udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
- return
- }
- if sid == "" || eid == "" {
- log.Println("接收id段异常-err ", "sid=", sid, ",eid=", eid)
- } else {
- lastNodeResponse = time.Now().Unix()
- key := sid + "-" + eid + "-" + qu.ObjToString(mapInfo["stype"])
- udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
- //计算是否需要加载站点~每天加载一次
- if isUpdateSite {
- initSite()
- }
- //插入任务-判断任务-是否存在
- updatelock.Lock()
- taskList = append(taskList, mapInfo)
- log.Println("udp收到任务...数量:", len(taskList), "具体任务:", taskList)
- updatelock.Unlock()
- }
- }
- case mu.OP_NOOP: //下个节点回应
- log.Println("下节点回应:", string(data))
- udptaskmap.Delete(string(data))
- }
- }
- // 监听-获取-分发判重任务
- func getRepeatTask() {
- for {
- if len(taskList) > 0 {
- updatelock.Lock()
- len_list := len(taskList)
- if len_list > 1 {
- first_id := taskList[0]["gtid"]
- end_id := taskList[len_list-1]["lteid"]
- if first_id != "" && end_id != "" {
- log.Println("合并段落~正常~", first_id, "~", end_id)
- increaseRepeat(map[string]interface{}{
- "gtid": first_id,
- "lteid": end_id,
- })
- taskList = taskList[len_list:]
- log.Println("此段落结束当前任务池...", len(taskList), taskList)
- } else {
- log.Println("合并段落~错误~正常取段落~~~")
- mapInfo := taskList[0]
- if mapInfo != nil {
- increaseRepeat(mapInfo) //判重方法
- }
- taskList = taskList[1:]
- log.Println("此段落结束当前任务池...", len(taskList), taskList)
- }
- } else {
- mapInfo := taskList[0]
- if mapInfo != nil {
- increaseRepeat(mapInfo) //判重方法
- }
- taskList = taskList[1:]
- log.Println("此段落结束当前任务池...", len(taskList), taskList)
- }
- updatelock.Unlock()
- } else {
- time.Sleep(15 * time.Second)
- }
- }
- }
- func lastUdpJob() {
- for {
- responselock.Lock()
- if time.Now().Unix()-lastNodeResponse >= 1800 {
- lastNodeResponse = time.Now().Unix() //重置时间
- sendErrMailApi("判重增量~发现处理流程超时~给予告警", fmt.Sprintf("半小时左右~无新段落数据进入判重增量流程...相关人员检查..."))
- }
- responselock.Unlock()
- time.Sleep(300 * time.Second)
- }
- }
|