123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429 |
- package main
- import (
- "bytes"
- "context"
- "encoding/json"
- "errors"
- "field-dispose/config"
- "field-dispose/proto"
- "fmt"
- "go.uber.org/zap"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials/insecure"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "net/http"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- service "jygit.jydev.jianyu360.cn/BP/servicerd/proto"
- )
- var IpDialErrNum, IpGetErrNum, ExtractDialErrNum = int64(0), int64(0), int64(0)
- // @Description 处理字段procurementlist、review_experts
- // @Author J 2022/8/31 14:57
- func getIntention(mapinfo map[string]interface{}) {
- defer util.Catch()
- gtid, _ := mapinfo["gtid"].(string)
- lteid, _ := mapinfo["lteid"].(string)
- //MgoB.Update("bidding_processing_ids", bson.M{"gtid": gtid}, bson.M{"$set": bson.M{"dataprocess": 2, "updatetime": time.Now().Unix()}}, false, false)
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- ch := make(chan bool, config.Conf.Serve.Thread)
- wg := &sync.WaitGroup{}
- query := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": mongodb.StringTOBsonId(gtid),
- "$lte": mongodb.StringTOBsonId(lteid),
- },
- }
- field := map[string]interface{}{
- "toptype": 1,
- "attach_text": 1,
- "contenthtml": 1,
- "site": 1,
- "detail": 1,
- }
- log.Info(fmt.Sprintf("count --- %d", MgoB.Count("bidding", query)))
- it := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.Coll).Find(&query).Select(&field).Iter()
- count := 0
- for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
- if count%200 == 0 {
- log.Info("getIntention", zap.Int("current:", count))
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- id := mongodb.BsonIdToSId(tmp["_id"])
- update := make(map[string]interface{})
- result2 := taskB(tmp, gtid, lteid)
- result1 := taskA(tmp, gtid, lteid)
- if r, ok := result2["result"].(map[string]interface{}); ok {
- if r[id] != nil && len(r[id].([]interface{})) > 0 {
- update["review_experts"] = strings.Join(util.ObjArrToStringArr(r[id].([]interface{})), ",")
- }
- }
- if result1 != nil && len(result1) > 0 {
- if result1["purchasinglist"] != nil && len(result1["purchasinglist"].([]interface{})) > 0 {
- update["purchasinglist"] = result1["purchasinglist"]
- update["purchasing"] = result1["purchasing"]
- }
- if result1["procurementlist"] != nil {
- update["procurementlist"] = result1["procurementlist"]
- }
- }
- if len(update) > 0 {
- updatePool <- []map[string]interface{}{
- {"_id": mongodb.StringTOBsonId(id)},
- {"$set": update},
- }
- //updateEsPool <- []map[string]interface{}{{
- // "_id": id,
- //},
- // update,
- //}
- }
- }(tmp)
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- log.Info("dispose over...", zap.Int("count:", count), zap.String("gtid:", gtid), zap.String("lteid:", lteid))
- NextNode(mapinfo)
- }
- // @Description procurementlist、purchasinglist
- // @Author J 2022/8/31 15:28
- func taskA(tmp map[string]interface{}, gtid, lteid string) map[string]interface{} {
- id := mongodb.BsonIdToSId(tmp["_id"])
- delete(tmp, "_id")
- delete(tmp, "detail")
- reqStr, err := json.Marshal(tmp)
- if err != nil {
- ErrorInfoCache <- map[string]interface{}{
- "err": "Json Marshal Error",
- "type": "采购意向/标的物",
- "id": id,
- "comeintime": time.Now().Unix(),
- "ok": false,
- "gtid": gtid,
- "letid": lteid,
- }
- return nil
- }
- //处理数据
- result, err := rpcGetFieldP(string(reqStr), id)
- if err != nil { //保存处理异常信息
- ErrorInfoCache <- map[string]interface{}{
- "err": err.Error(),
- "type": "采购意向/标的物",
- "id": id,
- "comeintime": time.Now().Unix(),
- "ok": false,
- "gtid": gtid,
- "letid": lteid,
- }
- }
- return result
- }
- // @Description review_experts
- // @Author J 2022/8/31 15:28
- func taskB(tmp map[string]interface{}, gtid, lteid string) map[string]interface{} {
- id := mongodb.BsonIdToSId(tmp["_id"])
- reqStr, err := json.Marshal(map[string]interface{}{id: tmp["detail"]})
- if err != nil {
- ErrorInfoCache <- map[string]interface{}{
- "err": "Json Marshal Error",
- "type": "评标专家字段",
- "id": id,
- "comeintime": time.Now().Unix(),
- "ok": false,
- "gtid": gtid,
- "letid": lteid,
- }
- return nil
- }
- //处理数据
- result, err := rpcGetFieldR(string(reqStr), id)
- if err != nil {
- ErrorInfoCache <- map[string]interface{}{
- "err": err.Error(),
- "type": "评标专家字段",
- "id": id,
- "comeintime": time.Now().Unix(),
- "ok": false,
- "gtid": gtid,
- "letid": lteid,
- }
- }
- return result
- }
- func rpcGetFieldP(reqStr, id string) (map[string]interface{}, error) {
- defer util.Catch()
- //获取ip、port服务
- ipConn, ipErr := grpc.Dial(config.Conf.Serve.GrpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
- if ipErr != nil {
- atomic.AddInt64(&IpDialErrNum, 1) //异常次数+1
- return nil, errors.New("Ip Grpc Dial Error")
- }
- atomic.StoreInt64(&IpDialErrNum, 0) //异常次数重置
- defer ipConn.Close()
- ipClient := service.NewServiceClient(ipConn)
- ip := ""
- port := -1
- //重试获取ip、port
- if Skipping {
- for i := 1; i <= 3; i++ {
- repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "goods_service", Balance: 2})
- if err != nil {
- continue
- } else {
- ip = repl.Ip
- port = int(repl.Port)
- break
- }
- }
- } else {
- for {
- repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "goods_service", Balance: 2})
- if err != nil {
- continue
- } else {
- ip = repl.Ip
- port = int(repl.Port)
- break
- }
- }
- }
- if ip == "" || port == -1 { //重试三次,回去ip、port失败
- atomic.AddInt64(&IpGetErrNum, 1) //异常次数+1
- return nil, errors.New("Get Ip Error")
- }
- atomic.StoreInt64(&IpGetErrNum, 0) //异常次数重置
- //处理数据
- addr := ip + ":" + fmt.Sprint(port)
- conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
- if err != nil {
- atomic.AddInt64(&ExtractDialErrNum, 1) //异常次数+1
- return nil, errors.New("Extract Grpc Dial Error,addr:" + addr)
- }
- atomic.StoreInt64(&ExtractDialErrNum, 0) //异常次数重置
- defer conn.Close()
- client := proto.NewGoodsExtractClient(conn)
- req := &proto.GoodsRequest{
- Contents: reqStr,
- }
- ctx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
- go func(ctx context.Context) {
- select {
- case <-ctx.Done():
- //_, _ = ipClient.Release(context.Background(), &service.ApplyRepData{Ip: ip, Port: int32(port)})
- case <-time.After(time.Second * 30):
- // 超时处理
- //log.Info("rpcGetFieldP 字段识别超过2min", zap.Any("serve", "goods_service"), zap.String("id", id), zap.Any("ip+port", addr))
- //ErrorInfoCache <- map[string]interface{}{
- // "err": "接口处理超过2min",
- // "type": "采购意向/标的物",
- // "id": id,
- // "comeintime": time.Now().Unix(),
- // "ok": false,
- // "gtid": id,
- // "letid": id,
- //}
- }
- }(ctx)
- resp, err := client.GoodsExtract(ctx, req)
- cancel()
- if err != nil {
- //_, _ = ipClient.Release(context.Background(), &service.ApplyRepData{Ip: ip, Port: int32(port)})
- return nil, err
- }
- result := map[string]interface{}{}
- if json.Unmarshal([]byte(resp.Goods), &result) != nil {
- //_, _ = ipClient.Release(context.Background(), &service.ApplyRepData{Ip: ip, Port: int32(port)})
- return nil, errors.New("Json Unmarshal Error")
- }
- // 服务中心释放服务
- //_, _ = ipClient.Release(context.Background(), &service.ApplyRepData{Ip: ip, Port: int32(port)})
- return result, nil
- }
- func rpcGetFieldR(reqStr, id string) (map[string]interface{}, error) {
- defer util.Catch()
- //获取ip、port服务
- ipConn, ipErr := grpc.Dial(config.Conf.Serve.GrpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
- if ipErr != nil {
- atomic.AddInt64(&IpDialErrNum, 1) //异常次数+1
- return nil, errors.New("Ip Grpc Dial Error")
- }
- atomic.StoreInt64(&IpDialErrNum, 0) //异常次数重置
- defer ipConn.Close()
- ipClient := service.NewServiceClient(ipConn)
- ip := ""
- port := -1
- //重试获取ip、port
- if Skipping {
- for i := 1; i <= 3; i++ {
- repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "extract_expert_service", Balance: 0})
- if err != nil {
- continue
- } else {
- ip = repl.Ip
- port = int(repl.Port)
- break
- }
- }
- } else {
- for {
- repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "extract_expert_service", Balance: 0})
- if err != nil {
- continue
- } else {
- ip = repl.Ip
- port = int(repl.Port)
- break
- }
- }
- }
- if ip == "" || port == -1 { //重试三次,回去ip、port失败
- atomic.AddInt64(&IpGetErrNum, 1) //异常次数+1
- return nil, errors.New("Get Ip Error")
- }
- atomic.StoreInt64(&IpGetErrNum, 0) //异常次数重置
- //处理数据
- addr := ip + ":" + fmt.Sprint(port)
- conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
- if err != nil {
- atomic.AddInt64(&ExtractDialErrNum, 1) //异常次数+1
- return nil, errors.New("Extract Grpc Dial Error,addr:" + addr)
- }
- atomic.StoreInt64(&ExtractDialErrNum, 0) //异常次数重置
- defer conn.Close()
- client := proto.NewExistsExpertClient(conn)
- req := &proto.ContentRequest{
- Contents: reqStr,
- }
- ctx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
- go func(ctx context.Context) {
- select {
- case <-ctx.Done():
- case <-time.After(time.Second * 30):
- // 超时处理
- //log.Info("rpcGetFieldP 字段识别超过2min", zap.Any("serve", "extract_expert_service"), zap.String("id", id), zap.Any("ip+port", addr))
- }
- }(ctx)
- resp, err := client.Extract(ctx, req)
- cancel()
- if err != nil {
- return nil, err
- }
- result := make(map[string]interface{})
- if json.Unmarshal([]byte(resp.Results), &result) != nil {
- return nil, errors.New("Json Unmarshal Error")
- }
- return result, nil
- }
- // 定时检测异常数据量,发告警
- func CheckErrorNum() {
- defer util.Catch()
- for {
- warnText := ""
- //消息治理中心服务连接异常告警
- if IpDialErrNum > 3 {
- warnText += "消息治理中心连接失败次数:" + fmt.Sprint(IpDialErrNum) + ";\n"
- }
- //获取治理中心IP异常告警
- if IpDialErrNum > 5 {
- warnText += "获取IP异常次数:" + fmt.Sprint(IpDialErrNum) + ";\n"
- }
- //连接IP服务异常告警
- if IpDialErrNum > 5 {
- warnText += "IP连接异常次数:" + fmt.Sprint(IpDialErrNum) + ";\n"
- }
- if warnText != "" {
- SendWarnInfo(warnText)
- }
- time.Sleep(30 * time.Minute)
- }
- }
- var TextModel = `{
- "msgtype": "text",
- "text": {
- "content": "%s",
- "mentioned_mobile_list":[%s]
- }
- }`
- // 告警信息
- func SendWarnInfo(content string) {
- defer util.Catch()
- toUserMsg := fmt.Sprintf(TextModel, content, "13373929153,15090279371") //李俊亮、王江含
- log.Info("SendWarnInfo", zap.String("toUserMsg", toUserMsg))
- resp1, err := http.Post(
- "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=45962efc-ca87-4996-9ffa-08bf6608ab7a",
- "application/json",
- bytes.NewBuffer([]byte(toUserMsg)),
- )
- if err != nil {
- fmt.Println("request error:", err)
- return
- }
- defer resp1.Body.Close()
- }
- var SP = make(chan bool, 5)
- var ErrorInfoCache = make(chan map[string]interface{}, 1000) //异常信息集合
- // 批量保存异常信息
- func SaveErrorInfo() {
- log.Info("SaveErrorInfo 异常信息...")
- savearr := make([]map[string]interface{}, 200)
- indexh := 0
- for {
- select {
- case v := <-ErrorInfoCache:
- savearr[indexh] = v
- indexh++
- if indexh == 200 {
- SP <- true
- go func(savearr []map[string]interface{}) {
- defer func() {
- <-SP
- }()
- MgoB.SaveBulk("bidding_warn_1", savearr...)
- }(savearr)
- savearr = make([]map[string]interface{}, 200)
- indexh = 0
- }
- case <-time.After(1 * time.Minute):
- if indexh > 0 {
- SP <- true
- go func(savearr []map[string]interface{}) {
- defer func() {
- <-SP
- }()
- MgoB.SaveBulk("bidding_warn_1", savearr...)
- }(savearr[:indexh])
- savearr = make([]map[string]interface{}, 200)
- indexh = 0
- }
- }
- }
- }
|