task.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. package main
  2. import (
  3. "context"
  4. "crypto/sha256"
  5. "encoding/json"
  6. "fmt"
  7. "go.mongodb.org/mongo-driver/bson/primitive"
  8. "google.golang.org/grpc"
  9. "gopkg.in/mgo.v2/bson"
  10. "jy_publishing/Logger"
  11. jypb "jy_publishing/proto/common"
  12. pb "jy_publishing/proto/proto"
  13. "jygit.jydev.jianyu360.cn/BP/servicerd/proto"
  14. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  15. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  16. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  17. "net"
  18. "regexp"
  19. "strconv"
  20. "strings"
  21. "time"
  22. )
  23. var (
  24. JyUrl = "https://www.jianyu360.cn/article/content/%s.html"
  25. InfoFields = []string{"title", "project_code", "province", "city", "industry", "buyer", "budget", "winner", "amount",
  26. "detail", "attch", "contact_person", "contact_phone", "attach", "discern_attach", "type", "recommended_service"}
  27. )
  28. var SaveFields = map[string]string{
  29. "title": "title",
  30. "project_code": "projectcode",
  31. "province": "area",
  32. "city": "city",
  33. "buyer": "buyer",
  34. "budget": "budget",
  35. "winner": "s_winner",
  36. "amount": "bidamount",
  37. "detail": "detail",
  38. "contact_phone": "buyertel",
  39. "contact_person": "buyerperson",
  40. "discern_attach": "attach_text",
  41. "type": "type", // 消息类型
  42. "recommended_service": "recommended_service", // 供应商推荐服务
  43. //"attch": "",
  44. //"industry": "",
  45. //"contract_overt": "",
  46. }
  47. var InfoType = map[int]string{
  48. 1: "招标信息",
  49. 2: "采购信息",
  50. 4: "招标公告",
  51. 5: "采购意向",
  52. 6: "招标预告",
  53. 7: "招标结果",
  54. }
  55. // @Description 信息处理(信息发布和附件识别)
  56. // 1、敏感词处理,2、信息发布,3、信息删除
  57. // @Author J 2022/4/9 11:47 AM
  58. func taskInfo(obj interface{}) {
  59. info, _ := obj.(map[string]interface{})
  60. if util.ObjToString(info["action"]) == "1" {
  61. // 敏感词处理
  62. Sensitive(info)
  63. } else if util.ObjToString(info["action"]) == "2" {
  64. // 数据处理
  65. InfoPub(info)
  66. } else if util.ObjToString(info["action"]) == "3" {
  67. //id := util.ObjToString(info["id"])
  68. tmp := info["appendInfo"].(map[string]interface{})
  69. DelMethod(util.ObjToString(tmp["publish_id"]))
  70. }
  71. }
  72. // @Description 敏感词处理(title, content, attachment)
  73. // @Author J 2022/4/11 9:36 AM
  74. func Sensitive(info map[string]interface{}) {
  75. tmp := info["appendInfo"].(map[string]interface{})
  76. tArr := WordsIdentify(util.ObjToString(tmp["title"]))
  77. dArr := WordsIdentify(util.ObjToString(tmp["detail"]))
  78. if attsMap, ok := tmp["attach"].(map[string]interface{}); ok && len(attsMap) > 0 {
  79. other := map[string]interface{}{
  80. "id": info["id"],
  81. "action": info["action"],
  82. "msgType": info["msgType"],
  83. "title": tArr,
  84. "detail": dArr,
  85. }
  86. otherJson, _ := json.Marshal(other)
  87. var attsArr []*pb.Request
  88. for _, m := range attsMap {
  89. m1 := m.(map[string]interface{})
  90. attsArr = append(attsArr, &pb.Request{
  91. FileUrl: util.ObjToString(m1["fid"]),
  92. FileName: util.ObjToString(m1["filename"]),
  93. FileType: util.ObjToString(m1["ftype"]),
  94. //ReturnType: 0, // 不传
  95. ExtractType: 0,
  96. })
  97. }
  98. msginfo := &pb.FileRequest{
  99. Message: attsArr,
  100. Other: string(otherJson),
  101. Topic: FileTopicResult,
  102. }
  103. Logger.Debug("file extract send nsq: " + fmt.Sprint(msginfo))
  104. _ = MProducer.Publish(msginfo)
  105. } else {
  106. // 没有附件
  107. Logger.Debug("title sensitive array: " + fmt.Sprint(tArr))
  108. Logger.Debug("detail sensitive array: " + fmt.Sprint(dArr))
  109. req := &jypb.SensitiveRequest{
  110. Id: util.ObjToString(info["id"]),
  111. MsgType: util.ObjToString(info["msgType"]),
  112. Title: tArr,
  113. Detail: dArr,
  114. }
  115. Logger.Debug("JyRpcSensitive request: " + fmt.Sprint(req))
  116. JyRpcSensitive(req)
  117. }
  118. //atts := tmp["attachment"].(map[string]interface{})
  119. //resultAtts := make(map[string]interface{})
  120. //resultAttach := make(map[string]interface{})
  121. //for k, v := range atts {
  122. // attach := make(map[string]interface{}) // attach_text字段
  123. // resp, err := AttsMethod(v.(map[string]interface{}))
  124. // if err != nil {
  125. // return nil
  126. // }
  127. // for i, r := range resp.Result {
  128. // if w := WordsIdentify(r.TextContent); w != nil {
  129. // resultAtts[k] = w
  130. // }
  131. // attach[strconv.Itoa(i)] = map[string]interface{}{"file_name": r.FileName, "attach_url": r.TextUrl}
  132. // }
  133. // resultAttach[k] = attach
  134. //}
  135. //resultMap["attach_text"] = resultAttach
  136. //resultMap["attachment"] = resultAtts
  137. }
  138. // @Description 敏感词识别
  139. // @Author J 2022/4/12 1:33 PM
  140. func WordsIdentify(str string) []string {
  141. if str == "" {
  142. return nil
  143. }
  144. ret := Ms.Discern(str, 2)
  145. if len(ret) > 0 {
  146. var words []string
  147. for _, r := range ret {
  148. words = append(words, r.MatchRule)
  149. }
  150. return words
  151. }
  152. return []string{}
  153. }
  154. // @Description 附件调用gRpc接口处理
  155. // @Author J 2022/4/12 10:02 AM
  156. // Deprecated
  157. func AttsMethod(att map[string]interface{}) (*pb.FileResponse, error) {
  158. reqs := &pb.FileRequest{
  159. Message: []*pb.Request{{
  160. FileName: "",
  161. FileType: "",
  162. FileUrl: ""}},
  163. Other: "",
  164. }
  165. // 1.调用gRPC接口
  166. conn, err := grpc.Dial(ClientAddr, grpc.WithInsecure())
  167. if err != nil {
  168. return nil, err
  169. }
  170. var client proto.ServiceClient
  171. client = proto.NewServiceClient(conn)
  172. repl, err := client.Apply(context.Background(), &proto.ApplyReqData{Name: "extract_service", Balance: 0})
  173. if err != nil {
  174. return nil, err
  175. }
  176. //2.业务调用
  177. addr := fmt.Sprintf("%s:%d", repl.Ip, repl.Port)
  178. conn_b, err := grpc.Dial(addr, grpc.WithInsecure())
  179. if err != nil {
  180. return nil, err
  181. }
  182. defer func(conn_b *grpc.ClientConn) {
  183. _ = conn_b.Close()
  184. }(conn_b)
  185. pc := pb.NewFileExtractClient(conn_b)
  186. rep, err := pc.FileExtract(context.Background(), reqs)
  187. if err != nil {
  188. return nil, err
  189. }
  190. return rep, nil
  191. }
  192. // @Description 信息发布
  193. // @Author J 2022/4/12 1:57 PM
  194. func InfoPub(info map[string]interface{}) {
  195. tmp := info["appendInfo"].(map[string]interface{})
  196. saveMap := make(map[string]interface{})
  197. jyMap := make(map[string]interface{})
  198. for _, f := range InfoFields {
  199. if tmp[f] == nil {
  200. continue
  201. }
  202. if f == "budget" || f == "amount" {
  203. saveMap[SaveFields[f]] = util.Float64All(tmp[f])
  204. jyMap[f] = util.Float64All(tmp[f])
  205. } else if f == "industry" {
  206. // topscopeclass/subcopeclass
  207. //if s := util.ObjToString(tmp[f]); s != "" {
  208. //
  209. // for _, s2 := range strings.Split(s, ",") {
  210. // arr := strings.Split(s2, "_")
  211. // // todo
  212. // }
  213. //}
  214. } else if f == "winner" {
  215. if s := util.ObjToString(tmp[f]); s != "" {
  216. s = strings.ReplaceAll(s, ",", ",") //中文变英文
  217. saveMap[SaveFields[f]] = s
  218. saveMap[f] = s
  219. jyMap[f] = s
  220. jyMap[SaveFields[f]] = s
  221. }
  222. } else if f == "attach" {
  223. s := util.ObjToString(tmp[f])
  224. if s != "" {
  225. atts := map[string]interface{}{}
  226. if err := json.Unmarshal([]byte(s), &atts); err != nil {
  227. Logger.Error("data Unmarshal Failed:", Logger.Field("error", err))
  228. }
  229. for _, i := range atts {
  230. i2 := i.(map[string]interface{})
  231. //delete(i2, "uid")
  232. delete(i2, "ossurl")
  233. i2["url"] = "oss"
  234. }
  235. saveMap["projectinfo"] = map[string]interface{}{"attachments": atts}
  236. }
  237. } else if f == "discern_attach" {
  238. if s := util.ObjToString(tmp[f]); s != "" {
  239. atts_txt := map[string]interface{}{}
  240. if err := json.Unmarshal([]byte(s), &atts_txt); err != nil {
  241. Logger.Error("data Unmarshal Failed:", Logger.Field("error", err))
  242. }
  243. for k, v := range atts_txt {
  244. atts_txt[k] = map[string]interface{}{k: v}
  245. }
  246. saveMap[SaveFields[f]] = atts_txt
  247. }
  248. } else if f == "type" {
  249. jyMap[f] = InfoType[util.IntAll(tmp[f])]
  250. } else if f == "recommended_service" {
  251. saveMap[SaveFields[f]] = util.IntAll(tmp[f])
  252. } else {
  253. if s := util.ObjToString(tmp[f]); s != "" {
  254. saveMap[SaveFields[f]] = s
  255. if SaveFields[f] == "buyer" || SaveFields[f] == "buyerperson" || SaveFields[f] == "buyertel" ||
  256. SaveFields[f] == "area" || SaveFields[f] == "city" {
  257. jyMap[SaveFields[f]] = s
  258. }
  259. }
  260. }
  261. }
  262. now := time.Now()
  263. saveMap["comeintime"] = now.Unix()
  264. saveMap["publishtime"] = now.Unix()
  265. saveMap["contenthtml"] = tmp["detail"]
  266. cut := util.NewCut().ClearHtml(util.ObjToString(tmp["detail"]))
  267. tmp["detail"] = cut
  268. saveMap["s_sha"] = Sha(cut)
  269. saveMap["site"] = "剑鱼信息发布平台"
  270. saveMap["channel"] = "公告"
  271. saveMap["spidercode"] = "a_jyxxfbpt_gg"
  272. saveMap["extracttype"] = 0
  273. saveMap["areaval"] = 0
  274. saveMap["detail_isvalidity"] = 1
  275. saveMap["infoformat"] = 1
  276. saveMap["dataging"] = 0
  277. saveMap["buyerhint"] = util.IntAll(tmp["contact_overt"])
  278. _id := primitive.NewObjectID()
  279. saveMap["_id"] = _id
  280. saveMap["href"] = fmt.Sprintf(JyUrl, util.CommonEncodeArticle("content", mongodb.BsonIdToSId(_id)))
  281. saveMap["competehref"] = "#"
  282. saveMap["jyfb_data"] = jyMap
  283. saveMap["jyfb_id"] = util.ObjToString(info["id"])
  284. Logger.Debug("InfoPub mgo save: " + fmt.Sprint(saveMap))
  285. MgoBid.SaveByOriID(BidColl, saveMap)
  286. }
  287. type AttsResponse struct {
  288. Other AttsOther `json:"other"`
  289. Result []*AttsResult `json:"result"`
  290. }
  291. type AttsOther struct {
  292. Id string `json:"id"`
  293. Action string `json:"action"`
  294. MsgType string `json:"msgType"`
  295. Detail []string `json:"detail"`
  296. Title []string `json:"title"`
  297. }
  298. type AttsResult struct {
  299. FileName string `json:"fileName"`
  300. TextUrl string `json:"textUrl"`
  301. TextContent string `json:"textContent"`
  302. FilePath string `json:"filePath"`
  303. ErrorState string `json:"errorState"`
  304. }
  305. // @Description 附件处理完成队列
  306. // @Author J 2022/4/13 3:29 PM
  307. func taskAtts(obj map[string]interface{}) {
  308. atts := make(map[string]interface{})
  309. atts_text := make(map[string]interface{})
  310. for i, r := range obj["result"].([]interface{}) {
  311. r1 := r.(map[string]interface{})
  312. at := make(map[string]interface{})
  313. text := make(map[string]interface{})
  314. at["state"] = r1["errorState"].(string)
  315. if r1["errorState"].(string) == "200" {
  316. textContent := OssGetObject(util.ObjToString(r1["textUrl"]))
  317. at["sensitive"] = WordsIdentify(textContent)
  318. text["file_name"] = r1["fileName"].(string)
  319. text["attach_url"] = r1["textUrl"].(string)
  320. }
  321. atts[strconv.Itoa(i)] = at
  322. if len(text) > 0 {
  323. atts_text[strconv.Itoa(i)] = text
  324. }
  325. }
  326. attsJson, _ := json.Marshal(atts)
  327. attsTextJson, _ := json.Marshal(atts_text)
  328. // 直接调用剑鱼接口
  329. var other map[string]interface{}
  330. _ = json.Unmarshal([]byte(util.ObjToString(obj["other"])), &other)
  331. req := &jypb.SensitiveRequest{
  332. Id: util.ObjToString(other["id"]),
  333. MsgType: util.ObjToString(other["msgType"]),
  334. //Action: util.ObjToString(obj.Other.Action),
  335. Title: util.ObjArrToStringArr(other["title"].([]interface{})),
  336. Detail: util.ObjArrToStringArr(other["detail"].([]interface{})),
  337. Attachments: string(attsJson),
  338. AttachTxt: string(attsTextJson),
  339. }
  340. Logger.Debug("JyRpcSensitive request: " + fmt.Sprint(req))
  341. JyRpcSensitive(req)
  342. }
  343. // @Description 敏感词识别完成调用剑鱼接口
  344. // @Author J 2022/4/13 11:16 AM
  345. func JyRpcSensitive(req *jypb.SensitiveRequest) {
  346. conn := JyRpcClient.Conn()
  347. jyIntf := jypb.NewCommonInfoClient(conn)
  348. resp, err := jyIntf.SensitiveMethod(context.Background(), req)
  349. if err != nil {
  350. Logger.Error(err.Error())
  351. initEtcd()
  352. resp, err = jyIntf.SensitiveMethod(context.Background(), req)
  353. if err != nil {
  354. Logger.Error(err.Error())
  355. }
  356. }
  357. Logger.Info("JyRpcSensitive response: " + resp.String())
  358. }
  359. // @Description 信息删除(es、bidding、extract、project)
  360. // @Author J 2022/4/8 4:37 PM:00
  361. func DelMethod(res string) {
  362. if !bson.IsObjectIdHex(res) {
  363. Logger.Error(" bidding del fail, id err" + res)
  364. return
  365. }
  366. q := map[string]interface{}{"_id": mongodb.StringTOBsonId(res)}
  367. b := MgoBid.Del(BidColl, q)
  368. if !b {
  369. Logger.Error(" bidding del fail...")
  370. }
  371. b = MgoExt.Del(ExtColl, q)
  372. if !b {
  373. Logger.Error(" extract del fail...")
  374. }
  375. Es.DelById(Index, res)
  376. //Es.DelById(IndexAll, Itype, res)
  377. project := Sysconfig["project"].(map[string]interface{})
  378. by, _ := json.Marshal(map[string]interface{}{
  379. "infoid": res,
  380. "stype": "deleteInfo",
  381. })
  382. addr := &net.UDPAddr{
  383. IP: net.ParseIP(project["addr"].(string)),
  384. Port: util.IntAll(project["port"]),
  385. }
  386. Logger.Debug(string(by))
  387. _ = UdpClient.WriteUdp(by, udp.OP_TYPE_DATA, addr)
  388. }
  389. // @Description 数据处理完成调用jy接口
  390. // @Author J 2022/4/9 11:41 AM
  391. func JyRpcDataFin(_id string) {
  392. info, _ := MgoBid.FindById(BidColl, _id, `{"jyfb_id": 1}`)
  393. if len(*info) == 0 {
  394. Logger.Error("JyRpcDataFin mgo not find, id: " + _id)
  395. return
  396. }
  397. conn := JyRpcClient.Conn()
  398. req := &jypb.StateRequest{
  399. Id: util.ObjToString((*info)["jyfb_id"]),
  400. PublishId: _id,
  401. }
  402. jyIntf := jypb.NewCommonInfoClient(conn)
  403. Logger.Info("JyRpcDataFin request: " + req.String())
  404. resp, err := jyIntf.StateMethod(context.Background(), req)
  405. if err != nil {
  406. Logger.Error(err.Error())
  407. initEtcd()
  408. resp, err = jyIntf.StateMethod(context.Background(), req)
  409. if err != nil {
  410. Logger.Error(err.Error())
  411. }
  412. }
  413. Logger.Info("JyRpcDataFin response: " + resp.String())
  414. }
  415. var reg = regexp.MustCompile("[^0-9A-Za-z\u4e00-\u9fa5]+")
  416. var Filter = regexp.MustCompile("<[^>]*?>|[\\s\u3000\u2003\u00a0]")
  417. func Sha(con string) string {
  418. h := sha256.New()
  419. con = reg.ReplaceAllString(Filter.ReplaceAllString(con, ""), "")
  420. h.Write([]byte(con))
  421. return fmt.Sprintf("%x", h.Sum(nil))
  422. }