task.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "field-dispose/config"
  8. "field-dispose/proto"
  9. "fmt"
  10. "go.mongodb.org/mongo-driver/bson"
  11. "go.uber.org/zap"
  12. "google.golang.org/grpc"
  13. "google.golang.org/grpc/credentials/insecure"
  14. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  15. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  16. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  17. "net/http"
  18. "strings"
  19. "sync"
  20. "sync/atomic"
  21. "time"
  22. service "jygit.jydev.jianyu360.cn/BP/servicerd/proto"
  23. )
  24. var IpDialErrNum, IpGetErrNum, ExtractDialErrNum = int64(0), int64(0), int64(0)
  25. // @Description 处理字段procurementlist、review_experts
  26. // @Author J 2022/8/31 14:57
  27. func getIntention(mapinfo map[string]interface{}) {
  28. defer util.Catch()
  29. gtid, _ := mapinfo["gtid"].(string)
  30. lteid, _ := mapinfo["lteid"].(string)
  31. sess := MgoB.GetMgoConn()
  32. defer MgoB.DestoryMongoConn(sess)
  33. ch := make(chan bool, config.Conf.Serve.Thread)
  34. wg := &sync.WaitGroup{}
  35. query := map[string]interface{}{
  36. "_id": map[string]interface{}{
  37. "$gt": mongodb.StringTOBsonId(gtid),
  38. "$lte": mongodb.StringTOBsonId(lteid),
  39. },
  40. }
  41. field := map[string]interface{}{
  42. "toptype": 1,
  43. "title": 1,
  44. "attach_text": 1,
  45. "contenthtml": 1,
  46. "site": 1,
  47. "detail": 1,
  48. }
  49. log.Info(fmt.Sprintf("count --- %d", MgoB.Count("bidding", query)))
  50. it := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.Coll).Find(query).Select(&field).Iter()
  51. count := 0
  52. clock := sync.RWMutex{}
  53. c1, c2 := 0, 0
  54. for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
  55. if count%200 == 0 {
  56. log.Info("getIntention", zap.Int("current:", count))
  57. }
  58. ch <- true
  59. wg.Add(1)
  60. go func(tmp map[string]interface{}) {
  61. defer func() {
  62. <-ch
  63. wg.Done()
  64. }()
  65. id := mongodb.BsonIdToSId(tmp["_id"])
  66. update := make(map[string]interface{})
  67. result2 := taskB(tmp, gtid, lteid)
  68. result1 := taskA(tmp, gtid, lteid)
  69. if r, ok := result2["result"].(map[string]interface{}); ok {
  70. if r[id] != nil && len(r[id].([]interface{})) > 0 {
  71. update["review_experts"] = strings.Join(util.ObjArrToStringArr(r[id].([]interface{})), ",")
  72. }
  73. }
  74. if result1 != nil && len(result1) > 0 {
  75. clock.Lock()
  76. if result1["purchasinglist"] != nil && len(result1["purchasinglist"].([]interface{})) > 0 {
  77. c1++
  78. update["purchasinglist"] = result1["purchasinglist"]
  79. update["purchasing"] = result1["purchasing"]
  80. }
  81. if result1["procurementlist"] != nil {
  82. c2++
  83. update["procurementlist"] = result1["procurementlist"]
  84. }
  85. if result1["purchasingsource"] != nil {
  86. update["purchasingsource"] = result1["purchasingsource"]
  87. }
  88. clock.Unlock()
  89. }
  90. if len(update) > 0 {
  91. updatePool <- []map[string]interface{}{
  92. {"_id": mongodb.StringTOBsonId(id)},
  93. {"$set": update},
  94. }
  95. //updateEsPool <- []map[string]interface{}{{
  96. // "_id": id,
  97. //},
  98. // update,
  99. //}
  100. }
  101. }(tmp)
  102. tmp = map[string]interface{}{}
  103. }
  104. wg.Wait()
  105. log.Info("dispose over...", zap.Int("count:", count), zap.String("gtid:", gtid), zap.String("lteid:", lteid))
  106. MgoB.Update("bidding_processing_ids", bson.M{"gtid": gtid}, bson.M{"$set": bson.M{"dataprocess": 2, "updatetime": time.Now().Unix(), "pu_size": c1, "pr_size": c2}}, false, false)
  107. NextNode(mapinfo)
  108. }
  109. // @Description procurementlist、purchasinglist
  110. // @Author J 2022/8/31 15:28
  111. func taskA(tmp map[string]interface{}, gtid, lteid string) map[string]interface{} {
  112. id := mongodb.BsonIdToSId(tmp["_id"])
  113. delete(tmp, "_id")
  114. delete(tmp, "detail")
  115. reqStr, err := json.Marshal(tmp)
  116. if err != nil {
  117. ErrorInfoCache <- map[string]interface{}{
  118. "err": "Json Marshal Error",
  119. "type": "采购意向/标的物",
  120. "id": id,
  121. "comeintime": time.Now().Unix(),
  122. "ok": false,
  123. "gtid": gtid,
  124. "letid": lteid,
  125. }
  126. return nil
  127. }
  128. //处理数据
  129. result, err := rpcGetFieldP(string(reqStr), id)
  130. if err != nil { //保存处理异常信息
  131. ErrorInfoCache <- map[string]interface{}{
  132. "err": err.Error(),
  133. "type": "采购意向/标的物",
  134. "id": id,
  135. "comeintime": time.Now().Unix(),
  136. "ok": false,
  137. "gtid": gtid,
  138. "letid": lteid,
  139. }
  140. }
  141. return result
  142. }
  143. // @Description review_experts
  144. // @Author J 2022/8/31 15:28
  145. func taskB(tmp map[string]interface{}, gtid, lteid string) map[string]interface{} {
  146. id := mongodb.BsonIdToSId(tmp["_id"])
  147. reqStr, err := json.Marshal(map[string]interface{}{id: tmp["detail"]})
  148. if err != nil {
  149. ErrorInfoCache <- map[string]interface{}{
  150. "err": "Json Marshal Error",
  151. "type": "评标专家字段",
  152. "id": id,
  153. "comeintime": time.Now().Unix(),
  154. "ok": false,
  155. "gtid": gtid,
  156. "letid": lteid,
  157. }
  158. return nil
  159. }
  160. //处理数据
  161. result, err := rpcGetFieldR(string(reqStr), id)
  162. if err != nil {
  163. ErrorInfoCache <- map[string]interface{}{
  164. "err": err.Error(),
  165. "type": "评标专家字段",
  166. "id": id,
  167. "comeintime": time.Now().Unix(),
  168. "ok": false,
  169. "gtid": gtid,
  170. "letid": lteid,
  171. }
  172. }
  173. return result
  174. }
  175. func rpcGetFieldP(reqStr, id string) (map[string]interface{}, error) {
  176. defer util.Catch()
  177. //获取ip、port服务
  178. ipConn, ipErr := grpc.Dial(config.Conf.Serve.GrpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  179. if ipErr != nil {
  180. atomic.AddInt64(&IpDialErrNum, 1) //异常次数+1
  181. return nil, errors.New("Ip Grpc Dial Error")
  182. }
  183. atomic.StoreInt64(&IpDialErrNum, 0) //异常次数重置
  184. defer ipConn.Close()
  185. ipClient := service.NewServiceClient(ipConn)
  186. ip := ""
  187. port := -1
  188. //重试获取ip、port
  189. if Skipping {
  190. for i := 1; i <= 3; i++ {
  191. repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "goods_service", Balance: 3})
  192. if err != nil {
  193. continue
  194. } else {
  195. ip = repl.Ip
  196. port = int(repl.Port)
  197. break
  198. }
  199. }
  200. } else {
  201. for {
  202. repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "goods_service", Balance: 3})
  203. if err != nil {
  204. continue
  205. } else {
  206. ip = repl.Ip
  207. port = int(repl.Port)
  208. break
  209. }
  210. }
  211. }
  212. if ip == "" || port == -1 { //重试三次,回去ip、port失败
  213. atomic.AddInt64(&IpGetErrNum, 1) //异常次数+1
  214. return nil, errors.New("Get Ip Error")
  215. }
  216. atomic.StoreInt64(&IpGetErrNum, 0) //异常次数重置
  217. //处理数据
  218. addr := ip + ":" + fmt.Sprint(port)
  219. conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  220. if err != nil {
  221. atomic.AddInt64(&ExtractDialErrNum, 1) //异常次数+1
  222. return nil, errors.New("Extract Grpc Dial Error,addr:" + addr)
  223. }
  224. atomic.StoreInt64(&ExtractDialErrNum, 0) //异常次数重置
  225. defer conn.Close()
  226. client := proto.NewGoodsExtractClient(conn)
  227. req := &proto.GoodsRequest{
  228. Contents: reqStr,
  229. }
  230. ctx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
  231. go func(ctx context.Context) {
  232. select {
  233. case <-ctx.Done():
  234. _, _ = ipClient.Release(context.Background(), &service.ApplyRepData{Ip: ip, Port: int32(port)})
  235. case <-time.After(time.Second * 30):
  236. // 超时处理
  237. //log.Info("rpcGetFieldP 字段识别超过2min", zap.Any("serve", "goods_service"), zap.String("id", id), zap.Any("ip+port", addr))
  238. //ErrorInfoCache <- map[string]interface{}{
  239. // "err": "接口处理超过2min",
  240. // "type": "采购意向/标的物",
  241. // "id": id,
  242. // "comeintime": time.Now().Unix(),
  243. // "ok": false,
  244. // "gtid": id,
  245. // "letid": id,
  246. //}
  247. }
  248. }(ctx)
  249. resp, err := client.GoodsExtract(ctx, req)
  250. cancel()
  251. if err != nil {
  252. //_, _ = ipClient.Release(context.Background(), &service.ApplyRepData{Ip: ip, Port: int32(port)})
  253. return nil, errors.Join(err, errors.New(fmt.Sprintf("%s:%d", ip, port)))
  254. }
  255. result := map[string]interface{}{}
  256. if json.Unmarshal([]byte(resp.Goods), &result) != nil {
  257. _, _ = ipClient.Release(context.Background(), &service.ApplyRepData{Ip: ip, Port: int32(port)})
  258. return nil, errors.New("Json Unmarshal Error")
  259. }
  260. // 服务中心释放服务
  261. _, _ = ipClient.Release(context.Background(), &service.ApplyRepData{Ip: ip, Port: int32(port)})
  262. return result, nil
  263. }
  264. func rpcGetFieldR(reqStr, id string) (map[string]interface{}, error) {
  265. defer util.Catch()
  266. //获取ip、port服务
  267. ipConn, ipErr := grpc.Dial(config.Conf.Serve.GrpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  268. if ipErr != nil {
  269. atomic.AddInt64(&IpDialErrNum, 1) //异常次数+1
  270. return nil, errors.New("Ip Grpc Dial Error")
  271. }
  272. atomic.StoreInt64(&IpDialErrNum, 0) //异常次数重置
  273. defer ipConn.Close()
  274. ipClient := service.NewServiceClient(ipConn)
  275. ip := ""
  276. port := -1
  277. //重试获取ip、port
  278. if Skipping {
  279. for i := 1; i <= 3; i++ {
  280. repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "extract_expert_service", Balance: 0})
  281. if err != nil {
  282. continue
  283. } else {
  284. ip = repl.Ip
  285. port = int(repl.Port)
  286. break
  287. }
  288. }
  289. } else {
  290. for {
  291. repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "extract_expert_service", Balance: 0})
  292. if err != nil {
  293. continue
  294. } else {
  295. ip = repl.Ip
  296. port = int(repl.Port)
  297. break
  298. }
  299. }
  300. }
  301. if ip == "" || port == -1 { //重试三次,回去ip、port失败
  302. atomic.AddInt64(&IpGetErrNum, 1) //异常次数+1
  303. return nil, errors.New("Get Ip Error")
  304. }
  305. atomic.StoreInt64(&IpGetErrNum, 0) //异常次数重置
  306. //处理数据
  307. addr := ip + ":" + fmt.Sprint(port)
  308. conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  309. if err != nil {
  310. atomic.AddInt64(&ExtractDialErrNum, 1) //异常次数+1
  311. return nil, errors.New("Extract Grpc Dial Error,addr:" + addr)
  312. }
  313. atomic.StoreInt64(&ExtractDialErrNum, 0) //异常次数重置
  314. defer conn.Close()
  315. client := proto.NewExistsExpertClient(conn)
  316. req := &proto.ContentRequest{
  317. Contents: reqStr,
  318. }
  319. ctx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
  320. go func(ctx context.Context) {
  321. select {
  322. case <-ctx.Done():
  323. case <-time.After(time.Second * 30):
  324. // 超时处理
  325. //log.Info("rpcGetFieldP 字段识别超过2min", zap.Any("serve", "extract_expert_service"), zap.String("id", id), zap.Any("ip+port", addr))
  326. }
  327. }(ctx)
  328. resp, err := client.Extract(ctx, req)
  329. cancel()
  330. if err != nil {
  331. return nil, err
  332. }
  333. result := make(map[string]interface{})
  334. if json.Unmarshal([]byte(resp.Results), &result) != nil {
  335. return nil, errors.New("Json Unmarshal Error")
  336. }
  337. return result, nil
  338. }
  339. // 定时检测异常数据量,发告警
  340. func CheckErrorNum() {
  341. defer util.Catch()
  342. for {
  343. warnText := ""
  344. //消息治理中心服务连接异常告警
  345. if IpDialErrNum > 3 {
  346. warnText += "消息治理中心连接失败次数:" + fmt.Sprint(IpDialErrNum) + ";\n"
  347. }
  348. //获取治理中心IP异常告警
  349. if IpDialErrNum > 5 {
  350. warnText += "获取IP异常次数:" + fmt.Sprint(IpDialErrNum) + ";\n"
  351. }
  352. //连接IP服务异常告警
  353. if IpDialErrNum > 5 {
  354. warnText += "IP连接异常次数:" + fmt.Sprint(IpDialErrNum) + ";\n"
  355. }
  356. if warnText != "" {
  357. SendWarnInfo(warnText)
  358. }
  359. time.Sleep(30 * time.Minute)
  360. }
  361. }
  362. var TextModel = `{
  363. "msgtype": "text",
  364. "text": {
  365. "content": "%s",
  366. "mentioned_mobile_list":[%s]
  367. }
  368. }`
  369. // 告警信息
  370. func SendWarnInfo(content string) {
  371. defer util.Catch()
  372. toUserMsg := fmt.Sprintf(TextModel, content, "13373929153,15090279371") //李俊亮、王江含
  373. log.Info("SendWarnInfo", zap.String("toUserMsg", toUserMsg))
  374. resp1, err := http.Post(
  375. "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=45962efc-ca87-4996-9ffa-08bf6608ab7a",
  376. "application/json",
  377. bytes.NewBuffer([]byte(toUserMsg)),
  378. )
  379. if err != nil {
  380. fmt.Println("request error:", err)
  381. return
  382. }
  383. defer resp1.Body.Close()
  384. }
  385. var SP = make(chan bool, 5)
  386. var ErrorInfoCache = make(chan map[string]interface{}, 1000) //异常信息集合
  387. // 批量保存异常信息
  388. func SaveErrorInfo() {
  389. log.Info("SaveErrorInfo 异常信息...")
  390. savearr := make([]map[string]interface{}, 200)
  391. indexh := 0
  392. for {
  393. select {
  394. case v := <-ErrorInfoCache:
  395. savearr[indexh] = v
  396. indexh++
  397. if indexh == 200 {
  398. SP <- true
  399. go func(savearr []map[string]interface{}) {
  400. defer func() {
  401. <-SP
  402. }()
  403. MgoB.SaveBulk("bidding_py_err", savearr...)
  404. }(savearr)
  405. savearr = make([]map[string]interface{}, 200)
  406. indexh = 0
  407. }
  408. case <-time.After(1 * time.Minute):
  409. if indexh > 0 {
  410. SP <- true
  411. go func(savearr []map[string]interface{}) {
  412. defer func() {
  413. <-SP
  414. }()
  415. MgoB.SaveBulk("bidding_py_err", savearr...)
  416. }(savearr[:indexh])
  417. savearr = make([]map[string]interface{}, 200)
  418. indexh = 0
  419. }
  420. }
  421. }
  422. }