task.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. package main
import (
	"context"
	"encoding/json"
	"fmt"
	"net"
	"sort"
	"strconv"
	"strings"
	"time"

	"app.yhyue.com/BP/servicerd/proto"
	"google.golang.org/grpc"
	"gopkg.in/mgo.v2/bson"
	pb "jy_publishing/proto"
	"utils"
	"utils/log"
	"utils/mongodb"
)
  18. var (
  19. JyUrl = "https://www.jianyu360.cn/article/content/%s.html"
  20. InfoFields = []string{"title", "project_code", "province", "city", "industry", "buyer", "budget", "winner", "amount",
  21. "detail", "attch", "contract_person", "contract_phone", "attach", "discern_attach"}
  22. )
  23. var SaveFields = map[string]string{
  24. "title": "title",
  25. "project_code": "projectcode",
  26. "province": "area",
  27. "city": "city",
  28. "buyer": "buyer",
  29. "budget": "budget",
  30. "winner": "s_winner",
  31. "amount": "bidamount",
  32. "detail": "detail",
  33. "contract_person": "buyerperson",
  34. "contract_phone": "buyertel",
  35. "discern_attach": "attach_text",
  36. //"attch": "",
  37. //"industry": "",
  38. //"contract_overt": "",
  39. }
  40. // @Description 信息处理(信息发布和附件识别)
  41. // 1、敏感词处理,2、信息发布,3、信息删除
  42. // @Author J 2022/4/9 11:47 AM
  43. func taskInfo(obj interface{}) {
  44. info, _ := obj.(map[string]interface{})
  45. if util.ObjToString(info["action"]) == "1" {
  46. // 敏感词处理
  47. Sensitive(info)
  48. } else if util.ObjToString(info["action"]) == "2" {
  49. // 数据处理
  50. InfoPub(info)
  51. } else if util.ObjToString(info["action"]) == "3" {
  52. id := util.ObjToString(info["id"])
  53. DelMethod(id)
  54. }
  55. }
  56. // @Description 敏感词处理(title, content, attachment)
  57. // @Author J 2022/4/11 9:36 AM
  58. func Sensitive(info map[string]interface{}) {
  59. tmp := info["appendInfo"].(map[string]interface{})
  60. tArr := WordsIdentify(util.ObjToString(tmp["title"]))
  61. dArr := WordsIdentify(util.ObjToString(tmp["detail"]))
  62. if tmp["attach"] != nil {
  63. attsMap := tmp["attach"].(map[string]interface{})
  64. other := map[string]interface{}{
  65. "id": tmp["id"],
  66. "action": tmp["action"],
  67. "msgType": tmp["msgType"],
  68. "title": strings.Join(tArr, ","),
  69. "detail": strings.Join(dArr, ","),
  70. }
  71. otherJson, _ := json.Marshal(other)
  72. var attsArr []*pb.Request
  73. for _, m := range attsMap {
  74. m1 := m.(map[string]interface{})
  75. attsArr = append(attsArr, &pb.Request{
  76. FileUrl: util.ObjToString(m1["fid"]),
  77. FileName: util.ObjToString(m1["filename"]),
  78. FileType: util.ObjToString(m1["ftype"]),
  79. ReturnType: 2,
  80. ExtractType: 2,
  81. })
  82. }
  83. msginfo := &pb.FileRequest{
  84. Message: attsArr,
  85. Other: string(otherJson),
  86. }
  87. _ = MProducer.Publish(msginfo)
  88. } else {
  89. // 没有附件
  90. req := &pb.SensitiveRequest{
  91. Id: util.ObjToString(info["id"]),
  92. MsgType: util.ObjToString(info["msgtype"]),
  93. Title: tArr,
  94. Detail: dArr,
  95. }
  96. JyRpcSensitive(req)
  97. }
  98. //atts := tmp["attachment"].(map[string]interface{})
  99. //resultAtts := make(map[string]interface{})
  100. //resultAttach := make(map[string]interface{})
  101. //for k, v := range atts {
  102. // attach := make(map[string]interface{}) // attach_text字段
  103. // resp, err := AttsMethod(v.(map[string]interface{}))
  104. // if err != nil {
  105. // return nil
  106. // }
  107. // for i, r := range resp.Result {
  108. // if w := WordsIdentify(r.TextContent); w != nil {
  109. // resultAtts[k] = w
  110. // }
  111. // attach[strconv.Itoa(i)] = map[string]interface{}{"file_name": r.FileName, "attach_url": r.TextUrl}
  112. // }
  113. // resultAttach[k] = attach
  114. //}
  115. //resultMap["attach_text"] = resultAttach
  116. //resultMap["attachment"] = resultAtts
  117. }
  118. // @Description 敏感词识别
  119. // @Author J 2022/4/12 1:33 PM
  120. func WordsIdentify(str string) []string {
  121. if str == "" {
  122. return nil
  123. }
  124. ret := Ms.Discern(str, 2)
  125. if len(ret) > 0 {
  126. var words []string
  127. for _, r := range ret {
  128. words = append(words, r.MatchRule)
  129. }
  130. return words
  131. }
  132. return []string{}
  133. }
  134. // @Description 附件调用gRpc接口处理
  135. // @Author J 2022/4/12 10:02 AM
  136. // Deprecated
  137. func AttsMethod(att map[string]interface{}) (*pb.FileResponse, error) {
  138. reqs := &pb.FileRequest{
  139. Message: []*pb.Request{{
  140. FileName: "",
  141. FileType: "",
  142. FileUrl: ""}},
  143. Other: "",
  144. }
  145. // 1.调用gRPC接口
  146. conn, err := grpc.Dial(ClientAddr, grpc.WithInsecure())
  147. if err != nil {
  148. return nil, err
  149. }
  150. var client proto.ServiceClient
  151. client = proto.NewServiceClient(conn)
  152. repl, err := client.Apply(context.Background(), &proto.ApplyReqData{Name: "extract_service", Balance: 0})
  153. if err != nil {
  154. return nil, err
  155. }
  156. //2.业务调用
  157. addr := fmt.Sprintf("%s:%d", repl.Ip, repl.Port)
  158. conn_b, err := grpc.Dial(addr, grpc.WithInsecure())
  159. if err != nil {
  160. return nil, err
  161. }
  162. defer func(conn_b *grpc.ClientConn) {
  163. _ = conn_b.Close()
  164. }(conn_b)
  165. pc := pb.NewFileExtractClient(conn_b)
  166. rep, err := pc.FileExtract(context.Background(), reqs)
  167. if err != nil {
  168. return nil, err
  169. }
  170. return rep, nil
  171. }
  172. // @Description 信息发布
  173. // @Author J 2022/4/12 1:57 PM
  174. func InfoPub(info map[string]interface{}) {
  175. tmp := info["appendInfo"].(map[string]interface{})
  176. saveMap := make(map[string]interface{})
  177. jyMap := make(map[string]interface{})
  178. for _, f := range InfoFields {
  179. if tmp[f] == nil {
  180. continue
  181. }
  182. if f == "budget" || f == "amount" {
  183. saveMap[SaveFields[f]] = util.Float64All(tmp[f])
  184. jyMap[f] = util.Float64All(tmp[f])
  185. } else if f == "industry" {
  186. // topscopeclass/subcopeclass
  187. //if s := util.ObjToString(tmp[f]); s != "" {
  188. //
  189. // for _, s2 := range strings.Split(s, ",") {
  190. // arr := strings.Split(s2, "_")
  191. // // todo
  192. // }
  193. //}
  194. } else if f == "winner" {
  195. if s := util.ObjToString(tmp[f]); s != "" {
  196. s = strings.ReplaceAll(s, ",", ",") //中文变英文
  197. saveMap[SaveFields[f]] = s
  198. saveMap[f] = s
  199. jyMap[f] = s
  200. jyMap[SaveFields[f]] = s
  201. }
  202. } else if f == "attach" {
  203. s := util.ObjToString(tmp[f])
  204. atts := map[string]interface{}{}
  205. if err := json.Unmarshal([]byte(s), &atts); err != nil {
  206. log.Error("data Unmarshal Failed:", log.Field("error", err))
  207. }
  208. saveMap[SaveFields[f]] = atts
  209. //for _, a := range atts {
  210. // a1 := a.(map[string]interface{})
  211. // a1["filename"] = util.ObjToString(a1["fileName"])
  212. // delete(a1, "fileName")
  213. // a1[""] = a[""]
  214. //}
  215. } else if f == "discern_attach" {
  216. if s := util.ObjToString(tmp[f]); s != "" {
  217. atts := map[string]interface{}{}
  218. if err := json.Unmarshal([]byte(s), &atts); err != nil {
  219. log.Error("data Unmarshal Failed:", log.Field("error", err))
  220. }
  221. saveMap[SaveFields[f]] = atts
  222. }
  223. } else {
  224. if s := util.ObjToString(tmp[f]); s != "" {
  225. saveMap[SaveFields[f]] = s
  226. if SaveFields[f] == "buyer" || SaveFields[f] == "buyerperson" || SaveFields[f] == "buyertel" ||
  227. SaveFields[f] == "area" || SaveFields[f] == "city" {
  228. jyMap[SaveFields[f]] = s
  229. }
  230. }
  231. }
  232. }
  233. now := time.Now()
  234. saveMap["comeintime"] = now.Unix()
  235. saveMap["publishtime"] = now.Unix()
  236. saveMap["contenthtml"] = tmp["detail"]
  237. saveMap["site"] = "剑鱼信息发布平台"
  238. saveMap["channel"] = "公告"
  239. saveMap["spidercode"] = "a_jyxxfbpt_gg"
  240. saveMap["extracttype"] = 0
  241. saveMap["areaval"] = 0
  242. saveMap["infoformat"] = 1
  243. saveMap["dataging"] = 0
  244. saveMap["buyerhint"] = util.IntAll(tmp["contact_overt"])
  245. _id := bson.NewObjectIdWithTime(now)
  246. saveMap["href"] = fmt.Sprintf(JyUrl, util.CommonEncodeArticle("content", mongodb.BsonIdToSId(_id)))
  247. saveMap["competehref"] = "#"
  248. saveMap["jyfb_data"] = jyMap
  249. saveMap["jyfbid"] = tmp["_id"]
  250. MgoBid.SaveByOriID(BidColl, saveMap)
  251. }
  252. // @Description 附件处理完成队列
  253. // @Author J 2022/4/13 3:29 PM
  254. func taskAtts(obj interface{}) {
  255. result := obj.(*pb.FileResponse)
  256. util.Info(result.String())
  257. otherMap := make(map[string]interface{})
  258. if err := json.Unmarshal([]byte(result.Other), &otherMap); err != nil {
  259. log.Error("data Unmarshal Failed:", log.Field("error", err))
  260. }
  261. atts := make(map[string]interface{})
  262. atts_text := make(map[string]interface{})
  263. for i, r := range result.Result {
  264. at := make(map[string]interface{})
  265. text := make(map[string]interface{})
  266. at["state"] = r.ErrorState
  267. if r.ErrorState == "200" {
  268. at["sensitive"] = WordsIdentify(r.TextContent)
  269. text["file_name"] = r.FileName
  270. text["attach_url"] = r.TextUrl
  271. }
  272. atts[strconv.Itoa(i)] = at
  273. if len(text) > 0 {
  274. atts_text[strconv.Itoa(i)] = text
  275. }
  276. }
  277. attsJson, _ := json.Marshal(atts)
  278. attsTextJson, _ := json.Marshal(atts_text)
  279. // 直接调用剑鱼接口
  280. req := &pb.SensitiveRequest{
  281. Id: util.ObjToString(otherMap["id"]),
  282. MsgType: util.ObjToString(otherMap["msgType"]),
  283. Action: util.ObjToString(otherMap["action"]),
  284. Title: strings.Split(util.ObjToString(otherMap["title"]), ","),
  285. Detail: strings.Split(util.ObjToString(otherMap["detail"]), ","),
  286. Attachments: string(attsJson),
  287. AttachTxt: string(attsTextJson),
  288. }
  289. JyRpcSensitive(req)
  290. }
  291. // @Description 敏感词识别完成调用剑鱼接口
  292. // @Author J 2022/4/13 11:16 AM
  293. func JyRpcSensitive(req *pb.SensitiveRequest) {
  294. conn := JyRpcClient.Conn()
  295. defer conn.Close()
  296. jyIntf := pb.NewJyInterfaceClient(conn)
  297. resp, err := jyIntf.SensitiveMethod(context.Background(), req)
  298. if err != nil {
  299. log.Error(err.Error())
  300. }
  301. log.Info(resp.String())
  302. }
  303. // @Description 信息删除(es、bidding、extract、project)
  304. // @Author J 2022/4/8 4:37 PM:00
  305. func DelMethod(res string) {
  306. q := map[string]interface{}{"_id": mongodb.StringTOBsonId(res)}
  307. b := MgoBid.Del(BidColl, q)
  308. if !b {
  309. log.Error(" bidding del fail...")
  310. }
  311. b = MgoExt.Del(ExtColl, q)
  312. if !b {
  313. log.Error(" extract del fail...")
  314. }
  315. Es.DelById(Index, Itype, res)
  316. Es.DelById(IndexAll, Itype, res)
  317. project := Sysconfig["project"].(map[string]interface{})
  318. by, _ := json.Marshal(map[string]interface{}{
  319. "infoid": res,
  320. "stype": "deleteInfo",
  321. })
  322. addr := &net.UDPAddr{
  323. IP: net.ParseIP(project["addr"].(string)),
  324. Port: util.IntAll(project["port"]),
  325. }
  326. log.Debug(string(by))
  327. _ = UdpClient.WriteUdp(by, util.OP_TYPE_DATA, addr)
  328. }