main.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. package main
  2. import (
  3. "context"
  4. "github.com/gogf/gf/v2/frame/g"
  5. "github.com/gogf/gf/v2/net/gclient"
  6. "github.com/gogf/gf/v2/os/gtime"
  7. "go.mongodb.org/mongo-driver/bson"
  8. "go.mongodb.org/mongo-driver/mongo/options"
  9. "go.uber.org/zap"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
  17. var (
  18. MgoB *mongodb.MongodbSim
  19. MgoP *mongodb.MongodbSim
  20. columns = make([]map[string]interface{}, 0) //存储配置 栏目
  21. // 更新mongo
  22. updatePool = make(chan []map[string]interface{}, 5000)
  23. )
  24. func main() {
  25. go updateMethod()
  26. getHot()
  27. //local, _ := time.LoadLocation("Asia/Shanghai")
  28. //c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  29. //eid, err := c.AddFunc(GF.Cron.Spec, getHot)
  30. //c.AddFunc(GF.Cron.Spec2, dealData) // 每2小时执行
  31. //
  32. //if err != nil {
  33. // log.Info("main", zap.Any("AddFunc err", err))
  34. //}
  35. //log.Info("main", zap.Any("eid", eid))
  36. //
  37. //c.Start()
  38. //defer c.Stop()
  39. //
  40. select {}
  41. }
  42. func dealData() {
  43. go dealBidding()
  44. go dealProject()
  45. }
  46. // dealBidding 处理标讯数据
  47. func dealBidding() {
  48. sess := MgoB.GetMgoConn()
  49. defer MgoB.DestoryMongoConn(sess)
  50. var q interface{}
  51. var startTime = GF.Cron.Start
  52. now := time.Now()
  53. ctx, _ := context.WithTimeout(context.Background(), 99999*time.Hour)
  54. coll := sess.M.C.Database(GF.MongoB.DB).Collection(GF.MongoB.Coll)
  55. find := options.Find().SetBatchSize(1000).SetSort(bson.D{bson.E{"comeintime", 1}}).SetProjection(bson.M{"comeintime": 1, "toptype": 1, "subtype": 1, "buyerclass": 1, "title": 1, "detail": 1, "package": 1, "funds": 1, "spidercode": 1, "area": 1, "city": 1, "publishtime": 1})
  56. if startTime != 0 && GF.Cron.End != 0 {
  57. q = map[string]interface{}{
  58. "comeintime": map[string]interface{}{
  59. "$gt": GF.Cron.Start,
  60. "$lte": GF.Cron.End,
  61. },
  62. }
  63. } else if startTime != 0 {
  64. q = map[string]interface{}{
  65. "comeintime": map[string]interface{}{
  66. "$gt": startTime,
  67. },
  68. }
  69. } else if startTime == 0 && GF.Cron.End == 0 {
  70. //默认 取大于 昨天的数据
  71. q = map[string]interface{}{
  72. "comeintime": map[string]interface{}{
  73. "$gt": time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-4, 0, 0, 0, now.Location()).Unix(),
  74. },
  75. }
  76. }
  77. cur, err := coll.Find(ctx, q, find)
  78. if err != nil {
  79. log.Error("dealBidding,coll.Find ", zap.Error(err))
  80. }
  81. log.Info("dealBidding", zap.Any("q", q))
  82. //query := sess.DB(GF.MongoB.DB).C(GF.MongoB.Coll).Find(q).Select(map[string]interface{}{
  83. // "contenthtml": 0}).Iter()
  84. count := 0
  85. ch := make(chan bool, 15)
  86. wg := &sync.WaitGroup{}
  87. //for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  88. for tmp := make(map[string]interface{}); cur.Next(ctx); count++ {
  89. if cur != nil {
  90. cur.Decode(&tmp)
  91. }
  92. startTime = util.IntAll(tmp["comeintime"])
  93. if count%1000 == 0 {
  94. log.Info("dealBidding", zap.Int("current", count), zap.Any("comeintime", tmp["comeintime"]))
  95. }
  96. ch <- true
  97. wg.Add(1)
  98. go func(tmp map[string]interface{}) {
  99. defer func() {
  100. <-ch
  101. wg.Done()
  102. }()
  103. rea := TagBidding(tmp)
  104. if len(rea) > 0 {
  105. reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果
  106. //update := map[string]interface{}{
  107. // "nav_column": reb,
  108. //}
  109. //where := map[string]interface{}{
  110. // "_id": tmp["_id"],
  111. //}
  112. updatePool <- []map[string]interface{}{
  113. {"_id": tmp["_id"]},
  114. {"$set": bson.M{
  115. "nav_column": reb,
  116. }},
  117. }
  118. // 改动栏目的数据,存储临时表;刷存量数据使用
  119. //if util.IntAll(rea["项目分包"]) == 1 || util.IntAll(rea["省级项目"]) == 1 {
  120. // insert := map[string]interface{}{
  121. // "bid_id": mongodb.BsonIdToSId(tmp["_id"]),
  122. // "area": tmp["area"],
  123. // "city": tmp["city"],
  124. // "publishtime": time.Unix(util.Int64All(tmp["publishtime"]), 0).Format("2006-01-02 15:04:05"),
  125. // "nav_column": reb,
  126. // }
  127. // MgoB.InsertOrUpdate(MgoB.DbName, "wcc_column_tmp", insert)
  128. //}
  129. //MgoB.Update(GF.MongoB.Coll, where, map[string]interface{}{"$set": update}, true, false)
  130. }
  131. }(tmp)
  132. tmp = map[string]interface{}{}
  133. }
  134. wg.Wait()
  135. log.Info("dealBidding", zap.Int("over ", count))
  136. //没有数据时,发送邮件
  137. if count == 0 {
  138. SendMail("每日数据监控", "查询数据为空,请处理")
  139. }
  140. //处理热门标讯数据/热门项目
  141. //getHot()
  142. //dealHotDataAll()
  143. log.Info("dealBidding", zap.Int("over ", count))
  144. }
  145. // getHot 获取热门数据
  146. func getHot() {
  147. var existsMap = make(map[string]bool) //bidding_hots 已经存在的ID
  148. //获取已有热门数据
  149. hots, _ := MgoB.Find("bidding_hots", nil, nil, map[string]interface{}{"bidding_id": 1}, false, -1, -1)
  150. if len(*hots) > 0 {
  151. for _, v := range *hots {
  152. existsMap[util.ObjToString(v["bidding_id"])] = true
  153. }
  154. }
  155. var hotMap = make(map[string]int)
  156. //1.获取昨日数据
  157. file := gtime.Now().AddDate(0, 0, -1).Format("Y-m-d")
  158. res := gclient.New().GetContent(context.Background(), "http://172.17.162.26:18880/jyartvisit/"+file+".res")
  159. //http://172.17.162.26:18880/jyartvisit/2024-04-29.res
  160. arrs := strings.Split(res, "\n")
  161. log.Info("getHot", zap.Int("res", len(arrs)), zap.Any("file", file))
  162. for _, va := range arrs {
  163. vs := strings.Split(va, " ")
  164. if len(vs) == 2 {
  165. insert := map[string]interface{}{
  166. "bidding_id": vs[0],
  167. "num": g.NewVar(vs[1]).Int(),
  168. "date": file,
  169. }
  170. err := MgoB.InsertOrUpdate(GF.MongoB.DB, "bidding_hot_data", insert)
  171. if err != nil {
  172. log.Error("getHot", zap.Error(err))
  173. }
  174. }
  175. }
  176. //2. 汇总15天内,符合条件热门数据
  177. where := map[string]interface{}{
  178. "date": map[string]interface{}{
  179. "$gte": gtime.Now().AddDate(0, 0, -GF.Cron.HotDay).Format("Y-m-d"),
  180. },
  181. }
  182. hotData, _ := MgoB.Find("bidding_hot_data", where, nil, nil, false, -1, -1)
  183. for _, data := range *hotData {
  184. biddingID := util.ObjToString(data["bidding_id"])
  185. // 将字符串转换为 ObjectId
  186. objectID := mongodb.StringTOBsonId(biddingID)
  187. // 从 ObjectId 中提取时间戳
  188. timestamp := objectID.Timestamp()
  189. // 判断时间是否在最近一年内
  190. oneYearAgo := time.Now().AddDate(-1, 0, 0)
  191. isWithinOneYear := timestamp.After(oneYearAgo)
  192. if !isWithinOneYear {
  193. continue
  194. }
  195. num := util.IntAll(data["num"])
  196. if da, ok := hotMap[biddingID]; ok {
  197. hotMap[biddingID] = num + da
  198. } else {
  199. hotMap[biddingID] = num
  200. }
  201. }
  202. newCount := 0
  203. for k, v := range hotMap {
  204. //之前存在过的数据,不再入 bidding_hots
  205. if existsMap[k] {
  206. continue
  207. }
  208. if v > GF.Cron.HotNum {
  209. bidding, _ := MgoB.FindById("bidding", k, nil)
  210. biddingData := *bidding
  211. if biddingData == nil {
  212. continue
  213. }
  214. toptype := util.ObjToString(biddingData["toptype"])
  215. //拟建和采购意向数据过滤
  216. if toptype == "拟建" || toptype == "采购意向" {
  217. continue
  218. }
  219. newCount++
  220. insert := map[string]interface{}{
  221. "bidding_id": k,
  222. "num": v,
  223. "createtime": time.Now().Unix(),
  224. "date": time.Now().Format("2006-01-02"),
  225. }
  226. err := MgoB.InsertOrUpdate(GF.MongoB.DB, "bidding_hots", insert)
  227. if err != nil {
  228. log.Error("getHot", zap.Error(err))
  229. }
  230. biddingData["hot_data"] = 1
  231. rea := TagBidding(biddingData)
  232. reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果
  233. updatePool <- []map[string]interface{}{
  234. {"_id": mongodb.StringTOBsonId(k)},
  235. {"$set": bson.M{
  236. "nav_column": reb,
  237. }},
  238. }
  239. }
  240. }
  241. log.Info("getHot", zap.Int("over", newCount))
  242. }
  243. // dealProject 处理拟建项目数据标签
  244. func dealProject() {
  245. sess := MgoP.GetMgoConn()
  246. defer MgoP.DestoryMongoConn(sess)
  247. // 指定对应的时间格式
  248. //layout := "2006-01-02 15:04:05"
  249. // 获取当前时间
  250. now := time.Now()
  251. var q interface{}
  252. var startTime = GF.Cron.Start
  253. if startTime != 0 && GF.Cron.End != 0 {
  254. q = map[string]interface{}{
  255. "pici": map[string]interface{}{
  256. "$gt": GF.Cron.Start,
  257. "$lte": GF.Cron.End,
  258. },
  259. }
  260. } else if startTime != 0 {
  261. q = map[string]interface{}{
  262. "pici": map[string]interface{}{
  263. "$gt": startTime,
  264. },
  265. }
  266. } else if startTime == 0 && GF.Cron.End == 0 {
  267. //默认 取大于 昨天的数据
  268. q = map[string]interface{}{
  269. "pici": map[string]interface{}{
  270. "$gt": time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-8, 0, 0, 0, now.Location()).Unix(),
  271. },
  272. }
  273. }
  274. log.Info("dealProject", zap.Any("q", q))
  275. query := sess.DB(GF.MongoP.DB).C(GF.MongoP.Coll).Find(q).Select(nil).Iter()
  276. count := 0
  277. ch := make(chan bool, 15)
  278. wg := &sync.WaitGroup{}
  279. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  280. if count%1000 == 0 {
  281. log.Info("dealProject", zap.Int("current", count))
  282. }
  283. ch <- true
  284. wg.Add(1)
  285. go func(tmp map[string]interface{}) {
  286. defer func() {
  287. <-ch
  288. wg.Done()
  289. }()
  290. rea := TagProject(tmp)
  291. reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果
  292. update := map[string]interface{}{
  293. "nav_column": reb,
  294. }
  295. where := map[string]interface{}{
  296. "_id": tmp["_id"],
  297. }
  298. MgoP.Update(GF.MongoP.Coll, where, map[string]interface{}{"$set": update}, true, false)
  299. }(tmp)
  300. tmp = map[string]interface{}{}
  301. }
  302. wg.Wait()
  303. log.Info("dealProject", zap.Int("over ", count))
  304. //没有数据时,发送邮件
  305. if count == 0 {
  306. SendMail("网站导航 数据标签", "查询 拟建项目数据为空,请查收")
  307. }
  308. }
  309. // updateMethod 更新MongoDB
  310. func updateMethod() {
  311. updateSp := make(chan bool, 8)
  312. arru := make([][]map[string]interface{}, 200)
  313. indexu := 0
  314. for {
  315. select {
  316. case v := <-updatePool:
  317. arru[indexu] = v
  318. indexu++
  319. if indexu == 200 {
  320. updateSp <- true
  321. go func(arru [][]map[string]interface{}) {
  322. defer func() {
  323. <-updateSp
  324. }()
  325. MgoB.UpdateBulk(GF.MongoB.Coll, arru...)
  326. }(arru)
  327. arru = make([][]map[string]interface{}, 200)
  328. indexu = 0
  329. }
  330. case <-time.After(1000 * time.Millisecond):
  331. if indexu > 0 {
  332. updateSp <- true
  333. go func(arru [][]map[string]interface{}) {
  334. defer func() {
  335. <-updateSp
  336. }()
  337. MgoB.UpdateBulk(GF.MongoB.Coll, arru...)
  338. }(arru[:indexu])
  339. arru = make([][]map[string]interface{}, 200)
  340. indexu = 0
  341. }
  342. }
  343. }
  344. }
  345. // dealHotDataAll 处理存量热门数据标签
  346. func dealHotDataAll() {
  347. hotData, _ := MgoB.Find("bidding_hots", nil, nil, nil, false, -1, -1)
  348. for _, data := range *hotData {
  349. biddingID := util.ObjToString(data["bidding_id"])
  350. bidding, _ := MgoB.FindById("bidding", biddingID, nil)
  351. biddingData := *bidding
  352. if biddingData == nil {
  353. continue
  354. }
  355. biddingData["hot_data"] = 1
  356. rea := TagBidding(biddingData)
  357. reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果
  358. updatePool <- []map[string]interface{}{
  359. {"_id": mongodb.StringTOBsonId(biddingID)},
  360. {"$set": bson.M{
  361. "nav_column": reb,
  362. }},
  363. }
  364. }
  365. log.Info("dealHotDataAll", zap.Int("over", len(*hotData)))
  366. }