task.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "bytes"
  7. "context"
  8. "encoding/json"
  9. "errors"
  10. "field-dispose/config"
  11. "field-dispose/proto"
  12. "fmt"
  13. "go.uber.org/zap"
  14. "google.golang.org/grpc"
  15. "google.golang.org/grpc/credentials/insecure"
  16. "net/http"
  17. "strings"
  18. "sync"
  19. "sync/atomic"
  20. "time"
  21. service "app.yhyue.com/BP/servicerd/proto"
  22. )
  23. var IpDialErrNum, IpGetErrNum, ExtractDialErrNum = int64(0), int64(0), int64(0)
  24. //@Description 处理字段procurementlist、review_experts
  25. // @Author J 2022/8/31 14:57
  26. func getIntention(gtid, lteid string, mapinfo map[string]interface{}) {
  27. defer util.Catch()
  28. sess := MgoB.GetMgoConn()
  29. defer MgoB.DestoryMongoConn(sess)
  30. ch := make(chan bool, 10)
  31. wg := &sync.WaitGroup{}
  32. query := map[string]interface{}{
  33. "_id": map[string]interface{}{
  34. "$gt": mongodb.StringTOBsonId(gtid),
  35. "$lte": mongodb.StringTOBsonId(lteid),
  36. },
  37. }
  38. field := map[string]interface{}{
  39. "toptype": 1,
  40. "attach_text": 1,
  41. "contenthtml": 1,
  42. "site": 1,
  43. "detail": 1,
  44. }
  45. log.Info(fmt.Sprintf("%d", MgoB.Count("bidding", query)))
  46. it := sess.DB(config.Conf.DB.Mongo.Dbname).C("bidding").Find(&query).Select(&field).Iter()
  47. count := 0
  48. for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
  49. if count%20000 == 0 {
  50. log.Info("getIntention", zap.Int("current:", count))
  51. }
  52. ch <- true
  53. wg.Add(1)
  54. go func(tmp map[string]interface{}) {
  55. defer func() {
  56. <-ch
  57. wg.Done()
  58. }()
  59. id := mongodb.BsonIdToSId(tmp["_id"])
  60. update := make(map[string]interface{})
  61. result2 := taskB(tmp, gtid, lteid)
  62. result1 := taskA(tmp, gtid, lteid)
  63. if r, ok := result2["result"].(map[string]interface{}); ok {
  64. if r[id] != nil && len(r[id].([]interface{})) > 0 {
  65. update["review_experts"] = strings.Join(util.ObjArrToStringArr(r[id].([]interface{})), ",")
  66. }
  67. }
  68. util.Debug(result1)
  69. if result1 != nil && len(result1) > 0 {
  70. if result1["purchasinglist"] != nil {
  71. update["purchasinglist"] = result1["purchasinglist"]
  72. }
  73. if result1["procurementlist"] != nil {
  74. update["procurementlist"] = result1["procurementlist"]
  75. }
  76. }
  77. if len(update) > 0 {
  78. updatePool <- []map[string]interface{}{
  79. {"_id": mongodb.StringTOBsonId(id)},
  80. {"$set": update},
  81. }
  82. //updateEsPool <- []map[string]interface{}{{
  83. // "_id": id,
  84. //},
  85. // update,
  86. //}
  87. }
  88. }(tmp)
  89. tmp = map[string]interface{}{}
  90. }
  91. wg.Wait()
  92. log.Info("dispose over...", zap.Int("count:", count), zap.String("gtid:", gtid), zap.String("lteid:", lteid))
  93. NextNode(mapinfo)
  94. }
  95. // @Description procurementlist
  96. // @Author J 2022/8/31 15:28
  97. func taskA(tmp map[string]interface{}, gtid, lteid string) map[string]interface{} {
  98. toptype, _ := tmp["toptype"].(string)
  99. if toptype != "采购意向" {
  100. return nil
  101. }
  102. id := mongodb.BsonIdToSId(tmp["_id"])
  103. delete(tmp, "_id")
  104. delete(tmp, "detail")
  105. reqStr, err := json.Marshal(tmp)
  106. if err != nil {
  107. ErrorInfoCache <- map[string]interface{}{
  108. "err": "Json Marshal Error",
  109. "type": "采购意向",
  110. "id": id,
  111. "comeintime": time.Now().Unix(),
  112. "ok": false,
  113. "gtid": gtid,
  114. "letid": lteid,
  115. }
  116. return nil
  117. }
  118. //处理数据
  119. result, err := rpcGetFieldP(string(reqStr))
  120. if err != nil { //保存处理异常信息
  121. ErrorInfoCache <- map[string]interface{}{
  122. "err": err.Error(),
  123. "type": "采购意向",
  124. "id": id,
  125. "comeintime": time.Now().Unix(),
  126. "ok": false,
  127. "gtid": gtid,
  128. "letid": lteid,
  129. }
  130. }
  131. return result
  132. }
  133. // @Description review_experts
  134. // @Author J 2022/8/31 15:28
  135. func taskB(tmp map[string]interface{}, gtid, lteid string) map[string]interface{} {
  136. id := mongodb.BsonIdToSId(tmp["_id"])
  137. reqStr, err := json.Marshal(map[string]interface{}{id: tmp["detail"]})
  138. if err != nil {
  139. ErrorInfoCache <- map[string]interface{}{
  140. "err": "Json Marshal Error",
  141. "type": "评标专家字段",
  142. "id": id,
  143. "comeintime": time.Now().Unix(),
  144. "ok": false,
  145. "gtid": gtid,
  146. "letid": lteid,
  147. }
  148. return nil
  149. }
  150. //处理数据
  151. result, err := rpcGetFieldR(string(reqStr))
  152. if err != nil { //保存处理异常信息
  153. ErrorInfoCache <- map[string]interface{}{
  154. "err": err.Error(),
  155. "type": "评标专家字段",
  156. "id": id,
  157. "comeintime": time.Now().Unix(),
  158. "ok": false,
  159. "gtid": gtid,
  160. "letid": lteid,
  161. }
  162. }
  163. return result
  164. }
  165. func rpcGetFieldP(reqStr string) (map[string]interface{}, error) {
  166. defer util.Catch()
  167. //获取ip、port服务
  168. ipConn, ipErr := grpc.Dial(config.Conf.Serve.GrpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  169. if ipErr != nil {
  170. atomic.AddInt64(&IpDialErrNum, 1) //异常次数+1
  171. return nil, errors.New("Ip Grpc Dial Error")
  172. }
  173. atomic.StoreInt64(&IpDialErrNum, 0) //异常次数重置
  174. defer ipConn.Close()
  175. ipClient := service.NewServiceClient(ipConn)
  176. ip := ""
  177. port := -1
  178. //重试获取ip、port
  179. for i := 1; i <= 3; i++ {
  180. repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "goods_service", Balance: 0})
  181. if err != nil {
  182. continue
  183. } else {
  184. ip = repl.Ip
  185. port = int(repl.Port)
  186. break
  187. }
  188. }
  189. if ip == "" || port == -1 { //重试三次,回去ip、port失败
  190. atomic.AddInt64(&IpGetErrNum, 1) //异常次数+1
  191. return nil, errors.New("Get Ip Error")
  192. }
  193. atomic.StoreInt64(&IpGetErrNum, 0) //异常次数重置
  194. //处理数据
  195. addr := ip + ":" + fmt.Sprint(port)
  196. log.Info("rpc", zap.String("addr", addr), zap.String("str", reqStr))
  197. conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  198. if err != nil {
  199. atomic.AddInt64(&ExtractDialErrNum, 1) //异常次数+1
  200. return nil, errors.New("Extract Grpc Dial Error,addr:" + addr)
  201. }
  202. atomic.StoreInt64(&ExtractDialErrNum, 0) //异常次数重置
  203. defer conn.Close()
  204. client := proto.NewGoodsExtractClient(conn)
  205. req := &proto.GoodsRequest{
  206. Contents: reqStr,
  207. }
  208. resp, err := client.GoodsExtract(context.Background(), req)
  209. if err != nil {
  210. return nil, errors.New("Deal Data Error")
  211. }
  212. result := map[string]interface{}{}
  213. if json.Unmarshal([]byte(resp.Goods), &result) != nil {
  214. return nil, errors.New("Json Unmarshal Error")
  215. }
  216. return result, nil
  217. }
  218. func rpcGetFieldR(reqStr string) (map[string]interface{}, error) {
  219. defer util.Catch()
  220. //获取ip、port服务
  221. ipConn, ipErr := grpc.Dial(config.Conf.Serve.GrpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  222. if ipErr != nil {
  223. atomic.AddInt64(&IpDialErrNum, 1) //异常次数+1
  224. return nil, errors.New("Ip Grpc Dial Error")
  225. }
  226. atomic.StoreInt64(&IpDialErrNum, 0) //异常次数重置
  227. defer ipConn.Close()
  228. ipClient := service.NewServiceClient(ipConn)
  229. ip := ""
  230. port := -1
  231. //重试获取ip、port
  232. for i := 1; i <= 3; i++ {
  233. repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "extract_expert_service", Balance: 0})
  234. if err != nil {
  235. continue
  236. } else {
  237. ip = repl.Ip
  238. port = int(repl.Port)
  239. break
  240. }
  241. }
  242. if ip == "" || port == -1 { //重试三次,回去ip、port失败
  243. atomic.AddInt64(&IpGetErrNum, 1) //异常次数+1
  244. return nil, errors.New("Get Ip Error")
  245. }
  246. atomic.StoreInt64(&IpGetErrNum, 0) //异常次数重置
  247. //处理数据
  248. addr := ip + ":" + fmt.Sprint(port)
  249. log.Info("rpc", zap.String("addr", addr), zap.String("str", reqStr))
  250. conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  251. if err != nil {
  252. atomic.AddInt64(&ExtractDialErrNum, 1) //异常次数+1
  253. return nil, errors.New("Extract Grpc Dial Error,addr:" + addr)
  254. }
  255. atomic.StoreInt64(&ExtractDialErrNum, 0) //异常次数重置
  256. defer conn.Close()
  257. client := proto.NewExistsExpertClient(conn)
  258. req := &proto.ContentRequest{
  259. Contents: reqStr,
  260. }
  261. resp, err := client.Extract(context.Background(), req)
  262. if err != nil {
  263. return nil, errors.New("Deal Data Error")
  264. }
  265. result := make(map[string]interface{})
  266. if json.Unmarshal([]byte(resp.Results), &result) != nil {
  267. return nil, errors.New("Json Unmarshal Error")
  268. }
  269. return result, nil
  270. }
  271. //定时检测异常数据量,发告警
  272. func CheckErrorNum() {
  273. defer util.Catch()
  274. for {
  275. warnText := ""
  276. //消息治理中心服务连接异常告警
  277. if IpDialErrNum > 3 {
  278. warnText += "消息治理中心连接失败次数:" + fmt.Sprint(IpDialErrNum) + ";\n"
  279. }
  280. //获取治理中心IP异常告警
  281. if IpDialErrNum > 5 {
  282. warnText += "获取IP异常次数:" + fmt.Sprint(IpDialErrNum) + ";\n"
  283. }
  284. //连接IP服务异常告警
  285. if IpDialErrNum > 5 {
  286. warnText += "IP连接异常次数:" + fmt.Sprint(IpDialErrNum) + ";\n"
  287. }
  288. if warnText != "" {
  289. SendWarnInfo(warnText)
  290. }
  291. time.Sleep(30 * time.Minute)
  292. }
  293. }
// TextModel is the JSON body template used by SendWarnInfo for a "text"
// message. It takes two %s arguments via fmt.Sprintf: the message content,
// and the comma-separated entries of mentioned_mobile_list.
// NOTE(review): both values are inserted verbatim, so they must already be
// valid inside a JSON string / JSON array — confirm at every call site.
var TextModel = `{
"msgtype": "text",
"text": {
"content": "%s",
"mentioned_mobile_list":[%s]
}
}`
  301. //告警信息
  302. func SendWarnInfo(content string) {
  303. defer util.Catch()
  304. toUserMsg := fmt.Sprintf(TextModel, content, "13373929153,15090279371") //李俊亮、王江含
  305. log.Info("SendWarnInfo", zap.String("toUserMsg", toUserMsg))
  306. resp1, err := http.Post(
  307. "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=45962efc-ca87-4996-9ffa-08bf6608ab7a",
  308. "application/json",
  309. bytes.NewBuffer([]byte(toUserMsg)),
  310. )
  311. if err != nil {
  312. fmt.Println("request error:", err)
  313. return
  314. }
  315. defer resp1.Body.Close()
  316. }
  317. var SP = make(chan bool, 5)
  318. var ErrorInfoCache = make(chan map[string]interface{}, 1000) //异常信息集合
  319. //批量保存异常信息
  320. func SaveErrorInfo() {
  321. log.Info("SaveErrorInfo 异常信息...")
  322. savearr := make([]map[string]interface{}, 200)
  323. indexh := 0
  324. for {
  325. select {
  326. case v := <-ErrorInfoCache:
  327. savearr[indexh] = v
  328. indexh++
  329. if indexh == 200 {
  330. SP <- true
  331. go func(savearr []map[string]interface{}) {
  332. defer func() {
  333. <-SP
  334. }()
  335. MgoB.SaveBulk("bidding_warn", savearr...)
  336. }(savearr)
  337. savearr = make([]map[string]interface{}, 200)
  338. indexh = 0
  339. }
  340. case <-time.After(1 * time.Minute):
  341. if indexh > 0 {
  342. SP <- true
  343. go func(savearr []map[string]interface{}) {
  344. defer func() {
  345. <-SP
  346. }()
  347. MgoB.SaveBulk("bidding_warn", savearr...)
  348. }(savearr[:indexh])
  349. savearr = make([]map[string]interface{}, 200)
  350. indexh = 0
  351. }
  352. }
  353. }
  354. }