fengweiqiang 6 жил өмнө
parent
commit
2d1a4fc424

+ 44 - 3
src/jy/admin/distribution/distribution.go

@@ -2,13 +2,17 @@
 package distribution
 
 import (
+	"encoding/json"
+	"github.com/gin-gonic/gin"
 	. "jy/admin"
 	"jy/cluster"
+	"jy/extract"
 	db "jy/mongodbutil"
+	ju "jy/util"
 	"log"
+	mu "mfw/util"
+	"net"
 	qu "qfw/util"
-
-	"github.com/gin-gonic/gin"
 )
 
 func init() {
@@ -54,7 +58,44 @@ func init() {
 		cluster.DescribeInstances()
 		c.JSON(200, gin.H{"rep": true})
 	})
-
+	//继续执行实例id区间段
+	Admin.POST("/distribution/continueExecution", func(c *gin.Context) {
+		qu.Catch()
+		InstanceId, _ := c.GetPostForm("InstanceId")
+		tmp, b := db.Mgo.FindOneByField("ecs", `{"InstanceId":"`+InstanceId+`"}`, `{"ip_nw":1,"lastId":1,"extask":1}`)
+		if b && tmp != nil {
+			ip, sid, eid := "", "", ""
+			ip = qu.ObjToString((*tmp)["ip_nw"])
+			if extasktmp, ok := (*tmp)["extask"].([]interface{}); ok && len(extasktmp) > 1 {
+				log.Println(extasktmp)
+				if qu.ObjToString((*tmp)["lastId"]) == "" {
+					sid = qu.ObjToString(extasktmp[0])
+				} else {
+					sid = qu.ObjToString((*tmp)["lastId"])
+				}
+				eid = qu.ObjToString(extasktmp[1])
+				by, _ := json.Marshal(map[string]interface{}{
+					"ip":         ip,
+					"gtid":       sid,
+					"lteid":      eid,
+					"InstanceId": InstanceId,
+					"stype":      "distributed",
+				})
+				err := extract.Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+					IP:   net.ParseIP(ip),
+					Port: qu.IntAll(ju.Config["udpport"]),
+				})
+				if err != nil {
+					log.Println(err)
+					c.JSON(200, gin.H{"rep": true})
+					return
+				}
+				c.JSON(200, gin.H{"rep": true})
+				return
+			}
+		}
+		c.JSON(200, gin.H{"rep": false})
+	})
 	//释放ecs实例
 	Admin.POST("/distribution/deleteInstance", func(c *gin.Context) {
 		InstanceId, _ := c.GetPostForm("InstanceId")

+ 2 - 2
src/jy/cluster/ssh.go

@@ -49,11 +49,11 @@ var sshstr = `
 cd /opt
 kill -9 $(pidof extract_v3)
 rm -rf extract_v3*
-wget http://10.170.187.34:8300/upload/extract_v3.tgz
+wget http://10.171.112.160:9090/res/extract_v3.tgz
 tar -xzvf extract_v3.tgz
 cd /opt/extract_v3
 chmod 777 extract_v3
-nohup ./extract_v3 >/dev/null 2>&1 &
+nohup ./extract_v3 &
 exit
 `
 

+ 6 - 6
src/jy/extract/clearesult.go

@@ -111,9 +111,9 @@ func (c *ClearTask) UpdateResultVal(init bool) {
 		go func() {
 			for {
 				c.RWMutex.Lock()
-				if len(c.UpdateResult) > 100 {
-					arr := c.UpdateResult[:100]
-					c.UpdateResult = c.UpdateResult[100:]
+				if len(c.UpdateResult) > 50 {
+					arr := c.UpdateResult[:50]
+					c.UpdateResult = c.UpdateResult[50:]
 					c.RWMutex.Unlock()
 					qu.Try(func() {
 						c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr...)
@@ -141,9 +141,9 @@ func (c *ClearTask) UpdateResultVal(init bool) {
 		qu.Try(func() {
 			lenarr := len(arr)
 			for {
-				if lenarr > 100 {
-					arr2 := arr[:100]
-					arr = arr[100:]
+				if lenarr > 50 {
+					arr2 := arr[:50]
+					arr = arr[50:]
 					lenarr = len(arr)
 					c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr2...)
 					time.Sleep(1*time.Second)

+ 1 - 1
src/jy/extract/extract.go

@@ -1304,8 +1304,8 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 				}
 				e.RWMutex.Lock()
 				e.BidArr = append(e.BidArr, tmparr)
-				e.RWMutex.Unlock()
 				e.BidTotal++
+				e.RWMutex.Unlock()
 			}
 			if b, ok := ju.Config["saveresult"].(bool); ok && b {
 				id := tmp["_id"]

+ 12 - 12
src/jy/extract/extractInit.go

@@ -818,9 +818,9 @@ func (e *ExtractTask) ResultSave(init bool) {
 		go func() {
 			for {
 				e.RWMutex.Lock()
-				if len(e.ResultArr) > 100 {
-					arr := e.ResultArr[:100]
-					e.ResultArr = e.ResultArr[100:]
+				if len(e.ResultArr) > 50 {
+					arr := e.ResultArr[:50]
+					e.ResultArr = e.ResultArr[50:]
 					e.RWMutex.Unlock()
 					qu.Try(func() {
 						db.Mgo.UpSertBulk("extract_result", arr...)
@@ -849,9 +849,9 @@ func (e *ExtractTask) ResultSave(init bool) {
 		qu.Try(func() {
 			lenarr := len(arr)
 			for {
-				if lenarr > 100 {
-					arr2 := arr[:100]
-					arr = arr[100:]
+				if lenarr > 50 {
+					arr2 := arr[:50]
+					arr = arr[50:]
 					lenarr = len(arr)
 					e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr2...)
 				} else {
@@ -878,9 +878,9 @@ func (e *ExtractTask) BidSave(init bool) {
 		go func() {
 			for {
 				e.RWMutex.Lock()
-				if len(e.BidArr) > 100 {
-					arr := e.BidArr[:100]
-					e.BidArr = e.BidArr[100:]
+				if len(e.BidArr) > 50 {
+					arr := e.BidArr[:50]
+					e.BidArr = e.BidArr[50:]
 					e.RWMutex.Unlock()
 					qu.Try(func() {
 						e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr...)
@@ -908,9 +908,9 @@ func (e *ExtractTask) BidSave(init bool) {
 		qu.Try(func() {
 			lenarr := len(arr)
 			for {
-				if lenarr > 100 {
-					arr2 := arr[:100]
-					arr = arr[100:]
+				if lenarr > 50 {
+					arr2 := arr[:50]
+					arr = arr[50:]
 					lenarr = len(arr)
 					e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr2...)
 				} else {

+ 27 - 11
src/jy/extract/extractudp.go

@@ -10,10 +10,12 @@ import (
 	"jy/cluster"
 	db "jy/mongodbutil"
 	ju "jy/util"
+	log2 "log"
 	mu "mfw/util"
 	"net"
 	"net/http"
 	qu "qfw/util"
+	"strings"
 	"sync"
 )
 
@@ -42,7 +44,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 				log.Debug("err", "sid=", sid, ",eid=", eid)
 			} else {
 				if stype == "distributed" { //分布式抽取分支
-					go Udpclient.WriteUdp([]byte("udpok"), mu.OP_NOOP, ra)
+					go Udpclient.WriteUdp([]byte("发送分布式抽取分支"+qu.ObjToString(rep["ip"])+"udpok"), mu.OP_NOOP, ra)
 					log.Debug("分布式抽取id段", sid, " ", eid)
 					InstanceId := qu.ObjToString(rep["InstanceId"])
 					db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
@@ -89,25 +91,44 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		}
 	case mu.OP_NOOP: //下个节点回应
 		log.Debug(string(data))
+		log2.Println(string(data))
 	case mu.OP_SEND_EMAIL:
-		log.Debug(data, ra.IP)
+		log.Debug("实例抽取完成,发送邮件:",string(data),ra.IP)
+		log2.Println("实例抽取完成,发送邮件:",string(data), ra.IP)
 		rep := make(map[string]interface{})
 		err := json.Unmarshal(data, &rep)
 		if err != nil {
 			log.Debug(err)
+			log2.Println(string(data), ra.IP)
 		} else {
-			sendMail(string(data))
+			tmpstr := ""
+			for k,v :=range rep{
+				switch k {
+				case "desc":
+					tmpstr += fmt.Sprint(v)+","
+				case "count":
+					tmpstr += "区间数据量为"+fmt.Sprint(v)+","
+				case "index":
+					tmpstr += "实际抽取数据量"+fmt.Sprint(v)+","
+				case "instanceId":
+					tmpstr += "实例"+fmt.Sprint(v)+","
+				}
+			}
+			tmpstr = strings.TrimRight(tmpstr,",")
+			sendMail(tmpstr)
 			cluster.ModifyInstanceAutoReleaseTime(qu.ObjToString(rep["instanceId"]), qu.IntAll(ju.Config["deleteInstanceTimeHour"]))
 		}
 	}
 }
 func sendMail(content string) {
+	log2.Println(ju.Config["api"], ju.Config["tomail"],"jy-data-extract_3.2","抽取完成:"+content)
 	res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", ju.Config["api"], ju.Config["tomail"], "jy-data-extract_3.2", "抽取完成:"+content))
 	defer res.Body.Close()
 	if err == nil {
 		read, err := ioutil.ReadAll(res.Body)
-		log.Debug("邮件发送:", string(read), err)
+		log2.Println("邮件发送:", string(read), err)
 	}
+	log2.Println("api email:",err)
 }
 
 var ext *ExtractTask
@@ -171,7 +192,7 @@ func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
 		if qu.ObjToString((*esc)["lastIdback"]) != "" {
 			sidback = qu.ObjToString((*esc)["lastIdback"])
 		}
-
+		go Udpclient.WriteUdp([]byte(fmt.Sprintf("IP=%s,数据接收成功,count=%d,pageNum=%d,query=%v\n",instanceId[1], count, pageNum, query)), mu.OP_NOOP, ra)
 		for i := startI; i < pageNum; i++ {
 			query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
 			fmt.Printf("page=%d,query=%v\n", i+1, query)
@@ -182,7 +203,6 @@ func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
 						continue
 					}
 					_id := qu.BsonIdToSId(v["_id"])
-					log.Debug(_id)
 					var j, jf *ju.Job
 					if ext.IsFileField && v["projectinfo"] != nil {
 						v["isextFile"] = true
@@ -209,7 +229,6 @@ func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
 						continue
 					}
 					_id := qu.BsonIdToSId(v["_id"])
-					log.Debug(_id)
 					var j, jf *ju.Job
 					if ext.IsFileField && v["projectinfo"] != nil {
 						v["isextFile"] = true
@@ -234,12 +253,9 @@ func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
 				}}, true, false)
 		}
 		des := make(map[string]interface{})
-		des["desc"]=`分布式抽取完成,一小时后释放, sid:`+sid+`, eid:`+eid+`, count:`+fmt.Sprint(count)+`,index:`+fmt.Sprint(index)+`,bidtotal:`+fmt.Sprint(ext.BidTotal)+`,释放esc实例: `+instanceId[0]+`,`+instanceId[1]
-		des["sid"] = sid
-		des["eid"] = eid
+		des["desc"]="分布式抽取完成,一小时后释放"
 		des["count"] = count
 		des["index"] = index
-		des["bidtotal"] = ext.BidTotal
 		des["instanceId"] = instanceId[0]
 		des["instanceIP"] = instanceId[1]
 		udpbytes, _ := json.Marshal(des)

+ 18 - 1
src/web/templates/admin/distribution.html

@@ -110,6 +110,7 @@ $(function () {
             }},
 			{"data":"InstanceId",render:function(val,a,row,pos){
 				return  '<div class="btn-group">'+
+                    '<a class="btn btn-sm btn-primary opr" onclick="ContinueExecution(\''+row.InstanceId+'\')">执行</a>'+
                     '<a class="btn btn-sm btn-warning" onclick="del(\''+row.InstanceId+'\')">释放</a>'+
                     '<a class="btn btn-sm btn-danger" href="#" onclick="delInstanceId(\''+row.InstanceId+'\')">删&nbsp;&nbsp;除</a>'
                     '</div>';
@@ -187,6 +188,22 @@ $(function () {
 		});
 	})
 })
+function ContinueExecution(_id){
+    showConfirm("确定继续抽取?", function() {
+        $.ajax({
+            url:"/admin/distribution/continueExecution",
+            type:"post",
+            data:{"InstanceId":_id},
+            success:function(r){
+                if(r.rep){
+                    ttable.ajax.reload();
+                }else{
+                    showTip("执行失败", 1000, function() {});
+                }
+            }
+        })
+    });
+}
 function del(_id){
 	showConfirm("确定释放实例?", function() {
         $.ajax({
@@ -213,7 +230,7 @@ function delInstanceId(_id){
                 if(r.rep){
                     ttable.ajax.reload();
                 }else{
-                    showTip("确定删除实例", 1000, function() {});
+                    showTip("删除实例失败", 1000, function() {});
                 }
             }
         })