task.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548
  1. package tool
  2. import (
  3. "context"
  4. "crypto/sha256"
  5. "encoding/json"
  6. "fmt"
  7. "go.mongodb.org/mongo-driver/bson/primitive"
  8. "google.golang.org/grpc"
  9. "gopkg.in/mgo.v2/bson"
  10. "jy_publishing/Logger"
  11. jypb "jy_publishing/proto/common"
  12. pb "jy_publishing/proto/proto"
  13. "jygit.jydev.jianyu360.cn/BP/servicerd/proto"
  14. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  15. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  16. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  17. "log"
  18. "net"
  19. "net/rpc"
  20. "regexp"
  21. "strconv"
  22. "strings"
  23. "time"
  24. )
  25. var (
  26. JyUrl = "https://www.jianyu360.cn/article/content/%s.html"
  27. InfoFields = []string{"title", "project_code", "province", "city", "industry", "buyer", "budget", "winner", "amount",
  28. "detail", "attch", "contact_person", "contact_phone", "attach", "discern_attach", "type", "recommended_service", "deliveryAddress"}
  29. )
  30. var SaveFields = map[string]string{
  31. "title": "title",
  32. "project_code": "projectcode",
  33. "province": "area",
  34. "city": "city",
  35. "buyer": "buyer",
  36. "budget": "budget",
  37. "winner": "s_winner",
  38. "amount": "bidamount",
  39. "detail": "detail",
  40. "contact_phone": "buyertel",
  41. "contact_person": "buyerperson",
  42. "discern_attach": "attach_text",
  43. "type": "type", // 消息类型
  44. "recommended_service": "recommended_service", // 供应商推荐服务
  45. "deliveryAddress": "deliveryAddress",
  46. //"attch": "",
  47. //"industry": "",
  48. //"contract_overt": "",
  49. }
  50. var InfoType = map[int]string{
  51. 1: "招标信息",
  52. 2: "采购信息",
  53. 4: "招标公告",
  54. 5: "采购意向",
  55. 6: "招标预告",
  56. 7: "招标结果",
  57. 8: "直采-采购信息",
  58. }
  59. // @Description 信息处理(信息发布和附件识别)
  60. // 1、敏感词处理,2、信息发布,3、信息删除
  61. // @Author J 2022/4/9 11:47 AM
  62. func TaskInfo(obj interface{}) {
  63. info, _ := obj.(map[string]interface{})
  64. if util.ObjToString(info["action"]) == "1" {
  65. // 敏感词处理
  66. Sensitive(info)
  67. } else if util.ObjToString(info["action"]) == "2" {
  68. // 数据处理
  69. InfoPub(info)
  70. } else if util.ObjToString(info["action"]) == "3" {
  71. //id := util.ObjToString(info["id"])
  72. tmp := info["appendInfo"].(map[string]interface{})
  73. DelMethod(util.IntAll(tmp["type"]), util.ObjToString(tmp["publish_id"]))
  74. }
  75. }
  76. // @Description 敏感词处理(title, content, attachment)
  77. // @Author J 2022/4/11 9:36 AM
  78. func Sensitive(info map[string]interface{}) {
  79. tmp := info["appendInfo"].(map[string]interface{})
  80. tArr := WordsIdentify(util.ObjToString(tmp["title"]))
  81. dArr := WordsIdentify(util.ObjToString(tmp["detail"]))
  82. if attsMap, ok := tmp["attach"].(map[string]interface{}); ok && len(attsMap) > 0 {
  83. other := map[string]interface{}{
  84. "id": info["id"],
  85. "action": info["action"],
  86. "msgType": info["msgType"],
  87. "title": tArr,
  88. "detail": dArr,
  89. }
  90. otherJson, _ := json.Marshal(other)
  91. var attsArr []*pb.Request
  92. for _, m := range attsMap {
  93. m1 := m.(map[string]interface{})
  94. attsArr = append(attsArr, &pb.Request{
  95. FileUrl: util.ObjToString(m1["fid"]),
  96. FileName: util.ObjToString(m1["filename"]),
  97. FileType: util.ObjToString(m1["ftype"]),
  98. //ReturnType: 0, // 不传
  99. ExtractType: 0,
  100. })
  101. }
  102. msginfo := &pb.FileRequest{
  103. Message: attsArr,
  104. Other: string(otherJson),
  105. Topic: FileTopicResult,
  106. }
  107. Logger.Debug("file extract send nsq: " + fmt.Sprint(msginfo))
  108. _ = MProducer.Publish(msginfo)
  109. } else {
  110. // 没有附件
  111. Logger.Debug("title sensitive array: " + fmt.Sprint(tArr))
  112. Logger.Debug("detail sensitive array: " + fmt.Sprint(dArr))
  113. req := &jypb.SensitiveRequest{
  114. Id: util.ObjToString(info["id"]),
  115. MsgType: util.ObjToString(info["msgType"]),
  116. Title: tArr,
  117. Detail: dArr,
  118. }
  119. Logger.Debug("JyRpcSensitive request: " + fmt.Sprint(req))
  120. JyRpcSensitive(req)
  121. }
  122. //atts := tmp["attachment"].(map[string]interface{})
  123. //resultAtts := make(map[string]interface{})
  124. //resultAttach := make(map[string]interface{})
  125. //for k, v := range atts {
  126. // attach := make(map[string]interface{}) // attach_text字段
  127. // resp, err := AttsMethod(v.(map[string]interface{}))
  128. // if err != nil {
  129. // return nil
  130. // }
  131. // for i, r := range resp.Result {
  132. // if w := WordsIdentify(r.TextContent); w != nil {
  133. // resultAtts[k] = w
  134. // }
  135. // attach[strconv.Itoa(i)] = map[string]interface{}{"file_name": r.FileName, "attach_url": r.TextUrl}
  136. // }
  137. // resultAttach[k] = attach
  138. //}
  139. //resultMap["attach_text"] = resultAttach
  140. //resultMap["attachment"] = resultAtts
  141. }
  142. // @Description 敏感词识别
  143. // @Author J 2022/4/12 1:33 PM
  144. func WordsIdentify(str string) []string {
  145. if str == "" {
  146. return nil
  147. }
  148. ret := Ms.Discern(str, 2)
  149. if len(ret) > 0 {
  150. var words []string
  151. for _, r := range ret {
  152. words = append(words, r.MatchRule)
  153. }
  154. return words
  155. }
  156. return []string{}
  157. }
  158. // @Description 附件调用gRpc接口处理
  159. // @Author J 2022/4/12 10:02 AM
  160. // Deprecated
  161. func AttsMethod(att map[string]interface{}) (*pb.FileResponse, error) {
  162. reqs := &pb.FileRequest{
  163. Message: []*pb.Request{{
  164. FileName: "",
  165. FileType: "",
  166. FileUrl: ""}},
  167. Other: "",
  168. }
  169. // 1.调用gRPC接口
  170. conn, err := grpc.Dial(ClientAddr, grpc.WithInsecure())
  171. if err != nil {
  172. return nil, err
  173. }
  174. var client proto.ServiceClient
  175. client = proto.NewServiceClient(conn)
  176. repl, err := client.Apply(context.Background(), &proto.ApplyReqData{Name: "extract_service", Balance: 0})
  177. if err != nil {
  178. return nil, err
  179. }
  180. //2.业务调用
  181. addr := fmt.Sprintf("%s:%d", repl.Ip, repl.Port)
  182. conn_b, err := grpc.Dial(addr, grpc.WithInsecure())
  183. if err != nil {
  184. return nil, err
  185. }
  186. defer func(conn_b *grpc.ClientConn) {
  187. _ = conn_b.Close()
  188. }(conn_b)
  189. pc := pb.NewFileExtractClient(conn_b)
  190. rep, err := pc.FileExtract(context.Background(), reqs)
  191. if err != nil {
  192. return nil, err
  193. }
  194. return rep, nil
  195. }
  196. type DeliveryAddress struct {
  197. Area string `json:"area"`
  198. City string `json:"city"`
  199. Districts string `json:"districts"`
  200. DetailsAddr string `json:"detailsAddr"`
  201. }
  202. // @Description 信息发布
  203. // @Author J 2022/4/12 1:57 PM
  204. func InfoPub(info map[string]interface{}) {
  205. tmp := info["appendInfo"].(map[string]interface{})
  206. saveMap := make(map[string]interface{})
  207. jyMap := make(map[string]interface{})
  208. extractType := 0
  209. for _, f := range InfoFields {
  210. if tmp[f] == nil {
  211. continue
  212. }
  213. //交付地址
  214. if f == "deliveryAddress" {
  215. daStr := util.ObjToString(tmp[f])
  216. var da = DeliveryAddress{}
  217. err := json.Unmarshal([]byte(daStr), &da)
  218. if err == nil && da.Area != "" {
  219. saveMap["deliver_area"] = da.Area
  220. saveMap["deliver_city"] = da.City
  221. saveMap["deliver_district"] = da.Districts
  222. saveMap["deliver_detail"] = da.DetailsAddr
  223. }
  224. } else if f == "budget" || f == "amount" {
  225. saveMap[SaveFields[f]] = util.Float64All(tmp[f])
  226. jyMap[f] = util.Float64All(tmp[f])
  227. } else if f == "industry" {
  228. // topscopeclass/subcopeclass
  229. //if s := util.ObjToString(tmp[f]); s != "" {
  230. //
  231. // for _, s2 := range strings.Split(s, ",") {
  232. // arr := strings.Split(s2, "_")
  233. // // todo
  234. // }
  235. //}
  236. } else if f == "winner" {
  237. if s := util.ObjToString(tmp[f]); s != "" {
  238. s = strings.ReplaceAll(s, ",", ",") //中文变英文
  239. saveMap[SaveFields[f]] = s
  240. saveMap[f] = s
  241. jyMap[f] = s
  242. jyMap[SaveFields[f]] = s
  243. }
  244. } else if f == "attach" {
  245. s := util.ObjToString(tmp[f])
  246. if s != "" {
  247. atts := map[string]interface{}{}
  248. if err := json.Unmarshal([]byte(s), &atts); err != nil {
  249. Logger.Error("data Unmarshal Failed:", Logger.Field("error", err))
  250. }
  251. for _, i := range atts {
  252. i2 := i.(map[string]interface{})
  253. //delete(i2, "uid")
  254. delete(i2, "ossurl")
  255. i2["url"] = "oss"
  256. }
  257. saveMap["projectinfo"] = map[string]interface{}{"attachments": atts}
  258. saveMap["isValidFile"] = true
  259. }
  260. } else if f == "discern_attach" {
  261. if s := util.ObjToString(tmp[f]); s != "" {
  262. atts_txt := map[string]interface{}{}
  263. if err := json.Unmarshal([]byte(s), &atts_txt); err != nil {
  264. Logger.Error("data Unmarshal Failed:", Logger.Field("error", err))
  265. }
  266. for k, v := range atts_txt {
  267. atts_txt[k] = map[string]interface{}{k: v}
  268. }
  269. saveMap[SaveFields[f]] = atts_txt
  270. }
  271. } else if f == "type" {
  272. jyMap[f] = InfoType[0]
  273. it := util.IntAll(tmp[f])
  274. infoType := InfoType[it]
  275. if infoType != "" {
  276. jyMap[f] = infoType
  277. }
  278. if len(InfoCodes) >= it {
  279. if infoType == "" {
  280. jyMap[f] = InfoCodes[it-1].Name
  281. }
  282. saveMap["infoattribute"] = InfoCodes[it-1].Code
  283. saveMap["ispublic"] = InfoCodes[it-1].IsPublic //是否公开
  284. extractType = InfoCodes[it-1].ExtractType
  285. }
  286. } else if f == "recommended_service" {
  287. saveMap[SaveFields[f]] = util.IntAll(tmp[f])
  288. } else {
  289. if s := util.ObjToString(tmp[f]); s != "" {
  290. saveMap[SaveFields[f]] = s
  291. if SaveFields[f] == "buyer" || SaveFields[f] == "buyerperson" || SaveFields[f] == "buyertel" ||
  292. SaveFields[f] == "area" || SaveFields[f] == "city" {
  293. jyMap[SaveFields[f]] = s
  294. }
  295. }
  296. }
  297. }
  298. now := time.Now()
  299. saveMap["comeintime"] = now.Unix()
  300. saveMap["publishtime"] = now.Unix()
  301. saveMap["contenthtml"] = tmp["detail"]
  302. cut := util.NewCut().ClearHtml(util.ObjToString(tmp["detail"]))
  303. tmp["detail"] = cut
  304. saveMap["s_sha"] = Sha(cut)
  305. saveMap["site"] = "剑鱼信息发布平台"
  306. saveMap["channel"] = "公告"
  307. saveMap["spidercode"] = "a_jyxxfbpt_gg"
  308. saveMap["extracttype"] = extractType
  309. saveMap["areaval"] = 0
  310. saveMap["detail_isvalidity"] = 1
  311. saveMap["infoformat"] = 1
  312. saveMap["dataging"] = 0
  313. saveMap["buyerhint"] = util.IntAll(tmp["contact_overt"])
  314. _id := primitive.NewObjectID()
  315. saveMap["_id"] = _id
  316. saveMap["href"] = fmt.Sprintf(JyUrl, util.CommonEncodeArticle("content", mongodb.BsonIdToSId(_id)))
  317. saveMap["competehref"] = "#"
  318. saveMap["jyfb_data"] = jyMap
  319. saveMap["jyfb_id"] = util.ObjToString(info["id"])
  320. saveMap["public_type"] = "用户发布"
  321. Logger.Debug("InfoPub mgo save: " + fmt.Sprint(saveMap))
  322. MgoBid.SaveByOriID(BidColl, saveMap)
  323. err := OssUpRpc(UploadArgs{
  324. Stream: []byte(util.ObjToString(tmp["detail"])),
  325. Gzip: false,
  326. BucketID: Contenthtml_BucketId,
  327. ObjectName: fmt.Sprintf("%s.txt", mongodb.BsonIdToSId(_id)),
  328. })
  329. if err != nil {
  330. Logger.Error("InfoPub oss upload: " + err.Error())
  331. }
  332. err = OssUpRpc(UploadArgs{
  333. Stream: []byte(cut),
  334. Gzip: false,
  335. BucketID: Detail_BucketId,
  336. ObjectName: fmt.Sprintf("%s.txt", mongodb.BsonIdToSId(_id)),
  337. })
  338. if err != nil {
  339. Logger.Error("InfoPub oss upload: " + err.Error())
  340. }
  341. }
  342. type AttsResponse struct {
  343. Other AttsOther `json:"other"`
  344. Result []*AttsResult `json:"result"`
  345. }
  346. type AttsOther struct {
  347. Id string `json:"id"`
  348. Action string `json:"action"`
  349. MsgType string `json:"msgType"`
  350. Detail []string `json:"detail"`
  351. Title []string `json:"title"`
  352. }
  353. type AttsResult struct {
  354. FileName string `json:"fileName"`
  355. TextUrl string `json:"textUrl"`
  356. TextContent string `json:"textContent"`
  357. FilePath string `json:"filePath"`
  358. ErrorState string `json:"errorState"`
  359. }
  360. // @Description 附件处理完成队列
  361. // @Author J 2022/4/13 3:29 PM
  362. func TaskAtts(obj map[string]interface{}) {
  363. atts := make(map[string]interface{})
  364. atts_text := make(map[string]interface{})
  365. for i, r := range obj["result"].([]interface{}) {
  366. r1 := r.(map[string]interface{})
  367. at := make(map[string]interface{})
  368. text := make(map[string]interface{})
  369. at["state"] = r1["errorState"].(string)
  370. if r1["errorState"].(string) == "200" {
  371. textContent := OssGetObject(util.ObjToString(r1["textUrl"]))
  372. at["sensitive"] = WordsIdentify(textContent)
  373. text["file_name"] = r1["fileName"].(string)
  374. text["attach_url"] = r1["textUrl"].(string)
  375. }
  376. atts[strconv.Itoa(i)] = at
  377. if len(text) > 0 {
  378. atts_text[strconv.Itoa(i)] = text
  379. }
  380. }
  381. attsJson, _ := json.Marshal(atts)
  382. attsTextJson, _ := json.Marshal(atts_text)
  383. // 直接调用剑鱼接口
  384. var other map[string]interface{}
  385. _ = json.Unmarshal([]byte(util.ObjToString(obj["other"])), &other)
  386. req := &jypb.SensitiveRequest{
  387. Id: util.ObjToString(other["id"]),
  388. MsgType: util.ObjToString(other["msgType"]),
  389. //Action: util.ObjToString(obj.Other.Action),
  390. Title: util.ObjArrToStringArr(other["title"].([]interface{})),
  391. Detail: util.ObjArrToStringArr(other["detail"].([]interface{})),
  392. Attachments: string(attsJson),
  393. AttachTxt: string(attsTextJson),
  394. }
  395. Logger.Debug("JyRpcSensitive request: " + fmt.Sprint(req))
  396. JyRpcSensitive(req)
  397. }
  398. // @Description 敏感词识别完成调用剑鱼接口
  399. // @Author J 2022/4/13 11:16 AM
  400. func JyRpcSensitive(req *jypb.SensitiveRequest) {
  401. conn := JyRpcClient.Conn()
  402. jyIntf := jypb.NewCommonInfoClient(conn)
  403. resp, err := jyIntf.SensitiveMethod(context.Background(), req)
  404. if err != nil {
  405. Logger.Error(err.Error())
  406. InitEtcd()
  407. resp, err = jyIntf.SensitiveMethod(context.Background(), req)
  408. if err != nil {
  409. Logger.Error(err.Error())
  410. }
  411. }
  412. Logger.Info("JyRpcSensitive response: " + resp.String())
  413. }
  414. // @Description 信息删除(es、bidding、extract、project)
  415. // @Author J 2022/4/8 4:37 PM:00
  416. func DelMethod(mold int, res string) {
  417. if !bson.IsObjectIdHex(res) {
  418. Logger.Error(" bidding del fail, id err" + res)
  419. return
  420. }
  421. q := map[string]interface{}{"_id": mongodb.StringTOBsonId(res)}
  422. b := MgoBid.Del(BidColl, q)
  423. if !b {
  424. Logger.Error(" bidding del fail...")
  425. }
  426. b = MgoExt.Del(ExtColl, q)
  427. if !b {
  428. Logger.Error(" extract del fail...")
  429. }
  430. switch mold {
  431. case 8:
  432. Index = "bidding_yg" //直采-采购信息
  433. var (
  434. ok bool
  435. id string
  436. )
  437. data := Es.Get(Index, `{"query": {"bool": {"must": [{"terms": {"source_id": ["`+res+`"]}}]}},"from": 0,"size": 1}`)
  438. if data != nil && len(*data) > 0 {
  439. one := (*data)[0]
  440. id = util.ObjToString(one["id"])
  441. if id != "" {
  442. ok = Es.DelById(Index, id)
  443. } else {
  444. ok = Es.DelById(Index, res)
  445. }
  446. }
  447. if !ok {
  448. log.Println(id, "用户删除发布信息后 删除索引信息失败:", data, "----", res)
  449. }
  450. default:
  451. Es.DelById(Index, res)
  452. }
  453. //Es.DelById(IndexAll, Itype, res)
  454. project := Sysconfig["project"].(map[string]interface{})
  455. by, _ := json.Marshal(map[string]interface{}{
  456. "infoid": res,
  457. "stype": "deleteInfo",
  458. })
  459. addr := &net.UDPAddr{
  460. IP: net.ParseIP(project["addr"].(string)),
  461. Port: util.IntAll(project["port"]),
  462. }
  463. Logger.Debug(string(by))
  464. _ = UdpClient.WriteUdp(by, udp.OP_TYPE_DATA, addr)
  465. }
  466. // @Description 数据处理完成调用jy接口
  467. // @Author J 2022/4/9 11:41 AM
  468. func JyRpcDataFin(_id string) {
  469. info, _ := MgoBid.FindById(BidColl, _id, `{"jyfb_id": 1}`)
  470. if len(*info) == 0 {
  471. Logger.Error("JyRpcDataFin mgo not find, id: " + _id)
  472. return
  473. }
  474. conn := JyRpcClient.Conn()
  475. req := &jypb.StateRequest{
  476. Id: util.ObjToString((*info)["jyfb_id"]),
  477. PublishId: _id,
  478. }
  479. jyIntf := jypb.NewCommonInfoClient(conn)
  480. Logger.Info("JyRpcDataFin request: " + req.String())
  481. resp, err := jyIntf.StateMethod(context.Background(), req)
  482. if err != nil {
  483. Logger.Error(err.Error())
  484. InitEtcd()
  485. resp, err = jyIntf.StateMethod(context.Background(), req)
  486. if err != nil {
  487. Logger.Error(err.Error())
  488. }
  489. }
  490. Logger.Info("JyRpcDataFin response: " + resp.String())
  491. }
  492. var reg = regexp.MustCompile("[^0-9A-Za-z\u4e00-\u9fa5]+")
  493. var Filter = regexp.MustCompile("<[^>]*?>|[\\s\u3000\u2003\u00a0]")
  494. func Sha(con string) string {
  495. h := sha256.New()
  496. con = reg.ReplaceAllString(Filter.ReplaceAllString(con, ""), "")
  497. h.Write([]byte(con))
  498. return fmt.Sprintf("%x", h.Sum(nil))
  499. }
  500. type UploadArgs struct {
  501. Stream []byte // 客户端将文件数据传递过来
  502. Gzip bool //是否压缩
  503. BucketID string //桶id
  504. ObjectName string //对象名称
  505. }
  506. type UpResult struct {
  507. Error_code int `json:"error_code"`
  508. Error_msg string `json:"error_msg"`
  509. Data interface{} `json:"data"`
  510. }
  511. func OssUpRpc(req UploadArgs) error {
  512. client, err := rpc.DialHTTP("tcp", Oss_Server_Address)
  513. defer client.Close()
  514. if err != nil {
  515. return err
  516. }
  517. var resp = &UpResult{}
  518. if err = client.Call("OSSService.Upload", req, resp); err != nil {
  519. log.Println("OSSService.Upload", "err", err.Error())
  520. return err
  521. }
  522. return nil
  523. }