package main import ( "encoding/json" "log" mu "mfw/util" "net" "qfw/util" "time" ) var ( udpclient mu.UdpClient //udp对象 SingleThread = make(chan bool, 1) ) func main() { //udp跑增量 id段 project //udp跑全量 ql //udp跑历史数据 信息id1,id2/或id段 ls //udp强制合并 信息id1,id2,id3 [项目id] 不存在时新建 qzhb //udp强制拆分 项目id,信息id1,id2 qzcf //udp重新合并 信息id1,id2,id3 cxhb if Sysconfig["loadStart"] != nil { loadStart := util.Int64All(Sysconfig["loadStart"]) if loadStart > -1 { P_QL.loadData(loadStart) } } time.Sleep(99999 * time.Hour) } //udp调用信号 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case mu.OP_TYPE_DATA: //上个节点的数据 var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) log.Println("err:", err, "mapInfo:", mapInfo) if err != nil { udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra) } else if mapInfo != nil { key, _ := mapInfo["key"].(string) if key == "" { key = "udpok" } go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra) SingleThread <- true tasktype, _ := mapInfo["stype"].(string) log.Println("tasktype:", tasktype) switch tasktype { case "ql": //全量合并 go func() { defer func() { <-SingleThread }() P_QL.currentType = tasktype P_QL.pici = time.Now().Unix() P_QL.taskQl(mapInfo) }() case "project": //增量合并,未抽取到项目名称或项目编号的不合并 bidding中mergestatus 1已合并 2字段问题不合并 3历史待合并 //合同、验收公告在6个月内查询不到可扩展到两年 go func() { defer func() { <-SingleThread }() P_QL.currentType = tasktype P_QL.pici = time.Now().Unix() P_QL.taskZl(mapInfo) }() case "history": //历史数据合并,暂时不写 go func() { defer func() { <-SingleThread }() }() } } case mu.OP_NOOP: //下个节点回应 ok := string(data) if ok != "" { log.Println("ok:", ok) } } }