package main import ( "encoding/json" "fmt" "gopkg.in/mgo.v2/bson" "io/ioutil" "log" "net" "net/http" "strings" "time" util "utils" "utils/nsq" "utils/udp" ) var ( Mcmer *gonsq.Consumer ) func main() { // company_id //redis.InitRedis1("qyxy_id=172.17.4.189:8379", 4) //inits() //go inspectQuery() //go checkMapJob() //go task_index() //go nsqMethod() go UpdateBidding() go UpdateExtract() go SaveEsMethod() go SaveAllEsMethod() go SaveElseEsMethod() go SaveProjectEs() updport := Sysconfig["udpport"].(string) udpclient = udp.UdpClient{Local: updport, BufSize: 1024} udpclient.Listen(processUdpMsg) util.Debug("Udp服务监听", updport) ch := make(chan bool, 1) <-ch } /** 检查es查询队列 10s查询一次 */ func inspectQuery() { ticker := time.NewTicker(time.Second * 10) url := esAddr + "/_nodes/stats/thread_pool" for range ticker.C { resp, _ := http.Get(url) if resp != nil && resp.Body != nil { defer resp.Body.Close() } body, _ := ioutil.ReadAll(resp.Body) respMap := make(map[string]interface{}) err := json.Unmarshal(body, &respMap) if err == nil { if data, o1 := respMap["nodes"].(map[string]interface{}); o1 { if nodes, o2 := data[esNode].(map[string]interface{}); o2 { if pool, o3 := nodes["thread_pool"].(map[string]interface{}); o3 { index, _ := pool["index"].(map[string]interface{}) search, _ := pool["search"].(map[string]interface{}) bulk, _ := pool["bulk"].(map[string]interface{}) if util.IntAll(index["queue"]) > 0 || util.IntAll(search["queue"]) > 0 || util.IntAll(bulk["queue"]) > 0 { util.Debug("es thread_pool index queue---", index["queue"]) util.Debug("es thread_pool search queue---", search["queue"]) util.Debug("es thread_pool bulk queue---", bulk["queue"]) StopFlag = true } else { StopFlag = false } } } } } } } var pool = make(chan bool, 20) func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case udp.OP_TYPE_DATA: //上个节点的数据 //从表中开始处理生成企业数据 var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) log.Println("err:", err, "mapInfo:", mapInfo, string(data)) if err != nil { udpclient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra) } else if mapInfo != nil { key, _ := mapInfo["key"].(string) if key == "" { key = "udpok" } go udpclient.WriteUdp([]byte(key), udp.OP_NOOP, ra) tasktype, _ := mapInfo["stype"].(string) t := NewTk(mapInfo) switch tasktype { case "winner": pool <- true go func() { defer func() { <-pool }() winnerTask(data, mapInfo) }() case "bidding": //实时+udp调用,可选择是否生成关键词, 一次性最大20万 pool <- true go func() { defer func() { <-pool }() t.thread = 1 t.biddingTask(data, mapInfo) }() case "bidding_history": //增量id段历史数据 pool <- true go func() { defer func() { <-pool }() t.thread = 1 t.biddingTask(data, mapInfo) }() case "project": pool <- true go func() { defer func() { <-pool }() projectTask(data, project, mapInfo) }() case "biddingback": //不联表,使用bidding表直接调用mongo库生成索引 pool <- true go func() { defer func() { <-pool }() t.thread = 30 t.biddingBackTask(data, mapInfo) }() case "biddingall": //合并并重新生成索引,不生成关键词 pool <- true go func() { defer func() { <-pool }() t.thread = 30 t.biddingAllTask(data, mapInfo) }() case "biddingdata": //bidding全量数据 pool <- true go func() { defer func() { <-pool }() t.thread = 30 t.biddingDataTask(data, mapInfo) }() case "biddingmerge": //重新合并但不生成索引,不生成关键词 pool <- true go func() { defer func() { <-pool }() biddingMergeTask(data, mapInfo) }() case "buyer": pool <- true go func() { defer func() { <-pool }() buyerTask(data, mapInfo) }() case "winnerent": //标准库 pool <- true go func() { defer func() { <-pool }() standardTask("winnerent", mapInfo) }() case "buyerent": //标准库 pool <- true go func() { defer func() { <-pool }() standardTask("buyerent", mapInfo) }() case "agencyent": //标准库 pool <- true go func() { defer func() { <-pool }() standardTask("agencyent", mapInfo) }() case "biddingdelbyextract": //根据repeat删除es pool <- true go func() { defer func() { <-pool }() biddingDelByExtract(data, mapInfo) }() case "biddingdelbyextracttype": //根据extracttype删除es pool <- true go func() { defer func() { <-pool }() biddingDelByExtracttype(data, mapInfo) }() default: pool <- true go func() { defer func() { <-pool }() util.Debug("err ---", mapInfo) }() } } case udp.OP_NOOP: //下个节点回应 log.Println("发送成功", string(data)) } } func SaveEsMethod() { arru := make([]map[string]interface{}, EsBulkSize) indexu := 0 for { select { case v := <-saveEsPool: arru[indexu] = v indexu++ if indexu == EsBulkSize { saveEsSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveEsSp }() //Es1.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true) Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true) //if len(multiIndex) == 2 { // Es1.BulkSave(multiIndex[0], multiIndex[1], &arru, true) //} }(arru) arru = make([]map[string]interface{}, EsBulkSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveEsSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveEsSp }() //Es1.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true) Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true) //if len(multiIndex) == 2 { // Es1.BulkSave(multiIndex[0], multiIndex[1], &arru, true) //} }(arru[:indexu]) arru = make([]map[string]interface{}, EsBulkSize) indexu = 0 } } } } func SaveElseEsMethod() { arru := make([]map[string]interface{}, EsBulkSize) indexu := 0 for { select { case v := <-saveEsElsePool: arru[indexu] = v indexu++ if indexu == EsBulkSize { saveEsElseSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveEsElseSp }() Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true) }(arru) arru = make([]map[string]interface{}, EsBulkSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveEsElseSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveEsElseSp }() Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true) }(arru[:indexu]) arru = make([]map[string]interface{}, EsBulkSize) indexu = 0 } } } } func SaveAllEsMethod() { arru := make([]map[string]interface{}, EsBulkSize) indexu := 0 for { select { case v := <-saveEsAllPool: arru[indexu] = v indexu++ if indexu == EsBulkSize { saveEsAllSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveEsAllSp }() Es1.BulkSave("biddingall", "bidding", &arru, true) }(arru) arru = make([]map[string]interface{}, EsBulkSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveEsAllSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveEsAllSp }() Es1.BulkSave("biddingall", "bidding", &arru, true) }(arru[:indexu]) arru = make([]map[string]interface{}, EsBulkSize) indexu = 0 } } } } func SaveProjectEs() { arru := make([]map[string]interface{}, EsBulkSize) indexu := 0 for { select { case v := <-saveProjectEsPool: arru[indexu] = v indexu++ if indexu == EsBulkSize { saveProjectSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveProjectSp }() //Es1.BulkSave(util.ObjToString(project["index"]), util.ObjToString(project["type"]), &arru, true) Es2.BulkSave(util.ObjToString(project["index"]), util.ObjToString(project["type"]), &arru, true) }(arru) arru = make([]map[string]interface{}, EsBulkSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveProjectSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveProjectSp }() //Es1.BulkSave(util.ObjToString(project["index"]), util.ObjToString(project["type"]), &arru, true) Es2.BulkSave(util.ObjToString(project["index"]), util.ObjToString(project["type"]), &arru, true) }(arru[:indexu]) arru = make([]map[string]interface{}, EsBulkSize) indexu = 0 } } } } func UpdateBidding() { arru := make([][]map[string]interface{}, MgoBulkSize) indexu := 0 for { select { case v := <-updateBiddingPool: arru[indexu] = v indexu++ if indexu == MgoBulkSize { updateBiddingSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateBiddingSp }() biddingMgo.UpdateBulk(currentColl, arru...) }(arru) arru = make([][]map[string]interface{}, MgoBulkSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateBiddingSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateBiddingSp }() biddingMgo.UpdateBulk(currentColl, arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, MgoBulkSize) indexu = 0 } } } } func UpdateExtract() { extract := util.ObjToString(extract["collect"]) arru := make([][]map[string]interface{}, MgoBulkSize) indexu := 0 for { select { case v := <-updateExtractPool: arru[indexu] = v indexu++ if indexu == MgoBulkSize { updateExtractSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateExtractSp }() extractMgo.UpdateBulk(extract, arru...) }(arru) arru = make([][]map[string]interface{}, MgoBulkSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateExtractSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateExtractSp }() extractMgo.UpdateBulk(extract, arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, MgoBulkSize) indexu = 0 } } } } // @Description nsq处理id不变,内容替换的竞品数据 // @Author J 2022/8/10 11:40 func nsqMethod() { cof := Sysconfig["nsq_id"].(map[string]interface{}) var err error Mcmer, err = gonsq.NewConsumer(&gonsq.Cconfig{ IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断 Addr: util.ObjToString(cof["addr"]), ConnectType: 0, //默认连接nsqd Topic: util.ObjToString(cof["topic"]), Channel: util.ObjToString(cof["channel"]), Concurrent: util.IntAllDef(cof["concurrent"], 1), //并发数 }) if err != nil { util.Debug("nsqMethod err: ", err.Error()) } for { select { case obj := <-Mcmer.Ch: //从通道读取即可 util.Debug("index nsq: " + fmt.Sprint(obj)) id := strings.Split(util.ObjToString(obj), "=") if bson.IsObjectIdHex(id[1]) { taskinfo(id[1]) } else { util.Debug("jy nsq id err: ", id[1]) } } } }