123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484 |
- package main
- import (
- "encoding/json"
- "fmt"
- "gopkg.in/mgo.v2/bson"
- "io/ioutil"
- "log"
- "net"
- "net/http"
- "strings"
- "time"
- util "utils"
- "utils/nsq"
- "utils/udp"
- )
- var (
- Mcmer *gonsq.Consumer
- )
- func main() {
- // company_id
- //redis.InitRedis1("qyxy_id=172.17.4.189:8379", 4)
- //inits()
- //go inspectQuery()
- //go checkMapJob()
- //go task_index()
- //go nsqMethod()
- go UpdateBidding()
- go UpdateExtract()
- go SaveEsMethod()
- go SaveAllEsMethod()
- go SaveElseEsMethod()
- go SaveProjectEs()
- updport := Sysconfig["udpport"].(string)
- udpclient = udp.UdpClient{Local: updport, BufSize: 1024}
- udpclient.Listen(processUdpMsg)
- util.Debug("Udp服务监听", updport)
- ch := make(chan bool, 1)
- <-ch
- }
- /**
- 检查es查询队列 10s查询一次
- */
- func inspectQuery() {
- ticker := time.NewTicker(time.Second * 10)
- url := esAddr + "/_nodes/stats/thread_pool"
- for range ticker.C {
- resp, _ := http.Get(url)
- if resp != nil && resp.Body != nil {
- defer resp.Body.Close()
- }
- body, _ := ioutil.ReadAll(resp.Body)
- respMap := make(map[string]interface{})
- err := json.Unmarshal(body, &respMap)
- if err == nil {
- if data, o1 := respMap["nodes"].(map[string]interface{}); o1 {
- if nodes, o2 := data[esNode].(map[string]interface{}); o2 {
- if pool, o3 := nodes["thread_pool"].(map[string]interface{}); o3 {
- index, _ := pool["index"].(map[string]interface{})
- search, _ := pool["search"].(map[string]interface{})
- bulk, _ := pool["bulk"].(map[string]interface{})
- if util.IntAll(index["queue"]) > 0 || util.IntAll(search["queue"]) > 0 || util.IntAll(bulk["queue"]) > 0 {
- util.Debug("es thread_pool index queue---", index["queue"])
- util.Debug("es thread_pool search queue---", search["queue"])
- util.Debug("es thread_pool bulk queue---", bulk["queue"])
- StopFlag = true
- } else {
- StopFlag = false
- }
- }
- }
- }
- }
- }
- }
- var pool = make(chan bool, 20)
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case udp.OP_TYPE_DATA: //上个节点的数据
- //从表中开始处理生成企业数据
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- log.Println("err:", err, "mapInfo:", mapInfo, string(data))
- if err != nil {
- udpclient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra)
- } else if mapInfo != nil {
- key, _ := mapInfo["key"].(string)
- if key == "" {
- key = "udpok"
- }
- go udpclient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
- tasktype, _ := mapInfo["stype"].(string)
- t := NewTk(mapInfo)
- switch tasktype {
- case "winner":
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- winnerTask(data, mapInfo)
- }()
- case "bidding": //实时+udp调用,可选择是否生成关键词, 一次性最大20万
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- t.thread = 1
- t.biddingTask(data, mapInfo)
- }()
- case "bidding_history": //增量id段历史数据
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- t.thread = 1
- t.biddingTask(data, mapInfo)
- }()
- case "project":
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- projectTask(data, project, mapInfo)
- }()
- case "biddingback": //不联表,使用bidding表直接调用mongo库生成索引
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- t.thread = 30
- t.biddingBackTask(data, mapInfo)
- }()
- case "biddingall": //合并并重新生成索引,不生成关键词
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- t.thread = 30
- t.biddingAllTask(data, mapInfo)
- }()
- case "biddingdata": //bidding全量数据
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- t.thread = 30
- t.biddingDataTask(data, mapInfo)
- }()
- case "biddingmerge": //重新合并但不生成索引,不生成关键词
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- biddingMergeTask(data, mapInfo)
- }()
- case "buyer":
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- buyerTask(data, mapInfo)
- }()
- case "winnerent": //标准库
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- standardTask("winnerent", mapInfo)
- }()
- case "buyerent": //标准库
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- standardTask("buyerent", mapInfo)
- }()
- case "agencyent": //标准库
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- standardTask("agencyent", mapInfo)
- }()
- case "biddingdelbyextract": //根据repeat删除es
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- biddingDelByExtract(data, mapInfo)
- }()
- case "biddingdelbyextracttype": //根据extracttype删除es
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- biddingDelByExtracttype(data, mapInfo)
- }()
- default:
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- util.Debug("err ---", mapInfo)
- }()
- }
- }
- case udp.OP_NOOP: //下个节点回应
- log.Println("发送成功", string(data))
- }
- }
- func SaveEsMethod() {
- arru := make([]map[string]interface{}, EsBulkSize)
- indexu := 0
- for {
- select {
- case v := <-saveEsPool:
- arru[indexu] = v
- indexu++
- if indexu == EsBulkSize {
- saveEsSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveEsSp
- }()
- //Es1.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
- Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
- //if len(multiIndex) == 2 {
- // Es1.BulkSave(multiIndex[0], multiIndex[1], &arru, true)
- //}
- }(arru)
- arru = make([]map[string]interface{}, EsBulkSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveEsSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveEsSp
- }()
- //Es1.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
- Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
- //if len(multiIndex) == 2 {
- // Es1.BulkSave(multiIndex[0], multiIndex[1], &arru, true)
- //}
- }(arru[:indexu])
- arru = make([]map[string]interface{}, EsBulkSize)
- indexu = 0
- }
- }
- }
- }
- func SaveElseEsMethod() {
- arru := make([]map[string]interface{}, EsBulkSize)
- indexu := 0
- for {
- select {
- case v := <-saveEsElsePool:
- arru[indexu] = v
- indexu++
- if indexu == EsBulkSize {
- saveEsElseSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveEsElseSp
- }()
- Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
- }(arru)
- arru = make([]map[string]interface{}, EsBulkSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveEsElseSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveEsElseSp
- }()
- Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, EsBulkSize)
- indexu = 0
- }
- }
- }
- }
- func SaveAllEsMethod() {
- arru := make([]map[string]interface{}, EsBulkSize)
- indexu := 0
- for {
- select {
- case v := <-saveEsAllPool:
- arru[indexu] = v
- indexu++
- if indexu == EsBulkSize {
- saveEsAllSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveEsAllSp
- }()
- Es1.BulkSave("biddingall", "bidding", &arru, true)
- }(arru)
- arru = make([]map[string]interface{}, EsBulkSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveEsAllSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveEsAllSp
- }()
- Es1.BulkSave("biddingall", "bidding", &arru, true)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, EsBulkSize)
- indexu = 0
- }
- }
- }
- }
- func SaveProjectEs() {
- arru := make([]map[string]interface{}, EsBulkSize)
- indexu := 0
- for {
- select {
- case v := <-saveProjectEsPool:
- arru[indexu] = v
- indexu++
- if indexu == EsBulkSize {
- saveProjectSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveProjectSp
- }()
- //Es1.BulkSave(util.ObjToString(project["index"]), util.ObjToString(project["type"]), &arru, true)
- Es2.BulkSave(util.ObjToString(project["index"]), util.ObjToString(project["type"]), &arru, true)
- }(arru)
- arru = make([]map[string]interface{}, EsBulkSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveProjectSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveProjectSp
- }()
- //Es1.BulkSave(util.ObjToString(project["index"]), util.ObjToString(project["type"]), &arru, true)
- Es2.BulkSave(util.ObjToString(project["index"]), util.ObjToString(project["type"]), &arru, true)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, EsBulkSize)
- indexu = 0
- }
- }
- }
- }
- func UpdateBidding() {
- arru := make([][]map[string]interface{}, MgoBulkSize)
- indexu := 0
- for {
- select {
- case v := <-updateBiddingPool:
- arru[indexu] = v
- indexu++
- if indexu == MgoBulkSize {
- updateBiddingSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateBiddingSp
- }()
- biddingMgo.UpdateBulk(currentColl, arru...)
- }(arru)
- arru = make([][]map[string]interface{}, MgoBulkSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateBiddingSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateBiddingSp
- }()
- biddingMgo.UpdateBulk(currentColl, arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, MgoBulkSize)
- indexu = 0
- }
- }
- }
- }
- func UpdateExtract() {
- extract := util.ObjToString(extract["collect"])
- arru := make([][]map[string]interface{}, MgoBulkSize)
- indexu := 0
- for {
- select {
- case v := <-updateExtractPool:
- arru[indexu] = v
- indexu++
- if indexu == MgoBulkSize {
- updateExtractSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateExtractSp
- }()
- extractMgo.UpdateBulk(extract, arru...)
- }(arru)
- arru = make([][]map[string]interface{}, MgoBulkSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateExtractSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateExtractSp
- }()
- extractMgo.UpdateBulk(extract, arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, MgoBulkSize)
- indexu = 0
- }
- }
- }
- }
- // @Description nsq处理id不变,内容替换的竞品数据
- // @Author J 2022/8/10 11:40
- func nsqMethod() {
- cof := Sysconfig["nsq_id"].(map[string]interface{})
- var err error
- Mcmer, err = gonsq.NewConsumer(&gonsq.Cconfig{
- IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断
- Addr: util.ObjToString(cof["addr"]),
- ConnectType: 0, //默认连接nsqd
- Topic: util.ObjToString(cof["topic"]),
- Channel: util.ObjToString(cof["channel"]),
- Concurrent: util.IntAllDef(cof["concurrent"], 1), //并发数
- })
- if err != nil {
- util.Debug("nsqMethod err: ", err.Error())
- }
- for {
- select {
- case obj := <-Mcmer.Ch: //从通道读取即可
- util.Debug("index nsq: " + fmt.Sprint(obj))
- id := strings.Split(util.ObjToString(obj), "=")
- if bson.IsObjectIdHex(id[1]) {
- taskinfo(id[1])
- } else {
- util.Debug("jy nsq id err: ", id[1])
- }
- }
- }
- }
|