main.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gogf/gf/v2/frame/g"
  6. "github.com/gogf/gf/v2/net/gclient"
  7. "github.com/robfig/cron/v3"
  8. "go.mongodb.org/mongo-driver/bson"
  9. "go.mongodb.org/mongo-driver/bson/primitive"
  10. "go.mongodb.org/mongo-driver/mongo/options"
  11. "go.uber.org/zap"
  12. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  13. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  14. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  15. "strings"
  16. "sync"
  17. "time"
  18. )
  19. var (
  20. MgoB *mongodb.MongodbSim
  21. MgoP *mongodb.MongodbSim
  22. columns = make([]map[string]interface{}, 0) //存储配置 栏目
  23. // 更新mongo
  24. updatePool = make(chan []map[string]interface{}, 5000)
  25. )
  // main starts the background bulk-update consumer (updateMethod), schedules
  // dealData on the cron spec GF.Cron.Spec (seconds field enabled, evaluated in
  // the Asia/Shanghai time zone) and then blocks forever.
  func main() {
  	go updateMethod()

  	local, err := time.LoadLocation("Asia/Shanghai")
  	if err != nil {
  		// LoadLocation fails when tzdata is absent (e.g. minimal container
  		// images); a nil location would make the scheduler panic. Asia/Shanghai
  		// is UTC+8 year-round, so a fixed zone is an equivalent fallback.
  		log.Error("main", zap.Error(err))
  		local = time.FixedZone("CST", 8*60*60)
  	}

  	c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  	eid, err := c.AddFunc(GF.Cron.Spec, dealData)
  	if err != nil {
  		// Without a registered job the process would idle forever doing
  		// nothing; fail fast so the problem is visible.
  		log.Error("main", zap.Any("AddFunc err", err))
  		panic(err)
  	}
  	log.Info("main", zap.Any("eid", eid))
  	c.Start()

  	// Block forever; the scheduler runs jobs on its own goroutines.
  	select {}
  }
  40. func dealData() {
  41. go dealBidding()
  42. go dealProject()
  43. }
  // dealBidding tags bidding documents with navigation columns.
  //
  // Documents in GF.MongoB.DB / GF.MongoB.Coll are selected by "comeintime":
  //   - Cron.Start and Cron.End set: Start < comeintime <= End
  //   - only Cron.Start set:         comeintime > Start
  //   - only Cron.End set:           comeintime <= End
  //   - neither set:                 comeintime > 00:00 yesterday (local time, Unix seconds)
  //
  // Results are read in ascending comeintime order. Each document is tagged by
  // TagBidding on at most 15 concurrent goroutines. When tags are returned, the
  // calculateFlag result is queued on updatePool as a "$set nav_column" update.
  // If no document matched, an alert mail is sent. getHot then runs to refresh
  // the hot-bidding data. If the query itself fails, the error is logged and
  // the function returns early.
  func dealBidding() {
  	sess := MgoB.GetMgoConn()
  	defer MgoB.DestoryMongoConn(sess)

  	startTime := GF.Cron.Start
  	endTime := GF.Cron.End
  	now := time.Now()

  	// The timeout is effectively unlimited; cancel releases the context's timer.
  	ctx, cancel := context.WithTimeout(context.Background(), 99999*time.Hour)
  	defer cancel()

  	// Always build a non-nil filter: previously, Start unset with End set left
  	// the filter nil, Find failed and iterating the nil cursor panicked.
  	var q map[string]interface{}
  	switch {
  	case startTime != 0 && endTime != 0:
  		q = map[string]interface{}{"comeintime": map[string]interface{}{"$gt": startTime, "$lte": endTime}}
  	case startTime != 0:
  		q = map[string]interface{}{"comeintime": map[string]interface{}{"$gt": startTime}}
  	case endTime != 0:
  		q = map[string]interface{}{"comeintime": map[string]interface{}{"$lte": endTime}}
  	default:
  		yesterday := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location())
  		q = map[string]interface{}{"comeintime": map[string]interface{}{"$gt": yesterday.Unix()}}
  	}
  	log.Info("dealBidding", zap.Any("q", q))

  	coll := sess.M.C.Database(GF.MongoB.DB).Collection(GF.MongoB.Coll)
  	opts := options.Find().
  		SetBatchSize(1000).
  		SetSort(bson.D{bson.E{Key: "comeintime", Value: 1}}).
  		SetProjection(bson.M{"comeintime": 1, "toptype": 1, "subtype": 1, "buyerclass": 1, "title": 1, "detail": 1, "package": 1, "funds": 1, "spidercode": 1})
  	cur, err := coll.Find(ctx, q, opts)
  	if err != nil {
  		// cur is nil on error; iterating it would panic.
  		log.Error("dealBidding,coll.Find ", zap.Error(err))
  		return
  	}
  	// Release the server-side cursor even if iteration stops early.
  	defer cur.Close(ctx)

  	count := 0
  	lastComeintime := startTime // comeintime of the most recently read document
  	ch := make(chan bool, 15)   // bounds the number of concurrent tagging goroutines
  	wg := &sync.WaitGroup{}
  	for ; cur.Next(ctx); count++ {
  		tmp := make(map[string]interface{})
  		if err := cur.Decode(&tmp); err != nil {
  			log.Error("dealBidding,cur.Decode", zap.Error(err))
  			continue
  		}
  		lastComeintime = util.IntAll(tmp["comeintime"])
  		if count%1000 == 0 {
  			log.Info("dealBidding", zap.Int("current", count), zap.Any("comeintime", tmp["comeintime"]))
  		}
  		ch <- true
  		wg.Add(1)
  		go func(doc map[string]interface{}) {
  			defer func() {
  				<-ch
  				wg.Done()
  			}()
  			rea := TagBidding(doc)
  			if len(rea) == 0 {
  				return
  			}
  			reb := calculateFlag(rea, columns) // decimal flag value encoding the tagged columns
  			updatePool <- []map[string]interface{}{
  				{"_id": doc["_id"]},
  				{"$set": bson.M{"nav_column": reb}},
  			}
  		}(tmp)
  	}
  	wg.Wait()
  	if err := cur.Err(); err != nil {
  		log.Error("dealBidding,cursor", zap.Error(err))
  	}
  	log.Info("dealBidding", zap.Int("over ", count), zap.Any("last comeintime", lastComeintime))

  	// Alert by mail when the query matched nothing.
  	if count == 0 {
  		SendMail("每日数据监控", "查询数据为空,请处理")
  	}
  	// Refresh hot bidding / hot project data.
  	getHot()
  	log.Info("dealBidding", zap.Int("over ", count))
  }
  // getHot refreshes the hot-bidding collections from daily visit-count files
  // served at http://172.17.145.164:18880/jyartvisit/<YYYY-MM-DD>.res.
  //
  // The files for the last 15 days (today included, never a day before
  // 2023-12-05) are fetched. Every line of the form "<objectid> <count>" with a
  // valid ObjectId is upserted into "bidding_hot_data". An id is also upserted
  // into "bidding_hots" when all of the following hold:
  //   - it is not already in "bidding_hots";
  //   - its ObjectId timestamp is within the last year;
  //   - its count exceeds GF.Cron.HotNum.
  // Such a newly hot document is then re-tagged with hot_data=1, and its
  // nav_column update is queued on updatePool.
  func getHot() {
  	// Ids already recorded as hot; these are not re-evaluated below.
  	hotMap := make(map[string]bool)
  	// The second result is ignored; a nil result is treated as "no existing hots".
  	hots, _ := MgoB.Find("bidding_hots", nil, nil, map[string]interface{}{"_id": 1}, false, -1, -1)
  	if hots != nil {
  		for _, v := range *hots {
  			hotMap[mongodb.BsonIdToSId(v["_id"])] = true
  		}
  	}

  	// Collect the lines of each day's file, newest day first.
  	earliest := time.Date(2023, 12, 5, 0, 0, 0, 0, time.Local)
  	now := time.Now()
  	var lines []string
  	for i := 0; i < 15; i++ {
  		day := now.AddDate(0, 0, -i)
  		if day.Before(earliest) {
  			continue
  		}
  		url := fmt.Sprintf("http://172.17.145.164:18880/jyartvisit/%s.res", day.Format("2006-01-02"))
  		// NOTE(review): no request timeout is set; a hung server blocks this
  		// call (and the calling dealBidding) indefinitely.
  		res := gclient.New().GetContent(context.Background(), url)
  		if res == "" {
  			log.Info("getHot", zap.String("empty response", url))
  		}
  		lines = append(lines, strings.Split(res, "\n")...)
  	}

  	for _, line := range lines {
  		// Fields tolerates "\r\n" line endings and repeated spaces.
  		fields := strings.Fields(line)
  		if len(fields) != 2 {
  			continue
  		}
  		id := fields[0]
  		// Validate before writing anything so malformed ids never reach MongoDB.
  		objectID, err := primitive.ObjectIDFromHex(id)
  		if err != nil {
  			log.Error("getHot", zap.String("invalid id", id), zap.Error(err))
  			continue
  		}
  		num := g.NewVar(fields[1]).Int()
  		insert := map[string]interface{}{
  			"_id":        mongodb.StringTOBsonId(id),
  			"num":        num,
  			"createtime": time.Now().Unix(),
  		}
  		if err := MgoB.InsertOrUpdate(GF.MongoB.DB, "bidding_hot_data", insert); err != nil {
  			log.Error("getHot", zap.Error(err))
  		}
  		if hotMap[id] {
  			continue
  		}

  		// Only documents created within the last year whose count exceeds
  		// GF.Cron.HotNum become hot.
  		oneYearAgo := time.Now().AddDate(-1, 0, 0)
  		if !objectID.Timestamp().After(oneYearAgo) || num <= GF.Cron.HotNum {
  			continue
  		}
  		if err := MgoB.InsertOrUpdate(GF.MongoB.DB, "bidding_hots", insert); err != nil {
  			log.Error("getHot", zap.Error(err))
  		}

  		// NOTE(review): the document is read from the "bidding" collection,
  		// while updatePool writes to GF.MongoB.Coll — confirm these match.
  		bidding, _ := MgoB.FindById("bidding", id, nil)
  		if bidding == nil || *bidding == nil {
  			continue
  		}
  		biddingData := *bidding
  		biddingData["hot_data"] = 1
  		rea := TagBidding(biddingData)
  		reb := calculateFlag(rea, columns) // decimal flag value encoding the tagged columns
  		log.Info("getHot", zap.Any("reb", reb), zap.String("id", id))
  		updatePool <- []map[string]interface{}{
  			{"_id": insert["_id"]},
  			{"$set": bson.M{"nav_column": reb}},
  		}
  	}
  }
  // dealProject tags proposed-project documents in GF.MongoP.DB /
  // GF.MongoP.Coll with navigation columns.
  //
  // Documents are selected by their "pici" value:
  //   - Cron.Start and Cron.End set: Start < pici <= End
  //   - only Cron.Start set:         pici > Start
  //   - only Cron.End set:           pici <= End
  //   - neither set:                 pici > 00:00 yesterday (local time, Unix seconds)
  //
  // The "ids" and "list" fields are excluded from the projection. Each document
  // is tagged by TagProject on at most 15 concurrent goroutines. When tags are
  // returned, the calculateFlag result is written to "nav_column" directly via
  // MgoP.Update, not through updatePool. If nothing matched, an alert mail is
  // sent.
  func dealProject() {
  	sess := MgoP.GetMgoConn()
  	defer MgoP.DestoryMongoConn(sess)

  	now := time.Now()
  	startTime := GF.Cron.Start
  	endTime := GF.Cron.End

  	// Always build a non-nil filter: previously, Start unset with End set left
  	// the query filter nil.
  	var q map[string]interface{}
  	switch {
  	case startTime != 0 && endTime != 0:
  		q = map[string]interface{}{"pici": map[string]interface{}{"$gt": startTime, "$lte": endTime}}
  	case startTime != 0:
  		q = map[string]interface{}{"pici": map[string]interface{}{"$gt": startTime}}
  	case endTime != 0:
  		q = map[string]interface{}{"pici": map[string]interface{}{"$lte": endTime}}
  	default:
  		yesterday := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location())
  		q = map[string]interface{}{"pici": map[string]interface{}{"$gt": yesterday.Unix()}}
  	}
  	log.Info("dealProject", zap.Any("q", q))

  	query := sess.DB(GF.MongoP.DB).C(GF.MongoP.Coll).Find(q).Select(map[string]interface{}{
  		"ids": 0, "list": 0}).Iter()
  	count := 0
  	ch := make(chan bool, 15) // bounds the number of concurrent tagging goroutines
  	wg := &sync.WaitGroup{}
  	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  		if count%1000 == 0 {
  			log.Info("dealProject", zap.Int("current", count))
  		}
  		ch <- true
  		wg.Add(1)
  		go func(doc map[string]interface{}) {
  			defer func() {
  				<-ch
  				wg.Done()
  			}()
  			rea := TagProject(doc)
  			if len(rea) == 0 {
  				return
  			}
  			reb := calculateFlag(rea, columns) // decimal flag value encoding the tagged columns
  			where := map[string]interface{}{"_id": doc["_id"]}
  			update := map[string]interface{}{"$set": map[string]interface{}{"nav_column": reb}}
  			MgoP.Update(GF.MongoP.Coll, where, update, true, false)
  		}(tmp)
  		// Give the next document a fresh map; the goroutine above owns the old one.
  		tmp = map[string]interface{}{}
  	}
  	wg.Wait()
  	log.Info("dealProject", zap.Int("over ", count))

  	// Alert by mail when the query matched nothing.
  	if count == 0 {
  		SendMail("网站导航 数据标签", "查询 拟建项目数据为空,请查收")
  	}
  }
  278. // updateMethod 更新MongoDB
  279. func updateMethod() {
  280. updateSp := make(chan bool, 8)
  281. arru := make([][]map[string]interface{}, 200)
  282. indexu := 0
  283. for {
  284. select {
  285. case v := <-updatePool:
  286. arru[indexu] = v
  287. indexu++
  288. if indexu == 200 {
  289. updateSp <- true
  290. go func(arru [][]map[string]interface{}) {
  291. defer func() {
  292. <-updateSp
  293. }()
  294. MgoB.UpdateBulk(GF.MongoB.Coll, arru...)
  295. }(arru)
  296. arru = make([][]map[string]interface{}, 200)
  297. indexu = 0
  298. }
  299. case <-time.After(1000 * time.Millisecond):
  300. if indexu > 0 {
  301. updateSp <- true
  302. go func(arru [][]map[string]interface{}) {
  303. defer func() {
  304. <-updateSp
  305. }()
  306. MgoB.UpdateBulk(GF.MongoB.Coll, arru...)
  307. }(arru[:indexu])
  308. arru = make([][]map[string]interface{}, 200)
  309. indexu = 0
  310. }
  311. }
  312. }
  313. }