Explorar el Código

Merge branch 'dev3.2' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.2

maxiaoshan hace 5 años
padre
commit
f527f05214

+ 4 - 17
src/jy/cluster/ssh.go

@@ -46,36 +46,23 @@ func ssHConnect(user, password, host string, port int) (*ssh.Session, error) {
 //wget https://www.jianyu360.com/upload/extract_v3.tgz
 var sshstr = `
 #!/bin/bash
-curl -fsSL get.docker.com -o get-docker.sh
-sh get-docker.sh --mirror Aliyun
-systemctl start docker
-cd /opt/
-wget http://10.171.112.160:9090/res/auto_service.sh
-chmod 777 auto_service.sh
-wget http://10.171.112.160:9090/res/gocv.tar
-docker load --input gocv.tar
-docker run --net=host --name=gocv -itd -v /opt:/opt gocv:v2 /opt/auto_service.sh
-exit
-`
-
-/*
-#!/bin/bash
+cd /opt
 kill -9 $(pidof extract_v3)
 rm -rf extract_v3*
 mkdir extract_v3
-cd /opt/extract_v3
 wget http://10.171.112.160:9090/res/extract_v3.tgz
 tar -xzvf extract_v3.tgz
+cd /opt/extract_v3
 chmod 777 extract_v3
 nohup ./extract_v3 >/opt/extract_v3/nohup 2>&1 &
 exit
-*/
+`
 
 func RunSsh(ip, password string, port int) {
 	var stdOut, stdErr bytes.Buffer
 	session, err := ssHConnect("root", password, ip, port)
 	if err != nil {
-		log.Fatal(err)
+		log.Println(err)
 	}
 	defer session.Close()
 	session.Stdout = &stdOut

+ 16 - 1
udp_ocr_conter/config.json

@@ -13,5 +13,20 @@
   "broadcast": false,
   "broadcast_ips": "127.0.0.1;192.168.1.2;192.168.1.3;192.168.1.4",
   "broadcast_port": 1490,
-  "http_port": "12345"
+  "http_port": "12345",
+  "cornstr": "0 0/10 7,8,9,10,11,12,13,14,15,16,17,18,19 * * ?",
+  "pernum": 4,
+  "esconfig": {
+    "available": true,
+    "AccessID": "LTAIkuomMLAjIlGH",
+    "AccessSecret": "dva1YTVQcLRWCFUfKq8TrugUTBsNnZ",
+    "ZoneIds": [
+      {
+        "zoneid": "cn-beijing-h",
+        "LaunchTemplateId4": "lt-2ze5ir54gy4ui8okr71f",
+        "LaunchTemplateId8": "lt-2ze5fzxwgt8jcqczvmjy",
+        "vswitchid": "vsw-2ze1n1k3mo3fv2irsfdps"
+      }
+    ]
+  }
 }

+ 239 - 62
udp_ocr_conter/main.go

@@ -1,54 +1,43 @@
 package main
 
 import (
+	"cluster"
+	"config"
 	"encoding/json"
 	"fmt"
+	"github.com/cron"
 	"gopkg.in/mgo.v2/bson"
 	"html/template"
-	"jy/mongodbutil"
 	"log"
+	"math"
 	mu "mfw/util"
 	"net"
 	"net/http"
-	"qfw/common/src/qfw/util"
 	qu "qfw/util"
+	"qfw/util/mongodb"
+	"regexp"
 	"strings"
 	"sync"
+	"time"
 )
 
 var udpclient mu.UdpClient //udp对象
-var Sysconfig map[string]interface{}
-var MgoIP, MgoDB, MgoC, MgoFileFiled, SidField, EidField string
-var sys sync.RWMutex
-
-func init() {
+var auto bool
 
-	qu.ReadConfig(&Sysconfig)
-	MgoIP = qu.ObjToString(Sysconfig["mongodb_ip"])
-	MgoDB = qu.ObjToString(Sysconfig["mongodb_db"])
-	MgoC = qu.ObjToString(Sysconfig["mongodb_c"])
-	SidField = qu.ObjToString(Sysconfig["json_sidfiled"])
-	EidField = qu.ObjToString(Sysconfig["json_eidfiled"])
-	MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_filefiled"], "projectinfo")
-	if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" {
-		log.Println("获取配置文件参数失败", Sysconfig)
-		return
-	}
-	mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB)
-	log.Println(mongodbutil.Mgo.Get().Ping())
+var sys sync.RWMutex
 
-}
 func main() {
-	udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024}
+	auto = false
+	udpclient = mu.UdpClient{Local: config.Sysconfig["udpip"].(string) + ":" + config.Sysconfig["udpport"].(string), BufSize: 1024}
 	udpclient.Listen(processUdpMsg)
-	log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
-	if Sysconfig["broadcast"].(bool) { //重启的话通知分布式节点
-		ips := qu.ObjToString(Sysconfig["broadcast_ips"])
+	log.Printf("Udp listening port: %s:%s\n", config.Sysconfig["udpip"], config.Sysconfig["udpport"])
+	if config.Sysconfig["broadcast"].(bool) { //重启的话通知分布式节点
+		ips := qu.ObjToString(config.Sysconfig["broadcast_ips"])
 		ipsArr := strings.Split(ips, ";")
 		for _, v := range ipsArr {
-			udpclient.WriteUdp([]byte{}, mu.OP_NOOP, &net.UDPAddr{
+			udpclient.WriteUdp([]byte{}, mu.OP_TYPE_DATA, &net.UDPAddr{
 				IP:   net.ParseIP(v),
-				Port: qu.IntAll(Sysconfig["broadcast_port"]),
+				Port: qu.IntAll(config.Sysconfig["broadcast_port"]),
 			})
 		}
 	}
@@ -60,14 +49,14 @@ func main() {
 		writer.WriteHeader(http.StatusOK)
 	})
 	mux.HandleFunc("/login", func(writer http.ResponseWriter, request *http.Request) {
-		tp, err := template.ParseFiles("web/login.html")
+		tp, err := template.ParseFiles("src/web/login.html")
 		if err != nil {
 			log.Println(err)
 			writer.Write([]byte( "页面找不到了~"))
 			return
 		}
-		if request.Method == "GET"{
-			tp.Execute(writer,"")
+		if request.Method == "GET" {
+			tp.Execute(writer, "")
 			return
 		}
 
@@ -80,7 +69,7 @@ func main() {
 			email := request.Form.Get("username")
 			pwd := request.Form.Get("pwd")
 			if email == "ocr_task" && pwd == "ocr_task_pwd" {
-				http.SetCookie(writer,&http.Cookie{Name: "username", Value: util.GetMd5String("email")})
+				http.SetCookie(writer, &http.Cookie{Name: "username", Value: qu.GetMd5String("email")})
 				http.Redirect(writer, request, "/query", http.StatusFound)
 			} else {
 				writer.WriteHeader(http.StatusUnauthorized)
@@ -90,37 +79,140 @@ func main() {
 		}
 
 	})
-	res := http.FileServer(http.Dir("res"))
+	res := http.FileServer(http.Dir("src/res"))
 	mux.Handle("/res/", http.StripPrefix("/res/", res))
 	mux.HandleFunc("/query", func(writer http.ResponseWriter, request *http.Request) {
-		tplogin, err := template.ParseFiles("web/login.html")
+		tplogin, err := template.ParseFiles("src/web/login.html")
 		if err != nil {
 			log.Println(err)
 			writer.Write([]byte( "页面找不到了~"))
 			return
 		}
-		cookie, e := request.Cookie("username")
-		if cookie == nil{
+		cookie, _ := request.Cookie("username")
+		if cookie == nil {
 			http.RedirectHandler("/login", http.StatusUnauthorized)
-			tplogin.Execute(writer,"请先登录")
+			tplogin.Execute(writer, "请先登录")
 			return
 		}
-		if cookie.Value != "0c83f57c786a0b4a39efab23731c7ebc"{
+		if cookie.Value != "0c83f57c786a0b4a39efab23731c7ebc" {
 			http.RedirectHandler("/login", http.StatusUnauthorized)
-			tplogin.Execute(writer,"密钥错误")
+			tplogin.Execute(writer, "密钥错误")
 			return
 		}
-		tp, err := template.ParseFiles("web/index.html", "public/header.html")
+		tp, err := template.ParseFiles("src/web/index.html", "src/public/header.html")
 		if err != nil {
 			log.Println(err)
 			writer.Write([]byte( "页面找不到了~"))
 			return
 		}
-		task := queryOcr_task()
+		task := queryOcrTask()
 		tp.Execute(writer, task)
 	})
-	log.Println("Http  listening port: ", qu.ObjToString(Sysconfig["http_port"]))
-	if err := http.ListenAndServe(":"+qu.ObjToString(Sysconfig["http_port"]), mux); err != nil {
+	mux.HandleFunc("/reload", func(writer http.ResponseWriter, request *http.Request) {
+		tplogin, err := template.ParseFiles("src/web/login.html")
+		if err != nil {
+			log.Println(err)
+			writer.Write([]byte( "页面找不到了~"))
+			return
+		}
+		cookie, _ := request.Cookie("username")
+		if cookie == nil {
+			http.RedirectHandler("/login", http.StatusUnauthorized)
+			tplogin.Execute(writer, "请先登录")
+			return
+		}
+		tpReload, err := template.ParseFiles("src/web/reload.html")
+		if err != nil {
+			log.Println(err)
+			writer.Write([]byte( "页面找不到了~"))
+			return
+
+		}
+
+		if request.Method == "POST" {
+			err = request.ParseForm()
+			if err != nil {
+				tpReload.Execute(writer, err)
+				return
+			}
+			data := request.PostForm["ips"]
+			if len(data) <= 0 {
+				tpReload.Execute(writer, "参数无效")
+				return
+			}
+			var tmpstr string
+			for _, v := range data {
+				if !regexp.MustCompile("((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})(\\.((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})){3}").MatchString(v){
+					tmpstr+= v +"ip格式不正确</br>"
+					continue
+				}
+				result := reload(v)
+				tmpstr += v + "---->" + result + "</br>"
+				log.Println( v, "重新部署完成", )
+			}
+			tpReload.Execute(writer, template.HTML(
+				"</br><span style=\"color: red\">"+tmpstr+" 重新部署结束</span></br>"+
+					"</br>"+
+					"<form action=\"/reload\" method=\"post\">"+
+					"<input id=\"ips1\" name=\"ips\" placeholder=\"请输入实例ip\" required/></br>"+
+					"<input type=\"submit\" value=\"提交\" />"+
+					"</form>"))
+			return
+		}
+
+		tpReload.Execute(writer, template.HTML("<form action=\"/reload\" method=\"post\">"+
+			"<input id=\"ips1\" name=\"ips\" placeholder=\"请输入实例ip\" required/></br>"+
+			"<input type=\"submit\" value=\"提交\" />"+
+			"</form>"))
+	})
+	c := cron.New()
+	spec := qu.ObjToString(config.Sysconfig["cornstr"])
+	c.AddFunc(spec, func() {
+		log.Println(168, auto)
+		if auto {
+			return
+		}
+		d := time.Now()
+		nowday := time.Date(d.Year(), d.Month(), d.Day(), 0, 0, 0, 0, d.Location())
+		taskArr := mongodb.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
+		taskNum := len(*taskArr)
+		log.Println("当前任务数量:", taskNum)
+		if taskNum <= 1 {
+			return
+		}
+		if taskNum > 1 {
+			cluster.DescribeInstances() //查询多台实例的详细信息
+			ocrescs := mongodb.Find("ocr_ecs", bson.M{}, nil, bson.M{"AutoReleaseTime": 1}, false, -1, -1)
+			if ocrescs != nil || len(*ocrescs) > 0 {
+				for _, v := range (*ocrescs) {
+					if qu.ObjToString(v["AutoReleaseTime"]) != "" {
+						utc, err := time.ParseInLocation("2006-01-02T15:04Z", qu.ObjToString(v["AutoReleaseTime"]),time.Local)
+						if err != nil {
+							log.Println("解析时间异常", err)
+							continue
+						}
+						if utc.Before(nowday) {
+							log.Println("删除过时实例", mongodb.Del("ocr_ecs", bson.M{"_id": v["_id"].(bson.ObjectId)}))
+						}
+					}
+				}
+			}
+			log.Println("申请实例")
+			now := time.Now()
+			hours := time.Date(now.Year(), now.Month(), now.Day(), 20, 0, 0, 0, now.Location()).Sub(now).Hours()
+			cluster.RunInstances("ocr_task_arr", "8", "false", qu.IntAll(config.Sysconfig["pernum"]), int(math.Round(hours)))
+			log.Println("实例申请成功")
+			DynamicTask(&auto) //动态任务
+			log.Println("申请实例结束")
+		}
+	})
+	c.AddFunc("0 0 0 1/1 * ? ", func() {
+		auto = false
+	})
+	c.Start()
+
+	log.Println("Http  listening port: ", qu.ObjToString(config.Sysconfig["http_port"]))
+	if err := http.ListenAndServe(":"+qu.ObjToString(config.Sysconfig["http_port"]), mux); err != nil {
 		fmt.Println("start http server faild:", err)
 	}
 }
@@ -136,9 +228,9 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)+",data解析失败,"+err.Error()), mu.OP_NOOP, ra)
 			return
 		}
-		tmp["start"] = tmp[qu.ObjToString(SidField)]
+		tmp["start"] = tmp[qu.ObjToString(config.SidField)]
 		bytes, _ := json.Marshal(tmp)
-		b := mongodbutil.Mgo.Save("ocr_task", string(bytes))
+		b := mongodb.Save("ocr_task", string(bytes))
 		log.Println("保存id:", b)
 	case mu.OP_NOOP: //其他节点回应消息打印
 		log.Println("节点接收成功", string(data), ra.String())
@@ -149,7 +241,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			return
 		}
 		sys.Lock()
-		datas, _ := mongodbutil.Mgo.Find("ocr_task", nil, `{"_id":1}`, nil, false, -1, -1)
+		datas := mongodb.Find("ocr_task", nil, `{"_id":1}`, nil, false, -1, -1)
 		if len(*datas) == 0 {
 			go udpclient.WriteUdp([]byte("没有新数据"), mu.OP_TYPE_DATA, ra)
 			sys.Unlock()
@@ -157,11 +249,11 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		}
 		tmp := (*datas)[0]
 		ObjectId := tmp["_id"]
-		sid := qu.ObjToString(tmp[qu.ObjToString(SidField)])
-		eid := qu.ObjToString(tmp[qu.ObjToString(EidField)])
-		rdata, _ := mongodbutil.Mgo.FindOneByField(MgoC, bson.M{"_id": bson.M{
+		sid := qu.ObjToString(tmp[qu.ObjToString(config.SidField)])
+		eid := qu.ObjToString(tmp[qu.ObjToString(config.EidField)])
+		rdata := mongodb.FindOneByField(config.MgoC, bson.M{"_id": bson.M{
 			"$gt": bson.ObjectIdHex(sid),
-		}}, `{"_id":1,"`+MgoFileFiled+`":1}`)
+		}}, `{"_id":1,"`+config.MgoFileFiled+`":1}`)
 		//log.Println(rdata)
 		if len((*rdata)) == 0 {
 			go udpclient.WriteUdp([]byte("没有获取到最新数据"), mu.OP_TYPE_DATA, ra)
@@ -174,28 +266,28 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra)                    //分发任务
 			totmp := make(map[string]string)
 			totmp["sid"] = qu.ObjToString(tmp[qu.ObjToString("start")])
-			totmp["eid"] = qu.ObjToString(tmp[qu.ObjToString(EidField)])
+			totmp["eid"] = qu.ObjToString(tmp[qu.ObjToString(config.EidField)])
 			tobyte, _ := json.Marshal(totmp)
 			go udpclient.WriteUdp(tobyte, mu.OP_TYPE_DATA, &net.UDPAddr{
-				IP:   net.ParseIP(qu.ObjToString(Sysconfig["toudpip"])),
-				Port: qu.IntAll(Sysconfig["toudpport"]),
+				IP:   net.ParseIP(qu.ObjToString(config.Sysconfig["toudpip"])),
+				Port: qu.IntAll(config.Sysconfig["toudpport"]),
 			})
 			log.Println("ocr_task处理完成,发送下个节点", string(tobyte))
-			mongodbutil.Mgo.Del("ocr_task", bson.M{"_id": ObjectId.(bson.ObjectId)})
+			mongodb.Del("ocr_task", bson.M{"_id": ObjectId.(bson.ObjectId)})
 			sys.Unlock()
 			return
 		}
 		go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra) //分发任务
 		//log.Println(newId.(bson.ObjectId).Hex())
-		tmp[SidField] = newId.(bson.ObjectId).Hex()
-		mongodbutil.Mgo.Update("ocr_task",
-			bson.M{"_id": ObjectId.(bson.ObjectId)}, tmp, false, false)
+		tmp[config.SidField] = newId.(bson.ObjectId).Hex()
+		mongodb.Update("ocr_task",
+		bson.M{"_id": ObjectId.(bson.ObjectId)}, tmp, false, false)
 		sys.Unlock()
 	}
 }
-func queryOcr_task() map[string]int {
+func queryOcrTask() map[string]int {
 	data := make(map[string]int)
-	taskArr, _ := mongodbutil.Mgo.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
+	taskArr := mongodb.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
 	taskNum := len(*taskArr)
 	if taskNum == 0 {
 		return data
@@ -205,8 +297,8 @@ func queryOcr_task() map[string]int {
 	nowSumNum := 0
 	for i, v := range *taskArr {
 		sid := bson.ObjectIdHex(qu.ObjToString(v["start"]))
-		eid := bson.ObjectIdHex(qu.ObjToString(v[qu.ObjToString(EidField)]))
-		sumNum += mongodbutil.Mgo.Count("bidding", bson.M{"_id": bson.M{
+		eid := bson.ObjectIdHex(qu.ObjToString(v[qu.ObjToString(config.EidField)]))
+		sumNum += mongodb.Count("bidding", bson.M{"_id": bson.M{
 			"$gte": sid,
 			"$lte": eid,
 		}})
@@ -217,8 +309,8 @@ func queryOcr_task() map[string]int {
 	data["sumNum"] = sumNum
 	data["nowSumNum"] = nowSumNum
 	tmpsid := bson.ObjectIdHex(qu.ObjToString((*taskArr)[0]["start"]))
-	over := (*taskArr)[0][qu.ObjToString(SidField)]
-	overNum := mongodbutil.Mgo.Count("bidding", bson.M{"_id": bson.M{
+	over := (*taskArr)[0][qu.ObjToString(config.SidField)]
+	overNum := mongodb.Count("bidding", bson.M{"_id": bson.M{
 		"$gte": tmpsid,
 		"$lt":  bson.ObjectIdHex(qu.ObjToString(over)),
 	}})
@@ -229,3 +321,88 @@ func queryOcr_task() map[string]int {
 	data["nowUnDoneNum"] = nowUnDoneNum
 	return data
 }
+
+func DynamicTask(auto *bool) {
+	time.Sleep(time.Second*30)
+	cluster.DescribeInstances() //查询多台实例的详细信息
+	escObject := mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, false, -1, -1)
+	log.Println("实例未部署数量", len(*escObject))
+	if escObject != nil || len(*escObject) > 0 {
+		for i, v := range (*escObject) {
+			if tmpip := qu.ObjToString(v["ip_nw"]); tmpip == "" {
+				log.Println("没用获取到ip,实例ip异常", tmpip)
+				DynamicTask(auto)
+			} else {
+				if DoCMD(tmpip) {
+					var tmpstr string
+					isok2, udpstr := cluster.SshPgrep(tmpip, "pgrep udp2019")
+					tmpstr += udpstr + ";&nbsp;&nbsp;"
+					isok3, fil2textstr := cluster.SshPgrep(tmpip, "pgrep file2text")
+					tmpstr += fil2textstr
+					if isok2 && isok3 {
+						(*escObject)[i]["OcrTaskStatus"] = "successful"
+						mongodb.Update("ocr_ecs", bson.M{"_id": (*escObject)[i]["_id"]}, (*escObject)[i], true, false)
+						(*auto) = true
+						log.Println((*escObject)[i]["_id"], tmpip, "部署成功")
+					} else {
+						log.Println(tmpip, "部署异常,"+tmpstr)
+					}
+				} else {
+					log.Println(tmpip, "部署失败")
+				}
+			}
+		}
+	} else {
+		log.Println("动态任务创建失败")
+	}
+}
+
+func DoCMD(ip string) bool {
+	if ip == "" {
+		return false
+	}
+
+	return cluster.RunSsh(ip)
+}
+
+func reload(ip string) string {
+	tmp := mongodb.FindOne("ocr_ecs", bson.M{"ip_nw": ip})
+	if tmp == nil || len(*tmp) <= 0 {
+		return "ip不存在"
+	}
+	cluster.DeleteInstance(qu.ObjToString((*tmp)["InstanceId"])) //删除要重新部署的实例
+	now := time.Now()
+	hours := time.Date(now.Year(), now.Month(), now.Day(), 20, 0, 0, 0, now.Location()).Sub(now).Hours()
+	cluster.RunInstances("ocr_task_arr", "8", "false", 1, int(math.Round(hours))) //创建新实例
+	time.Sleep(time.Second*30)
+	cluster.DescribeInstances() //查询多台实例的详细信息
+	escObject := mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, true, -1, -1)
+	if escObject != nil || len(*escObject) > 0 {
+		var tmpip string
+		if tmpip = qu.ObjToString((*escObject)[0]["ip_nw"]); tmpip == "" {
+			log.Println("没用获取到ip,实例ip异常", tmpip)
+			time.Sleep(time.Minute)
+			cluster.DescribeInstances() //查询多台实例的详细信息
+			escObject = mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, true, -1, -1)
+			tmpip = qu.ObjToString((*escObject)[0]["ip_nw"])
+		}
+		if DoCMD(tmpip) {
+			var tmpstr string
+			isok2, udpstr := cluster.SshPgrep(tmpip, "pgrep udp2019")
+			tmpstr += udpstr + ";&nbsp;&nbsp;"
+			isok3, fil2textstr := cluster.SshPgrep(tmpip, "pgrep file2text")
+			tmpstr += fil2textstr
+			if isok2 && isok3 {
+				(*escObject)[0]["OcrTaskStatus"] = "successful"
+				mongodb.Update("ocr_ecs", bson.M{"_id": (*escObject)[0]["_id"]}, (*escObject)[0], true, false)
+				log.Println((*escObject)[0]["_id"], tmpip, "部署成功")
+				return tmpip + "部署成功," + tmpstr
+			} else {
+				return tmpip + "部署异常," + tmpstr
+			}
+		} else {
+			return tmpip + "部署失败"
+		}
+	}
+	return "重新部署" + ip + "未知错误,查询多台实例的详细信息,没有查询到新创建实例"
+}

+ 259 - 0
udp_ocr_conter/src/cluster/aliecs.go

@@ -0,0 +1,259 @@
+/**
+阿里云ecs实例自动申请、部署、释放
+**/
+package cluster
+
+import (
+	"bytes"
+	"config"
+	"crypto/hmac"
+	"crypto/sha1"
+	"encoding/base64"
+	"encoding/json"
+	"fmt"
+	"hash"
+	"io"
+	"io/ioutil"
+	"log"
+	"net/http"
+	"net/url"
+	qu "qfw/util"
+	"qfw/util/mongodb"
+	"sort"
+	"strings"
+	"time"
+)
+
+const (
+	Password = "Jy_ExtractBid_2019"
+	URL      = "https://ecs.aliyuncs.com"
+	UseFor   = "ocr_task_arr"
+)
+
+//批量创建实例
+func RunInstances(taskName, computer, flow string, num, hours int) {
+	if esconfig, ok := config.Sysconfig["esconfig"].(map[string]interface{}); ok {
+		widthOut := "0"
+		if flow == "true" {
+			widthOut = "10"
+		}
+		//log.Println(esconfig["LaunchTemplateId"+computer], widthOut)
+		if b, ok := esconfig["available"].(bool); ok && b {
+			if zoneIds, _ := esconfig["ZoneIds"].([]interface{}); ok {
+				pernum := num / len(zoneIds)
+				if pernum < 1 {
+					kv, _ := zoneIds[len(zoneIds)-1].(map[string]interface{})
+					runInstances(kv, taskName, widthOut, computer, num, hours)
+				} else {
+					for k, v := range zoneIds {
+						if (k == len(zoneIds)-1) && (num%len(zoneIds) != 0) {
+							pernum = num - pernum*(len(zoneIds)-1)
+						}
+						kv, _ := v.(map[string]interface{})
+						runInstances(kv, taskName, widthOut, computer, pernum, hours)
+					}
+				}
+			}
+		}
+	}
+}
+
+func runInstances(kv map[string]interface{}, taskName, widthOut, computer string, pernum, hours int) {
+	log.Println(kv, taskName, widthOut, computer, pernum, hours)
+	res := GET("RunInstances", [][]string{
+		[]string{"RegionId", "cn-beijing"},
+		[]string{"ZoneId", qu.ObjToString(kv["zoneid"])},
+		[]string{"VSwitchId", qu.ObjToString(kv["vswitchid"])},
+		[]string{"LaunchTemplateId", qu.ObjToString(kv["LaunchTemplateId"+computer])},
+		//[]string{"ImageId", "centos_7_06_64_20G_alibase_20181212.vhd"},
+		//[]string{"InstanceType", "ecs.ic5.large"},
+		//[]string{"SecurityGroupId", "sg-bp16x3td2evrejhkshp7"},[]string{"InternetMaxBandwidthIn", "50"},
+		[]string{"InternetMaxBandwidthOut", widthOut},
+		[]string{"InstanceChargeType", "PostPaid"},
+		[]string{"SpotStrategy", "SpotWithPriceLimit"},
+		[]string{"SpotPriceLimit", "4.99"},
+		[]string{"InstanceName", UseFor},
+		[]string{"UniqueSuffix", "true"},
+		[]string{"Password", Password},
+		[]string{"Amount", fmt.Sprint(pernum)},
+		[]string{"AutoReleaseTime", time.Now().Add(time.Duration(hours) * time.Hour).Local().Format("2006-01-02T15:04:05Z")},
+	})
+	if tmp, ok := res["InstanceIdSets"].(map[string]interface{}); ok {
+		if t, ok := tmp["InstanceIdSet"].([]interface{}); ok {
+			//实例id持久化
+			for _, v := range t {
+				mongodb.Save("ocr_ecs", map[string]interface{}{
+					"InstanceId":    v,
+					"TaskName":      taskName,
+					"UseFor":        UseFor,
+					"OcrTaskStatus": "none",
+				})
+			}
+		}
+	}
+	//log.Println(res)
+}
+
+//查询多台实例的详细信息
+func DescribeInstances() {
+	res := GET("DescribeInstances", [][]string{
+		[]string{"RegionId", "cn-beijing"},
+		[]string{"InstanceChargeType", "PostPaid"},
+		[]string{"PageSize", "100"},
+	})
+	for _, ins := range res["Instances"].(map[string]interface{}) {
+		for _, val := range ins.([]interface{}) {
+			if tmp, ok := val.(map[string]interface{}); ok {
+				if qu.ObjToString(tmp["InstanceName"]) == UseFor {
+					if t, ok := tmp["VpcAttributes"].(map[string]interface{}); ok {
+						if tt, ok := t["PrivateIpAddress"].(map[string]interface{}); ok {
+							ttt := tt["IpAddress"].([]interface{})
+							tmp["ip_nw"] = ttt[0]
+						}
+					}
+					if t, ok := tmp["PublicIpAddress"].(map[string]interface{}); ok {
+						if tt, ok := t["IpAddress"].([]interface{}); ok && len(tt) > 0 {
+							tmp["ip_ww"] = tt[0]
+						}
+					}
+					log.Println("更新申请实例",tmp["InstanceId"],"内网ip",tmp["ip_nw"])
+					//更新实例信息
+					mongodb.Update("ocr_ecs", `{"InstanceId":"`+qu.ObjToString(tmp["InstanceId"])+`"}`, map[string]interface{}{"$set": tmp}, true, false)
+				}
+			}
+		}
+	}
+	//log.Println(res)
+}
+
+//停止实例
+func StopInstance(InstanceId string) {
+	res := GET("StopInstance", [][]string{
+		[]string{"InstanceId", InstanceId},
+	})
+	mongodb.Update("ocr_ecs", `{"InstanceId":"`+InstanceId+`"}`, map[string]interface{}{"$set": map[string]interface{}{"Status": "Released"}}, true, false)
+	log.Println("StopInstance", res)
+}
+
+//释放实例
+func DeleteInstance(InstanceId string) {
+	res := GET("DeleteInstance", [][]string{
+		[]string{"InstanceId", InstanceId},
+		[]string{"Force", "true"},
+	})
+	mongodb.Del("ocr_ecs", `{"InstanceId":"`+InstanceId+`"}`)
+	log.Println("DeleteInstance", res)
+}
+
+//实例自动释放时间
+func ModifyInstanceAutoReleaseTime(InstanceId string, hours int) {
+	res := GET("ModifyInstanceAutoReleaseTime", [][]string{
+		[]string{"InstanceId", InstanceId},
+		[]string{"AutoReleaseTime", time.Now().Add(time.Duration(hours) * time.Hour).UTC().Format("2006-01-02T15:04:05Z")},
+	})
+	log.Println("ModifyInstanceAutoReleaseTime", res)
+}
+
+//GET请求
+func GET(action string, param [][]string) (mres map[string]interface{}) {
+	esconfig, _ := config.Sysconfig["esconfig"].(map[string]interface{})
+	ps := &paramSorter{[]string{
+		"Format",
+		"Version",
+		"SignatureMethod",
+		"SignatureNonce",
+		"SignatureVersion",
+		"AccessKeyId",
+		"Timestamp",
+	}, []string{
+		"JSON",
+		"2014-05-26",
+		"HMAC-SHA1",
+		fmt.Sprintf("%d", time.Now().UnixNano()/1000),
+		"1.0",
+		qu.ObjToString(esconfig["AccessID"]),
+		time.Now().UTC().Format("2006-01-02T15:04:05Z"),
+	}}
+	ps.Keys = append(ps.Keys, "Action")
+	ps.Vals = append(ps.Vals, action)
+	if len(param) > 0 {
+		for _, v := range param {
+			ps.Keys = append(ps.Keys, v[0])
+			ps.Vals = append(ps.Vals, v[1])
+		}
+	}
+	ps.Sort()
+	reqStr := ps.String()
+	str := "GET&" +
+		percentEncode("/") + "&" +
+		percentEncode(reqStr)
+	str = SP(str, "%3A", "%253A", -1)
+	h := hmac.New(func() hash.Hash { return sha1.New() }, []byte(qu.ObjToString(esconfig["AccessSecret"])+"&"))
+	io.WriteString(h, str)
+	signedStr := base64.StdEncoding.EncodeToString(h.Sum(nil))
+	ps.Keys = append(ps.Keys, "Signature")
+	ps.Vals = append(ps.Vals, signedStr)
+	reqStr = ps.Query()
+	res, err := http.Get(fmt.Sprintf("%s?%s", URL, reqStr))
+	if err != nil {
+		log.Println(err.Error())
+	} else {
+		defer res.Body.Close()
+		bt, err := ioutil.ReadAll(res.Body)
+		if err == nil {
+			//log.Println(string(bt))
+			err := json.Unmarshal(bt, &mres)
+			if err != nil {
+				log.Println(err.Error())
+			}
+		}
+	}
+	return
+}
+
+var SP = strings.Replace
+
+func percentEncode(str string) string {
+	str = url.QueryEscape(str)
+	str = SP(SP(SP(str, "+", "%20", -1), "*", "%2A", -1), "%7E", "~", -1)
+	return str
+}
+
+type paramSorter struct {
+	Keys []string
+	Vals []string
+}
+
+func (ps *paramSorter) String() string {
+	str := ""
+	for n, k := range ps.Keys {
+		str += k + "=" + ps.Vals[n]
+		if n < len(ps.Keys)-1 {
+			str += "&"
+		}
+	}
+	return str
+}
+func (ps *paramSorter) Query() string {
+	str := ""
+	for n, k := range ps.Keys {
+		str += k + "=" + url.QueryEscape(ps.Vals[n])
+		if n < len(ps.Keys)-1 {
+			str += "&"
+		}
+	}
+	return str
+}
+func (ps *paramSorter) Sort() {
+	sort.Sort(ps)
+}
+func (ps *paramSorter) Len() int {
+	return len(ps.Vals)
+}
+func (ps *paramSorter) Less(i, j int) bool {
+	return bytes.Compare([]byte(ps.Keys[i]), []byte(ps.Keys[j])) < 0
+}
+func (ps *paramSorter) Swap(i, j int) {
+	ps.Vals[i], ps.Vals[j] = ps.Vals[j], ps.Vals[i]
+	ps.Keys[i], ps.Keys[j] = ps.Keys[j], ps.Keys[i]
+}

+ 141 - 0
udp_ocr_conter/src/cluster/ssh.go

@@ -0,0 +1,141 @@
+package cluster
+
+import (
+	"bytes"
+	"fmt"
+	"log"
+	"mfw/util"
+	"net"
+
+	"golang.org/x/crypto/ssh"
+)
+
+func ssHConnect(user, password, host string, port int) (*ssh.Session, error) {
+	util.Catch()
+	var (
+		auth         []ssh.AuthMethod
+		addr         string
+		clientConfig *ssh.ClientConfig
+		client       *ssh.Client
+		session      *ssh.Session
+		err          error
+	)
+	// get auth method
+	auth = make([]ssh.AuthMethod, 0)
+	auth = append(auth, ssh.Password(password))
+	hostKeyCallbk := func(hostname string, remote net.Addr, key ssh.PublicKey) error {
+		return nil
+	}
+	clientConfig = &ssh.ClientConfig{
+		User: user,
+		Auth: auth,
+		// Timeout:             30 * time.Second,
+		HostKeyCallback: hostKeyCallbk,
+	}
+	// connet to ssh
+	addr = fmt.Sprintf("%s:%d", host, port)
+
+	if client, err = ssh.Dial("tcp", addr, clientConfig); err != nil {
+		return nil, err
+	}
+	// create session
+	if session, err = client.NewSession(); err != nil {
+		return nil, err
+	}
+	return session, nil
+}
+
+/*
+#!/bin/bash
+kill -9 $(pidof extract_v3)
+rm -rf extract_v3*
+mkdir extract_v3
+cd /opt/extract_v3
+wget http://10.171.112.160:9090/res/extract_v3.tgz
+tar -xzvf extract_v3.tgz
+chmod 777 extract_v3
+nohup ./extract_v3 >/opt/extract_v3/nohup 2>&1 &
+exit
+*/
+
+func RunSsh(ip string) bool {
+	util.Catch()
+	var b bool
+	var stdOut, stdErr bytes.Buffer
+	session, err := ssHConnect("root", Password, ip, 22)
+	if err != nil {
+		log.Println(err)
+		return b
+	}
+	defer session.Close()
+	session.Stdout = &stdOut
+	session.Stderr = &stdErr
+
+	var sshstr = `
+#!/bin/bash
+kill -9 $(pidof udp2019)
+yum install -y docker docker-compose unzip
+systemctl start docker
+docker pull registry-vpc.cn-beijing.aliyuncs.com/file2text/github_ocr:tleyden5iwx_open-ocr-preprocessor
+docker tag 00a689ddd4f8 tleyden5iwx/open-ocr-preprocessor:latest
+docker rmi registry-vpc.cn-beijing.aliyuncs.com/file2text/github_ocr:tleyden5iwx_open-ocr-preprocessor
+docker pull registry-vpc.cn-beijing.aliyuncs.com/file2text/github_ocr:rabbitmq
+docker tag 5335b737c380 rabbitmq:3.6.5-management
+docker rmi registry-vpc.cn-beijing.aliyuncs.com/file2text/github_ocr:rabbitmq
+docker pull registry-vpc.cn-beijing.aliyuncs.com/file2text/github_ocr:tleyden5iwx_open-ocr-2
+docker tag c99e1ea480 tleyden5iwx/open-ocr-2:latest
+docker rmi registry-vpc.cn-beijing.aliyuncs.com/file2text/github_ocr:tleyden5iwx_open-ocr-2
+cd /home/
+wget http://10.171.112.160:10100/res/auto.zip
+unzip auto.zip
+wget http://10.171.112.160:10100/res/open-ocr.tar
+tar -xvf open-ocr.tar 
+chmod 777 /home/f2text/file2text
+chmod 777 /home/udp/udp2019
+
+cd /home/open-ocr/docker-compose/
+export OPEN_OCR_INSTANCE=open-ocr-2
+export RABBITMQ_HOST=` + ip + `
+docker-compose up -d
+docker pull registry-vpc.cn-beijing.aliyuncs.com/file2text/gocv:v2-binbash
+docker run --name f2text -itd -v /home/:/home/ --net host registry-vpc.cn-beijing.aliyuncs.com/file2text/gocv:v2-binbash /bin/bash
+docker exec  f2text /etc/profile.d/auto.sh
+
+cd /home/udp/
+sed -i "s/127.0.0.1/` + ip + `/g" /home/udp/config.json
+nohup ./udp2019 >/home/udp/log.out 2>&1 &
+
+`
+	log.Println("连接到ip:", ip,"成功")
+	if err := session.Run(sshstr); err == nil {
+		b = true
+		log.Println(ip, "ssh over ok")
+	} else {
+		log.Println(err)
+	}
+	return b
+}
+func SshPgrep(ip, cmd string) (bool, string ){
+	var bStdout bytes.Buffer
+	session, err := ssHConnect("root", Password, ip, 22)
+	defer session.Close()
+	var b bool
+	if err != nil {
+		log.Println(err)
+		return b,cmd+" ssh err"+err.Error()
+	}
+	session.Stdout = &bStdout
+	var tmpstr string
+	if err := session.Run(cmd); err == nil {
+		b = true
+		if len(bStdout.String()) == 0 {
+			b = false
+		}
+		log.Println(ip, cmd, "pid:", bStdout.String())
+		tmpstr = fmt.Sprintf(" %s  %s  %s  %s",ip, cmd, "pid:", bStdout.String())
+	} else {
+		log.Println(ip, cmd, "err", err)
+		tmpstr = fmt.Sprintf(" %s  %s  %s  %s",ip, cmd, "err:", err.Error())
+	}
+	return b,tmpstr
+}

+ 25 - 0
udp_ocr_conter/src/config/config.go

@@ -0,0 +1,25 @@
+package config
+
+import (
+	"log"
+	"qfw/util"
+	"qfw/util/mongodb"
+	"strings"
+)
+
+var Sysconfig map[string]interface{}
+var MgoIP, MgoDB, MgoC, MgoFileFiled, SidField, EidField string
+func init() {
+	util.ReadConfig(&Sysconfig)
+	MgoIP = util.ObjToString(Sysconfig["mongodb_ip"])
+	MgoDB = util.ObjToString(Sysconfig["mongodb_db"])
+	MgoC = util.ObjToString(Sysconfig["mongodb_c"])
+	SidField = util.ObjToString(Sysconfig["json_sidfiled"])
+	EidField = util.ObjToString(Sysconfig["json_eidfiled"])
+	MgoFileFiled = util.ObjToStringDef(Sysconfig["mongodb_filefiled"], "projectinfo")
+	if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" {
+		log.Println("获取配置文件参数失败", Sysconfig)
+		return
+	}
+	mongodb.InitMongodbPool(util.IntAllDef(Sysconfig["dbsize"], 5), MgoIP, MgoDB)
+}

+ 0 - 0
udp_ocr_conter/public/header.html → udp_ocr_conter/src/public/header.html


+ 0 - 0
udp_ocr_conter/res/css/table.css → udp_ocr_conter/src/res/css/table.css


+ 0 - 0
udp_ocr_conter/res/js/jquery.js → udp_ocr_conter/src/res/js/jquery.js


+ 0 - 0
udp_ocr_conter/web/index.html → udp_ocr_conter/src/web/index.html


+ 0 - 0
udp_ocr_conter/web/login.html → udp_ocr_conter/src/web/login.html


+ 32 - 0
udp_ocr_conter/src/web/reload.html

@@ -0,0 +1,32 @@
+<!DOCTYPE html>
+<html lang="en">
+<script type="text/javascript" src="../res/js/jquery.js"></script>
+<body>
+重新部署实例    <button onclick="AddIps(1)">添加实例ip</button>
+<br>
+{{.}}
+<br>
+
+
+
+</body>
+
+<script>
+    var i =1
+    function AddIps(num) {
+        i++
+        console.log(i)
+        // var tmp = "ips["+i+"]"
+        var tmp = "ips"
+        console.log(tmp)
+        var appendHtml = '</br><input id="ips'+i+'" name="'+tmp+'" placeholder="请输入实例ip" required/>';
+        console.log(appendHtml)
+        console.log("#ips"+(i-1))
+        tmpselect = "#ips"+(i-1)
+        $(tmpselect).after(appendHtml)
+
+    }
+</script>
+
+
+</html>