task.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427
  1. package main
  2. import (
  3. "app.yhyue.com/BP/servicerd/proto"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "google.golang.org/grpc"
  8. "gopkg.in/mgo.v2/bson"
  9. jypb "jy_publishing/proto/common"
  10. pb "jy_publishing/proto/proto"
  11. "net"
  12. "strconv"
  13. "strings"
  14. "time"
  15. "utils"
  16. . "utils/log"
  17. "utils/mongodb"
  18. )
  // JyUrl is the article-page URL template used for a document's href; the
// single %s receives the encoded document id.
//
// InfoFields lists, in processing order, the appendInfo keys InfoPub reads.
var (
	JyUrl = "https://www.jianyu360.cn/article/content/%s.html"

	InfoFields = []string{
		"title", "project_code", "province", "city", "industry",
		"buyer", "budget", "winner", "amount", "detail",
		"attch", "contact_person", "contact_phone", "attach", "discern_attach",
		"type", "recommended_service",
	}
)
  // SaveFields maps an appendInfo key to the column name InfoPub writes it
// under in the bidding document.
// Keys that are not listed (for example "attch", "industry",
// "contract_overt") have no column mapping.
var SaveFields = map[string]string{
	// notice body
	"title":        "title",
	"project_code": "projectcode",
	"detail":       "detail",
	"type":         "type", // message type

	// location
	"province": "area",
	"city":     "city",

	// buyer and contact
	"buyer":          "buyer",
	"contact_person": "buyerperson",
	"contact_phone":  "buyertel",

	// money and winner
	"budget": "budget",
	"amount": "bidamount",
	"winner": "s_winner",

	// attachments and services
	"discern_attach":      "attach_text",
	"recommended_service": "recommended_service", // supplier recommendation service
}
  // InfoType maps the numeric appendInfo "type" to the label InfoPub stores
// in jyfb_data["type"]. Codes that are not listed (such as 3) look up as "".
var InfoType = map[int]string{
	1: "招标信息", // tender information
	2: "采购信息", // procurement information
	4: "招标公告", // tender announcement
	5: "采购意向", // procurement intention
	6: "招标预告", // tender pre-announcement
	7: "招标结果", // tender result
}
  51. // @Description 信息处理(信息发布和附件识别)
  52. // 1、敏感词处理,2、信息发布,3、信息删除
  53. // @Author J 2022/4/9 11:47 AM
  54. func taskInfo(obj interface{}) {
  55. info, _ := obj.(map[string]interface{})
  56. if util.ObjToString(info["action"]) == "1" {
  57. // 敏感词处理
  58. Sensitive(info)
  59. } else if util.ObjToString(info["action"]) == "2" {
  60. // 数据处理
  61. InfoPub(info)
  62. } else if util.ObjToString(info["action"]) == "3" {
  63. //id := util.ObjToString(info["id"])
  64. tmp := info["appendInfo"].(map[string]interface{})
  65. DelMethod(util.ObjToString(tmp["publish_id"]))
  66. }
  67. }
  68. // @Description 敏感词处理(title, content, attachment)
  69. // @Author J 2022/4/11 9:36 AM
  70. func Sensitive(info map[string]interface{}) {
  71. tmp := info["appendInfo"].(map[string]interface{})
  72. tArr := WordsIdentify(util.ObjToString(tmp["title"]))
  73. dArr := WordsIdentify(util.ObjToString(tmp["detail"]))
  74. if attsMap, ok := tmp["attach"].(map[string]interface{}); ok && len(attsMap) > 0 {
  75. other := map[string]interface{}{
  76. "id": info["id"],
  77. "action": info["action"],
  78. "msgType": info["msgType"],
  79. "title": tArr,
  80. "detail": dArr,
  81. }
  82. otherJson, _ := json.Marshal(other)
  83. var attsArr []*pb.Request
  84. for _, m := range attsMap {
  85. m1 := m.(map[string]interface{})
  86. attsArr = append(attsArr, &pb.Request{
  87. FileUrl: util.ObjToString(m1["fid"]),
  88. FileName: util.ObjToString(m1["filename"]),
  89. FileType: util.ObjToString(m1["ftype"]),
  90. //ReturnType: 0, // 不传
  91. ExtractType: 0,
  92. })
  93. }
  94. msginfo := &pb.FileRequest{
  95. Message: attsArr,
  96. Other: string(otherJson),
  97. Topic: FileTopicResult,
  98. }
  99. Logger.Debug("file extract send nsq: " + fmt.Sprint(msginfo))
  100. _ = MProducer.Publish(msginfo)
  101. } else {
  102. // 没有附件
  103. Logger.Debug("title sensitive array: " + fmt.Sprint(tArr))
  104. Logger.Debug("detail sensitive array: " + fmt.Sprint(dArr))
  105. req := &jypb.SensitiveRequest{
  106. Id: util.ObjToString(info["id"]),
  107. MsgType: util.ObjToString(info["msgType"]),
  108. Title: tArr,
  109. Detail: dArr,
  110. }
  111. Logger.Debug("JyRpcSensitive request: " + fmt.Sprint(req))
  112. JyRpcSensitive(req)
  113. }
  114. //atts := tmp["attachment"].(map[string]interface{})
  115. //resultAtts := make(map[string]interface{})
  116. //resultAttach := make(map[string]interface{})
  117. //for k, v := range atts {
  118. // attach := make(map[string]interface{}) // attach_text字段
  119. // resp, err := AttsMethod(v.(map[string]interface{}))
  120. // if err != nil {
  121. // return nil
  122. // }
  123. // for i, r := range resp.Result {
  124. // if w := WordsIdentify(r.TextContent); w != nil {
  125. // resultAtts[k] = w
  126. // }
  127. // attach[strconv.Itoa(i)] = map[string]interface{}{"file_name": r.FileName, "attach_url": r.TextUrl}
  128. // }
  129. // resultAttach[k] = attach
  130. //}
  131. //resultMap["attach_text"] = resultAttach
  132. //resultMap["attachment"] = resultAtts
  133. }
  134. // @Description 敏感词识别
  135. // @Author J 2022/4/12 1:33 PM
  136. func WordsIdentify(str string) []string {
  137. if str == "" {
  138. return nil
  139. }
  140. ret := Ms.Discern(str, 2)
  141. if len(ret) > 0 {
  142. var words []string
  143. for _, r := range ret {
  144. words = append(words, r.MatchRule)
  145. }
  146. return words
  147. }
  148. return []string{}
  149. }
  150. // @Description 附件调用gRpc接口处理
  151. // @Author J 2022/4/12 10:02 AM
  152. // Deprecated
  153. func AttsMethod(att map[string]interface{}) (*pb.FileResponse, error) {
  154. reqs := &pb.FileRequest{
  155. Message: []*pb.Request{{
  156. FileName: "",
  157. FileType: "",
  158. FileUrl: ""}},
  159. Other: "",
  160. }
  161. // 1.调用gRPC接口
  162. conn, err := grpc.Dial(ClientAddr, grpc.WithInsecure())
  163. if err != nil {
  164. return nil, err
  165. }
  166. var client proto.ServiceClient
  167. client = proto.NewServiceClient(conn)
  168. repl, err := client.Apply(context.Background(), &proto.ApplyReqData{Name: "extract_service", Balance: 0})
  169. if err != nil {
  170. return nil, err
  171. }
  172. //2.业务调用
  173. addr := fmt.Sprintf("%s:%d", repl.Ip, repl.Port)
  174. conn_b, err := grpc.Dial(addr, grpc.WithInsecure())
  175. if err != nil {
  176. return nil, err
  177. }
  178. defer func(conn_b *grpc.ClientConn) {
  179. _ = conn_b.Close()
  180. }(conn_b)
  181. pc := pb.NewFileExtractClient(conn_b)
  182. rep, err := pc.FileExtract(context.Background(), reqs)
  183. if err != nil {
  184. return nil, err
  185. }
  186. return rep, nil
  187. }
  188. // @Description 信息发布
  189. // @Author J 2022/4/12 1:57 PM
  190. func InfoPub(info map[string]interface{}) {
  191. tmp := info["appendInfo"].(map[string]interface{})
  192. saveMap := make(map[string]interface{})
  193. jyMap := make(map[string]interface{})
  194. for _, f := range InfoFields {
  195. if tmp[f] == nil {
  196. continue
  197. }
  198. if f == "budget" || f == "amount" {
  199. saveMap[SaveFields[f]] = util.Float64All(tmp[f])
  200. jyMap[f] = util.Float64All(tmp[f])
  201. } else if f == "industry" {
  202. // topscopeclass/subcopeclass
  203. //if s := util.ObjToString(tmp[f]); s != "" {
  204. //
  205. // for _, s2 := range strings.Split(s, ",") {
  206. // arr := strings.Split(s2, "_")
  207. // // todo
  208. // }
  209. //}
  210. } else if f == "winner" {
  211. if s := util.ObjToString(tmp[f]); s != "" {
  212. s = strings.ReplaceAll(s, ",", ",") //中文变英文
  213. saveMap[SaveFields[f]] = s
  214. saveMap[f] = s
  215. jyMap[f] = s
  216. jyMap[SaveFields[f]] = s
  217. }
  218. } else if f == "attach" {
  219. s := util.ObjToString(tmp[f])
  220. if s != "" {
  221. atts := map[string]interface{}{}
  222. if err := json.Unmarshal([]byte(s), &atts); err != nil {
  223. Logger.Error("data Unmarshal Failed:", Field("error", err))
  224. }
  225. for _, i := range atts {
  226. i2 := i.(map[string]interface{})
  227. //delete(i2, "uid")
  228. delete(i2, "ossurl")
  229. i2["url"] = "oss"
  230. }
  231. saveMap["projectinfo"] = map[string]interface{}{"attachments": atts}
  232. }
  233. } else if f == "discern_attach" {
  234. if s := util.ObjToString(tmp[f]); s != "" {
  235. atts_txt := map[string]interface{}{}
  236. if err := json.Unmarshal([]byte(s), &atts_txt); err != nil {
  237. Logger.Error("data Unmarshal Failed:", Field("error", err))
  238. }
  239. for k, v := range atts_txt {
  240. atts_txt[k] = map[string]interface{}{k: v}
  241. }
  242. saveMap[SaveFields[f]] = atts_txt
  243. }
  244. } else if f == "type" {
  245. jyMap[f] = InfoType[util.IntAll(tmp[f])]
  246. } else if f == "recommended_service" {
  247. saveMap[SaveFields[f]] = util.IntAll(tmp[f])
  248. } else {
  249. if s := util.ObjToString(tmp[f]); s != "" {
  250. saveMap[SaveFields[f]] = s
  251. if SaveFields[f] == "buyer" || SaveFields[f] == "buyerperson" || SaveFields[f] == "buyertel" ||
  252. SaveFields[f] == "area" || SaveFields[f] == "city" {
  253. jyMap[SaveFields[f]] = s
  254. }
  255. }
  256. }
  257. }
  258. now := time.Now()
  259. saveMap["comeintime"] = now.Unix()
  260. saveMap["publishtime"] = now.Unix()
  261. saveMap["contenthtml"] = tmp["detail"]
  262. cut := util.NewCut().ClearHtml(util.ObjToString(tmp["detail"]))
  263. tmp["detail"] = cut
  264. saveMap["site"] = "剑鱼信息发布平台"
  265. saveMap["channel"] = "公告"
  266. saveMap["spidercode"] = "a_jyxxfbpt_gg"
  267. saveMap["extracttype"] = 0
  268. saveMap["areaval"] = 0
  269. saveMap["infoformat"] = 1
  270. saveMap["dataging"] = 0
  271. saveMap["buyerhint"] = util.IntAll(tmp["contact_overt"])
  272. _id := bson.NewObjectIdWithTime(now)
  273. saveMap["href"] = fmt.Sprintf(JyUrl, util.CommonEncodeArticle("content", mongodb.BsonIdToSId(_id)))
  274. saveMap["competehref"] = "#"
  275. saveMap["jyfb_data"] = jyMap
  276. saveMap["jyfb_id"] = util.ObjToString(info["id"])
  277. Logger.Debug("InfoPub mgo save: " + fmt.Sprint(saveMap))
  278. MgoBid.SaveByOriID(BidColl, saveMap)
  279. }
  // Attachment-extraction payload types (JSON).
type (
	// AttsResponse pairs the echoed request context with one result per file.
	AttsResponse struct {
		Other  AttsOther     `json:"other"`
		Result []*AttsResult `json:"result"`
	}

	// AttsOther is the context carried alongside an extraction request:
	// message identity plus the sensitive-word hits already found in the
	// detail and title.
	AttsOther struct {
		Id      string   `json:"id"`
		Action  string   `json:"action"`
		MsgType string   `json:"msgType"`
		Detail  []string `json:"detail"`
		Title   []string `json:"title"`
	}

	// AttsResult describes the extraction outcome for a single file.
	// taskAtts treats ErrorState "200" as success.
	AttsResult struct {
		FileName    string `json:"fileName"`
		TextUrl     string `json:"textUrl"`
		TextContent string `json:"textContent"`
		FilePath    string `json:"filePath"`
		ErrorState  string `json:"errorState"`
	}
)
  298. // @Description 附件处理完成队列
  299. // @Author J 2022/4/13 3:29 PM
  300. func taskAtts(obj map[string]interface{}) {
  301. atts := make(map[string]interface{})
  302. atts_text := make(map[string]interface{})
  303. for i, r := range obj["result"].([]interface{}) {
  304. r1 := r.(map[string]interface{})
  305. at := make(map[string]interface{})
  306. text := make(map[string]interface{})
  307. at["state"] = r1["errorState"].(string)
  308. if r1["errorState"].(string) == "200" {
  309. textContent := OssGetObject(util.ObjToString(r1["textUrl"]))
  310. at["sensitive"] = WordsIdentify(textContent)
  311. text["file_name"] = r1["fileName"].(string)
  312. text["attach_url"] = r1["textUrl"].(string)
  313. }
  314. atts[strconv.Itoa(i)] = at
  315. if len(text) > 0 {
  316. atts_text[strconv.Itoa(i)] = text
  317. }
  318. }
  319. attsJson, _ := json.Marshal(atts)
  320. attsTextJson, _ := json.Marshal(atts_text)
  321. // 直接调用剑鱼接口
  322. var other map[string]interface{}
  323. _ = json.Unmarshal([]byte(util.ObjToString(obj["other"])), &other)
  324. req := &jypb.SensitiveRequest{
  325. Id: util.ObjToString(other["id"]),
  326. MsgType: util.ObjToString(other["msgType"]),
  327. //Action: util.ObjToString(obj.Other.Action),
  328. Title: util.ObjArrToStringArr(other["title"].([]interface{})),
  329. Detail: util.ObjArrToStringArr(other["detail"].([]interface{})),
  330. Attachments: string(attsJson),
  331. AttachTxt: string(attsTextJson),
  332. }
  333. Logger.Debug("JyRpcSensitive request: " + fmt.Sprint(req))
  334. JyRpcSensitive(req)
  335. }
  336. // @Description 敏感词识别完成调用剑鱼接口
  337. // @Author J 2022/4/13 11:16 AM
  338. func JyRpcSensitive(req *jypb.SensitiveRequest) {
  339. conn := JyRpcClient.Conn()
  340. jyIntf := jypb.NewCommonInfoClient(conn)
  341. resp, err := jyIntf.SensitiveMethod(context.Background(), req)
  342. if err != nil {
  343. Logger.Error(err.Error())
  344. initEtcd()
  345. resp, err = jyIntf.SensitiveMethod(context.Background(), req)
  346. if err != nil {
  347. Logger.Error(err.Error())
  348. }
  349. }
  350. Logger.Info("JyRpcSensitive response: " + resp.String())
  351. }
  352. // @Description 信息删除(es、bidding、extract、project)
  353. // @Author J 2022/4/8 4:37 PM:00
  354. func DelMethod(res string) {
  355. if !bson.IsObjectIdHex(res) {
  356. Logger.Error(" bidding del fail, id err" + res)
  357. return
  358. }
  359. q := map[string]interface{}{"_id": mongodb.StringTOBsonId(res)}
  360. b := MgoBid.Del(BidColl, q)
  361. if !b {
  362. Logger.Error(" bidding del fail...")
  363. }
  364. b = MgoExt.Del(ExtColl, q)
  365. if !b {
  366. Logger.Error(" extract del fail...")
  367. }
  368. Es.DelById(Index, Itype, res)
  369. Es.DelById(IndexAll, Itype, res)
  370. project := Sysconfig["project"].(map[string]interface{})
  371. by, _ := json.Marshal(map[string]interface{}{
  372. "infoid": res,
  373. "stype": "deleteInfo",
  374. })
  375. addr := &net.UDPAddr{
  376. IP: net.ParseIP(project["addr"].(string)),
  377. Port: util.IntAll(project["port"]),
  378. }
  379. Logger.Debug(string(by))
  380. _ = UdpClient.WriteUdp(by, util.OP_TYPE_DATA, addr)
  381. }
  382. // @Description 数据处理完成调用jy接口
  383. // @Author J 2022/4/9 11:41 AM
  384. func JyRpcDataFin(_id string) {
  385. info, _ := MgoBid.FindById(BidColl, _id, `{"jyfb_id": 1}`)
  386. if len(*info) == 0 {
  387. Logger.Error("JyRpcDataFin mgo not find, id: " + _id)
  388. return
  389. }
  390. conn := JyRpcClient.Conn()
  391. req := &jypb.StateRequest{
  392. Id: util.ObjToString((*info)["jyfb_id"]),
  393. PublishId: _id,
  394. }
  395. jyIntf := jypb.NewCommonInfoClient(conn)
  396. Logger.Info("JyRpcDataFin request: " + req.String())
  397. resp, err := jyIntf.StateMethod(context.Background(), req)
  398. if err != nil {
  399. Logger.Error(err.Error())
  400. initEtcd()
  401. resp, err = jyIntf.StateMethod(context.Background(), req)
  402. if err != nil {
  403. Logger.Error(err.Error())
  404. }
  405. }
  406. Logger.Info("JyRpcDataFin response: " + resp.String())
  407. }