// main.go (9.2 KB)

  1. package main
  2. import (
  3. "context"
  4. "github.com/gogf/gf/v2/frame/g"
  5. "github.com/gogf/gf/v2/net/gclient"
  6. "github.com/gogf/gf/v2/os/gtime"
  7. "github.com/robfig/cron/v3"
  8. "go.mongodb.org/mongo-driver/bson"
  9. "go.mongodb.org/mongo-driver/mongo/options"
  10. "go.uber.org/zap"
  11. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  13. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  14. "strings"
  15. "sync"
  16. "time"
  17. )
  18. var (
  19. MgoB *mongodb.MongodbSim
  20. MgoP *mongodb.MongodbSim
  21. columns = make([]map[string]interface{}, 0) //存储配置 栏目
  22. // 更新mongo
  23. updatePool = make(chan []map[string]interface{}, 5000)
  24. )
  25. func main() {
  26. go updateMethod()
  27. local, _ := time.LoadLocation("Asia/Shanghai")
  28. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  29. eid, err := c.AddFunc(GF.Cron.Spec, dealData)
  30. if err != nil {
  31. log.Info("main", zap.Any("AddFunc err", err))
  32. }
  33. log.Info("main", zap.Any("eid", eid))
  34. c.Start()
  35. defer c.Stop()
  36. //
  37. select {}
  38. }
  39. func dealData() {
  40. go dealBidding()
  41. go dealProject()
  42. }
  43. // dealBidding 处理标讯数据
  44. func dealBidding() {
  45. sess := MgoB.GetMgoConn()
  46. defer MgoB.DestoryMongoConn(sess)
  47. var q interface{}
  48. var startTime = GF.Cron.Start
  49. now := time.Now()
  50. ctx, _ := context.WithTimeout(context.Background(), 99999*time.Hour)
  51. coll := sess.M.C.Database(GF.MongoB.DB).Collection(GF.MongoB.Coll)
  52. find := options.Find().SetBatchSize(1000).SetSort(bson.D{bson.E{"comeintime", 1}}).SetProjection(bson.M{"comeintime": 1, "toptype": 1, "subtype": 1, "buyerclass": 1, "title": 1, "detail": 1, "package": 1, "funds": 1, "spidercode": 1})
  53. if startTime != 0 && GF.Cron.End != 0 {
  54. q = map[string]interface{}{
  55. "comeintime": map[string]interface{}{
  56. "$gt": GF.Cron.Start,
  57. "$lte": GF.Cron.End,
  58. },
  59. }
  60. } else if startTime != 0 {
  61. q = map[string]interface{}{
  62. "comeintime": map[string]interface{}{
  63. "$gt": startTime,
  64. },
  65. }
  66. } else if startTime == 0 && GF.Cron.End == 0 {
  67. //默认 取大于 昨天的数据
  68. q = map[string]interface{}{
  69. "comeintime": map[string]interface{}{
  70. "$gt": time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location()).Unix(),
  71. },
  72. }
  73. }
  74. cur, err := coll.Find(ctx, q, find)
  75. if err != nil {
  76. log.Error("dealBidding,coll.Find ", zap.Error(err))
  77. }
  78. log.Info("dealBidding", zap.Any("q", q))
  79. //query := sess.DB(GF.MongoB.DB).C(GF.MongoB.Coll).Find(q).Select(map[string]interface{}{
  80. // "contenthtml": 0}).Iter()
  81. count := 0
  82. ch := make(chan bool, 15)
  83. wg := &sync.WaitGroup{}
  84. //for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  85. for tmp := make(map[string]interface{}); cur.Next(ctx); count++ {
  86. if cur != nil {
  87. cur.Decode(&tmp)
  88. }
  89. startTime = util.IntAll(tmp["comeintime"])
  90. if count%1000 == 0 {
  91. log.Info("dealBidding", zap.Int("current", count), zap.Any("comeintime", tmp["comeintime"]))
  92. }
  93. ch <- true
  94. wg.Add(1)
  95. go func(tmp map[string]interface{}) {
  96. defer func() {
  97. <-ch
  98. wg.Done()
  99. }()
  100. rea := TagBidding(tmp)
  101. if len(rea) > 0 {
  102. reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果
  103. //update := map[string]interface{}{
  104. // "nav_column": reb,
  105. //}
  106. //where := map[string]interface{}{
  107. // "_id": tmp["_id"],
  108. //}
  109. updatePool <- []map[string]interface{}{
  110. {"_id": tmp["_id"]},
  111. {"$set": bson.M{
  112. "nav_column": reb,
  113. }},
  114. }
  115. //MgoB.Update(GF.MongoB.Coll, where, map[string]interface{}{"$set": update}, true, false)
  116. }
  117. }(tmp)
  118. tmp = map[string]interface{}{}
  119. }
  120. wg.Wait()
  121. log.Info("dealBidding", zap.Int("over ", count))
  122. //没有数据时,发送邮件
  123. if count == 0 {
  124. SendMail("每日数据监控", "查询数据为空,请处理")
  125. }
  126. //处理热门标讯数据/热门项目
  127. getHot()
  128. log.Info("dealBidding", zap.Int("over ", count))
  129. }
  130. // getHot 获取热门数据
  131. func getHot() {
  132. var existsMap = make(map[string]bool) //bidding_hots 已经存在的ID
  133. //获取已有热门数据
  134. hots, _ := MgoB.Find("bidding_hots", nil, nil, map[string]interface{}{"bidding_id": 1}, false, -1, -1)
  135. if len(*hots) > 0 {
  136. for _, v := range *hots {
  137. existsMap[util.ObjToString(v["bidding_id"])] = true
  138. }
  139. }
  140. var hotMap = make(map[string]int)
  141. //1.获取昨日数据
  142. file := gtime.Now().AddDate(0, 0, -1).Format("Y-m-d")
  143. res := gclient.New().GetContent(context.Background(), "http://172.17.145.164:18880/jyartvisit/"+file+".res")
  144. arrs := strings.Split(res, "\n")
  145. log.Info("getHot", zap.Int("res", len(arrs)))
  146. for _, va := range arrs {
  147. vs := strings.Split(va, " ")
  148. if len(vs) == 2 {
  149. insert := map[string]interface{}{
  150. "bidding_id": vs[0],
  151. "num": g.NewVar(vs[1]).Int(),
  152. "date": file,
  153. }
  154. err := MgoB.InsertOrUpdate(GF.MongoB.DB, "bidding_hot_data", insert)
  155. if err != nil {
  156. log.Error("getHot", zap.Error(err))
  157. }
  158. }
  159. }
  160. //2. 汇总15天内,符合条件热门数据
  161. where := map[string]interface{}{
  162. "date": map[string]interface{}{
  163. "$gte": gtime.Now().AddDate(0, 0, -GF.Cron.HotDay).Format("Y-m-d"),
  164. },
  165. }
  166. hotData, _ := MgoB.Find("bidding_hot_data", where, nil, nil, false, -1, -1)
  167. for _, data := range *hotData {
  168. biddingID := util.ObjToString(data["bidding_id"])
  169. // 将字符串转换为 ObjectId
  170. objectID := mongodb.StringTOBsonId(biddingID)
  171. // 从 ObjectId 中提取时间戳
  172. timestamp := objectID.Timestamp()
  173. // 判断时间是否在最近一年内
  174. oneYearAgo := time.Now().AddDate(-1, 0, 0)
  175. isWithinOneYear := timestamp.After(oneYearAgo)
  176. if !isWithinOneYear {
  177. continue
  178. }
  179. num := util.IntAll(data["num"])
  180. if da, ok := hotMap[biddingID]; ok {
  181. hotMap[biddingID] = num + da
  182. } else {
  183. hotMap[biddingID] = num
  184. }
  185. }
  186. newCount := 0
  187. for k, v := range hotMap {
  188. //之前存在过的数据,不再入 bidding_hots
  189. if existsMap[k] {
  190. continue
  191. }
  192. if v > GF.Cron.HotNum {
  193. bidding, _ := MgoB.FindById("bidding", k, nil)
  194. biddingData := *bidding
  195. if biddingData == nil {
  196. continue
  197. }
  198. newCount++
  199. insert := map[string]interface{}{
  200. "bidding_id": k,
  201. "num": v,
  202. "createtime": time.Now().Unix(),
  203. "date": time.Now().Format("2006-01-02"),
  204. }
  205. err := MgoB.InsertOrUpdate(GF.MongoB.DB, "bidding_hots", insert)
  206. if err != nil {
  207. log.Error("getHot", zap.Error(err))
  208. }
  209. biddingData["hot_data"] = 1
  210. rea := TagBidding(biddingData)
  211. reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果
  212. updatePool <- []map[string]interface{}{
  213. {"_id": mongodb.StringTOBsonId(k)},
  214. {"$set": bson.M{
  215. "nav_column": reb,
  216. }},
  217. }
  218. }
  219. }
  220. log.Info("getHot", zap.Int("over", newCount))
  221. }
  222. // dealProject 处理拟建项目数据标签
  223. func dealProject() {
  224. sess := MgoP.GetMgoConn()
  225. defer MgoP.DestoryMongoConn(sess)
  226. // 指定对应的时间格式
  227. //layout := "2006-01-02 15:04:05"
  228. // 获取当前时间
  229. now := time.Now()
  230. var q interface{}
  231. var startTime = GF.Cron.Start
  232. if startTime != 0 && GF.Cron.End != 0 {
  233. q = map[string]interface{}{
  234. "pici": map[string]interface{}{
  235. "$gt": GF.Cron.Start,
  236. "$lte": GF.Cron.End,
  237. },
  238. }
  239. } else if startTime != 0 {
  240. q = map[string]interface{}{
  241. "pici": map[string]interface{}{
  242. "$gt": startTime,
  243. },
  244. }
  245. } else if startTime == 0 && GF.Cron.End == 0 {
  246. //默认 取大于 昨天的数据
  247. q = map[string]interface{}{
  248. "pici": map[string]interface{}{
  249. "$gt": time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location()).Unix(),
  250. },
  251. }
  252. }
  253. log.Info("dealProject", zap.Any("q", q))
  254. query := sess.DB(GF.MongoP.DB).C(GF.MongoP.Coll).Find(q).Select(map[string]interface{}{
  255. "ids": 0, "list": 0}).Iter()
  256. count := 0
  257. ch := make(chan bool, 15)
  258. wg := &sync.WaitGroup{}
  259. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  260. if count%1000 == 0 {
  261. log.Info("dealProject", zap.Int("current", count))
  262. }
  263. ch <- true
  264. wg.Add(1)
  265. go func(tmp map[string]interface{}) {
  266. defer func() {
  267. <-ch
  268. wg.Done()
  269. }()
  270. rea := TagProject(tmp)
  271. if len(rea) > 0 {
  272. reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果
  273. update := map[string]interface{}{
  274. "nav_column": reb,
  275. }
  276. where := map[string]interface{}{
  277. "_id": tmp["_id"],
  278. }
  279. MgoP.Update(GF.MongoP.Coll, where, map[string]interface{}{"$set": update}, true, false)
  280. }
  281. }(tmp)
  282. tmp = map[string]interface{}{}
  283. }
  284. wg.Wait()
  285. log.Info("dealProject", zap.Int("over ", count))
  286. //没有数据时,发送邮件
  287. if count == 0 {
  288. SendMail("网站导航 数据标签", "查询 拟建项目数据为空,请查收")
  289. }
  290. }
  291. // updateMethod 更新MongoDB
  292. func updateMethod() {
  293. updateSp := make(chan bool, 8)
  294. arru := make([][]map[string]interface{}, 200)
  295. indexu := 0
  296. for {
  297. select {
  298. case v := <-updatePool:
  299. arru[indexu] = v
  300. indexu++
  301. if indexu == 200 {
  302. updateSp <- true
  303. go func(arru [][]map[string]interface{}) {
  304. defer func() {
  305. <-updateSp
  306. }()
  307. MgoB.UpdateBulk(GF.MongoB.Coll, arru...)
  308. }(arru)
  309. arru = make([][]map[string]interface{}, 200)
  310. indexu = 0
  311. }
  312. case <-time.After(1000 * time.Millisecond):
  313. if indexu > 0 {
  314. updateSp <- true
  315. go func(arru [][]map[string]interface{}) {
  316. defer func() {
  317. <-updateSp
  318. }()
  319. MgoB.UpdateBulk(GF.MongoB.Coll, arru...)
  320. }(arru[:indexu])
  321. arru = make([][]map[string]interface{}, 200)
  322. indexu = 0
  323. }
  324. }
  325. }
  326. }