123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456 |
- package main
- import (
- "context"
- "crypto/sha256"
- "encoding/json"
- "fmt"
- "go.mongodb.org/mongo-driver/bson/primitive"
- "google.golang.org/grpc"
- "gopkg.in/mgo.v2/bson"
- "jy_publishing/Logger"
- jypb "jy_publishing/proto/common"
- pb "jy_publishing/proto/proto"
- "jygit.jydev.jianyu360.cn/BP/servicerd/proto"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- "net"
- "regexp"
- "strconv"
- "strings"
- "time"
- )
- var (
- JyUrl = "https://www.jianyu360.cn/article/content/%s.html"
- InfoFields = []string{"title", "project_code", "province", "city", "industry", "buyer", "budget", "winner", "amount",
- "detail", "attch", "contact_person", "contact_phone", "attach", "discern_attach", "type", "recommended_service"}
- )
- var SaveFields = map[string]string{
- "title": "title",
- "project_code": "projectcode",
- "province": "area",
- "city": "city",
- "buyer": "buyer",
- "budget": "budget",
- "winner": "s_winner",
- "amount": "bidamount",
- "detail": "detail",
- "contact_phone": "buyertel",
- "contact_person": "buyerperson",
- "discern_attach": "attach_text",
- "type": "type", // 消息类型
- "recommended_service": "recommended_service", // 供应商推荐服务
- //"attch": "",
- //"industry": "",
- //"contract_overt": "",
- }
- var InfoType = map[int]string{
- 1: "招标信息",
- 2: "采购信息",
- 4: "招标公告",
- 5: "采购意向",
- 6: "招标预告",
- 7: "招标结果",
- 8: "直采-采购信息",
- }
- // @Description 信息处理(信息发布和附件识别)
- // 1、敏感词处理,2、信息发布,3、信息删除
- // @Author J 2022/4/9 11:47 AM
- func taskInfo(obj interface{}) {
- info, _ := obj.(map[string]interface{})
- if util.ObjToString(info["action"]) == "1" {
- // 敏感词处理
- Sensitive(info)
- } else if util.ObjToString(info["action"]) == "2" {
- // 数据处理
- InfoPub(info)
- } else if util.ObjToString(info["action"]) == "3" {
- //id := util.ObjToString(info["id"])
- tmp := info["appendInfo"].(map[string]interface{})
- DelMethod(util.ObjToString(tmp["publish_id"]))
- }
- }
- // @Description 敏感词处理(title, content, attachment)
- // @Author J 2022/4/11 9:36 AM
- func Sensitive(info map[string]interface{}) {
- tmp := info["appendInfo"].(map[string]interface{})
- tArr := WordsIdentify(util.ObjToString(tmp["title"]))
- dArr := WordsIdentify(util.ObjToString(tmp["detail"]))
- if attsMap, ok := tmp["attach"].(map[string]interface{}); ok && len(attsMap) > 0 {
- other := map[string]interface{}{
- "id": info["id"],
- "action": info["action"],
- "msgType": info["msgType"],
- "title": tArr,
- "detail": dArr,
- }
- otherJson, _ := json.Marshal(other)
- var attsArr []*pb.Request
- for _, m := range attsMap {
- m1 := m.(map[string]interface{})
- attsArr = append(attsArr, &pb.Request{
- FileUrl: util.ObjToString(m1["fid"]),
- FileName: util.ObjToString(m1["filename"]),
- FileType: util.ObjToString(m1["ftype"]),
- //ReturnType: 0, // 不传
- ExtractType: 0,
- })
- }
- msginfo := &pb.FileRequest{
- Message: attsArr,
- Other: string(otherJson),
- Topic: FileTopicResult,
- }
- Logger.Debug("file extract send nsq: " + fmt.Sprint(msginfo))
- _ = MProducer.Publish(msginfo)
- } else {
- // 没有附件
- Logger.Debug("title sensitive array: " + fmt.Sprint(tArr))
- Logger.Debug("detail sensitive array: " + fmt.Sprint(dArr))
- req := &jypb.SensitiveRequest{
- Id: util.ObjToString(info["id"]),
- MsgType: util.ObjToString(info["msgType"]),
- Title: tArr,
- Detail: dArr,
- }
- Logger.Debug("JyRpcSensitive request: " + fmt.Sprint(req))
- JyRpcSensitive(req)
- }
- //atts := tmp["attachment"].(map[string]interface{})
- //resultAtts := make(map[string]interface{})
- //resultAttach := make(map[string]interface{})
- //for k, v := range atts {
- // attach := make(map[string]interface{}) // attach_text字段
- // resp, err := AttsMethod(v.(map[string]interface{}))
- // if err != nil {
- // return nil
- // }
- // for i, r := range resp.Result {
- // if w := WordsIdentify(r.TextContent); w != nil {
- // resultAtts[k] = w
- // }
- // attach[strconv.Itoa(i)] = map[string]interface{}{"file_name": r.FileName, "attach_url": r.TextUrl}
- // }
- // resultAttach[k] = attach
- //}
- //resultMap["attach_text"] = resultAttach
- //resultMap["attachment"] = resultAtts
- }
- // @Description 敏感词识别
- // @Author J 2022/4/12 1:33 PM
- func WordsIdentify(str string) []string {
- if str == "" {
- return nil
- }
- ret := Ms.Discern(str, 2)
- if len(ret) > 0 {
- var words []string
- for _, r := range ret {
- words = append(words, r.MatchRule)
- }
- return words
- }
- return []string{}
- }
- // @Description 附件调用gRpc接口处理
- // @Author J 2022/4/12 10:02 AM
- // Deprecated
- func AttsMethod(att map[string]interface{}) (*pb.FileResponse, error) {
- reqs := &pb.FileRequest{
- Message: []*pb.Request{{
- FileName: "",
- FileType: "",
- FileUrl: ""}},
- Other: "",
- }
- // 1.调用gRPC接口
- conn, err := grpc.Dial(ClientAddr, grpc.WithInsecure())
- if err != nil {
- return nil, err
- }
- var client proto.ServiceClient
- client = proto.NewServiceClient(conn)
- repl, err := client.Apply(context.Background(), &proto.ApplyReqData{Name: "extract_service", Balance: 0})
- if err != nil {
- return nil, err
- }
- //2.业务调用
- addr := fmt.Sprintf("%s:%d", repl.Ip, repl.Port)
- conn_b, err := grpc.Dial(addr, grpc.WithInsecure())
- if err != nil {
- return nil, err
- }
- defer func(conn_b *grpc.ClientConn) {
- _ = conn_b.Close()
- }(conn_b)
- pc := pb.NewFileExtractClient(conn_b)
- rep, err := pc.FileExtract(context.Background(), reqs)
- if err != nil {
- return nil, err
- }
- return rep, nil
- }
- // @Description 信息发布
- // @Author J 2022/4/12 1:57 PM
- func InfoPub(info map[string]interface{}) {
- tmp := info["appendInfo"].(map[string]interface{})
- saveMap := make(map[string]interface{})
- jyMap := make(map[string]interface{})
- for _, f := range InfoFields {
- if tmp[f] == nil {
- continue
- }
- if f == "budget" || f == "amount" {
- saveMap[SaveFields[f]] = util.Float64All(tmp[f])
- jyMap[f] = util.Float64All(tmp[f])
- } else if f == "industry" {
- // topscopeclass/subcopeclass
- //if s := util.ObjToString(tmp[f]); s != "" {
- //
- // for _, s2 := range strings.Split(s, ",") {
- // arr := strings.Split(s2, "_")
- // // todo
- // }
- //}
- } else if f == "winner" {
- if s := util.ObjToString(tmp[f]); s != "" {
- s = strings.ReplaceAll(s, ",", ",") //中文变英文
- saveMap[SaveFields[f]] = s
- saveMap[f] = s
- jyMap[f] = s
- jyMap[SaveFields[f]] = s
- }
- } else if f == "attach" {
- s := util.ObjToString(tmp[f])
- if s != "" {
- atts := map[string]interface{}{}
- if err := json.Unmarshal([]byte(s), &atts); err != nil {
- Logger.Error("data Unmarshal Failed:", Logger.Field("error", err))
- }
- for _, i := range atts {
- i2 := i.(map[string]interface{})
- //delete(i2, "uid")
- delete(i2, "ossurl")
- i2["url"] = "oss"
- }
- saveMap["projectinfo"] = map[string]interface{}{"attachments": atts}
- }
- } else if f == "discern_attach" {
- if s := util.ObjToString(tmp[f]); s != "" {
- atts_txt := map[string]interface{}{}
- if err := json.Unmarshal([]byte(s), &atts_txt); err != nil {
- Logger.Error("data Unmarshal Failed:", Logger.Field("error", err))
- }
- for k, v := range atts_txt {
- atts_txt[k] = map[string]interface{}{k: v}
- }
- saveMap[SaveFields[f]] = atts_txt
- }
- } else if f == "type" {
- jyMap[f] = InfoType[0]
- it := util.IntAll(tmp[f])
- infoType := InfoType[it]
- if infoType != "" {
- jyMap[f] = infoType
- }
- if len(InfoCodes) >= it {
- if infoType == "" {
- jyMap[f] = InfoCodes[it-1]
- }
- saveMap["infoAttribute"] = InfoCodes[it-1].Code
- }
- } else if f == "recommended_service" {
- saveMap[SaveFields[f]] = util.IntAll(tmp[f])
- } else {
- if s := util.ObjToString(tmp[f]); s != "" {
- saveMap[SaveFields[f]] = s
- if SaveFields[f] == "buyer" || SaveFields[f] == "buyerperson" || SaveFields[f] == "buyertel" ||
- SaveFields[f] == "area" || SaveFields[f] == "city" {
- jyMap[SaveFields[f]] = s
- }
- }
- }
- }
- now := time.Now()
- saveMap["comeintime"] = now.Unix()
- saveMap["publishtime"] = now.Unix()
- saveMap["contenthtml"] = tmp["detail"]
- cut := util.NewCut().ClearHtml(util.ObjToString(tmp["detail"]))
- tmp["detail"] = cut
- saveMap["s_sha"] = Sha(cut)
- saveMap["site"] = "剑鱼信息发布平台"
- saveMap["channel"] = "公告"
- saveMap["spidercode"] = "a_jyxxfbpt_gg"
- saveMap["extracttype"] = 0
- saveMap["areaval"] = 0
- saveMap["detail_isvalidity"] = 1
- saveMap["infoformat"] = 1
- saveMap["dataging"] = 0
- saveMap["buyerhint"] = util.IntAll(tmp["contact_overt"])
- _id := primitive.NewObjectID()
- saveMap["_id"] = _id
- saveMap["href"] = fmt.Sprintf(JyUrl, util.CommonEncodeArticle("content", mongodb.BsonIdToSId(_id)))
- saveMap["competehref"] = "#"
- saveMap["jyfb_data"] = jyMap
- saveMap["jyfb_id"] = util.ObjToString(info["id"])
- Logger.Debug("InfoPub mgo save: " + fmt.Sprint(saveMap))
- MgoBid.SaveByOriID(BidColl, saveMap)
- }
- type AttsResponse struct {
- Other AttsOther `json:"other"`
- Result []*AttsResult `json:"result"`
- }
- type AttsOther struct {
- Id string `json:"id"`
- Action string `json:"action"`
- MsgType string `json:"msgType"`
- Detail []string `json:"detail"`
- Title []string `json:"title"`
- }
- type AttsResult struct {
- FileName string `json:"fileName"`
- TextUrl string `json:"textUrl"`
- TextContent string `json:"textContent"`
- FilePath string `json:"filePath"`
- ErrorState string `json:"errorState"`
- }
- // @Description 附件处理完成队列
- // @Author J 2022/4/13 3:29 PM
- func taskAtts(obj map[string]interface{}) {
- atts := make(map[string]interface{})
- atts_text := make(map[string]interface{})
- for i, r := range obj["result"].([]interface{}) {
- r1 := r.(map[string]interface{})
- at := make(map[string]interface{})
- text := make(map[string]interface{})
- at["state"] = r1["errorState"].(string)
- if r1["errorState"].(string) == "200" {
- textContent := OssGetObject(util.ObjToString(r1["textUrl"]))
- at["sensitive"] = WordsIdentify(textContent)
- text["file_name"] = r1["fileName"].(string)
- text["attach_url"] = r1["textUrl"].(string)
- }
- atts[strconv.Itoa(i)] = at
- if len(text) > 0 {
- atts_text[strconv.Itoa(i)] = text
- }
- }
- attsJson, _ := json.Marshal(atts)
- attsTextJson, _ := json.Marshal(atts_text)
- // 直接调用剑鱼接口
- var other map[string]interface{}
- _ = json.Unmarshal([]byte(util.ObjToString(obj["other"])), &other)
- req := &jypb.SensitiveRequest{
- Id: util.ObjToString(other["id"]),
- MsgType: util.ObjToString(other["msgType"]),
- //Action: util.ObjToString(obj.Other.Action),
- Title: util.ObjArrToStringArr(other["title"].([]interface{})),
- Detail: util.ObjArrToStringArr(other["detail"].([]interface{})),
- Attachments: string(attsJson),
- AttachTxt: string(attsTextJson),
- }
- Logger.Debug("JyRpcSensitive request: " + fmt.Sprint(req))
- JyRpcSensitive(req)
- }
- // @Description 敏感词识别完成调用剑鱼接口
- // @Author J 2022/4/13 11:16 AM
- func JyRpcSensitive(req *jypb.SensitiveRequest) {
- conn := JyRpcClient.Conn()
- jyIntf := jypb.NewCommonInfoClient(conn)
- resp, err := jyIntf.SensitiveMethod(context.Background(), req)
- if err != nil {
- Logger.Error(err.Error())
- initEtcd()
- resp, err = jyIntf.SensitiveMethod(context.Background(), req)
- if err != nil {
- Logger.Error(err.Error())
- }
- }
- Logger.Info("JyRpcSensitive response: " + resp.String())
- }
- // @Description 信息删除(es、bidding、extract、project)
- // @Author J 2022/4/8 4:37 PM:00
- func DelMethod(res string) {
- if !bson.IsObjectIdHex(res) {
- Logger.Error(" bidding del fail, id err" + res)
- return
- }
- q := map[string]interface{}{"_id": mongodb.StringTOBsonId(res)}
- b := MgoBid.Del(BidColl, q)
- if !b {
- Logger.Error(" bidding del fail...")
- }
- b = MgoExt.Del(ExtColl, q)
- if !b {
- Logger.Error(" extract del fail...")
- }
- Es.DelById(Index, res)
- //Es.DelById(IndexAll, Itype, res)
- project := Sysconfig["project"].(map[string]interface{})
- by, _ := json.Marshal(map[string]interface{}{
- "infoid": res,
- "stype": "deleteInfo",
- })
- addr := &net.UDPAddr{
- IP: net.ParseIP(project["addr"].(string)),
- Port: util.IntAll(project["port"]),
- }
- Logger.Debug(string(by))
- _ = UdpClient.WriteUdp(by, udp.OP_TYPE_DATA, addr)
- }
- // @Description 数据处理完成调用jy接口
- // @Author J 2022/4/9 11:41 AM
- func JyRpcDataFin(_id string) {
- info, _ := MgoBid.FindById(BidColl, _id, `{"jyfb_id": 1}`)
- if len(*info) == 0 {
- Logger.Error("JyRpcDataFin mgo not find, id: " + _id)
- return
- }
- conn := JyRpcClient.Conn()
- req := &jypb.StateRequest{
- Id: util.ObjToString((*info)["jyfb_id"]),
- PublishId: _id,
- }
- jyIntf := jypb.NewCommonInfoClient(conn)
- Logger.Info("JyRpcDataFin request: " + req.String())
- resp, err := jyIntf.StateMethod(context.Background(), req)
- if err != nil {
- Logger.Error(err.Error())
- initEtcd()
- resp, err = jyIntf.StateMethod(context.Background(), req)
- if err != nil {
- Logger.Error(err.Error())
- }
- }
- Logger.Info("JyRpcDataFin response: " + resp.String())
- }
- var reg = regexp.MustCompile("[^0-9A-Za-z\u4e00-\u9fa5]+")
- var Filter = regexp.MustCompile("<[^>]*?>|[\\s\u3000\u2003\u00a0]")
- func Sha(con string) string {
- h := sha256.New()
- con = reg.ReplaceAllString(Filter.ReplaceAllString(con, ""), "")
- h.Write([]byte(con))
- return fmt.Sprintf("%x", h.Sum(nil))
- }
|