summary.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459
  1. package timetask
  2. import (
  3. "bytes"
  4. "fmt"
  5. "github.com/donnie4w/go-logger/logger"
  6. "net/http"
  7. qu "qfw/util"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "util"
  12. )
  // CodePlatformMap maps a spider code to its platform name as read from the
// luaconfig collection by GetCodePlatform. CountLuaPythonNumEveryDay replaces
// it with a fresh, empty map at the start of every run.
var CodePlatformMap map[string]string

// Daily crawl-volume counters. CountLuaPythonNumEveryDay resets them to zero,
// the Get* collectors fill them, and SendLuaPythonAllNum reports and stores them.
var (
	LuaListDownloadAllNum        int64 // Lua list-page documents in the window
	LuaListDownloadSuccessAllNum int64 // Lua list-page documents with state 1
	LuaBiddingDownloadAllNum     int64 // bidding documents from "golua平台"/"chrome" spiders

	PythonListDownloadAllNum        int64 // sum of rel_count in the python list collection
	PythonListDownloadSuccessAllNum int64 // python data_bak documents in the window
	PythonBiddingDownloadAllNum     int64 // bidding documents attributed to python
)

// Publishtime is the statistics date: util.GetTime(-1) formatted with
// qu.Date_Short_Layout, set by CountLuaPythonNumEveryDay.
var Publishtime string
  // Webhook message templates. Each %s / %d slot ends up inside a JSON string
// literal, so whatever is substituted must already be JSON-safe.
var (
	// LuaPythonNumModel is a "text" message payload; %s is the message text.
	// NOTE(review): not referenced in this file — presumably used elsewhere.
	LuaPythonNumModel = `{
"msgtype": "text",
"text": {
"content": "%s"
}
}`

	// MarkdownModel is a "markdown" message payload; %s is the markdown body.
	MarkdownModel = `{
"msgtype": "markdown",
"markdown": {
"content": "%s"
}
}`

	// NumContentModel renders one platform's section: platform name, list-page
	// crawl count, list-page success count and bidding count. The quotes and
	// two of the line breaks are pre-escaped (\" and \n) for the JSON payload.
	NumContentModel = `
>平台:<font color=\"warning\">%s</font>
>列表页采集量:<font color=\"warning\">%d</font>
>列表页采集成功量:<font color=\"warning\">%d</font>\n
>Bidding成功量:<font color=\"warning\">%d</font>\n
`
)
  41. //每日采集量统计
  42. func CountLuaPythonNumEveryDay() {
  43. //lua python每日采集量统计
  44. CodePlatformMap = map[string]string{}
  45. startTime := util.GetTime(-1)
  46. Publishtime = qu.FormatDateByInt64(&startTime, qu.Date_Short_Layout)
  47. //重置
  48. LuaListDownloadAllNum = 0
  49. LuaListDownloadSuccessAllNum = 0
  50. LuaBiddingDownloadAllNum = 0
  51. PythonListDownloadAllNum = 0
  52. PythonListDownloadSuccessAllNum = 0
  53. PythonBiddingDownloadAllNum = 0
  54. GetCodePlatform() //爬虫所有平台
  55. GetBiddingCount() //统计bidding表爬虫采集量
  56. GetPythonListDownloadNum()
  57. GetLuaListDownloadNum()
  58. SendLuaPythonAllNum()
  59. }
  60. func GetCodePlatform() {
  61. defer qu.Catch()
  62. sess := util.MgoEB.GetMgoConn()
  63. defer util.MgoEB.DestoryMongoConn(sess)
  64. lock := &sync.Mutex{}
  65. wg := &sync.WaitGroup{}
  66. ch := make(chan bool, 5)
  67. query := map[string]interface{}{}
  68. fields := map[string]interface{}{
  69. "platform": 1,
  70. "code": 1,
  71. }
  72. it := sess.DB(util.MgoEB.DbName).C("luaconfig").Find(&query).Select(&fields).Iter()
  73. n := 0
  74. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  75. wg.Add(1)
  76. ch <- true
  77. go func(tmp map[string]interface{}) {
  78. defer func() {
  79. <-ch
  80. wg.Done()
  81. }()
  82. platform := qu.ObjToString(tmp["platform"])
  83. code := qu.ObjToString(tmp["code"])
  84. lock.Lock()
  85. CodePlatformMap[code] = platform
  86. lock.Unlock()
  87. }(tmp)
  88. if n%1000 == 0 {
  89. logger.Debug(n)
  90. }
  91. tmp = map[string]interface{}{}
  92. }
  93. wg.Wait()
  94. logger.Debug("爬虫所属平台信息准备完成...", len(CodePlatformMap))
  95. }
  96. func GetBiddingCount() {
  97. defer qu.Catch()
  98. sess := util.MgoB.GetMgoConn()
  99. defer util.MgoB.DestoryMongoConn(sess)
  100. //lock := &sync.Mutex{}
  101. wg := &sync.WaitGroup{}
  102. ch := make(chan bool, 5)
  103. query := map[string]interface{}{
  104. "comeintime": map[string]interface{}{
  105. "$gte": util.GetTime(-1),
  106. "$lt": util.GetTime(0),
  107. },
  108. }
  109. fieles := map[string]interface{}{
  110. "spidercode": 1,
  111. }
  112. count := util.MgoB.Count("bidding", query)
  113. logger.Debug("bidding采集数据量:", count)
  114. it := sess.DB(util.MgoB.DbName).C("bidding").Find(&query).Select(&fieles).Iter()
  115. n := 0
  116. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  117. wg.Add(1)
  118. ch <- true
  119. go func(tmp map[string]interface{}) {
  120. defer func() {
  121. <-ch
  122. wg.Done()
  123. }()
  124. code := qu.ObjToString(tmp["spidercode"])
  125. platform := CodePlatformMap[code]
  126. if platform == "golua平台" || platform == "chrome" {
  127. atomic.AddInt64(&LuaBiddingDownloadAllNum, 1)
  128. } else if platform == "python" {
  129. atomic.AddInt64(&PythonBiddingDownloadAllNum, 1)
  130. } else {
  131. atomic.AddInt64(&PythonBiddingDownloadAllNum, 1)
  132. qu.Debug(code)
  133. }
  134. }(tmp)
  135. if n%10000 == 0 {
  136. logger.Debug(n)
  137. }
  138. tmp = map[string]interface{}{}
  139. }
  140. wg.Wait()
  141. logger.Debug("Bidding数据量统计完成...", LuaBiddingDownloadAllNum, PythonBiddingDownloadAllNum)
  142. }
  143. //python统计列表页采集量
  144. func GetPythonListDownloadNum() {
  145. defer qu.Catch()
  146. logger.Debug("python列表页数据下载量统计开始...")
  147. sess := util.MgoPy.GetMgoConn()
  148. defer util.MgoPy.DestoryMongoConn(sess)
  149. query := map[string]interface{}{
  150. "runtime": Publishtime,
  151. "rel_count": map[string]interface{}{
  152. "$gt": 0,
  153. },
  154. }
  155. fields := map[string]interface{}{
  156. "rel_count": 1,
  157. }
  158. wg := &sync.WaitGroup{}
  159. ch := make(chan bool, 5)
  160. it := sess.DB(util.MgoPy.DbName).C("list").Find(&query).Select(&fields).Iter()
  161. n := 0
  162. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  163. wg.Add(1)
  164. ch <- true
  165. go func(tmp map[string]interface{}) {
  166. defer func() {
  167. <-ch
  168. wg.Done()
  169. }()
  170. count := qu.IntAll(tmp["rel_count"])
  171. atomic.AddInt64(&PythonListDownloadAllNum, int64(count))
  172. }(tmp)
  173. if n%1000 == 0 {
  174. logger.Debug(n)
  175. }
  176. tmp = map[string]interface{}{}
  177. }
  178. wg.Wait()
  179. queryAll := map[string]interface{}{
  180. "comeintime": map[string]interface{}{
  181. "$gte": util.GetTime(-1),
  182. "$lt": util.GetTime(0),
  183. },
  184. }
  185. count := util.MgoPy.Count("data_bak", queryAll)
  186. PythonListDownloadSuccessAllNum = int64(count)
  187. qu.Debug("python列表页采集量:", PythonListDownloadAllNum, "采集成功量:", PythonListDownloadSuccessAllNum)
  188. }
  189. //lua统计列表页采集量
  190. func GetLuaListDownloadNum() {
  191. queryAll := map[string]interface{}{
  192. "comeintime": map[string]interface{}{
  193. "$gte": util.GetTime(-1),
  194. "$lt": util.GetTime(0),
  195. },
  196. }
  197. querySuccess := map[string]interface{}{
  198. "comeintime": map[string]interface{}{
  199. "$gte": util.GetTime(-1),
  200. "$lt": util.GetTime(0),
  201. },
  202. "state": 1,
  203. }
  204. //spider_highlistdata
  205. allNum1 := util.MgoS.Count("spider_highlistdata", queryAll)
  206. successNum1 := util.MgoS.Count("spider_highlistdata", querySuccess)
  207. qu.Debug("spider_highlistdata", allNum1, successNum1)
  208. //spider_listdata
  209. allNum2 := util.MgoS.Count("spider_listdata", queryAll)
  210. successNum2 := util.MgoS.Count("spider_listdata", querySuccess)
  211. qu.Debug("spider_listdata", allNum2, successNum2)
  212. //spider_historydata
  213. allNum3 := util.MgoS.Count("spider_historydata", queryAll)
  214. successNum3 := util.MgoS.Count("spider_historydata", querySuccess)
  215. qu.Debug("spider_historydata", allNum3, successNum3)
  216. //spider_historydata_back
  217. allNum4 := util.MgoS.Count("spider_historydata_back", queryAll)
  218. successNum4 := util.MgoS.Count("spider_historydata_back", querySuccess)
  219. qu.Debug("spider_historydata_back", allNum4, successNum4)
  220. LuaListDownloadAllNum = int64(allNum1) + int64(allNum2) + int64(allNum3) + int64(allNum4)
  221. LuaListDownloadSuccessAllNum = int64(successNum1) + int64(successNum2) + int64(successNum3) + int64(successNum4)
  222. qu.Debug("lua列表页采集量:", LuaListDownloadAllNum, "采集成功量:", LuaListDownloadSuccessAllNum)
  223. }
  224. func SendLuaPythonAllNum() {
  225. defer qu.Catch()
  226. luaContent := fmt.Sprintf(NumContentModel, "Lua", LuaListDownloadAllNum, LuaListDownloadSuccessAllNum, LuaBiddingDownloadAllNum)
  227. pythonContent := fmt.Sprintf(NumContentModel, "python", PythonListDownloadAllNum, PythonListDownloadSuccessAllNum, PythonBiddingDownloadAllNum)
  228. resultContent := fmt.Sprintf(MarkdownModel, Publishtime+",Lua、Python各维度采集量统计结果如下:\n"+luaContent+pythonContent)
  229. qu.Debug(resultContent)
  230. //保存记录
  231. util.MgoS.Save("spider_luapythoncount", map[string]interface{}{
  232. "lualistnum": LuaListDownloadAllNum,
  233. "lualistsuccessnum": LuaListDownloadSuccessAllNum,
  234. "luabiddingnum": LuaBiddingDownloadAllNum,
  235. "pythonlistnum": PythonListDownloadAllNum,
  236. "pythonlistsuccessnum": PythonListDownloadSuccessAllNum,
  237. "pythonbiddingnum": PythonBiddingDownloadAllNum,
  238. "comeintime": time.Now().Unix(),
  239. "date": Publishtime,
  240. })
  241. //发送统计
  242. resp, err := http.Post(
  243. "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=97850772-88d0-4544-a2c3-6201aeddff9e",
  244. "application/json",
  245. bytes.NewBuffer([]byte(resultContent)),
  246. )
  247. if err != nil {
  248. fmt.Println("request error:", err)
  249. return
  250. }
  251. defer resp.Body.Close()
  252. }
  253. func SummaryCode() {
  254. defer qu.Catch()
  255. qu.Debug("上架爬虫信息汇总开始...")
  256. qu.Debug("开始统计spider_highlisthdata信息...")
  257. //统计spider_highlisthdata信息
  258. codeHlistDnumMap := map[string]int{} //记录爬虫昨天下载量
  259. codeErrDnumMap := map[string]int{} //记录爬虫昨天下载失败量
  260. sm_ch1 := make(chan bool, 5)
  261. sm_wg1 := &sync.WaitGroup{}
  262. sm_lock1 := &sync.Mutex{}
  263. sm_stime, sm_etime := util.GetTime(-1), util.GetTime(0)
  264. sess_s := util.MgoS.GetMgoConn()
  265. defer util.MgoS.DestoryMongoConn(sess_s)
  266. timestr := qu.FormatDateByInt64(&sm_stime, qu.Date_Short_Layout)
  267. query := map[string]interface{}{
  268. "publishtime": map[string]interface{}{
  269. "$regex": timestr,
  270. },
  271. }
  272. fs := map[string]interface{}{
  273. "spidercode": 1,
  274. "state": 1,
  275. }
  276. count, _ := sess_s.DB("spider").C("spider_highlistdata").Find(&query).Count()
  277. qu.Debug(timestr, "spider_highlisthdata共采集数据:", count)
  278. it_sh := sess_s.DB("spider").C("spider_highlistdata").Find(&query).Select(&fs).Iter()
  279. for tmp := make(map[string]interface{}); it_sh.Next(&tmp); {
  280. sm_wg1.Add(1)
  281. sm_ch1 <- true
  282. go func(tmp map[string]interface{}) {
  283. defer func() {
  284. <-sm_ch1
  285. sm_wg1.Done()
  286. }()
  287. state := qu.IntAll(tmp["state"])
  288. code := qu.ObjToString(tmp["spidercode"])
  289. sm_lock1.Lock()
  290. if state == -1 {
  291. codeErrDnumMap[code]++
  292. }
  293. codeHlistDnumMap[code]++
  294. sm_lock1.Unlock()
  295. }(tmp)
  296. tmp = map[string]interface{}{}
  297. }
  298. qu.Debug("spider_highlistdata采集信息的爬虫总量:", len(codeHlistDnumMap), " 下载失败爬虫的总量:", len(codeErrDnumMap))
  299. qu.Debug("开始统计data_bak信息...")
  300. codeDbakDnumMap := map[string]int{} //记录爬虫昨天下载量
  301. query = map[string]interface{}{
  302. "l_np_publishtime": map[string]interface{}{
  303. "$gte": sm_stime,
  304. "$lte": sm_etime,
  305. },
  306. }
  307. fs = map[string]interface{}{
  308. "spidercode": 1,
  309. }
  310. count, _ = sess_s.DB("spider").C("data_bak").Find(&query).Count()
  311. qu.Debug(timestr, "data_bak共采集数据:", count)
  312. it_sd := sess_s.DB("spider").C("data_bak").Find(&query).Select(&fs).Iter()
  313. for tmp := make(map[string]interface{}); it_sd.Next(&tmp); {
  314. sm_wg1.Add(1)
  315. sm_ch1 <- true
  316. go func(tmp map[string]interface{}) {
  317. defer func() {
  318. <-sm_ch1
  319. sm_wg1.Done()
  320. }()
  321. code := qu.ObjToString(tmp["spidercode"])
  322. sm_lock1.Lock()
  323. codeDbakDnumMap[code]++
  324. sm_lock1.Unlock()
  325. }(tmp)
  326. tmp = map[string]interface{}{}
  327. }
  328. sm_wg1.Wait()
  329. qu.Debug("data_bak采集信息的爬虫总量:", len(codeDbakDnumMap))
  330. //统计爬虫
  331. query = map[string]interface{}{
  332. "$or": []interface{}{
  333. map[string]interface{}{"state": 5},
  334. map[string]interface{}{
  335. "state": map[string]interface{}{
  336. "$in": []int{0, 1, 2},
  337. },
  338. "event": map[string]interface{}{
  339. "$ne": 7000,
  340. },
  341. },
  342. },
  343. }
  344. sm_ch2 := make(chan bool, 5)
  345. sm_wg2 := &sync.WaitGroup{}
  346. sm_lock2 := &sync.Mutex{}
  347. arr := []map[string]interface{}{}
  348. sess_e := util.MgoEB.GetMgoConn()
  349. defer util.MgoEB.DestoryMongoConn(sess_e)
  350. fe := map[string]interface{}{
  351. "code": 1,
  352. "event": 1,
  353. "param_common": 1,
  354. "model": 1,
  355. "platform": 1,
  356. "createuser": 1,
  357. "createuserid": 1,
  358. }
  359. it_e := sess_e.DB("editor").C("luaconfig").Find(&query).Select(&fe).Iter()
  360. n := 0
  361. for tmp := make(map[string]interface{}); it_e.Next(&tmp); n++ {
  362. sm_wg2.Add(1)
  363. sm_ch2 <- true
  364. go func(tmp map[string]interface{}) {
  365. defer func() {
  366. <-sm_ch2
  367. sm_wg2.Done()
  368. }()
  369. result := map[string]interface{}{}
  370. code := qu.ObjToString(tmp["code"])
  371. result["code"] = code
  372. result["modify"] = tmp["createuser"]
  373. result["modifyid"] = tmp["createuserid"]
  374. result["event"] = tmp["event"]
  375. result["platform"] = tmp["platform"]
  376. result["comeintime"] = time.Now().Unix()
  377. //1、统计data_bak下载量
  378. result["download"] = codeDbakDnumMap[code]
  379. //2、统计spider_highlistdata下载量和下载失败量
  380. result["hl_download"] = codeHlistDnumMap[code]
  381. result["hl_downloaderr"] = codeErrDnumMap[code]
  382. //3、查询spider_sitecheck中url状态码
  383. q := map[string]interface{}{
  384. "code": code,
  385. "comeintime": map[string]interface{}{
  386. "$gte": sm_stime,
  387. "$lte": sm_etime,
  388. },
  389. }
  390. data, _ := util.MgoS.FindOne("spider_sitecheck", q) //spider_sitecheck只记录了错误状态码爬虫
  391. if data != nil && len(*data) > 0 {
  392. result["statuscode"] = qu.Int64All((*data)["statuscode"])
  393. } else {
  394. result["statuscode"] = 200
  395. }
  396. //4、查询spider_warn爬虫的下载错误信息
  397. errinfo := map[string]interface{}{}
  398. fnMap_lev1 := map[string]int{}
  399. fnMap_lev2 := map[string]int{}
  400. warnDatas, _ := util.MgoS.Find("spider_warn", q, nil, `{"field":1,"level":1}`, false, -1, -1)
  401. for _, d := range *warnDatas {
  402. field := qu.ObjToString(d["field"])
  403. level := qu.IntAll(d["level"])
  404. if level == 1 {
  405. fnMap_lev1[field] += 1
  406. } else {
  407. fnMap_lev2[field] += 1
  408. }
  409. }
  410. if len(fnMap_lev1) > 0 {
  411. errinfo["1"] = fnMap_lev1
  412. }
  413. if len(fnMap_lev2) > 0 {
  414. errinfo["2"] = fnMap_lev2
  415. }
  416. result["errinfo"] = errinfo
  417. //
  418. pc := tmp["param_common"].([]interface{})
  419. if len(pc) > 2 {
  420. result["site"] = pc[1]
  421. result["channel"] = pc[2]
  422. }
  423. if len(pc) > 12 {
  424. result["url"] = pc[11]
  425. }
  426. if model, ok := tmp["model"].(map[string]interface{}); ok && model != nil {
  427. result["area"] = qu.ObjToString(model["area"])
  428. result["city"] = qu.ObjToString(model["city"])
  429. result["district"] = qu.ObjToString(model["district"])
  430. }
  431. sm_lock2.Lock()
  432. arr = append(arr, result)
  433. if len(arr) > 500 {
  434. tmps := arr
  435. util.MgoS.SaveBulk("spider_summaryinfo", tmps...)
  436. arr = []map[string]interface{}{}
  437. }
  438. sm_lock2.Unlock()
  439. }(tmp)
  440. if n%500 == 0 {
  441. qu.Debug("current:", n)
  442. }
  443. tmp = map[string]interface{}{}
  444. }
  445. sm_wg2.Wait()
  446. if len(arr) > 0 {
  447. util.MgoS.SaveBulk("spider_summaryinfo", arr...)
  448. arr = []map[string]interface{}{}
  449. }
  450. qu.Debug("上架爬虫信息汇总结束...")
  451. }