|
- // main
- package main
- import (
- "encoding/json"
- "fmt"
- "github.com/robfig/cron/v3"
- "go.uber.org/zap"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- "net"
- "sync"
- "time"
- )
- var (
- Mgo *mongodb.MongodbSim
- tree = NewMultiTree() //全局的数据匹配
- Mb float64 = 1024 * 1024
- UdpClient udp.UdpClient
- )
- func main() {
- local, _ := time.LoadLocation("Asia/Shanghai")
- c := cron.New(cron.WithLocation(local), cron.WithSeconds())
- if GF.Cron.Spec != "" {
- eid, err := c.AddFunc(GF.Cron.Spec, loadIncBidding) //定时执行芜湖数据
- if err != nil {
- log.Error("main", zap.Any("AddFunc err", err))
- }
- log.Info("main", zap.Any("eid", eid))
- }
- if GF.Cron.Specq != "" {
- _, err := c.AddFunc(GF.Cron.Specq, loadIncQyxy)
- if err != nil {
- log.Error("main", zap.Any("AddFunc loadIncQyxy err", err))
- }
- }
- c.Start()
- defer c.Stop()
- select {}
- }
- //processUdpMsg 处理udp
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case udp.OP_TYPE_DATA:
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- log.Info("processUdpMsg", zap.Any("mapInfo", mapInfo))
- if err != nil {
- log.Error("processUdpMsg", zap.Error(err))
- } else {
- if mapInfo != nil {
- key, _ := mapInfo["key"].(string)
- if key == "" {
- key = "udpok"
- }
- go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
- }
- tasktype, _ := mapInfo["stype"].(string)
- switch tasktype {
- case "inc_data": //每天增量bidding数据
- loadIncBidding()
- case "inc_qyxy": //每周企业数据
- loadIncQyxy()
- case "alldata": // 芜湖存量bidding数据
- biddingAllData()
- }
- }
- default:
- fmt.Println("processUdpMsg : processUdpMsg =====")
- }
- }
- //loadIncData 加载芜湖每周增量企业数据
- func loadIncQyxy() {
- log.Info("loadIncQyxy", zap.String("loadIncQyxy", "loadIncQyxy"))
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- // 获取当前时间
- now := time.Now()
- // 获取当前时间的星期几(0表示周一,1表示周二,以此类推)
- weekday := now.Weekday()
- // 计算从周日到现在的天数(因为一周从周日开始,所以需要减一天)
- daysSinceSunday := int(weekday) - 1
- // 计算周一的时间
- monday := now.AddDate(0, 0, -daysSinceSunday)
- // 将时间格式化为凌晨(00:00:00)
- mondayAtMidnight := time.Date(monday.Year(), monday.Month(), monday.Day(), 0, 0, 0, 0, monday.Location())
- q := map[string]interface{}{
- "updatetime": map[string]interface{}{
- "$gt": mondayAtMidnight.Unix(),
- //"$lte": todayTime.Unix(),
- },
- "company_type": map[string]interface{}{
- "$ne": "个体工商户",
- },
- "company_city": "芜湖市",
- }
- log.Info("loadIncQyxy", zap.Any("q", q))
- query := sess.DB("mixdata").C("qyxy_std").Find(q).Select(nil).Iter()
- count := 0
- ch := make(chan bool, 15)
- wg := &sync.WaitGroup{}
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%1000 == 0 {
- log.Info("loadIncQyxy", zap.Int("current", count))
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- saveQyxy(tmp)
- }(tmp)
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- log.Info("loadIncQyxy", zap.Int("over ", count))
- }
- //saveQyxy 处理增量企业信息
- func saveQyxy(tmp map[string]interface{}) {
- if util.ObjToString(tmp["company_status"]) != "存续" {
- return
- }
- tree.Add(util.ObjToString(tmp["company_name"]))
- update := make(map[string]interface{})
- update["$set"] = tmp
- updataInfo := []map[string]interface{}{
- {"_id": tmp["_id"]},
- update,
- }
- Mgo.UpSertBulk(GF.Env.QyxyColl, updataInfo)
- }
- //loadIncBidding 每天标讯数据
- func loadIncBidding() {
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- log.Info("loadIncBidding", zap.String("loadIncBidding", "开始处理增量标讯数据"))
- // 获取当前时间
- now := time.Now()
- targetTime := time.Date(now.Year(), now.Month(), now.Day()+GF.Cron.Start, 0, 0, 0, 0, now.Location())
- todayTime := time.Date(now.Year(), now.Month(), now.Day()+GF.Cron.End, 0, 0, 0, 0, now.Location())
- q := map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$gt": targetTime.Unix(),
- "$lte": todayTime.Unix(),
- },
- }
- log.Info("loadIncBidding", zap.Any("q", q))
- query := sess.DB("qfw").C("bidding").Find(q).Select(nil).Iter()
- count := 0
- ch := make(chan bool, 15)
- wg := &sync.WaitGroup{}
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%1000 == 0 {
- log.Info("loadIncBidding", zap.Int("current", count))
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- saveBidding(tmp)
- }(tmp)
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- log.Info("loadIncBidding", zap.Int("over ", count))
- }
- //saveBidding 保存芜湖bidding数据
- func saveBidding(tmp map[string]interface{}) {
- if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
- tmp = make(map[string]interface{})
- return
- }
- // 针对存量数据,重复数据不进索引
- if util.IntAll(tmp["extracttype"]) == -1 {
- return
- }
- if util.IntAll(tmp["dataprocess"]) != 8 {
- return
- }
- //1.采购单位
- buyer := util.ObjToString(tmp["buyer"])
- rests := tree.Match(buyer, true)
- if len(rests) > 0 {
- Mgo.SaveByOriID(GF.Env.BiddingColl, tmp)
- return
- }
- //2.中标单位
- winner := util.ObjToString(tmp["winner"])
- rests = tree.Match(winner, true)
- if len(rests) > 0 {
- Mgo.SaveByOriID(GF.Env.BiddingColl, tmp)
- return
- }
- //3.中标候选人
- winnerorder, ok := tmp["winnerorder"].([]map[string]interface{})
- if ok {
- for _, v := range winnerorder {
- res := tree.Match(util.ObjToString(v["entname"]), true)
- if len(res) > 0 {
- Mgo.SaveByOriID(GF.Env.BiddingColl, tmp)
- return
- }
- }
- }
- }
|