task.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "field-dispose/config"
  8. "field-dispose/proto"
  9. "fmt"
  10. "go.uber.org/zap"
  11. "google.golang.org/grpc"
  12. "google.golang.org/grpc/credentials/insecure"
  13. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  14. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  15. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  16. "net/http"
  17. "strings"
  18. "sync"
  19. "sync/atomic"
  20. "time"
  21. service "jygit.jydev.jianyu360.cn/BP/servicerd/proto"
  22. )
  23. var IpDialErrNum, IpGetErrNum, ExtractDialErrNum = int64(0), int64(0), int64(0)
  24. // @Description 处理字段procurementlist、review_experts
  25. // @Author J 2022/8/31 14:57
  26. func getIntention(mapinfo map[string]interface{}) {
  27. defer util.Catch()
  28. gtid, _ := mapinfo["gtid"].(string)
  29. lteid, _ := mapinfo["lteid"].(string)
  30. //MgoB.Update("bidding_processing_ids", bson.M{"gtid": gtid}, bson.M{"$set": bson.M{"dataprocess": 2, "updatetime": time.Now().Unix()}}, false, false)
  31. sess := MgoB.GetMgoConn()
  32. defer MgoB.DestoryMongoConn(sess)
  33. ch := make(chan bool, config.Conf.Serve.Thread)
  34. wg := &sync.WaitGroup{}
  35. query := map[string]interface{}{
  36. "_id": map[string]interface{}{
  37. "$gt": mongodb.StringTOBsonId(gtid),
  38. "$lte": mongodb.StringTOBsonId(lteid),
  39. },
  40. }
  41. field := map[string]interface{}{
  42. "toptype": 1,
  43. "attach_text": 1,
  44. "contenthtml": 1,
  45. "site": 1,
  46. "detail": 1,
  47. }
  48. log.Info(fmt.Sprintf("count --- %d", MgoB.Count("bidding", query)))
  49. it := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.Coll).Find(&query).Select(&field).Iter()
  50. count := 0
  51. for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
  52. if count%200 == 0 {
  53. log.Info("getIntention", zap.Int("current:", count))
  54. }
  55. ch <- true
  56. wg.Add(1)
  57. go func(tmp map[string]interface{}) {
  58. defer func() {
  59. <-ch
  60. wg.Done()
  61. }()
  62. id := mongodb.BsonIdToSId(tmp["_id"])
  63. update := make(map[string]interface{})
  64. result2 := taskB(tmp, gtid, lteid)
  65. result1 := taskA(tmp, gtid, lteid)
  66. if r, ok := result2["result"].(map[string]interface{}); ok {
  67. if r[id] != nil && len(r[id].([]interface{})) > 0 {
  68. update["review_experts"] = strings.Join(util.ObjArrToStringArr(r[id].([]interface{})), ",")
  69. }
  70. }
  71. if result1 != nil && len(result1) > 0 {
  72. if result1["purchasinglist"] != nil && len(result1["purchasinglist"].([]interface{})) > 0 {
  73. update["purchasinglist"] = result1["purchasinglist"]
  74. update["purchasing"] = result1["purchasing"]
  75. }
  76. if result1["procurementlist"] != nil {
  77. update["procurementlist"] = result1["procurementlist"]
  78. }
  79. }
  80. if len(update) > 0 {
  81. updatePool <- []map[string]interface{}{
  82. {"_id": mongodb.StringTOBsonId(id)},
  83. {"$set": update},
  84. }
  85. //updateEsPool <- []map[string]interface{}{{
  86. // "_id": id,
  87. //},
  88. // update,
  89. //}
  90. }
  91. }(tmp)
  92. tmp = map[string]interface{}{}
  93. }
  94. wg.Wait()
  95. log.Info("dispose over...", zap.Int("count:", count), zap.String("gtid:", gtid), zap.String("lteid:", lteid))
  96. NextNode(mapinfo)
  97. }
  98. // @Description procurementlist、purchasinglist
  99. // @Author J 2022/8/31 15:28
  100. func taskA(tmp map[string]interface{}, gtid, lteid string) map[string]interface{} {
  101. id := mongodb.BsonIdToSId(tmp["_id"])
  102. delete(tmp, "_id")
  103. delete(tmp, "detail")
  104. reqStr, err := json.Marshal(tmp)
  105. if err != nil {
  106. ErrorInfoCache <- map[string]interface{}{
  107. "err": "Json Marshal Error",
  108. "type": "采购意向/标的物",
  109. "id": id,
  110. "comeintime": time.Now().Unix(),
  111. "ok": false,
  112. "gtid": gtid,
  113. "letid": lteid,
  114. }
  115. return nil
  116. }
  117. //处理数据
  118. result, err := rpcGetFieldP(string(reqStr), id)
  119. if err != nil { //保存处理异常信息
  120. ErrorInfoCache <- map[string]interface{}{
  121. "err": err.Error(),
  122. "type": "采购意向/标的物",
  123. "id": id,
  124. "comeintime": time.Now().Unix(),
  125. "ok": false,
  126. "gtid": gtid,
  127. "letid": lteid,
  128. }
  129. }
  130. return result
  131. }
  132. // @Description review_experts
  133. // @Author J 2022/8/31 15:28
  134. func taskB(tmp map[string]interface{}, gtid, lteid string) map[string]interface{} {
  135. id := mongodb.BsonIdToSId(tmp["_id"])
  136. reqStr, err := json.Marshal(map[string]interface{}{id: tmp["detail"]})
  137. if err != nil {
  138. ErrorInfoCache <- map[string]interface{}{
  139. "err": "Json Marshal Error",
  140. "type": "评标专家字段",
  141. "id": id,
  142. "comeintime": time.Now().Unix(),
  143. "ok": false,
  144. "gtid": gtid,
  145. "letid": lteid,
  146. }
  147. return nil
  148. }
  149. //处理数据
  150. result, err := rpcGetFieldR(string(reqStr), id)
  151. if err != nil {
  152. ErrorInfoCache <- map[string]interface{}{
  153. "err": err.Error(),
  154. "type": "评标专家字段",
  155. "id": id,
  156. "comeintime": time.Now().Unix(),
  157. "ok": false,
  158. "gtid": gtid,
  159. "letid": lteid,
  160. }
  161. }
  162. return result
  163. }
  164. func rpcGetFieldP(reqStr, id string) (map[string]interface{}, error) {
  165. defer util.Catch()
  166. //获取ip、port服务
  167. ipConn, ipErr := grpc.Dial(config.Conf.Serve.GrpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  168. if ipErr != nil {
  169. atomic.AddInt64(&IpDialErrNum, 1) //异常次数+1
  170. return nil, errors.New("Ip Grpc Dial Error")
  171. }
  172. atomic.StoreInt64(&IpDialErrNum, 0) //异常次数重置
  173. defer ipConn.Close()
  174. ipClient := service.NewServiceClient(ipConn)
  175. ip := ""
  176. port := -1
  177. //重试获取ip、port
  178. if Skipping {
  179. for i := 1; i <= 3; i++ {
  180. repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "goods_service", Balance: 2})
  181. if err != nil {
  182. continue
  183. } else {
  184. ip = repl.Ip
  185. port = int(repl.Port)
  186. break
  187. }
  188. }
  189. } else {
  190. for {
  191. repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "goods_service", Balance: 2})
  192. if err != nil {
  193. continue
  194. } else {
  195. ip = repl.Ip
  196. port = int(repl.Port)
  197. break
  198. }
  199. }
  200. }
  201. if ip == "" || port == -1 { //重试三次,回去ip、port失败
  202. atomic.AddInt64(&IpGetErrNum, 1) //异常次数+1
  203. return nil, errors.New("Get Ip Error")
  204. }
  205. atomic.StoreInt64(&IpGetErrNum, 0) //异常次数重置
  206. //处理数据
  207. addr := ip + ":" + fmt.Sprint(port)
  208. conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  209. if err != nil {
  210. atomic.AddInt64(&ExtractDialErrNum, 1) //异常次数+1
  211. return nil, errors.New("Extract Grpc Dial Error,addr:" + addr)
  212. }
  213. atomic.StoreInt64(&ExtractDialErrNum, 0) //异常次数重置
  214. defer conn.Close()
  215. client := proto.NewGoodsExtractClient(conn)
  216. req := &proto.GoodsRequest{
  217. Contents: reqStr,
  218. }
  219. ctx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
  220. go func(ctx context.Context) {
  221. select {
  222. case <-ctx.Done():
  223. //_, _ = ipClient.Release(context.Background(), &service.ApplyRepData{Ip: ip, Port: int32(port)})
  224. case <-time.After(time.Second * 30):
  225. // 超时处理
  226. //log.Info("rpcGetFieldP 字段识别超过2min", zap.Any("serve", "goods_service"), zap.String("id", id), zap.Any("ip+port", addr))
  227. //ErrorInfoCache <- map[string]interface{}{
  228. // "err": "接口处理超过2min",
  229. // "type": "采购意向/标的物",
  230. // "id": id,
  231. // "comeintime": time.Now().Unix(),
  232. // "ok": false,
  233. // "gtid": id,
  234. // "letid": id,
  235. //}
  236. }
  237. }(ctx)
  238. resp, err := client.GoodsExtract(ctx, req)
  239. cancel()
  240. if err != nil {
  241. //_, _ = ipClient.Release(context.Background(), &service.ApplyRepData{Ip: ip, Port: int32(port)})
  242. return nil, err
  243. }
  244. result := map[string]interface{}{}
  245. if json.Unmarshal([]byte(resp.Goods), &result) != nil {
  246. //_, _ = ipClient.Release(context.Background(), &service.ApplyRepData{Ip: ip, Port: int32(port)})
  247. return nil, errors.New("Json Unmarshal Error")
  248. }
  249. // 服务中心释放服务
  250. //_, _ = ipClient.Release(context.Background(), &service.ApplyRepData{Ip: ip, Port: int32(port)})
  251. return result, nil
  252. }
  253. func rpcGetFieldR(reqStr, id string) (map[string]interface{}, error) {
  254. defer util.Catch()
  255. //获取ip、port服务
  256. ipConn, ipErr := grpc.Dial(config.Conf.Serve.GrpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  257. if ipErr != nil {
  258. atomic.AddInt64(&IpDialErrNum, 1) //异常次数+1
  259. return nil, errors.New("Ip Grpc Dial Error")
  260. }
  261. atomic.StoreInt64(&IpDialErrNum, 0) //异常次数重置
  262. defer ipConn.Close()
  263. ipClient := service.NewServiceClient(ipConn)
  264. ip := ""
  265. port := -1
  266. //重试获取ip、port
  267. if Skipping {
  268. for i := 1; i <= 3; i++ {
  269. repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "extract_expert_service", Balance: 0})
  270. if err != nil {
  271. continue
  272. } else {
  273. ip = repl.Ip
  274. port = int(repl.Port)
  275. break
  276. }
  277. }
  278. } else {
  279. for {
  280. repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "extract_expert_service", Balance: 0})
  281. if err != nil {
  282. continue
  283. } else {
  284. ip = repl.Ip
  285. port = int(repl.Port)
  286. break
  287. }
  288. }
  289. }
  290. if ip == "" || port == -1 { //重试三次,回去ip、port失败
  291. atomic.AddInt64(&IpGetErrNum, 1) //异常次数+1
  292. return nil, errors.New("Get Ip Error")
  293. }
  294. atomic.StoreInt64(&IpGetErrNum, 0) //异常次数重置
  295. //处理数据
  296. addr := ip + ":" + fmt.Sprint(port)
  297. conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  298. if err != nil {
  299. atomic.AddInt64(&ExtractDialErrNum, 1) //异常次数+1
  300. return nil, errors.New("Extract Grpc Dial Error,addr:" + addr)
  301. }
  302. atomic.StoreInt64(&ExtractDialErrNum, 0) //异常次数重置
  303. defer conn.Close()
  304. client := proto.NewExistsExpertClient(conn)
  305. req := &proto.ContentRequest{
  306. Contents: reqStr,
  307. }
  308. ctx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
  309. go func(ctx context.Context) {
  310. select {
  311. case <-ctx.Done():
  312. case <-time.After(time.Second * 30):
  313. // 超时处理
  314. //log.Info("rpcGetFieldP 字段识别超过2min", zap.Any("serve", "extract_expert_service"), zap.String("id", id), zap.Any("ip+port", addr))
  315. }
  316. }(ctx)
  317. resp, err := client.Extract(ctx, req)
  318. cancel()
  319. if err != nil {
  320. return nil, err
  321. }
  322. result := make(map[string]interface{})
  323. if json.Unmarshal([]byte(resp.Results), &result) != nil {
  324. return nil, errors.New("Json Unmarshal Error")
  325. }
  326. return result, nil
  327. }
  328. // 定时检测异常数据量,发告警
  329. func CheckErrorNum() {
  330. defer util.Catch()
  331. for {
  332. warnText := ""
  333. //消息治理中心服务连接异常告警
  334. if IpDialErrNum > 3 {
  335. warnText += "消息治理中心连接失败次数:" + fmt.Sprint(IpDialErrNum) + ";\n"
  336. }
  337. //获取治理中心IP异常告警
  338. if IpDialErrNum > 5 {
  339. warnText += "获取IP异常次数:" + fmt.Sprint(IpDialErrNum) + ";\n"
  340. }
  341. //连接IP服务异常告警
  342. if IpDialErrNum > 5 {
  343. warnText += "IP连接异常次数:" + fmt.Sprint(IpDialErrNum) + ";\n"
  344. }
  345. if warnText != "" {
  346. SendWarnInfo(warnText)
  347. }
  348. time.Sleep(30 * time.Minute)
  349. }
  350. }
  351. var TextModel = `{
  352. "msgtype": "text",
  353. "text": {
  354. "content": "%s",
  355. "mentioned_mobile_list":[%s]
  356. }
  357. }`
  358. // 告警信息
  359. func SendWarnInfo(content string) {
  360. defer util.Catch()
  361. toUserMsg := fmt.Sprintf(TextModel, content, "13373929153,15090279371") //李俊亮、王江含
  362. log.Info("SendWarnInfo", zap.String("toUserMsg", toUserMsg))
  363. resp1, err := http.Post(
  364. "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=45962efc-ca87-4996-9ffa-08bf6608ab7a",
  365. "application/json",
  366. bytes.NewBuffer([]byte(toUserMsg)),
  367. )
  368. if err != nil {
  369. fmt.Println("request error:", err)
  370. return
  371. }
  372. defer resp1.Body.Close()
  373. }
  374. var SP = make(chan bool, 5)
  375. var ErrorInfoCache = make(chan map[string]interface{}, 1000) //异常信息集合
  376. // 批量保存异常信息
  377. func SaveErrorInfo() {
  378. log.Info("SaveErrorInfo 异常信息...")
  379. savearr := make([]map[string]interface{}, 200)
  380. indexh := 0
  381. for {
  382. select {
  383. case v := <-ErrorInfoCache:
  384. savearr[indexh] = v
  385. indexh++
  386. if indexh == 200 {
  387. SP <- true
  388. go func(savearr []map[string]interface{}) {
  389. defer func() {
  390. <-SP
  391. }()
  392. MgoB.SaveBulk("bidding_warn_1", savearr...)
  393. }(savearr)
  394. savearr = make([]map[string]interface{}, 200)
  395. indexh = 0
  396. }
  397. case <-time.After(1 * time.Minute):
  398. if indexh > 0 {
  399. SP <- true
  400. go func(savearr []map[string]interface{}) {
  401. defer func() {
  402. <-SP
  403. }()
  404. MgoB.SaveBulk("bidding_warn_1", savearr...)
  405. }(savearr[:indexh])
  406. savearr = make([]map[string]interface{}, 200)
  407. indexh = 0
  408. }
  409. }
  410. }
  411. }