package main

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"field-dispose/config"
	"field-dispose/proto"

	"go.mongodb.org/mongo-driver/bson"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	service "jygit.jydev.jianyu360.cn/BP/servicerd/proto"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

// Consecutive-failure counters. Each one is incremented atomically when the
// corresponding step fails and reset to zero when it succeeds; CheckErrorNum
// reads them periodically to decide whether to raise an alert.
var (
	// IpDialErrNum counts failures dialing the service registry.
	IpDialErrNum int64
	// IpGetErrNum counts failures obtaining an ip/port from the registry.
	IpGetErrNum int64
	// ExtractDialErrNum counts failures dialing the resolved extraction service.
	ExtractDialErrNum int64
)

const (
	// rpcTimeout bounds a single extraction RPC.
	rpcTimeout = 30 * time.Second
	// applyRetryDelay is the pause between failed registry Apply calls so a
	// persistently failing registry is not hammered in a tight loop.
	applyRetryDelay = 500 * time.Millisecond
	// applyMaxAttempts is the retry limit used when Skipping is enabled.
	applyMaxAttempts = 3
	// errorBatchSize is the number of error records written per bulk save.
	errorBatchSize = 200
)

// getIntention processes the fields procurementlist, purchasinglist and
// review_experts for every "bidding" document whose _id lies in the range
// (gtid, lteid] taken from mapinfo.
//
// It marks the id range as being processed in "bidding_processing_ids", walks
// the matching documents with at most config.Conf.Serve.Thread concurrent
// workers, pushes any resulting $set update onto updatePool, and finally hands
// mapinfo to NextNode.
//
// Author: J, 2022/8/31 14:57.
func getIntention(mapinfo map[string]interface{}) {
	defer util.Catch()
	gtid, _ := mapinfo["gtid"].(string)
	lteid, _ := mapinfo["lteid"].(string)
	MgoB.Update("bidding_processing_ids", bson.M{"gtid": gtid}, bson.M{"$set": bson.M{"dataprocess": 2, "updatetime": time.Now().Unix()}}, false, false)

	sess := MgoB.GetMgoConn()
	defer MgoB.DestoryMongoConn(sess)

	// ch is a counting semaphore that caps the number of in-flight workers.
	ch := make(chan bool, config.Conf.Serve.Thread)
	wg := &sync.WaitGroup{}

	query := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt":  mongodb.StringTOBsonId(gtid),
			"$lte": mongodb.StringTOBsonId(lteid),
		},
	}
	field := map[string]interface{}{
		"toptype":     1,
		"attach_text": 1,
		"contenthtml": 1,
		"site":        1,
		"detail":      1,
	}
	log.Info(fmt.Sprintf("count --- %d", MgoB.Count("bidding", query)))

	// NOTE(review): the iterator is never closed and its error is never
	// inspected, so a cursor failure ends the loop silently — confirm against
	// the driver's Iter API and add Close/Err handling if available.
	it := sess.DB(config.Conf.DB.Mongo.Dbname).C("bidding").Find(&query).Select(&field).Iter()
	count := 0
	for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
		if count%200 == 0 {
			log.Info("getIntention", zap.Int("current:", count))
		}
		ch <- true
		wg.Add(1)
		go func(doc map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			// The outer util.Catch does not cover this goroutine; install one
			// here so a panic in a single document cannot take the process
			// down (assumes util.Catch recovers, as its use elsewhere implies).
			defer util.Catch()

			id := mongodb.BsonIdToSId(doc["_id"])
			expertResult := taskB(doc, gtid, lteid)
			goodsResult := taskA(doc, gtid, lteid)

			update := buildFieldUpdate(id, goodsResult, expertResult)
			if len(update) == 0 {
				return
			}
			updatePool <- []map[string]interface{}{
				{"_id": mongodb.StringTOBsonId(id)},
				{"$set": update},
			}
			// Pushing the same update to updateEsPool was commented out in the
			// original and remains disabled.
		}(tmp)
		// Each worker owns its document; decode the next one into a fresh map.
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	log.Info("dispose over...", zap.Int("count:", count), zap.String("gtid:", gtid), zap.String("lteid:", lteid))
	NextNode(mapinfo)
}

// buildFieldUpdate converts the raw RPC results for document id into the $set
// payload. Values of an unexpected type are skipped instead of panicking.
// Either result map may be nil.
func buildFieldUpdate(id string, goodsResult, expertResult map[string]interface{}) map[string]interface{} {
	update := make(map[string]interface{})

	if byID, ok := expertResult["result"].(map[string]interface{}); ok {
		if experts, ok := byID[id].([]interface{}); ok && len(experts) > 0 {
			update["review_experts"] = strings.Join(util.ObjArrToStringArr(experts), ",")
		}
	}
	if list, ok := goodsResult["purchasinglist"].([]interface{}); ok && len(list) > 0 {
		update["purchasinglist"] = list
	}
	if v := goodsResult["procurementlist"]; v != nil {
		update["procurementlist"] = v
	}
	return update
}

// reportFieldError queues an error record on ErrorInfoCache for later bulk
// persistence by SaveErrorInfo.
//
// NOTE(review): the "letid" key looks like a typo for "lteid". It is kept
// byte-for-byte because it is a persisted field name that existing consumers
// may query.
func reportFieldError(errMsg, fieldType, id, gtid, lteid string) {
	ErrorInfoCache <- map[string]interface{}{
		"err":        errMsg,
		"type":       fieldType,
		"id":         id,
		"comeintime": time.Now().Unix(),
		"ok":         false,
		"gtid":       gtid,
		"letid":      lteid,
	}
}

// taskA requests procurementlist / purchasinglist extraction for one document.
//
// Every field of tmp except "_id" and "detail" is JSON-encoded and sent to the
// goods extraction service. tmp itself is not modified. On failure an error
// record is queued; the returned map is nil when marshaling fails and is
// whatever rpcGetFieldP returned otherwise.
//
// Author: J, 2022/8/31 15:28.
func taskA(tmp map[string]interface{}, gtid, lteid string) map[string]interface{} {
	const fieldType = "采购意向/标的物"
	id := mongodb.BsonIdToSId(tmp["_id"])

	// Build a filtered copy rather than deleting keys from the caller's map.
	payload := make(map[string]interface{}, len(tmp))
	for k, v := range tmp {
		if k == "_id" || k == "detail" {
			continue
		}
		payload[k] = v
	}

	reqStr, err := json.Marshal(payload)
	if err != nil {
		reportFieldError("Json Marshal Error", fieldType, id, gtid, lteid)
		return nil
	}

	result, err := rpcGetFieldP(string(reqStr), id)
	if err != nil {
		// Record the processing failure.
		reportFieldError(err.Error(), fieldType, id, gtid, lteid)
	}
	return result
}

// taskB requests review_experts extraction for one document.
//
// The request is the JSON object {<id>: <detail>}. On failure an error record
// is queued; the returned map is nil when marshaling fails and is whatever
// rpcGetFieldR returned otherwise.
//
// Author: J, 2022/8/31 15:28.
func taskB(tmp map[string]interface{}, gtid, lteid string) map[string]interface{} {
	const fieldType = "评标专家字段"
	id := mongodb.BsonIdToSId(tmp["_id"])

	reqStr, err := json.Marshal(map[string]interface{}{id: tmp["detail"]})
	if err != nil {
		reportFieldError("Json Marshal Error", fieldType, id, gtid, lteid)
		return nil
	}

	result, err := rpcGetFieldR(string(reqStr), id)
	if err != nil {
		reportFieldError(err.Error(), fieldType, id, gtid, lteid)
	}
	return result
}

// resolveServiceAddr asks the service registry at config.Conf.Serve.GrpcAddr
// for an instance matching req and returns its "ip:port" address.
//
// When Skipping is true the Apply call is attempted at most applyMaxAttempts
// times; otherwise it is retried until it succeeds. IpDialErrNum and
// IpGetErrNum are updated to reflect the outcome.
func resolveServiceAddr(req *service.ApplyReqData) (string, error) {
	ipConn, err := grpc.Dial(config.Conf.Serve.GrpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		atomic.AddInt64(&IpDialErrNum, 1)
		return "", errors.New("Ip Grpc Dial Error")
	}
	atomic.StoreInt64(&IpDialErrNum, 0)
	// The registry connection is only needed for Apply (the Release call was
	// commented out in the original), so it is closed as soon as the address
	// is known.
	defer ipConn.Close()
	ipClient := service.NewServiceClient(ipConn)

	ip, port := "", -1
	limited := Skipping
	for attempt := 1; ; attempt++ {
		repl, err := ipClient.Apply(context.Background(), req)
		if err == nil {
			ip, port = repl.Ip, int(repl.Port)
			break
		}
		if limited && attempt >= applyMaxAttempts {
			break
		}
		// Back off briefly instead of spinning on a failing registry.
		time.Sleep(applyRetryDelay)
	}

	if ip == "" || port == -1 {
		atomic.AddInt64(&IpGetErrNum, 1)
		return "", errors.New("Get Ip Error")
	}
	atomic.StoreInt64(&IpGetErrNum, 0)
	return fmt.Sprintf("%s:%d", ip, port), nil
}

// dialExtractService opens a gRPC connection to a resolved extraction service
// and maintains ExtractDialErrNum. The caller must close the connection.
func dialExtractService(addr string) (*grpc.ClientConn, error) {
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		atomic.AddInt64(&ExtractDialErrNum, 1)
		return nil, errors.New("Extract Grpc Dial Error,addr:" + addr)
	}
	atomic.StoreInt64(&ExtractDialErrNum, 0)
	return conn, nil
}

// decodeResult parses a JSON object returned by an extraction service.
func decodeResult(raw []byte) (map[string]interface{}, error) {
	result := make(map[string]interface{})
	if json.Unmarshal(raw, &result) != nil {
		return nil, errors.New("Json Unmarshal Error")
	}
	return result, nil
}

// rpcGetFieldP sends reqStr to a "goods_service" instance obtained from the
// service registry and returns the decoded JSON response.
//
// id is currently unused; it is kept for signature compatibility (the original
// only referenced it in commented-out logging). The RPC is bounded by
// rpcTimeout.
func rpcGetFieldP(reqStr, id string) (map[string]interface{}, error) {
	defer util.Catch()

	addr, err := resolveServiceAddr(&service.ApplyReqData{Name: "goods_service", Balance: 2})
	if err != nil {
		return nil, err
	}
	conn, err := dialExtractService(addr)
	if err != nil {
		return nil, err
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.TODO(), rpcTimeout)
	defer cancel()

	client := proto.NewGoodsExtractClient(conn)
	resp, err := client.GoodsExtract(ctx, &proto.GoodsRequest{Contents: reqStr})
	if err != nil {
		return nil, err
	}
	return decodeResult([]byte(resp.Goods))
}

// rpcGetFieldR sends reqStr to an "extract_expert_service" instance obtained
// from the service registry and returns the decoded JSON response.
//
// id is currently unused; it is kept for signature compatibility. The RPC is
// bounded by rpcTimeout.
func rpcGetFieldR(reqStr, id string) (map[string]interface{}, error) {
	defer util.Catch()

	addr, err := resolveServiceAddr(&service.ApplyReqData{Name: "extract_expert_service", Balance: 0})
	if err != nil {
		return nil, err
	}
	conn, err := dialExtractService(addr)
	if err != nil {
		return nil, err
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.TODO(), rpcTimeout)
	defer cancel()

	client := proto.NewExistsExpertClient(conn)
	resp, err := client.Extract(ctx, &proto.ContentRequest{Contents: reqStr})
	if err != nil {
		return nil, err
	}
	return decodeResult([]byte(resp.Results))
}

// buildWarnText renders the alert text for the given counter snapshot. It
// returns "" when no threshold is exceeded.
func buildWarnText(ipDial, ipGet, extractDial int64) string {
	var b strings.Builder
	// Registry connection failures.
	if ipDial > 3 {
		b.WriteString("消息治理中心连接失败次数:" + fmt.Sprint(ipDial) + ";\n")
	}
	// Failures obtaining an ip/port from the registry.
	if ipGet > 5 {
		b.WriteString("获取IP异常次数:" + fmt.Sprint(ipGet) + ";\n")
	}
	// Failures connecting to the resolved extraction service.
	if extractDial > 5 {
		b.WriteString("IP连接异常次数:" + fmt.Sprint(extractDial) + ";\n")
	}
	return b.String()
}

// CheckErrorNum inspects the failure counters every 30 minutes and sends an
// alert when any of them exceeds its threshold. It loops forever and is meant
// to run in its own goroutine.
func CheckErrorNum() {
	defer util.Catch()
	for {
		// Each message reports its own counter, read atomically because the
		// writers use sync/atomic.
		warnText := buildWarnText(
			atomic.LoadInt64(&IpDialErrNum),
			atomic.LoadInt64(&IpGetErrNum),
			atomic.LoadInt64(&ExtractDialErrNum),
		)
		if warnText != "" {
			SendWarnInfo(warnText)
		}
		time.Sleep(30 * time.Minute)
	}
}

// TextModel is the original fmt template for the webhook text message.
// SendWarnInfo no longer uses it because fmt.Sprintf does not JSON-escape the
// content; it is kept in case other files reference it.
var TextModel = `{
    "msgtype": "text",
    "text": {
        "content": "%s",
		"mentioned_mobile_list":[%s]
    }
}`

// warnWebhookURL is the enterprise-WeChat group robot endpoint used for alerts.
//
// NOTE(review): the webhook key is a credential committed to source; consider
// moving it to configuration.
const warnWebhookURL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=45962efc-ca87-4996-9ffa-08bf6608ab7a"

// warnMentionedMobiles lists the mobile numbers mentioned in every alert.
var warnMentionedMobiles = []string{"13373929153", "15090279371"}

// warnHTTPClient is shared by all alert requests and carries a timeout so a
// stalled webhook cannot block the caller indefinitely.
var warnHTTPClient = &http.Client{Timeout: 10 * time.Second}

// SendWarnInfo posts content as a text message to the alert webhook.
//
// The payload is built with encoding/json so that newlines and quotes in
// content are escaped and the mobile numbers are sent as strings. Failures are
// logged; nothing is returned to the caller.
func SendWarnInfo(content string) {
	defer util.Catch()

	payload, err := json.Marshal(map[string]interface{}{
		"msgtype": "text",
		"text": map[string]interface{}{
			"content":               content,
			"mentioned_mobile_list": warnMentionedMobiles,
		},
	})
	if err != nil {
		log.Info("SendWarnInfo marshal error", zap.Error(err))
		return
	}
	log.Info("SendWarnInfo", zap.String("toUserMsg", string(payload)))

	resp, err := warnHTTPClient.Post(warnWebhookURL, "application/json", bytes.NewReader(payload))
	if err != nil {
		log.Info("SendWarnInfo request error", zap.Error(err))
		return
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		log.Info("SendWarnInfo unexpected status", zap.Int("status", resp.StatusCode))
	}
}

// SP limits the number of concurrent bulk saves of error records.
var SP = make(chan bool, 5)

// ErrorInfoCache buffers error records awaiting persistence.
var ErrorInfoCache = make(chan map[string]interface{}, 1000)

// SaveErrorInfo drains ErrorInfoCache and bulk-saves the records to the
// "bidding_warn_1" collection. A batch is written when it reaches
// errorBatchSize records, or when no new record has arrived for one minute and
// the batch is non-empty. It loops forever and is meant to run in its own
// goroutine.
func SaveErrorInfo() {
	log.Info("SaveErrorInfo 异常信息...")
	batch := make([]map[string]interface{}, 0, errorBatchSize)

	// flush hands the current batch to a background saver (bounded by SP) and
	// starts a fresh batch so the saver owns its slice exclusively.
	flush := func() {
		SP <- true
		go func(records []map[string]interface{}) {
			defer func() {
				<-SP
			}()
			MgoB.SaveBulk("bidding_warn_1", records...)
		}(batch)
		batch = make([]map[string]interface{}, 0, errorBatchSize)
	}

	for {
		select {
		case v := <-ErrorInfoCache:
			batch = append(batch, v)
			if len(batch) == errorBatchSize {
				flush()
			}
		case <-time.After(1 * time.Minute):
			if len(batch) > 0 {
				flush()
			}
		}
	}
}