package action import ( "fmt" "log" "taskCenter/config" . "app.yhyue.com/moapp/jybase/api" qu "app.yhyue.com/moapp/jybase/common" "app.yhyue.com/moapp/jybase/go-xweb/xweb" ) type TaskAction struct { *xweb.Action task xweb.Mapper `xweb:"/task/push"` //分发任务 completion xweb.Mapper `xweb:"/task/completion"` //任务状态返回 get xweb.Mapper `xweb:"/task/get"` } var tm = &TaskManager{} func init() { xweb.AddAction(&TaskAction{}) tm = NewTaskManager(1000) Init() } func (this *TaskAction) Get() { typ, _ := this.GetInteger("typ") data := tm.Get(typ) this.ServeJson(Result{ Data: data, }) } //分发任务 func (this *TaskAction) Task() { task := tm.ClaimTask() this.ServeJson(Result{ Data: task, }) } //结束任务 func (this *TaskAction) Completion() { status, _ := this.GetInteger("status") spidercode := this.GetString("spidercode") channel := this.GetString("channel") site := this.GetString("site") channelurl := this.GetString("channelurl") spidersource := this.GetString("spidersource") st := SpiderTask{ SpiderCode: spidercode, Channel: channel, Site: site, ChannelUrl: channelurl, Status: 0, SpiderSource: "", ClaimTime: 0, } if status == -1 { //执行失败 放回任务池 等待认领 tm.AddTaskToPool(st) tm.DelExecutePool(st) log.Println("执行失败", spidersource) } else { //执行成功 tm.DelExecutePool(st) } this.ServeJson(Result{}) } //DelExecutePool 删除执行池任务 func (tm *TaskManager) DelExecutePool(st SpiderTask) { for { select { case task := <-tm.ExecutePool: if task.SpiderCode == st.SpiderCode { // 处理目标任务 log.Println("删除任务", st.SpiderCode, "认领时间:", st.ClaimTime, "认领来源:", st.SpiderSource) return } else { // 将任务重新推送到 Completed 通道 tm.ExecutePool <- task } default: // 没有找到目标任务 log.Println("没有找到目标任务", st.SpiderCode) return } } } type SpiderTask struct { SpiderCode string //爬虫 Channel string //文章名称 Site string //站点 ChannelUrl string //地址 Status int //爬虫状态, 0未分配 1已分配 2执行失败 3执行完成 SpiderSource string //认领来源 ClaimTime int64 //认领时间 } /* 1.需要两个池子 第一个池子是任务池 获取到所有的爬虫,将爬虫状态置为未分配 2.第二个池子是 执行池 ,从第一个池子中 获取未分配的爬虫数据 两个接口第一个接口实现 从任务池中获取数据 并放入执行池, 改变任务池中的状态为 已分配 第二个接口实现 获取爬虫完成状态 从执行池中删除,并把任务池中的数据改为 执行完成或者执行失败 */ func Init() { log.Println("start") // query := map[string]interface{}{ // "spidercode": map[string]interface{}{ // "$in": []string{ // "ah_ahhfggzyjyzx_zfcg_htgs", // "ah_ahhfggzyjyzx_zfcg_dybg", // }, // }, // } query := map[string]interface{}{ "result.todayData": map[string]interface{}{ "$ne": `{}`, }, } sess := config.MQFW.GetMgoConn() defer config.MQFW.DestoryMongoConn(sess) it := sess.DB("zxl").C("checklist_0719").Find(query).Sort("_id").Select(map[string]interface{}{ "spidercode": 1, "channel": 1, "channelurl": 1, "site": 1, }).Limit(1000).Iter() total := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); total++ { if total%50 == 0 { log.Println("cur index ", total) } spidercode := qu.ObjToString(tmp["spidercode"]) log.Println(spidercode) channel := qu.ObjToString(tmp["channel"]) channelurl := qu.ObjToString(tmp["channelurl"]) site := qu.ObjToString(tmp["site"]) tm.AddTaskToPool(SpiderTask{ SpiderCode: spidercode, Channel: channel, Site: site, ChannelUrl: channelurl, Status: 0, SpiderSource: "", ClaimTime: 0, }) tmp = make(map[string]interface{}) } log.Println("end") } // TaskManager 用于管理任务 type TaskManager struct { TaskPool chan SpiderTask ExecutePool chan SpiderTask } // NewTaskManager 创建一个新的任务管理器 func NewTaskManager(taskCap int) *TaskManager { return &TaskManager{ TaskPool: make(chan SpiderTask, taskCap), ExecutePool: make(chan SpiderTask, taskCap), } } // AddTaskToPool 添加任务到任务池 func (tm *TaskManager) AddTaskToPool(task SpiderTask) { tm.TaskPool <- task } // AddExecutePool 添加任务到执行池池 func (tm *TaskManager) AddExecutePool(task SpiderTask) { tm.ExecutePool <- task } // ClaimTask 从任务池中取出一个状态为0的任务,更新其状态,并将其放入执行池 func (tm *TaskManager) ClaimTask() *SpiderTask { for { select { case task := <-tm.TaskPool: // 从任务池中取出一个任务 log.Println("取数据") if task.Status == 0 { task.Status = 1 // 更新任务状态为已分配 tm.ExecutePool <- task // 将任务放入执行池 // tm.TaskPool <- task //放入任务池 return &task } else { log.Println("放回任务池") // 如果任务状态不是0,将其放回任务池 tm.TaskPool <- task } default: // 如果任务池为空,表示没有可用的任务 fmt.Println("没有可用的任务") return nil } } return nil } // ClaimTask 从任务池中取出一个状态为0的任务,更新其状态,并将其放入执行池 func (tm *TaskManager) Get(typ int) *SpiderTask { if typ == 1 { for { select { case task := <-tm.TaskPool: // 从任务池中取出一个任务 log.Println("放回任务池") tm.TaskPool <- task return &task default: // 如果任务池为空,表示没有可用的任务 fmt.Println("TaskPool 没有可用的任务") return nil } } } else if typ == 2 { for { select { case task := <-tm.ExecutePool: // 从任务池中取出一个任务 log.Println("取任务") return &task default: // 如果任务池为空,表示没有可用的任务 fmt.Println("ExecutePool 没有可用的任务") return nil } } } return nil }