package main import ( "encoding/json" "flag" "log" mu "mfw/util" "net" "os" "os/signal" "qfw/util" "syscall" "time" ) var ( udpclient mu.UdpClient //udp对象 SingleThread = make(chan bool, 1) SingleClear = 0 toaddr = []*net.UDPAddr{} //下节点对象 ChSign = make(chan os.Signal) sid, eid string //测试使用 ) func init() { signal.Notify(ChSign) go DealSign() nextNode := util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{})) for _, m := range nextNode { toaddr = append(toaddr, &net.UDPAddr{ IP: net.ParseIP(m["addr"].(string)), Port: util.IntAll(m["port"]), }) } } var queryClose = make(chan bool) var queryCloseOver = make(chan bool) func DealSign() { for { select { case sign := <-ChSign: log.Println("receive:", sign) if v, ok := sign.(syscall.Signal); ok && v == os.Interrupt { log.Println("receice signal..,start close iter") if P_QL.Brun { queryClose <- true select { case <-queryCloseOver: case <-time.After(30 * time.Second): } } util.ReadConfig(&Sysconfig) log.Println("signal deal over") } } } } func main() { //udp跑增量 id段 project //udp跑全量 ql //udp跑历史数据 信息id1,id2/或id段 ls //udp强制合并 信息id1,id2,id3 [项目id] 不存在时新建 qzhb //udp强制拆分 项目id,信息id1,id2 qzcf //udp重新合并 信息id1,id2,id3 cxhb if Sysconfig["loadStart"] != nil { loadStart := util.Int64All(Sysconfig["loadStart"]) if loadStart > -1 { P_QL.loadData(loadStart) } } P_QL.loadSite() go checkMapJob() time.Sleep(99999 * time.Hour) } //测试组人员使用 func mainT() { //sid = "5649a0fcaf5374672e005704" //eid = "5e169e5250b5ea296ec896f0" flag.StringVar(&sid, "sid", "", "开始id") flag.StringVar(&eid, "eid", "", "结束id") flag.Parse() mapinfo := map[string]interface{}{} if sid == "" || eid == "" { log.Println("sid, eid参数不能为空") os.Exit(0) } mapinfo["gtid"] = sid mapinfo["lteid"] = eid mapinfo["stype"] = "ql" mapinfo["ip"] = "127.0.0.1" mapinfo["port"] = Sysconfig["udpport"] if Sysconfig["loadStart"] != nil { loadStart := util.Int64All(Sysconfig["loadStart"]) if loadStart > -1 { P_QL.loadData(loadStart) } } P_QL.loadSite() P_QL.currentType = mapinfo["stype"].(string) P_QL.pici = time.Now().Unix() P_QL.taskQl(mapinfo) time.Sleep(20 * time.Second) } //udp调用信号 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case mu.OP_TYPE_DATA: //上个节点的数据 var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) log.Println("err:", err, "mapInfo:", mapInfo) if err != nil { _ = udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra) } else if mapInfo != nil { key, _ := mapInfo["key"].(string) if key == "" { key = "udpok" } go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra) SingleThread <- true tasktype, _ := mapInfo["stype"].(string) log.Println("tasktype:", tasktype) switch tasktype { case "ql": //全量合并 go func() { defer func() { <-SingleThread }() P_QL.currentType = tasktype P_QL.pici = time.Now().Unix() P_QL.taskQl(mapInfo) }() case "project": //增量合并,未抽取到项目名称或项目编号的不合并 bidding中mergestatus 1已合并 2字段问题不合并 3历史待合并 //合同、验收公告在6个月内查询不到可扩展到两年 go func() { defer func() { <-SingleThread }() P_QL.currentType = tasktype P_QL.pici = time.Now().Unix() P_QL.taskZl(mapInfo) }() case "updateInfo": //招标字段变更 go func() { defer func() { <-SingleThread }() P_QL.currentType = tasktype P_QL.pici = time.Now().Unix() P_QL.taskUpdateInfo(mapInfo) }() case "history": //历史数据合并,暂时不写 go func() { defer func() { <-SingleThread }() }() } } case mu.OP_NOOP: //下个节点回应 ok := string(data) if ok != "" { udptaskmap.Delete(ok) log.Println("ok:", ok) } } }