task.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. package action
  2. import (
  3. "fmt"
  4. "log"
  5. "taskCenter/config"
  6. . "app.yhyue.com/moapp/jybase/api"
  7. qu "app.yhyue.com/moapp/jybase/common"
  8. "app.yhyue.com/moapp/jybase/go-xweb/xweb"
  9. )
  10. type TaskAction struct {
  11. *xweb.Action
  12. task xweb.Mapper `xweb:"/task/push"` //分发任务
  13. completion xweb.Mapper `xweb:"/task/completion"` //任务状态返回
  14. get xweb.Mapper `xweb:"/task/get"`
  15. }
  16. var tm = &TaskManager{}
  17. func init() {
  18. xweb.AddAction(&TaskAction{})
  19. tm = NewTaskManager(1000)
  20. Init()
  21. }
  22. func (this *TaskAction) Get() {
  23. typ, _ := this.GetInteger("typ")
  24. data := tm.Get(typ)
  25. this.ServeJson(Result{
  26. Data: data,
  27. })
  28. }
  29. //分发任务
  30. func (this *TaskAction) Task() {
  31. task := tm.ClaimTask()
  32. this.ServeJson(Result{
  33. Data: task,
  34. })
  35. }
  36. //结束任务
  37. func (this *TaskAction) Completion() {
  38. status, _ := this.GetInteger("status")
  39. spidercode := this.GetString("spidercode")
  40. channel := this.GetString("channel")
  41. site := this.GetString("site")
  42. channelurl := this.GetString("channelurl")
  43. spidersource := this.GetString("spidersource")
  44. st := SpiderTask{
  45. SpiderCode: spidercode,
  46. Channel: channel,
  47. Site: site,
  48. ChannelUrl: channelurl,
  49. Status: 0,
  50. SpiderSource: "",
  51. ClaimTime: 0,
  52. }
  53. if status == -1 {
  54. //执行失败 放回任务池 等待认领
  55. tm.AddTaskToPool(st)
  56. tm.DelExecutePool(st)
  57. log.Println("执行失败", spidersource)
  58. } else {
  59. //执行成功
  60. tm.DelExecutePool(st)
  61. }
  62. this.ServeJson(Result{})
  63. }
  64. //DelExecutePool 删除执行池任务
  65. func (tm *TaskManager) DelExecutePool(st SpiderTask) {
  66. for {
  67. select {
  68. case task := <-tm.ExecutePool:
  69. if task.SpiderCode == st.SpiderCode {
  70. // 处理目标任务
  71. log.Println("删除任务", st.SpiderCode, "认领时间:", st.ClaimTime, "认领来源:", st.SpiderSource)
  72. return
  73. } else {
  74. // 将任务重新推送到 Completed 通道
  75. tm.ExecutePool <- task
  76. }
  77. default:
  78. // 没有找到目标任务
  79. log.Println("没有找到目标任务", st.SpiderCode)
  80. return
  81. }
  82. }
  83. }
  84. type SpiderTask struct {
  85. SpiderCode string //爬虫
  86. Channel string //文章名称
  87. Site string //站点
  88. ChannelUrl string //地址
  89. Status int //爬虫状态, 0未分配 1已分配 2执行失败 3执行完成
  90. SpiderSource string //认领来源
  91. ClaimTime int64 //认领时间
  92. }
  93. /*
  94. 1.需要两个池子 第一个池子是任务池 获取到所有的爬虫,将爬虫状态置为未分配
  95. 2.第二个池子是 执行池 ,从第一个池子中 获取未分配的爬虫数据
  96. 两个接口第一个接口实现 从任务池中获取数据 并放入执行池, 改变任务池中的状态为 已分配
  97. 第二个接口实现 获取爬虫完成状态 从执行池中删除,并把任务池中的数据改为 执行完成或者执行失败
  98. */
  99. func Init() {
  100. log.Println("start")
  101. // query := map[string]interface{}{
  102. // "spidercode": map[string]interface{}{
  103. // "$in": []string{
  104. // "ah_ahhfggzyjyzx_zfcg_htgs",
  105. // "ah_ahhfggzyjyzx_zfcg_dybg",
  106. // },
  107. // },
  108. // }
  109. query := map[string]interface{}{
  110. "result.todayData": map[string]interface{}{
  111. "$ne": `{}`,
  112. },
  113. }
  114. sess := config.MQFW.GetMgoConn()
  115. defer config.MQFW.DestoryMongoConn(sess)
  116. it := sess.DB("zxl").C("checklist_0719").Find(query).Sort("_id").Select(map[string]interface{}{
  117. "spidercode": 1,
  118. "channel": 1,
  119. "channelurl": 1,
  120. "site": 1,
  121. }).Limit(1000).Iter()
  122. total := 0
  123. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  124. if total%50 == 0 {
  125. log.Println("cur index ", total)
  126. }
  127. spidercode := qu.ObjToString(tmp["spidercode"])
  128. log.Println(spidercode)
  129. channel := qu.ObjToString(tmp["channel"])
  130. channelurl := qu.ObjToString(tmp["channelurl"])
  131. site := qu.ObjToString(tmp["site"])
  132. tm.AddTaskToPool(SpiderTask{
  133. SpiderCode: spidercode,
  134. Channel: channel,
  135. Site: site,
  136. ChannelUrl: channelurl,
  137. Status: 0,
  138. SpiderSource: "",
  139. ClaimTime: 0,
  140. })
  141. tmp = make(map[string]interface{})
  142. }
  143. log.Println("end")
  144. }
  145. // TaskManager 用于管理任务
  146. type TaskManager struct {
  147. TaskPool chan SpiderTask
  148. ExecutePool chan SpiderTask
  149. }
  150. // NewTaskManager 创建一个新的任务管理器
  151. func NewTaskManager(taskCap int) *TaskManager {
  152. return &TaskManager{
  153. TaskPool: make(chan SpiderTask, taskCap),
  154. ExecutePool: make(chan SpiderTask, taskCap),
  155. }
  156. }
  157. // AddTaskToPool 添加任务到任务池
  158. func (tm *TaskManager) AddTaskToPool(task SpiderTask) {
  159. tm.TaskPool <- task
  160. }
  161. // AddExecutePool 添加任务到执行池池
  162. func (tm *TaskManager) AddExecutePool(task SpiderTask) {
  163. tm.ExecutePool <- task
  164. }
  165. // ClaimTask 从任务池中取出一个状态为0的任务,更新其状态,并将其放入执行池
  166. func (tm *TaskManager) ClaimTask() *SpiderTask {
  167. for {
  168. select {
  169. case task := <-tm.TaskPool: // 从任务池中取出一个任务
  170. log.Println("取数据")
  171. if task.Status == 0 {
  172. task.Status = 1 // 更新任务状态为已分配
  173. tm.ExecutePool <- task // 将任务放入执行池
  174. // tm.TaskPool <- task //放入任务池
  175. return &task
  176. } else {
  177. log.Println("放回任务池")
  178. // 如果任务状态不是0,将其放回任务池
  179. tm.TaskPool <- task
  180. }
  181. default:
  182. // 如果任务池为空,表示没有可用的任务
  183. fmt.Println("没有可用的任务")
  184. return nil
  185. }
  186. }
  187. return nil
  188. }
  189. // ClaimTask 从任务池中取出一个状态为0的任务,更新其状态,并将其放入执行池
  190. func (tm *TaskManager) Get(typ int) *SpiderTask {
  191. if typ == 1 {
  192. for {
  193. select {
  194. case task := <-tm.TaskPool: // 从任务池中取出一个任务
  195. log.Println("放回任务池")
  196. tm.TaskPool <- task
  197. return &task
  198. default:
  199. // 如果任务池为空,表示没有可用的任务
  200. fmt.Println("TaskPool 没有可用的任务")
  201. return nil
  202. }
  203. }
  204. } else if typ == 2 {
  205. for {
  206. select {
  207. case task := <-tm.ExecutePool: // 从任务池中取出一个任务
  208. log.Println("取任务")
  209. return &task
  210. default:
  211. // 如果任务池为空,表示没有可用的任务
  212. fmt.Println("ExecutePool 没有可用的任务")
  213. return nil
  214. }
  215. }
  216. }
  217. return nil
  218. }