// task.go (12 KB)
  1. package main
  2. import (
  3. "app.yhyue.com/BP/servicerd/proto"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "google.golang.org/grpc"
  8. "gopkg.in/mgo.v2/bson"
  9. jypb "jy_publishing/proto/common"
  10. pb "jy_publishing/proto/proto"
  11. "net"
  12. "strconv"
  13. "strings"
  14. "time"
  15. "utils"
  16. . "utils/log"
  17. "utils/mongodb"
  18. )
  19. var (
  20. JyUrl = "https://www.jianyu360.cn/article/content/%s.html"
  21. InfoFields = []string{"title", "project_code", "province", "city", "industry", "buyer", "budget", "winner", "amount",
  22. "detail", "attch", "contact_person", "contact_phone", "attach", "discern_attach"}
  23. )
  24. var SaveFields = map[string]string{
  25. "title": "title",
  26. "project_code": "projectcode",
  27. "province": "area",
  28. "city": "city",
  29. "buyer": "buyer",
  30. "budget": "budget",
  31. "winner": "s_winner",
  32. "amount": "bidamount",
  33. "detail": "detail",
  34. "contact_phone": "buyertel",
  35. "contact_person": "buyerperson",
  36. "discern_attach": "attach_text",
  37. //"attch": "",
  38. //"industry": "",
  39. //"contract_overt": "",
  40. }
  41. // @Description 信息处理(信息发布和附件识别)
  42. // 1、敏感词处理,2、信息发布,3、信息删除
  43. // @Author J 2022/4/9 11:47 AM
  44. func taskInfo(obj interface{}) {
  45. info, _ := obj.(map[string]interface{})
  46. if util.ObjToString(info["action"]) == "1" {
  47. // 敏感词处理
  48. Sensitive(info)
  49. } else if util.ObjToString(info["action"]) == "2" {
  50. // 数据处理
  51. InfoPub(info)
  52. } else if util.ObjToString(info["action"]) == "3" {
  53. //id := util.ObjToString(info["id"])
  54. tmp := info["appendInfo"].(map[string]interface{})
  55. DelMethod(util.ObjToString(tmp["publish_id"]))
  56. }
  57. }
  58. // @Description 敏感词处理(title, content, attachment)
  59. // @Author J 2022/4/11 9:36 AM
  60. func Sensitive(info map[string]interface{}) {
  61. tmp := info["appendInfo"].(map[string]interface{})
  62. tArr := WordsIdentify(util.ObjToString(tmp["title"]))
  63. dArr := WordsIdentify(util.ObjToString(tmp["detail"]))
  64. if attsMap, ok := tmp["attach"].(map[string]interface{}); ok && len(attsMap) > 0 {
  65. other := map[string]interface{}{
  66. "id": info["id"],
  67. "action": info["action"],
  68. "msgType": info["msgType"],
  69. "title": tArr,
  70. "detail": dArr,
  71. }
  72. otherJson, _ := json.Marshal(other)
  73. var attsArr []*pb.Request
  74. for _, m := range attsMap {
  75. m1 := m.(map[string]interface{})
  76. attsArr = append(attsArr, &pb.Request{
  77. FileUrl: util.ObjToString(m1["fid"]),
  78. FileName: util.ObjToString(m1["filename"]),
  79. FileType: util.ObjToString(m1["ftype"]),
  80. ReturnType: 1,
  81. ExtractType: 0,
  82. })
  83. }
  84. msginfo := &pb.FileRequest{
  85. Message: attsArr,
  86. Other: string(otherJson),
  87. Topic: FileTopicResult,
  88. }
  89. Logger.Debug("file extract send nsq: " + fmt.Sprint(msginfo))
  90. _ = MProducer.Publish(msginfo)
  91. } else {
  92. // 没有附件
  93. Logger.Debug("title sensitive array: " + fmt.Sprint(tArr))
  94. Logger.Debug("detail sensitive array: " + fmt.Sprint(dArr))
  95. req := &jypb.SensitiveRequest{
  96. Id: util.ObjToString(info["id"]),
  97. MsgType: util.ObjToString(info["msgType"]),
  98. Title: tArr,
  99. Detail: dArr,
  100. }
  101. Logger.Debug("JyRpcSensitive request: " + fmt.Sprint(req))
  102. JyRpcSensitive(req)
  103. }
  104. //atts := tmp["attachment"].(map[string]interface{})
  105. //resultAtts := make(map[string]interface{})
  106. //resultAttach := make(map[string]interface{})
  107. //for k, v := range atts {
  108. // attach := make(map[string]interface{}) // attach_text字段
  109. // resp, err := AttsMethod(v.(map[string]interface{}))
  110. // if err != nil {
  111. // return nil
  112. // }
  113. // for i, r := range resp.Result {
  114. // if w := WordsIdentify(r.TextContent); w != nil {
  115. // resultAtts[k] = w
  116. // }
  117. // attach[strconv.Itoa(i)] = map[string]interface{}{"file_name": r.FileName, "attach_url": r.TextUrl}
  118. // }
  119. // resultAttach[k] = attach
  120. //}
  121. //resultMap["attach_text"] = resultAttach
  122. //resultMap["attachment"] = resultAtts
  123. }
  124. // @Description 敏感词识别
  125. // @Author J 2022/4/12 1:33 PM
  126. func WordsIdentify(str string) []string {
  127. if str == "" {
  128. return nil
  129. }
  130. ret := Ms.Discern(str, 2)
  131. if len(ret) > 0 {
  132. var words []string
  133. for _, r := range ret {
  134. words = append(words, r.MatchRule)
  135. }
  136. return words
  137. }
  138. return []string{}
  139. }
  140. // @Description 附件调用gRpc接口处理
  141. // @Author J 2022/4/12 10:02 AM
  142. // Deprecated
  143. func AttsMethod(att map[string]interface{}) (*pb.FileResponse, error) {
  144. reqs := &pb.FileRequest{
  145. Message: []*pb.Request{{
  146. FileName: "",
  147. FileType: "",
  148. FileUrl: ""}},
  149. Other: "",
  150. }
  151. // 1.调用gRPC接口
  152. conn, err := grpc.Dial(ClientAddr, grpc.WithInsecure())
  153. if err != nil {
  154. return nil, err
  155. }
  156. var client proto.ServiceClient
  157. client = proto.NewServiceClient(conn)
  158. repl, err := client.Apply(context.Background(), &proto.ApplyReqData{Name: "extract_service", Balance: 0})
  159. if err != nil {
  160. return nil, err
  161. }
  162. //2.业务调用
  163. addr := fmt.Sprintf("%s:%d", repl.Ip, repl.Port)
  164. conn_b, err := grpc.Dial(addr, grpc.WithInsecure())
  165. if err != nil {
  166. return nil, err
  167. }
  168. defer func(conn_b *grpc.ClientConn) {
  169. _ = conn_b.Close()
  170. }(conn_b)
  171. pc := pb.NewFileExtractClient(conn_b)
  172. rep, err := pc.FileExtract(context.Background(), reqs)
  173. if err != nil {
  174. return nil, err
  175. }
  176. return rep, nil
  177. }
  178. // @Description 信息发布
  179. // @Author J 2022/4/12 1:57 PM
  180. func InfoPub(info map[string]interface{}) {
  181. tmp := info["appendInfo"].(map[string]interface{})
  182. saveMap := make(map[string]interface{})
  183. jyMap := make(map[string]interface{})
  184. for _, f := range InfoFields {
  185. if tmp[f] == nil {
  186. continue
  187. }
  188. if f == "budget" || f == "amount" {
  189. saveMap[SaveFields[f]] = util.Float64All(tmp[f])
  190. jyMap[f] = util.Float64All(tmp[f])
  191. } else if f == "industry" {
  192. // topscopeclass/subcopeclass
  193. //if s := util.ObjToString(tmp[f]); s != "" {
  194. //
  195. // for _, s2 := range strings.Split(s, ",") {
  196. // arr := strings.Split(s2, "_")
  197. // // todo
  198. // }
  199. //}
  200. } else if f == "winner" {
  201. if s := util.ObjToString(tmp[f]); s != "" {
  202. s = strings.ReplaceAll(s, ",", ",") //中文变英文
  203. saveMap[SaveFields[f]] = s
  204. saveMap[f] = s
  205. jyMap[f] = s
  206. jyMap[SaveFields[f]] = s
  207. }
  208. } else if f == "attach" {
  209. s := util.ObjToString(tmp[f])
  210. if s != "" {
  211. atts := map[string]interface{}{}
  212. if err := json.Unmarshal([]byte(s), &atts); err != nil {
  213. Logger.Error("data Unmarshal Failed:", Field("error", err))
  214. }
  215. delete(atts, "uid")
  216. delete(atts, "ossurl")
  217. atts["url"] = "oss"
  218. saveMap["projectinfo"] = map[string]interface{}{"attachments": atts}
  219. }
  220. } else if f == "discern_attach" {
  221. if s := util.ObjToString(tmp[f]); s != "" {
  222. atts := map[string]interface{}{}
  223. if err := json.Unmarshal([]byte(s), &atts); err != nil {
  224. Logger.Error("data Unmarshal Failed:", Field("error", err))
  225. }
  226. saveMap[SaveFields[f]] = atts
  227. }
  228. } else {
  229. if s := util.ObjToString(tmp[f]); s != "" {
  230. saveMap[SaveFields[f]] = s
  231. if SaveFields[f] == "buyer" || SaveFields[f] == "buyerperson" || SaveFields[f] == "buyertel" ||
  232. SaveFields[f] == "area" || SaveFields[f] == "city" {
  233. jyMap[SaveFields[f]] = s
  234. }
  235. }
  236. }
  237. }
  238. now := time.Now()
  239. saveMap["comeintime"] = now.Unix()
  240. saveMap["publishtime"] = now.Unix()
  241. saveMap["contenthtml"] = tmp["detail"]
  242. cut := util.NewCut().ClearHtml(util.ObjToString(tmp["detail"]))
  243. tmp["detail"] = cut
  244. saveMap["site"] = "剑鱼信息发布平台"
  245. saveMap["channel"] = "公告"
  246. saveMap["spidercode"] = "a_jyxxfbpt_gg"
  247. saveMap["extracttype"] = 0
  248. saveMap["areaval"] = 0
  249. saveMap["infoformat"] = 1
  250. saveMap["dataging"] = 0
  251. saveMap["buyerhint"] = util.IntAll(tmp["contact_overt"])
  252. _id := bson.NewObjectIdWithTime(now)
  253. saveMap["href"] = fmt.Sprintf(JyUrl, util.CommonEncodeArticle("content", mongodb.BsonIdToSId(_id)))
  254. saveMap["competehref"] = "#"
  255. saveMap["jyfb_data"] = jyMap
  256. saveMap["jyfb_id"] = util.ObjToString(info["id"])
  257. Logger.Debug("InfoPub mgo save: " + fmt.Sprint(saveMap))
  258. MgoBid.SaveByOriID(BidColl, saveMap)
  259. }
  260. type AttsResponse struct {
  261. Other AttsOther `json:"other"`
  262. Result []*AttsResult `json:"result"`
  263. }
  264. type AttsOther struct {
  265. Id string `json:"id"`
  266. Action string `json:"action"`
  267. MsgType string `json:"msgType"`
  268. Detail []string `json:"detail"`
  269. Title []string `json:"title"`
  270. }
  271. type AttsResult struct {
  272. FileName string `json:"fileName"`
  273. TextUrl string `json:"textUrl"`
  274. TextContent string `json:"textContent"`
  275. FilePath string `json:"filePath"`
  276. ErrorState string `json:"errorState"`
  277. }
  278. // @Description 附件处理完成队列
  279. // @Author J 2022/4/13 3:29 PM
  280. func taskAtts(obj map[string]interface{}) {
  281. atts := make(map[string]interface{})
  282. atts_text := make(map[string]interface{})
  283. for i, r := range obj["result"].([]interface{}) {
  284. r1 := r.(map[string]interface{})
  285. at := make(map[string]interface{})
  286. text := make(map[string]interface{})
  287. at["state"] = r1["errorState"].(string)
  288. if r1["errorState"].(string) == "200" {
  289. at["sensitive"] = WordsIdentify(r1["textContent"].(string))
  290. text["file_name"] = r1["fileName"].(string)
  291. text["attach_url"] = r1["textUrl"].(string)
  292. }
  293. atts[strconv.Itoa(i)] = at
  294. if len(text) > 0 {
  295. atts_text[strconv.Itoa(i)] = text
  296. }
  297. }
  298. attsJson, _ := json.Marshal(atts)
  299. attsTextJson, _ := json.Marshal(atts_text)
  300. // 直接调用剑鱼接口
  301. var other map[string]interface{}
  302. _ = json.Unmarshal([]byte(util.ObjToString(obj["other"])), &other)
  303. req := &jypb.SensitiveRequest{
  304. Id: util.ObjToString(other["id"]),
  305. MsgType: util.ObjToString(other["msgType"]),
  306. //Action: util.ObjToString(obj.Other.Action),
  307. Title: util.ObjArrToStringArr(other["title"].([]interface{})),
  308. Detail: util.ObjArrToStringArr(other["detail"].([]interface{})),
  309. Attachments: string(attsJson),
  310. AttachTxt: string(attsTextJson),
  311. }
  312. Logger.Debug("JyRpcSensitive request: " + fmt.Sprint(req))
  313. JyRpcSensitive(req)
  314. }
  315. // @Description 敏感词识别完成调用剑鱼接口
  316. // @Author J 2022/4/13 11:16 AM
  317. func JyRpcSensitive(req *jypb.SensitiveRequest) {
  318. conn := JyRpcClient.Conn()
  319. jyIntf := jypb.NewCommonInfoClient(conn)
  320. resp, err := jyIntf.SensitiveMethod(context.Background(), req)
  321. if err != nil {
  322. Logger.Error(err.Error())
  323. initEtcd()
  324. resp, err = jyIntf.SensitiveMethod(context.Background(), req)
  325. if err != nil {
  326. Logger.Error(err.Error())
  327. }
  328. }
  329. Logger.Info("JyRpcSensitive response: " + resp.String())
  330. }
  331. // @Description 信息删除(es、bidding、extract、project)
  332. // @Author J 2022/4/8 4:37 PM:00
  333. func DelMethod(res string) {
  334. q := map[string]interface{}{"_id": mongodb.StringTOBsonId(res)}
  335. b := MgoBid.Del(BidColl, q)
  336. if !b {
  337. Logger.Error(" bidding del fail...")
  338. }
  339. b = MgoExt.Del(ExtColl, q)
  340. if !b {
  341. Logger.Error(" extract del fail...")
  342. }
  343. Es.DelById(Index, Itype, res)
  344. Es.DelById(IndexAll, Itype, res)
  345. project := Sysconfig["project"].(map[string]interface{})
  346. by, _ := json.Marshal(map[string]interface{}{
  347. "infoid": res,
  348. "stype": "deleteInfo",
  349. })
  350. addr := &net.UDPAddr{
  351. IP: net.ParseIP(project["addr"].(string)),
  352. Port: util.IntAll(project["port"]),
  353. }
  354. Logger.Debug(string(by))
  355. _ = UdpClient.WriteUdp(by, util.OP_TYPE_DATA, addr)
  356. }
  357. // @Description 数据处理完成调用jy接口
  358. // @Author J 2022/4/9 11:41 AM
  359. func JyRpcDataFin(_id string) {
  360. info, _ := MgoBid.FindById(BidColl, _id, `{"jyfb_id": 1}`)
  361. if len(*info) == 0 {
  362. Logger.Error("JyRpcDataFin mgo not find, id: " + _id)
  363. return
  364. }
  365. conn := JyRpcClient.Conn()
  366. req := &jypb.StateRequest{
  367. Id: util.ObjToString((*info)["jyfb_id"]),
  368. PublishId: _id,
  369. }
  370. jyIntf := jypb.NewCommonInfoClient(conn)
  371. Logger.Info("JyRpcDataFin request: " + req.String())
  372. resp, err := jyIntf.StateMethod(context.Background(), req)
  373. if err != nil {
  374. Logger.Error(err.Error())
  375. initEtcd()
  376. resp, err = jyIntf.StateMethod(context.Background(), req)
  377. if err != nil {
  378. Logger.Error(err.Error())
  379. }
  380. }
  381. Logger.Info("JyRpcDataFin response: " + resp.String())
  382. }