Jelajahi Sumber

分布式抽取调整

zhangjinkun 6 tahun lalu
induk
melakukan
22dddca5f1

+ 32 - 94
src/jy/admin/distribution/distributed.go

@@ -1,46 +1,41 @@
 /**
-分布式抽取
+数据按小时划段入库
 **/
 package distribution
 
 import (
 	"encoding/json"
-	"fmt"
 	"jy/extract"
 	db "jy/mongodbutil"
 	ju "jy/util"
-	"log"
 	mu "mfw/util"
 	"net"
 	qu "qfw/util"
 	"time"
 
+	log "github.com/donnie4w/go-logger/logger"
 	"gopkg.in/mgo.v2/bson"
 )
 
-var EscIds map[string][]string // id ranges
-
-// IdsRange splits ids into segments according to the number of ecs instances.
+// IdsRange walks from sdate to edate in one-hour steps, saves each hour's start/end ObjectId pair as an "esctask" record, and returns the number of records saved; table is not used.
 func IdsRange(table, sdate, edate string) int {
 	start, _ := time.ParseInLocation(qu.Date_Short_Layout, sdate, time.Local)
 	end, _ := time.ParseInLocation(qu.Date_Short_Layout, edate, time.Local)
-	EscIds = map[string][]string{}
-	list, _ := db.Mgo.Find("ecs", `{"Status":"Running"}`, nil, nil, false, -1, -1)
-	ids := RangeIdsByDate(len(*list), start, end)
-	for k, v := range *list {
-		db.Mgo.UpdateById("ecs", qu.BsonIdToSId(v["_id"]), map[string]interface{}{
-			"$set": map[string]interface{}{
-				"extask": []string{
-					ids[fmt.Sprint(k)][0],
-					ids[fmt.Sprint(k)][1],
-					ids[fmt.Sprint(k)][2],
-					qu.ObjToString(v["InstanceId"]),
-					ids[fmt.Sprint(k)][3],
-				},
-			},
+	nums := 0
+	for start.Unix() <= end.Unix() { // inclusive bound: a segment starting exactly at end is still created
+		nums++
+		timestr := start.Format("2006-01-02 15:04:05")
+		sid := bson.NewObjectIdWithTime(start)
+		start = start.Add(1 * time.Hour)
+		eid := bson.NewObjectIdWithTime(start)
+		db.Mgo.Save("esctask", map[string]interface{}{
+			"timestr": timestr,
+			"sid":     qu.BsonIdToSId(sid),
+			"eid":     qu.BsonIdToSId(eid),
+			"state":   0, // 0 = pending, 1 = running, 2 = done
 		})
 	}
-	return len(*list)
+	return nums
 }
 
 //启动任务
@@ -48,80 +43,23 @@ func RunEcsTask() int {
 	list, _ := db.Mgo.Find("ecs", `{"extstatus":"deploy"}`, nil, nil, false, -1, -1)
 	num := 0
 	for _, v := range *list {
-		if extask, ok := v["extask"].([]interface{}); ok {
-			ip := qu.ObjToString(v["ip_nw"])
-			by, _ := json.Marshal(map[string]interface{}{
-				"ip":         ip,
-				"gtid":       extask[0],
-				"lteid":      extask[1],
-				"InstanceId": extask[3],
-				"stype":      "distributed",
-			})
-			err := extract.Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
-				IP:   net.ParseIP(ip),
-				Port: qu.IntAll(ju.Config["udpport"]),
-			})
-			if err != nil {
-				log.Println(err)
-			} else {
-				num++
-				time.Sleep(2 * time.Second)
-				log.Println("分发任务", string(by))
-			}
+		ip := qu.ObjToString(v["ip_nw"])
+		by, _ := json.Marshal(map[string]interface{}{
+			"ip":         ip,
+			"InstanceId": v["InstanceId"],
+			"stype":      "distributed",
+		})
+		err := extract.Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+			IP:   net.ParseIP(ip),
+			Port: qu.IntAll(ju.Config["udpport"]),
+		})
+		if err != nil {
+			log.Debug(err)
+		} else {
+			num++
+			time.Sleep(2 * time.Second)
+			log.Debug("分发任务", string(by))
 		}
 	}
 	return num
 }
-
-// RangeIdsByDate splits the ids between start and end into escnum ranges of roughly equal document count; each entry is [start id, end id, count, table], keyed by the range index.
-func RangeIdsByDate(escnum int, start, end time.Time) map[string][]string {
-	ids := map[string][]string{}
-	task, _ := db.Mgo.FindById("task", qu.ObjToString(ju.Config["udptaskid"]), nil)
-	log.Println(qu.ObjToString((*task)["s_mgoaddr"]), qu.ObjToString((*task)["s_mgodb"]))
-	DB := db.MgoFactory(2, 3, 120, qu.ObjToString((*task)["s_mgoaddr"]), qu.ObjToString((*task)["s_mgodb"]))
-	total := DB.Count("bidding", bson.M{"_id": bson.M{"$gte": bson.NewObjectIdWithTime(start), "$lt": bson.NewObjectIdWithTime(end)}})
-	total_back := DB.Count("bidding_back", bson.M{"_id": bson.M{"$gte": bson.NewObjectIdWithTime(start), "$lt": bson.NewObjectIdWithTime(end)}})
-	total += total_back
-	pagesize := (total + escnum - 1) / escnum
-	log.Printf("total:%d pagesize:%d escnum:%d", total, pagesize, escnum)
-	nums := 0
-	table := "bidding_back"
-	for i := 0; i < escnum; i++ {
-		log.Println("escnum", i)
-		sid := bson.NewObjectIdWithTime(start)
-		var eid bson.ObjectId
-		var idsnum = 0
-		for {
-			tmpsid := bson.NewObjectIdWithTime(start)
-			endi := start.Add(4 * time.Hour)
-			if endi.Unix() > end.Unix() {
-				eid = bson.NewObjectIdWithTime(end)
-			} else {
-				eid = bson.NewObjectIdWithTime(endi)
-			}
-			start = endi
-			query := bson.M{"_id": bson.M{"$gte": tmpsid, "$lt": eid}}
-			count := DB.Count(table, query)
-			//log.Println(count, table, query)
-			if count < 1 { // check whether to switch table
-				tmpnum := DB.Count(table, bson.M{"_id": bson.M{"$gte": tmpsid, "$lt": bson.NewObjectIdWithTime(endi.Add(30 * 24 * time.Hour))}})
-				if tmpnum < 1 && table == "bidding_back" {
-					table = "bidding"
-					start = start.Add(-4 * time.Hour) // rewind so the same 4-hour window is counted again against the new table
-					log.Println("切换table,bidding", start)
-					continue
-				}
-			} else {
-				idsnum += count
-			}
-			//log.Printf("i:%d count:%d,date:%s", i, idsnum, end.Format(qu.Date_Full_Layout))
-			if idsnum >= pagesize || start.Unix() > time.Now().Unix() || count > 5000000 { // test data: count > 5000000
-				break
-			}
-		}
-		nums += idsnum
-		ids[fmt.Sprint(i)] = []string{qu.BsonIdToSId(sid), qu.BsonIdToSId(eid), fmt.Sprint(idsnum), table}
-		log.Println("nums", nums, table)
-	}
-	return ids
-}

+ 127 - 0
src/jy/admin/distribution/distributed_bak

@@ -0,0 +1,127 @@
+/**
+分布式抽取
+**/
+package distribution
+
+import (
+	"encoding/json"
+	"fmt"
+	"jy/extract"
+	db "jy/mongodbutil"
+	ju "jy/util"
+	"log"
+	mu "mfw/util"
+	"net"
+	qu "qfw/util"
+	"time"
+
+	"gopkg.in/mgo.v2/bson"
+)
+
+var EscIds map[string][]string // id ranges
+
+// IdsRange splits the ids between sdate and edate across the "ecs" records whose Status is "Running" (via RangeIdsByDate), stores each range in the record's "extask" field, and returns the record count; table is not used.
+func IdsRange(table, sdate, edate string) int {
+	start, _ := time.ParseInLocation(qu.Date_Short_Layout, sdate, time.Local)
+	end, _ := time.ParseInLocation(qu.Date_Short_Layout, edate, time.Local)
+	EscIds = map[string][]string{}
+	list, _ := db.Mgo.Find("ecs", `{"Status":"Running"}`, nil, nil, false, -1, -1)
+	ids := RangeIdsByDate(len(*list), start, end)
+	for k, v := range *list {
+		db.Mgo.UpdateById("ecs", qu.BsonIdToSId(v["_id"]), map[string]interface{}{
+			"$set": map[string]interface{}{
+				"extask": []string{
+					ids[fmt.Sprint(k)][0],
+					ids[fmt.Sprint(k)][1],
+					ids[fmt.Sprint(k)][2],
+					qu.ObjToString(v["InstanceId"]),
+					ids[fmt.Sprint(k)][3],
+				},
+			},
+		})
+	}
+	return len(*list)
+}
+
+// RunEcsTask sends every "ecs" record with extstatus "deploy" its stored extask id range over UDP and returns the number of successful sends.
+func RunEcsTask() int {
+	list, _ := db.Mgo.Find("ecs", `{"extstatus":"deploy"}`, nil, nil, false, -1, -1)
+	num := 0
+	for _, v := range *list {
+		if extask, ok := v["extask"].([]interface{}); ok {
+			ip := qu.ObjToString(v["ip_nw"])
+			by, _ := json.Marshal(map[string]interface{}{
+				"ip":         ip,
+				"gtid":       extask[0],
+				"lteid":      extask[1],
+				"InstanceId": extask[3],
+				"stype":      "distributed",
+			})
+			err := extract.Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+				IP:   net.ParseIP(ip),
+				Port: qu.IntAll(ju.Config["udpport"]),
+			})
+			if err != nil {
+				log.Println(err)
+			} else {
+				num++
+				time.Sleep(2 * time.Second)
+				log.Println("分发任务", string(by))
+			}
+		}
+	}
+	return num
+}
+
+// RangeIdsByDate splits the ids between start and end into escnum ranges of roughly equal document count; each entry is [start id, end id, count, table], keyed by the range index.
+func RangeIdsByDate(escnum int, start, end time.Time) map[string][]string {
+	ids := map[string][]string{}
+	task, _ := db.Mgo.FindById("task", qu.ObjToString(ju.Config["udptaskid"]), nil)
+	log.Println(qu.ObjToString((*task)["s_mgoaddr"]), qu.ObjToString((*task)["s_mgodb"]))
+	DB := db.MgoFactory(2, 3, 120, qu.ObjToString((*task)["s_mgoaddr"]), qu.ObjToString((*task)["s_mgodb"]))
+	total := DB.Count("bidding", bson.M{"_id": bson.M{"$gte": bson.NewObjectIdWithTime(start), "$lt": bson.NewObjectIdWithTime(end)}})
+	total_back := DB.Count("bidding_back", bson.M{"_id": bson.M{"$gte": bson.NewObjectIdWithTime(start), "$lt": bson.NewObjectIdWithTime(end)}})
+	total += total_back
+	pagesize := (total + escnum - 1) / escnum
+	log.Printf("total:%d pagesize:%d escnum:%d", total, pagesize, escnum)
+	nums := 0
+	table := "bidding_back"
+	for i := 0; i < escnum; i++ {
+		log.Println("escnum", i)
+		sid := bson.NewObjectIdWithTime(start)
+		var eid bson.ObjectId
+		var idsnum = 0
+		for {
+			tmpsid := bson.NewObjectIdWithTime(start)
+			endi := start.Add(4 * time.Hour)
+			if endi.Unix() > end.Unix() {
+				eid = bson.NewObjectIdWithTime(end)
+			} else {
+				eid = bson.NewObjectIdWithTime(endi)
+			}
+			start = endi
+			query := bson.M{"_id": bson.M{"$gte": tmpsid, "$lt": eid}}
+			count := DB.Count(table, query)
+			//log.Println(count, table, query)
+			if count < 1 { // check whether to switch table
+				tmpnum := DB.Count(table, bson.M{"_id": bson.M{"$gte": tmpsid, "$lt": bson.NewObjectIdWithTime(endi.Add(30 * 24 * time.Hour))}})
+				if tmpnum < 1 && table == "bidding_back" {
+					table = "bidding"
+					start = start.Add(-4 * time.Hour) // rewind so the same 4-hour window is counted again against the new table
+					log.Println("切换table,bidding", start)
+					continue
+				}
+			} else {
+				idsnum += count
+			}
+			//log.Printf("i:%d count:%d,date:%s", i, idsnum, end.Format(qu.Date_Full_Layout))
+			if idsnum >= pagesize || start.Unix() > time.Now().Unix() || count > 5000000 { // test data: count > 5000000
+				break
+			}
+		}
+		nums += idsnum
+		ids[fmt.Sprint(i)] = []string{qu.BsonIdToSId(sid), qu.BsonIdToSId(eid), fmt.Sprint(idsnum), table}
+		log.Println("nums", nums, table)
+	}
+	return ids
+}

+ 52 - 29
src/jy/admin/distribution/distribution.go

@@ -3,7 +3,6 @@ package distribution
 
 import (
 	"encoding/json"
-	"github.com/gin-gonic/gin"
 	. "jy/admin"
 	"jy/cluster"
 	"jy/extract"
@@ -13,6 +12,9 @@ import (
 	mu "mfw/util"
 	"net"
 	qu "qfw/util"
+	"time"
+
+	"github.com/gin-gonic/gin"
 )
 
 func init() {
@@ -58,41 +60,30 @@ func init() {
 		cluster.DescribeInstances()
 		c.JSON(200, gin.H{"rep": true})
 	})
-	//继续执行实例id区间段
+	//单实例任务
 	Admin.POST("/distribution/continueExecution", func(c *gin.Context) {
 		qu.Catch()
 		InstanceId, _ := c.GetPostForm("InstanceId")
 		tmp, b := db.Mgo.FindOneByField("ecs", `{"InstanceId":"`+InstanceId+`"}`, `{"ip_nw":1,"lastId":1,"extask":1}`)
 		if b && tmp != nil {
-			ip, sid, eid := "", "", ""
+			ip := ""
 			ip = qu.ObjToString((*tmp)["ip_nw"])
-			if extasktmp, ok := (*tmp)["extask"].([]interface{}); ok && len(extasktmp) > 1 {
-				log.Println(extasktmp)
-				if qu.ObjToString((*tmp)["lastId"]) == "" {
-					sid = qu.ObjToString(extasktmp[0])
-				} else {
-					sid = qu.ObjToString((*tmp)["lastId"])
-				}
-				eid = qu.ObjToString(extasktmp[1])
-				by, _ := json.Marshal(map[string]interface{}{
-					"ip":         ip,
-					"gtid":       sid,
-					"lteid":      eid,
-					"InstanceId": InstanceId,
-					"stype":      "distributed",
-				})
-				err := extract.Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
-					IP:   net.ParseIP(ip),
-					Port: qu.IntAll(ju.Config["udpport"]),
-				})
-				if err != nil {
-					log.Println(err)
-					c.JSON(200, gin.H{"rep": true})
-					return
-				}
+			by, _ := json.Marshal(map[string]interface{}{
+				"ip":         ip,
+				"InstanceId": InstanceId,
+				"stype":      "distributed",
+			})
+			err := extract.Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+				IP:   net.ParseIP(ip),
+				Port: qu.IntAll(ju.Config["udpport"]),
+			})
+			if err != nil {
+				log.Println(err)
 				c.JSON(200, gin.H{"rep": true})
 				return
 			}
+			c.JSON(200, gin.H{"rep": true})
+			return
 		}
 		c.JSON(200, gin.H{"rep": false})
 	})
@@ -112,9 +103,40 @@ func init() {
 	//部署
 	Admin.POST("/distribution/deploy", func(c *gin.Context) {
 		list, _ := db.Mgo.Find("ecs", `{"Status":"Running"}`, nil, nil, false, -1, -1)
+		ch := make(chan bool, 5)
 		for _, v := range *list {
-			ip := qu.ObjToString(v["ip_nw"])
-			id := qu.ObjToString(v["InstanceId"])
+			time.Sleep(1 * time.Second)
+			ch <- true
+			go func(v map[string]interface{}) {
+				ip := qu.ObjToString(v["ip_nw"])
+				id := qu.ObjToString(v["InstanceId"])
+				log.Println("部署", ip, id)
+				cluster.RunSsh(ip, cluster.Password, 22)
+				db.Mgo.Update("ecs", `{"ip_nw":"`+ip+`"}`,
+					map[string]interface{}{
+						"$set": map[string]interface{}{
+							"extstatus": "deploy",
+						},
+					}, true, false)
+				<-ch
+			}(v)
+		}
+		c.JSON(200, gin.H{"rep": true})
+	})
+
+	Admin.POST("/distribution/deployone", func(c *gin.Context) {
+		InstanceId, _ := c.GetPostForm("InstanceId")
+		v, b := db.Mgo.FindOneByField("ecs", `{"InstanceId":"`+InstanceId+`"}`, `{"ip_nw":1,"InstanceId":1}`)
+		log.Println(b, InstanceId)
+		if b {
+			ip := qu.ObjToString((*v)["ip_nw"])
+			id := qu.ObjToString((*v)["InstanceId"])
+			db.Mgo.Update("ecs", `{"ip_nw":"`+ip+`"}`,
+				map[string]interface{}{
+					"$set": map[string]interface{}{
+						"extstatus": "",
+					},
+				}, true, false)
 			log.Println("部署", ip, id)
 			cluster.RunSsh(ip, cluster.Password, 22)
 			db.Mgo.Update("ecs", `{"ip_nw":"`+ip+`"}`,
@@ -123,6 +145,7 @@ func init() {
 						"extstatus": "deploy",
 					},
 				}, true, false)
+
 		}
 		c.JSON(200, gin.H{"rep": true})
 	})

+ 14 - 7
src/jy/cluster/ssh.go

@@ -47,12 +47,19 @@ func ssHConnect(user, password, host string, port int) (*ssh.Session, error) {
 var sshstr = `
 #!/bin/bash
 curl -fsSL get.docker.com -o get-docker.sh
-sudo sh get-docker.sh --mirror Aliyun
-sudo systemctl start docker
-sudo docker pull registry-vpc.cn-beijing.aliyuncs.com/file2text/gocv:v1 
-sudo docker run -itd --name gocv registry-vpc.cn-beijing.aliyuncs.com/file2text/gocv:v1  /bin/bash 
-sudo docker exec -it gocv /bin/bash
-cd /opt
+sh get-docker.sh --mirror Aliyun
+systemctl start docker
+cd /opt/
+wget http://10.171.112.160:9090/res/auto_service.sh
+chmod 777 auto_service.sh
+wget http://10.171.112.160:9090/res/gocv.tar
+docker load --input gocv.tar
+docker run --net=host --name=gocv -itd -v /opt:/opt gocv:v2 /opt/auto_service.sh
+exit
+`
+
+/*
+#!/bin/bash
 kill -9 $(pidof extract_v3)
 rm -rf extract_v3*
 mkdir extract_v3
@@ -62,7 +69,7 @@ tar -xzvf extract_v3.tgz
 chmod 777 extract_v3
 nohup ./extract_v3 >/opt/extract_v3/nohup 2>&1 &
 exit
-`
+*/
 
 func RunSsh(ip, password string, port int) {
 	var stdOut, stdErr bytes.Buffer

+ 81 - 150
src/jy/extract/extractudp.go

@@ -4,17 +4,13 @@ package extract
 import (
 	"encoding/json"
 	"fmt"
-	"io/ioutil"
-	"jy/cluster"
 	db "jy/mongodbutil"
 	ju "jy/util"
-	log2 "log"
 	mu "mfw/util"
 	"net"
-	"net/http"
 	qu "qfw/util"
-	"strings"
 	"sync"
+	"time"
 
 	log "github.com/donnie4w/go-logger/logger"
 	"gopkg.in/mgo.v2/bson"
@@ -38,32 +34,29 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		if err != nil {
 			log.Debug(err)
 		} else {
-			sid, _ := rep["gtid"].(string)
-			eid, _ := rep["lteid"].(string)
 			stype, _ := rep["stype"].(string)
-			if sid == "" || eid == "" {
-				log.Debug("err", "sid=", sid, ",eid=", eid)
+			if stype == "distributed" { //分布式抽取分支
+				go Udpclient.WriteUdp([]byte("发送分布式抽取分支"+qu.ObjToString(rep["ip"])+"udpok"), mu.OP_NOOP, ra)
+				InstanceId := qu.ObjToString(rep["InstanceId"])
+				db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
+					map[string]interface{}{
+						"$set": map[string]interface{}{
+							"extstatus": "running",
+						},
+					}, true, false)
+				ExtractByUdp("", "", ra, qu.ObjToString(rep["InstanceId"]), qu.ObjToString(rep["ip"]))
+				db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
+					map[string]interface{}{
+						"$set": map[string]interface{}{
+							"extstatus": "ok",
+						},
+					}, true, false)
+				log.Debug("分布式抽取完成,可以释放esc实例", qu.ObjToString(rep["ip"]))
 			} else {
-				if stype == "distributed" { //分布式抽取分支
-					go Udpclient.WriteUdp([]byte("发送分布式抽取分支"+qu.ObjToString(rep["ip"])+"udpok"), mu.OP_NOOP, ra)
-					log.Debug("分布式抽取id段", sid, " ", eid)
-					InstanceId := qu.ObjToString(rep["InstanceId"])
-					db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
-						map[string]interface{}{
-							"$set": map[string]interface{}{
-								"extstatus": "running",
-							},
-						}, true, false)
-					ExtractByUdp(sid, eid, ra, qu.ObjToString(rep["InstanceId"]), qu.ObjToString(rep["ip"]))
-					db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
-						map[string]interface{}{
-							"$set": map[string]interface{}{
-								"extstatus": "ok",
-							},
-						}, true, false)
-					//<-time.NewTimer(time.Minute * time.Duration(qu.IntAll(ju.Config["DeleteInstanceTimeMinute"]))).C
-					//cluster.DeleteInstance("instanceId[0]")
-					log.Debug("分布式抽取完成", sid, " ", eid, "释放esc实例", qu.ObjToString(rep["ip"]))
+				sid, _ := rep["gtid"].(string)
+				eid, _ := rep["lteid"].(string)
+				if sid == "" || eid == "" {
+					log.Debug("err", "sid=", sid, ",eid=", eid)
 				} else {
 					udpinfo, _ := rep["key"].(string)
 					if udpinfo == "" {
@@ -92,44 +85,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		}
 	case mu.OP_NOOP: //下个节点回应
 		log.Debug(string(data))
-		log2.Println(string(data))
-	case mu.OP_SEND_EMAIL:
-		log.Debug("实例抽取完成,发送邮件:", string(data), ra.IP)
-		log2.Println("实例抽取完成,发送邮件:", string(data), ra.IP)
-		rep := make(map[string]interface{})
-		err := json.Unmarshal(data, &rep)
-		if err != nil {
-			log.Debug(err)
-			log2.Println(string(data), ra.IP)
-		} else {
-			tmpstr := ""
-			for k, v := range rep {
-				switch k {
-				case "desc":
-					tmpstr += fmt.Sprint(v) + ","
-				case "count":
-					tmpstr += "实际抽取数据量" + fmt.Sprint(v) + ","
-				case "index":
-					tmpstr += "区间数据量为" + fmt.Sprint(v) + ","
-				case "instanceId":
-					tmpstr += "实例" + fmt.Sprint(v) + ","
-				}
-			}
-			tmpstr = strings.TrimRight(tmpstr, ",")
-			sendMail(tmpstr)
-			cluster.ModifyInstanceAutoReleaseTime(qu.ObjToString(rep["instanceId"]), qu.IntAll(ju.Config["deleteInstanceTimeHour"]))
-		}
-	}
-}
-func sendMail(content string) {
-	log2.Println(ju.Config["api"], ju.Config["tomail"], "jy-data-extract_3.2", "抽取完成:"+content)
-	res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", ju.Config["api"], ju.Config["tomail"], "jy-data-extract_3.2", "抽取完成:"+content))
-	defer res.Body.Close()
-	if err == nil {
-		read, err := ioutil.ReadAll(res.Body)
-		log2.Println("邮件发送:", string(read), err)
 	}
-	log2.Println("api email:", err)
 }
 
 var ext *ExtractTask
@@ -173,96 +129,71 @@ func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
 	}
 	index := 0
 	if len(instanceId) > 0 { //分布式抽取进度
-		query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
-		count1 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
-		count2 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl+"_back", query)
-		count := count1 + count2
-		pageNum := (count + PageSize - 1) / PageSize
-		limit := PageSize
-		if count < PageSize {
-			limit = count
-		}
-		fmt.Printf("count=%d,pageNum=%d,query=%v\n", count, pageNum, query)
-
-		startI := 0 //接着上次任务执行
-		sidback := sid
-		esc, _ := db.Mgo.FindOne("ecs", `{"InstanceId":"`+instanceId[0]+`"}`)
-		startI = qu.IntAll((*esc)["pagecurrent"])
-		if qu.ObjToString((*esc)["lastId"]) != "" {
-			sid = qu.ObjToString((*esc)["lastId"])
-		}
-		if qu.ObjToString((*esc)["lastIdback"]) != "" {
-			sidback = qu.ObjToString((*esc)["lastIdback"])
-		}
-		go Udpclient.WriteUdp([]byte(fmt.Sprintf("IP=%s,数据接收成功,count=%d,pageNum=%d,query=%v\n", instanceId[1], count, pageNum, query)), mu.OP_NOOP, ra)
-		for i := startI; i < pageNum; i++ {
-			query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
-			fmt.Printf("page=%d,query=%v\n", i+1, query)
-			if ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query) > 0 {
-				list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
-				for _, v := range *list {
-					if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
-						continue
-					}
-					_id := qu.BsonIdToSId(v["_id"])
-					var j, jf *ju.Job
-					if ext.IsFileField && v["projectinfo"] != nil {
-						v["isextFile"] = true
-						j, jf = ext.PreInfo(v)
-					} else {
-						j, _ = ext.PreInfo(v)
-					}
-					ext.TaskInfo.ProcessPool <- true
-					go ext.ExtractProcess(j, jf)
-					sid = _id
-					index++
+		go Udpclient.WriteUdp([]byte(fmt.Sprintf("IP=%s,数据接收成功", instanceId[1])), mu.OP_NOOP, ra)
+		for {
+			tsk, b := db.Mgo.FindOne("esctask", `{"state":{"$lt":1}}`)
+			if tsk != nil && !b {
+				break
+			}
+			db.Mgo.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{
+				"$set": map[string]interface{}{
+					"InstanceId": instanceId[0],
+					"state":      1,
+					"runtime":    time.Now().Format(qu.Date_Full_Layout),
+				},
+			})
+			query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(qu.ObjToString((*tsk)["sid"])), "$lte": bson.ObjectIdHex(qu.ObjToString((*tsk)["eid"]))}}
+			count1 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
+			count2 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl+"_back", query)
+			log.Debug("timestr", (*tsk)["timestr"], "count", count1+count2)
+			list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, -1, -1)
+			for _, v := range *list {
+				if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
+					continue
+				}
+				var j, jf *ju.Job
+				if ext.IsFileField && v["projectinfo"] != nil {
+					v["isextFile"] = true
+					j, jf = ext.PreInfo(v)
+				} else {
+					j, _ = ext.PreInfo(v)
 				}
-				db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
-					map[string]interface{}{"$set": map[string]interface{}{
-						"lastId": sid,
-					}}, true, false)
+				ext.TaskInfo.ProcessPool <- true
+				go ext.ExtractProcess(j, jf)
+				index++
 			}
-			queryback := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sidback)}}
-			fmt.Printf("page=%d,queryback=%v\n", i+1, queryback)
-			if ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl+"_back", queryback) > 0 {
-				list2, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl+"_back", queryback, nil, Fields, false, 0, limit)
-				for _, v := range *list2 {
-					if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
-						continue
-					}
-					_id := qu.BsonIdToSId(v["_id"])
-					var j, jf *ju.Job
-					if ext.IsFileField && v["projectinfo"] != nil {
-						v["isextFile"] = true
-						j, jf = ext.PreInfo(v)
-					} else {
-						j, _ = ext.PreInfo(v)
-					}
-					ext.TaskInfo.ProcessPool <- true
-					go ext.ExtractProcess(j, jf)
-					sidback = _id
-					index++
+			list2, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl+"_back", query, nil, Fields, false, -1, -1)
+			for _, v := range *list2 {
+				if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
+					continue
 				}
-				db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
-					map[string]interface{}{"$set": map[string]interface{}{
-						"lastIdback": sidback,
-					}}, true, false)
+				var j, jf *ju.Job
+				if ext.IsFileField && v["projectinfo"] != nil {
+					v["isextFile"] = true
+					j, jf = ext.PreInfo(v)
+				} else {
+					j, _ = ext.PreInfo(v)
+				}
+				ext.TaskInfo.ProcessPool <- true
+				go ext.ExtractProcess(j, jf)
+				index++
 			}
+			db.Mgo.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{
+				"$set": map[string]interface{}{
+					"InstanceId": instanceId[0],
+					"oktime":     time.Now().Format(qu.Date_Full_Layout),
+					"state":      1,
+				},
+			})
 			db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
-				map[string]interface{}{"$set": map[string]interface{}{
-					"pagetotal":   pageNum,
-					"pagecurrent": i + 1,
-				}}, true, false)
+				map[string]interface{}{
+					"$inc": map[string]interface{}{
+						"totalnum": count1 + count2,
+						"step":     1,
+					},
+				}, true, false)
 		}
-		des := make(map[string]interface{})
-		des["desc"] = "分布式抽取完成,一小时后释放"
-		des["count"] = count
-		des["index"] = index
-		des["instanceId"] = instanceId[0]
-		des["instanceIP"] = instanceId[1]
-		udpbytes, _ := json.Marshal(des)
-		go Udpclient.WriteUdp(udpbytes, mu.OP_SEND_EMAIL, ra)
-		log.Debug("抽取完成", "count:", count, "index:", index, "bidtotal:", ext.BidTotal)
+		log.Debug("抽取完成", "index:", index, "bidtotal:", ext.BidTotal)
 	} else { //普通抽取
 		query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
 		count := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)

+ 29 - 15
src/web/templates/admin/distribution.html

@@ -10,7 +10,7 @@
 		<h1>
 			<small><a class="btn btn-primary opr" opr="new">申请阿里云实例</a></small>
             <small><a class="btn btn-primary" onclick="upInstanceIds()">更新实例状态</a></small>
-            <small><a class="btn btn-primary opr" opr="idrange">id划段</a></small>
+            <small><a class="btn btn-primary opr" opr="idrange">生成任务</a></small>
             <small><a class="btn btn-primary" onclick="deploy()">部署</a></small>
 		    <small><a class="btn btn-primary" onclick="rangetask()">执行</a></small>
             <small><a class="btn btn-primary opr" opr="releasetime">释放设置</a></small>
@@ -34,9 +34,9 @@
 						<th>ip</th>
                         <th>释放时间(UTC)</th>
 						<th>实例状态</th>
-                        <th>数据量</th>
                         <th>抽取状态</th>
-                        <th>抽取进度</th>
+                        <th>抽取数量</th>
+                        <th>抽取步数</th>
 						<th>操作</th>
 		              </tr>
 		              </thead>
@@ -75,42 +75,38 @@ $(function () {
         },
 		"columns": [
             { "data": "TaskName"},
-            { "data": "HostName"},
+            { "data": "InstanceName"},
 			{ "data": "InstanceId"},
 			{ "data": "ip_nw",render:function(val,a,row,pos){
                 return val+"("+row.ip_ww+")"
             }},
 			{ "data": "AutoReleaseTime"},
             { "data": "Status"},
-            { "data": "extask",render:function(val,a,row,pos){
+            { "data": "extstatus",render:function(val,a,row,pos){
                if(val){
-                    return val[2]
+                    return val
                 }else{
                     return "-"
                 }
             }},
-            { "data": "extstatus",render:function(val,a,row,pos){
+            { "data": "totalnum",render:function(val,a,row,pos){
                if(val){
                     return val
                 }else{
                     return "-"
                 }
             }},
-            { "data": "pagetotal",render:function(val,a,row,pos){
+            { "data": "step",render:function(val,a,row,pos){
                if(val){
-                    if(row.pagecurrent){
-                        return row.pagecurrent+"/"+val
-                    }else{
-                         return "0/"+val
-                    }
-      
+                    return  val
                 }else{
                     return "-"
                 }
             }},
 			{"data":"InstanceId",render:function(val,a,row,pos){
 				return  '<div class="btn-group">'+
-                    '<a class="btn btn-sm btn-primary opr" onclick="ContinueExecution(\''+row.InstanceId+'\')">执行</a>'+
+                    '<a class="btn btn-sm btn-primary" onclick="deployone(\''+row.InstanceId+'\')">部署</a>'+
+                    '<a class="btn btn-sm btn-success opr" onclick="ContinueExecution(\''+row.InstanceId+'\')">执行</a>'+
                     '<a class="btn btn-sm btn-warning" onclick="del(\''+row.InstanceId+'\')">释放</a>'+
                     '<a class="btn btn-sm btn-danger" href="#" onclick="delInstanceId(\''+row.InstanceId+'\')">删&nbsp;&nbsp;除</a>'
                     '</div>';
@@ -204,6 +200,24 @@ function ContinueExecution(_id){
         })
     });
 }
+
+function deployone(_id){ // After confirmation, POST the instance id to /admin/distribution/deployone; reload the table when r.rep is truthy, otherwise show a failure tip.
+    showConfirm("确定部署?", function() {
+        $.ajax({
+            url:"/admin/distribution/deployone",
+            type:"post",
+            data:{"InstanceId":_id},
+            success:function(r){
+                if(r.rep){
+                    ttable.ajax.reload();
+                }else{
+                    showTip("执行失败", 1000, function() {});
+                }
+            }
+        })
+    });
+}
+
 function del(_id){
 	showConfirm("确定释放实例?", function() {
         $.ajax({