package main import ( "cluster" "config" "encoding/json" "fmt" "github.com/cron" "gopkg.in/mgo.v2/bson" "html/template" "log" "math" mu "mfw/util" "net" "net/http" qu "qfw/util" "qfw/util/mongodb" "regexp" "strings" "sync" "time" ) var udpclient mu.UdpClient //udp对象 var sys sync.RWMutex var DeleteNode = time.NewTimer(time.Minute * 50) func main() { udpclient = mu.UdpClient{Local: config.Sysconfig["udpip"].(string) + ":" + config.Sysconfig["udpport"].(string), BufSize: 1024} udpclient.Listen(processUdpMsg) log.Printf("Udp listening port: %s:%s\n", config.Sysconfig["udpip"], config.Sysconfig["udpport"]) if config.Sysconfig["broadcast"].(bool) { //重启的话通知分布式节点 ips := qu.ObjToString(config.Sysconfig["broadcast_ips"]) ipsArr := strings.Split(ips, ";") for _, v := range ipsArr { udpclient.WriteUdp([]byte{}, mu.OP_TYPE_DATA, &net.UDPAddr{ IP: net.ParseIP(v), Port: qu.IntAll(config.Sysconfig["broadcast_port"]), }) } } mux := http.NewServeMux() mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) { http.Redirect(writer, request, "/login", http.StatusFound) }) mux.HandleFunc("/favicon.ico", func(writer http.ResponseWriter, request *http.Request) { writer.WriteHeader(http.StatusOK) }) mux.HandleFunc("/login", func(writer http.ResponseWriter, request *http.Request) { tp, err := template.ParseFiles("src/web/login.html") if err != nil { log.Println(err) writer.Write([]byte( "页面找不到了~")) return } if request.Method == "GET" { tp.Execute(writer, "") return } if request.Method == "POST" { err := request.ParseForm() if err != nil { http.Error(writer, err.Error(), http.StatusBadRequest) return } email := request.Form.Get("username") pwd := request.Form.Get("pwd") if email == "ocr_task" && pwd == "ocr_task_pwd" { http.SetCookie(writer, &http.Cookie{Name: "username", Value: qu.GetMd5String("email")}) http.Redirect(writer, request, "/query", http.StatusFound) } else { writer.WriteHeader(http.StatusUnauthorized) tp.Execute(writer, "密码错误") } return } }) res := http.FileServer(http.Dir("src/res")) mux.Handle("/res/", http.StripPrefix("/res/", res)) mux.HandleFunc("/query", func(writer http.ResponseWriter, request *http.Request) { tplogin, err := template.ParseFiles("src/web/login.html") if err != nil { log.Println(err) writer.Write([]byte( "页面找不到了~")) return } cookie, _ := request.Cookie("username") if cookie == nil { http.RedirectHandler("/login", http.StatusUnauthorized) tplogin.Execute(writer, "请先登录") return } if cookie.Value != "0c83f57c786a0b4a39efab23731c7ebc" { http.RedirectHandler("/login", http.StatusUnauthorized) tplogin.Execute(writer, "密钥错误") return } tp, err := template.ParseFiles("src/web/index.html", "src/public/header.html") if err != nil { log.Println(err) writer.Write([]byte( "页面找不到了~")) return } task := queryOcrTask() tp.Execute(writer, task) }) mux.HandleFunc("/reload", func(writer http.ResponseWriter, request *http.Request) { tplogin, err := template.ParseFiles("src/web/login.html") if err != nil { log.Println(err) writer.Write([]byte( "页面找不到了~")) return } cookie, _ := request.Cookie("username") if cookie == nil { http.RedirectHandler("/login", http.StatusUnauthorized) tplogin.Execute(writer, "请先登录") return } tpReload, err := template.ParseFiles("src/web/reload.html") if err != nil { log.Println(err) writer.Write([]byte( "页面找不到了~")) return } if request.Method == "POST" { err = request.ParseForm() if err != nil { tpReload.Execute(writer, err) return } data := request.PostForm["ips"] if len(data) <= 0 { tpReload.Execute(writer, "参数无效") return } var tmpstr string for _, v := range data { if !regexp.MustCompile("((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})(\\.((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})){3}").MatchString(v) { tmpstr += v + "ip格式不正确
" continue } result := reload(v) tmpstr += v + "---->" + result + "
" log.Println(v, "重新部署完成", ) } tpReload.Execute(writer, template.HTML( "
"+tmpstr+" 重新部署结束
"+ "
"+ "
"+ "
"+ ""+ "
")) return } tpReload.Execute(writer, template.HTML("
"+ "
"+ ""+ "
")) }) c := cron.New() spec := qu.ObjToString(config.Sysconfig["cornstr"]) c.AddFunc(spec, func() { d := time.Now() nowday := time.Date(d.Year(), d.Month(), d.Day(), 0, 0, 0, 0, d.Location()) cluster.DescribeInstances() //查询多台实例的详细信息 ocrescs := mongodb.Find("ocr_ecs", bson.M{}, nil, bson.M{"AutoReleaseTime": 1}, false, -1, -1) if ocrescs != nil || len(*ocrescs) > 0 { for _, v := range (*ocrescs) { if qu.ObjToString(v["AutoReleaseTime"]) != "" { utc, err := time.ParseInLocation("2006-01-02T15:04Z", qu.ObjToString(v["AutoReleaseTime"]), time.Local) if err != nil { log.Println("解析时间异常", err) continue } if utc.Before(nowday) { log.Println("删除过时实例", mongodb.Del("ocr_ecs", bson.M{"_id": v["_id"].(bson.ObjectId)})) } } } } taskArr := mongodb.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1) taskNum := len(*taskArr) log.Println("当前任务数量:", taskNum) //if taskNum <= 0 { //计算释放,发送udp ccnum := compute() log.Println("ccnum:", ccnum, ", len(config.CID):", len(config.CID), config.CID) if ccnum <= 0 { if len(config.CID) == 0 { log.Println("当前实例为空,无需释放", config.CID, ) return } if ccnum == 0 && len(config.CID) > 0 { log.Println("释放所有实例", config.CID) for tmpIid, _ := range config.CID { ttt := mongodb.FindOne("ocr_ecs", bson.M{"InstanceId": tmpIid}) mongodb.Del("ocr_ecs", bson.M{"InstanceId": tmpIid}) log.Println("5分钟后释放实例", tmpIid) udpclient.WriteUdp([]byte("5分钟后释放实例"), mu.OP_DELETE_DOWNLOADERCODES, &net.UDPAddr{ IP: net.ParseIP(qu.ObjToString((*ttt)["ip_nw"])), Port: qu.IntAll(config.Sysconfig["broadcast_port"]), }) go func(tmpIid string) { time.Sleep(time.Minute * 5) cluster.DeleteInstance(tmpIid) log.Println("5分钟后释放实例完成", tmpIid) }(tmpIid) } sys.Lock() config.CID = make(map[string]bool) sys.Unlock() } else { var tmpnum int for k, _ := range config.CID { if ccnum*(-1) >= tmpnum { return } tmpIid := k ttt := mongodb.FindOne("ocr_ecs", bson.M{"InstanceId": tmpIid}) mongodb.Del("ocr_ecs", bson.M{"InstanceId": tmpIid}) log.Println("5分钟后释放实例", tmpIid) udpclient.WriteUdp([]byte("5分钟后释放实例"), mu.OP_DELETE_DOWNLOADERCODES, &net.UDPAddr{ IP: net.ParseIP(qu.ObjToString((*ttt)["ip_nw"])), Port: qu.IntAll(config.Sysconfig["broadcast_port"]), }) go func(tmpIid string) { time.Sleep(time.Minute * 5) cluster.DeleteInstance(tmpIid) log.Println("5分钟后释放实例完成", tmpIid) }(tmpIid) sys.Lock() delete(config.CID, k) sys.Unlock() } } return } //} if len(config.CID) >= qu.IntAll(config.Sysconfig["pernum"]) { log.Println("实例申请上限,当前实例:", config.CID) escObject := mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, false, -1, -1) log.Println("实例未部署数量", len(*escObject)) if escObject != nil || len(*escObject) > 0 { for i, v := range (*escObject) { if tmpip := qu.ObjToString(v["ip_nw"]); tmpip == "" { log.Println("没用获取到ip,实例ip异常", tmpip) } else { go func(tmpip, InstanceId string, i int) { if DoCMD(tmpip) { var tmpstr string isok2, udpstr := cluster.SshPgrep(tmpip, "pgrep udp2019") tmpstr += udpstr + ";  " isok3, fil2textstr := cluster.SshPgrep(tmpip, "pgrep file2text") tmpstr += fil2textstr if isok2 && isok3 { (*escObject)[i]["OcrTaskStatus"] = "successful" mongodb.Update("ocr_ecs", bson.M{"_id": (*escObject)[i]["_id"]}, (*escObject)[i], true, false) sys.Lock() if config.CID["InstanceId"] == false { config.CID["InstanceId"] = true } sys.Unlock() log.Println((*escObject)[i]["_id"], tmpip, "部署成功") } else { log.Println(tmpip, "部署异常,"+tmpstr) } } else { log.Println(tmpip, "部署失败") } }(tmpip, qu.ObjToString(v["InstanceId"]), i) } } } return } //if taskNum > 1 { log.Println("申请实例") now := time.Now() hours := time.Date(now.Year(), now.Month(), now.Day(), 20, 0, 0, 0, now.Location()).Sub(now).Hours() cluster.RunInstances("ocr_task_arr", "8", "false", ccnum, int(math.Round(hours))) log.Println("实例申请成功") DynamicTask() //动态任务 log.Println("申请实例结束") //} }) c.Start() log.Println("Http listening port: ", qu.ObjToString(config.Sysconfig["http_port"])) if err := http.ListenAndServe(":"+qu.ObjToString(config.Sysconfig["http_port"]), mux); err != nil { fmt.Println("start http server faild:", err) } } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { defer qu.Catch() switch act { case mu.OP_TYPE_DATA: //保存服务 go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)), mu.OP_NOOP, ra) tmp := make(map[string]interface{}) err := json.Unmarshal(data, &tmp) if err != nil { go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)+",data解析失败,"+err.Error()), mu.OP_NOOP, ra) return } tmp["start"] = tmp[qu.ObjToString(config.SidField)] tmp["import_time"] = time.Now().Unix() bytes, _ := json.Marshal(tmp) b := mongodb.Save("ocr_task", string(bytes)) mongodb.Save("ocr_task_bak", string(bytes)) log.Println("保存id:", b) case mu.OP_NOOP: //其他节点回应消息打印 log.Println("节点接收成功", string(data), ra.String()) case mu.OP_GET_DOWNLOADERCODE: //分发任务 if `{"permission":"get_ocr_task"}` != string(data) { log.Println("没有权限:", string(data), ra) go udpclient.WriteUdp([]byte("没有权限"), mu.OP_NOOP, ra) return } sys.Lock() datas := mongodb.Find("ocr_task", nil, `{"_id":1}`, nil, false, -1, -1) if len(*datas) == 0 { go udpclient.WriteUdp([]byte("没有新数据"), mu.OP_TYPE_DATA, ra) sys.Unlock() return } tmp := (*datas)[0] ObjectId := tmp["_id"] sid := qu.ObjToString(tmp[qu.ObjToString(config.SidField)]) eid := qu.ObjToString(tmp[qu.ObjToString(config.EidField)]) rdata := mongodb.FindOneByField(config.MgoC, bson.M{"_id": bson.M{ "$gt": bson.ObjectIdHex(sid), }}, `{"_id":1,"`+config.MgoFileFiled+`":1}`) //log.Println(rdata) if len((*rdata)) == 0 { go udpclient.WriteUdp([]byte("没有获取到最新数据"), mu.OP_TYPE_DATA, ra) sys.Unlock() return } newId := (*rdata)["_id"] if newId.(bson.ObjectId).Hex() >= eid { go udpclient.WriteUdp([]byte(`{"id":"`+qu.ObjToString(tmp["start"])+`","permission":"ocr_task","is_start":"true"}`), mu.OP_TYPE_DATA, ra) //起始位置 go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra) //分发任务 totmp := make(map[string]string) totmp["sid"] = qu.ObjToString(tmp[qu.ObjToString("start")]) totmp["eid"] = qu.ObjToString(tmp[qu.ObjToString(config.EidField)]) tobyte, _ := json.Marshal(totmp) go udpclient.WriteUdp(tobyte, mu.OP_TYPE_DATA, &net.UDPAddr{ IP: net.ParseIP(qu.ObjToString(config.Sysconfig["toudpip"])), Port: qu.IntAll(config.Sysconfig["toudpport"]), }) log.Println("ocr_task处理完成,发送下个节点", string(tobyte)) mongodb.Del("ocr_task", bson.M{"_id": ObjectId.(bson.ObjectId)}) tmp["end_time"] = time.Now().Unix() mongodb.Update("ocr_task_bak", bson.M{"_id": ObjectId.(bson.ObjectId)}, map[string]interface{}{"$set": tmp}, true, false) sys.Unlock() return } go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra) //分发任务 //log.Println(newId.(bson.ObjectId).Hex()) tmp[config.SidField] = newId.(bson.ObjectId).Hex() mongodb.Update("ocr_task", bson.M{"_id": ObjectId.(bson.ObjectId)}, tmp, false, false) sys.Unlock() } } func queryOcrTask() map[string]int { data := make(map[string]int) taskArr := mongodb.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1) taskNum := len(*taskArr) if taskNum == 0 { return data } data["taskNum"] = taskNum sumNum := 0 nowSumNum := 0 for i, v := range *taskArr { sid := bson.ObjectIdHex(qu.ObjToString(v["start"])) eid := bson.ObjectIdHex(qu.ObjToString(v[qu.ObjToString(config.EidField)])) sumNum += mongodb.Count("bidding", bson.M{"_id": bson.M{ "$gte": sid, "$lte": eid, }}) if i == 0 { nowSumNum = sumNum } } data["sumNum"] = sumNum data["nowSumNum"] = nowSumNum tmpsid := bson.ObjectIdHex(qu.ObjToString((*taskArr)[0]["start"])) over := (*taskArr)[0][qu.ObjToString(config.SidField)] overNum := mongodb.Count("bidding", bson.M{"_id": bson.M{ "$gte": tmpsid, "$lt": bson.ObjectIdHex(qu.ObjToString(over)), }}) data["overNum"] = overNum undoneNum := sumNum - overNum data["undoneNum"] = undoneNum nowUnDoneNum := nowSumNum - overNum data["nowUnDoneNum"] = nowUnDoneNum return data } func DynamicTask() { time.Sleep(time.Second * 30) cluster.DescribeInstances() //查询多台实例的详细信息 escObject := mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, false, -1, -1) log.Println("实例未部署数量", len(*escObject)) if escObject != nil || len(*escObject) > 0 { for i, v := range (*escObject) { if tmpip := qu.ObjToString(v["ip_nw"]); tmpip == "" { log.Println("没用获取到ip,实例ip异常", tmpip) DynamicTask() return } else { go func(tmpip,InstanceId string,i int) { if DoCMD(tmpip) { var tmpstr string isok2, udpstr := cluster.SshPgrep(tmpip, "pgrep udp2019") tmpstr += udpstr + ";  " isok3, fil2textstr := cluster.SshPgrep(tmpip, "pgrep file2text") tmpstr += fil2textstr if isok2 && isok3 { (*escObject)[i]["OcrTaskStatus"] = "successful" mongodb.Update("ocr_ecs", bson.M{"_id": (*escObject)[i]["_id"]}, (*escObject)[i], true, false) sys.Lock() if config.CID[InstanceId] == false { config.CID[InstanceId] = true } sys.Unlock() log.Println((*escObject)[i]["_id"], tmpip, "部署成功") } else { log.Println(tmpip, "部署异常,"+tmpstr) } } else { log.Println(tmpip, "部署失败") } }(tmpip,qu.ObjToString(v["InstanceId"]),i) } } } else { log.Println("动态任务创建失败") } } func DoCMD(ip string) bool { if ip == "" { return false } return cluster.RunSsh(ip) } func reload(ip string) string { tmp := mongodb.FindOne("ocr_ecs", bson.M{"ip_nw": ip}) if tmp == nil || len(*tmp) <= 0 { return "ip不存在" } sys.Lock() if config.CID[qu.ObjToString((*tmp)["InstanceId"])] == true{ delete(config.CID,qu.ObjToString((*tmp)["InstanceId"])) } sys.Unlock() cluster.DeleteInstance(qu.ObjToString((*tmp)["InstanceId"])) //删除要重新部署的实例 now := time.Now() hours := time.Date(now.Year(), now.Month(), now.Day(), 20, 0, 0, 0, now.Location()).Sub(now).Hours() cluster.RunInstances("ocr_task_arr", "8", "false", 1, int(math.Round(hours))) //创建新实例 time.Sleep(time.Second * 30) cluster.DescribeInstances() //查询多台实例的详细信息 escObject := mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, true, -1, -1) if escObject != nil || len(*escObject) > 0 { var tmpip string if tmpip = qu.ObjToString((*escObject)[0]["ip_nw"]); tmpip == "" { log.Println("没用获取到ip,实例ip异常", tmpip) time.Sleep(time.Minute) cluster.DescribeInstances() //查询多台实例的详细信息 escObject = mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, true, -1, -1) tmpip = qu.ObjToString((*escObject)[0]["ip_nw"]) } if DoCMD(tmpip) { var tmpstr string isok2, udpstr := cluster.SshPgrep(tmpip, "pgrep udp2019") tmpstr += udpstr + ";  " isok3, fil2textstr := cluster.SshPgrep(tmpip, "pgrep file2text") tmpstr += fil2textstr if isok2 && isok3 { (*escObject)[0]["OcrTaskStatus"] = "successful" mongodb.Update("ocr_ecs", bson.M{"_id": (*escObject)[0]["_id"]}, (*escObject)[0], true, false) sys.Lock() if config.CID[qu.ObjToString((*escObject)[0]["InstanceId"])] == false { config.CID[qu.ObjToString((*escObject)[0]["InstanceId"])] = true } sys.Unlock() log.Println((*escObject)[0]["_id"], tmpip, "部署成功") return tmpip + "部署成功," + tmpstr } else { return tmpip + "部署异常," + tmpstr } } else { return tmpip + "部署失败" } } return "重新部署" + ip + "未知错误,查询多台实例的详细信息,没有查询到新创建实例" } func compute() int { nowtime := time.Now().Unix() taskArrase := mongodb.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1) if taskArrase == nil || len(*taskArrase) == 0 { log.Println(464, "nil ro len(*taskArrase) == 0") return 0 } stmp := (*taskArrase)[0] etmp := (*taskArrase)[len(*taskArrase)-1] if stmp != nil && etmp != nil { stime := qu.Int64All(stmp["import_time"]) if nowtime-stime <= qu.Int64All(config.Sysconfig["time_consuming_limit"]) { log.Println(472, nowtime-stime, "<=", qu.Int64All(config.Sysconfig["time_consuming_limit"])) return 0 } sid := bson.ObjectIdHex(qu.ObjToString(stmp["start"])) eid := bson.ObjectIdHex(qu.ObjToString(etmp[qu.ObjToString(config.EidField)])) sum := mongodb.Count("bidding", bson.M{"_id": bson.M{ "$gte": sid, "$lt": eid, }}) if sum <= qu.IntAll(config.Sysconfig["accumulated_task_limit"]) { log.Println(483, sum, sid, eid) return 0 } gteid := bson.ObjectIdHex(qu.ObjToString(stmp[qu.ObjToString(config.SidField)])) overNum := mongodb.Count("bidding", bson.M{"_id": bson.M{ "$gte": sid, "$lt": gteid, }}) if overNum == 0 { log.Println(492, overNum, sid, gteid) return 0 } if sum <= qu.IntAll(config.Sysconfig["accumulated_task_lowlimit"]) { log.Println(496, sum, "<=", qu.IntAll(config.Sysconfig["accumulated_task_lowlimit"])) return 0 } mtmm := float64(overNum) / float64(nowtime-stime) / float64(len(config.CID)+3) //每台每秒 if mtmm <= 0 { log.Println(501, overNum, int(nowtime-stime), (len(config.CID) + 3)) return 0 } cc := float64(sum)/float64(qu.IntAll(config.Sysconfig["corntime_consuming"]))/mtmm - float64(len(config.CID)) - 3 log.Println("overNum:", overNum, ",hs:", int(nowtime-stime), ",mtms:", mtmm, ",sum:", sum, cc) if cc > qu.Float64All(config.Sysconfig["pernum"]) { return qu.IntAll(config.Sysconfig["pernum"]) } else { return int(cc) } } return 0 }