// main.go
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gogf/gf/v2/frame/g"
  6. "github.com/gogf/gf/v2/net/gclient"
  7. "github.com/gogf/gf/v2/os/gtime"
  8. "github.com/robfig/cron/v3"
  9. "go.mongodb.org/mongo-driver/bson"
  10. "go.mongodb.org/mongo-driver/bson/primitive"
  11. "go.mongodb.org/mongo-driver/mongo/options"
  12. "go.uber.org/zap"
  13. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  14. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  15. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  16. "strings"
  17. "sync"
  18. "time"
  19. )
  20. var (
  21. MgoB *mongodb.MongodbSim
  22. MgoP *mongodb.MongodbSim
  23. columns = make([]map[string]interface{}, 0) //存储配置 栏目
  24. // 更新mongo
  25. updatePool = make(chan []map[string]interface{}, 5000)
  26. )
  27. func main() {
  28. go updateMethod()
  29. local, _ := time.LoadLocation("Asia/Shanghai")
  30. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  31. eid, err := c.AddFunc(GF.Cron.Spec, dealData)
  32. if err != nil {
  33. log.Info("main", zap.Any("AddFunc err", err))
  34. }
  35. log.Info("main", zap.Any("eid", eid))
  36. c.Start()
  37. defer c.Stop()
  38. //
  39. select {}
  40. }
  41. func dealData() {
  42. go dealBidding()
  43. go dealProject()
  44. }
  45. // dealBidding 处理标讯数据
  46. func dealBidding() {
  47. sess := MgoB.GetMgoConn()
  48. defer MgoB.DestoryMongoConn(sess)
  49. var q interface{}
  50. var startTime = GF.Cron.Start
  51. now := time.Now()
  52. ctx, _ := context.WithTimeout(context.Background(), 99999*time.Hour)
  53. coll := sess.M.C.Database(GF.MongoB.DB).Collection(GF.MongoB.Coll)
  54. find := options.Find().SetBatchSize(1000).SetSort(bson.D{bson.E{"comeintime", 1}}).SetProjection(bson.M{"comeintime": 1, "toptype": 1, "subtype": 1, "buyerclass": 1, "title": 1, "detail": 1, "package": 1, "funds": 1})
  55. if startTime != 0 && GF.Cron.End != 0 {
  56. q = map[string]interface{}{
  57. "comeintime": map[string]interface{}{
  58. "$gt": GF.Cron.Start,
  59. "$lte": GF.Cron.End,
  60. },
  61. }
  62. } else if startTime != 0 {
  63. q = map[string]interface{}{
  64. "comeintime": map[string]interface{}{
  65. "$gt": startTime,
  66. },
  67. }
  68. } else if startTime == 0 && GF.Cron.End == 0 {
  69. //默认 取大于 昨天的数据
  70. q = map[string]interface{}{
  71. "comeintime": map[string]interface{}{
  72. "$gt": time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location()).Unix(),
  73. },
  74. }
  75. }
  76. cur, err := coll.Find(ctx, q, find)
  77. if err != nil {
  78. log.Error("dealBidding,coll.Find ", zap.Error(err))
  79. }
  80. log.Info("dealBidding", zap.Any("q", q))
  81. //query := sess.DB(GF.MongoB.DB).C(GF.MongoB.Coll).Find(q).Select(map[string]interface{}{
  82. // "contenthtml": 0}).Iter()
  83. count := 0
  84. ch := make(chan bool, 15)
  85. wg := &sync.WaitGroup{}
  86. //for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  87. for tmp := make(map[string]interface{}); cur.Next(ctx); count++ {
  88. if cur != nil {
  89. cur.Decode(&tmp)
  90. }
  91. startTime = util.IntAll(tmp["comeintime"])
  92. if count%1000 == 0 {
  93. log.Info("dealBidding", zap.Int("current", count), zap.Any("comeintime", tmp["comeintime"]))
  94. }
  95. ch <- true
  96. wg.Add(1)
  97. go func(tmp map[string]interface{}) {
  98. defer func() {
  99. <-ch
  100. wg.Done()
  101. }()
  102. rea := TagBidding(tmp)
  103. if len(rea) > 0 {
  104. reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果
  105. //update := map[string]interface{}{
  106. // "nav_column": reb,
  107. //}
  108. //where := map[string]interface{}{
  109. // "_id": tmp["_id"],
  110. //}
  111. updatePool <- []map[string]interface{}{
  112. {"_id": tmp["_id"]},
  113. {"$set": bson.M{
  114. "nav_column": reb,
  115. }},
  116. }
  117. //MgoB.Update(GF.MongoB.Coll, where, map[string]interface{}{"$set": update}, true, false)
  118. }
  119. }(tmp)
  120. tmp = map[string]interface{}{}
  121. }
  122. wg.Wait()
  123. log.Info("dealBidding", zap.Int("over ", count))
  124. //没有数据时,发送邮件
  125. if count == 0 {
  126. SendMail("每日数据监控", "查询数据为空,请处理")
  127. }
  128. //处理热门标讯数据/热门项目
  129. getHot()
  130. log.Info("dealBidding", zap.Int("over ", count))
  131. }
  132. // getHot 获取热门数据
  133. func getHot() {
  134. var hotMap = make(map[string]bool)
  135. //获取已有热门数据
  136. hots, _ := MgoB.Find("bidding_hots", nil, nil, map[string]interface{}{"_id": 1}, false, -1, -1)
  137. if len(*hots) > 0 {
  138. for _, v := range *hots {
  139. hotMap[mongodb.BsonIdToSId(v["_id"])] = true
  140. }
  141. }
  142. file := gtime.Now().AddDate(0, 0, -1).Format("Y-m-d")
  143. res := gclient.New().GetContent(context.Background(), "http://172.17.145.164:18880/jyartvisit/"+file+".res")
  144. //res := gclient.New().GetContent(context.Background(), "http://127.0.0.1:50080/jyartvisit/"+file+".res")
  145. arrs := strings.Split(res, "\n")
  146. for _, v := range arrs {
  147. vs := strings.Split(v, " ")
  148. if len(vs) == 2 {
  149. insert := map[string]interface{}{
  150. "_id": mongodb.StringTOBsonId(vs[0]),
  151. "num": g.NewVar(vs[1]).Int(),
  152. "createtime": time.Now().Unix(),
  153. }
  154. err := MgoB.InsertOrUpdate(GF.MongoB.DB, "bidding_hot_data", insert)
  155. if err != nil {
  156. log.Error("getHot", zap.Error(err))
  157. }
  158. if !hotMap[vs[0]] {
  159. // 将字符串转换为 ObjectId
  160. objectID, err := primitive.ObjectIDFromHex(vs[0])
  161. if err != nil {
  162. fmt.Println("Error parsing ObjectId:", err)
  163. continue
  164. }
  165. // 从 ObjectId 中提取时间戳
  166. timestamp := objectID.Timestamp()
  167. //// 计算时间差
  168. //timeDifference := time.Now().Sub(timestamp)
  169. // 判断时间是否在最近一年内
  170. oneYearAgo := time.Now().AddDate(-1, 0, 0)
  171. isWithinOneYear := timestamp.After(oneYearAgo)
  172. //最近一年数据并且 数量大于100
  173. if isWithinOneYear && g.NewVar(vs[1]).Int() > GF.Cron.HotNum {
  174. err := MgoB.InsertOrUpdate(GF.MongoB.DB, "bidding_hots", insert)
  175. if err != nil {
  176. log.Error("getHot", zap.Error(err))
  177. }
  178. bidding, _ := MgoB.FindById("bidding", vs[0], nil)
  179. biddingData := *bidding
  180. if biddingData == nil {
  181. continue
  182. }
  183. biddingData["hot_data"] = 1
  184. rea := TagBidding(biddingData)
  185. reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果
  186. log.Info("getHot", zap.Any("reb", reb), zap.String("id", vs[0]))
  187. updatePool <- []map[string]interface{}{
  188. {"_id": insert["_id"]},
  189. {"$set": bson.M{
  190. "nav_column": reb,
  191. }},
  192. }
  193. }
  194. }
  195. }
  196. }
  197. }
  198. // dealProject 处理拟建项目数据标签
  199. func dealProject() {
  200. sess := MgoP.GetMgoConn()
  201. defer MgoP.DestoryMongoConn(sess)
  202. // 指定对应的时间格式
  203. //layout := "2006-01-02 15:04:05"
  204. // 获取当前时间
  205. now := time.Now()
  206. var q interface{}
  207. var startTime = GF.Cron.Start
  208. if startTime != 0 && GF.Cron.End != 0 {
  209. q = map[string]interface{}{
  210. "pici": map[string]interface{}{
  211. "$gt": GF.Cron.Start,
  212. "$lte": GF.Cron.End,
  213. },
  214. }
  215. } else if startTime != 0 {
  216. q = map[string]interface{}{
  217. "pici": map[string]interface{}{
  218. "$gt": startTime,
  219. },
  220. }
  221. } else if startTime == 0 && GF.Cron.End == 0 {
  222. //默认 取大于 昨天的数据
  223. q = map[string]interface{}{
  224. "pici": map[string]interface{}{
  225. "$gt": time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location()).Unix(),
  226. },
  227. }
  228. }
  229. log.Info("dealProject", zap.Any("q", q))
  230. query := sess.DB(GF.MongoP.DB).C(GF.MongoP.Coll).Find(q).Select(map[string]interface{}{
  231. "ids": 0, "list": 0}).Iter()
  232. count := 0
  233. ch := make(chan bool, 15)
  234. wg := &sync.WaitGroup{}
  235. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  236. if count%1000 == 0 {
  237. log.Info("dealProject", zap.Int("current", count))
  238. }
  239. ch <- true
  240. wg.Add(1)
  241. go func(tmp map[string]interface{}) {
  242. defer func() {
  243. <-ch
  244. wg.Done()
  245. }()
  246. rea := TagProject(tmp)
  247. if len(rea) > 0 {
  248. reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果
  249. update := map[string]interface{}{
  250. "nav_column": reb,
  251. }
  252. where := map[string]interface{}{
  253. "_id": tmp["_id"],
  254. }
  255. MgoP.Update(GF.MongoP.Coll, where, map[string]interface{}{"$set": update}, true, false)
  256. }
  257. }(tmp)
  258. tmp = map[string]interface{}{}
  259. }
  260. wg.Wait()
  261. log.Info("dealProject", zap.Int("over ", count))
  262. //没有数据时,发送邮件
  263. if count == 0 {
  264. SendMail("网站导航 数据标签", "查询 拟建项目数据为空,请查收")
  265. }
  266. }
  267. // updateMethod 更新MongoDB
  268. func updateMethod() {
  269. updateSp := make(chan bool, 8)
  270. arru := make([][]map[string]interface{}, 200)
  271. indexu := 0
  272. for {
  273. select {
  274. case v := <-updatePool:
  275. arru[indexu] = v
  276. indexu++
  277. if indexu == 200 {
  278. updateSp <- true
  279. go func(arru [][]map[string]interface{}) {
  280. defer func() {
  281. <-updateSp
  282. }()
  283. MgoB.UpdateBulk(GF.MongoB.Coll, arru...)
  284. }(arru)
  285. arru = make([][]map[string]interface{}, 200)
  286. indexu = 0
  287. }
  288. case <-time.After(1000 * time.Millisecond):
  289. if indexu > 0 {
  290. updateSp <- true
  291. go func(arru [][]map[string]interface{}) {
  292. defer func() {
  293. <-updateSp
  294. }()
  295. MgoB.UpdateBulk(GF.MongoB.Coll, arru...)
  296. }(arru[:indexu])
  297. arru = make([][]map[string]interface{}, 200)
  298. indexu = 0
  299. }
  300. }
  301. }
  302. }