package main import ( "encoding/json" "fmt" "github.com/robfig/cron/v3" "github.com/spf13/viper" wlog "github.com/wcc4869/common_utils/log" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp" "net" "strings" "time" ) var ( GF Conf Mgo *mongodb.MongodbSim //NN = 100 lastID string UdpClient udp.UdpClient classficationAddr *net.UDPAddr ) func init() { InitConfig() InitLog() Mgo = &mongodb.MongodbSim{ MongodbAddr: GF.Mongo.Host, DbName: GF.Mongo.DB, UserName: GF.Mongo.Username, Password: GF.Mongo.Password, Size: 10, } Mgo.InitPool() UdpClient = udp.UdpClient{Local: GF.Classfication.Localport, BufSize: 1024} UdpClient.Listen(processUdpMsg) wlog.Info("init", wlog.Any("本地监听", GF.Classfication.Localport)) classficationAddr = &net.UDPAddr{ Port: util.IntAll(GF.Classfication.Port), IP: net.ParseIP(GF.Classfication.IP), } wlog.Info("init", wlog.Any("classficationAddr", classficationAddr)) } // processUdpMsg 处理udp func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case udp.OP_TYPE_DATA: var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) wlog.Info("processUdpMsg", wlog.Any("mapInfo", mapInfo)) if err != nil { fmt.Println(err) } if mapInfo != nil { key, _ := mapInfo["key"].(string) if key == "" { key = "udpok" } go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra) } default: fmt.Println("cpuser_listen : processUdpMsg =====") } } // InitConfig init config func InitConfig() { // 优先级 // 1. -d 添加目录 // 2. SERVER_TOML 环境变量 // 3. ./ 添加目录 // 4. ./conf/ 添加目录 viper.SetConfigFile("config.toml") // 指定配置文件路径 viper.SetConfigName("config") // 配置文件名称(无扩展名) viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项 viper.AddConfigPath("./") viper.AddConfigPath("./conf/") // 还可以在工作目录中查找配置 viper.AddConfigPath("../conf/") // 还可以在工作目录中查找配置 err := viper.ReadInConfig() // 查找并读取配置文件 if err != nil { // 处理读取配置文件的错误 fmt.Println("ReadInConfig err =>", err) } err = viper.Unmarshal(&GF) if err != nil { panic(err) } } // InitLog 初始化日志 func InitLog() { logConfig := &wlog.Options{} err := viper.UnmarshalKey("log", logConfig) if err != nil { wlog.Panic("UnmarshallLogConfig", wlog.FieldErr(err)) panic(err.Error()) } wlog.InitLogger(*logConfig) } func main() { local, _ := time.LoadLocation("Asia/Shanghai") c := cron.New(cron.WithLocation(local), cron.WithSeconds()) c.AddFunc(GF.Spec, cpUser) c.Start() defer c.Stop() //cpUser() select {} } // cpUser 处理用户订阅词,然后拷贝到新表,提供分类使用 func cpUser() { wlog.Info("cpUser", wlog.String("start", "")) var nuId string nu := getLastUser(GF.Mongo.Coll) nuId = BsonIdToSId(nu["_id"]) //配置不为空,使用配置文件的最新注册时间 if lastID == "" { if GF.Mongo.LastID != "" { lastID = GF.Mongo.LastID } else { lastID = "000000000000000000000000" } } q := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": StringTOBsonId(lastID), "$lte": StringTOBsonId(nuId), }, } wlog.Info("cpUser", wlog.Any("q", q)) sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) // query := sess.DB(GF.Mongo.DB).C(GF.Mongo.Coll).Find(q).Select(nil).Sort("_id").Iter() count := 0 var saveUserPool = make([]map[string]interface{}, 0) var sendMsg bool for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%1000 == 0 { wlog.Info("cpuser", wlog.Int("current", count)) wlog.Info("cpuser", wlog.Any("_id", tmp["_id"])) } keys := GetUserKeys(tmp) update := map[string]interface{}{} if len(keys) > 0 { tags := []string{} for _, v := range keys { tag := toString(v) tags = append(tags, tag) } //用户关键词 update["key_list"] = strings.Join(tags, ",") update["i_appid"] = tmp["i_appid"] update["_id"] = tmp["_id"] saveUserPool = append(saveUserPool, update) //存储到新表 if GF.Mongo.SaveColl != "" { if len(saveUserPool) >= 400 { Mgo.SaveBulk(GF.Mongo.SaveColl, saveUserPool...) saveUserPool = []map[string]interface{}{} } } sendMsg = true } } if len(saveUserPool) > 0 { Mgo.SaveBulk(GF.Mongo.SaveColl, saveUserPool...) saveUserPool = []map[string]interface{}{} } //调用udp 处理用户行业分类 udpData := map[string]interface{}{ "gtid": lastID, "lteid": nuId, "stype": "yonghuhangye", } if sendMsg { wlog.Info("cpUser", wlog.Any("udpData", udpData)) SendUdpMsg(udpData, classficationAddr) } lastID = nuId wlog.Info("cpuser", wlog.String("lastID", lastID)) wlog.Info("处理用户订阅词", wlog.Int("over", count)) } // reAllUser 重新更新用户订阅词,打标签 func reAllUser() { sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) // query := sess.DB(GF.Mongo.DB).C(GF.Mongo.Coll).Find(nil).Select(nil).Sort("_id").Iter() count := 0 gtid := "" lteid := "" //var saveUserPool = make([]map[string]interface{}, 0) var sendMsg bool for tmp := make(map[string]interface{}); query.Next(tmp); count++ { idStr := mongodb.BsonIdToSId(tmp["_id"]) if count == 0 { gtid = idStr } else { lteid = idStr } if count%1000 == 0 { wlog.Info("reAllUser", wlog.Int("current", count)) wlog.Info("reAllUser", wlog.Any("_id", tmp["_id"])) } keys := GetUserKeys(tmp) update := map[string]interface{}{} if len(keys) > 0 { tags := []string{} for _, v := range keys { tag := toString(v) tags = append(tags, tag) } //用户关键词 update["key_list"] = strings.Join(tags, ",") update["i_appid"] = tmp["i_appid"] update["_id"] = tmp["_id"] err := Mgo.InsertOrUpdate(GF.Mongo.DB, GF.Mongo.SaveColl, update) if err != nil { wlog.Info("reAllUser", wlog.String("id 更新错误", idStr)) wlog.Error(idStr, err) } sendMsg = true } } wlog.Info("reAllUser", wlog.String("gtid", gtid), wlog.String("lteid", lteid)) //调用udp 处理用户行业分类 udpData := map[string]interface{}{ "gtid": gtid, "lteid": lteid, "stype": "yonghuhangye", } if sendMsg { wlog.Info("reAllUser", wlog.Any("udpData", udpData)) SendUdpMsg(udpData, classficationAddr) } wlog.Info("处理用户订阅词", wlog.Int("over", count)) }