package main import ( "fmt" "github.com/robfig/cron/v3" "go.mongodb.org/mongo-driver/bson" "go.uber.org/zap" utils "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "time" ) var ( MgoB *mongodb.MongodbSim MgoC *mongodb.MongodbSim Rest = make(map[string]interface{}, 0) //存储配置 栏目 // 更新mongo //千里马对应的标讯 channel channels = []string{"招标公告", "重新招标", "意见征集", "招标预告", "信息变更", "答疑公告", "废标公告", "流标公告", "开标公示", "候选人公示", "中标通知", "合同公告", "验收合同", "违规公告", "其他公告", "预告", "公告", "变更", "结果", "其他"} channels2 = []string{"可研", "立项", "核准", "备案", "环评", "审批", "施工许可"} // 拟建数据 channels3 = []string{"国土"} //产权数据 //标讯数据细分,招标预告、招标公告、结果公告 predictionChannels = []string{"预告", "招标预告", "意见征集"} //招标预告 biddingChannels = []string{"公告", "变更", "招标公告", "重新招标", "信息变更", "答疑公告"} //招标公告 resultChannels = []string{"废标公告", "流标公告", "结果", "开标公示", "候选人公示", "中标通知", "合同公告"} //结果公告 Yesterday time.Time Today time.Time dataSource = make(map[string]interface{}, 0) //数据源收录指标 dataCollection = make(map[string]interface{}, 0) //数据采集指标 dataCompete = make(map[string]interface{}, 0) //竞品对比指标 dataTime = make(map[string]interface{}, 0) //数据时效指标 dataQuality = make(map[string]interface{}, 0) //数据质量指标 ) func main() { local, _ := time.LoadLocation("Asia/Shanghai") c := cron.New(cron.WithLocation(local), cron.WithSeconds()) _, err := c.AddFunc(GF.Cron.Spec, getIndicators) if err != nil { log.Error("main", zap.Error(err)) } log.Info("main", zap.String("spec", GF.Cron.Spec)) c.Start() defer c.Stop() select {} } // getIndicators 获取数据指标数据 func getIndicators() { // 获取昨天零点和今天零点的时间戳 now := time.Now() start := GF.Cron.Start end := GF.Cron.End if start == 0 { start = -1 } Yesterday = time.Date(now.Year(), now.Month(), now.Day()+start, 0, 0, 0, 0, time.Local) Today = time.Date(now.Year(), now.Month(), now.Day()+end, 0, 0, 0, 0, time.Local) dataSource = make(map[string]interface{}, 0) //数据源收录指标 dataCollection = make(map[string]interface{}, 0) //数据采集指标 dataCompete = make(map[string]interface{}, 0) //竞品对比指标 dataTime = make(map[string]interface{}, 0) //数据时效指标 dataQuality = make(map[string]interface{}, 0) //数据质量指标 //1. 数据采集指标 getCollection() //2.统计竞品对比指标 dayOfWeek := Today.Weekday() if dayOfWeek == time.Wednesday || GF.Cron.Week { coverageA() coverageB() } //3.数据时效指标 getTimeLines() //4.数据行质量合格率,暂时写死 dataQuality["数据行质量合格率"] = GF.Cron.QualityRate //5.统计 数据源收录指标 getCollectionData() Rest["数据源收录指标"] = dataSource Rest["数据采集指标"] = dataCollection Rest["竞品对比指标"] = dataCompete Rest["数据时效指标"] = dataTime Rest["数据质量指标"] = dataQuality Rest["日期"] = Yesterday.Format("2006-01-02") MgoB.Save("bidding_zhibiao", Rest) fmt.Println("over") } // getCollection 获取数据采集指标 func getCollection() { //1.数据日采集量 whereBidding := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gt": Yesterday.Unix(), "$lte": Today.Unix(), }, } biddingCount := MgoB.Count("bidding", whereBidding) if biddingCount == 0 { SendMail("数据昨日采为0", "请检查相关流程") return } dataCollection["数据采集日采集量"] = biddingCount log.Info("getCollection", zap.Int("数据日采集量", biddingCount)) //2. 统计爬虫总量 whereT := map[string]interface{}{ "state": map[string]interface{}{ "$ne": []interface{}{4, 10}, }, } collectAll := MgoC.Count("luaconfig", whereT) dataCollection["爬虫总量"] = collectAll log.Info("getCollection", zap.Int("爬虫总量", collectAll)) //3. 爬虫异常数量 whereCollectErr := map[string]interface{}{ "l_comeintime": map[string]interface{}{ "$gt": Yesterday.Unix(), "$lte": Today.Unix(), }, } collectErrCount := MgoC.Count("task", whereCollectErr) dataCollection["爬虫日异常量"] = collectErrCount errPercentage := (float64(collectErrCount) / float64(collectAll)) * 100.0 dataCollection["爬虫日异常量比例"] = fmt.Sprintf("%.2f%%", errPercentage) log.Info("getCollection", zap.Int("爬虫日异常量", collectErrCount)) //4.爬虫上架时效(小时) dayOfWeek := Today.Weekday() // 获取星期几 lastSunday := time.Date(Today.Year(), Today.Month(), Today.Day()-1, 0, 0, 0, 0, time.Local) //上周日 lastMonday := time.Date(Today.Year(), Today.Month(), Today.Day()-7, 0, 0, 0, 0, time.Local) //上周一 //4. 周一或者强制统计,出上周一到周日的 爬虫上架时效/爬虫维护时效(小时) if dayOfWeek == time.Monday || GF.Cron.Week { //4.1 爬虫上架时效(小时) whereShelves := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gte": lastMonday.Unix(), "$lte": lastSunday.Unix(), }, } shelves, _ := MgoC.Find("luaconfig", whereShelves, nil, map[string]interface{}{"code": 1, "comeintime": 1}, false, -1, -1) if len(*shelves) > 0 { shelvesCount := int64(0) shelvesTime := int64(0) for _, v := range *shelves { code := utils.ObjToString(v["code"]) shelveNew, _ := MgoC.FindOne("lua_logs_auditor_new", map[string]interface{}{"code": code, "types": "审核"}) if shelveNew == nil { continue } else { comeintimeNew := utils.Int64All((*shelveNew)["comeintime"]) comeintime := utils.Int64All(v["comeintime"]) if comeintimeNew == 0 { continue } if comeintimeNew-comeintime > 0 { shelvesTime = shelvesTime + comeintimeNew - comeintime shelvesCount++ } } } if shelvesCount > 0 { dataCollection["爬虫上架时效(小时)"] = (shelvesTime / shelvesCount) / 3600 log.Info("getCollection", zap.Any("爬虫上架时效", (shelvesTime/shelvesCount)/3600)) } else { dataCollection["爬虫上架时效(小时)"] = "" } } //4.2 爬虫维护时效(小时) whereAuditor := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gte": lastMonday.Unix(), "$lte": lastSunday.Unix(), }, "types": "审核", } maintainCount := int64(0) //维护数量 maintainTime := int64(0) //维护总时间 auditors, _ := MgoC.Find("lua_logs_auditor", whereAuditor, nil, nil, false, -1, -1) if len(*auditors) > 0 { for _, v := range *auditors { code := utils.ObjToString(v["code"]) shelveNew, _ := MgoC.FindOne("lua_logs_auditor_new", map[string]interface{}{"code": code, "types": "审核"}) if shelveNew == nil || len(*shelveNew) == 0 { taskWhere := map[string]interface{}{ "s_code": code, "i_state": 4, } tasks, _ := MgoC.Find("task", taskWhere, map[string]interface{}{"l_complete": -1}, nil, false, -1, -1) if len(*tasks) > 0 { completeTime := utils.Int64All((*tasks)[0]["l_comeintime"]) comeinTime := utils.Int64All(v["comeintime"]) diff := completeTime - comeinTime if diff > 0 { maintainCount++ maintainTime += diff } } } } if maintainCount > 0 { dataCollection["爬虫维护时效(小时)"] = (maintainTime / maintainCount) / 3600 log.Info("getCollection", zap.Any("爬虫维护时效(小时)", (maintainTime/maintainCount)/3600)) } else { dataCollection["爬虫维护时效(小时)"] = "" } } } } // coverageA 统计 剑鱼对千里马覆盖率 func coverageA() { //5.竞品覆盖率,每周4统计上周的数据 sessC := MgoC.GetMgoConn() defer MgoC.DestoryMongoConn(sessC) //获取上周3,千里马的招标数据;然后获取标讯前后个3天,一共7天的所有数据,对比看标题或者项目名称是否存在 lastWednesday := time.Date(Today.Year(), Today.Month(), Today.Day()-7, 0, 0, 0, 0, time.Local) whereQlm := map[string]interface{}{ "publishtime": lastWednesday.Format("2006-01-02"), "site": "千里马", } query := sessC.DB("qlm").C("data_merge").Find(whereQlm).Select(map[string]interface{}{"title": 1, "projectname": 1, "channel": 1, "qlm_toptype": 1}).Iter() count := 0 qlmData := make([]map[string]interface{}, 0) //标讯所有数据 njData := make([]map[string]interface{}, 0) //拟建数据 cqData := make([]map[string]interface{}, 0) //产权数据 preData := make([]map[string]interface{}, 0) //招标预告数据 biddingData := make([]map[string]interface{}, 0) // 招标公告数据 resultData := make([]map[string]interface{}, 0) // 结果公告数据 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { data := map[string]interface{}{ "title": tmp["title"], "projectname": tmp["projectname"], } channel := utils.ObjToString(tmp["channel"]) qlmToptype := utils.ObjToString(tmp["qlm_toptype"]) //标讯所有数据 if IsInStringArray(channel, channels) { qlmData = append(qlmData, data) } if IsInStringArray(channel, predictionChannels) { preData = append(preData, data) } if IsInStringArray(channel, biddingChannels) { biddingData = append(biddingData, data) } if IsInStringArray(channel, resultChannels) { resultData = append(resultData, data) } //拟建数据 if IsInStringArray(channel, channels2) { njData = append(njData, data) } //产权数据 if IsInStringArray(channel, channels3) || qlmToptype == "产权" { cqData = append(cqData, data) } } log.Info("getIndicators", zap.Int("千里马上周三总数", count)) biddingWhere := map[string]interface{}{ "publishtime": map[string]interface{}{ "$gte": lastWednesday.AddDate(0, 0, -3).Unix(), "$lte": lastWednesday.AddDate(0, 0, 3).Unix(), }, } biddingDatas, _ := MgoB.Find("bidding", biddingWhere, nil, map[string]interface{}{"title": 1, "projectname": 1}, false, -1, -1) log.Info("coverageA", zap.Int("标讯一周总数", len(*biddingDatas))) // 将切片B中的标题和项目名称分别存储在哈希表中 titlesInB, projectsInB := getUniqueFields(*biddingDatas) //5.1.1 统计 标讯-整体 数据 matches := countMatches(qlmData, titlesInB, projectsInB) matchesA := map[string]interface{}{ "标讯整体": map[string]interface{}{ "date": lastWednesday.Format("2006-01-02"), "count": len(qlmData), "match": matches, "no-match": len(qlmData) - matches, "qlm-total": count, "rate": fmt.Sprintf("%.2f%%", float64(matches)/float64(len(qlmData))*100), }, } //5.1.2 统计 标讯-招标预告 数据 matchesPre := countMatches(preData, titlesInB, projectsInB) matchesA["招标预告"] = map[string]interface{}{ "match": matchesPre, "no-match": len(preData) - matchesPre, "total": len(preData), "rate": fmt.Sprintf("%.2f%%", float64(matchesPre)/float64(len(preData))*100), } //5.1.3 统计 标讯-招标公告 数据 matchBidding := countMatches(biddingData, titlesInB, projectsInB) matchesA["招标公告"] = map[string]interface{}{ "match": matchBidding, "no-match": len(biddingData) - matchBidding, "total": len(biddingData), "rate": fmt.Sprintf("%.2f%%", float64(matchBidding)/float64(len(biddingData))*100), } //5.1.4 统计 标讯-结果公告 数据 matchResult := countMatches(resultData, titlesInB, projectsInB) matchesA["结果公告"] = map[string]interface{}{ "match": matchResult, "no-match": len(resultData) - matchResult, "total": len(resultData), "rate": fmt.Sprintf("%.2f%%", float64(matchResult)/float64(len(resultData))*100), } dataCompete["剑鱼对千里马覆盖率(标讯)"] = matchesA log.Info("coverageA", zap.String("剑鱼对千里马覆盖率-标讯", "处理完毕")) //5.2 拟建数据覆盖率 matches2 := countMatches(njData, titlesInB, projectsInB) matchesB := map[string]interface{}{ "match": matches2, "total": len(njData), "no-match": len(njData) - matches2, "date": lastWednesday.Format("2006-01-02"), "rate": fmt.Sprintf("%.2f%%", float64(matches2)/float64(len(njData))*100), } dataCompete["剑鱼对千里马覆盖率(拟建)"] = matchesB //5.3 产权数据统计 matches3 := countMatches(cqData, titlesInB, projectsInB) matchesC := map[string]interface{}{ "match": matches3, "total": len(cqData), "no-match": len(cqData) - matches3, "date": lastWednesday.Format("2006-01-02"), "rate": fmt.Sprintf("%.2f%%", float64(matches3)/float64(len(cqData))*100), } dataCompete["剑鱼对千里马覆盖率(产权)"] = matchesC log.Info("coverageA", zap.String("剑鱼对千里马覆盖率-产权", "处理完毕")) } // coverageB 统计 千里马对剑鱼的覆盖率 func coverageB() { sessB := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(sessB) lastWednesday := time.Date(Today.Year(), Today.Month(), Today.Day()-7, 0, 0, 0, 0, time.Local) lastThursday := time.Date(Today.Year(), Today.Month(), Today.Day()-6, 0, 0, 0, 0, time.Local) whereQlm := map[string]interface{}{ "publishtime": map[string]interface{}{ "$gt": lastWednesday.Unix(), "$lte": lastThursday.Unix(), }, } query := sessB.DB(GF.MongoB.DB).C("bidding").Find(whereQlm).Select(map[string]interface{}{"title": 1, "projectname": 1, "toptype": 1, "infoformat": 1}).Iter() count := 0 qlmData := make([]map[string]interface{}, 0) //标讯所有数据 njData := make([]map[string]interface{}, 0) //拟建数据 cqData := make([]map[string]interface{}, 0) //产权数据 preData := make([]map[string]interface{}, 0) //招标预告数据 biddingData := make([]map[string]interface{}, 0) // 招标公告数据 resultData := make([]map[string]interface{}, 0) // 结果公告数据 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { data := map[string]interface{}{ "title": tmp["title"], "projectname": tmp["projectname"], } toptype := utils.ObjToString(tmp["toptype"]) //标讯所有数据 if utils.IntAll(tmp["infoformat"]) == 1 { qlmData = append(qlmData, data) } if utils.IntAll(tmp["infoformat"]) == 2 { njData = append(njData, data) } if utils.IntAll(tmp["infoformat"]) == 3 { cqData = append(cqData, data) } if toptype == "预告" || toptype == "采购意向" { preData = append(preData, data) } if toptype == "招标" { biddingData = append(biddingData, data) } if toptype == "结果" { resultData = append(resultData, data) } } log.Info("coverageB", zap.Int("剑鱼上周三总数", count)) biddingWhere := map[string]interface{}{ "publishtime": map[string]interface{}{ "$gte": lastWednesday.AddDate(0, 0, -3).Format("2006-01-02"), "$lte": lastWednesday.AddDate(0, 0, 3).Format("2006-01-02"), }, } //竞品 qlm 数据库 mgoQ := mongodb.MongodbSim{ MongodbAddr: MgoC.MongodbAddr, DbName: "qlm", Size: 10, UserName: GF.MongoC.Username, Password: GF.MongoC.Password, Direct: GF.MongoC.Direct, } mgoQ.InitPool() biddingDatas, _ := mgoQ.Find("data_merge", biddingWhere, nil, map[string]interface{}{"title": 1, "projectname": 1}, false, -1, -1) log.Info("coverageB", zap.Int("千里马一周总数", len(*biddingDatas))) // 将切片B中的标题和项目名称分别存储在哈希表中 titlesInB, projectsInB := getUniqueFields(*biddingDatas) //5.1.1 统计 标讯-整体 数据 matches := countMatches(qlmData, titlesInB, projectsInB) //totalMatchesA := make(map[string]interface{}, 0) //剑鱼对千里马覆盖率(标讯) matchesA := map[string]interface{}{ "标讯整体": map[string]interface{}{ "date": lastWednesday.Format("2006-01-02"), "count": len(qlmData), "match": matches, "no-match": len(qlmData) - matches, "jianyu-total": count, "rate": fmt.Sprintf("%.2f%%", float64(matches)/float64(len(qlmData))*100), }, } //5.1.2 统计 标讯-招标预告 数据 matchesPre := countMatches(preData, titlesInB, projectsInB) matchesA["招标预告"] = map[string]interface{}{ "match": matchesPre, "no-match": len(preData) - matchesPre, "total": len(preData), "rate": fmt.Sprintf("%.2f%%", float64(matchesPre)/float64(len(preData))*100), } //5.1.3 统计 标讯-招标公告 数据 matchBidding := countMatches(biddingData, titlesInB, projectsInB) matchesA["招标公告"] = map[string]interface{}{ "match": matchBidding, "no-match": len(biddingData) - matchBidding, "total": len(biddingData), "rate": fmt.Sprintf("%.2f%%", float64(matchBidding)/float64(len(biddingData))*100), } //5.1.4 统计 标讯-结果公告 数据 matchResult := countMatches(resultData, titlesInB, projectsInB) matchesA["结果公告"] = map[string]interface{}{ "match": matchResult, "no-match": len(resultData) - matchResult, "total": len(resultData), "rate": fmt.Sprintf("%.2f%%", float64(matchResult)/float64(len(resultData))*100), } dataCompete["千里马对剑鱼覆盖率(标讯)"] = matchesA log.Info("coverageB", zap.String("剑鱼对千里马覆盖率-标讯", "处理完毕")) //5.2 拟建数据覆盖率 matches2 := countMatches(njData, titlesInB, projectsInB) matchesB := map[string]interface{}{ "match": matches2, "total": len(njData), "no-match": len(njData) - matches2, "date": lastWednesday.Format("2006-01-02"), "rate": fmt.Sprintf("%.2f%%", float64(matches2)/float64(len(njData))*100), } dataCompete["千里马对剑鱼覆盖率(拟建)"] = matchesB //5.3 产权数据统计 matches3 := countMatches(cqData, titlesInB, projectsInB) matchesC := map[string]interface{}{ "match": matches3, "total": len(cqData), "no-match": len(cqData) - matches3, "date": lastWednesday.Format("2006-01-02"), "rate": fmt.Sprintf("%.2f%%", float64(matches3)/float64(len(cqData))*100), } dataCompete["千里马对剑鱼覆盖率(产权)"] = matchesC log.Info("coverageB", zap.String("千里马对剑鱼覆盖率-产权", "处理完毕")) } // getTimeLines 获取时效性指标 func getTimeLines() { //6.数据整体流程均耗时(分钟) whereBidding := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gt": Yesterday.Unix(), "$lte": Today.Unix(), }, } sessB := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(sessB) fd := bson.M{"extracttype": 1, "sensitive": 1, "dataging": 1, "site": 1, "infoformat": 1, "comeintime": 1, "pici": 1, "publishtime": 1, "competehref": 1, "attach_text": 1} queryB := sessB.DB("qfw").C("bidding").Find(whereBidding).Select(fd).Iter() esCount := 0 //采集的数据需要生索引的数量 biddingRealCount := 0 pici_publish_totaltime := int64(0) //comeintime 和 生索引 publish 时间 差值的总和 pici_comein_totaltime := int64(0) //publishtime 和 生索引 pici 时间 差值的总和 for tmp := make(map[string]interface{}); queryB.Next(tmp); { if utils.IntAll(tmp["extracttype"]) != -1 && utils.ObjToString(tmp["sensitive"]) != "测试" && utils.IntAll(tmp["dataging"]) != 1 && utils.Float64All(tmp["infoformat"]) != 3 { comeintime := utils.Int64All(tmp["comeintime"]) publishtime := utils.Int64All(tmp["publishtime"]) pici := utils.Int64All(tmp["pici"]) if pici > 0 { esCount++ } if (comeintime-publishtime) < 12*60*60 && pici > 0 { biddingRealCount++ diff1 := pici - publishtime diff2 := pici - comeintime pici_publish_totaltime += diff1 pici_comein_totaltime += diff2 } } } dataCollection["数据采集日索引量"] = esCount //数据采集指标-数据采集日索引量 if biddingRealCount > 0 { pici_publish_avgtime := pici_publish_totaltime / int64(biddingRealCount) pici_comein_avgtime := pici_comein_totaltime / int64(biddingRealCount) dataTime["数据整体流程均耗时(分钟)"] = fmt.Sprintf("%.2f", float64(pici_publish_avgtime)/float64(60)) dataTime["数据处理均耗时(分钟)"] = fmt.Sprintf("%.2f", float64(pici_comein_avgtime)/float64(60)) dataTime["数据采集均耗时(分钟)"] = fmt.Sprintf("%.2f", float64(pici_publish_avgtime-pici_comein_avgtime)/float64(60)) } } // getCollectionData 获取收录指标数据 func getCollectionData() { //1.新收录数据源数量 newCollectionWhere := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gt": Yesterday.Unix(), }, } newCount := MgoC.Count("site", newCollectionWhere) dataSource["新收录数据源数量"] = newCount //2.已收录数据源数量 Count := MgoC.Count("site", nil) dataSource["已收录数据源数量"] = Count //3.待开发数据源数量 whereConfig := map[string]interface{}{ "state": 0, } unSiteCount := int64(0) //待开发数据源数量 unSites, _ := MgoC.Find("luaconfig", whereConfig, nil, nil, false, -1, -1) if len(*unSites) > 0 { for _, v := range *unSites { code := utils.ObjToString(v["code"]) num := MgoC.Count("lua_logs_auditor", map[string]interface{}{"code": code}) if num == 0 { unSiteCount++ } } } dataSource["待开发数据源数量"] = unSiteCount //4.各网站分类数据源数量 // 用 map 来存储一级分类和对应的二级分类数据的数量 categoryCounts := make(map[string]map[string]int) classes, _ := MgoC.Find("site", nil, nil, map[string]interface{}{"site_type": 1, "second_type": 1}, false, -1, -1) for _, v := range *classes { siteType := utils.ObjToString(v["site_type"]) secondType := utils.ObjToString(v["second_type"]) if _, ok := categoryCounts[siteType]; !ok { categoryCounts[siteType] = make(map[string]int) } categoryCounts[siteType][secondType]++ } dataSource["网站分类数据源数量"] = categoryCounts //5.应采尽采率 dataSource["应采尽采率"] = GF.Cron.CollectionRate }