package main import ( "encoding/json" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" gonsq "jygit.jydev.jianyu360.cn/data_processing/common_utils/nsq" "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp" "net" "os" "os/signal" "project/config" "strings" "syscall" "time" ) var ( udpclient udp.UdpClient //udp对象 SingleThread = make(chan bool, 1) SingleClear = 0 toaddr = []*net.UDPAddr{} //下节点对象 ChSign = make(chan os.Signal) Es *elastic.Elastic Mcmer *gonsq.Consumer sid, eid string //测试使用 UdpChan = make(chan map[string]interface{}, 500) ) func init() { signal.Notify(ChSign) go DealSign() for _, m := range config.Conf.UdpNode { toaddr = append(toaddr, &net.UDPAddr{ IP: net.ParseIP(m.Addr), Port: util.IntAll(m.Port), }) } Es = &elastic.Elastic{ S_esurl: config.Conf.DB.Es.Addr, I_size: config.Conf.DB.Es.Size, Username: config.Conf.DB.Es.User, Password: config.Conf.DB.Es.Password, } Es.InitElasticSize() P_QL = NewPT() go P_QL.updateAllQueue() go P_QL.clearMem() } var queryClose = make(chan bool) var queryCloseOver = make(chan bool) func DealSign() { for { select { case sign := <-ChSign: //log.Println("receive:", sign) if v, ok := sign.(syscall.Signal); ok && v == os.Interrupt { log.Info("receice signal..,start close iter") if P_QL.Brun { queryClose <- true select { case <-queryCloseOver: case <-time.After(30 * time.Second): } } log.Info("signal deal over") } } } } func mainT() { P_QL.loadSpiderCode() P_QL.loadSite() if config.Conf.Serve.LoadStart > 0 { P_QL.loadData(config.Conf.Serve.LoadStart) } //go checkMapJob() //go P_QL.nsqMethod() for { mapinfo, ok := <-UdpChan if !ok { continue } tasktype := util.ObjToString(mapinfo["stype"]) switch tasktype { case "ql": //全量合并 go func() { defer func() { <-SingleThread }() SingleThread <- true P_QL.currentType = tasktype P_QL.pici = time.Now().Unix() P_QL.taskQl(mapinfo) }() case "project": //增量合并, go func() { defer func() { <-SingleThread }() SingleThread <- true P_QL.currentType = tasktype P_QL.pici = time.Now().Unix() P_QL.taskZl(mapinfo) }() case "project_history": //增量合并, id段历史数据 go func() { defer func() { <-SingleThread }() SingleThread <- true P_QL.currentType = tasktype P_QL.pici = time.Now().Unix() P_QL.taskZl(mapinfo) }() case "updateInfo": //招标字段变更 go func() { defer func() { }() P_QL.currentType = tasktype P_QL.pici = time.Now().Unix() P_QL.taskUpdateInfo(mapinfo) }() case "updatePro": //修改项目外围字段(只修改外围字段值) go func() { defer func() { }() P_QL.currentType = tasktype P_QL.pici = time.Now().Unix() P_QL.taskUpdatePro(mapinfo) }() case "deleteInfo": // 删除招标公告 go func() { defer func() { }() P_QL.currentType = tasktype P_QL.pici = time.Now().Unix() P_QL.delInfoPro(mapinfo) }() case "spider": // 爬虫代码code、isflow go func() { defer func() { }() go P_QL.loadSpiderCode() }() case "history": //历史数据合并,暂时不写 go func() { defer func() { }() }() default: <-SingleThread } } } func main() { sid = "626cccaa631ff1ac3d29289e" eid = "640aa55d8aea8786d1cd0247" //flag.StringVar(&sid, "sid", "", "开始id") //flag.StringVar(&eid, "eid", "", "结束id") //flag.Parse() mapinfo := map[string]interface{}{} if sid == "" || eid == "" { log.Info("sid, eid参数不能为空") os.Exit(0) } mapinfo["gtid"] = sid mapinfo["lteid"] = eid mapinfo["stype"] = "project" mapinfo["ip"] = "127.0.0.1" mapinfo["port"] = "1782" P_QL.loadSpiderCode() P_QL.loadSite() if config.Conf.Serve.LoadStart > 0 { P_QL.loadData(config.Conf.Serve.LoadStart) } P_QL.loadSite() P_QL.currentType = mapinfo["stype"].(string) P_QL.pici = time.Now().Unix() P_QL.taskQl(mapinfo) time.Sleep(99999 * time.Hour) } // udp调用信号 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case udp.OP_TYPE_DATA: //上个节点的数据 var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) log.Info("udp---", zap.Any("mapInfo:", mapInfo)) if err != nil { _ = udpclient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra) } else if mapInfo != nil { key, _ := mapInfo["key"].(string) if key == "" { key = "udpok" } go udpclient.WriteUdp([]byte(key), udp.OP_NOOP, ra) if util.ObjToString(mapInfo["stype"]) != "monitor" { UdpChan <- mapInfo } } case udp.OP_NOOP: //下个节点回应 ok := string(data) if ok != "" { udptaskmap.Delete(ok) log.Info("ok:" + ok) } } } // @Description nsq处理id不变,内容替换的竞品数据 // @Author J 2022/8/10 11:40 func (p *ProjectTask) nsqMethod() { var err error Mcmer, err = gonsq.NewConsumer(&gonsq.Cconfig{ IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断 Addr: config.Conf.Nsq.Addr, ConnectType: 0, //默认连接nsqd Topic: config.Conf.Nsq.Topic, Channel: config.Conf.Nsq.Channel, Concurrent: config.Conf.Nsq.Concurrent, //并发数 }) if err != nil { log.Info("nsqMethod err: " + err.Error()) } for { select { case obj := <-Mcmer.Ch: //从通道读取即可 id := strings.Split(util.ObjToString(obj), "=") if mongodb.IsObjectIdHex(id[1]) { p.taskinfo(id[1]) } else { log.Info("jy nsq id err: " + id[1]) } } } }