package main import ( "cluster" "config" "corntask" "encoding/json" "fmt" "gopkg.in/mgo.v2/bson" "html/template" "info" "io/ioutil" "log" "math" mu "mfw/util" "net" "net/http" "qfw/common/src/github.com/robfig/cron" qu "qfw/util" "qfw/util/mongodb" "regexp" "strings" "time" ) func init() { cluster.DescribeInstances() log.Println("初始化CID:", cluster.CID) } func main() { config.Udpclient.Listen(processUdpMsg) log.Printf("Udp listening port: %s:%s\n", config.Sysconfig["udpip"], config.Sysconfig["udpport"]) if config.Sysconfig["broadcast"].(bool) { //重启的话通知分布式节点 ips := qu.ObjToString(config.Sysconfig["broadcast_ips"]) ipsArr := strings.Split(ips, ";") for _, v := range ipsArr { log.Println("通知udp来取数据1:", v, config.Sysconfig["broadcast_port"], config.Udpclient.WriteUdp([]byte{}, mu.OP_TYPE_DATA, &net.UDPAddr{ IP: net.ParseIP(v), Port: qu.IntAll(config.Sysconfig["broadcast_port"]), })) } } //for _, v := range cluster.CID { // log.Println("通知udp来取数据2:", v, config.Sysconfig["broadcast_port"], config.Udpclient.WriteUdp([]byte{}, mu.OP_TYPE_DATA, &net.UDPAddr{ // IP: net.ParseIP(v), // Port: qu.IntAll(config.Sysconfig["broadcast_port"]), // })) //} cluster.CID.Range(func(key, value interface{}) bool { log.Println("通知udp来取数据2:", value, config.Sysconfig["broadcast_port"], config.Udpclient.WriteUdp([]byte{}, mu.OP_TYPE_DATA, &net.UDPAddr{ IP: net.ParseIP(qu.ObjToString(value)), Port: qu.IntAll(config.Sysconfig["broadcast_port"]), })) return true }) mux := http.NewServeMux() mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) { http.Redirect(writer, request, "/login", http.StatusFound) }) mux.HandleFunc("/favicon.ico", func(writer http.ResponseWriter, request *http.Request) { writer.WriteHeader(http.StatusOK) }) //mux.HandleFunc("/udpkz", func(writer http.ResponseWriter, request *http.Request) { // tplogin, err := template.ParseFiles("src/web/login.html") // if err != nil { // log.Println(err) // writer.Write([]byte( "页面找不到了~")) // return // } // cookie, _ := request.Cookie("username") // if cookie == nil { // http.RedirectHandler("/login", http.StatusUnauthorized) // tplogin.Execute(writer, "请先登录") // return // } // if cookie.Value != "0c83f57c786a0b4a39efab23731c7ebc" { // http.RedirectHandler("/login", http.StatusUnauthorized) // tplogin.Execute(writer, "密钥错误") // return // } // err = request.ParseForm() // if err !=nil{ // writer.WriteHeader(http.StatusBadGateway) // writer.Write([]byte(err.Error())) // return // } // data := request.Form.Get("do") // //log.Println(data) // if data == "start"{ // config.Sys.Lock() // config.IsRun = true // config.Sys.Unlock() // }else if data =="stop"{ // config.Sys.Lock() // config.IsRun = false // config.Sys.Unlock() // } // writer.WriteHeader(http.StatusOK) //}) mux.HandleFunc("/login", func(writer http.ResponseWriter, request *http.Request) { tp, err := template.ParseFiles("src/web/login.html") if err != nil { log.Println(err) writer.Write([]byte( "页面找不到了~")) return } if request.Method == "GET" { tp.Execute(writer, "") return } if request.Method == "POST" { err := request.ParseForm() if err != nil { http.Error(writer, err.Error(), http.StatusBadRequest) return } email := request.Form.Get("username") pwd := request.Form.Get("pwd") if email == "ocr_task" && pwd == "ocr_task_pwd" { http.SetCookie(writer, &http.Cookie{Name: "username", Value: qu.GetMd5String("email")}) http.Redirect(writer, request, "/query", http.StatusFound) } else { writer.WriteHeader(http.StatusUnauthorized) tp.Execute(writer, "密码错误") } return } }) res := http.FileServer(http.Dir("src/res")) mux.Handle("/res/", http.StripPrefix("/res/", res)) mux.HandleFunc("/query", func(writer http.ResponseWriter, request *http.Request) { tplogin, err := template.ParseFiles("src/web/login.html") if err != nil { log.Println(err) writer.Write([]byte( "页面找不到了~")) return } cookie, _ := request.Cookie("username") if cookie == nil { http.RedirectHandler("/login", http.StatusUnauthorized) tplogin.Execute(writer, "请先登录") return } if cookie.Value != "0c83f57c786a0b4a39efab23731c7ebc" { http.RedirectHandler("/login", http.StatusUnauthorized) tplogin.Execute(writer, "密钥错误") return } tp, err := template.ParseFiles("src/web/index.html", "src/public/header.html") if err != nil { log.Println(err) writer.Write([]byte( "页面找不到了~")) return } task := info.QueryInfo() tp.Execute(writer, task) }) mux.HandleFunc("/reload", func(writer http.ResponseWriter, request *http.Request) { tplogin, err := template.ParseFiles("src/web/login.html") if err != nil { log.Println(err) writer.Write([]byte( "页面找不到了~")) return } cookie, _ := request.Cookie("username") if cookie == nil { http.RedirectHandler("/login", http.StatusUnauthorized) tplogin.Execute(writer, "请先登录") return } tpReload, err := template.ParseFiles("src/web/reload.html") if err != nil { log.Println(err) writer.Write([]byte( "页面找不到了~")) return } if request.Method == "POST" { err = request.ParseForm() if err != nil { tpReload.Execute(writer, err) return } data := request.PostForm["ips"] if len(data) <= 0 { tpReload.Execute(writer, "参数无效") return } var tmpstr string for _, v := range data { if !regexp.MustCompile("((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})(\\.((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})){3}").MatchString(v) { tmpstr += v + "ip格式不正确
" continue } result := reload(v) tmpstr += v + "---->" + result + "
" log.Println(v, "重新部署完成", ) } tpReload.Execute(writer, template.HTML( "
"+tmpstr+" 重新部署结束
"+ "
"+ "
"+ "
"+ ""+ "
")) return } tpReload.Execute(writer, template.HTML("
"+ "
"+ ""+ "
")) }) c := cron.New() spec := qu.ObjToString(config.Sysconfig["cornstr"]) c.Start() c.AddFunc(spec, corntask.Auto) c.AddFunc("0 0 6 1/1 * ?", func() { config.Isjjr = true nowtime := time.Now().Format("20060102") r, err := http.Get("http://tool.bitefu.net/jiari/?d=" + nowtime + "&back=json") defer r.Body.Close() if err != nil { log.Println("tool.bitefu.net/jiari/", err) config.Isjjr = false } data, err := ioutil.ReadAll(r.Body) if err != nil { log.Println(191, err) config.Isjjr = false } tmp := map[string]interface{}{} json.Unmarshal(data, &tmp) if tmp[nowtime] == 0 || tmp[nowtime] == "0" { config.Isjjr = false } r2, err := http.Get("http://api.goseek.cn/Tools/holiday?date=" + nowtime) defer r2.Body.Close() if err != nil { log.Println("http://api.goseek.cn/Tools/holiday", err) config.Isjjr = false } data2, err := ioutil.ReadAll(r2.Body) if err != nil { log.Println(208, err) config.Isjjr = false } tmp2 := map[string]interface{}{} json.Unmarshal(data2, &tmp2) if tmp2["data"] == 0 || tmp2["data"] == "0" || tmp2["data"] == 2 || tmp2["data"] == "2" { config.Isjjr = false } }) c.AddFunc("0 */1 * * * *", func() { qu.ReadConfig(&config.Sysconfig) //log.Println(111) //log.Println(config.Sysconfig) }) log.Println("Http listening port: ", qu.ObjToString(config.Sysconfig["http_port"])) if err := http.ListenAndServe(":"+qu.ObjToString(config.Sysconfig["http_port"]), mux); err != nil { fmt.Println("start http server faild:", err) } } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { defer qu.Catch() switch act { case mu.OP_TYPE_DATA: //保存服务 go config.Udpclient.WriteUdp([]byte("数据接收成功:"+string(data)), mu.OP_NOOP, ra) tmp := make(map[string]interface{}) err := json.Unmarshal(data, &tmp) if err != nil { go config.Udpclient.WriteUdp([]byte("数据接收成功:"+string(data)+",data解析失败,"+err.Error()), mu.OP_NOOP, ra) return } tmp["start"] = tmp[qu.ObjToString(config.SidField)] tmp["import_time"] = time.Now().Unix() tmp["isrun"] = "run" //bytes, _ := json.Marshal(tmp) b := mongodb.Save("ocr_task", &tmp) //mongodb.Save("ocr_task_bak", string(bytes)) log.Println("保存id:", b) case mu.OP_NOOP: //其他节点回应消息打印 log.Println("节点接收成功", string(data), ra.String()) case mu.OP_GET_DOWNLOADERCODE: //分发任务 if `{"permission":"get_ocr_task"}` != string(data) { log.Println("没有权限:", string(data), ra) go config.Udpclient.WriteUdp([]byte("没有权限"), mu.OP_NOOP, ra) return } config.Sys.Lock() datas := mongodb.Find("ocr_task", bson.M{"isrun": bson.M{ "$ne": "stop", }}, `{"_id":1}`, nil, false, -1, -1) if len(*datas) == 0 { go config.Udpclient.WriteUdp([]byte("没有新数据"), mu.OP_TYPE_DATA, ra) config.Sys.Unlock() return } tmp := (*datas)[0] ObjectId := tmp["_id"] sid := qu.ObjToString(tmp[qu.ObjToString(config.SidField)]) eid := qu.ObjToString(tmp[qu.ObjToString(config.EidField)]) rdata := mongodb.FindOneByField(config.MgoC, bson.M{"_id": bson.M{ "$gt": bson.ObjectIdHex(sid), }}, `{"_id":1,"`+config.MgoFileFiled+`":1}`) //log.Println(rdata) if len((*rdata)) == 0 { go config.Udpclient.WriteUdp([]byte("没有获取到最新数据"), mu.OP_TYPE_DATA, ra) config.Sys.Unlock() return } newId := (*rdata)["_id"] if newId.(bson.ObjectId).Hex() >= eid { go config.Udpclient.WriteUdp([]byte(`{"id":"`+qu.ObjToString(tmp["start"])+`","permission":"ocr_task","is_start":"true"}`), mu.OP_TYPE_DATA, ra) //起始位置 go config.Udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra) //分发任务 //if config.IsRun { // tmpdatas := mongodb.Find("ocr_task", bson.M{"isrun":"stop"}, `{"_id":1}`, nil, false, -1, -1) totmp := make(map[string]interface{}) totmp["stype"] = "fujian" totmp["lteid"] = qu.ObjToString(tmp[qu.ObjToString(config.EidField)]) //if tmpdatas == nil || len(*tmpdatas) == 0{ totmp["gtid"] = qu.ObjToString(tmp[qu.ObjToString("start")]) //}else { // totmp["gtid"] = qu.ObjToString((*tmpdatas)[0][qu.ObjToString("start")]) // for i,_:=range (*tmpdatas){ // mongodb.Del("ocr_task",bson.M{"_id":(*tmpdatas)[i]["_id"]}) // mongodb.Save("ocr_flie_over",totmp) // } //} tmpSumNum := mongodb.Count("bidding", bson.M{"_id": bson.M{ "$gte": bson.ObjectIdHex(qu.ObjToString(tmp[qu.ObjToString("start")])), "$lte": bson.ObjectIdHex(eid), }}) totmp["count"] = tmpSumNum totmp["isused"] = false totmp["import_time"] = time.Now().Unix() tobyte, _ := json.Marshal(totmp) mongodb.Save("ocr_flie_over", &totmp) //go config.Udpclient.WriteUdp(tobyte, mu.OP_TYPE_DATA, &net.UDPAddr{ // IP: net.ParseIP(qu.ObjToString(config.Sysconfig["toudpip"])), // Port: qu.IntAll(config.Sysconfig["toudpport"]), //}) log.Println("ocr_task处理完成,ocr_flie_over存储数据", string(tobyte)) mongodb.Del("ocr_task", bson.M{"_id": ObjectId.(bson.ObjectId)}) tmp["end_time"] = time.Now().Unix() //mongodb.Update("ocr_task_bak", bson.M{"start": ObjectId.(bson.ObjectId)}, map[string]interface{}{"$set": tmp}, false, false) //}else { // tmp["isrun"] = "stop" // mongodb.Update("ocr_task", // bson.M{"_id": ObjectId.(bson.ObjectId)}, tmp, false, false) //} config.Sys.Unlock() return } go config.Udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra) //分发任务 //log.Println(newId.(bson.ObjectId).Hex()) tmp[config.SidField] = newId.(bson.ObjectId).Hex() mongodb.Update("ocr_task", bson.M{"_id": ObjectId.(bson.ObjectId)}, tmp, false, false) config.Sys.Unlock() } } func reload(ip string) string { now := time.Now() hours := time.Date(now.Year(), now.Month(), now.Day(), 20, 0, 0, 0, now.Location()).Sub(now).Hours() tmp := mongodb.FindOne("ocr_ecs", bson.M{"ip_nw": ip}) if tmp == nil || len(*tmp) <= 0 { return "ip不存在" } cluster.DeleteInstance(qu.ObjToString((*tmp)["InstanceId"])) //删除要重新部署的实例 cluster.RunInstances("ocr_task_arr", "8", "false", 1, int(math.Round(hours))) //创建新实例 time.Sleep(time.Second * 25) cluster.DescribeInstances() //查询多台实例的详细信息 escObject := mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, true, -1, -1) if escObject != nil || len(*escObject) > 0 { var tmpip string if tmpip = qu.ObjToString((*escObject)[0]["ip_nw"]); tmpip == "" { log.Println("没用获取到ip,实例ip异常", tmpip) time.Sleep(time.Second * 15) cluster.DescribeInstances() //查询多台实例的详细信息 escObject = mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, true, -1, -1) tmpip = qu.ObjToString((*escObject)[0]["ip_nw"]) } if corntask.DoCMD(tmpip) { var tmpstr string isok2, udpstr := cluster.SshPgrep(tmpip, "pgrep udp2019") tmpstr += udpstr + ";  " isok3, fil2textstr := cluster.SshPgrep(tmpip, "pgrep file2text") tmpstr += fil2textstr if isok2 && isok3 { (*escObject)[0]["OcrTaskStatus"] = "successful" mongodb.Update("ocr_ecs", bson.M{"_id": (*escObject)[0]["_id"]}, (*escObject)[0], true, false) log.Println((*escObject)[0]["_id"], tmpip, "部署成功") return tmpip + "部署成功," + tmpstr } else { return tmpip + "部署异常," + tmpstr } } else { tmpip += "部署失败" } } return "重新部署" + ip + "未知错误,查询多台实例的详细信息,没有查询到新创建实例" }