package main
import (
"cluster"
"config"
"encoding/json"
"fmt"
"github.com/cron"
"gopkg.in/mgo.v2/bson"
"html/template"
"log"
"math"
mu "mfw/util"
"net"
"net/http"
qu "qfw/util"
"qfw/util/mongodb"
"regexp"
"strings"
"sync"
"time"
)
// udpclient is the process-wide UDP client; main initializes it and
// processUdpMsg uses it to send replies.
var udpclient mu.UdpClient // UDP client object

// sys serializes the task-dispatch section of processUdpMsg
// (the OP_GET_DOWNLOADERCODE handling).
var sys sync.RWMutex

// DeleteNode is a timer created with a 50-minute duration. It is not
// referenced anywhere else in the visible part of this file.
var DeleteNode = time.NewTimer(time.Minute * 50)
// main wires the service together:
//   - starts the UDP listener (messages are handled by processUdpMsg) and,
//     when configured, announces the restart to the peers in "broadcast_ips";
//   - registers the HTTP routes "/", "/favicon.ico", "/login", "/res/",
//     "/query" and "/reload";
//   - schedules the cron job that calls compute() and then releases or
//     requests instances;
//   - finally blocks in http.ListenAndServe on "http_port".
func main() {
// Create the UDP client on udpip:udpport and hand processUdpMsg to Listen as the message callback.
udpclient = mu.UdpClient{Local: config.Sysconfig["udpip"].(string) + ":" + config.Sysconfig["udpport"].(string), BufSize: 1024}
udpclient.Listen(processUdpMsg)
log.Printf("Udp listening port: %s:%s\n", config.Sysconfig["udpip"], config.Sysconfig["udpport"])
if config.Sysconfig["broadcast"].(bool) { // on restart, notify the distributed nodes
// "broadcast_ips" is a ';'-separated list; each address receives an empty OP_TYPE_DATA message.
ips := qu.ObjToString(config.Sysconfig["broadcast_ips"])
ipsArr := strings.Split(ips, ";")
for _, v := range ipsArr {
udpclient.WriteUdp([]byte{}, mu.OP_TYPE_DATA, &net.UDPAddr{
IP: net.ParseIP(v),
Port: qu.IntAll(config.Sysconfig["broadcast_port"]),
})
}
}
mux := http.NewServeMux()
// "/" (and any path without a more specific route) redirects to the login page.
mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
http.Redirect(writer, request, "/login", http.StatusFound)
})
// Answer favicon requests with an empty 200 response.
mux.HandleFunc("/favicon.ico", func(writer http.ResponseWriter, request *http.Request) {
writer.WriteHeader(http.StatusOK)
})
// "/login": GET renders the login form; POST checks the hard-coded credentials.
mux.HandleFunc("/login", func(writer http.ResponseWriter, request *http.Request) {
// The template is parsed from disk on every request.
tp, err := template.ParseFiles("src/web/login.html")
if err != nil {
log.Println(err)
writer.Write([]byte( "页面找不到了~"))
return
}
if request.Method == "GET" {
tp.Execute(writer, "")
return
}
if request.Method == "POST" {
err := request.ParseForm()
if err != nil {
http.Error(writer, err.Error(), http.StatusBadRequest)
return
}
email := request.Form.Get("username")
pwd := request.Form.Get("pwd")
if email == "ocr_task" && pwd == "ocr_task_pwd" {
// NOTE(review): the cookie value is the MD5 of the literal string "email", not of the
// email variable; presumably this is the constant that "/query" compares against — confirm.
http.SetCookie(writer, &http.Cookie{Name: "username", Value: qu.GetMd5String("email")})
http.Redirect(writer, request, "/query", http.StatusFound)
} else {
writer.WriteHeader(http.StatusUnauthorized)
tp.Execute(writer, "密码错误")
}
return
}
})
// Static resources are served from src/res under the "/res/" prefix.
res := http.FileServer(http.Dir("src/res"))
mux.Handle("/res/", http.StripPrefix("/res/", res))
// "/query": renders the task statistics page for requests carrying the expected cookie.
mux.HandleFunc("/query", func(writer http.ResponseWriter, request *http.Request) {
tplogin, err := template.ParseFiles("src/web/login.html")
if err != nil {
log.Println(err)
writer.Write([]byte( "页面找不到了~"))
return
}
// The lookup error is ignored; a missing cookie is detected through the nil check below.
cookie, _ := request.Cookie("username")
if cookie == nil {
// NOTE(review): http.RedirectHandler only builds a handler and its result is discarded
// here, so no redirect is sent; the login template is rendered in place instead.
http.RedirectHandler("/login", http.StatusUnauthorized)
tplogin.Execute(writer, "请先登录")
return
}
// The cookie must equal this hard-coded value.
if cookie.Value != "0c83f57c786a0b4a39efab23731c7ebc" {
http.RedirectHandler("/login", http.StatusUnauthorized)
tplogin.Execute(writer, "密钥错误")
return
}
tp, err := template.ParseFiles("src/web/index.html", "src/public/header.html")
if err != nil {
log.Println(err)
writer.Write([]byte( "页面找不到了~"))
return
}
task := queryOcrTask()
tp.Execute(writer, task)
})
// "/reload": GET renders the reload page; POST redeploys every instance IP posted in "ips".
mux.HandleFunc("/reload", func(writer http.ResponseWriter, request *http.Request) {
tplogin, err := template.ParseFiles("src/web/login.html")
if err != nil {
log.Println(err)
writer.Write([]byte( "页面找不到了~"))
return
}
// NOTE(review): only the presence of the cookie is checked here; unlike "/query",
// its value is not compared.
cookie, _ := request.Cookie("username")
if cookie == nil {
http.RedirectHandler("/login", http.StatusUnauthorized)
tplogin.Execute(writer, "请先登录")
return
}
tpReload, err := template.ParseFiles("src/web/reload.html")
if err != nil {
log.Println(err)
writer.Write([]byte( "页面找不到了~"))
return
}
if request.Method == "POST" {
err = request.ParseForm()
if err != nil {
tpReload.Execute(writer, err)
return
}
data := request.PostForm["ips"]
if len(data) <= 0 {
tpReload.Execute(writer, "参数无效")
return
}
// tmpstr accumulates one result fragment per submitted IP.
var tmpstr string
for _, v := range data {
// The pattern is recompiled on every iteration and is not anchored, so it matches
// any value that merely contains an IPv4-looking substring.
if !regexp.MustCompile("((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})(\\.((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})){3}").MatchString(v) {
tmpstr += v + "ip格式不正确"
continue
}
result := reload(v)
tmpstr += v + "---->" + result + ""
log.Println(v, "重新部署完成", )
}
// NOTE(review): the string literals in the call below look like they lost their HTML
// markup, and the last one now spans a raw line break; restore from the original source.
tpReload.Execute(writer, template.HTML(
""+tmpstr+" 重新部署结束"+
""+
"
"))
return
}
tpReload.Execute(writer, template.HTML(""))
})
// Scheduled scaling job; the cron spec is read from the "cornstr" config key.
c := cron.New()
spec := qu.ObjToString(config.Sysconfig["cornstr"])
c.AddFunc(spec, func() {
d := time.Now()
// nowday is today at 00:00 in the local zone; it is used below to expire stale ocr_ecs records.
nowday := time.Date(d.Year(), d.Month(), d.Day(), 0, 0, 0, 0, d.Location())
taskArr := mongodb.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
taskNum := len(*taskArr)
log.Println("当前任务数量:", taskNum)
//if taskNum <= 0 {
// compute what to release, send udp
ccnum := compute()
log.Println("ccnum:",ccnum,", len(config.CID):", len(config.CID),config.CID)
if ccnum <= 0 {
if len(config.CID) == 0 {
log.Println("当前实例为空,无需释放", config.CID, )
return
}
if ccnum == 0 && len(config.CID) >0{
// Release every tracked instance, each in its own goroutine.
for i,_ := range config.CID{
tmpIid := config.CID[i]
go func(tmpIid string) {
// NOTE(review): i is captured from the enclosing loop and config.CID is re-sliced from
// several goroutines while it is still being ranged over — looks racy; confirm intent.
config.CID = config.CID[i+1:]
mongodb.Del("ocr_ecs", bson.M{"InstanceId": tmpIid})
log.Println("5分钟后释放实例", tmpIid)
// NOTE(review): tmpIid comes from config.CID, which DynamicTask/reload fill with
// InstanceId values, so net.ParseIP(tmpIid) presumably yields nil — verify the target address.
udpclient.WriteUdp([]byte("5分钟后释放实例"), mu.OP_DELETE_DOWNLOADERCODES, &net.UDPAddr{
IP: net.ParseIP(tmpIid),
Port: qu.IntAll(config.Sysconfig["broadcast_port"]),
})
// Wait five minutes after the notice before deleting the instance.
time.Sleep(time.Minute * 5)
cluster.DeleteInstance(tmpIid)
log.Println("5分钟后释放实例完成", tmpIid)
}(tmpIid)
}
}else {
// NOTE(review): the next line appears truncated — a loop header and the start of an
// "if ... >= pernum" condition seem to have been fused (text between '<' and '>' lost).
// Restore it from version control before building.
for i:=0; i= qu.IntAll(config.Sysconfig["pernum"]) {
log.Println("实例申请上限,当前实例:", config.CID)
return
}
//if taskNum > 1 {
cluster.DescribeInstances() // query the details of multiple instances
ocrescs := mongodb.Find("ocr_ecs", bson.M{}, nil, bson.M{"AutoReleaseTime": 1}, false, -1, -1)
// NOTE(review): with "||" a nil ocrescs is still dereferenced by len(*ocrescs); "&&" was probably intended.
if ocrescs != nil || len(*ocrescs) > 0 {
// Drop ocr_ecs records whose AutoReleaseTime lies before today.
for _, v := range (*ocrescs) {
if qu.ObjToString(v["AutoReleaseTime"]) != "" {
// NOTE(review): the layout ends in a literal "Z" but is parsed in time.Local — confirm the intended zone.
utc, err := time.ParseInLocation("2006-01-02T15:04Z", qu.ObjToString(v["AutoReleaseTime"]), time.Local)
if err != nil {
log.Println("解析时间异常", err)
continue
}
if utc.Before(nowday) {
log.Println("删除过时实例", mongodb.Del("ocr_ecs", bson.M{"_id": v["_id"].(bson.ObjectId)}))
}
}
}
}
log.Println("申请实例")
now := time.Now()
// hours is the time left until 20:00 today (negative once 20:00 has passed); its rounded
// value is passed to RunInstances as the last argument.
hours := time.Date(now.Year(), now.Month(), now.Day(), 20, 0, 0, 0, now.Location()).Sub(now).Hours()
cluster.RunInstances("ocr_task_arr", "8", "false", ccnum, int(math.Round(hours)))
log.Println("实例申请成功")
DynamicTask() // dynamic task
log.Println("申请实例结束")
//}
})
c.Start()
log.Println("Http listening port: ", qu.ObjToString(config.Sysconfig["http_port"]))
// Blocks for the lifetime of the process; only returns when the server fails.
if err := http.ListenAndServe(":"+qu.ObjToString(config.Sysconfig["http_port"]), mux); err != nil {
fmt.Println("start http server faild:", err)
}
}
// processUdpMsg is the UDP message callback. It dispatches on the
// operation byte:
//
//   - mu.OP_TYPE_DATA: the payload is a JSON task description that is stored
//     in "ocr_task" and "ocr_task_bak".
//   - mu.OP_NOOP: an acknowledgement from another node; it is only logged.
//   - mu.OP_GET_DOWNLOADERCODE: a worker asks for the next document id. The
//     payload must be exactly {"permission":"get_ocr_task"}.
//
// act is the operation code, data the raw payload and ra the sender address
// that replies are written to. Other operation codes are ignored.
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
	defer qu.Catch()
	switch act {
	case mu.OP_TYPE_DATA: // store a new task
		saveOcrTask(data, ra)
	case mu.OP_NOOP: // print the acknowledgement sent by another node
		log.Println("节点接收成功", string(data), ra.String())
	case mu.OP_GET_DOWNLOADERCODE: // hand out a task
		if `{"permission":"get_ocr_task"}` != string(data) {
			log.Println("没有权限:", string(data), ra)
			go udpclient.WriteUdp([]byte("没有权限"), mu.OP_NOOP, ra)
			return
		}
		dispatchOcrTask(ra)
	}
}

// saveOcrTask acknowledges a task payload, decodes it and stores it in both
// "ocr_task" and "ocr_task_bak". The task's "start" field is a copy of its
// config.SidField value at import time and "import_time" is the current
// Unix time.
func saveOcrTask(data []byte, ra *net.UDPAddr) {
	go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)), mu.OP_NOOP, ra)
	tmp := make(map[string]interface{})
	if err := json.Unmarshal(data, &tmp); err != nil {
		go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)+",data解析失败,"+err.Error()), mu.OP_NOOP, ra)
		return
	}
	tmp["start"] = tmp[qu.ObjToString(config.SidField)]
	tmp["import_time"] = time.Now().Unix()
	payload, err := json.Marshal(tmp)
	if err != nil {
		// Do not store an empty document when the task cannot be re-encoded.
		log.Println("ocr_task marshal failed:", err)
		return
	}
	b := mongodb.Save("ocr_task", string(payload))
	mongodb.Save("ocr_task_bak", string(payload))
	log.Println("保存id:", b)
}

// dispatchOcrTask sends the requester the id of the next document after the
// first queued task's cursor (its config.SidField value) and advances the
// cursor. When the next id reaches the task's end id, the task is finished:
// the start marker and the final id are sent, the next pipeline node
// ("toudpip":"toudpport") is notified, and the task is removed from
// "ocr_task" and closed in "ocr_task_bak".
//
// The whole operation runs under sys. The unlock is deferred so that a panic
// inside the critical section (for example bson.ObjectIdHex on a malformed
// id) cannot leave the mutex locked and stall every later request.
func dispatchOcrTask(ra *net.UDPAddr) {
	sys.Lock()
	defer sys.Unlock()

	datas := mongodb.Find("ocr_task", nil, `{"_id":1}`, nil, false, -1, -1)
	if datas == nil || len(*datas) == 0 {
		go udpclient.WriteUdp([]byte("没有新数据"), mu.OP_TYPE_DATA, ra)
		return
	}
	tmp := (*datas)[0]
	taskID, ok := tmp["_id"].(bson.ObjectId)
	if !ok {
		log.Println("ocr_task record has an unexpected _id:", tmp["_id"])
		return
	}
	sid := qu.ObjToString(tmp[qu.ObjToString(config.SidField)])
	eid := qu.ObjToString(tmp[qu.ObjToString(config.EidField)])

	// Look up the first document whose id is greater than the cursor.
	rdata := mongodb.FindOneByField(config.MgoC, bson.M{"_id": bson.M{
		"$gt": bson.ObjectIdHex(sid),
	}}, `{"_id":1,"`+config.MgoFileFiled+`":1}`)
	if rdata == nil || len(*rdata) == 0 {
		go udpclient.WriteUdp([]byte("没有获取到最新数据"), mu.OP_TYPE_DATA, ra)
		return
	}
	newID, ok := (*rdata)["_id"].(bson.ObjectId)
	if !ok {
		log.Println("document has an unexpected _id:", (*rdata)["_id"])
		return
	}
	nextHex := newID.Hex()

	if nextHex >= eid {
		// The end of the task has been reached.
		go udpclient.WriteUdp([]byte(`{"id":"`+qu.ObjToString(tmp["start"])+`","permission":"ocr_task","is_start":"true"}`), mu.OP_TYPE_DATA, ra) // start position
		go udpclient.WriteUdp([]byte(`{"id":"`+nextHex+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra)                                         // hand out the task
		totmp := make(map[string]string)
		totmp["sid"] = qu.ObjToString(tmp[qu.ObjToString("start")])
		totmp["eid"] = qu.ObjToString(tmp[qu.ObjToString(config.EidField)])
		tobyte, err := json.Marshal(totmp)
		if err != nil {
			log.Println("next-node message marshal failed:", err)
		} else {
			go udpclient.WriteUdp(tobyte, mu.OP_TYPE_DATA, &net.UDPAddr{
				IP:   net.ParseIP(qu.ObjToString(config.Sysconfig["toudpip"])),
				Port: qu.IntAll(config.Sysconfig["toudpport"]),
			})
			log.Println("ocr_task处理完成,发送下个节点", string(tobyte))
		}
		mongodb.Del("ocr_task", bson.M{"_id": taskID})
		tmp["end_time"] = time.Now().Unix()
		mongodb.Update("ocr_task_bak", bson.M{"_id": taskID}, map[string]interface{}{"$set": tmp}, true, false)
		return
	}

	go udpclient.WriteUdp([]byte(`{"id":"`+nextHex+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra) // hand out the task
	// Advance the cursor so the next request continues after this document.
	tmp[config.SidField] = nextHex
	mongodb.Update("ocr_task", bson.M{"_id": taskID}, tmp, false, false)
}
// queryOcrTask gathers the progress counters shown on the "/query" page.
//
// It returns an empty map when there are no queued tasks (or the query
// yields nothing). Otherwise the map holds:
//
//   - "taskNum":      number of queued tasks
//   - "sumNum":       "bidding" documents covered by all tasks
//                     (start..end, both inclusive)
//   - "nowSumNum":    the same count for the first task only
//   - "overNum":      documents of the first task between its start and its
//                     current cursor (config.SidField), cursor excluded
//   - "undoneNum":    sumNum - overNum
//   - "nowUnDoneNum": nowSumNum - overNum
func queryOcrTask() map[string]int {
	data := make(map[string]int)
	taskArr := mongodb.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
	// Guard against a nil result before dereferencing, as compute() does.
	if taskArr == nil || len(*taskArr) == 0 {
		return data
	}
	tasks := *taskArr
	data["taskNum"] = len(tasks)

	sumNum := 0
	nowSumNum := 0
	for i, task := range tasks {
		sid := bson.ObjectIdHex(qu.ObjToString(task["start"]))
		eid := bson.ObjectIdHex(qu.ObjToString(task[qu.ObjToString(config.EidField)]))
		sumNum += mongodb.Count("bidding", bson.M{"_id": bson.M{
			"$gte": sid,
			"$lte": eid,
		}})
		if i == 0 {
			// After the first iteration the running total is the first task's size.
			nowSumNum = sumNum
		}
	}
	data["sumNum"] = sumNum
	data["nowSumNum"] = nowSumNum

	first := tasks[0]
	firstStart := bson.ObjectIdHex(qu.ObjToString(first["start"]))
	cursor := first[qu.ObjToString(config.SidField)]
	overNum := mongodb.Count("bidding", bson.M{"_id": bson.M{
		"$gte": firstStart,
		"$lt":  bson.ObjectIdHex(qu.ObjToString(cursor)),
	}})
	data["overNum"] = overNum
	data["undoneNum"] = sumNum - overNum
	data["nowUnDoneNum"] = nowSumNum - overNum
	return data
}
// dynamicTaskMaxAttempts bounds how many deployment passes DynamicTask makes
// while some instance still has no internal IP. Each pass starts with a
// 30-second wait, so ten passes cover roughly five minutes.
const dynamicTaskMaxAttempts = 10

// DynamicTask deploys the OCR services onto every "ocr_task_arr" instance
// whose OcrTaskStatus is still "none".
//
// A pass is repeated while at least one instance has no "ip_nw" yet. The
// retry is an iterative, bounded loop: every pass re-reads the pending
// instances from the database, so an instance that was deployed in an earlier
// pass is not deployed (or appended to config.CID) a second time.
func DynamicTask() {
	for attempt := 1; attempt <= dynamicTaskMaxAttempts; attempt++ {
		if !deployPendingInstances() {
			return
		}
	}
	log.Println("实例ip多次获取失败,停止重试")
}

// deployPendingInstances runs one deployment pass: it waits 30 seconds,
// refreshes the instance details, and deploys every pending instance that
// has an IP. It reports whether a pending instance without an IP was seen,
// i.e. whether another pass is needed.
func deployPendingInstances() bool {
	time.Sleep(time.Second * 30)
	cluster.DescribeInstances() // query the details of multiple instances
	escObject := mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, false, -1, -1)
	if escObject == nil || len(*escObject) == 0 {
		log.Println("实例未部署数量", 0)
		log.Println("动态任务创建失败")
		return false
	}
	pending := *escObject
	log.Println("实例未部署数量", len(pending))

	needRetry := false
	for i := range pending {
		inst := pending[i]
		tmpip := qu.ObjToString(inst["ip_nw"])
		if tmpip == "" {
			log.Println("没用获取到ip,实例ip异常", tmpip)
			needRetry = true
			continue
		}
		if !DoCMD(tmpip) {
			log.Println(tmpip, "部署失败")
			continue
		}
		// The deployment counts as successful only when both processes are found.
		isok2, udpstr := cluster.SshPgrep(tmpip, "pgrep udp2019")
		isok3, fil2textstr := cluster.SshPgrep(tmpip, "pgrep file2text")
		tmpstr := udpstr + "; " + fil2textstr
		if !isok2 || !isok3 {
			log.Println(tmpip, "部署异常,"+tmpstr)
			continue
		}
		inst["OcrTaskStatus"] = "successful"
		mongodb.Update("ocr_ecs", bson.M{"_id": inst["_id"]}, inst, true, false)
		config.CID = append(config.CID, qu.ObjToString(inst["InstanceId"]))
		log.Println(inst["_id"], tmpip, "部署成功")
	}
	return needRetry
}
// DoCMD runs the SSH deployment against ip via cluster.RunSsh and returns
// its result. An empty ip is rejected with false without calling
// cluster.RunSsh.
func DoCMD(ip string) bool {
	return ip != "" && cluster.RunSsh(ip)
}
// reload replaces the instance whose internal IP ("ip_nw") is ip: the old
// instance is removed from config.CID and deleted, one new instance is
// requested, and the OCR services are deployed onto the newest pending
// instance.
//
// It returns a human-readable status message for the "/reload" page. The
// call blocks for at least 30 seconds while the new instance starts, plus
// one more minute when the instance has no IP on the first lookup.
func reload(ip string) string {
	tmp := mongodb.FindOne("ocr_ecs", bson.M{"ip_nw": ip})
	if tmp == nil || len(*tmp) <= 0 {
		return "ip不存在"
	}
	oldID := qu.ObjToString((*tmp)["InstanceId"])
	for i, v := range config.CID {
		if v == oldID {
			config.CID = append(config.CID[:i], config.CID[i+1:]...)
			break
		}
	}
	cluster.DeleteInstance(oldID) // delete the instance that is being redeployed

	now := time.Now()
	hours := time.Date(now.Year(), now.Month(), now.Day(), 20, 0, 0, 0, now.Location()).Sub(now).Hours()
	cluster.RunInstances("ocr_task_arr", "8", "false", 1, int(math.Round(hours))) // create the new instance
	time.Sleep(time.Second * 30)

	const notFound = "未知错误,查询多台实例的详细信息,没有查询到新创建实例"

	cluster.DescribeInstances() // query the details of multiple instances
	escObject := mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, true, -1, -1)
	// Both a nil result and an empty result mean the new instance was not found;
	// indexing element 0 is only safe after this check.
	if escObject == nil || len(*escObject) == 0 {
		return "重新部署" + ip + notFound
	}
	inst := (*escObject)[0]
	tmpip := qu.ObjToString(inst["ip_nw"])
	if tmpip == "" {
		// The IP may not be assigned yet: wait a minute and look again.
		log.Println("没用获取到ip,实例ip异常", tmpip)
		time.Sleep(time.Minute)
		cluster.DescribeInstances() // query the details of multiple instances
		escObject = mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, true, -1, -1)
		if escObject == nil || len(*escObject) == 0 {
			return "重新部署" + ip + notFound
		}
		inst = (*escObject)[0]
		tmpip = qu.ObjToString(inst["ip_nw"])
	}

	if !DoCMD(tmpip) {
		return tmpip + "部署失败"
	}
	// The deployment counts as successful only when both processes are found.
	isok2, udpstr := cluster.SshPgrep(tmpip, "pgrep udp2019")
	isok3, fil2textstr := cluster.SshPgrep(tmpip, "pgrep file2text")
	tmpstr := udpstr + "; " + fil2textstr
	if !isok2 || !isok3 {
		return tmpip + "部署异常," + tmpstr
	}
	inst["OcrTaskStatus"] = "successful"
	mongodb.Update("ocr_ecs", bson.M{"_id": inst["_id"]}, inst, true, false)
	config.CID = append(config.CID, qu.ObjToString(inst["InstanceId"]))
	log.Println(inst["_id"], tmpip, "部署成功")
	return tmpip + "部署成功," + tmpstr
}
// compute estimates how many additional instances should be requested for
// the current backlog.
//
// It returns 0 when there is nothing to scale: no queued task, the first
// task is younger than "time_consuming_limit" seconds, the backlog is not
// above "accumulated_task_limit" / "accumulated_task_lowlimit", or nothing
// has been processed yet. Otherwise the result is
//
//	sum / corntime_consuming / ratePerWorker - workers
//
// capped at "pernum", where workers is len(config.CID)+3 and ratePerWorker
// is the number of documents processed per second per worker since the
// first task was imported. All arithmetic is integer arithmetic, and the
// result may be negative.
func compute() int {
	nowtime := time.Now().Unix()
	taskArrase := mongodb.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
	if taskArrase == nil || len(*taskArrase) == 0 {
		return 0
	}
	tasks := *taskArrase
	stmp, etmp := tasks[0], tasks[len(tasks)-1]
	if stmp == nil || etmp == nil {
		return 0
	}

	stime := qu.Int64All(stmp["import_time"])
	elapsed := nowtime - stime
	if elapsed <= qu.Int64All(config.Sysconfig["time_consuming_limit"]) {
		return 0
	}
	if elapsed <= 0 {
		// Only reachable with a negative limit; elapsed is used as a divisor below.
		return 0
	}

	// Backlog: documents from the first task's start up to the last task's end.
	sid := bson.ObjectIdHex(qu.ObjToString(stmp["start"]))
	eid := bson.ObjectIdHex(qu.ObjToString(etmp[qu.ObjToString(config.EidField)]))
	sum := mongodb.Count("bidding", bson.M{"_id": bson.M{
		"$gte": sid,
		"$lt":  eid,
	}})
	if sum <= qu.IntAll(config.Sysconfig["accumulated_task_limit"]) {
		return 0
	}

	// Progress: documents between the first task's start and its cursor.
	gteid := bson.ObjectIdHex(qu.ObjToString(stmp[qu.ObjToString(config.SidField)]))
	overNum := mongodb.Count("bidding", bson.M{"_id": bson.M{
		"$gte": sid,
		"$lt":  gteid,
	}})
	if overNum == 0 {
		return 0
	}
	if sum <= qu.IntAll(config.Sysconfig["accumulated_task_lowlimit"]) {
		return 0
	}

	pernum := qu.IntAll(config.Sysconfig["pernum"])
	cornTime := qu.IntAll(config.Sysconfig["corntime_consuming"])
	if cornTime <= 0 {
		// A missing or zero setting would otherwise be a division by zero.
		log.Println("corntime_consuming must be positive, got", cornTime)
		return 0
	}

	workers := len(config.CID) + 3
	mtmm := overNum / int(elapsed) / workers // per worker, per second
	if mtmm <= 0 {
		// Integer division truncates any rate below one document per second per
		// worker to zero, which used to cause a division by zero below. Such a
		// low rate means the demand exceeds any cap, so return the cap.
		log.Println("overNum:", overNum, ",hs:", int(elapsed), ",mtms:", mtmm, ",sum:", sum, pernum)
		return pernum
	}

	cc := sum/cornTime/mtmm - workers
	log.Println("overNum:", overNum, ",hs:", int(elapsed), ",mtms:", mtmm, ",sum:", sum, cc)
	if cc > pernum {
		return pernum
	}
	return cc
}