package udptask import ( util "common_utils" "encoding/json" "fmt" "io/ioutil" "log" mu "mfw/util" "net" "net/http" qutil "qfw/util" "sync" u "util" // "sync" "task" "time" . "tools" ) var responselock sync.Mutex var LastNodeResponse int64 func InitUdp() { go func() { updport, _ := Config["udpport"].(string) Udpclient = mu.UdpClient{Local: ":" + updport, BufSize: 1024} Udpclient.Listen(processUdpMsg) log.Println("Udp服务监听", updport) time.Sleep(99999 * time.Hour) }() go checkMapJob() } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { log.Println("udp回调", string(data), "-----", ra, "-----", act) defer qutil.Catch() switch act { case mu.OP_TYPE_DATA: //上个节点的数据 var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) log.Println("err:", err, "mapInfo:", mapInfo) stype, _ := mapInfo["stype"].(string) if err != nil || stype == "" { Udpclient.WriteUdp([]byte("stype:"+stype+",err:"+err.Error()), mu.OP_NOOP, ra) } else if mapInfo != nil { if stype == "distributed" { //分布式抽取分支 log.Println("分布式00000--开始") go Udpclient.WriteUdp([]byte("发送分布式抽取分支"+qutil.ObjToString(mapInfo["ip"])+"udpok"), mu.OP_NOOP, ra) InstanceId := qutil.ObjToString(mapInfo["InstanceId"]) MgoDcs.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`, map[string]interface{}{ "$set": map[string]interface{}{ "extstatus": "running", }, }, true, false) //执行抽取任务 ExtractByUdp(ra, qutil.ObjToString(mapInfo["InstanceId"]), qutil.ObjToString(mapInfo["ip"])) MgoDcs.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`, map[string]interface{}{ "$set": map[string]interface{}{ "extstatus": "ok", }, }, true, false) log.Println("分布式抽取完成,可以释放esc实例", qutil.ObjToString(mapInfo["ip"])) } else { key, _ := mapInfo["key"].(string) if key == "" { key = "udpok" } go Udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra) //行业分类开始,更新bidding_processing_ids表dataprocess=5 if stype == "hangye" { LastNodeResponse = time.Now().Unix() task.HangyeUdps <- mapInfo gtid := mapInfo["gtid"].(string) lteid := mapInfo["lteid"].(string) query := map[string]interface{}{ "gtid": gtid, "lteid": lteid, } // 使用老表更新dataprocess 时 if util.ObjToString(Config["dbname_old"]) != "" { set := map[string]interface{}{ "$set": map[string]interface{}{ "dataprocess_ai": 4, "updatetime": time.Now().Unix(), }, } MgoClassOld.Update("bidding_processing_ids", query, set, false, false) } else { set := map[string]interface{}{ "$set": map[string]interface{}{ "dataprocess": 5, "updatetime": time.Now().Unix(), }, } MgoClass.Update("bidding_processing_ids", query, set, false, false) } } else if stype == "monitor" { //程序监听类型 fmt.Println("stype :monitor") } else if stype != "" { go UdpTask(stype, mapInfo) //执行分类 } else { log.Println("stype 为空") } } } case mu.OP_NOOP: //下个节点回应 udptaskmap.Delete(string(data)) log.Println("下节点回应:", string(data)) } } // 行业分类udp任务执行 func RunningHangyeClass() { defer qutil.Catch() go func() { for { time.Sleep(1 * time.Minute) qutil.Debug("内存中行业分类剩余id段个数:", len(task.HangyeUdps)) } }() for { mapInfo := <-task.HangyeUdps qutil.Debug("行业分类udps mapinfo:", mapInfo) UdpTask("hangye", mapInfo) } } // UdpTask udp 任务 func UdpTask(stype string, mapInfo map[string]interface{}) int { total := 0 defer qutil.Catch() tconf, _ := Config[stype].(map[string]interface{}) if tconf != nil { tid, _ := tconf["taskid"].(string) task.TaskLock.Lock() defer task.TaskLock.Unlock() t := task.NEWTASKPOOL[tid] log.Println("ttt==nil:", t == nil) if t == nil || (t != nil && t.B_UpdateRule) { //加载任务 //更新任务的b_updaterule,避免下次重新加载rule task.UpdateTaskInfo(false, tid) task.InitTaskData(tid) //初始化任务信息 bres, tt, _ := task.NewAnalyTask(tid, "", "", "", 5) //初始化连接 if bres && tt != nil { task.NEWTASKPOOL[tid] = tt log.Println("udp加载任务", tt.S_name) } } t = task.NEWTASKPOOL[tid] if t != nil { t.I_wordcount = 0 q, _ := mapInfo["query"].(map[string]interface{}) if q == nil { q = map[string]interface{}{ "_id": map[string]interface{}{ "$gt": u.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": u.StringTOBsonId(mapInfo["lteid"].(string)), }, } } mapInfo["q"] = q log.Println("启动任务") if t.MulMgo != nil { log.Println(stype, "分类,走合并数据") total = task.UdpTaskRunAll(t, true, mapInfo, stype) } else { log.Println(stype, "分类,不走合并数据") total = task.NewTaskRunAll(t, true, mapInfo) } //任务完成,调度下个节点 if tconf["nextNode"] != nil && mapInfo["stop"] == nil { arr := qutil.ObjArrToMapArr(tconf["nextNode"].([]interface{})) if len(arr) > 0 { for _, to := range arr { sid, _ := mapInfo["gtid"].(string) eid, _ := mapInfo["lteid"].(string) key := sid + "-" + eid + "-" + qutil.ObjToString(to["stype"]) by, _ := json.Marshal(map[string]interface{}{ "gtid": sid, "lteid": eid, "stype": qutil.ObjToString(to["stype"]), "key": key, }) addr := &net.UDPAddr{ IP: net.ParseIP(to["addr"].(string)), Port: qutil.IntAll(to["port"]), } node := &UdpNode{by, addr, time.Now().Unix(), 0} udptaskmap.Store(key, node) Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) } } } } } return total } // 分布式抽取-执行 func ExtractByUdp(ra *net.UDPAddr, instanceId ...string) { if len(instanceId) > 0 { //分布式抽取进度 go Udpclient.WriteUdp([]byte(fmt.Sprintf("IP=%s,数据接收成功", instanceId[1])), mu.OP_NOOP, ra) for { tsk, b := MgoDcs.FindOne("esctask", `{"state":{"$lt":1}}`) if tsk != nil && !b { break } MgoDcs.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{ "$set": map[string]interface{}{ "InstanceId": instanceId[0], "state": 1, "runtime": time.Now().Format(qutil.Date_Full_Layout), }, }) ip := qutil.ObjToString(instanceId[1]) sid := qutil.ObjToString((*tsk)["sid"]) eid := qutil.ObjToString((*tsk)["eid"]) mapinfo := map[string]interface{}{} if sid == "" || eid == "" { log.Println("sid,eid参数不能为空") break } mapinfo["ip"] = ip mapinfo["gtid"] = sid mapinfo["lteid"] = eid mapinfo["stop"] = "true" totalZB := UdpTask("newzhaobiao", mapinfo) //招标 totalHY := UdpTask("newhangye", mapinfo) //行业 totalYZ := UdpTask("newyezhu", mapinfo) //业主 totalBQ := UdpTask("newbiaoqian", mapinfo) //标签 //totalKT := UdpTask("kvtextzhaobiao", []byte{}, mapinfo) //kvtext if totalZB > 0 { log.Println("总数-数量:", totalZB) } MgoDcs.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{ "$set": map[string]interface{}{ "InstanceId": instanceId[0], "oktime": time.Now().Format(qutil.Date_Full_Layout), "state": 1, }, }) set := map[string]interface{}{ "$inc": map[string]interface{}{ "step": 1, "totalnum": totalZB, "totalhy": totalHY, "totalyz": totalYZ, "totalbq": totalBQ, }, } //如果同一id段数据不一致做记录 if totalZB != totalHY || totalZB != totalYZ || totalHY != totalYZ || totalZB == 0 { set["$addToset"] = map[string]interface{}{ "errnum": map[string]interface{}{ "sid": sid, "eid": eid, "totalnum": totalZB, "totalhy": totalHY, "totalyz": totalYZ, //"totalkt": totalKT, }, } } MgoDcs.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`, set, true, false) } log.Println("分类完成") } } // LastUdpJob 处理UDP 没有接受数据 func LastUdpJob() { for { responselock.Lock() if time.Now().Unix()-LastNodeResponse >= 1800 { LastNodeResponse = time.Now().Unix() //重置时间 sendErrMailApi("分类异常", fmt.Sprintf("半小时左右~无新段落数据进入 分类流程...相关人员检查...")) } responselock.Unlock() time.Sleep(300 * time.Second) } } // sendErrMailApi 发送邮件 func sendErrMailApi(title, body string) { jkmail, _ := Config["jkmail"].(map[string]interface{}) if jkmail != nil { tomail, _ = jkmail["to"].(string) api, _ = jkmail["api"].(string) } log.Println(tomail, api) res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, title, body)) if err == nil { defer res.Body.Close() read, err := ioutil.ReadAll(res.Body) log.Println("邮件发送成功:", string(read), err) } else { log.Println("邮件发送失败:", err) } }