main.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. package main
  2. import (
  3. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/robfig/cron/v3"
  7. "github.com/spf13/viper"
  8. wlog "github.com/wcc4869/common_utils/log"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  11. "net"
  12. "strings"
  13. "time"
  14. )
  15. var (
  16. GF Conf
  17. Mgo *mongodb.MongodbSim
  18. //NN = 100
  19. lastID string
  20. UdpClient udp.UdpClient
  21. classficationAddr *net.UDPAddr
  22. )
  23. func init() {
  24. InitConfig()
  25. InitLog()
  26. Mgo = &mongodb.MongodbSim{
  27. MongodbAddr: GF.Mongo.Host,
  28. DbName: GF.Mongo.DB,
  29. UserName: GF.Mongo.Username,
  30. Password: GF.Mongo.Password,
  31. Size: 10,
  32. }
  33. Mgo.InitPool()
  34. UdpClient = udp.UdpClient{Local: GF.Classfication.Localport, BufSize: 1024}
  35. UdpClient.Listen(processUdpMsg)
  36. wlog.Info("init", wlog.Any("本地监听", GF.Classfication.Localport))
  37. classficationAddr = &net.UDPAddr{
  38. Port: util.IntAll(GF.Classfication.Port),
  39. IP: net.ParseIP(GF.Classfication.IP),
  40. }
  41. wlog.Info("init", wlog.Any("classficationAddr", classficationAddr))
  42. }
  43. //processUdpMsg 处理udp
  44. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  45. switch act {
  46. case udp.OP_TYPE_DATA:
  47. var mapInfo map[string]interface{}
  48. err := json.Unmarshal(data, &mapInfo)
  49. wlog.Info("processUdpMsg", wlog.Any("mapInfo", mapInfo))
  50. if err != nil {
  51. fmt.Println(err)
  52. }
  53. if mapInfo != nil {
  54. key, _ := mapInfo["key"].(string)
  55. if key == "" {
  56. key = "udpok"
  57. }
  58. go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  59. }
  60. default:
  61. fmt.Println("cpuser_listen : processUdpMsg =====")
  62. }
  63. }
  64. // InitConfig init config
  65. func InitConfig() {
  66. // 优先级
  67. // 1. -d 添加目录
  68. // 2. SERVER_TOML 环境变量
  69. // 3. ./ 添加目录
  70. // 4. ./conf/ 添加目录
  71. viper.SetConfigFile("config.toml") // 指定配置文件路径
  72. viper.SetConfigName("config") // 配置文件名称(无扩展名)
  73. viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项
  74. viper.AddConfigPath("./")
  75. viper.AddConfigPath("./conf/") // 还可以在工作目录中查找配置
  76. viper.AddConfigPath("../conf/") // 还可以在工作目录中查找配置
  77. err := viper.ReadInConfig() // 查找并读取配置文件
  78. if err != nil { // 处理读取配置文件的错误
  79. fmt.Println("ReadInConfig err =>", err)
  80. }
  81. err = viper.Unmarshal(&GF)
  82. if err != nil {
  83. panic(err)
  84. }
  85. }
  86. //InitLog 初始化日志
  87. func InitLog() {
  88. logConfig := &wlog.Options{}
  89. err := viper.UnmarshalKey("log", logConfig)
  90. if err != nil {
  91. wlog.Panic("UnmarshallLogConfig", wlog.FieldErr(err))
  92. panic(err.Error())
  93. }
  94. wlog.InitLogger(*logConfig)
  95. }
  96. func main() {
  97. local, _ := time.LoadLocation("Asia/Shanghai")
  98. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  99. c.AddFunc(GF.Spec, cpUser)
  100. c.Start()
  101. defer c.Stop()
  102. //cpUser()
  103. select {}
  104. }
  105. //cpUser 处理用户订阅词,然后拷贝到新表,提供分类使用
  106. func cpUser() {
  107. wlog.Info("cpUser", wlog.String("start", ""))
  108. var nuId string
  109. nu := getLastUser(GF.Mongo.Coll)
  110. nuId = BsonIdToSId(nu["_id"])
  111. //配置不为空,使用配置文件的最新注册时间
  112. if lastID == "" {
  113. if GF.Mongo.LastID != "" {
  114. lastID = GF.Mongo.LastID
  115. } else {
  116. lastID = "000000000000000000000000"
  117. }
  118. }
  119. q := map[string]interface{}{
  120. "_id": map[string]interface{}{
  121. "$gt": StringTOBsonId(lastID),
  122. "$lte": StringTOBsonId(nuId),
  123. },
  124. }
  125. wlog.Info("cpUser", wlog.Any("q", q))
  126. sess := Mgo.GetMgoConn()
  127. defer Mgo.DestoryMongoConn(sess)
  128. //
  129. query := sess.DB(GF.Mongo.DB).C(GF.Mongo.Coll).Find(q).Select(nil).Sort("_id").Iter()
  130. count := 0
  131. var saveUserPool = make([]map[string]interface{}, 0)
  132. var sendMsg bool
  133. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  134. if count%1000 == 0 {
  135. wlog.Info("cpuser", wlog.Int("current", count))
  136. wlog.Info("cpuser", wlog.Any("_id", tmp["_id"]))
  137. }
  138. keys := GetUserKeys(tmp)
  139. update := map[string]interface{}{}
  140. if len(keys) > 0 {
  141. tags := []string{}
  142. for _, v := range keys {
  143. tag := toString(v)
  144. tags = append(tags, tag)
  145. }
  146. //用户关键词
  147. update["key_list"] = strings.Join(tags, ",")
  148. update["i_appid"] = tmp["i_appid"]
  149. update["_id"] = tmp["_id"]
  150. saveUserPool = append(saveUserPool, update)
  151. //存储到新表
  152. if GF.Mongo.SaveColl != "" {
  153. if len(saveUserPool) >= 400 {
  154. Mgo.SaveBulk(GF.Mongo.SaveColl, saveUserPool...)
  155. saveUserPool = []map[string]interface{}{}
  156. }
  157. }
  158. sendMsg = true
  159. }
  160. }
  161. if len(saveUserPool) > 0 {
  162. Mgo.SaveBulk(GF.Mongo.SaveColl, saveUserPool...)
  163. saveUserPool = []map[string]interface{}{}
  164. }
  165. //调用udp 处理用户行业分类
  166. udpData := map[string]interface{}{
  167. "gtid": lastID,
  168. "lteid": nuId,
  169. "stype": "yonghuhangye",
  170. }
  171. if sendMsg {
  172. wlog.Info("cpUser", wlog.Any("udpData", udpData))
  173. SendUdpMsg(udpData, classficationAddr)
  174. }
  175. lastID = nuId
  176. wlog.Info("cpuser", wlog.String("lastID", lastID))
  177. wlog.Info("处理用户订阅词", wlog.Int("over", count))
  178. }