123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211 |
- package main
- import (
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- "encoding/json"
- "fmt"
- "github.com/robfig/cron/v3"
- "github.com/spf13/viper"
- wlog "github.com/wcc4869/common_utils/log"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "net"
- "strings"
- "time"
- )
- var ( // Package-level state shared by init, the UDP handler and the cron job.
- GF Conf // Application configuration; InitConfig decodes the viper settings into it.
- Mgo *mongodb.MongodbSim // MongoDB helper; init constructs it and calls InitPool on it.
- //NN = 100
- lastID string // Upper-bound _id handled by the previous cpUser run; cpUser updates it at the end of each run.
- UdpClient udp.UdpClient // Local UDP endpoint; init starts it listening with processUdpMsg as the handler.
- classficationAddr *net.UDPAddr // Built in init from GF.Classfication; cpUser passes it to SendUdpMsg.
- )
- func init() {
- InitConfig()
- InitLog()
- Mgo = &mongodb.MongodbSim{
- MongodbAddr: GF.Mongo.Host,
- DbName: GF.Mongo.DB,
- UserName: GF.Mongo.Username,
- Password: GF.Mongo.Password,
- Size: 10,
- }
- Mgo.InitPool()
- UdpClient = udp.UdpClient{Local: GF.Classfication.Localport, BufSize: 1024}
- UdpClient.Listen(processUdpMsg)
- wlog.Info("init", wlog.Any("本地监听", GF.Classfication.Localport))
- classficationAddr = &net.UDPAddr{
- Port: util.IntAll(GF.Classfication.Port),
- IP: net.ParseIP(GF.Classfication.IP),
- }
- wlog.Info("init", wlog.Any("classficationAddr", classficationAddr))
- }
- //processUdpMsg 处理udp
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case udp.OP_TYPE_DATA:
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- wlog.Info("processUdpMsg", wlog.Any("mapInfo", mapInfo))
- if err != nil {
- fmt.Println(err)
- }
- if mapInfo != nil {
- key, _ := mapInfo["key"].(string)
- if key == "" {
- key = "udpok"
- }
- go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
- }
- default:
- fmt.Println("cpuser_listen : processUdpMsg =====")
- }
- }
- // InitConfig init config
- func InitConfig() {
- // 优先级
- // 1. -d 添加目录
- // 2. SERVER_TOML 环境变量
- // 3. ./ 添加目录
- // 4. ./conf/ 添加目录
- viper.SetConfigFile("config.toml") // 指定配置文件路径
- viper.SetConfigName("config") // 配置文件名称(无扩展名)
- viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项
- viper.AddConfigPath("./")
- viper.AddConfigPath("./conf/") // 还可以在工作目录中查找配置
- viper.AddConfigPath("../conf/") // 还可以在工作目录中查找配置
- err := viper.ReadInConfig() // 查找并读取配置文件
- if err != nil { // 处理读取配置文件的错误
- fmt.Println("ReadInConfig err =>", err)
- }
- err = viper.Unmarshal(&GF)
- if err != nil {
- panic(err)
- }
- }
- //InitLog 初始化日志
- func InitLog() {
- logConfig := &wlog.Options{}
- err := viper.UnmarshalKey("log", logConfig)
- if err != nil {
- wlog.Panic("UnmarshallLogConfig", wlog.FieldErr(err))
- panic(err.Error())
- }
- wlog.InitLogger(*logConfig)
- }
- func main() {
- local, _ := time.LoadLocation("Asia/Shanghai")
- c := cron.New(cron.WithLocation(local), cron.WithSeconds())
- c.AddFunc(GF.Spec, cpUser)
- c.Start()
- defer c.Stop()
- //cpUser()
- select {}
- }
- //cpUser 处理用户订阅词,然后拷贝到新表,提供分类使用
- func cpUser() {
- wlog.Info("cpUser", wlog.String("start", ""))
- var nuId string
- nu := getLastUser(GF.Mongo.Coll)
- nuId = BsonIdToSId(nu["_id"])
- //配置不为空,使用配置文件的最新注册时间
- if lastID == "" {
- if GF.Mongo.LastID != "" {
- lastID = GF.Mongo.LastID
- } else {
- lastID = "000000000000000000000000"
- }
- }
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": StringTOBsonId(lastID),
- "$lte": StringTOBsonId(nuId),
- },
- }
- wlog.Info("cpUser", wlog.Any("q", q))
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- //
- query := sess.DB(GF.Mongo.DB).C(GF.Mongo.Coll).Find(q).Select(nil).Sort("_id").Iter()
- count := 0
- var saveUserPool = make([]map[string]interface{}, 0)
- var sendMsg bool
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%1000 == 0 {
- wlog.Info("cpuser", wlog.Int("current", count))
- wlog.Info("cpuser", wlog.Any("_id", tmp["_id"]))
- }
- keys := GetUserKeys(tmp)
- update := map[string]interface{}{}
- if len(keys) > 0 {
- tags := []string{}
- for _, v := range keys {
- tag := toString(v)
- tags = append(tags, tag)
- }
- //用户关键词
- update["key_list"] = strings.Join(tags, ",")
- update["i_appid"] = tmp["i_appid"]
- update["_id"] = tmp["_id"]
- saveUserPool = append(saveUserPool, update)
- //存储到新表
- if GF.Mongo.SaveColl != "" {
- if len(saveUserPool) >= 400 {
- Mgo.SaveBulk(GF.Mongo.SaveColl, saveUserPool...)
- saveUserPool = []map[string]interface{}{}
- }
- }
- sendMsg = true
- }
- }
- if len(saveUserPool) > 0 {
- Mgo.SaveBulk(GF.Mongo.SaveColl, saveUserPool...)
- saveUserPool = []map[string]interface{}{}
- }
- //调用udp 处理用户行业分类
- udpData := map[string]interface{}{
- "gtid": lastID,
- "lteid": nuId,
- "stype": "yonghuhangye",
- }
- if sendMsg {
- wlog.Info("cpUser", wlog.Any("udpData", udpData))
- SendUdpMsg(udpData, classficationAddr)
- }
- lastID = nuId
- wlog.Info("cpuser", wlog.String("lastID", lastID))
- wlog.Info("处理用户订阅词", wlog.Int("over", count))
- }
|