timetask.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. package timetask
  2. import (
  3. "fmt"
  4. "github.com/cron"
  5. "io/ioutil"
  6. "luacheck"
  7. "net/http"
  8. qu "qfw/util"
  9. "spider"
  10. sp "spiderutil"
  11. "strings"
  12. "sync"
  13. "time"
  14. "util"
  15. )
  // Mail is a package-level map with string keys. Nothing in this file reads or
  // writes it; presumably it is populated elsewhere in the package (TODO confirm).
  var Mail map[string]interface{}
  17. func TimeTask() {
  18. defer qu.Catch()
  19. c := cron.New()
  20. c.Start()
  21. c.AddFunc("0 20 9 ? * MON-FRI", CheckCreateTask)
  22. c.AddFunc("0 30 23 * * *", UpdateSiteInfo) //定时更新站点信息
  23. c.AddFunc("0 0 23 * * *", UpdateCodeHeart) //定时更新爬虫心跳信息
  24. c.AddFunc("0 0 */1 ? * *", CheckLuaMove) //7000节点转增量爬虫失败告警
  25. c.AddFunc("0 */10 * * * *", SpiderMoveEvent) //7000节点转增量爬虫
  26. c.AddFunc("0 0 8 * * *", UpdateImportantCode) //更新重点网站爬虫信息
  27. c.AddFunc("0 */1 * * * *", luacheck.TimeTaskGetLua) //爬虫机检定时任务
  28. }
  29. // 检测创建任务失败的爬虫
  30. func CheckCreateTask() {
  31. defer qu.Catch()
  32. qu.Debug("开始检测任务创建...")
  33. query := map[string]interface{}{
  34. "comeintime": map[string]interface{}{
  35. "$gte": util.GetTime(0),
  36. },
  37. }
  38. codes := []string{}
  39. list, _ := util.MgoEB.Find("luacreatetaskerr", query, nil, nil, false, -1, -1)
  40. if len(*list) > 0 {
  41. for _, l := range *list {
  42. code := qu.ObjToString(l["code"])
  43. codes = append(codes, code)
  44. }
  45. }
  46. if len(codes) > 0 {
  47. for i := 1; i <= 3; i++ {
  48. res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", sp.Config.JkMail["api"], sp.Config.JkMail["to"], "lua-createtask-err", "爬虫:"+strings.Join(codes, ";")))
  49. if err == nil {
  50. res.Body.Close()
  51. read, err := ioutil.ReadAll(res.Body)
  52. qu.Debug("邮件发送:", string(read), err)
  53. break
  54. }
  55. }
  56. }
  57. }
  58. func UpdateSiteInfo() {
  59. defer qu.Catch()
  60. qu.Debug("定时更新站点信息开始...")
  61. sites, _ := util.MgoEB.Find(sp.Config.SiteColl, map[string]interface{}{"delete": false}, ``, `{"site":1}`, false, -1, -1)
  62. for _, s := range *sites {
  63. site := qu.ObjToString(s["site"])
  64. domain, status, event, platform, infotype, specialtype, _ := util.GetLuasInfoBySite(site, "", "", "")
  65. set := map[string]interface{}{
  66. "$set": map[string]interface{}{
  67. "platform": platform,
  68. "event": event,
  69. "spider_status": status,
  70. "domain": domain,
  71. "infotype": infotype,
  72. //"area": area,
  73. //"city": city,
  74. //"district": district,
  75. "updatetime": time.Now().Unix(),
  76. "special_type": specialtype,
  77. },
  78. }
  79. util.MgoEB.UpdateById(sp.Config.SiteColl, s["_id"], set)
  80. }
  81. qu.Debug("定时更新站点信息完成...")
  82. }
  83. // 更新重点网站爬虫信息
  84. func UpdateImportantCode() {
  85. data, _ := util.MgoEB.Find("site_code_baseinfo", nil, nil, map[string]interface{}{"spidercode": 1}, false, -1, -1)
  86. for _, d := range *data {
  87. lua, _ := util.MgoEB.FindOneByField("luaconfig", map[string]interface{}{"code": d["spidercode"]}, map[string]interface{}{"state": 1, "modifyuser": 1, "platform": 1, "channel": 1, "_id": 0})
  88. if len(*lua) > 0 {
  89. util.MgoEB.UpdateById("site_code_baseinfo", d["_id"], map[string]interface{}{"$set": *lua})
  90. }
  91. }
  92. }
  93. func UpdateCodeHeart() {
  94. qu.Debug("定时更新爬虫心跳信息...")
  95. defer qu.Catch()
  96. query := map[string]interface{}{
  97. "$or": []interface{}{
  98. map[string]interface{}{
  99. "platform": "python",
  100. },
  101. map[string]interface{}{
  102. "state": map[string]interface{}{
  103. "$in": []int{4, 6, 7, 8, 9, 10, 11},
  104. },
  105. },
  106. },
  107. }
  108. //list, _ := util.MgoE.Find("luaconfig", query, nil, map[string]interface{}{"code": 1}, false, -1, -1)
  109. list, _ := util.MgoEB.Find("luaconfig", query, nil, map[string]interface{}{"code": 1}, false, -1, -1)
  110. qu.Debug("定时更新爬虫心跳信息个数:", len(*list))
  111. for _, l := range *list {
  112. util.MgoS.Update("spider_heart", map[string]interface{}{"code": l["code"]}, map[string]interface{}{
  113. "$set": map[string]interface{}{"del": true},
  114. }, false, true)
  115. }
  116. qu.Debug("定时更新爬虫心跳信息完成...")
  117. }
  118. // 监测爬虫由历史转增量时未成功的
  119. func CheckLuaMove() {
  120. defer qu.Catch()
  121. qu.Debug("开始检测爬虫节点移动...")
  122. query := map[string]interface{}{
  123. "comeintime": map[string]interface{}{
  124. "$lte": time.Now().Add(-(time.Hour * 1)).Unix(),
  125. //"$lte": time.Now().Unix(),
  126. },
  127. "ok": false,
  128. }
  129. qu.Debug("query:", query)
  130. list, _ := util.MgoEB.Find("luamovelog", query, nil, nil, false, -1, -1)
  131. if len(*list) > 0 {
  132. codes := []string{}
  133. arr := [][]map[string]interface{}{}
  134. for _, l := range *list {
  135. code := qu.ObjToString(l["code"])
  136. codes = append(codes, code)
  137. update := []map[string]interface{}{}
  138. update = append(update, map[string]interface{}{"_id": l["_id"]})
  139. update = append(update, map[string]interface{}{"$set": map[string]interface{}{"ok": true, "updatetime": time.Now().Unix()}})
  140. arr = append(arr, update)
  141. }
  142. for i := 1; i <= 3; i++ {
  143. res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", sp.Config.JkMail["api"], sp.Config.JkMail["to"], "lua-move-fail", strings.Join(codes, ";")))
  144. if err == nil {
  145. res.Body.Close()
  146. read, err := ioutil.ReadAll(res.Body)
  147. qu.Debug("邮件发送:", string(read), err)
  148. break
  149. }
  150. }
  151. util.MgoEB.UpdateBulk("luamovelog", arr...)
  152. arr = [][]map[string]interface{}{}
  153. }
  154. }
  155. func SpiderMoveEvent() {
  156. defer qu.Catch()
  157. sess := util.MgoEB.GetMgoConn()
  158. defer util.MgoEB.DestoryMongoConn(sess)
  159. ch := make(chan bool, 2)
  160. wg := &sync.WaitGroup{}
  161. lock := &sync.Mutex{}
  162. query := map[string]interface{}{
  163. "ok": false,
  164. "state": map[string]interface{}{
  165. "$ne": 1,
  166. },
  167. }
  168. count := util.MgoEB.Count("luamovelog", query)
  169. if count == 0 {
  170. return
  171. }
  172. it := sess.DB(util.MgoEB.DbName).C("luamovelog").Find(&query).Iter()
  173. n := 0
  174. arr := [][]map[string]interface{}{}
  175. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  176. ch <- true
  177. wg.Add(1)
  178. go func(tmp map[string]interface{}) {
  179. defer func() {
  180. <-ch
  181. wg.Done()
  182. }()
  183. code := qu.ObjToString(tmp["code"])
  184. //更新爬虫节点信息,上架
  185. lua, _ := util.MgoEB.FindOne("luaconfig", map[string]interface{}{"code": code})
  186. spidertype := qu.ObjToString((*lua)["spidertype"])
  187. event := qu.IntAll((*lua)["event"])
  188. var upresult bool
  189. var err error
  190. set := map[string]interface{}{}
  191. qu.Debug("lua move:", code, event, spidertype, (*lua)["state"])
  192. if spidertype == "history" {
  193. newevent := GetEvent(code, (*lua))
  194. qu.Debug(code, " new event:", newevent)
  195. set["event"] = newevent
  196. set["spidertype"] = "increment"
  197. //type_content, _ := (*lua)["type_content"].(int)
  198. //iscopycontent, _ := (*lua)["iscopycontent"].(bool)
  199. //str_content := qu.ObjToString((*lua)["str_content"])
  200. //str_recontent := qu.ObjToString((*lua)["str_recontent"])
  201. //if type_content == 1 && iscopycontent && str_recontent != "" { //三级页是专家模式且有复制三级页代码
  202. // set["iscopycontent"] = false
  203. // set["str_content"] = str_recontent
  204. // set["str_recontent"] = str_content
  205. //}
  206. upset := map[string]interface{}{
  207. "$set": set,
  208. }
  209. if downevent := qu.IntAll((*lua)["downevent"]); downevent != 0 {
  210. spider.UpdateSpiderByCodeState(code, "6", downevent)
  211. qu.Debug(code, "下架历史节点:", downevent)
  212. upset["$unset"] = map[string]interface{}{"downevent": ""}
  213. }
  214. upOk := util.MgoEB.Update("luaconfig", map[string]interface{}{"code": code}, upset, false, false)
  215. qu.Debug(code, "脚本更新成功:", upOk)
  216. if upOk {
  217. upresult, err = spider.UpdateSpiderByCodeState(code, "5", newevent) //脚本上架
  218. qu.Debug(code, "脚本上架", upresult, err)
  219. }
  220. }
  221. ok := false
  222. if upresult && err == nil { //上架成功
  223. ok = true
  224. qu.Debug("Code:", code, "历史迁移到增量节点成功")
  225. } else { //上架失败
  226. util.MgoEB.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"event": event, "state": 6}}, false, false)
  227. qu.Debug("Code:", code, "历史迁移到增量节点失败")
  228. }
  229. update := []map[string]interface{}{}
  230. update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  231. update = append(update, map[string]interface{}{"$set": map[string]interface{}{"ok": ok, "state": 1, "updatetime": time.Now().Unix()}})
  232. lock.Lock()
  233. arr = append(arr, update)
  234. lock.Unlock()
  235. }(tmp)
  236. tmp = map[string]interface{}{}
  237. }
  238. wg.Wait()
  239. lock.Lock()
  240. if len(arr) > 0 {
  241. util.MgoEB.UpdateBulk("luamovelog", arr...)
  242. arr = [][]map[string]interface{}{}
  243. }
  244. lock.Unlock()
  245. }
  246. func GetEvent(code string, lua map[string]interface{}) int {
  247. defer qu.Catch()
  248. //1、历史节点
  249. if lua["incrementevent"] != nil {
  250. return qu.IntAll(lua["incrementevent"])
  251. }
  252. //2、根据站点找节点
  253. query := map[string]interface{}{
  254. "code": map[string]interface{}{
  255. "$ne": code,
  256. },
  257. "site": lua["site"],
  258. "state": 5,
  259. }
  260. tmp, _ := util.MgoEB.FindOne("luaconfig", query)
  261. if tmp != nil && len(*tmp) > 0 {
  262. return qu.IntAll((*tmp)["event"])
  263. }
  264. //3、7700
  265. //spidermovevent := qu.ObjToString(lua["spidermovevent"])
  266. //if spidermovevent == "7700" {
  267. // return 7700
  268. //}
  269. //4、根据数量分配节点
  270. //num := 0
  271. //result := 7700
  272. //for k, t := range sp.Config.Uploadevents {
  273. // if qu.ObjToString(t) == spidermovevent { //bid、comm
  274. // event := qu.IntAll(k)
  275. // //count := mgdb.Count("luaconfig", map[string]interface{}{"state": 5, "event": event})
  276. // count := u.MgoEB.Count("luaconfig", map[string]interface{}{"state": 5, "event": event})
  277. // if num == 0 || count < num {
  278. // result = event
  279. // num = count
  280. // }
  281. // }
  282. //}
  283. //5、指定节点
  284. return 7200
  285. }