main.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/robfig/cron/v3"
  5. "go.mongodb.org/mongo-driver/bson"
  6. "go.uber.org/zap"
  7. utils "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "time"
  11. )
  12. var (
  13. MgoB *mongodb.MongodbSim
  14. MgoC *mongodb.MongodbSim
  15. Rest = make(map[string]interface{}, 0) //存储配置 栏目
  16. //// 更新mongo
  17. //updatePool = make(chan []map[string]interface{}, 5000)
  18. //千里马对应的招标 channel
  19. channels = []string{"招标公告", "重新招标", "意见征集", "招标预告", "信息变更", "答疑公告", "废标公告", "流标公告",
  20. "开标公示", "候选人公示", "中标通知", "合同公告", "验收合同", "违规公告", "其他公告", "预告", "公告", "变更", "结果", "其他"}
  21. )
  22. func main() {
  23. local, _ := time.LoadLocation("Asia/Shanghai")
  24. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  25. _, err := c.AddFunc(GF.Cron.Spec, getIndicators)
  26. if err != nil {
  27. log.Error("main", zap.Error(err))
  28. }
  29. log.Info("main", zap.String("spec", GF.Cron.Spec))
  30. c.Start()
  31. defer c.Stop()
  32. select {}
  33. }
  34. //获取数据指标数据
  35. func getIndicators() {
  36. // 获取昨天零点和今天零点的时间戳
  37. now := time.Now()
  38. start := GF.Cron.Start
  39. end := GF.Cron.End
  40. if start == 0 {
  41. start = -1
  42. }
  43. yesterday := time.Date(now.Year(), now.Month(), now.Day()+start, 0, 0, 0, 0, time.Local)
  44. today := time.Date(now.Year(), now.Month(), now.Day()+end, 0, 0, 0, 0, time.Local)
  45. //1.数据日采集量
  46. whereBidding := map[string]interface{}{
  47. "comeintime": map[string]interface{}{
  48. "$gt": yesterday.Unix(),
  49. "$lte": today.Unix(),
  50. },
  51. }
  52. biddingCount := MgoB.Count("bidding", whereBidding)
  53. if biddingCount == 0 {
  54. SendMail("数据昨日采为0", "请检查相关流程")
  55. }
  56. Rest["date"] = yesterday.Format("2006-01-02")
  57. Rest["数据日采集量"] = biddingCount
  58. log.Info("getIndicators", zap.Int("数据日采集量", biddingCount))
  59. //2. 统计爬虫总量
  60. whereT := map[string]interface{}{
  61. "state": map[string]interface{}{
  62. "$ne": []interface{}{4, 10},
  63. },
  64. }
  65. collectAll := MgoC.Count("luaconfig", whereT)
  66. Rest["爬虫总量"] = collectAll
  67. log.Info("getIndicators", zap.Int("爬虫总量", collectAll))
  68. //3. 爬虫异常数量
  69. whereCollectErr := map[string]interface{}{
  70. "l_comeintime": map[string]interface{}{
  71. "$gt": yesterday.Unix(),
  72. "$lte": today.Unix(),
  73. },
  74. }
  75. collectErrCount := MgoC.Count("task", whereCollectErr)
  76. Rest["爬虫日异常量"] = collectErrCount
  77. errPercentage := (float64(collectErrCount) / float64(collectAll)) * 100.0
  78. Rest["爬虫日异常量比例"] = fmt.Sprintf("%.2f%%", errPercentage)
  79. log.Info("getIndicators", zap.Int("爬虫日异常量", collectErrCount))
  80. //4.爬虫上架时效(小时)
  81. // 获取星期几
  82. //dayOfWeek := now.Weekday()
  83. // 判断是否为周一,每周日 统计一次 上周 周一到周日 爬虫上架时效
  84. //if dayOfWeek == time.Sunday {
  85. lastSunday := time.Date(today.Year(), today.Month(), today.Day()-13, 0, 0, 0, 0, time.Local)
  86. lastMonday := time.Date(today.Year(), today.Month(), today.Day()-6, 0, 0, 0, 0, time.Local)
  87. //fmt.Println(lastMonday)
  88. whereShelves := map[string]interface{}{
  89. "comeintime": map[string]interface{}{
  90. "$gt": lastSunday.Unix(),
  91. "$lte": lastMonday.Unix(),
  92. },
  93. }
  94. shelves, _ := MgoC.Find("luaconfig", whereShelves, nil, map[string]interface{}{"code": 1, "comeintime": 1}, false, -1, -1)
  95. if len(*shelves) > 0 {
  96. shelvesCount := int64(0)
  97. shelvesTime := int64(0)
  98. for _, v := range *shelves {
  99. code := utils.ObjToString(v["code"])
  100. shelveNew, _ := MgoC.FindOne("lua_logs_auditor_new", map[string]interface{}{"code": code, "types": "审核"})
  101. fmt.Println(shelveNew)
  102. if shelveNew == nil {
  103. continue
  104. } else {
  105. comeintimeNew := utils.Int64All((*shelveNew)["comeintime"])
  106. comeintime := utils.Int64All(v["comeintime"])
  107. if comeintimeNew == 0 {
  108. continue
  109. }
  110. shelvesTime = shelvesTime + comeintimeNew - comeintime
  111. shelvesCount++
  112. }
  113. }
  114. Rest["爬虫上架时效"] = (shelvesTime / shelvesCount) / 3600
  115. Rest["爬虫上架时效-详情"] = map[string]interface{}{
  116. "统计开始时间": lastSunday.Format("2006-01-02"),
  117. "统计结束时间": lastMonday.Format("2006-01-02"),
  118. }
  119. log.Info("getIndicators", zap.Any("爬虫上架时效", (shelvesTime/shelvesCount)/3600))
  120. }
  121. //}
  122. //5.竞品覆盖率,每周4统计上周的数据
  123. sessC := MgoC.GetMgoConn()
  124. defer MgoC.DestoryMongoConn(sessC)
  125. //if dayOfWeek == time.Thursday {
  126. //获取上周四,千里马的招标数据;然后获取标讯前后个3天,一共7天的所有数据,对比看标题或者项目名称是否存在
  127. lastWednesday := time.Date(today.Year(), today.Month(), today.Day()-8, 0, 0, 0, 0, time.Local)
  128. //lastThursday := time.Date(now.Year(), now.Month(), now.Day()-7, 0, 0, 0, 0, time.Local)
  129. whereQlm := map[string]interface{}{
  130. "publishtime": lastWednesday.Format("2006-01-02"),
  131. "site": "千里马",
  132. "channel": map[string]interface{}{
  133. "$in": channels,
  134. },
  135. }
  136. query := sessC.DB("qlm").C("data_merge").Find(whereQlm).Select(map[string]interface{}{"title": 1, "projectname": 1}).Iter()
  137. count := 0
  138. qlmDatas := make([]map[string]interface{}, 0)
  139. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  140. data := map[string]interface{}{
  141. "title": tmp["title"],
  142. "projectname": tmp["projectname"],
  143. }
  144. qlmDatas = append(qlmDatas, data)
  145. }
  146. log.Info("getIndicators", zap.Int("千里马上周三总数", count))
  147. biddingWhere := map[string]interface{}{
  148. "publishtime": map[string]interface{}{
  149. "$gt": lastWednesday.AddDate(0, 0, -3).Unix(),
  150. "$lte": lastWednesday.AddDate(0, 0, 4).Unix(),
  151. },
  152. }
  153. biddingDatas, _ := MgoB.Find("bidding", biddingWhere, nil, map[string]interface{}{"title": 1, "projectname": 1}, false, -1, -1)
  154. log.Info("getIndicators", zap.Int("标讯一周总数", len(*biddingDatas)))
  155. // 将切片B中的标题和项目名称分别存储在哈希表中
  156. titlesInB, projectsInB := getUniqueFields(*biddingDatas)
  157. matchs := countMatches(qlmDatas, titlesInB, projectsInB)
  158. Rest["竞品覆盖率-详情"] = map[string]interface{}{
  159. "date": lastWednesday.Format("2006-01-02"),
  160. "count": count,
  161. "matchs": matchs,
  162. }
  163. Rest["竞品覆盖率"] = fmt.Sprintf("%.2f%%", float64(matchs)/float64(count)*100)
  164. log.Info("getIndicators", zap.String("竞品覆盖率", fmt.Sprintf("%.2f%%", float64(matchs)/float64(count)*100)))
  165. //}
  166. //6.数据整体流程均耗时(分钟)
  167. sessB := MgoB.GetMgoConn()
  168. defer MgoB.DestoryMongoConn(sessB)
  169. fd := bson.M{"extracttype": 1, "sensitive": 1, "dataging": 1, "site": 1, "infoformat": 1, "comeintime": 1, "pici": 1, "publishtime": 1, "competehref": 1, "attach_text": 1}
  170. queryB := sessB.DB("qfw").C("bidding").Find(whereBidding).Select(fd).Iter()
  171. biddingRealCount := 0
  172. pici_publish_totaltime := int64(0) //comeintime 和 生索引 publish 时间 差值的总和
  173. pici_comein_totaltime := int64(0) //publishtime 和 生索引 pici 时间 差值的总和
  174. for tmp := make(map[string]interface{}); queryB.Next(tmp); {
  175. if utils.IntAll(tmp["extracttype"]) != -1 && utils.ObjToString(tmp["sensitive"]) != "测试" && utils.IntAll(tmp["dataging"]) != 1 && utils.Float64All(tmp["infoformat"]) != 3 {
  176. comeintime := utils.Int64All(tmp["comeintime"])
  177. publishtime := utils.Int64All(tmp["publishtime"])
  178. pici := utils.Int64All(tmp["pici"])
  179. if (comeintime-publishtime) < 12*60*60 && pici > 0 {
  180. biddingRealCount++
  181. diff1 := pici - publishtime
  182. diff2 := pici - comeintime
  183. pici_publish_totaltime += diff1
  184. pici_comein_totaltime += diff2
  185. }
  186. }
  187. }
  188. if biddingRealCount > 0 {
  189. pici_publish_avgtime := pici_publish_totaltime / int64(biddingRealCount)
  190. pici_comein_avgtime := pici_comein_totaltime / int64(biddingRealCount)
  191. Rest["数据整体流程均耗时(分钟)"] = fmt.Sprintf("%.2f", float64(pici_publish_avgtime)/float64(60))
  192. Rest["数据处理均耗时(分钟)"] = fmt.Sprintf("%.2f", float64(pici_comein_avgtime)/float64(60))
  193. Rest["数据采集均耗时(分钟)"] = fmt.Sprintf("%.2f", float64(pici_publish_avgtime-pici_comein_avgtime)/float64(60))
  194. }
  195. //7.数据行质量合格率,暂时写死
  196. Rest["数据行质量合格率"] = "90%"
  197. MgoB.Save("bidding_zhibiao", Rest)
  198. fmt.Println("over")
  199. }