|
@@ -1,13 +1,17 @@
|
|
|
package service
|
|
|
|
|
|
import (
|
|
|
+ "encoding/json"
|
|
|
"github.com/go-xweb/xweb"
|
|
|
"gopkg.in/mgo.v2/bson"
|
|
|
"log"
|
|
|
+ "net"
|
|
|
+ "qfw/common/src/qfw/util"
|
|
|
qu "qfw/util"
|
|
|
"strings"
|
|
|
"time"
|
|
|
. "util"
|
|
|
+ mu "mfw/util"
|
|
|
)
|
|
|
|
|
|
type TaskRule struct {
|
|
@@ -19,6 +23,52 @@ type TaskRule struct {
|
|
|
taskDelete xweb.Mapper `xweb:"/service/task/delete"` //任务保存
|
|
|
taskStart xweb.Mapper `xweb:"/service/task/start"` //任务保存
|
|
|
taskEnd xweb.Mapper `xweb:"/service/task/end"` //任务保存
|
|
|
+}
|
|
|
+var (
|
|
|
+ udpclient mu.UdpClient //udp对象
|
|
|
+ taskConfig map[string]interface{} //配置文件
|
|
|
+ isTaskOK bool
|
|
|
+ timeout = 3
|
|
|
+)
|
|
|
+func init() {
|
|
|
+ qu.ReadConfig(&taskConfig) //初始化配置
|
|
|
+ updport := taskConfig["udpport"].(string)
|
|
|
+ udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
|
|
|
+ log.Println("Udp服务监听", updport)
|
|
|
+ udpclient.Listen(processUdpMsg)
|
|
|
+}
|
|
|
+
|
|
|
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
+ switch act {
|
|
|
+ case mu.OP_TYPE_DATA:
|
|
|
+ var rep map[string]interface{}
|
|
|
+ err := json.Unmarshal(data, &rep)
|
|
|
+ if err != nil { //测试接收
|
|
|
+
|
|
|
+ go udpclient.WriteUdp([]byte{}, mu.OP_NOOP, ra) //回应上一个节点
|
|
|
+ } else {
|
|
|
+ by, _ := json.Marshal(map[string]interface{}{
|
|
|
+ "taskid": rep["taskid"],
|
|
|
+ "stype": rep["stype"],
|
|
|
+ })
|
|
|
+ go udpclient.WriteUdp(by, mu.OP_NOOP, ra) //回应上一个节点
|
|
|
+
|
|
|
+ }
|
|
|
+ case mu.OP_NOOP: //下个节点回应
|
|
|
+ log.Println("接收回应:",string(data))
|
|
|
+ var rep map[string]interface{}
|
|
|
+ err := json.Unmarshal(data, &rep)
|
|
|
+ if err != nil {//空数据
|
|
|
+ //
|
|
|
+ }else {//正确
|
|
|
+ if qu.ObjToString(rep["stype"])=="startTask" {
|
|
|
+ updateMgoIsuse("1",qu.ObjToString(rep["taskid"]))
|
|
|
+ }else if rep["stype"]=="stopTask"{
|
|
|
+ updateMgoIsuse("0",qu.ObjToString(rep["taskid"]))
|
|
|
+ }else {
|
|
|
+
|
|
|
+ }
|
|
|
+ }}
|
|
|
|
|
|
}
|
|
|
|
|
@@ -61,7 +111,7 @@ func (task *TaskRule) TaskCreate() {
|
|
|
func (task *TaskRule) TaskEdit() {
|
|
|
defer qu.Catch()
|
|
|
log.Println("编辑")
|
|
|
- id := task.GetString("id") //标签列表编辑
|
|
|
+ id := task.GetString("id")
|
|
|
query := bson.M{}
|
|
|
query["_id"] = qu.StringTOBsonId(id)
|
|
|
data, _ := Mgo.FindOneByField("taskinfo", query, `{}`)
|
|
@@ -112,40 +162,110 @@ func (task *TaskRule) TaskDelete() {
|
|
|
|
|
|
//开始任务
|
|
|
func (task *TaskRule) TaskStart() {
|
|
|
- log.Println("启动")
|
|
|
defer qu.Catch()
|
|
|
- id := task.GetString("_id")
|
|
|
- set := bson.M{
|
|
|
- "$set": bson.M{
|
|
|
- "s_isuse": "1",
|
|
|
- },
|
|
|
- }
|
|
|
- b := Mgo.UpdateById("taskinfo", id, set)
|
|
|
- task.ServeJson(map[string]interface{}{
|
|
|
- "rep": b,
|
|
|
+ data := GetPostForm(task.Request)
|
|
|
+ id := qu.ObjToString(data["id"])
|
|
|
+ //发送udp 开启
|
|
|
+ by, _ := json.Marshal(map[string]interface{}{
|
|
|
+ "taskid": id,
|
|
|
+ "stype": "startTask",
|
|
|
})
|
|
|
+ addr := &net.UDPAddr{
|
|
|
+ IP: net.ParseIP(data["addr"].(string)),
|
|
|
+ Port: util.IntAll(data["port"]),
|
|
|
+ }
|
|
|
+
|
|
|
+ err :=udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
|
|
|
+ if err!=nil {
|
|
|
+ task.ServeJson(map[string]interface{}{
|
|
|
+ "rep": false,
|
|
|
+ "msg":"任务开始失败-udp",
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+ n:=0
|
|
|
+ for {
|
|
|
+ if !isTaskOK && n < timeout {
|
|
|
+ time.Sleep(1 * time.Second)
|
|
|
+ n++
|
|
|
+ } else {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.Println("循环结束",isTaskOK,n,"处理开启post回调")
|
|
|
+ if isTaskOK {
|
|
|
+ isTaskOK = false
|
|
|
+ task.ServeJson(map[string]interface{}{
|
|
|
+ "rep": true,
|
|
|
+ "msg":"任务开始成功",
|
|
|
+ })
|
|
|
+ }else {
|
|
|
+ isTaskOK = false
|
|
|
+ task.ServeJson(map[string]interface{}{
|
|
|
+ "rep": false,
|
|
|
+ "msg":"任务开始失败-超时",
|
|
|
+ })
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
//关闭任务
|
|
|
func (task *TaskRule) TaskEnd() {
|
|
|
- log.Println("关闭")
|
|
|
defer qu.Catch()
|
|
|
- id := task.GetString("_id")
|
|
|
+ data := GetPostForm(task.Request)
|
|
|
+ id := qu.ObjToString(data["id"])
|
|
|
+ //发送udp 关闭
|
|
|
+ by, _ := json.Marshal(map[string]interface{}{
|
|
|
+ "taskid": id,
|
|
|
+ "stype": "stopTask",
|
|
|
+ })
|
|
|
+ addr := &net.UDPAddr{
|
|
|
+ IP: net.ParseIP(data["addr"].(string)),
|
|
|
+ Port: util.IntAll(data["port"]),
|
|
|
+ }
|
|
|
+ err :=udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
|
|
|
+ if err!=nil {
|
|
|
+ task.ServeJson(map[string]interface{}{
|
|
|
+ "rep": false,
|
|
|
+ "msg":"任务开始失败-udp",
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+ n:=0
|
|
|
+ for {
|
|
|
+ if !isTaskOK && n < timeout {
|
|
|
+ time.Sleep(1 * time.Second)
|
|
|
+ n++
|
|
|
+ } else {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.Println("循环结束",isTaskOK,n,"处理关闭post回调")
|
|
|
+ if isTaskOK {
|
|
|
+ isTaskOK = false
|
|
|
+ task.ServeJson(map[string]interface{}{
|
|
|
+ "rep": true,
|
|
|
+ "msg":"任务开始成功",
|
|
|
+ })
|
|
|
+ }else {
|
|
|
+ isTaskOK = false
|
|
|
+ task.ServeJson(map[string]interface{}{
|
|
|
+ "rep": false,
|
|
|
+ "msg":"任务开始失败-超时",
|
|
|
+ })
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func updateMgoIsuse(isu string,id string) {
|
|
|
set := bson.M{
|
|
|
"$set": bson.M{
|
|
|
- "s_isuse": "0",
|
|
|
+ "s_isuse": isu,
|
|
|
},
|
|
|
}
|
|
|
- b := Mgo.UpdateById("taskinfo", id, set)
|
|
|
- task.ServeJson(map[string]interface{}{
|
|
|
- "rep": b,
|
|
|
- })
|
|
|
+ log.Println(id)
|
|
|
+ Mgo.UpdateById("taskinfo", id, set)
|
|
|
+ isTaskOK = true
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
//******方法
|
|
|
func saveTaskMongo(id string, rdata map[string]interface{}) (rid string, rep bool) {
|
|
|
defer qu.Catch()
|