package main import ( "encoding/json" "fmt" "log" mu "mfw/util" "mongodb" "net" nsq "nsq" "os" "os/signal" "qfw/util" "qfw/util/elastic" "strings" "syscall" "time" ) var ( udpclient mu.UdpClient //udp对象 SingleThread = make(chan bool, 1) SingleClear = 0 toaddr = []*net.UDPAddr{} //下节点对象 ChSign = make(chan os.Signal) Es *elastic.Elastic Index string Itype string Mcmer *nsq.Consumer sid, eid string //测试使用 UdpChan = make(chan map[string]interface{}, 500) ) func init() { signal.Notify(ChSign) go DealSign() nextNode := util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{})) for _, m := range nextNode { toaddr = append(toaddr, &net.UDPAddr{ IP: net.ParseIP(m["addr"].(string)), Port: util.IntAll(m["port"]), }) } es := Sysconfig["es"].(map[string]interface{}) Es = &elastic.Elastic{ S_esurl: util.ObjToString(es["addr"]), I_size: util.IntAllDef(es["pool"], 10), } Index = util.ObjToString(es["index"]) Itype = util.ObjToString(es["itype"]) Es.InitElasticSize() P_QL = NewPT() go P_QL.updateAllQueue() go P_QL.clearMem() } var queryClose = make(chan bool) var queryCloseOver = make(chan bool) func DealSign() { for { select { case sign := <-ChSign: //log.Println("receive:", sign) if v, ok := sign.(syscall.Signal); ok && v == os.Interrupt { log.Println("receice signal..,start close iter") if P_QL.Brun { queryClose <- true select { case <-queryCloseOver: case <-time.After(30 * time.Second): } } util.ReadConfig(&Sysconfig) log.Println("signal deal over") } } } } func main() { P_QL.loadSpiderCode() P_QL.loadSite() if Sysconfig["loadStart"] != nil { loadStart := util.Int64All(Sysconfig["loadStart"]) if loadStart > -1 { P_QL.loadData(loadStart) } } go checkMapJob() go P_QL.nsqMethod() for { mapinfo, ok := <-UdpChan if !ok { continue } SingleThread <- true tasktype := util.ObjToString(mapinfo["stype"]) switch tasktype { case "ql": //全量合并 go func() { defer func() { <-SingleThread }() P_QL.currentType = tasktype P_QL.pici = time.Now().Unix() P_QL.taskQl(mapinfo) }() case "project": //增量合并, go func() { defer func() { <-SingleThread }() P_QL.currentType = tasktype P_QL.pici = time.Now().Unix() P_QL.taskZl(mapinfo) }() case "project_history": //增量合并, id段历史数据 go func() { defer func() { <-SingleThread }() P_QL.currentType = tasktype P_QL.pici = time.Now().Unix() P_QL.taskZl(mapinfo) }() case "updateInfo": //招标字段变更 go func() { defer func() { <-SingleThread }() P_QL.currentType = tasktype P_QL.pici = time.Now().Unix() P_QL.taskUpdateInfo(mapinfo) }() case "updatePro": //修改项目外围字段(只修改外围字段值) go func() { defer func() { <-SingleThread }() P_QL.currentType = tasktype P_QL.pici = time.Now().Unix() P_QL.taskUpdatePro(mapinfo) }() case "deleteInfo": // 删除招标公告 go func() { defer func() { <-SingleThread }() P_QL.currentType = tasktype P_QL.pici = time.Now().Unix() P_QL.delInfoPro(mapinfo) }() case "spider": // 爬虫代码code、isflow go func() { defer func() { <-SingleThread }() go P_QL.loadSpiderCode() }() case "history": //历史数据合并,暂时不写 go func() { defer func() { <-SingleThread }() }() default: <-SingleThread } } } func mainT() { sid = "62df420b2e43d7e553df78f3" eid = "62df420b2e43d7e553df78f3" //flag.StringVar(&sid, "sid", "", "开始id") //flag.StringVar(&eid, "eid", "", "结束id") //flag.Parse() mapinfo := map[string]interface{}{} if sid == "" || eid == "" { log.Println("sid, eid参数不能为空") os.Exit(0) } mapinfo["gtid"] = sid mapinfo["lteid"] = eid mapinfo["stype"] = "ql" mapinfo["ip"] = "127.0.0.1" mapinfo["port"] = Sysconfig["udpport"] P_QL.loadSpiderCode() P_QL.loadSite() if Sysconfig["loadStart"] != nil { loadStart := util.Int64All(Sysconfig["loadStart"]) if loadStart > -1 { P_QL.loadData(loadStart) } } P_QL.loadSite() P_QL.currentType = mapinfo["stype"].(string) P_QL.pici = time.Now().Unix() P_QL.taskQl(mapinfo) time.Sleep(99999 * time.Hour) } //udp调用信号 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case mu.OP_TYPE_DATA: //上个节点的数据 var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) log.Println("err:", err, "mapInfo:", mapInfo) if err != nil { _ = udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra) } else if mapInfo != nil { key, _ := mapInfo["key"].(string) if key == "" { key = "udpok" } go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra) UdpChan <- mapInfo } case mu.OP_NOOP: //下个节点回应 ok := string(data) if ok != "" { udptaskmap.Delete(ok) log.Println("ok:", ok) } } } // @Description nsq处理id不变,内容替换的竞品数据 // @Author J 2022/8/10 11:40 func (p *ProjectTask) nsqMethod() { cof := Sysconfig["nsq_id"].(map[string]interface{}) var err error Mcmer, err = nsq.NewConsumer(&nsq.Cconfig{ IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断 Addr: util.ObjToString(cof["addr"]), ConnectType: 0, //默认连接nsqd Topic: util.ObjToString(cof["topic"]), Channel: util.ObjToString(cof["channel"]), Concurrent: util.IntAllDef(cof["concurrent"], 1), //并发数 }) if err != nil { util.Debug("nsqMethod err: ", err.Error()) } for { select { case obj := <-Mcmer.Ch: //从通道读取即可 util.Debug("project nsq: " + fmt.Sprint(obj)) id := strings.Split(util.ObjToString(obj), "=") if mongodb.IsObjectIdHex(id[1]) { p.taskinfo(id[1]) } else { util.Debug("jy nsq id err: ", id[1]) } } } }