// main package main import ( "encoding/json" "fmt" "github.com/robfig/cron/v3" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp" "net" "sync" "time" ) var ( Mgo *mongodb.MongodbSim tree = NewMultiTree() //全局的数据匹配 Mb float64 = 1024 * 1024 UdpClient udp.UdpClient ) func main() { local, _ := time.LoadLocation("Asia/Shanghai") c := cron.New(cron.WithLocation(local), cron.WithSeconds()) if GF.Cron.Spec != "" { eid, err := c.AddFunc(GF.Cron.Spec, loadIncBidding) //定时执行芜湖数据 if err != nil { log.Error("main", zap.Any("AddFunc err", err)) } log.Info("main", zap.Any("eid", eid)) } if GF.Cron.Specq != "" { _, err := c.AddFunc(GF.Cron.Specq, loadIncQyxy) if err != nil { log.Error("main", zap.Any("AddFunc loadIncQyxy err", err)) } } c.Start() defer c.Stop() select {} } //processUdpMsg 处理udp func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case udp.OP_TYPE_DATA: var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) log.Info("processUdpMsg", zap.Any("mapInfo", mapInfo)) if err != nil { log.Error("processUdpMsg", zap.Error(err)) } else { if mapInfo != nil { key, _ := mapInfo["key"].(string) if key == "" { key = "udpok" } go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra) } tasktype, _ := mapInfo["stype"].(string) switch tasktype { case "inc_data": //每天增量bidding数据 loadIncBidding() case "inc_qyxy": //每周企业数据 loadIncQyxy() case "alldata": // 芜湖存量bidding数据 biddingAllData() } } default: fmt.Println("processUdpMsg : processUdpMsg =====") } } //loadIncData 加载芜湖每周增量企业数据 func loadIncQyxy() { log.Info("loadIncQyxy", zap.String("loadIncQyxy", "loadIncQyxy")) sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) // 获取当前时间 now := time.Now() // 获取当前时间的星期几(0表示周一,1表示周二,以此类推) weekday := now.Weekday() // 计算从周日到现在的天数(因为一周从周日开始,所以需要减一天) daysSinceSunday := int(weekday) - 1 // 计算周一的时间 monday := now.AddDate(0, 0, -daysSinceSunday) // 将时间格式化为凌晨(00:00:00) mondayAtMidnight := time.Date(monday.Year(), monday.Month(), monday.Day(), 0, 0, 0, 0, monday.Location()) q := map[string]interface{}{ "updatetime": map[string]interface{}{ "$gt": mondayAtMidnight.Unix(), //"$lte": todayTime.Unix(), }, "company_type": map[string]interface{}{ "$ne": "个体工商户", }, "company_city": "芜湖市", } log.Info("loadIncQyxy", zap.Any("q", q)) query := sess.DB("mixdata").C("qyxy_std").Find(q).Select(nil).Iter() count := 0 ch := make(chan bool, 15) wg := &sync.WaitGroup{} for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%1000 == 0 { log.Info("loadIncQyxy", zap.Int("current", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() saveQyxy(tmp) }(tmp) tmp = map[string]interface{}{} } wg.Wait() log.Info("loadIncQyxy", zap.Int("over ", count)) } //saveQyxy 处理增量企业信息 func saveQyxy(tmp map[string]interface{}) { if util.ObjToString(tmp["company_status"]) != "存续" { return } tree.Add(util.ObjToString(tmp["company_name"])) update := make(map[string]interface{}) update["$set"] = tmp updataInfo := []map[string]interface{}{ {"_id": tmp["_id"]}, update, } Mgo.UpSertBulk(GF.Env.QyxyColl, updataInfo) } //loadIncBidding 每天标讯数据 func loadIncBidding() { sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) log.Info("loadIncBidding", zap.String("loadIncBidding", "开始处理增量标讯数据")) // 获取当前时间 now := time.Now() targetTime := time.Date(now.Year(), now.Month(), now.Day()+GF.Cron.Start, 0, 0, 0, 0, now.Location()) todayTime := time.Date(now.Year(), now.Month(), now.Day()+GF.Cron.End, 0, 0, 0, 0, now.Location()) q := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gt": targetTime.Unix(), "$lte": todayTime.Unix(), }, } log.Info("loadIncBidding", zap.Any("q", q)) query := sess.DB("qfw").C("bidding").Find(q).Select(nil).Iter() count := 0 ch := make(chan bool, 15) wg := &sync.WaitGroup{} for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%1000 == 0 { log.Info("loadIncBidding", zap.Int("current", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() saveBidding(tmp) }(tmp) tmp = map[string]interface{}{} } wg.Wait() log.Info("loadIncBidding", zap.Int("over ", count)) } //saveBidding 保存芜湖bidding数据 func saveBidding(tmp map[string]interface{}) { if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引 tmp = make(map[string]interface{}) return } // 针对存量数据,重复数据不进索引 if util.IntAll(tmp["extracttype"]) == -1 { return } if util.IntAll(tmp["dataprocess"]) != 8 { return } //1.采购单位 buyer := util.ObjToString(tmp["buyer"]) rests := tree.Match(buyer, true) if len(rests) > 0 { Mgo.SaveByOriID(GF.Env.BiddingColl, tmp) return } //2.中标单位 winner := util.ObjToString(tmp["winner"]) rests = tree.Match(winner, true) if len(rests) > 0 { Mgo.SaveByOriID(GF.Env.BiddingColl, tmp) return } //3.中标候选人 winnerorder, ok := tmp["winnerorder"].([]map[string]interface{}) if ok { for _, v := range winnerorder { res := tree.Match(util.ObjToString(v["entname"]), true) if len(res) > 0 { Mgo.SaveByOriID(GF.Env.BiddingColl, tmp) return } } } }