// Command main hands out OCR task ranges to worker nodes over UDP, serves a
// small password-protected status/redeploy web UI, and runs cron jobs that
// request extra cloud instances when tasks are queued.
package main

import (
	"cluster"
	"config"
	"encoding/json"
	"fmt"
	"github.com/cron"
	"gopkg.in/mgo.v2/bson"
	"html/template"
	"log"
	"math"
	mu "mfw/util"
	"net"
	"net/http"
	"qfw/common/src/qfw/util"
	qu "qfw/util"
	"qfw/util/mongodb"
	"regexp"
	"strings"
	"sync"
	"time"
)

var (
	// udpclient is the UDP endpoint used by the listener and by every sender.
	udpclient mu.UdpClient
	// auto is set once DynamicTask has deployed an instance; while it is true
	// the scale-out cron job does nothing. It is cleared by a second cron job.
	// NOTE(review): auto is read and written from cron jobs without
	// synchronisation; if the cron library runs jobs on separate goroutines
	// this is a data race — confirm and guard if so.
	auto bool
	// sys serialises task dispatch so that two workers never receive the
	// same document id.
	sys sync.RWMutex
)

// sessionToken is the only cookie value accepted as a logged-in session.
// NOTE(review): the login handler issues util.GetMd5String("email") — the
// hash of a literal, not of the submitted user name — so every session
// shares this static token. Credentials are also hard-coded below.
const sessionToken = "0c83f57c786a0b4a39efab23731c7ebc"

// ipv4Pattern validates a dotted-quad IPv4 address. It is compiled once and
// anchored so that strings merely containing an address are rejected.
var ipv4Pattern = regexp.MustCompile(`^((2(5[0-5]|[0-4]\d))|[0-1]?\d{1,2})(\.((2(5[0-5]|[0-4]\d))|[0-1]?\d{1,2})){3}$`)

// main starts the UDP listener, registers the HTTP routes, starts the cron
// jobs and then blocks serving HTTP.
func main() {
	auto = false
	startUDP()
	mux := newMux()
	startCron()

	port := qu.ObjToString(config.Sysconfig["http_port"])
	log.Println("Http listening port: ", port)
	if err := http.ListenAndServe(":"+port, mux); err != nil {
		fmt.Println("start http server faild:", err)
	}
}

// startUDP begins listening for UDP messages and, when broadcasting is
// enabled in the configuration, sends an empty data message to every
// configured peer to announce the restart.
func startUDP() {
	udpclient = mu.UdpClient{
		Local:   config.Sysconfig["udpip"].(string) + ":" + config.Sysconfig["udpport"].(string),
		BufSize: 1024,
	}
	udpclient.Listen(processUdpMsg)
	log.Printf("Udp listening port: %s:%s\n", config.Sysconfig["udpip"], config.Sysconfig["udpport"])

	if !config.Sysconfig["broadcast"].(bool) {
		return
	}
	// After a restart, notify the distributed nodes.
	port := qu.IntAll(config.Sysconfig["broadcast_port"])
	for _, v := range strings.Split(qu.ObjToString(config.Sysconfig["broadcast_ips"]), ";") {
		udpclient.WriteUdp([]byte{}, mu.OP_TYPE_DATA, &net.UDPAddr{
			IP:   net.ParseIP(v),
			Port: port,
		})
	}
}

// newMux builds the HTTP routing table for the web UI.
func newMux() *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
		http.Redirect(writer, request, "/login", http.StatusFound)
	})
	mux.HandleFunc("/favicon.ico", func(writer http.ResponseWriter, request *http.Request) {
		writer.WriteHeader(http.StatusOK)
	})
	mux.HandleFunc("/login", handleLogin)
	mux.Handle("/res/", http.StripPrefix("/res/", http.FileServer(http.Dir("res"))))
	mux.HandleFunc("/query", handleQuery)
	mux.HandleFunc("/reload", handleReload)
	return mux
}

// render executes tp into writer and logs (rather than silently drops) any
// template error.
func render(writer http.ResponseWriter, tp *template.Template, data interface{}) {
	if err := tp.Execute(writer, data); err != nil {
		log.Println(err)
	}
}

// pageMissing reports a template that could not be loaded.
func pageMissing(writer http.ResponseWriter, err error) {
	log.Println(err)
	writer.Write([]byte("页面找不到了~"))
}

// handleLogin shows the login form on GET and checks the submitted
// credentials on POST, setting the session cookie on success.
func handleLogin(writer http.ResponseWriter, request *http.Request) {
	tp, err := template.ParseFiles("web/login.html")
	if err != nil {
		pageMissing(writer, err)
		return
	}
	switch request.Method {
	case "GET":
		render(writer, tp, "")
	case "POST":
		if err := request.ParseForm(); err != nil {
			http.Error(writer, err.Error(), http.StatusBadRequest)
			return
		}
		email := request.Form.Get("username")
		pwd := request.Form.Get("pwd")
		if email == "ocr_task" && pwd == "ocr_task_pwd" {
			http.SetCookie(writer, &http.Cookie{Name: "username", Value: util.GetMd5String("email")})
			http.Redirect(writer, request, "/query", http.StatusFound)
			return
		}
		writer.WriteHeader(http.StatusUnauthorized)
		render(writer, tp, "密码错误")
	}
}

// requireLogin reports whether the request carries the expected session
// cookie. When it does not, the login page is rendered with an explanatory
// message and a 401 status, and false is returned.
//
// The original code called http.RedirectHandler here, which only constructs
// a handler and has no effect; the 401 status it named is now actually sent.
func requireLogin(writer http.ResponseWriter, request *http.Request) bool {
	var msg string
	cookie, err := request.Cookie("username")
	switch {
	case err != nil || cookie == nil:
		msg = "请先登录"
	case cookie.Value != sessionToken:
		msg = "密钥错误"
	default:
		return true
	}
	tp, err := template.ParseFiles("web/login.html")
	if err != nil {
		pageMissing(writer, err)
		return false
	}
	writer.WriteHeader(http.StatusUnauthorized)
	render(writer, tp, msg)
	return false
}

// handleQuery renders the task statistics page for logged-in users.
func handleQuery(writer http.ResponseWriter, request *http.Request) {
	if !requireLogin(writer, request) {
		return
	}
	tp, err := template.ParseFiles("web/index.html", "public/header.html")
	if err != nil {
		pageMissing(writer, err)
		return
	}
	render(writer, tp, queryOcrTask())
}

// handleReload shows the redeploy page and, on POST, redeploys every
// instance whose internal IP is listed in the "ips" form field. It requires
// the same session cookie as /query (previously any cookie named "username"
// was accepted).
//
// NOTE(review): in the copy of the file under review the HTML literals in
// this handler contain only line breaks; the page markup appears to have
// been stripped. They are reproduced as seen — restore the real markup from
// version control.
func handleReload(writer http.ResponseWriter, request *http.Request) {
	if !requireLogin(writer, request) {
		return
	}
	tpReload, err := template.ParseFiles("web/reload.html")
	if err != nil {
		pageMissing(writer, err)
		return
	}
	if request.Method != "POST" {
		render(writer, tpReload, template.HTML("\n"+"\n"+""+"\n"))
		return
	}
	if err = request.ParseForm(); err != nil {
		render(writer, tpReload, err)
		return
	}
	data := request.PostForm["ips"]
	if len(data) == 0 {
		render(writer, tpReload, "参数无效")
		return
	}
	var report strings.Builder
	for _, v := range data {
		if !ipv4Pattern.MatchString(v) {
			// The value is attacker-controlled and ends up in template.HTML,
			// so it must be escaped before being echoed back.
			report.WriteString(template.HTMLEscapeString(v) + "ip格式不正确\n")
			continue
		}
		result := reload(v)
		report.WriteString(v + "---->" + result + "\n")
		log.Println(v, "重新部署完成")
	}
	render(writer, tpReload, template.HTML("\n"+report.String()+" 重新部署结束\n"+"\n"+"\n"+"\n"+""+"\n"))
}

// startCron registers and starts the scheduled jobs: the configurable
// scale-out job and a job that clears the auto flag.
func startCron() {
	c := cron.New()
	c.AddFunc(qu.ObjToString(config.Sysconfig["cornstr"]), scaleOut)
	// NOTE(review): this spec ends in a line break in the reviewed copy of
	// the file; reproduced as seen.
	c.AddFunc("0 0 0 1/1 * ? \n", func() {
		auto = false
	})
	c.Start()
}

// hoursUntilRelease returns the number of whole hours (rounded) from now
// until 20:00 of the same day in now's location.
// NOTE(review): the result is negative after 20:00 — confirm that
// cluster.RunInstances tolerates that.
func hoursUntilRelease(now time.Time) int {
	release := time.Date(now.Year(), now.Month(), now.Day(), 20, 0, 0, 0, now.Location())
	return int(math.Round(release.Sub(now).Hours()))
}

// scaleOut is the cron job body. When more than one task is queued and no
// automatic deployment has succeeded yet, it removes instance records whose
// release time lies before today, requests new instances and deploys them.
func scaleOut() {
	log.Println(168, auto)
	if auto {
		return
	}
	d := time.Now()
	nowday := time.Date(d.Year(), d.Month(), d.Day(), 0, 0, 0, 0, d.Location())

	taskNum := 0
	if taskArr := mongodb.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1); taskArr != nil {
		taskNum = len(*taskArr)
	}
	log.Println("当前任务数量:", taskNum)
	if taskNum <= 1 {
		return
	}

	cluster.DescribeInstances() // query the details of the instances
	ocrescs := mongodb.Find("ocr_ecs", bson.M{}, nil, bson.M{"AutoReleaseTime": 1}, false, -1, -1)
	if ocrescs != nil {
		for _, v := range *ocrescs {
			release := qu.ObjToString(v["AutoReleaseTime"])
			if release == "" {
				continue
			}
			// NOTE(review): the value carries a "Z" suffix but is parsed in
			// the local zone — confirm whether time.UTC was intended.
			utc, err := time.ParseInLocation("2006-01-02T15:04Z", release, time.Local)
			if err != nil {
				log.Println("解析时间异常", err)
				continue
			}
			if utc.Before(nowday) {
				log.Println("删除过时实例", mongodb.Del("ocr_ecs", bson.M{"_id": v["_id"].(bson.ObjectId)}))
			}
		}
	}

	log.Println("申请实例")
	cluster.RunInstances("ocr_task_arr", "8", "false", qu.IntAll(config.Sysconfig["pernum"]), hoursUntilRelease(time.Now()))
	log.Println("实例申请成功")
	DynamicTask(&auto) // deploy onto the new instances
	log.Println("申请实例结束")
}

// processUdpMsg handles one incoming UDP message.
//
//   - mu.OP_TYPE_DATA: data is a JSON task description that is stored.
//   - mu.OP_NOOP: an acknowledgement from another node; it is only logged.
//   - mu.OP_GET_DOWNLOADERCODE: a worker asks for the next document id.
//
// qu.Catch is deferred first; it presumably recovers panics raised while
// handling the message.
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
	defer qu.Catch()
	switch act {
	case mu.OP_TYPE_DATA: // store the task
		saveTask(data, ra)
	case mu.OP_NOOP: // log the reply of another node
		log.Println("节点接收成功", string(data), ra.String())
	case mu.OP_GET_DOWNLOADERCODE: // hand out work
		if `{"permission":"get_ocr_task"}` != string(data) {
			log.Println("没有权限:", string(data), ra)
			go udpclient.WriteUdp([]byte("没有权限"), mu.OP_NOOP, ra)
			return
		}
		dispatchTask(ra)
	}
}

// saveTask acknowledges data to ra, decodes it as a JSON object, records
// the initial start id under "start" and stores the task in ocr_task.
func saveTask(data []byte, ra *net.UDPAddr) {
	go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)), mu.OP_NOOP, ra)
	tmp := make(map[string]interface{})
	if err := json.Unmarshal(data, &tmp); err != nil {
		go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)+",data解析失败,"+err.Error()), mu.OP_NOOP, ra)
		return
	}
	tmp["start"] = tmp[qu.ObjToString(config.SidField)]
	payload, err := json.Marshal(tmp)
	if err != nil {
		log.Println(err)
		return
	}
	b := mongodb.Save("ocr_task", string(payload))
	log.Println("保存id:", b)
}

// dispatchTask sends the requester the id of the next document after the
// first queued task's current position and advances that position. When the
// task's end id is reached, the requester also receives the start marker,
// the finished range is forwarded to the next node and the task is deleted.
//
// The lock is released with defer so that a panic (for example from
// bson.ObjectIdHex on a malformed id) cannot leave it held forever.
func dispatchTask(ra *net.UDPAddr) {
	sys.Lock()
	defer sys.Unlock()

	datas := mongodb.Find("ocr_task", nil, `{"_id":1}`, nil, false, -1, -1)
	if datas == nil || len(*datas) == 0 {
		go udpclient.WriteUdp([]byte("没有新数据"), mu.OP_TYPE_DATA, ra)
		return
	}
	tmp := (*datas)[0]
	ObjectId := tmp["_id"]
	sid := qu.ObjToString(tmp[qu.ObjToString(config.SidField)])
	eid := qu.ObjToString(tmp[qu.ObjToString(config.EidField)])

	rdata := mongodb.FindOneByField(config.MgoC, bson.M{"_id": bson.M{
		"$gt": bson.ObjectIdHex(sid),
	}}, `{"_id":1,"`+config.MgoFileFiled+`":1}`)
	if rdata == nil || len(*rdata) == 0 {
		go udpclient.WriteUdp([]byte("没有获取到最新数据"), mu.OP_TYPE_DATA, ra)
		return
	}
	newHex := (*rdata)["_id"].(bson.ObjectId).Hex()
	next := []byte(`{"id":"` + newHex + `","permission":"ocr_task"}`)

	if newHex >= eid {
		// End of the task reached: send the start marker, then the id.
		go udpclient.WriteUdp([]byte(`{"id":"`+qu.ObjToString(tmp["start"])+`","permission":"ocr_task","is_start":"true"}`), mu.OP_TYPE_DATA, ra)
		go udpclient.WriteUdp(next, mu.OP_TYPE_DATA, ra)

		totmp := map[string]string{
			"sid": qu.ObjToString(tmp["start"]),
			"eid": qu.ObjToString(tmp[qu.ObjToString(config.EidField)]),
		}
		tobyte, err := json.Marshal(totmp)
		if err != nil {
			log.Println(err)
		}
		go udpclient.WriteUdp(tobyte, mu.OP_TYPE_DATA, &net.UDPAddr{
			IP:   net.ParseIP(qu.ObjToString(config.Sysconfig["toudpip"])),
			Port: qu.IntAll(config.Sysconfig["toudpport"]),
		})
		log.Println("ocr_task处理完成,发送下个节点", string(tobyte))
		mongodb.Del("ocr_task", bson.M{"_id": ObjectId.(bson.ObjectId)})
		return
	}

	go udpclient.WriteUdp(next, mu.OP_TYPE_DATA, ra)
	tmp[config.SidField] = newHex
	mongodb.Update("ocr_task", bson.M{"_id": ObjectId.(bson.ObjectId)}, tmp, false, false)
}

// queryOcrTask returns the counters shown on the status page. The map is
// empty when no task is queued; otherwise it holds:
//
//	taskNum      number of queued tasks
//	sumNum       documents in the id ranges of all tasks
//	nowSumNum    documents in the id range of the first task
//	overNum      documents of the first task before its current position
//	undoneNum    sumNum - overNum
//	nowUnDoneNum nowSumNum - overNum
func queryOcrTask() map[string]int {
	data := make(map[string]int)
	taskArr := mongodb.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
	if taskArr == nil || len(*taskArr) == 0 {
		return data
	}
	tasks := *taskArr
	data["taskNum"] = len(tasks)

	eidKey := qu.ObjToString(config.EidField)
	sumNum, nowSumNum := 0, 0
	for i, v := range tasks {
		sid := bson.ObjectIdHex(qu.ObjToString(v["start"]))
		eid := bson.ObjectIdHex(qu.ObjToString(v[eidKey]))
		sumNum += mongodb.Count("bidding", bson.M{"_id": bson.M{
			"$gte": sid,
			"$lte": eid,
		}})
		if i == 0 {
			nowSumNum = sumNum
		}
	}
	data["sumNum"] = sumNum
	data["nowSumNum"] = nowSumNum

	first := tasks[0]
	over := first[qu.ObjToString(config.SidField)]
	overNum := mongodb.Count("bidding", bson.M{"_id": bson.M{
		"$gte": bson.ObjectIdHex(qu.ObjToString(first["start"])),
		"$lt":  bson.ObjectIdHex(qu.ObjToString(over)),
	}})
	data["overNum"] = overNum
	data["undoneNum"] = sumNum - overNum
	data["nowUnDoneNum"] = nowSumNum - overNum
	return data
}

// verifyDeployment checks over SSH that both worker processes are running
// on ip. It returns whether both checks passed and the combined check
// output.
func verifyDeployment(ip string) (bool, string) {
	udpOK, udpMsg := cluster.SshPgrep(ip, "pgrep udp2019", false)
	textOK, textMsg := cluster.SshPgrep(ip, "pgrep file2text", udpOK)
	return udpOK && textOK, udpMsg + ";  " + textMsg
}

// DynamicTask deploys the worker software onto every ocr_task_arr instance
// whose OcrTaskStatus is "none" and sets *auto to true once a deployment
// has been verified.
//
// If an instance has no internal IP yet, the whole pass is retried after
// refreshing the instance list. The original recursed for the retry and
// then kept iterating the stale list, deploying instances a second time;
// the retry is now a plain loop.
// NOTE(review): as before, this retries without bound while an instance
// stays without an IP.
func DynamicTask(auto *bool) {
	for !deployPendingInstances(auto) {
	}
}

// deployPendingInstances performs one deployment pass. It returns false
// when the pass must be repeated because an instance has no IP yet.
func deployPendingInstances(auto *bool) bool {
	time.Sleep(time.Second * 30)
	cluster.DescribeInstances() // query the details of the instances
	escObject := mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, false, -1, -1)
	pending := 0
	if escObject != nil {
		pending = len(*escObject)
	}
	log.Println("实例未部署数量", pending)
	if pending == 0 {
		log.Println("动态任务创建失败")
		return true
	}
	for _, v := range *escObject {
		tmpip := qu.ObjToString(v["ip_nw"])
		if tmpip == "" {
			log.Println("没用获取到ip,实例ip异常", tmpip)
			return false
		}
		if !DoCMD(tmpip) {
			log.Println(tmpip, "部署失败")
			continue
		}
		ok, detail := verifyDeployment(tmpip)
		if !ok {
			log.Println(tmpip, "部署异常,"+detail)
			continue
		}
		v["OcrTaskStatus"] = "successful"
		mongodb.Update("ocr_ecs", bson.M{"_id": v["_id"]}, v, true, false)
		*auto = true
		log.Println(v["_id"], tmpip, "部署成功")
	}
	return true
}

// DoCMD runs the deployment over SSH on ip and reports whether it
// succeeded. An empty ip is rejected without connecting.
func DoCMD(ip string) bool {
	if ip == "" {
		return false
	}
	return cluster.RunSsh(ip)
}

// reload replaces the instance whose internal IP is ip: the old instance is
// deleted, a new one is requested, and the worker software is deployed onto
// the first pending instance returned by the query. The returned string is
// a human-readable result for the web UI.
func reload(ip string) string {
	tmp := mongodb.FindOne("ocr_ecs", bson.M{"ip_nw": ip})
	if tmp == nil || len(*tmp) <= 0 {
		return "ip不存在"
	}
	notFound := "重新部署" + ip + "未知错误,查询多台实例的详细信息,没有查询到新创建实例"

	cluster.DeleteInstance(qu.ObjToString((*tmp)["InstanceId"]))                         // delete the instance being redeployed
	cluster.RunInstances("ocr_task_arr", "8", "false", 1, hoursUntilRelease(time.Now())) // create the replacement
	time.Sleep(time.Second * 30)
	cluster.DescribeInstances() // query the details of the instances

	pendingQuery := bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}
	escObject := mongodb.Find("ocr_ecs", pendingQuery, bson.M{"_id": -1}, nil, true, -1, -1)
	if escObject == nil || len(*escObject) == 0 {
		return notFound
	}
	tmpip := qu.ObjToString((*escObject)[0]["ip_nw"])
	if tmpip == "" {
		// The IP may not be assigned yet: wait and look once more.
		log.Println("没用获取到ip,实例ip异常", tmpip)
		time.Sleep(time.Minute)
		cluster.DescribeInstances()
		escObject = mongodb.Find("ocr_ecs", pendingQuery, bson.M{"_id": -1}, nil, true, -1, -1)
		if escObject == nil || len(*escObject) == 0 {
			return notFound
		}
		tmpip = qu.ObjToString((*escObject)[0]["ip_nw"])
	}

	if !DoCMD(tmpip) {
		return tmpip + "部署失败"
	}
	ok, detail := verifyDeployment(tmpip)
	if !ok {
		return tmpip + "部署异常," + detail
	}
	inst := (*escObject)[0]
	inst["OcrTaskStatus"] = "successful"
	mongodb.Update("ocr_ecs", bson.M{"_id": inst["_id"]}, inst, true, false)
	log.Println(inst["_id"], tmpip, "部署成功")
	return tmpip + "部署成功," + detail
}