timetask.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. package timetask
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "log"
  6. "net/http"
  7. "net/smtp"
  8. qu "qfw/util"
  9. mgdb "qfw/util/mongodb"
  10. mgu "qfw/util/mongodbutil"
  11. sp "spiderutil"
  12. "strconv"
  13. "strings"
  14. "time"
  15. "util"
  16. "github.com/cron"
  17. "gopkg.in/mgo.v2/bson"
  18. . "gopkg.in/mgo.v2/bson"
  19. )
  20. var Mail map[string]interface{}
  21. func TimeTask() {
  22. defer qu.Catch()
  23. c := cron.New()
  24. c.Start()
  25. c.AddFunc("0 20 9 ? * MON-FRI", CheckCreateTask)
  26. c.AddFunc("0 0 */1 ? * *", CheckLuaMove)
  27. c.AddFunc("0 30 23 * * *", UpdateSiteInfo) //定时更新站点信息
  28. c.AddFunc("0 0 23 * * *", UpdateCodeHeart) //定时更新爬虫心跳信息
  29. }
  30. //监测爬虫由历史转增量时未成功的
  31. func CheckLuaMove() {
  32. defer qu.Catch()
  33. qu.Debug("开始检测爬虫节点移动...")
  34. query := map[string]interface{}{
  35. "comeintime": map[string]interface{}{
  36. "$gte": time.Now().Add(-(time.Hour * 1)).Unix(),
  37. "$lte": time.Now().Unix(),
  38. },
  39. "ok": false,
  40. }
  41. qu.Debug("query:", query)
  42. list := *mgdb.Find("luamovelog", query, nil, nil, false, -1, -1)
  43. text := ""
  44. if len(list) > 0 {
  45. for _, l := range list {
  46. stype := qu.ObjToString(l["type"])
  47. code := qu.ObjToString(l["code"])
  48. text += code + ":" + stype + ";"
  49. }
  50. }
  51. if text != "" {
  52. for i := 1; i <= 3; i++ {
  53. res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", sp.Config.JkMail["api"], sp.Config.JkMail["to"], "lua-move-fail", text))
  54. if err == nil {
  55. res.Body.Close()
  56. read, err := ioutil.ReadAll(res.Body)
  57. qu.Debug("邮件发送:", string(read), err)
  58. break
  59. }
  60. }
  61. }
  62. }
  63. //检测创建任务失败的爬虫
  64. func CheckCreateTask() {
  65. defer qu.Catch()
  66. qu.Debug("开始检测任务创建...")
  67. query := map[string]interface{}{
  68. "comeintime": map[string]interface{}{
  69. "$gte": GetTime(0),
  70. },
  71. }
  72. codes := []string{}
  73. list := *mgdb.Find("luacreatetaskerr", query, nil, nil, false, -1, -1)
  74. if len(list) > 0 {
  75. for _, l := range list {
  76. code := qu.ObjToString(l["code"])
  77. codes = append(codes, code)
  78. }
  79. }
  80. if len(codes) > 0 {
  81. for i := 1; i <= 3; i++ {
  82. res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", sp.Config.JkMail["api"], sp.Config.JkMail["to"], "lua-createtask-err", "爬虫:"+strings.Join(codes, ";")))
  83. if err == nil {
  84. res.Body.Close()
  85. read, err := ioutil.ReadAll(res.Body)
  86. qu.Debug("邮件发送:", string(read), err)
  87. break
  88. }
  89. }
  90. }
  91. }
  92. func SendToMail() {
  93. mailInfo := *(qu.ObjToMap(Mail["smtp"]))
  94. host := mailInfo["host"].(string)
  95. from := mailInfo["from"].(string)
  96. pwd := mailInfo["password"].(string)
  97. subject := mailInfo["subject"].(string)
  98. hour := time.Now().Hour()
  99. if hour == 8 {
  100. //定时查询数据库 查询需要发邮件的人和相关信息
  101. timeStr := time.Now().Format("2006-01-02")
  102. the_time, _ := time.ParseInLocation("2006-01-02", timeStr, time.Local)
  103. time_zero := the_time.Unix() //当日凌晨的时间戳
  104. time_twentyFour := time_zero + 86399 //当日24时的时间戳
  105. //聚合查询数据
  106. //mgdb.InitMongodbPool(5, "192.168.3.207:27080", "editor")
  107. sess := mgdb.GetMgoConn()
  108. defer mgdb.DestoryMongoConn(sess)
  109. var res []M
  110. sess.DB("editor").C("task").Pipe([]M{M{"$match": M{"l_complete": M{"$gte": time_zero, "$lte": time_twentyFour}, "i_state": M{"$gte": 1, "$lte": 2}}},
  111. M{"$group": M{"_id": "$s_modifyid", "count": M{"$sum": 1}}}}).All(&res)
  112. //遍历数据进行发邮件
  113. for _, v := range res {
  114. _id, ok := v["_id"].(string)
  115. if ok {
  116. query := bson.M{
  117. "_id": bson.ObjectIdHex(_id),
  118. }
  119. user := *mgdb.FindOne("user", query)
  120. if user["s_email"] == nil {
  121. continue
  122. }
  123. num := strconv.Itoa(v["count"].(int))
  124. body := `<html><body><h3>你有` + num + `条任务需要今天完成</h3></body></html>`
  125. hp := strings.Split(host, ":")
  126. auth := smtp.PlainAuth("", from, pwd, hp[0])
  127. content_type := "Content-Type: text/html; charset=UTF-8"
  128. msg := []byte("To: " + user["s_email"].(string) + "\r\nFrom: " + from + "\r\nSubject: " + subject + "\r\n" + content_type + "\r\n\r\n" + body)
  129. send_to := strings.Split(user["s_email"].(string), ";")
  130. err := smtp.SendMail(host, auth, from, send_to, msg)
  131. if err == nil {
  132. log.Println(user["s_email"].(string), " sendMail success")
  133. } else {
  134. log.Println(user["s_email"].(string), " sendMail fail")
  135. }
  136. }
  137. }
  138. time.Sleep(1 * time.Hour)
  139. }
  140. //time.AfterFunc(30*time.Minute, func() { SendToMail(to, num) })
  141. time.AfterFunc(30*time.Minute, SendToMail)
  142. }
  143. //获取第day天凌晨的时间戳
  144. func GetTime(day int) int64 {
  145. defer qu.Catch()
  146. nowTime := time.Now().AddDate(0, 0, day)
  147. timeStr := qu.FormatDate(&nowTime, qu.Date_Short_Layout)
  148. t, _ := time.ParseInLocation(qu.Date_Short_Layout, timeStr, time.Local)
  149. return t.Unix()
  150. }
  151. func UpdateSiteInfo() {
  152. defer qu.Catch()
  153. qu.Debug("定时更新站点信息开始...")
  154. sites, _ := util.MgoE.Find(sp.Config.SiteColl, ``, ``, `{"site":1}`, false, -1, -1)
  155. for _, s := range *sites {
  156. site := qu.ObjToString(s["site"])
  157. domain, status, event, platform, area, city, district, _ := util.GetLuasInfoBySite(site)
  158. set := map[string]interface{}{
  159. "$set": map[string]interface{}{
  160. "platform": platform,
  161. "event": event,
  162. "spider_status": status,
  163. "domain": domain,
  164. "area": area,
  165. "city": city,
  166. "district": district,
  167. "updatetime": time.Now().Unix(),
  168. },
  169. }
  170. util.MgoE.UpdateById(sp.Config.SiteColl, s["_id"], set)
  171. }
  172. qu.Debug("定时更新站点信息完成...")
  173. }
  174. func UpdateCodeHeart() {
  175. qu.Debug("定时更新爬虫心跳信息...")
  176. defer qu.Catch()
  177. query := map[string]interface{}{
  178. "state": map[string]interface{}{
  179. "$gt": 6,
  180. },
  181. }
  182. list, _ := util.MgoE.Find("luaconfig", query, nil, map[string]interface{}{"code": 1}, false, -1, -1)
  183. for _, l := range *list {
  184. mgu.Update("spider_heart", "spider", "spider", map[string]interface{}{"code": l["code"]}, map[string]interface{}{
  185. "$set": map[string]interface{}{"del": true},
  186. }, false, false)
  187. }
  188. qu.Debug("定时更新爬虫心跳信息完成...")
  189. }