code.go 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726
  1. package main
  2. import (
  3. "fmt"
  4. "math"
  5. qu "qfw/util"
  6. "sort"
  7. "sync"
  8. "time"
  9. )
  10. var (
  11. YearMinCodeMap map[string]bool //luayearmincode中,爬虫代码:循环周期
  12. SendFirstMap map[string]*Lua //
  13. YearMinDownloadNum int //一年下载最低值
  14. IntervalMaxNum int //区间最大值
  15. PublishtimeInterval = []float64{1.0, 3.0, 10.0, 20.0, 31.0, 93.0} //[0,1),[1,3),[3,10),[10,20),[20,31),[31,31*3),[31*3,···)
  16. IntervalMap = map[int]string{
  17. 1: "[0,1)",
  18. 2: "[1,3)",
  19. 3: "[3,10)",
  20. 4: "[10,20)",
  21. 5: "[20,31)",
  22. 6: "[31,93)",
  23. 7: "[93,···)",
  24. }
  25. IntervalRotateTime = map[string]int{ //区间爬虫一轮次时间(月)
  26. "[0,1)": 3,
  27. "[1,3)": 3,
  28. "[3,10)": 6,
  29. "[10,20)": 6,
  30. "[20,31)": 6,
  31. "[31,93)": 12,
  32. "[93,···)": 12,
  33. }
  34. )
  // Lua is the subset of a spider record used to build tasks.
  type Lua struct {
  	Site, Channel    string // taken from param_common[1] and param_common[2] in luaconfig
  	Modify, Modifyid string // taken from createuser and createuserid in luaconfig
  	Code             string // spider code
  	Event            int    // luaconfig "event" value
  	Count            int    // number of records counted for the spider in the last year
  }
  44. func LuaYearMinCodeCreateTask() {
  45. defer qu.Catch()
  46. GetAllLuaYearMinCode() //获取luayearmincode所有爬虫
  47. CreateTask() //
  48. }
  49. func GetAllLuaYearMinCode() {
  50. defer qu.Catch()
  51. YearMinCodeMap = map[string]bool{}
  52. SendFirstMap = map[string]*Lua{}
  53. list, _ := MgoE.Find("luayearmincode", nil, nil, `{"publishtime":0}`, false, -1, -1)
  54. for _, l := range *list {
  55. code := qu.ObjToString(l["code"])
  56. YearMinCodeMap[code] = true
  57. sf, _ := l["sendfirst"].(bool)
  58. sd, _ := l["send"].(bool)
  59. if sf && !sd {
  60. lua := &Lua{
  61. Site: qu.ObjToString(l["site"]),
  62. Channel: qu.ObjToString(l["channel"]),
  63. Modify: qu.ObjToString(l["modify"]),
  64. Modifyid: qu.ObjToString(l["modifyid"]),
  65. Code: code,
  66. Count: qu.IntAll(l["count"]),
  67. Event: qu.IntAll(l["event"]),
  68. }
  69. SendFirstMap[code] = lua
  70. }
  71. }
  72. }
  73. func CreateTask() {
  74. defer qu.Catch()
  75. //1.sendfirst建任务(只建一次该任务)
  76. CreateFirstCodeTask()
  77. //2.根据区间轮循建任务
  78. list, _ := MgoE.Find("luayearmincodeinterval", nil, nil, nil, false, -1, -1)
  79. for _, l := range *list {
  80. CreateTaskByInterval(l)
  81. }
  82. }
  83. //根据区间建任务
  84. func CreateTaskByInterval(l map[string]interface{}) {
  85. defer qu.Catch()
  86. interval := qu.ObjToString(l["interval"])
  87. qu.Debug(interval, "区间开始创建任务...")
  88. timesnum := qu.IntAll(l["timesnum"])
  89. cycletime := qu.IntAll(l["cycletime"])
  90. ct_wg := &sync.WaitGroup{}
  91. ct_lock := &sync.Mutex{}
  92. ct_ch := make(chan bool, 3)
  93. savetaskArr := []map[string]interface{}{}
  94. updateArr := [][]map[string]interface{}{}
  95. list, _ := MgoE.Find("luayearmincode", `{"interval":"`+interval+`","send":false}`, ``, `{"publishtime":0}`, false, 0, timesnum)
  96. for _, l := range *list {
  97. ct_wg.Add(1)
  98. ct_ch <- true
  99. go func(tmp map[string]interface{}) {
  100. defer func() {
  101. <-ct_ch
  102. ct_wg.Done()
  103. }()
  104. update := []map[string]interface{}{ //更新
  105. map[string]interface{}{"_id": tmp["_id"]},
  106. map[string]interface{}{
  107. "$set": map[string]interface{}{
  108. "send": true,
  109. },
  110. },
  111. }
  112. code := qu.ObjToString(tmp["code"])
  113. description := ""
  114. state := 0 //任务状态
  115. /*
  116. 统计是否有已下几种情况,时间定为一周内数据:
  117. 1、统计spider_highlistdata是否有下载异常数据
  118. 2、统计spider_warn异常数据(发布时间异常、乱码)
  119. 3、统计spider_sitecheck 站点异常爬虫(404)
  120. */
  121. stime, etime := GetTime(-cycletime), GetTime(0)
  122. //统计周期内下载量
  123. query := map[string]interface{}{
  124. "spidercode": code,
  125. "l_np_publishtime": map[string]interface{}{
  126. "$gte": stime,
  127. "$lte": etime,
  128. },
  129. }
  130. downloadnum := MgoS.Count("data_bak", query)
  131. //1、下载异常
  132. query = map[string]interface{}{
  133. "comeintime": map[string]interface{}{
  134. "$gte": stime,
  135. "$lte": etime,
  136. },
  137. "state": -1,
  138. "spidercode": code,
  139. }
  140. data_downloaderr, _ := MgoS.Find("spider_highlistdata", query, `{"_id":-1}`, `{"href":1}`, false, 0, 10)
  141. if data_downloaderr != nil && len(*data_downloaderr) > 0 {
  142. if len(*data_downloaderr) == 10 {
  143. state = 1
  144. }
  145. description += "下载异常:\n"
  146. for _, derr := range *data_downloaderr {
  147. description += qu.ObjToString(derr["href"]) + "\n"
  148. }
  149. }
  150. //2、发布时间异常、乱码
  151. query = map[string]interface{}{
  152. "comeintime": map[string]interface{}{
  153. "$gte": stime,
  154. "$lte": etime,
  155. },
  156. "level": 2, //2:error数据 1:warn数据
  157. "code": code,
  158. }
  159. data_warn, _ := MgoS.Find("spider_warn", query, `{"_id":-1}`, `{"href":1,"field":1}`, false, 0, 10)
  160. if data_warn != nil && len(*data_warn) > 0 {
  161. destmp_publishtime := "发布时间异常:\n"
  162. destmp_code := "正文标题异常:\n"
  163. for _, dw := range *data_warn {
  164. field := qu.ObjToString(dw["field"])
  165. if field == "publishtime" {
  166. state = 1
  167. destmp_publishtime += qu.ObjToString(dw["href"]) + "\n"
  168. } else {
  169. destmp_code += qu.ObjToString(dw["href"]) + "\n"
  170. }
  171. }
  172. description += destmp_code
  173. description += destmp_publishtime
  174. }
  175. //3、404
  176. query = map[string]interface{}{
  177. "comeintime": map[string]interface{}{
  178. "$gte": stime,
  179. "$lte": etime,
  180. },
  181. "statuscode": 404,
  182. "code": code,
  183. }
  184. data_404, _ := MgoS.FindOne("spider_sitecheck", query)
  185. if data_404 != nil && len(*data_404) > 0 {
  186. if downloadnum == 0 { //有采集数据,不认为是404
  187. state = 1
  188. description += "网站监测:404\n" + qu.ObjToString((*data_404)["url"]) + "\n"
  189. }
  190. }
  191. result := map[string]interface{}{}
  192. result["s_code"] = code
  193. result["s_site"] = tmp["site"]
  194. result["s_channel"] = tmp["channel"]
  195. result["s_descript"] = description
  196. result["l_comeintime"] = time.Now().Unix()
  197. result["l_complete"] = time.Now().AddDate(0, 0, cycletime).Unix()
  198. result["s_modifyid"] = tmp["modifyid"]
  199. result["s_modify"] = tmp["modify"]
  200. result["i_event"] = tmp["event"]
  201. result["s_source"] = "程序"
  202. result["i_num"] = downloadnum
  203. result["i_min"] = 0
  204. result["i_state"] = state
  205. result["s_type"] = "7"
  206. result["s_urgency"] = "1"
  207. result["i_times"] = 0
  208. result["s_downloadtime"] = qu.FormatDateByInt64(&stime, qu.Date_Full_Layout) + "/" + qu.FormatDateByInt64(&etime, qu.Date_Full_Layout)
  209. ct_lock.Lock()
  210. savetaskArr = append(savetaskArr, result)
  211. updateArr = append(updateArr, update)
  212. ct_lock.Unlock()
  213. }(l)
  214. }
  215. ct_wg.Wait()
  216. ct_lock.Lock()
  217. if len(savetaskArr) > 0 {
  218. MgoE.SaveBulk("task", savetaskArr...)
  219. savetaskArr = []map[string]interface{}{}
  220. }
  221. if len(updateArr) > 0 {
  222. MgoE.UpdateBulk("luayearmincode", updateArr...)
  223. updateArr = [][]map[string]interface{}{}
  224. }
  225. ct_lock.Unlock()
  226. //time.AfterFunc(time.Duration(cycletime)*time.Second, func() { CreateTaskByInterval(l) })
  227. time.AfterFunc(time.Duration(cycletime*24)*time.Hour, func() { CreateTaskByInterval(l) })
  228. }
  229. //历史数据采集为0的建任务
  230. func CreateFirstCodeTask() {
  231. defer qu.Catch()
  232. qu.Debug("开始创建sendfirst任务...")
  233. stime := time.Now().AddDate(-1, 0, 0).Unix()
  234. etime := GetTime(0)
  235. cl_wg := &sync.WaitGroup{}
  236. cl_lock := &sync.Mutex{}
  237. cl_ch := make(chan bool, 3)
  238. savetaskArr := []map[string]interface{}{}
  239. updateArr := [][]map[string]interface{}{}
  240. for _, lua := range SendFirstMap {
  241. cl_wg.Add(1)
  242. cl_ch <- true
  243. go func(l *Lua) {
  244. defer func() {
  245. <-cl_ch
  246. cl_wg.Done()
  247. }()
  248. update := []map[string]interface{}{ //更新
  249. map[string]interface{}{"code": l.Code},
  250. map[string]interface{}{
  251. "$set": map[string]interface{}{
  252. "send": true,
  253. },
  254. },
  255. }
  256. result := map[string]interface{}{}
  257. result["s_code"] = l.Code
  258. result["s_site"] = l.Site
  259. result["s_channel"] = l.Channel
  260. result["s_descript"] = "下载量异常:\n一年内数据下载量:" + fmt.Sprint(l.Count)
  261. result["l_comeintime"] = time.Now().Unix()
  262. result["l_complete"] = time.Now().AddDate(1, 0, 0).Unix()
  263. result["s_modifyid"] = l.Modifyid
  264. result["s_modify"] = l.Modify
  265. result["i_event"] = l.Event
  266. result["s_source"] = "程序"
  267. result["i_num"] = l.Count
  268. result["i_min"] = 0
  269. result["i_state"] = 0
  270. result["s_type"] = "10"
  271. result["s_urgency"] = "1"
  272. result["i_times"] = 0
  273. result["s_downloadtime"] = qu.FormatDateByInt64(&stime, qu.Date_Full_Layout) + "/" + qu.FormatDateByInt64(&etime, qu.Date_Full_Layout)
  274. cl_lock.Lock()
  275. savetaskArr = append(savetaskArr, result)
  276. updateArr = append(updateArr, update)
  277. if len(savetaskArr) > 500 {
  278. MgoE.SaveBulk("task", savetaskArr...)
  279. savetaskArr = []map[string]interface{}{}
  280. }
  281. if len(updateArr) > 500 {
  282. MgoE.UpdateBulk("luayearmincode", updateArr...)
  283. updateArr = [][]map[string]interface{}{}
  284. }
  285. cl_lock.Unlock()
  286. }(lua)
  287. }
  288. cl_wg.Wait()
  289. cl_lock.Lock()
  290. if len(savetaskArr) > 0 {
  291. MgoE.SaveBulk("task", savetaskArr...)
  292. savetaskArr = []map[string]interface{}{}
  293. }
  294. if len(updateArr) > 0 {
  295. MgoE.UpdateBulk("luayearmincode", updateArr...)
  296. updateArr = [][]map[string]interface{}{}
  297. }
  298. cl_lock.Unlock()
  299. SendFirstMap = map[string]*Lua{}
  300. qu.Debug("sendfirst任务创建完毕...")
  301. }
  302. //计算循环周期和每轮新建任务爬虫的个数
  303. func CycleTime() {
  304. defer qu.Catch()
  305. for k, interval := range IntervalMap {
  306. cycletime := -1
  307. if k == 1 { //区间在[0,1),循环周期设置为10天
  308. cycletime = 10
  309. } else if k == 2 || k == 3 { //confinval最大值都在x以下,可设置为x天
  310. list, _ := MgoE.Find("luayearmincode", `{"interval":"`+interval+`"}`, `{"confinval":-1}`, `{"confinval":1}`, false, 0, 1)
  311. if list != nil && len(*list) == 1 {
  312. cycletime = qu.IntAll((*list)[0]["confinval"])
  313. }
  314. } else if k == 4 || k == 5 || k == 6 { //最大值90%都在x以下,可设置为x天
  315. percent := 0.9
  316. if k == 6 {
  317. percent = 0.5
  318. }
  319. count := MgoE.Count("luayearmincode", `{"interval":"`+interval+`"}`)
  320. index := int(math.Floor(float64(count) * percent))
  321. list, _ := MgoE.Find("luayearmincode", `{"interval":"`+interval+`"}`, `{"confinval":1}`, `{"confinval":1}`, false, 0, index+1)
  322. if list != nil && len(*list) == index+1 {
  323. cycletime = qu.IntAll((*list)[index]["confinval"])
  324. }
  325. } else if k == 7 {
  326. cycletime = 180
  327. }
  328. updata := map[string]interface{}{
  329. "$set": map[string]interface{}{
  330. "cycletime": cycletime,
  331. "send": false,
  332. },
  333. }
  334. MgoE.Update("luayearmincode", `{"interval":"`+interval+`"}`, updata, false, true)
  335. q := map[string]interface{}{
  336. "interval": interval,
  337. "sendfirst": map[string]interface{}{
  338. "$exists": false,
  339. },
  340. }
  341. count := MgoE.Count("luayearmincode", q)
  342. t := float64((count * cycletime)) / float64((30 * IntervalRotateTime[interval]))
  343. rotateNum := math.Ceil(t)
  344. text := interval + ",总数:" + fmt.Sprint(count) + "," + fmt.Sprint(30*IntervalRotateTime[interval]) + "天发送完毕。每" + fmt.Sprint(cycletime) + "天轮循一次,一次发送" + fmt.Sprint(rotateNum) + "条"
  345. qu.Debug(text)
  346. MgoE.Save("luayearmincodeinterval", map[string]interface{}{"interval": interval, "timesnum": int(rotateNum), "cycletime": cycletime, "text": text})
  347. }
  348. }
  349. //标记数据
  350. func TagCode() {
  351. defer qu.Catch()
  352. sess := MgoE.GetMgoConn()
  353. defer MgoE.DestoryMongoConn(sess)
  354. ch := make(chan bool, 3)
  355. wg := &sync.WaitGroup{}
  356. lock := &sync.Mutex{}
  357. arr := [][]map[string]interface{}{}
  358. it := sess.DB("editor").C("luayearmincode").Find(nil).Iter()
  359. n := 0
  360. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  361. ch <- true
  362. wg.Add(1)
  363. go func(tmp map[string]interface{}) {
  364. defer func() {
  365. <-ch
  366. wg.Done()
  367. }()
  368. update := []map[string]interface{}{}
  369. update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  370. set := map[string]interface{}{}
  371. //code := qu.ObjToString(tmp["code"])
  372. count := qu.IntAll(tmp["count"])
  373. if count == 1 || count == 0 { //爬虫下载量为1,放入第7区间
  374. set["interval"] = IntervalMap[7]
  375. if count == 0 {
  376. set["sendfirst"] = true
  377. }
  378. } else {
  379. var tmpArr Int64Slice
  380. for _, tp := range tmp["publishtime"].([]interface{}) {
  381. tmpArr = append(tmpArr, tp.(int64))
  382. }
  383. sort.Sort(tmpArr) //发布时间排序
  384. //
  385. intervalNumArr := map[int][]float64{} //记录每个区间发布时间间隔信息
  386. for i, p := range tmpArr {
  387. if i == 0 {
  388. continue
  389. }
  390. dval := float64(p-tmpArr[i-1]) / 86400
  391. //计算区间
  392. intervalNum := -1 //区间
  393. for j, pi := range PublishtimeInterval { //1.0, 3.0, 10.0, 20.0, 31.0, 93.0
  394. if dval == pi {
  395. intervalNum = j + 2
  396. break
  397. } else if dval < pi {
  398. intervalNum = j + 1
  399. break
  400. }
  401. }
  402. if intervalNum == -1 { //如果为初始值,证明dval大于93
  403. intervalNum = 7
  404. }
  405. intervalNumArr[intervalNum] = append(intervalNumArr[intervalNum], dval)
  406. }
  407. //
  408. maxIn := 0 //记录最大区间
  409. maxInLen := 0 //记录最大区间长度
  410. flag := true //记录是否只有第一区间有值
  411. for in := 1; in <= 7; in++ {
  412. lens := len(intervalNumArr[in])
  413. if (in == 1 && lens == 0) || (in != 1 && lens > 0) {
  414. flag = false
  415. }
  416. if in != 1 && lens >= maxInLen {
  417. maxInLen = lens
  418. maxIn = in
  419. }
  420. }
  421. //qu.Debug(flag, "最大区间:", maxIn, "最大区间长度:", maxInLen)
  422. if flag { //只有第一区间有值
  423. if count < IntervalMaxNum { //划分到第七区间,直接新建任务
  424. set["sendfirst"] = true
  425. set["interval"] = IntervalMap[7]
  426. } else {
  427. set["interval"] = IntervalMap[1]
  428. }
  429. } else if maxIn != 0 && maxInLen != 0 {
  430. sumInval := float64(0)
  431. for _, inval := range intervalNumArr[maxIn] {
  432. sumInval += inval
  433. }
  434. mean := sumInval / float64(maxInLen)
  435. se := mean / math.Pow(float64(maxInLen), 0.5)
  436. confInval := math.Ceil(mean + se*2.32)
  437. set["confinval"] = int(confInval) //置信区间
  438. set["interval"] = IntervalMap[maxIn]
  439. } else {
  440. qu.Debug("错误数据id:", tmp["_id"])
  441. }
  442. }
  443. if len(set) > 0 {
  444. update = append(update, map[string]interface{}{"$set": set})
  445. }
  446. lock.Lock()
  447. if len(update) == 2 {
  448. arr = append(arr, update)
  449. }
  450. if len(arr) >= 500 {
  451. tmps := arr
  452. MgoE.UpdateBulk("luayearmincode", tmps...)
  453. arr = [][]map[string]interface{}{}
  454. }
  455. lock.Unlock()
  456. }(tmp)
  457. if n%1000 == 0 {
  458. qu.Debug("current:", n)
  459. }
  460. tmp = map[string]interface{}{}
  461. }
  462. wg.Wait()
  463. if len(arr) > 0 {
  464. MgoE.UpdateBulk("luayearmincode", arr...)
  465. arr = [][]map[string]interface{}{}
  466. }
  467. qu.Debug("标记完成")
  468. }
  469. //统计爬虫下载量
  470. func GetSpidercode() {
  471. defer qu.Catch()
  472. query := map[string]interface{}{
  473. "$or": []interface{}{
  474. map[string]interface{}{"state": 5},
  475. map[string]interface{}{
  476. "state": map[string]interface{}{
  477. "$in": []int{0, 1, 2},
  478. },
  479. "event": map[string]interface{}{
  480. "$ne": 7000,
  481. },
  482. },
  483. },
  484. }
  485. codeMap := map[string]*Lua{}
  486. luas, _ := MgoE.Find("luaconfig", query, nil, `{"code":1,"event":1,"param_common":1,"createuser":1,"createuserid":1}`, false, -1, -1)
  487. for _, l := range *luas {
  488. pc := l["param_common"].([]interface{})
  489. lua := &Lua{
  490. Modify: qu.ObjToString(l["createuser"]),
  491. Modifyid: qu.ObjToString(l["createuserid"]),
  492. Event: qu.IntAll(l["event"]),
  493. }
  494. if len(pc) > 2 {
  495. lua.Site = qu.ObjToString(pc[1])
  496. lua.Channel = qu.ObjToString(pc[2])
  497. }
  498. code := qu.ObjToString(l["code"])
  499. codeMap[code] = lua
  500. }
  501. qu.Debug("开始统计...", len(codeMap))
  502. sess := MgoS.GetMgoConn()
  503. defer MgoS.DestoryMongoConn(sess)
  504. q := map[string]interface{}{
  505. "publishtime": map[string]interface{}{
  506. "$gte": time.Now().AddDate(-1, 0, 0).Unix(),
  507. "$lte": time.Now().Unix(),
  508. },
  509. }
  510. f := map[string]interface{}{
  511. "spidercode": 1,
  512. "publishtime": 1,
  513. }
  514. ch := make(chan bool, 5)
  515. wg := &sync.WaitGroup{}
  516. lock := &sync.Mutex{}
  517. codeNum := map[string]int{}
  518. codePublishtime := map[string][]int64{}
  519. i := 0
  520. it1 := sess.DB("spider").C("data_bak").Find(&q).Select(&f).Iter()
  521. for tmp := make(map[string]interface{}); it1.Next(&tmp); i++ {
  522. wg.Add(1)
  523. ch <- true
  524. go func(tmp map[string]interface{}) {
  525. defer func() {
  526. <-ch
  527. wg.Done()
  528. }()
  529. publishtime := qu.Int64All(tmp["publishtime"])
  530. if publishtime > 0 {
  531. spidercode := qu.ObjToString(tmp["spidercode"])
  532. if codeMap[spidercode] != nil {
  533. lock.Lock()
  534. codeNum[spidercode] += 1
  535. if codeNum[spidercode] > YearMinDownloadNum {
  536. lock.Unlock()
  537. return
  538. }
  539. codePublishtime[spidercode] = append(codePublishtime[spidercode], publishtime)
  540. lock.Unlock()
  541. }
  542. }
  543. }(tmp)
  544. if i%1000 == 0 {
  545. qu.Debug(i)
  546. }
  547. tmp = map[string]interface{}{}
  548. }
  549. qu.Debug("data_bak查询完毕", len(codeNum))
  550. i = 0
  551. it2 := sess.DB("spider").C("data_bak_202011030854").Find(&q).Select(&f).Iter()
  552. for tmp := make(map[string]interface{}); it2.Next(&tmp); i++ {
  553. wg.Add(1)
  554. ch <- true
  555. go func(tmp map[string]interface{}) {
  556. defer func() {
  557. <-ch
  558. wg.Done()
  559. }()
  560. publishtime := qu.Int64All(tmp["publishtime"])
  561. if publishtime > 0 {
  562. spidercode := qu.ObjToString(tmp["spidercode"])
  563. if codeMap[spidercode] != nil {
  564. lock.Lock()
  565. codeNum[spidercode] += 1
  566. if codeNum[spidercode] > YearMinDownloadNum {
  567. lock.Unlock()
  568. return
  569. }
  570. codePublishtime[spidercode] = append(codePublishtime[spidercode], publishtime)
  571. lock.Unlock()
  572. }
  573. }
  574. }(tmp)
  575. if i%1000 == 0 {
  576. qu.Debug(i)
  577. }
  578. tmp = map[string]interface{}{}
  579. }
  580. wg.Wait()
  581. qu.Debug("data_bak_202011030854查询完毕", len(codeNum))
  582. for code, num := range codeNum {
  583. lua := codeMap[code]
  584. delete(codeMap, code)
  585. if num <= YearMinDownloadNum {
  586. parr := codePublishtime[code]
  587. //sort.Sort(parr)
  588. MgoE.Save("luayearmincode", map[string]interface{}{"code": code, "count": num, "publishtime": parr, "event": lua.Event, "site": lua.Site, "channel": lua.Channel, "modify": lua.Modify, "modifyid": lua.Modifyid})
  589. }
  590. }
  591. for code, lua := range codeMap { //下载量为0
  592. MgoE.Save("luayearmincode", map[string]interface{}{"code": code, "count": 0, "event": lua.Event, "site": lua.Site, "channel": lua.Channel, "modify": lua.Modify, "modifyid": lua.Modifyid, "publishtime": []int64{}})
  593. }
  594. qu.Debug("统计完毕...")
  595. }
  596. //补充信息
  597. func getlua() {
  598. luas, _ := MgoE.Find("luaconfig", nil, nil, `{"code":1,"event":1,"param_common":1,"createuser":1,"createuserid":1}`, false, -1, -1)
  599. for i, l := range *luas {
  600. qu.Debug(i)
  601. pc := l["param_common"].([]interface{})
  602. Site := ""
  603. Channel := ""
  604. if len(pc) > 2 {
  605. Site = qu.ObjToString(pc[1])
  606. Channel = qu.ObjToString(pc[2])
  607. }
  608. Modify := qu.ObjToString(l["createuser"])
  609. Modifyid := qu.ObjToString(l["createuserid"])
  610. code := qu.ObjToString(l["code"])
  611. MgoE.Update("luayearmincode", `{"code":"`+code+`"}`, map[string]interface{}{"$set": map[string]interface{}{"site": Site, "channel": Channel, "modify": Modify, "modifyid": Modifyid}}, false, false)
  612. }
  613. }
  614. //分组查询
  615. func GetSpidercode_back() {
  616. defer qu.Catch()
  617. qu.Debug("开始统计...")
  618. sess := MgoS.GetMgoConn()
  619. defer MgoS.DestoryMongoConn(sess)
  620. q := map[string]interface{}{
  621. "publishtime": map[string]interface{}{
  622. "$gte": time.Now().AddDate(-1, 0, 0).Unix(),
  623. "$lte": time.Now().Unix(),
  624. },
  625. }
  626. g := map[string]interface{}{
  627. "_id": "$spidercode",
  628. "count": map[string]interface{}{"$sum": 1},
  629. }
  630. pro := map[string]interface{}{
  631. "spidercode": 1,
  632. }
  633. s := map[string]interface{}{
  634. "count": 1,
  635. }
  636. p := []map[string]interface{}{
  637. map[string]interface{}{"$match": q},
  638. map[string]interface{}{"$project": pro},
  639. map[string]interface{}{"$group": g},
  640. map[string]interface{}{"$sort": s},
  641. }
  642. it1 := sess.DB("spider").C("data_bak").Pipe(p).Iter()
  643. codeCount := map[string]int{}
  644. i := 0
  645. for tmp := make(map[string]interface{}); it1.Next(&tmp); i++ {
  646. code := qu.ObjToString(tmp["_id"])
  647. count := qu.IntAll(tmp["count"])
  648. qu.Debug(code, count)
  649. if count <= YearMinDownloadNum {
  650. codeCount[code] = count
  651. } else {
  652. break
  653. }
  654. if i%50 == 0 {
  655. qu.Debug(i)
  656. }
  657. }
  658. i = 0
  659. it2 := sess.DB("spider").C("data_bak_202011030854").Pipe(p).Iter()
  660. for tmp := make(map[string]interface{}); it2.Next(&tmp); i++ {
  661. code := qu.ObjToString(tmp["_id"])
  662. count := qu.IntAll(tmp["count"])
  663. qu.Debug(code, count)
  664. if count <= YearMinDownloadNum {
  665. codeCount[code] += count
  666. } else {
  667. break
  668. }
  669. if i%50 == 0 {
  670. qu.Debug(i)
  671. }
  672. }
  673. for code, count := range codeCount {
  674. if count <= 100 {
  675. MgoE.Save("luayearmincode", map[string]interface{}{"code": code, "count": count})
  676. }
  677. }
  678. qu.Debug("统计数量完毕...")
  679. list, _ := MgoE.Find("luayearmincode", nil, nil, nil, false, -1, -1)
  680. for _, l := range *list {
  681. code := qu.ObjToString(l["code"])
  682. count := qu.IntAll(l["count"])
  683. if count > YearMinDownloadNum {
  684. continue
  685. }
  686. d1s, _ := MgoS.Find("data_bak", `{"spidercode":"`+code+`"}`, nil, `{"publishtime":1}`, false, -1, -1)
  687. d2s, _ := MgoS.Find("data_bak_202011030854", `{"spidercode":"`+code+`"}`, nil, `{"publishtime":1}`, false, -1, -1)
  688. var publishtimeArr Int64Slice
  689. for _, d1 := range *d1s {
  690. publishtime := qu.Int64All(d1["publishtime"])
  691. if publishtime > 0 {
  692. publishtimeArr = append(publishtimeArr, publishtime)
  693. }
  694. }
  695. for _, d2 := range *d2s {
  696. publishtime := qu.Int64All(d2["publishtime"])
  697. if publishtime > 0 {
  698. publishtimeArr = append(publishtimeArr, publishtime)
  699. }
  700. }
  701. sort.Sort(publishtimeArr)
  702. MgoE.Update("luayearmincode", map[string]interface{}{"_id": l["_id"]}, map[string]interface{}{"$set": map[string]interface{}{"publishtime": publishtimeArr}}, false, false)
  703. }
  704. qu.Debug("统计完毕...")
  705. }
  // Int64Slice attaches sort.Interface to []int64, ordering values ascending.
  type Int64Slice []int64

  // Compile-time check that Int64Slice satisfies sort.Interface.
  var _ sort.Interface = Int64Slice(nil)

  // Len reports the number of elements.
  func (s Int64Slice) Len() int { return len(s) }

  // Less reports whether the element at a sorts before the element at b.
  func (s Int64Slice) Less(a, b int) bool { return s[a] < s[b] }

  // Swap exchanges the elements at indexes a and b.
  func (s Int64Slice) Swap(a, b int) {
  	s[a], s[b] = s[b], s[a]
  }