// Command main runs the OCR task dispatcher. It listens for UDP messages that
// enqueue OCR tasks and hand out work to nodes, serves a small HTTP admin UI
// (login, task overview, instance redeploy), and runs a scheduled job.
package main

import (
	"cluster"
	"config"
	"corntask"
	"encoding/json"
	"fmt"
	"github.com/cron"
	"gopkg.in/mgo.v2/bson"
	"html/template"
	"info"
	"log"
	"math"
	mu "mfw/util"
	"net"
	"net/http"
	qu "qfw/util"
	"qfw/util/mongodb"
	"regexp"
	"strings"
	"time"
)

const (
	// sessionCookieName is the cookie set by a successful login.
	sessionCookieName = "username"
	// sessionCookieValue is the only cookie value the admin pages accept.
	// NOTE(review): presumably the MD5 hex digest of the literal string
	// "email" (see handleLogin) — confirm before changing either side.
	sessionCookieValue = "0c83f57c786a0b4a39efab23731c7ebc"
)

// ipv4Pattern validates a dotted-quad address. It is compiled once and is
// anchored so that a string merely containing an address is rejected.
var ipv4Pattern = regexp.MustCompile("^((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})(\\.((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})){3}$")

func init() {
	cluster.DescribeInstances()
	log.Println("初始化CID:", cluster.CID)
}

// main starts the UDP listener, notifies known nodes that they may fetch
// data, registers the HTTP handlers and the cron job, and then serves HTTP
// until the server fails.
func main() {
	config.Udpclient.Listen(processUdpMsg)
	log.Printf("Udp listening port: %s:%s\n", config.Sysconfig["udpip"], config.Sysconfig["udpport"])

	// On restart, notify the configured distributed nodes. The comma-ok
	// assertion keeps a missing or non-bool setting from panicking.
	if enabled, _ := config.Sysconfig["broadcast"].(bool); enabled {
		ips := qu.ObjToString(config.Sysconfig["broadcast_ips"])
		for _, ip := range strings.Split(ips, ";") {
			if strings.TrimSpace(ip) == "" {
				continue // an empty entry cannot be parsed into an address
			}
			notifyNode("通知udp来取数据1:", ip)
		}
	}
	for _, ip := range cluster.CID {
		notifyNode("通知udp来取数据2:", ip)
	}

	mux := http.NewServeMux()
	mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
		http.Redirect(writer, request, "/login", http.StatusFound)
	})
	mux.HandleFunc("/favicon.ico", func(writer http.ResponseWriter, request *http.Request) {
		writer.WriteHeader(http.StatusOK)
	})
	mux.HandleFunc("/login", handleLogin)
	mux.Handle("/res/", http.StripPrefix("/res/", http.FileServer(http.Dir("src/res"))))
	mux.HandleFunc("/query", handleQuery)
	mux.HandleFunc("/reload", handleReload)

	c := cron.New()
	spec := qu.ObjToString(config.Sysconfig["cornstr"])
	c.AddFunc(spec, corntask.Auto)
	c.Start()

	port := qu.ObjToString(config.Sysconfig["http_port"])
	log.Println("Http listening port: ", port)
	if err := http.ListenAndServe(":"+port, mux); err != nil {
		fmt.Println("start http server faild:", err)
	}
}

// notifyNode sends an empty OP_TYPE_DATA datagram to ip on the configured
// broadcast port and logs the result under the given label.
func notifyNode(label, ip string) {
	port := config.Sysconfig["broadcast_port"]
	log.Println(label, ip, port, config.Udpclient.WriteUdp([]byte{}, mu.OP_TYPE_DATA, &net.UDPAddr{
		IP:   net.ParseIP(ip),
		Port: qu.IntAll(port),
	}))
}

// render executes tp with data and logs (rather than drops) any error.
func render(writer http.ResponseWriter, tp *template.Template, data interface{}) {
	if err := tp.Execute(writer, data); err != nil {
		log.Println(err)
	}
}

// pageMissing logs a template loading error and writes the fallback message.
func pageMissing(writer http.ResponseWriter, err error) {
	log.Println(err)
	writer.Write([]byte("页面找不到了~"))
}

// requireLogin reports whether the request carries the expected session
// cookie. When it does not, it answers 401 with the login page and returns
// false, so the caller only has to return.
func requireLogin(writer http.ResponseWriter, request *http.Request) bool {
	var reason string
	cookie, err := request.Cookie(sessionCookieName)
	switch {
	case err != nil || cookie == nil:
		reason = "请先登录"
	case cookie.Value != sessionCookieValue:
		reason = "密钥错误"
	default:
		return true
	}

	tp, err := template.ParseFiles("src/web/login.html")
	if err != nil {
		pageMissing(writer, err)
		return false
	}
	// The original called http.RedirectHandler here and discarded the
	// returned handler, so no status was ever sent. Send the 401 explicitly,
	// as the failed-login branch does.
	writer.WriteHeader(http.StatusUnauthorized)
	render(writer, tp, reason)
	return false
}

// handleLogin shows the login form on GET and checks the credentials on POST.
// Other methods get an empty response.
func handleLogin(writer http.ResponseWriter, request *http.Request) {
	tp, err := template.ParseFiles("src/web/login.html")
	if err != nil {
		pageMissing(writer, err)
		return
	}

	switch request.Method {
	case "GET":
		render(writer, tp, "")
	case "POST":
		if err := request.ParseForm(); err != nil {
			http.Error(writer, err.Error(), http.StatusBadRequest)
			return
		}
		email := request.Form.Get("username")
		pwd := request.Form.Get("pwd")
		// NOTE(review): credentials are hard-coded in source; consider moving
		// them to configuration.
		if email != "ocr_task" || pwd != "ocr_task_pwd" {
			writer.WriteHeader(http.StatusUnauthorized)
			render(writer, tp, "密码错误")
			return
		}
		// NOTE(review): this hashes the literal "email", not the submitted
		// value. It is kept as is because the admin pages compare against a
		// fixed cookie value.
		http.SetCookie(writer, &http.Cookie{Name: sessionCookieName, Value: qu.GetMd5String("email")})
		http.Redirect(writer, request, "/query", http.StatusFound)
	}
}

// handleQuery renders the task overview for logged-in users.
func handleQuery(writer http.ResponseWriter, request *http.Request) {
	if !requireLogin(writer, request) {
		return
	}
	tp, err := template.ParseFiles("src/web/index.html", "src/public/header.html")
	if err != nil {
		pageMissing(writer, err)
		return
	}
	render(writer, tp, info.QueryInfo())
}

// handleReload renders the redeploy page and, on POST, redeploys every
// address submitted in the "ips" form field, reporting one result per address.
func handleReload(writer http.ResponseWriter, request *http.Request) {
	// The cookie value is verified here as well; the original only checked
	// that a cookie with the right name was present.
	if !requireLogin(writer, request) {
		return
	}
	tpReload, err := template.ParseFiles("src/web/reload.html")
	if err != nil {
		pageMissing(writer, err)
		return
	}

	// NOTE(review): the page fragments below appear to have lost their HTML
	// markup in the copy of the file that was reviewed; only the line breaks
	// survived. Restore the original markup from version control.
	if request.Method != "POST" {
		render(writer, tpReload, template.HTML("\n"+
			"\n"+
			""+
			"\n"))
		return
	}

	if err = request.ParseForm(); err != nil {
		render(writer, tpReload, err)
		return
	}
	data := request.PostForm["ips"]
	if len(data) == 0 {
		render(writer, tpReload, "参数无效")
		return
	}

	var report strings.Builder
	for _, v := range data {
		// v is untrusted request input that ends up inside template.HTML,
		// so it is escaped before being echoed back.
		shown := template.HTMLEscapeString(v)
		if !ipv4Pattern.MatchString(v) {
			report.WriteString(shown + "ip格式不正确\n")
			continue
		}
		result := reload(v)
		report.WriteString(shown + "---->" + result + "\n")
		log.Println(v, "重新部署完成")
	}
	render(writer, tpReload, template.HTML(
		"\n"+report.String()+" 重新部署结束\n"+
			"\n"+
			"\n"+
			"\n"+
			""+
			"\n"))
}

// processUdpMsg handles one incoming UDP message.
//
//   - OP_TYPE_DATA: acknowledges the message, decodes it as a JSON object and
//     stores it in "ocr_task" and "ocr_task_bak".
//   - OP_NOOP: logs the reply received from another node.
//   - OP_GET_DOWNLOADERCODE: hands out the next task id (see dispatchTask).
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
	defer qu.Catch()
	switch act {
	case mu.OP_TYPE_DATA: // store an incoming task
		go config.Udpclient.WriteUdp([]byte("数据接收成功:"+string(data)), mu.OP_NOOP, ra)
		task := make(map[string]interface{})
		if err := json.Unmarshal(data, &task); err != nil {
			go config.Udpclient.WriteUdp([]byte("数据接收成功:"+string(data)+",data解析失败,"+err.Error()), mu.OP_NOOP, ra)
			return
		}
		// Remember the original start position; the sid field itself is
		// advanced as tasks are handed out.
		task["start"] = task[qu.ObjToString(config.SidField)]
		task["import_time"] = time.Now().Unix()
		payload, err := json.Marshal(task)
		if err != nil {
			log.Println(err)
			return
		}
		savedID := mongodb.Save("ocr_task", string(payload))
		mongodb.Save("ocr_task_bak", string(payload))
		log.Println("保存id:", savedID)
	case mu.OP_NOOP: // print the reply from another node
		log.Println("节点接收成功", string(data), ra.String())
	case mu.OP_GET_DOWNLOADERCODE: // distribute a task
		if `{"permission":"get_ocr_task"}` != string(data) {
			log.Println("没有权限:", string(data), ra)
			go config.Udpclient.WriteUdp([]byte("没有权限"), mu.OP_NOOP, ra)
			return
		}
		dispatchTask(ra)
	}
}

// dispatchTask sends the requester at ra the next document id of the oldest
// queued task. When that id reaches the task's end id, it also sends the start
// position, forwards the sid/eid range to the next node, removes the task from
// "ocr_task" and stamps "end_time" on its backup record.
//
// config.Sys is held for the whole call and released with defer, so a panic
// in here can no longer leave the lock held.
func dispatchTask(ra *net.UDPAddr) {
	config.Sys.Lock()
	defer config.Sys.Unlock()

	reply := func(msg string) {
		go config.Udpclient.WriteUdp([]byte(msg), mu.OP_TYPE_DATA, ra)
	}

	datas := mongodb.Find("ocr_task", nil, `{"_id":1}`, nil, false, -1, -1)
	if datas == nil || len(*datas) == 0 {
		reply("没有新数据")
		return
	}
	task := (*datas)[0]
	taskID, ok := task["_id"].(bson.ObjectId)
	if !ok {
		log.Println("ocr_task record has an unexpected _id type:", task["_id"])
		reply("没有获取到最新数据")
		return
	}
	sid := qu.ObjToString(task[qu.ObjToString(config.SidField)])
	eid := qu.ObjToString(task[qu.ObjToString(config.EidField)])
	// bson.ObjectIdHex panics on malformed input, so validate first.
	if !bson.IsObjectIdHex(sid) {
		log.Println("ocr_task record has an invalid start id:", sid)
		reply("没有获取到最新数据")
		return
	}

	rdata := mongodb.FindOneByField(config.MgoC, bson.M{"_id": bson.M{
		"$gt": bson.ObjectIdHex(sid),
	}}, `{"_id":1,"`+config.MgoFileFiled+`":1}`)
	if rdata == nil || len(*rdata) == 0 {
		reply("没有获取到最新数据")
		return
	}
	newID, ok := (*rdata)["_id"].(bson.ObjectId)
	if !ok {
		log.Println("source record has an unexpected _id type:", (*rdata)["_id"])
		reply("没有获取到最新数据")
		return
	}
	next := newID.Hex()

	// The ids are compared as hex strings.
	if next < eid {
		reply(`{"id":"` + next + `","permission":"ocr_task"}`) // distribute the task
		task[config.SidField] = next
		mongodb.Update("ocr_task", bson.M{"_id": taskID}, task, false, false)
		return
	}

	// The end of the range has been reached: finish the task.
	reply(`{"id":"` + qu.ObjToString(task["start"]) + `","permission":"ocr_task","is_start":"true"}`) // start position
	reply(`{"id":"` + next + `","permission":"ocr_task"}`)                                           // distribute the task

	handoff := map[string]string{
		"sid": qu.ObjToString(task["start"]),
		"eid": eid,
	}
	tobyte, err := json.Marshal(handoff)
	if err != nil {
		log.Println(err)
		return
	}
	go config.Udpclient.WriteUdp(tobyte, mu.OP_TYPE_DATA, &net.UDPAddr{
		IP:   net.ParseIP(qu.ObjToString(config.Sysconfig["toudpip"])),
		Port: qu.IntAll(config.Sysconfig["toudpport"]),
	})
	log.Println("ocr_task处理完成,发送下个节点", string(tobyte))
	mongodb.Del("ocr_task", bson.M{"_id": taskID})
	task["end_time"] = time.Now().Unix()
	mongodb.Update("ocr_task_bak", bson.M{"_id": taskID}, map[string]interface{}{"$set": task}, true, false)
}

// reload redeploys the instance whose "ip_nw" equals ip: it creates a new
// instance, waits for it to show up in "ocr_ecs", runs the deployment command
// on it and checks that both worker processes are running. It returns a
// human-readable status message.
func reload(ip string) string {
	old := mongodb.FindOne("ocr_ecs", bson.M{"ip_nw": ip})
	if old == nil || len(*old) == 0 {
		return "ip不存在"
	}

	now := time.Now()
	// Hours left until 20:00 local time today.
	// NOTE(review): this is negative after 20:00 — confirm what
	// cluster.RunInstances does with a non-positive value.
	hours := time.Date(now.Year(), now.Month(), now.Day(), 20, 0, 0, 0, now.Location()).Sub(now).Hours()
	cluster.RunInstances("ocr_task_arr", "8", "false", 1, int(math.Round(hours))) // create the new instance
	time.Sleep(time.Second * 20)
	cluster.DescribeInstances() // refresh the instance details

	pending := bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}
	newestFirst := bson.M{"_id": -1}
	escObject := mongodb.Find("ocr_ecs", pending, newestFirst, nil, true, -1, -1)

	// This must be &&: with the original || a nil result was dereferenced and
	// an empty result was indexed at [0].
	found := escObject != nil && len(*escObject) > 0
	var tmpip string
	if found {
		if tmpip = qu.ObjToString((*escObject)[0]["ip_nw"]); tmpip == "" {
			// The address may not be assigned yet: wait and look once more.
			log.Println("没用获取到ip,实例ip异常", tmpip)
			time.Sleep(time.Second * 20)
			cluster.DescribeInstances() // refresh the instance details
			escObject = mongodb.Find("ocr_ecs", pending, newestFirst, nil, true, -1, -1)
			found = escObject != nil && len(*escObject) > 0
			if found {
				tmpip = qu.ObjToString((*escObject)[0]["ip_nw"])
			}
		}
	}

	if !found {
		// NOTE(review): the old instance is deleted only on this failure
		// path, where no new instance was found — confirm this is intended.
		cluster.DeleteInstance(qu.ObjToString((*old)["InstanceId"])) // delete the instance being redeployed
		cluster.DescribeInstances()                                  // refresh the instance details
		return "重新部署" + ip + "未知错误,查询多台实例的详细信息,没有查询到新创建实例"
	}

	if !corntask.DoCMD(tmpip) {
		return tmpip + "部署失败"
	}

	udpOK, udpstr := cluster.SshPgrep(tmpip, "pgrep udp2019")
	file2textOK, fil2textstr := cluster.SshPgrep(tmpip, "pgrep file2text")
	status := udpstr + ";  " + fil2textstr
	if !udpOK || !file2textOK {
		return tmpip + "部署异常," + status
	}

	instance := (*escObject)[0]
	instance["OcrTaskStatus"] = "successful"
	mongodb.Update("ocr_ecs", bson.M{"_id": instance["_id"]}, instance, true, false)
	log.Println(instance["_id"], tmpip, "部署成功")
	return tmpip + "部署成功," + status
}