123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243 |
- package action
- import (
- "fmt"
- "log"
- "taskCenter/config"
- . "app.yhyue.com/moapp/jybase/api"
- qu "app.yhyue.com/moapp/jybase/common"
- "app.yhyue.com/moapp/jybase/go-xweb/xweb"
- )
- type TaskAction struct {
- *xweb.Action
- task xweb.Mapper `xweb:"/task/push"` //分发任务
- completion xweb.Mapper `xweb:"/task/completion"` //任务状态返回
- get xweb.Mapper `xweb:"/task/get"`
- }
- var tm = &TaskManager{}
- func init() {
- xweb.AddAction(&TaskAction{})
- tm = NewTaskManager(1000)
- Init()
- }
- func (this *TaskAction) Get() {
- typ, _ := this.GetInteger("typ")
- data := tm.Get(typ)
- this.ServeJson(Result{
- Data: data,
- })
- }
- //分发任务
- func (this *TaskAction) Task() {
- task := tm.ClaimTask()
- this.ServeJson(Result{
- Data: task,
- })
- }
- //结束任务
- func (this *TaskAction) Completion() {
- status, _ := this.GetInteger("status")
- spidercode := this.GetString("spidercode")
- channel := this.GetString("channel")
- site := this.GetString("site")
- channelurl := this.GetString("channelurl")
- spidersource := this.GetString("spidersource")
- st := SpiderTask{
- SpiderCode: spidercode,
- Channel: channel,
- Site: site,
- ChannelUrl: channelurl,
- Status: 0,
- SpiderSource: "",
- ClaimTime: 0,
- }
- if status == -1 {
- //执行失败 放回任务池 等待认领
- tm.AddTaskToPool(st)
- tm.DelExecutePool(st)
- log.Println("执行失败", spidersource)
- } else {
- //执行成功
- tm.DelExecutePool(st)
- }
- this.ServeJson(Result{})
- }
- //DelExecutePool 删除执行池任务
- func (tm *TaskManager) DelExecutePool(st SpiderTask) {
- for {
- select {
- case task := <-tm.ExecutePool:
- if task.SpiderCode == st.SpiderCode {
- // 处理目标任务
- log.Println("删除任务", st.SpiderCode, "认领时间:", st.ClaimTime, "认领来源:", st.SpiderSource)
- return
- } else {
- // 将任务重新推送到 Completed 通道
- tm.ExecutePool <- task
- }
- default:
- // 没有找到目标任务
- log.Println("没有找到目标任务", st.SpiderCode)
- return
- }
- }
- }
- type SpiderTask struct {
- SpiderCode string //爬虫
- Channel string //文章名称
- Site string //站点
- ChannelUrl string //地址
- Status int //爬虫状态, 0未分配 1已分配 2执行失败 3执行完成
- SpiderSource string //认领来源
- ClaimTime int64 //认领时间
- }
- /*
- 1.需要两个池子 第一个池子是任务池 获取到所有的爬虫,将爬虫状态置为未分配
- 2.第二个池子是 执行池 ,从第一个池子中 获取未分配的爬虫数据
- 两个接口第一个接口实现 从任务池中获取数据 并放入执行池, 改变任务池中的状态为 已分配
- 第二个接口实现 获取爬虫完成状态 从执行池中删除,并把任务池中的数据改为 执行完成或者执行失败
- */
- func Init() {
- log.Println("start")
- // query := map[string]interface{}{
- // "spidercode": map[string]interface{}{
- // "$in": []string{
- // "ah_ahhfggzyjyzx_zfcg_htgs",
- // "ah_ahhfggzyjyzx_zfcg_dybg",
- // },
- // },
- // }
- query := map[string]interface{}{
- "result.todayData": map[string]interface{}{
- "$ne": `{}`,
- },
- }
- sess := config.MQFW.GetMgoConn()
- defer config.MQFW.DestoryMongoConn(sess)
- it := sess.DB("zxl").C("checklist_0719").Find(query).Sort("_id").Select(map[string]interface{}{
- "spidercode": 1,
- "channel": 1,
- "channelurl": 1,
- "site": 1,
- }).Limit(1000).Iter()
- total := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
- if total%50 == 0 {
- log.Println("cur index ", total)
- }
- spidercode := qu.ObjToString(tmp["spidercode"])
- log.Println(spidercode)
- channel := qu.ObjToString(tmp["channel"])
- channelurl := qu.ObjToString(tmp["channelurl"])
- site := qu.ObjToString(tmp["site"])
- tm.AddTaskToPool(SpiderTask{
- SpiderCode: spidercode,
- Channel: channel,
- Site: site,
- ChannelUrl: channelurl,
- Status: 0,
- SpiderSource: "",
- ClaimTime: 0,
- })
- tmp = make(map[string]interface{})
- }
- log.Println("end")
- }
- // TaskManager 用于管理任务
- type TaskManager struct {
- TaskPool chan SpiderTask
- ExecutePool chan SpiderTask
- }
- // NewTaskManager 创建一个新的任务管理器
- func NewTaskManager(taskCap int) *TaskManager {
- return &TaskManager{
- TaskPool: make(chan SpiderTask, taskCap),
- ExecutePool: make(chan SpiderTask, taskCap),
- }
- }
- // AddTaskToPool 添加任务到任务池
- func (tm *TaskManager) AddTaskToPool(task SpiderTask) {
- tm.TaskPool <- task
- }
- // AddExecutePool 添加任务到执行池池
- func (tm *TaskManager) AddExecutePool(task SpiderTask) {
- tm.ExecutePool <- task
- }
- // ClaimTask 从任务池中取出一个状态为0的任务,更新其状态,并将其放入执行池
- func (tm *TaskManager) ClaimTask() *SpiderTask {
- for {
- select {
- case task := <-tm.TaskPool: // 从任务池中取出一个任务
- log.Println("取数据")
- if task.Status == 0 {
- task.Status = 1 // 更新任务状态为已分配
- tm.ExecutePool <- task // 将任务放入执行池
- // tm.TaskPool <- task //放入任务池
- return &task
- } else {
- log.Println("放回任务池")
- // 如果任务状态不是0,将其放回任务池
- tm.TaskPool <- task
- }
- default:
- // 如果任务池为空,表示没有可用的任务
- fmt.Println("没有可用的任务")
- return nil
- }
- }
- return nil
- }
- // ClaimTask 从任务池中取出一个状态为0的任务,更新其状态,并将其放入执行池
- func (tm *TaskManager) Get(typ int) *SpiderTask {
- if typ == 1 {
- for {
- select {
- case task := <-tm.TaskPool: // 从任务池中取出一个任务
- log.Println("放回任务池")
- tm.TaskPool <- task
- return &task
- default:
- // 如果任务池为空,表示没有可用的任务
- fmt.Println("TaskPool 没有可用的任务")
- return nil
- }
- }
- } else if typ == 2 {
- for {
- select {
- case task := <-tm.ExecutePool: // 从任务池中取出一个任务
- log.Println("取任务")
- return &task
- default:
- // 如果任务池为空,表示没有可用的任务
- fmt.Println("ExecutePool 没有可用的任务")
- return nil
- }
- }
- }
- return nil
- }
|