task.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "bytes"
  7. "context"
  8. "encoding/json"
  9. "errors"
  10. "field-dispose/config"
  11. "field-dispose/proto"
  12. "fmt"
  13. "go.mongodb.org/mongo-driver/bson"
  14. "go.uber.org/zap"
  15. "google.golang.org/grpc"
  16. "google.golang.org/grpc/credentials/insecure"
  17. "net/http"
  18. "strings"
  19. "sync"
  20. "sync/atomic"
  21. "time"
  22. service "app.yhyue.com/BP/servicerd/proto"
  23. )
  24. var IpDialErrNum, IpGetErrNum, ExtractDialErrNum = int64(0), int64(0), int64(0)
  25. //@Description 处理字段procurementlist、review_experts
  26. // @Author J 2022/8/31 14:57
  27. func getIntention(gtid, lteid string, mapinfo map[string]interface{}) {
  28. defer util.Catch()
  29. MgoB.Update("bidding_processing_ids", bson.M{"gtid": gtid}, bson.M{"$set": bson.M{"dataprocess": 2, "updatetime": time.Now().Unix()}}, false, false)
  30. sess := MgoB.GetMgoConn()
  31. defer MgoB.DestoryMongoConn(sess)
  32. ch := make(chan bool, config.Conf.Serve.Thread)
  33. wg := &sync.WaitGroup{}
  34. query := map[string]interface{}{
  35. "_id": map[string]interface{}{
  36. "$gt": mongodb.StringTOBsonId(gtid),
  37. "$lte": mongodb.StringTOBsonId(lteid),
  38. },
  39. }
  40. field := map[string]interface{}{
  41. "toptype": 1,
  42. "attach_text": 1,
  43. "contenthtml": 1,
  44. "site": 1,
  45. "detail": 1,
  46. }
  47. log.Info(fmt.Sprintf("count --- %d", MgoB.Count("bidding", query)))
  48. it := sess.DB(config.Conf.DB.Mongo.Dbname).C("bidding").Find(&query).Select(&field).Iter()
  49. count := 0
  50. for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
  51. if count%200 == 0 {
  52. log.Info("getIntention", zap.Int("current:", count))
  53. }
  54. ch <- true
  55. wg.Add(1)
  56. go func(tmp map[string]interface{}) {
  57. defer func() {
  58. <-ch
  59. wg.Done()
  60. }()
  61. id := mongodb.BsonIdToSId(tmp["_id"])
  62. update := make(map[string]interface{})
  63. result2 := taskB(tmp, gtid, lteid)
  64. result1 := taskA(tmp, gtid, lteid)
  65. if r, ok := result2["result"].(map[string]interface{}); ok {
  66. if r[id] != nil && len(r[id].([]interface{})) > 0 {
  67. update["review_experts"] = strings.Join(util.ObjArrToStringArr(r[id].([]interface{})), ",")
  68. }
  69. }
  70. if result1 != nil && len(result1) > 0 {
  71. if result1["purchasinglist"] != nil {
  72. update["purchasinglist"] = result1["purchasinglist"]
  73. }
  74. if result1["procurementlist"] != nil {
  75. update["procurementlist"] = result1["procurementlist"]
  76. }
  77. }
  78. if len(update) > 0 {
  79. updatePool <- []map[string]interface{}{
  80. {"_id": mongodb.StringTOBsonId(id)},
  81. {"$set": update},
  82. }
  83. //updateEsPool <- []map[string]interface{}{{
  84. // "_id": id,
  85. //},
  86. // update,
  87. //}
  88. }
  89. }(tmp)
  90. tmp = map[string]interface{}{}
  91. }
  92. wg.Wait()
  93. log.Info("dispose over...", zap.Int("count:", count), zap.String("gtid:", gtid), zap.String("lteid:", lteid))
  94. NextNode(mapinfo)
  95. }
  96. // @Description procurementlist
  97. // @Author J 2022/8/31 15:28
  98. func taskA(tmp map[string]interface{}, gtid, lteid string) map[string]interface{} {
  99. id := mongodb.BsonIdToSId(tmp["_id"])
  100. delete(tmp, "_id")
  101. delete(tmp, "detail")
  102. reqStr, err := json.Marshal(tmp)
  103. if err != nil {
  104. ErrorInfoCache <- map[string]interface{}{
  105. "err": "Json Marshal Error",
  106. "type": "采购意向",
  107. "id": id,
  108. "comeintime": time.Now().Unix(),
  109. "ok": false,
  110. "gtid": gtid,
  111. "letid": lteid,
  112. }
  113. return nil
  114. }
  115. //处理数据
  116. result, err := rpcGetFieldP(string(reqStr))
  117. if err != nil { //保存处理异常信息
  118. ErrorInfoCache <- map[string]interface{}{
  119. "err": err.Error(),
  120. "type": "采购意向",
  121. "id": id,
  122. "comeintime": time.Now().Unix(),
  123. "ok": false,
  124. "gtid": gtid,
  125. "letid": lteid,
  126. }
  127. }
  128. return result
  129. }
  130. // @Description review_experts
  131. // @Author J 2022/8/31 15:28
  132. func taskB(tmp map[string]interface{}, gtid, lteid string) map[string]interface{} {
  133. id := mongodb.BsonIdToSId(tmp["_id"])
  134. reqStr, err := json.Marshal(map[string]interface{}{id: tmp["detail"]})
  135. if err != nil {
  136. ErrorInfoCache <- map[string]interface{}{
  137. "err": "Json Marshal Error",
  138. "type": "评标专家字段",
  139. "id": id,
  140. "comeintime": time.Now().Unix(),
  141. "ok": false,
  142. "gtid": gtid,
  143. "letid": lteid,
  144. }
  145. return nil
  146. }
  147. //处理数据
  148. result, err := rpcGetFieldR(string(reqStr))
  149. if err != nil { //保存处理异常信息
  150. ErrorInfoCache <- map[string]interface{}{
  151. "err": err.Error(),
  152. "type": "评标专家字段",
  153. "id": id,
  154. "comeintime": time.Now().Unix(),
  155. "ok": false,
  156. "gtid": gtid,
  157. "letid": lteid,
  158. }
  159. }
  160. return result
  161. }
  162. func rpcGetFieldP(reqStr string) (map[string]interface{}, error) {
  163. defer util.Catch()
  164. //获取ip、port服务
  165. ipConn, ipErr := grpc.Dial(config.Conf.Serve.GrpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  166. if ipErr != nil {
  167. atomic.AddInt64(&IpDialErrNum, 1) //异常次数+1
  168. return nil, errors.New("Ip Grpc Dial Error")
  169. }
  170. atomic.StoreInt64(&IpDialErrNum, 0) //异常次数重置
  171. defer ipConn.Close()
  172. ipClient := service.NewServiceClient(ipConn)
  173. ip := ""
  174. port := -1
  175. //重试获取ip、port
  176. for i := 1; i <= 3; i++ {
  177. repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "goods_service", Balance: 3})
  178. if err != nil {
  179. continue
  180. } else {
  181. ip = repl.Ip
  182. port = int(repl.Port)
  183. break
  184. }
  185. }
  186. if ip == "" || port == -1 { //重试三次,回去ip、port失败
  187. atomic.AddInt64(&IpGetErrNum, 1) //异常次数+1
  188. return nil, errors.New("Get Ip Error")
  189. }
  190. atomic.StoreInt64(&IpGetErrNum, 0) //异常次数重置
  191. //处理数据
  192. addr := ip + ":" + fmt.Sprint(port)
  193. start := time.Now()
  194. conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  195. if err != nil {
  196. atomic.AddInt64(&ExtractDialErrNum, 1) //异常次数+1
  197. return nil, errors.New("Extract Grpc Dial Error,addr:" + addr)
  198. }
  199. atomic.StoreInt64(&ExtractDialErrNum, 0) //异常次数重置
  200. defer conn.Close()
  201. client := proto.NewGoodsExtractClient(conn)
  202. req := &proto.GoodsRequest{
  203. Contents: reqStr,
  204. }
  205. //ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*1)
  206. //defer cancel()
  207. resp, err := client.GoodsExtract(context.Background(), req)
  208. if err != nil {
  209. return nil, errors.New("Deal Data Error")
  210. }
  211. result := map[string]interface{}{}
  212. if json.Unmarshal([]byte(resp.Goods), &result) != nil {
  213. return nil, errors.New("Json Unmarshal Error")
  214. }
  215. // 服务中心释放服务
  216. _, _ = ipClient.Release(context.Background(), &service.ApplyRepData{Ip: ip, Port: int32(port)})
  217. if err != nil {
  218. return nil, err
  219. }
  220. if time.Since(start).Minutes() > 5 {
  221. // py接口字段识别超过5分钟
  222. log.Info("rpcGetFieldP 字段识别超过5min", zap.Any("serve", "goods_service"), zap.Any("reqStr", reqStr), zap.Any("ip+port", addr))
  223. }
  224. return result, nil
  225. }
  226. func rpcGetFieldR(reqStr string) (map[string]interface{}, error) {
  227. defer util.Catch()
  228. //获取ip、port服务
  229. ipConn, ipErr := grpc.Dial(config.Conf.Serve.GrpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  230. if ipErr != nil {
  231. atomic.AddInt64(&IpDialErrNum, 1) //异常次数+1
  232. return nil, errors.New("Ip Grpc Dial Error")
  233. }
  234. atomic.StoreInt64(&IpDialErrNum, 0) //异常次数重置
  235. defer ipConn.Close()
  236. ipClient := service.NewServiceClient(ipConn)
  237. ip := ""
  238. port := -1
  239. //重试获取ip、port
  240. for i := 1; i <= 3; i++ {
  241. repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "extract_expert_service", Balance: 0})
  242. if err != nil {
  243. continue
  244. } else {
  245. ip = repl.Ip
  246. port = int(repl.Port)
  247. break
  248. }
  249. }
  250. if ip == "" || port == -1 { //重试三次,回去ip、port失败
  251. atomic.AddInt64(&IpGetErrNum, 1) //异常次数+1
  252. return nil, errors.New("Get Ip Error")
  253. }
  254. atomic.StoreInt64(&IpGetErrNum, 0) //异常次数重置
  255. //处理数据
  256. addr := ip + ":" + fmt.Sprint(port)
  257. start := time.Now()
  258. conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  259. if err != nil {
  260. atomic.AddInt64(&ExtractDialErrNum, 1) //异常次数+1
  261. return nil, errors.New("Extract Grpc Dial Error,addr:" + addr)
  262. }
  263. atomic.StoreInt64(&ExtractDialErrNum, 0) //异常次数重置
  264. defer conn.Close()
  265. client := proto.NewExistsExpertClient(conn)
  266. req := &proto.ContentRequest{
  267. Contents: reqStr,
  268. }
  269. //ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*1)
  270. //defer cancel()
  271. resp, err := client.Extract(context.Background(), req)
  272. if err != nil {
  273. return nil, errors.New("Deal Data Error")
  274. }
  275. result := make(map[string]interface{})
  276. if json.Unmarshal([]byte(resp.Results), &result) != nil {
  277. return nil, errors.New("Json Unmarshal Error")
  278. }
  279. if time.Since(start).Minutes() > 5 {
  280. // py接口字段识别超过5分钟
  281. log.Info("rpcGetFieldR 字段识别超过5min", zap.Any("serve", "extract_expert_service"), zap.Any("reqStr", reqStr), zap.Any("ip+port", addr))
  282. }
  283. return result, nil
  284. }
  285. //定时检测异常数据量,发告警
  286. func CheckErrorNum() {
  287. defer util.Catch()
  288. for {
  289. warnText := ""
  290. //消息治理中心服务连接异常告警
  291. if IpDialErrNum > 3 {
  292. warnText += "消息治理中心连接失败次数:" + fmt.Sprint(IpDialErrNum) + ";\n"
  293. }
  294. //获取治理中心IP异常告警
  295. if IpDialErrNum > 5 {
  296. warnText += "获取IP异常次数:" + fmt.Sprint(IpDialErrNum) + ";\n"
  297. }
  298. //连接IP服务异常告警
  299. if IpDialErrNum > 5 {
  300. warnText += "IP连接异常次数:" + fmt.Sprint(IpDialErrNum) + ";\n"
  301. }
  302. if warnText != "" {
  303. SendWarnInfo(warnText)
  304. }
  305. time.Sleep(30 * time.Minute)
  306. }
  307. }
  308. var TextModel = `{
  309. "msgtype": "text",
  310. "text": {
  311. "content": "%s",
  312. "mentioned_mobile_list":[%s]
  313. }
  314. }`
  315. //告警信息
  316. func SendWarnInfo(content string) {
  317. defer util.Catch()
  318. toUserMsg := fmt.Sprintf(TextModel, content, "13373929153,15090279371") //李俊亮、王江含
  319. log.Info("SendWarnInfo", zap.String("toUserMsg", toUserMsg))
  320. resp1, err := http.Post(
  321. "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=45962efc-ca87-4996-9ffa-08bf6608ab7a",
  322. "application/json",
  323. bytes.NewBuffer([]byte(toUserMsg)),
  324. )
  325. if err != nil {
  326. fmt.Println("request error:", err)
  327. return
  328. }
  329. defer resp1.Body.Close()
  330. }
  331. var SP = make(chan bool, 5)
  332. var ErrorInfoCache = make(chan map[string]interface{}, 1000) //异常信息集合
  333. //批量保存异常信息
  334. func SaveErrorInfo() {
  335. log.Info("SaveErrorInfo 异常信息...")
  336. savearr := make([]map[string]interface{}, 200)
  337. indexh := 0
  338. for {
  339. select {
  340. case v := <-ErrorInfoCache:
  341. savearr[indexh] = v
  342. indexh++
  343. if indexh == 200 {
  344. SP <- true
  345. go func(savearr []map[string]interface{}) {
  346. defer func() {
  347. <-SP
  348. }()
  349. MgoB.SaveBulk("bidding_warn", savearr...)
  350. }(savearr)
  351. savearr = make([]map[string]interface{}, 200)
  352. indexh = 0
  353. }
  354. case <-time.After(1 * time.Minute):
  355. if indexh > 0 {
  356. SP <- true
  357. go func(savearr []map[string]interface{}) {
  358. defer func() {
  359. <-SP
  360. }()
  361. MgoB.SaveBulk("bidding_warn", savearr...)
  362. }(savearr[:indexh])
  363. savearr = make([]map[string]interface{}, 200)
  364. indexh = 0
  365. }
  366. }
  367. }
  368. }