main.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. package main
  2. import (
  3. "context"
  4. "github.com/gogf/gf/v2/frame/g"
  5. "github.com/gogf/gf/v2/net/gclient"
  6. "github.com/gogf/gf/v2/os/gtime"
  7. "github.com/robfig/cron/v3"
  8. "go.mongodb.org/mongo-driver/bson"
  9. "go.mongodb.org/mongo-driver/mongo/options"
  10. "go.uber.org/zap"
  11. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  13. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  14. "strings"
  15. "sync"
  16. "time"
  17. )
  // Package-level state shared by the scheduled jobs in this file.
  var (
  	// MgoB is the MongoDB handle used by the bidding jobs (dealBidding, getHot,
  	// dealHotDataAll) and by the bulk writer updateMethod.
  	MgoB *mongodb.MongodbSim
  	// MgoP is the MongoDB handle used by dealProject.
  	MgoP *mongodb.MongodbSim
  	columns = make([]map[string]interface{}, 0) // configured navigation columns; passed to calculateFlag
  	// updatePool queues {filter, update} pairs that updateMethod writes to MongoDB in bulk.
  	updatePool = make(chan []map[string]interface{}, 5000)
  )
  25. func main() {
  26. go updateMethod()
  27. local, _ := time.LoadLocation("Asia/Shanghai")
  28. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  29. eid, err := c.AddFunc(GF.Cron.Spec, getHot)
  30. c.AddFunc(GF.Cron.Spec2, dealData) // 每2小时执行
  31. if err != nil {
  32. log.Info("main", zap.Any("AddFunc err", err))
  33. }
  34. log.Info("main", zap.Any("eid", eid))
  35. c.Start()
  36. defer c.Stop()
  37. //
  38. select {}
  39. }
  40. func dealData() {
  41. go dealBidding()
  42. go dealProject()
  43. }
  44. // dealBidding 处理标讯数据
  45. func dealBidding() {
  46. sess := MgoB.GetMgoConn()
  47. defer MgoB.DestoryMongoConn(sess)
  48. var q interface{}
  49. var startTime = GF.Cron.Start
  50. now := time.Now()
  51. ctx, _ := context.WithTimeout(context.Background(), 99999*time.Hour)
  52. coll := sess.M.C.Database(GF.MongoB.DB).Collection(GF.MongoB.Coll)
  53. find := options.Find().SetBatchSize(1000).SetSort(bson.D{bson.E{"comeintime", 1}}).SetProjection(bson.M{"comeintime": 1, "toptype": 1, "subtype": 1, "buyerclass": 1, "title": 1, "detail": 1, "package": 1, "funds": 1, "spidercode": 1, "area": 1, "city": 1, "publishtime": 1})
  54. if startTime != 0 && GF.Cron.End != 0 {
  55. q = map[string]interface{}{
  56. "comeintime": map[string]interface{}{
  57. "$gt": GF.Cron.Start,
  58. "$lte": GF.Cron.End,
  59. },
  60. }
  61. } else if startTime != 0 {
  62. q = map[string]interface{}{
  63. "comeintime": map[string]interface{}{
  64. "$gt": startTime,
  65. },
  66. }
  67. } else if startTime == 0 && GF.Cron.End == 0 {
  68. //默认 取大于 昨天的数据
  69. q = map[string]interface{}{
  70. "comeintime": map[string]interface{}{
  71. "$gt": time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-2, 0, 0, 0, now.Location()).Unix(),
  72. },
  73. }
  74. }
  75. cur, err := coll.Find(ctx, q, find)
  76. if err != nil {
  77. log.Error("dealBidding,coll.Find ", zap.Error(err))
  78. }
  79. log.Info("dealBidding", zap.Any("q", q))
  80. //query := sess.DB(GF.MongoB.DB).C(GF.MongoB.Coll).Find(q).Select(map[string]interface{}{
  81. // "contenthtml": 0}).Iter()
  82. count := 0
  83. ch := make(chan bool, 15)
  84. wg := &sync.WaitGroup{}
  85. //for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  86. for tmp := make(map[string]interface{}); cur.Next(ctx); count++ {
  87. if cur != nil {
  88. cur.Decode(&tmp)
  89. }
  90. startTime = util.IntAll(tmp["comeintime"])
  91. if count%1000 == 0 {
  92. log.Info("dealBidding", zap.Int("current", count), zap.Any("comeintime", tmp["comeintime"]))
  93. }
  94. ch <- true
  95. wg.Add(1)
  96. go func(tmp map[string]interface{}) {
  97. defer func() {
  98. <-ch
  99. wg.Done()
  100. }()
  101. rea := TagBidding(tmp)
  102. if len(rea) > 0 {
  103. reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果
  104. //update := map[string]interface{}{
  105. // "nav_column": reb,
  106. //}
  107. //where := map[string]interface{}{
  108. // "_id": tmp["_id"],
  109. //}
  110. updatePool <- []map[string]interface{}{
  111. {"_id": tmp["_id"]},
  112. {"$set": bson.M{
  113. "nav_column": reb,
  114. }},
  115. }
  116. // 改动栏目的数据,存储临时表;刷存量数据使用
  117. //if util.IntAll(rea["项目分包"]) == 1 || util.IntAll(rea["省级项目"]) == 1 {
  118. // insert := map[string]interface{}{
  119. // "bid_id": mongodb.BsonIdToSId(tmp["_id"]),
  120. // "area": tmp["area"],
  121. // "city": tmp["city"],
  122. // "publishtime": time.Unix(util.Int64All(tmp["publishtime"]), 0).Format("2006-01-02 15:04:05"),
  123. // "nav_column": reb,
  124. // }
  125. // MgoB.InsertOrUpdate(MgoB.DbName, "wcc_column_tmp", insert)
  126. //}
  127. //MgoB.Update(GF.MongoB.Coll, where, map[string]interface{}{"$set": update}, true, false)
  128. }
  129. }(tmp)
  130. tmp = map[string]interface{}{}
  131. }
  132. wg.Wait()
  133. log.Info("dealBidding", zap.Int("over ", count))
  134. //没有数据时,发送邮件
  135. if count == 0 {
  136. SendMail("每日数据监控", "查询数据为空,请处理")
  137. }
  138. //处理热门标讯数据/热门项目
  139. //getHot()
  140. //dealHotDataAll()
  141. log.Info("dealBidding", zap.Int("over ", count))
  142. }
  143. // getHot 获取热门数据
  144. func getHot() {
  145. var existsMap = make(map[string]bool) //bidding_hots 已经存在的ID
  146. //获取已有热门数据
  147. hots, _ := MgoB.Find("bidding_hots", nil, nil, map[string]interface{}{"bidding_id": 1}, false, -1, -1)
  148. if len(*hots) > 0 {
  149. for _, v := range *hots {
  150. existsMap[util.ObjToString(v["bidding_id"])] = true
  151. }
  152. }
  153. var hotMap = make(map[string]int)
  154. //1.获取昨日数据
  155. file := gtime.Now().AddDate(0, 0, -1).Format("Y-m-d")
  156. res := gclient.New().GetContent(context.Background(), "http://172.17.145.164:18880/jyartvisit/"+file+".res")
  157. arrs := strings.Split(res, "\n")
  158. log.Info("getHot", zap.Int("res", len(arrs)))
  159. for _, va := range arrs {
  160. vs := strings.Split(va, " ")
  161. if len(vs) == 2 {
  162. insert := map[string]interface{}{
  163. "bidding_id": vs[0],
  164. "num": g.NewVar(vs[1]).Int(),
  165. "date": file,
  166. }
  167. err := MgoB.InsertOrUpdate(GF.MongoB.DB, "bidding_hot_data", insert)
  168. if err != nil {
  169. log.Error("getHot", zap.Error(err))
  170. }
  171. }
  172. }
  173. //2. 汇总15天内,符合条件热门数据
  174. where := map[string]interface{}{
  175. "date": map[string]interface{}{
  176. "$gte": gtime.Now().AddDate(0, 0, -GF.Cron.HotDay).Format("Y-m-d"),
  177. },
  178. }
  179. hotData, _ := MgoB.Find("bidding_hot_data", where, nil, nil, false, -1, -1)
  180. for _, data := range *hotData {
  181. biddingID := util.ObjToString(data["bidding_id"])
  182. // 将字符串转换为 ObjectId
  183. objectID := mongodb.StringTOBsonId(biddingID)
  184. // 从 ObjectId 中提取时间戳
  185. timestamp := objectID.Timestamp()
  186. // 判断时间是否在最近一年内
  187. oneYearAgo := time.Now().AddDate(-1, 0, 0)
  188. isWithinOneYear := timestamp.After(oneYearAgo)
  189. if !isWithinOneYear {
  190. continue
  191. }
  192. num := util.IntAll(data["num"])
  193. if da, ok := hotMap[biddingID]; ok {
  194. hotMap[biddingID] = num + da
  195. } else {
  196. hotMap[biddingID] = num
  197. }
  198. }
  199. newCount := 0
  200. for k, v := range hotMap {
  201. //之前存在过的数据,不再入 bidding_hots
  202. if existsMap[k] {
  203. continue
  204. }
  205. if v > GF.Cron.HotNum {
  206. bidding, _ := MgoB.FindById("bidding", k, nil)
  207. biddingData := *bidding
  208. if biddingData == nil {
  209. continue
  210. }
  211. toptype := util.ObjToString(biddingData["toptype"])
  212. //拟建和采购意向数据过滤
  213. if toptype == "拟建" || toptype == "采购意向" {
  214. continue
  215. }
  216. newCount++
  217. insert := map[string]interface{}{
  218. "bidding_id": k,
  219. "num": v,
  220. "createtime": time.Now().Unix(),
  221. "date": time.Now().Format("2006-01-02"),
  222. }
  223. err := MgoB.InsertOrUpdate(GF.MongoB.DB, "bidding_hots", insert)
  224. if err != nil {
  225. log.Error("getHot", zap.Error(err))
  226. }
  227. biddingData["hot_data"] = 1
  228. rea := TagBidding(biddingData)
  229. reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果
  230. updatePool <- []map[string]interface{}{
  231. {"_id": mongodb.StringTOBsonId(k)},
  232. {"$set": bson.M{
  233. "nav_column": reb,
  234. }},
  235. }
  236. }
  237. }
  238. log.Info("getHot", zap.Int("over", newCount))
  239. }
  240. // dealProject 处理拟建项目数据标签
  241. func dealProject() {
  242. sess := MgoP.GetMgoConn()
  243. defer MgoP.DestoryMongoConn(sess)
  244. // 指定对应的时间格式
  245. //layout := "2006-01-02 15:04:05"
  246. // 获取当前时间
  247. now := time.Now()
  248. var q interface{}
  249. var startTime = GF.Cron.Start
  250. if startTime != 0 && GF.Cron.End != 0 {
  251. q = map[string]interface{}{
  252. "pici": map[string]interface{}{
  253. "$gt": GF.Cron.Start,
  254. "$lte": GF.Cron.End,
  255. },
  256. }
  257. } else if startTime != 0 {
  258. q = map[string]interface{}{
  259. "pici": map[string]interface{}{
  260. "$gt": startTime,
  261. },
  262. }
  263. } else if startTime == 0 && GF.Cron.End == 0 {
  264. //默认 取大于 昨天的数据
  265. q = map[string]interface{}{
  266. "pici": map[string]interface{}{
  267. "$gt": time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-2, 0, 0, 0, now.Location()).Unix(),
  268. },
  269. }
  270. }
  271. log.Info("dealProject", zap.Any("q", q))
  272. query := sess.DB(GF.MongoP.DB).C(GF.MongoP.Coll).Find(q).Select(nil).Iter()
  273. count := 0
  274. ch := make(chan bool, 15)
  275. wg := &sync.WaitGroup{}
  276. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  277. if count%1000 == 0 {
  278. log.Info("dealProject", zap.Int("current", count))
  279. }
  280. ch <- true
  281. wg.Add(1)
  282. go func(tmp map[string]interface{}) {
  283. defer func() {
  284. <-ch
  285. wg.Done()
  286. }()
  287. rea := TagProject(tmp)
  288. reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果
  289. update := map[string]interface{}{
  290. "nav_column": reb,
  291. }
  292. where := map[string]interface{}{
  293. "_id": tmp["_id"],
  294. }
  295. MgoP.Update(GF.MongoP.Coll, where, map[string]interface{}{"$set": update}, true, false)
  296. }(tmp)
  297. tmp = map[string]interface{}{}
  298. }
  299. wg.Wait()
  300. log.Info("dealProject", zap.Int("over ", count))
  301. //没有数据时,发送邮件
  302. if count == 0 {
  303. SendMail("网站导航 数据标签", "查询 拟建项目数据为空,请查收")
  304. }
  305. }
  306. // updateMethod 更新MongoDB
  307. func updateMethod() {
  308. updateSp := make(chan bool, 8)
  309. arru := make([][]map[string]interface{}, 200)
  310. indexu := 0
  311. for {
  312. select {
  313. case v := <-updatePool:
  314. arru[indexu] = v
  315. indexu++
  316. if indexu == 200 {
  317. updateSp <- true
  318. go func(arru [][]map[string]interface{}) {
  319. defer func() {
  320. <-updateSp
  321. }()
  322. MgoB.UpdateBulk(GF.MongoB.Coll, arru...)
  323. }(arru)
  324. arru = make([][]map[string]interface{}, 200)
  325. indexu = 0
  326. }
  327. case <-time.After(1000 * time.Millisecond):
  328. if indexu > 0 {
  329. updateSp <- true
  330. go func(arru [][]map[string]interface{}) {
  331. defer func() {
  332. <-updateSp
  333. }()
  334. MgoB.UpdateBulk(GF.MongoB.Coll, arru...)
  335. }(arru[:indexu])
  336. arru = make([][]map[string]interface{}, 200)
  337. indexu = 0
  338. }
  339. }
  340. }
  341. }
  342. // dealHotDataAll 处理存量热门数据标签
  343. func dealHotDataAll() {
  344. hotData, _ := MgoB.Find("bidding_hots", nil, nil, nil, false, -1, -1)
  345. for _, data := range *hotData {
  346. biddingID := util.ObjToString(data["bidding_id"])
  347. bidding, _ := MgoB.FindById("bidding", biddingID, nil)
  348. biddingData := *bidding
  349. if biddingData == nil {
  350. continue
  351. }
  352. biddingData["hot_data"] = 1
  353. rea := TagBidding(biddingData)
  354. reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果
  355. updatePool <- []map[string]interface{}{
  356. {"_id": mongodb.StringTOBsonId(biddingID)},
  357. {"$set": bson.M{
  358. "nav_column": reb,
  359. }},
  360. }
  361. }
  362. log.Info("dealHotDataAll", zap.Int("over", len(*hotData)))
  363. }