浏览代码

分布式抽取

zhangjinkun 6 年之前
父节点
当前提交
db5f45f4ac

+ 8 - 0
src/jy/admin/distribution/distribution.go

@@ -25,8 +25,16 @@ func init() {
 	//申请ecs实例、id划段
 	Admin.POST("/distribution/runInstances", func(c *gin.Context) {
 		s_table, _ := c.GetPostForm("s_table")
+		instanceid, _ := c.GetPostForm("instanceid")
 		if s_table != "" { //id划段
 			ecs.IdsRange(s_table)
+		} else if instanceid != "" { //实例自动释放时间
+			hours, _ := c.GetPostForm("hours")
+			hour := qu.IntAll(hours)
+			if hour > 0 {
+				log.Println(instanceid, hour)
+				ecs.ModifyInstanceAutoReleaseTime(instanceid, hour)
+			}
 		} else { //申请ecs实例
 			TaskName, _ := c.GetPostForm("s_name")
 			num, _ := c.GetPostForm("i_num")

+ 10 - 1
src/jy/cluster/aliecs.go

@@ -86,7 +86,7 @@ func DescribeInstances() {
 					}
 				}
 				if t, ok := tmp["PublicIpAddress"].(map[string]interface{}); ok {
-					if tt, ok := t["IpAddress"].([]interface{}); ok {
+					if tt, ok := t["IpAddress"].([]interface{}); ok && len(tt) > 0 {
 						tmp["ip_ww"] = tt[0]
 					}
 				}
@@ -117,6 +117,15 @@ func DeleteInstance(InstanceId string) {
 	log.Println("DeleteInstance", res)
 }
 
+//实例自动释放时间
+func ModifyInstanceAutoReleaseTime(InstanceId string, hours int) {
+	res := GET("ModifyInstanceAutoReleaseTime", [][]string{
+		[]string{"InstanceId", InstanceId},
+		[]string{"AutoReleaseTime", time.Now().Add(time.Duration(hours) * time.Hour).UTC().Format("2006-01-02T15:04:05Z")},
+	})
+	log.Println("ModifyInstanceAutoReleaseTime", res)
+}
+
 //GET请求
 func GET(action string, param [][]string) (mres map[string]interface{}) {
 	esconfig, _ := ju.Config["esconfig"].(map[string]interface{})

+ 29 - 12
src/jy/cluster/distributed.go

@@ -22,10 +22,10 @@ var EscIds map[string][]string //id区间
 
 //根据esc数量实例数量id划段
 func IdsRange(table string) int {
-	start := time.Date(2015, 11, 0, 0, 0, 0, 0, time.Local)
+	start := time.Date(2015, 11, 3, 0, 0, 0, 0, time.Local)
 	EscIds = map[string][]string{}
 	list, _ := db.Mgo.Find("ecs", `{"Status":"Running"}`, nil, nil, false, -1, -1)
-	ids := RangeIdsByDate(table, len(*list), start)
+	ids := RangeIdsByDate(len(*list), start)
 	for k, v := range *list {
 		db.Mgo.UpdateById("ecs", qu.BsonIdToSId(v["_id"]), map[string]interface{}{
 			"$set": map[string]interface{}{
@@ -71,32 +71,49 @@ func RunEcsTask() int {
 }
 
 //id分段
-func RangeIdsByDate(table string, escnum int, start time.Time) map[string][]string {
+func RangeIdsByDate(escnum int, start time.Time) map[string][]string {
 	ids := map[string][]string{}
 	task, _ := db.Mgo.FindById("task", qu.ObjToString(ju.Config["udptaskid"]), nil)
 	log.Println(qu.ObjToString((*task)["s_mgoaddr"]), qu.ObjToString((*task)["s_mgodb"]))
-	DB := db.MgoFactory(1, 2, 120, qu.ObjToString((*task)["s_mgoaddr"]), qu.ObjToString((*task)["s_mgodb"]))
-	total := DB.Count(table, `{}`)
+	DB := db.MgoFactory(2, 3, 120, qu.ObjToString((*task)["s_mgoaddr"]), qu.ObjToString((*task)["s_mgodb"]))
+	total := DB.Count("bidding", `{}`)
+	total_back := DB.Count("bidding_back", `{}`)
+	total += total_back
 	pagesize := (total + escnum - 1) / escnum
-	log.Printf("total:%d pagesize:%d escnum:%d", total, pagesize, escnum)
+	log.Printf("total:%d total_back:%d pagesize:%d escnum:%d", total, total_back, pagesize, escnum)
+	nums := 0
 	for i := 0; i < escnum; i++ {
 		log.Println("escnum", i)
 		sid := bson.NewObjectIdWithTime(start)
 		var eid bson.ObjectId
 		var idsnum = 0
+		table := "bidding_back"
 		for {
-			end := start.Add(24 * time.Hour)
+			tmpsid := bson.NewObjectIdWithTime(start)
+			end := start.Add(12 * time.Hour)
 			eid = bson.NewObjectIdWithTime(end)
-			query := bson.M{"_id": bson.M{"$gte": sid, "$lt": eid}}
-			count := DB.Count(table, query)
 			start = end
-			log.Printf("i:%d count:%d", i, count)
-			if count >= 200000 || start.Unix() > time.Now().Unix() {
-				idsnum = count
+			query := bson.M{"_id": bson.M{"$gte": tmpsid, "$lt": eid}}
+			count := DB.Count(table, query)
+			log.Println(count, table, query)
+			if count < 1 { //校验是否切换table
+				tmpnum := DB.Count(table, bson.M{"_id": bson.M{"$gte": tmpsid, "$lt": bson.NewObjectIdWithTime(end.Add(24 * 10 * time.Hour))}})
+				if tmpnum < 1 && table != "bidding" {
+					table = "bidding"
+					start = start.Add(-12 * time.Hour)
+					continue
+				}
+			} else {
+				idsnum += count
+			}
+			log.Printf("i:%d count:%d,date:%s", i, idsnum, end.Format(qu.Date_Full_Layout))
+			if idsnum >= pagesize || start.Unix() > time.Now().Unix() || count > 5000000 { //测试数据count > 5000000
 				break
 			}
 		}
+		nums += idsnum
 		ids[fmt.Sprint(i)] = []string{qu.BsonIdToSId(sid), qu.BsonIdToSId(eid), fmt.Sprint(idsnum)}
+		log.Println("nums", nums)
 	}
 	return ids
 }

+ 32 - 9
src/jy/extract/extractudp.go

@@ -107,23 +107,46 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 	ext.IsRun = true
 
 	query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
-	count := ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl, query)
+	count1 := ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl, query)
+	count2 := ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl+"_back", query)
+	count := count1 + count2
 	pageNum := (count + PageSize - 1) / PageSize
 	limit := PageSize
 	if count < PageSize {
 		limit = count
 	}
 	log.Printf("count=%d,pageNum=%d,query=%v", count, pageNum, query)
-	for i := 0; i < pageNum; i++ {
+	//接着上次任务执行
+	startI := 0
+	if len(instanceId) > 0 {
+		esc, _ := db.Mgo.FindOne("ecs", `{"InstanceId":"`+instanceId[0]+`"}`)
+		startI = qu.IntAll((*esc)["pagecurrent"])
+	}
+	sidback := sid
+	for i := startI; i < pageNum; i++ {
 		query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid)}}
 		log.Printf("page=%d,query=%v", i+1, query)
-		list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
-		for _, v := range *list {
-			//log.Println(v["_id"])
-			j := PreInfo(v)
-			ext.TaskInfo.ProcessPool <- true
-			go ext.ExtractProcess(j)
-			sid = qu.BsonIdToSId(v["_id"])
+		if ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl, query) > 0 {
+			list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
+			for _, v := range *list {
+				//log.Println(v["_id"])
+				j := PreInfo(v)
+				ext.TaskInfo.ProcessPool <- true
+				go ext.ExtractProcess(j)
+				sid = qu.BsonIdToSId(v["_id"])
+			}
+		}
+		queryback := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sidback)}}
+		log.Printf("page=%d,queryback=%v", i+1, queryback)
+		if ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl+"_back", queryback) > 0 {
+			list2, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl+"_back", query, nil, Fields, false, 0, limit)
+			for _, v := range *list2 {
+				//log.Println(v["_id"])
+				j := PreInfo(v)
+				ext.TaskInfo.ProcessPool <- true
+				go ext.ExtractProcess(j)
+				sidback = qu.BsonIdToSId(v["_id"])
+			}
 		}
 		//分布式抽取进度
 		if len(instanceId) > 0 {

+ 1 - 1
src/jy/mongodbutil/pool.go

@@ -38,7 +38,7 @@ func (p *Pool) GetLive() int {
 
 func (p *Pool) init() {
 	for i := 0; i < p.initCap; i++ {
-		sess, err := mgo.DialWithTimeout(p.addr, 10*time.Second)
+		sess, err := mgo.DialWithTimeout(p.addr, time.Duration(p.timeout)*time.Second)
 		if sess != nil && sess.Ping() == nil {
 			p.live++
 			p.ch <- &mgosess{sess, time.Now().Unix()}

+ 8 - 0
src/web/templates/admin/distribution.html

@@ -13,6 +13,7 @@
             <small><a class="btn btn-primary opr" opr="idrange">id划段</a></small>
             <small><a class="btn btn-primary" onclick="deploy()">部署</a></small>
 		    <small><a class="btn btn-primary" onclick="rangetask()">执行</a></small>
+            <small><a class="btn btn-primary opr" opr="releasetime">释放设置</a></small>
         </h1>
 		<ol class="breadcrumb">
 		  <li><a href="/admin/distribution"><i class="fa fa-dashboard"></i>分布式抽取</a></li>
@@ -119,6 +120,7 @@ $(function () {
 			switch(n){
 			case "new":
             case "idrange":
+            case "releasetime":
                 if(n=="new"){
                     _tit="批量申请实例"
     				tag=[
@@ -131,6 +133,12 @@ $(function () {
     				tag=[
     					{label:"表名",s_label:"s_table",placeholder:"信息表名",must:true},
     				]   
+                }else if (n=="releasetime"){
+                    _tit="释放设置"
+                    tag=[
+    					{label:"实例ID",s_label:"instanceid",placeholder:"实例ID",must:true},
+                        {label:"顺延时间",s_label:"hours",placeholder:"当前时间起(单位/h)",must:true},
+    				] 
                 } 
 				htmlObj={
 					title:_tit,