package tool import ( "context" "crypto/sha256" "encoding/json" "fmt" "go.mongodb.org/mongo-driver/bson/primitive" "google.golang.org/grpc" "gopkg.in/mgo.v2/bson" "jy_publishing/Logger" jypb "jy_publishing/proto/common" pb "jy_publishing/proto/proto" "jygit.jydev.jianyu360.cn/BP/servicerd/proto" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp" "log" "net" "regexp" "strconv" "strings" "time" ) var ( JyUrl = "https://www.jianyu360.cn/article/content/%s.html" InfoFields = []string{"title", "project_code", "province", "city", "industry", "buyer", "budget", "winner", "amount", "detail", "attch", "contact_person", "contact_phone", "attach", "discern_attach", "type", "recommended_service", "deliveryAddress"} ) var SaveFields = map[string]string{ "title": "title", "project_code": "projectcode", "province": "area", "city": "city", "buyer": "buyer", "budget": "budget", "winner": "s_winner", "amount": "bidamount", "detail": "detail", "contact_phone": "buyertel", "contact_person": "buyerperson", "discern_attach": "attach_text", "type": "type", // 消息类型 "recommended_service": "recommended_service", // 供应商推荐服务 "deliveryAddress": "deliveryAddress", //"attch": "", //"industry": "", //"contract_overt": "", } var InfoType = map[int]string{ 1: "招标信息", 2: "采购信息", 4: "招标公告", 5: "采购意向", 6: "招标预告", 7: "招标结果", 8: "直采-采购信息", } // @Description 信息处理(信息发布和附件识别) // 1、敏感词处理,2、信息发布,3、信息删除 // @Author J 2022/4/9 11:47 AM func TaskInfo(obj interface{}) { info, _ := obj.(map[string]interface{}) if util.ObjToString(info["action"]) == "1" { // 敏感词处理 Sensitive(info) } else if util.ObjToString(info["action"]) == "2" { // 数据处理 InfoPub(info) } else if util.ObjToString(info["action"]) == "3" { //id := util.ObjToString(info["id"]) tmp := info["appendInfo"].(map[string]interface{}) DelMethod(util.IntAll(tmp["type"]), util.ObjToString(tmp["publish_id"])) } } // @Description 敏感词处理(title, content, attachment) // @Author J 2022/4/11 9:36 AM func Sensitive(info map[string]interface{}) { tmp := info["appendInfo"].(map[string]interface{}) tArr := WordsIdentify(util.ObjToString(tmp["title"])) dArr := WordsIdentify(util.ObjToString(tmp["detail"])) if attsMap, ok := tmp["attach"].(map[string]interface{}); ok && len(attsMap) > 0 { other := map[string]interface{}{ "id": info["id"], "action": info["action"], "msgType": info["msgType"], "title": tArr, "detail": dArr, } otherJson, _ := json.Marshal(other) var attsArr []*pb.Request for _, m := range attsMap { m1 := m.(map[string]interface{}) attsArr = append(attsArr, &pb.Request{ FileUrl: util.ObjToString(m1["fid"]), FileName: util.ObjToString(m1["filename"]), FileType: util.ObjToString(m1["ftype"]), //ReturnType: 0, // 不传 ExtractType: 0, }) } msginfo := &pb.FileRequest{ Message: attsArr, Other: string(otherJson), Topic: FileTopicResult, } Logger.Debug("file extract send nsq: " + fmt.Sprint(msginfo)) _ = MProducer.Publish(msginfo) } else { // 没有附件 Logger.Debug("title sensitive array: " + fmt.Sprint(tArr)) Logger.Debug("detail sensitive array: " + fmt.Sprint(dArr)) req := &jypb.SensitiveRequest{ Id: util.ObjToString(info["id"]), MsgType: util.ObjToString(info["msgType"]), Title: tArr, Detail: dArr, } Logger.Debug("JyRpcSensitive request: " + fmt.Sprint(req)) JyRpcSensitive(req) } //atts := tmp["attachment"].(map[string]interface{}) //resultAtts := make(map[string]interface{}) //resultAttach := make(map[string]interface{}) //for k, v := range atts { // attach := make(map[string]interface{}) // attach_text字段 // resp, err := AttsMethod(v.(map[string]interface{})) // if err != nil { // return nil // } // for i, r := range resp.Result { // if w := WordsIdentify(r.TextContent); w != nil { // resultAtts[k] = w // } // attach[strconv.Itoa(i)] = map[string]interface{}{"file_name": r.FileName, "attach_url": r.TextUrl} // } // resultAttach[k] = attach //} //resultMap["attach_text"] = resultAttach //resultMap["attachment"] = resultAtts } // @Description 敏感词识别 // @Author J 2022/4/12 1:33 PM func WordsIdentify(str string) []string { if str == "" { return nil } ret := Ms.Discern(str, 2) if len(ret) > 0 { var words []string for _, r := range ret { words = append(words, r.MatchRule) } return words } return []string{} } // @Description 附件调用gRpc接口处理 // @Author J 2022/4/12 10:02 AM // Deprecated func AttsMethod(att map[string]interface{}) (*pb.FileResponse, error) { reqs := &pb.FileRequest{ Message: []*pb.Request{{ FileName: "", FileType: "", FileUrl: ""}}, Other: "", } // 1.调用gRPC接口 conn, err := grpc.Dial(ClientAddr, grpc.WithInsecure()) if err != nil { return nil, err } var client proto.ServiceClient client = proto.NewServiceClient(conn) repl, err := client.Apply(context.Background(), &proto.ApplyReqData{Name: "extract_service", Balance: 0}) if err != nil { return nil, err } //2.业务调用 addr := fmt.Sprintf("%s:%d", repl.Ip, repl.Port) conn_b, err := grpc.Dial(addr, grpc.WithInsecure()) if err != nil { return nil, err } defer func(conn_b *grpc.ClientConn) { _ = conn_b.Close() }(conn_b) pc := pb.NewFileExtractClient(conn_b) rep, err := pc.FileExtract(context.Background(), reqs) if err != nil { return nil, err } return rep, nil } type DeliveryAddress struct { Area string `json:"area"` City string `json:"city"` Districts string `json:"districts"` DetailsAddr string `json:"detailsAddr"` } // @Description 信息发布 // @Author J 2022/4/12 1:57 PM func InfoPub(info map[string]interface{}) { tmp := info["appendInfo"].(map[string]interface{}) saveMap := make(map[string]interface{}) jyMap := make(map[string]interface{}) extractType := 0 for _, f := range InfoFields { if tmp[f] == nil { continue } //交付地址 if f == "deliveryAddress" { daStr := util.ObjToString(tmp[f]) var da = DeliveryAddress{} err := json.Unmarshal([]byte(daStr), &da) if err == nil && da.Area != "" { saveMap["deliver_area"] = da.Area saveMap["deliver_city"] = da.City saveMap["deliver_district"] = da.Districts saveMap["deliver_detail"] = da.DetailsAddr } } else if f == "budget" || f == "amount" { saveMap[SaveFields[f]] = util.Float64All(tmp[f]) jyMap[f] = util.Float64All(tmp[f]) } else if f == "industry" { // topscopeclass/subcopeclass //if s := util.ObjToString(tmp[f]); s != "" { // // for _, s2 := range strings.Split(s, ",") { // arr := strings.Split(s2, "_") // // todo // } //} } else if f == "winner" { if s := util.ObjToString(tmp[f]); s != "" { s = strings.ReplaceAll(s, ",", ",") //中文变英文 saveMap[SaveFields[f]] = s saveMap[f] = s jyMap[f] = s jyMap[SaveFields[f]] = s } } else if f == "attach" { s := util.ObjToString(tmp[f]) if s != "" { atts := map[string]interface{}{} if err := json.Unmarshal([]byte(s), &atts); err != nil { Logger.Error("data Unmarshal Failed:", Logger.Field("error", err)) } for _, i := range atts { i2 := i.(map[string]interface{}) //delete(i2, "uid") delete(i2, "ossurl") i2["url"] = "oss" } saveMap["projectinfo"] = map[string]interface{}{"attachments": atts} saveMap["isValidFile"] = true } } else if f == "discern_attach" { if s := util.ObjToString(tmp[f]); s != "" { atts_txt := map[string]interface{}{} if err := json.Unmarshal([]byte(s), &atts_txt); err != nil { Logger.Error("data Unmarshal Failed:", Logger.Field("error", err)) } for k, v := range atts_txt { atts_txt[k] = map[string]interface{}{k: v} } saveMap[SaveFields[f]] = atts_txt } } else if f == "type" { jyMap[f] = InfoType[0] it := util.IntAll(tmp[f]) infoType := InfoType[it] if infoType != "" { jyMap[f] = infoType } if len(InfoCodes) >= it { if infoType == "" { jyMap[f] = InfoCodes[it-1].Name } saveMap["infoattribute"] = InfoCodes[it-1].Code saveMap["ispublic"] = InfoCodes[it-1].IsPublic //是否公开 extractType = InfoCodes[it-1].ExtractType } } else if f == "recommended_service" { saveMap[SaveFields[f]] = util.IntAll(tmp[f]) } else { if s := util.ObjToString(tmp[f]); s != "" { saveMap[SaveFields[f]] = s if SaveFields[f] == "buyer" || SaveFields[f] == "buyerperson" || SaveFields[f] == "buyertel" || SaveFields[f] == "area" || SaveFields[f] == "city" { jyMap[SaveFields[f]] = s } } } } now := time.Now() saveMap["comeintime"] = now.Unix() saveMap["publishtime"] = now.Unix() saveMap["contenthtml"] = tmp["detail"] cut := util.NewCut().ClearHtml(util.ObjToString(tmp["detail"])) tmp["detail"] = cut saveMap["s_sha"] = Sha(cut) saveMap["site"] = "剑鱼信息发布平台" saveMap["channel"] = "公告" saveMap["spidercode"] = "a_jyxxfbpt_gg" saveMap["extracttype"] = extractType saveMap["areaval"] = 0 saveMap["detail_isvalidity"] = 1 saveMap["infoformat"] = 1 saveMap["dataging"] = 0 saveMap["buyerhint"] = util.IntAll(tmp["contact_overt"]) _id := primitive.NewObjectID() saveMap["_id"] = _id saveMap["href"] = fmt.Sprintf(JyUrl, util.CommonEncodeArticle("content", mongodb.BsonIdToSId(_id))) saveMap["competehref"] = "#" saveMap["jyfb_data"] = jyMap saveMap["jyfb_id"] = util.ObjToString(info["id"]) saveMap["public_type"] = "用户发布" Logger.Debug("InfoPub mgo save: " + fmt.Sprint(saveMap)) MgoBid.SaveByOriID(BidColl, saveMap) } type AttsResponse struct { Other AttsOther `json:"other"` Result []*AttsResult `json:"result"` } type AttsOther struct { Id string `json:"id"` Action string `json:"action"` MsgType string `json:"msgType"` Detail []string `json:"detail"` Title []string `json:"title"` } type AttsResult struct { FileName string `json:"fileName"` TextUrl string `json:"textUrl"` TextContent string `json:"textContent"` FilePath string `json:"filePath"` ErrorState string `json:"errorState"` } // @Description 附件处理完成队列 // @Author J 2022/4/13 3:29 PM func TaskAtts(obj map[string]interface{}) { atts := make(map[string]interface{}) atts_text := make(map[string]interface{}) for i, r := range obj["result"].([]interface{}) { r1 := r.(map[string]interface{}) at := make(map[string]interface{}) text := make(map[string]interface{}) at["state"] = r1["errorState"].(string) if r1["errorState"].(string) == "200" { textContent := OssGetObject(util.ObjToString(r1["textUrl"])) at["sensitive"] = WordsIdentify(textContent) text["file_name"] = r1["fileName"].(string) text["attach_url"] = r1["textUrl"].(string) } atts[strconv.Itoa(i)] = at if len(text) > 0 { atts_text[strconv.Itoa(i)] = text } } attsJson, _ := json.Marshal(atts) attsTextJson, _ := json.Marshal(atts_text) // 直接调用剑鱼接口 var other map[string]interface{} _ = json.Unmarshal([]byte(util.ObjToString(obj["other"])), &other) req := &jypb.SensitiveRequest{ Id: util.ObjToString(other["id"]), MsgType: util.ObjToString(other["msgType"]), //Action: util.ObjToString(obj.Other.Action), Title: util.ObjArrToStringArr(other["title"].([]interface{})), Detail: util.ObjArrToStringArr(other["detail"].([]interface{})), Attachments: string(attsJson), AttachTxt: string(attsTextJson), } Logger.Debug("JyRpcSensitive request: " + fmt.Sprint(req)) JyRpcSensitive(req) } // @Description 敏感词识别完成调用剑鱼接口 // @Author J 2022/4/13 11:16 AM func JyRpcSensitive(req *jypb.SensitiveRequest) { conn := JyRpcClient.Conn() jyIntf := jypb.NewCommonInfoClient(conn) resp, err := jyIntf.SensitiveMethod(context.Background(), req) if err != nil { Logger.Error(err.Error()) InitEtcd() resp, err = jyIntf.SensitiveMethod(context.Background(), req) if err != nil { Logger.Error(err.Error()) } } Logger.Info("JyRpcSensitive response: " + resp.String()) } // @Description 信息删除(es、bidding、extract、project) // @Author J 2022/4/8 4:37 PM:00 func DelMethod(mold int, res string) { if !bson.IsObjectIdHex(res) { Logger.Error(" bidding del fail, id err" + res) return } q := map[string]interface{}{"_id": mongodb.StringTOBsonId(res)} b := MgoBid.Del(BidColl, q) if !b { Logger.Error(" bidding del fail...") } b = MgoExt.Del(ExtColl, q) if !b { Logger.Error(" extract del fail...") } switch mold { case 8: Index = "bidding_yg" //直采-采购信息 var ( ok bool id string ) data := Es.Get(Index, `{"query": {"bool": {"must": [{"terms": {"source_id": ["`+res+`"]}}]}},"from": 0,"size": 1}`) if data != nil && len(*data) > 0 { one := (*data)[0] id = util.ObjToString(one["id"]) if id != "" { ok = Es.DelById(Index, id) } else { ok = Es.DelById(Index, res) } } if !ok { log.Println(id, "用户删除发布信息后 删除索引信息失败:", data, "----", res) } default: Es.DelById(Index, res) } //Es.DelById(IndexAll, Itype, res) project := Sysconfig["project"].(map[string]interface{}) by, _ := json.Marshal(map[string]interface{}{ "infoid": res, "stype": "deleteInfo", }) addr := &net.UDPAddr{ IP: net.ParseIP(project["addr"].(string)), Port: util.IntAll(project["port"]), } Logger.Debug(string(by)) _ = UdpClient.WriteUdp(by, udp.OP_TYPE_DATA, addr) } // @Description 数据处理完成调用jy接口 // @Author J 2022/4/9 11:41 AM func JyRpcDataFin(_id string) { info, _ := MgoBid.FindById(BidColl, _id, `{"jyfb_id": 1}`) if len(*info) == 0 { Logger.Error("JyRpcDataFin mgo not find, id: " + _id) return } conn := JyRpcClient.Conn() req := &jypb.StateRequest{ Id: util.ObjToString((*info)["jyfb_id"]), PublishId: _id, } jyIntf := jypb.NewCommonInfoClient(conn) Logger.Info("JyRpcDataFin request: " + req.String()) resp, err := jyIntf.StateMethod(context.Background(), req) if err != nil { Logger.Error(err.Error()) InitEtcd() resp, err = jyIntf.StateMethod(context.Background(), req) if err != nil { Logger.Error(err.Error()) } } Logger.Info("JyRpcDataFin response: " + resp.String()) } var reg = regexp.MustCompile("[^0-9A-Za-z\u4e00-\u9fa5]+") var Filter = regexp.MustCompile("<[^>]*?>|[\\s\u3000\u2003\u00a0]") func Sha(con string) string { h := sha256.New() con = reg.ReplaceAllString(Filter.ReplaceAllString(con, ""), "") h.Write([]byte(con)) return fmt.Sprintf("%x", h.Sum(nil)) }