123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315 |
- package udptask
- import (
- util "common_utils"
- "encoding/json"
- "fmt"
- "io/ioutil"
- "log"
- mu "mfw/util"
- "net"
- "net/http"
- qutil "qfw/util"
- "sync"
- u "util"
- // "sync"
- "task"
- "time"
- . "tools"
- )
var (
	// responselock is held by LastUdpJob while it reads and resets
	// LastNodeResponse.
	responselock sync.Mutex

	// LastNodeResponse is the Unix time (seconds) at which the last "hangye"
	// message was received; LastUdpJob resets it to the current time after it
	// raises a "no data" alert.
	LastNodeResponse int64
)
- func InitUdp() {
- go func() {
- updport, _ := Config["udpport"].(string)
- Udpclient = mu.UdpClient{Local: ":" + updport, BufSize: 1024}
- Udpclient.Listen(processUdpMsg)
- log.Println("Udp服务监听", updport)
- time.Sleep(99999 * time.Hour)
- }()
- go checkMapJob()
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- log.Println("udp回调", string(data), "-----", ra, "-----", act)
- defer qutil.Catch()
- switch act {
- case mu.OP_TYPE_DATA: //上个节点的数据
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- log.Println("err:", err, "mapInfo:", mapInfo)
- stype, _ := mapInfo["stype"].(string)
- if err != nil || stype == "" {
- Udpclient.WriteUdp([]byte("stype:"+stype+",err:"+err.Error()), mu.OP_NOOP, ra)
- } else if mapInfo != nil {
- if stype == "distributed" { //分布式抽取分支
- log.Println("分布式00000--开始")
- go Udpclient.WriteUdp([]byte("发送分布式抽取分支"+qutil.ObjToString(mapInfo["ip"])+"udpok"), mu.OP_NOOP, ra)
- InstanceId := qutil.ObjToString(mapInfo["InstanceId"])
- MgoDcs.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
- map[string]interface{}{
- "$set": map[string]interface{}{
- "extstatus": "running",
- },
- }, true, false)
- //执行抽取任务
- ExtractByUdp(ra, qutil.ObjToString(mapInfo["InstanceId"]), qutil.ObjToString(mapInfo["ip"]))
- MgoDcs.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
- map[string]interface{}{
- "$set": map[string]interface{}{
- "extstatus": "ok",
- },
- }, true, false)
- log.Println("分布式抽取完成,可以释放esc实例", qutil.ObjToString(mapInfo["ip"]))
- } else {
- key, _ := mapInfo["key"].(string)
- if key == "" {
- key = "udpok"
- }
- go Udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
- //行业分类开始,更新bidding_processing_ids表dataprocess=5
- if stype == "hangye" {
- LastNodeResponse = time.Now().Unix()
- task.HangyeUdps <- mapInfo
- gtid := mapInfo["gtid"].(string)
- lteid := mapInfo["lteid"].(string)
- query := map[string]interface{}{
- "gtid": gtid,
- "lteid": lteid,
- }
- // 使用老表更新dataprocess 时
- if util.ObjToString(Config["dbname_old"]) != "" {
- set := map[string]interface{}{
- "$set": map[string]interface{}{
- "dataprocess_ai": 4,
- "updatetime": time.Now().Unix(),
- },
- }
- MgoClassOld.Update("bidding_processing_ids", query, set, false, false)
- } else {
- set := map[string]interface{}{
- "$set": map[string]interface{}{
- "dataprocess": 5,
- "updatetime": time.Now().Unix(),
- },
- }
- MgoClass.Update("bidding_processing_ids", query, set, false, false)
- }
- } else if stype == "monitor" { //程序监听类型
- fmt.Println("stype :monitor")
- } else if stype != "" {
- go UdpTask(stype, mapInfo) //执行分类
- } else {
- log.Println("stype 为空")
- }
- }
- }
- case mu.OP_NOOP: //下个节点回应
- udptaskmap.Delete(string(data))
- log.Println("下节点回应:", string(data))
- }
- }
- // 行业分类udp任务执行
- func RunningHangyeClass() {
- defer qutil.Catch()
- go func() {
- for {
- time.Sleep(1 * time.Minute)
- qutil.Debug("内存中行业分类剩余id段个数:", len(task.HangyeUdps))
- }
- }()
- for {
- mapInfo := <-task.HangyeUdps
- qutil.Debug("行业分类udps mapinfo:", mapInfo)
- UdpTask("hangye", mapInfo)
- }
- }
- // UdpTask udp 任务
- func UdpTask(stype string, mapInfo map[string]interface{}) int {
- total := 0
- defer qutil.Catch()
- tconf, _ := Config[stype].(map[string]interface{})
- if tconf != nil {
- tid, _ := tconf["taskid"].(string)
- task.TaskLock.Lock()
- defer task.TaskLock.Unlock()
- t := task.NEWTASKPOOL[tid]
- log.Println("ttt==nil:", t == nil)
- if t == nil || (t != nil && t.B_UpdateRule) { //加载任务
- //更新任务的b_updaterule,避免下次重新加载rule
- task.UpdateTaskInfo(false, tid)
- task.InitTaskData(tid) //初始化任务信息
- bres, tt, _ := task.NewAnalyTask(tid, "", "", "", 5) //初始化连接
- if bres && tt != nil {
- task.NEWTASKPOOL[tid] = tt
- log.Println("udp加载任务", tt.S_name)
- }
- }
- t = task.NEWTASKPOOL[tid]
- if t != nil {
- t.I_wordcount = 0
- q, _ := mapInfo["query"].(map[string]interface{})
- if q == nil {
- q = map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": u.StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": u.StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- }
- mapInfo["q"] = q
- log.Println("启动任务")
- if t.MulMgo != nil {
- log.Println(stype, "分类,走合并数据")
- total = task.UdpTaskRunAll(t, true, mapInfo, stype)
- } else {
- log.Println(stype, "分类,不走合并数据")
- total = task.NewTaskRunAll(t, true, mapInfo)
- }
- //任务完成,调度下个节点
- if tconf["nextNode"] != nil && mapInfo["stop"] == nil {
- arr := qutil.ObjArrToMapArr(tconf["nextNode"].([]interface{}))
- if len(arr) > 0 {
- for _, to := range arr {
- sid, _ := mapInfo["gtid"].(string)
- eid, _ := mapInfo["lteid"].(string)
- key := sid + "-" + eid + "-" + qutil.ObjToString(to["stype"])
- by, _ := json.Marshal(map[string]interface{}{
- "gtid": sid,
- "lteid": eid,
- "stype": qutil.ObjToString(to["stype"]),
- "key": key,
- })
- addr := &net.UDPAddr{
- IP: net.ParseIP(to["addr"].(string)),
- Port: qutil.IntAll(to["port"]),
- }
- node := &UdpNode{by, addr, time.Now().Unix(), 0}
- udptaskmap.Store(key, node)
- Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
- }
- }
- }
- }
- }
- return total
- }
- // 分布式抽取-执行
- func ExtractByUdp(ra *net.UDPAddr, instanceId ...string) {
- if len(instanceId) > 0 { //分布式抽取进度
- go Udpclient.WriteUdp([]byte(fmt.Sprintf("IP=%s,数据接收成功", instanceId[1])), mu.OP_NOOP, ra)
- for {
- tsk, b := MgoDcs.FindOne("esctask", `{"state":{"$lt":1}}`)
- if tsk != nil && !b {
- break
- }
- MgoDcs.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{
- "$set": map[string]interface{}{
- "InstanceId": instanceId[0],
- "state": 1,
- "runtime": time.Now().Format(qutil.Date_Full_Layout),
- },
- })
- ip := qutil.ObjToString(instanceId[1])
- sid := qutil.ObjToString((*tsk)["sid"])
- eid := qutil.ObjToString((*tsk)["eid"])
- mapinfo := map[string]interface{}{}
- if sid == "" || eid == "" {
- log.Println("sid,eid参数不能为空")
- break
- }
- mapinfo["ip"] = ip
- mapinfo["gtid"] = sid
- mapinfo["lteid"] = eid
- mapinfo["stop"] = "true"
- totalZB := UdpTask("newzhaobiao", mapinfo) //招标
- totalHY := UdpTask("newhangye", mapinfo) //行业
- totalYZ := UdpTask("newyezhu", mapinfo) //业主
- totalBQ := UdpTask("newbiaoqian", mapinfo) //标签
- //totalKT := UdpTask("kvtextzhaobiao", []byte{}, mapinfo) //kvtext
- if totalZB > 0 {
- log.Println("总数-数量:", totalZB)
- }
- MgoDcs.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{
- "$set": map[string]interface{}{
- "InstanceId": instanceId[0],
- "oktime": time.Now().Format(qutil.Date_Full_Layout),
- "state": 1,
- },
- })
- set := map[string]interface{}{
- "$inc": map[string]interface{}{
- "step": 1,
- "totalnum": totalZB,
- "totalhy": totalHY,
- "totalyz": totalYZ,
- "totalbq": totalBQ,
- },
- }
- //如果同一id段数据不一致做记录
- if totalZB != totalHY || totalZB != totalYZ || totalHY != totalYZ || totalZB == 0 {
- set["$addToset"] = map[string]interface{}{
- "errnum": map[string]interface{}{
- "sid": sid,
- "eid": eid,
- "totalnum": totalZB,
- "totalhy": totalHY,
- "totalyz": totalYZ,
- //"totalkt": totalKT,
- },
- }
- }
- MgoDcs.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`, set, true, false)
- }
- log.Println("分类完成")
- }
- }
- // LastUdpJob 处理UDP 没有接受数据
- func LastUdpJob() {
- for {
- responselock.Lock()
- if time.Now().Unix()-LastNodeResponse >= 1800 {
- LastNodeResponse = time.Now().Unix() //重置时间
- sendErrMailApi("分类异常", fmt.Sprintf("半小时左右~无新段落数据进入 分类流程...相关人员检查..."))
- }
- responselock.Unlock()
- time.Sleep(300 * time.Second)
- }
- }
- // sendErrMailApi 发送邮件
- func sendErrMailApi(title, body string) {
- jkmail, _ := Config["jkmail"].(map[string]interface{})
- if jkmail != nil {
- tomail, _ = jkmail["to"].(string)
- api, _ = jkmail["api"].(string)
- }
- log.Println(tomail, api)
- res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, title, body))
- if err == nil {
- defer res.Body.Close()
- read, err := ioutil.ReadAll(res.Body)
- log.Println("邮件发送成功:", string(read), err)
- } else {
- log.Println("邮件发送失败:", err)
- }
- }
|