package main import ( "encoding/json" "io/ioutil" "log" mu "mfw/util" "net" "net/http" "qfw/util" "qfw/util/redis" "time" ) func main() { // company_id redis.InitRedis1("qyxy_id=172.17.4.189:8379", 4) inits() //go inspectQuery() go checkMapJob() go task_index() go UpdateBidding() go UpdateExtract() go SaveEsMethod() go SaveAllEsMethod() go SaveElseEsMethod() updport := Sysconfig["udpport"].(string) udpclient = mu.UdpClient{Local: updport, BufSize: 1024} udpclient.Listen(processUdpMsg) util.Debug("Udp服务监听", updport) ch := make(chan bool, 1) <-ch } /** 检查es查询队列 10s查询一次 */ func inspectQuery() { ticker := time.NewTicker(time.Second * 10) url := esAddr + "/_nodes/stats/thread_pool" for range ticker.C { resp, _ := http.Get(url) if resp != nil && resp.Body != nil { defer resp.Body.Close() } body, _ := ioutil.ReadAll(resp.Body) respMap := make(map[string]interface{}) err := json.Unmarshal(body, &respMap) if err == nil { if data, o1 := respMap["nodes"].(map[string]interface{}); o1 { if nodes, o2 := data[esNode].(map[string]interface{}); o2 { if pool, o3 := nodes["thread_pool"].(map[string]interface{}); o3 { index, _ := pool["index"].(map[string]interface{}) search, _ := pool["search"].(map[string]interface{}) bulk, _ := pool["bulk"].(map[string]interface{}) if util.IntAll(index["queue"]) > 0 || util.IntAll(search["queue"]) > 0 || util.IntAll(bulk["queue"]) > 0 { util.Debug("es thread_pool index queue---", index["queue"]) util.Debug("es thread_pool search queue---", search["queue"]) util.Debug("es thread_pool bulk queue---", bulk["queue"]) StopFlag = true } else { StopFlag = false } } } } } } } var pool = make(chan bool, 20) func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case mu.OP_TYPE_DATA: //上个节点的数据 //从表中开始处理生成企业数据 var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) log.Println("err:", err, "mapInfo:", mapInfo, string(data)) if err != nil { udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra) } else if mapInfo != nil { key, _ := mapInfo["key"].(string) if key == "" { key = "udpok" } go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra) tasktype, _ := mapInfo["stype"].(string) t := NewTk(mapInfo) switch tasktype { case "winner": pool <- true go func() { defer func() { <-pool }() winnerTask(data, mapInfo) }() case "bidding": //实时+udp调用,可选择是否生成关键词, 一次性最大20万 pool <- true go func() { defer func() { <-pool }() t.thread = 1 t.biddingTask(data, mapInfo) }() case "bidding_history": //增量id段历史数据 pool <- true go func() { defer func() { <-pool }() t.thread = 1 t.biddingTask(data, mapInfo) }() case "project": pool <- true go func() { defer func() { <-pool }() projectTask(data, project, mapInfo) }() case "biddingback": //不联表,使用bidding表直接调用mongo库生成索引 pool <- true go func() { defer func() { <-pool }() t.thread = 30 t.biddingBackTask(data, mapInfo) }() case "biddingall": //合并并重新生成索引,不生成关键词 pool <- true go func() { defer func() { <-pool }() t.thread = 30 t.biddingAllTask(data, mapInfo) }() case "biddingdata": //bidding全量数据 pool <- true go func() { defer func() { <-pool }() t.thread = 30 t.biddingDataTask(data, mapInfo) }() case "biddingmerge": //重新合并但不生成索引,不生成关键词 pool <- true go func() { defer func() { <-pool }() biddingMergeTask(data, mapInfo) }() case "buyer": pool <- true go func() { defer func() { <-pool }() buyerTask(data, mapInfo) }() case "winnerent": //标准库 pool <- true go func() { defer func() { <-pool }() standardTask("winnerent", mapInfo) }() case "buyerent": //标准库 pool <- true go func() { defer func() { <-pool }() standardTask("buyerent", mapInfo) }() case "agencyent": //标准库 pool <- true go func() { defer func() { <-pool }() standardTask("agencyent", mapInfo) }() case "biddingdelbyextract": //根据repeat删除es pool <- true go func() { defer func() { <-pool }() biddingDelByExtract(data, mapInfo) }() case "biddingdelbyextracttype": //根据extracttype删除es pool <- true go func() { defer func() { <-pool }() biddingDelByExtracttype(data, mapInfo) }() default: pool <- true go func() { defer func() { <-pool }() util.Debug("err ---", mapInfo) }() } } case mu.OP_NOOP: //下个节点回应 log.Println("发送成功", string(data)) } } func SaveEsMethod() { arru := make([]map[string]interface{}, EsBulkSize) indexu := 0 for { select { case v := <-saveEsPool: arru[indexu] = v indexu++ if indexu == 200 { saveEsSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveEsSp }() Es1.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true) Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true) if len(multiIndex) == 2 { Es1.BulkSave(multiIndex[0], multiIndex[1], &arru, true) } }(arru) arru = make([]map[string]interface{}, EsBulkSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveEsSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveEsSp }() Es1.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true) Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true) if len(multiIndex) == 2 { Es1.BulkSave(multiIndex[0], multiIndex[1], &arru, true) } }(arru[:indexu]) arru = make([]map[string]interface{}, EsBulkSize) indexu = 0 } } } } func SaveElseEsMethod() { arru := make([]map[string]interface{}, EsBulkSize) indexu := 0 for { select { case v := <-saveEsElsePool: arru[indexu] = v indexu++ if indexu == 200 { saveEsElseSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveEsElseSp }() Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true) }(arru) arru = make([]map[string]interface{}, EsBulkSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveEsElseSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveEsElseSp }() Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true) }(arru[:indexu]) arru = make([]map[string]interface{}, EsBulkSize) indexu = 0 } } } } func SaveAllEsMethod() { arru := make([]map[string]interface{}, EsBulkSize) indexu := 0 for { select { case v := <-saveEsAllPool: arru[indexu] = v indexu++ if indexu == 200 { saveEsAllSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveEsAllSp }() Es1.BulkSave("bidding_all", "bidding", &arru, true) }(arru) arru = make([]map[string]interface{}, EsBulkSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveEsAllSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveEsAllSp }() Es1.BulkSave("bidding_all", "bidding", &arru, true) }(arru[:indexu]) arru = make([]map[string]interface{}, EsBulkSize) indexu = 0 } } } } func UpdateBidding() { arru := make([][]map[string]interface{}, MgoBulkSize) indexu := 0 for { select { case v := <-updateBiddingPool: arru[indexu] = v indexu++ if indexu == MgoBulkSize { updateBiddingSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateBiddingSp }() biddingMgo.UpdateBulk(currentColl, arru...) }(arru) arru = make([][]map[string]interface{}, MgoBulkSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateBiddingSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateBiddingSp }() biddingMgo.UpdateBulk(currentColl, arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, MgoBulkSize) indexu = 0 } } } } func UpdateExtract() { extract := util.ObjToString(extract["collect"]) arru := make([][]map[string]interface{}, MgoBulkSize) indexu := 0 for { select { case v := <-updateExtractPool: arru[indexu] = v indexu++ if indexu == MgoBulkSize { updateExtractSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateExtractSp }() extractMgo.UpdateBulk(extract, arru...) }(arru) arru = make([][]map[string]interface{}, MgoBulkSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateExtractSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateExtractSp }() extractMgo.UpdateBulk(extract, arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, MgoBulkSize) indexu = 0 } } } }