Prechádzať zdrojové kódy

Merge branch 'master' of http://192.168.3.207:10080/qmx/jy-data-extract

unknown 6 rokov pred
rodič
commit
70cc8157c5

+ 8 - 0
src/jy/admin/distribution/distribution.go

@@ -25,8 +25,16 @@ func init() {
 	//申请ecs实例、id划段
 	Admin.POST("/distribution/runInstances", func(c *gin.Context) {
 		s_table, _ := c.GetPostForm("s_table")
+		instanceid, _ := c.GetPostForm("instanceid")
 		if s_table != "" { //id划段
 			ecs.IdsRange(s_table)
+		} else if instanceid != "" { //实例自动释放时间
+			hours, _ := c.GetPostForm("hours")
+			hour := qu.IntAll(hours)
+			if hour > 0 {
+				log.Println(instanceid, hour)
+				ecs.ModifyInstanceAutoReleaseTime(instanceid, hour)
+			}
 		} else { //申请ecs实例
 			TaskName, _ := c.GetPostForm("s_name")
 			num, _ := c.GetPostForm("i_num")

+ 1 - 1
src/jy/admin/task/task.go

@@ -55,7 +55,7 @@ func init() {
 
 	//获取版本列表
 	Admin.POST("/task/getversion", func(c *gin.Context) {
-		list, b := Mgo.Find("version", `{}`, `{"_id":-1}`, `{"version":1}`, false, -1, -1)
+		list, b := Mgo.Find("version", `{"delete":false}`, `{"_id":-1}`, `{"version":1}`, false, -1, -1)
 		if b && list != nil {
 			for _, v := range *list {
 				v["_id"] = v["version"]

+ 10 - 1
src/jy/cluster/aliecs.go

@@ -86,7 +86,7 @@ func DescribeInstances() {
 					}
 				}
 				if t, ok := tmp["PublicIpAddress"].(map[string]interface{}); ok {
-					if tt, ok := t["IpAddress"].([]interface{}); ok {
+					if tt, ok := t["IpAddress"].([]interface{}); ok && len(tt) > 0 {
 						tmp["ip_ww"] = tt[0]
 					}
 				}
@@ -117,6 +117,15 @@ func DeleteInstance(InstanceId string) {
 	log.Println("DeleteInstance", res)
 }
 
+//实例自动释放时间
+func ModifyInstanceAutoReleaseTime(InstanceId string, hours int) {
+	res := GET("ModifyInstanceAutoReleaseTime", [][]string{
+		[]string{"InstanceId", InstanceId},
+		[]string{"AutoReleaseTime", time.Now().Add(time.Duration(hours) * time.Hour).UTC().Format("2006-01-02T15:04:05Z")},
+	})
+	log.Println("ModifyInstanceAutoReleaseTime", res)
+}
+
 //GET请求
 func GET(action string, param [][]string) (mres map[string]interface{}) {
 	esconfig, _ := ju.Config["esconfig"].(map[string]interface{})

+ 29 - 12
src/jy/cluster/distributed.go

@@ -22,10 +22,10 @@ var EscIds map[string][]string //id区间
 
 //根据esc数量实例数量id划段
 func IdsRange(table string) int {
-	start := time.Date(2015, 11, 0, 0, 0, 0, 0, time.Local)
+	start := time.Date(2015, 11, 3, 0, 0, 0, 0, time.Local)
 	EscIds = map[string][]string{}
 	list, _ := db.Mgo.Find("ecs", `{"Status":"Running"}`, nil, nil, false, -1, -1)
-	ids := RangeIdsByDate(table, len(*list), start)
+	ids := RangeIdsByDate(len(*list), start)
 	for k, v := range *list {
 		db.Mgo.UpdateById("ecs", qu.BsonIdToSId(v["_id"]), map[string]interface{}{
 			"$set": map[string]interface{}{
@@ -71,32 +71,49 @@ func RunEcsTask() int {
 }
 
 //id分段
-func RangeIdsByDate(table string, escnum int, start time.Time) map[string][]string {
+func RangeIdsByDate(escnum int, start time.Time) map[string][]string {
 	ids := map[string][]string{}
 	task, _ := db.Mgo.FindById("task", qu.ObjToString(ju.Config["udptaskid"]), nil)
 	log.Println(qu.ObjToString((*task)["s_mgoaddr"]), qu.ObjToString((*task)["s_mgodb"]))
-	DB := db.MgoFactory(1, 2, 120, qu.ObjToString((*task)["s_mgoaddr"]), qu.ObjToString((*task)["s_mgodb"]))
-	total := DB.Count(table, `{}`)
+	DB := db.MgoFactory(2, 3, 120, qu.ObjToString((*task)["s_mgoaddr"]), qu.ObjToString((*task)["s_mgodb"]))
+	total := DB.Count("bidding", `{}`)
+	total_back := DB.Count("bidding_back", `{}`)
+	total += total_back
 	pagesize := (total + escnum - 1) / escnum
-	log.Printf("total:%d pagesize:%d escnum:%d", total, pagesize, escnum)
+	log.Printf("total:%d total_back:%d pagesize:%d escnum:%d", total, total_back, pagesize, escnum)
+	nums := 0
 	for i := 0; i < escnum; i++ {
 		log.Println("escnum", i)
 		sid := bson.NewObjectIdWithTime(start)
 		var eid bson.ObjectId
 		var idsnum = 0
+		table := "bidding_back"
 		for {
-			end := start.Add(24 * time.Hour)
+			tmpsid := bson.NewObjectIdWithTime(start)
+			end := start.Add(12 * time.Hour)
 			eid = bson.NewObjectIdWithTime(end)
-			query := bson.M{"_id": bson.M{"$gte": sid, "$lt": eid}}
-			count := DB.Count(table, query)
 			start = end
-			log.Printf("i:%d count:%d", i, count)
-			if count >= 200000 || start.Unix() > time.Now().Unix() {
-				idsnum = count
+			query := bson.M{"_id": bson.M{"$gte": tmpsid, "$lt": eid}}
+			count := DB.Count(table, query)
+			log.Println(count, table, query)
+			if count < 1 { //校验是否切换table
+				tmpnum := DB.Count(table, bson.M{"_id": bson.M{"$gte": tmpsid, "$lt": bson.NewObjectIdWithTime(end.Add(24 * 10 * time.Hour))}})
+				if tmpnum < 1 && table != "bidding" {
+					table = "bidding"
+					start = start.Add(-12 * time.Hour)
+					continue
+				}
+			} else {
+				idsnum += count
+			}
+			log.Printf("i:%d count:%d,date:%s", i, idsnum, end.Format(qu.Date_Full_Layout))
+			if idsnum >= pagesize || start.Unix() > time.Now().Unix() || count > 5000000 { //测试数据count > 5000000
 				break
 			}
 		}
+		nums += idsnum
 		ids[fmt.Sprint(i)] = []string{qu.BsonIdToSId(sid), qu.BsonIdToSId(eid), fmt.Sprint(idsnum)}
+		log.Println("nums", nums)
 	}
 	return ids
 }

+ 1 - 0
src/jy/extract/exportask.go

@@ -17,6 +17,7 @@ func Export() {
 	for _, t := range *tk {
 		extractAndExport("v1", t)
 		extractAndExport("v2", t)
+		time.Sleep(20 * time.Second)
 		//生成excel
 		filename := createFile(t)
 		if filename != "" {

+ 12 - 5
src/jy/extract/extract.go

@@ -10,6 +10,7 @@ import (
 	"log"
 	qu "qfw/util"
 	redis "qfw/util/redis"
+	"reflect"
 	"regexp"
 	"strconv"
 	"strings"
@@ -753,10 +754,15 @@ func AnalysisSaveResult(j *ju.Job, e *ExtractTask) {
 		}
 		objects := []*ju.SortObject{}
 		for k, v := range fieldValue {
+			ValueStr := "" //第二排序
+			if reflect.TypeOf(v[1]).String() == "string" {
+				ValueStr = qu.ObjToString(v[1])
+			}
 			tmp := &ju.SortObject{
-				Key:    k,
-				Value:  qu.IntAll(v[0]),
-				Object: v[1],
+				Key:      k,
+				Value:    qu.IntAll(v[0]),
+				Object:   v[1],
+				ValueStr: ValueStr,
 			}
 			objects = append(objects, tmp)
 		}
@@ -765,8 +771,8 @@ func AnalysisSaveResult(j *ju.Job, e *ExtractTask) {
 	//从排序结果中取值
 	tmp := map[string]interface{}{} //抽取值
 	for key, val := range values {
-		for _, v := range val { //取第一个
-			if v.Key != "" {
+		for _, v := range val { //取第一个非负数
+			if v.Key != "" && v.Value > -1 {
 				tmp[key] = v.Object
 				break
 			}
@@ -829,6 +835,7 @@ func AnalysisSaveResult(j *ju.Job, e *ExtractTask) {
 			bs, _ := json.Marshal(j.BlockPackage)
 			tmp["epackage"] = string(bs)
 		}
+		tmp["result"] = result
 		b := db.Mgo.Update(e.TaskInfo.TestColl, `{"_id":"`+_id+`"}`, map[string]interface{}{"$set": tmp}, true, false)
 		if !b {
 			log.Println(e.TaskInfo.TestColl, _id)

+ 32 - 9
src/jy/extract/extractudp.go

@@ -107,23 +107,46 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 	ext.IsRun = true
 
 	query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
-	count := ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl, query)
+	count1 := ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl, query)
+	count2 := ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl+"_back", query)
+	count := count1 + count2
 	pageNum := (count + PageSize - 1) / PageSize
 	limit := PageSize
 	if count < PageSize {
 		limit = count
 	}
 	log.Printf("count=%d,pageNum=%d,query=%v", count, pageNum, query)
-	for i := 0; i < pageNum; i++ {
+	//接着上次任务执行
+	startI := 0
+	if len(instanceId) > 0 {
+		esc, _ := db.Mgo.FindOne("ecs", `{"InstanceId":"`+instanceId[0]+`"}`)
+		startI = qu.IntAll((*esc)["pagecurrent"])
+	}
+	sidback := sid
+	for i := startI; i < pageNum; i++ {
 		query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid)}}
 		log.Printf("page=%d,query=%v", i+1, query)
-		list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
-		for _, v := range *list {
-			//log.Println(v["_id"])
-			j := PreInfo(v)
-			ext.TaskInfo.ProcessPool <- true
-			go ext.ExtractProcess(j)
-			sid = qu.BsonIdToSId(v["_id"])
+		if ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl, query) > 0 {
+			list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
+			for _, v := range *list {
+				//log.Println(v["_id"])
+				j := PreInfo(v)
+				ext.TaskInfo.ProcessPool <- true
+				go ext.ExtractProcess(j)
+				sid = qu.BsonIdToSId(v["_id"])
+			}
+		}
+		queryback := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sidback)}}
+		log.Printf("page=%d,queryback=%v", i+1, queryback)
+		if ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl+"_back", queryback) > 0 {
+			list2, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl+"_back", query, nil, Fields, false, 0, limit)
+			for _, v := range *list2 {
+				//log.Println(v["_id"])
+				j := PreInfo(v)
+				ext.TaskInfo.ProcessPool <- true
+				go ext.ExtractProcess(j)
+				sidback = qu.BsonIdToSId(v["_id"])
+			}
 		}
 		//分布式抽取进度
 		if len(instanceId) > 0 {

+ 82 - 77
src/jy/extract/score.go

@@ -67,93 +67,98 @@ func ScoreFields(j *ju.Job) map[string][]*ju.ExtField {
 			if len(fmt.Sprint(v.Value)) < 1 {
 				continue //空串跳过
 			}
-			//类型打分
-			if v.ExtFrom == "title" {
-				v.Score += qu.IntAll(extractype["title"])
+			//长度超过100个字,直接负分
+			vlen := len([]rune(qu.ObjToString(v.Value)))
+			if vlen > 100 && field != "projectscope" {
+				v.Score = -1
 			} else {
-				if strings.Contains(v.Type, "table") {
-					v.Score += qu.IntAll(extractype["table"])
-				} else if strings.Contains(v.Type, "colon") {
-					v.Score += qu.IntAll(extractype["colon"])
-				} else if strings.Contains(v.Type, "space") {
-					v.Score += qu.IntAll(extractype["space"])
-				} else if strings.Contains(v.Type, "regexp") {
-					v.Score += qu.IntAll(extractype["regexp"])
-				} else if strings.Contains(v.Type, "winnerorder") {
-					v.Score += qu.IntAll(extractype["winnerorder"])
+				//类型打分
+				if v.ExtFrom == "title" {
+					v.Score += qu.IntAll(extractype["title"])
+				} else {
+					if strings.Contains(v.Type, "table") {
+						v.Score += qu.IntAll(extractype["table"])
+					} else if strings.Contains(v.Type, "colon") {
+						v.Score += qu.IntAll(extractype["colon"])
+					} else if strings.Contains(v.Type, "space") {
+						v.Score += qu.IntAll(extractype["space"])
+					} else if strings.Contains(v.Type, "regexp") {
+						v.Score += qu.IntAll(extractype["regexp"])
+					} else if strings.Contains(v.Type, "winnerorder") {
+						v.Score += qu.IntAll(extractype["winnerorder"])
+					}
 				}
-			}
-			//字符型打分
-			if fieldtype == "string" {
-				//位置打分
-				if positions, ok := scoreRule["position"].([]interface{}); ok {
-					for _, position := range positions {
-						if p, ok := position.(map[string]interface{}); ok {
-							qu.Try(func() {
-								if p["regexp"] != nil {
-									reg := p["regexp"].(*regexp.Regexp)
-									if reg.MatchString(qu.ObjToString(v.Value)) {
-										v.Score += qu.IntAll(p["score"])
+				//字符型打分
+				if fieldtype == "string" {
+					//位置打分
+					if positions, ok := scoreRule["position"].([]interface{}); ok {
+						for _, position := range positions {
+							if p, ok := position.(map[string]interface{}); ok {
+								qu.Try(func() {
+									if p["regexp"] != nil {
+										reg := p["regexp"].(*regexp.Regexp)
+										if reg.MatchString(qu.ObjToString(v.Value)) {
+											v.Score += qu.IntAll(p["score"])
+										}
 									}
-								}
-							}, func(err interface{}) {
-								log.Println(err)
-							})
+								}, func(err interface{}) {
+									log.Println(err)
+								})
+							}
 						}
 					}
-				}
-				//长度打分
-				if lengths, ok := scoreRule["length"].([]interface{}); ok {
-					for _, tmp := range lengths {
-						if length, ok := tmp.(map[string]interface{}); ok {
-							min := qu.IntAll(length["min"])
-							max := qu.IntAll(length["max"])
-							vlen := len([]rune(qu.ObjToString(v.Value)))
-							scores, _ := length["score"].([]interface{})
-							if len(scores) < 3 {
-								continue
-							}
-							if vlen < min {
-								v.Score += qu.IntAll(scores[0])
-							} else if vlen > max {
-								v.Score += qu.IntAll(scores[2])
-							} else {
-								v.Score += qu.IntAll(scores[1])
+					//长度打分
+					if lengths, ok := scoreRule["length"].([]interface{}); ok {
+						for _, tmp := range lengths {
+							if length, ok := tmp.(map[string]interface{}); ok {
+								min := qu.IntAll(length["min"])
+								max := qu.IntAll(length["max"])
+								scores, _ := length["score"].([]interface{})
+								if len(scores) < 3 {
+									continue
+								}
+								if vlen < min {
+									v.Score += qu.IntAll(scores[0])
+								} else if vlen > max {
+									v.Score += qu.IntAll(scores[2])
+								} else {
+									v.Score += qu.IntAll(scores[1])
+								}
 							}
 						}
 					}
 				}
-			}
-			//float类型打分
-			if fieldtype == "float" {
-				min := qu.IntAll(scoreRule["min"])
-				max := qu.IntAll(scoreRule["max"])
-				val := qu.IntAll(v.Value)
-				scores, _ := scoreRule["score"].([]interface{})
-				if len(scores) < 3 {
-					continue
-				}
-				if val < min && 0 < val {
-					v.Score += qu.IntAll(scores[0])
-				} else if val > max {
-					v.Score += qu.IntAll(scores[2])
-				} else if val <= max && val >= min {
-					v.Score += qu.IntAll(scores[1])
-				}
-			}
-			//decimal
-			if fieldtype == "decimal" {
-				min := qu.IntAll(scoreRule["min"])
-				max := qu.IntAll(scoreRule["max"])
-				val := qu.IntAll(v.Value)
-				scores, _ := scoreRule["score"].([]interface{})
-				if len(scores) < 3 {
-					continue
+				//float类型打分
+				if fieldtype == "float" {
+					min := qu.IntAll(scoreRule["min"])
+					max := qu.IntAll(scoreRule["max"])
+					val := qu.IntAll(v.Value)
+					scores, _ := scoreRule["score"].([]interface{})
+					if len(scores) < 3 {
+						continue
+					}
+					if val < min && 0 < val {
+						v.Score += qu.IntAll(scores[0])
+					} else if val > max {
+						v.Score += qu.IntAll(scores[2])
+					} else if val <= max && val >= min {
+						v.Score += qu.IntAll(scores[1])
+					}
 				}
-				if val > max {
-					v.Score += qu.IntAll(scores[2])
-				} else if val <= max && val > min {
-					v.Score += qu.IntAll(scores[1])
+				//decimal
+				if fieldtype == "decimal" {
+					min := qu.IntAll(scoreRule["min"])
+					max := qu.IntAll(scoreRule["max"])
+					val := qu.IntAll(v.Value)
+					scores, _ := scoreRule["score"].([]interface{})
+					if len(scores) < 3 {
+						continue
+					}
+					if val > max {
+						v.Score += qu.IntAll(scores[2])
+					} else if val <= max && val > min {
+						v.Score += qu.IntAll(scores[1])
+					}
 				}
 			}
 		}

+ 1 - 1
src/jy/mongodbutil/pool.go

@@ -38,7 +38,7 @@ func (p *Pool) GetLive() int {
 
 func (p *Pool) init() {
 	for i := 0; i < p.initCap; i++ {
-		sess, err := mgo.DialWithTimeout(p.addr, 10*time.Second)
+		sess, err := mgo.DialWithTimeout(p.addr, time.Duration(p.timeout)*time.Second)
 		if sess != nil && sess.Ping() == nil {
 			p.live++
 			p.ch <- &mgosess{sess, time.Now().Unix()}

+ 6 - 3
src/jy/util/sort.go

@@ -6,9 +6,10 @@ import (
 )
 
 type SortObject struct {
-	Key    string
-	Value  int
-	Object interface{}
+	Key      string
+	Value    int
+	ValueStr string
+	Object   interface{}
 }
 
 type SortStruct []*SortObject
@@ -20,6 +21,8 @@ func (list SortStruct) Len() int {
 func (list SortStruct) Less(i, j int) bool {
 	if list[i].Value > list[j].Value {
 		return true
+	} else if list[i].ValueStr > list[j].ValueStr {
+		return true
 	} else {
 		return false
 	}

+ 8 - 0
src/web/templates/admin/distribution.html

@@ -13,6 +13,7 @@
             <small><a class="btn btn-primary opr" opr="idrange">id划段</a></small>
             <small><a class="btn btn-primary" onclick="deploy()">部署</a></small>
 		    <small><a class="btn btn-primary" onclick="rangetask()">执行</a></small>
+            <small><a class="btn btn-primary opr" opr="releasetime">释放设置</a></small>
         </h1>
 		<ol class="breadcrumb">
 		  <li><a href="/admin/distribution"><i class="fa fa-dashboard"></i>分布式抽取</a></li>
@@ -119,6 +120,7 @@ $(function () {
 			switch(n){
 			case "new":
             case "idrange":
+            case "releasetime":
                 if(n=="new"){
                     _tit="批量申请实例"
     				tag=[
@@ -131,6 +133,12 @@ $(function () {
     				tag=[
     					{label:"表名",s_label:"s_table",placeholder:"信息表名",must:true},
     				]   
+                }else if (n=="releasetime"){
+                    _tit="释放设置"
+                    tag=[
+    					{label:"实例ID",s_label:"instanceid",placeholder:"实例ID",must:true},
+                        {label:"顺延时间",s_label:"hours",placeholder:"当前时间起(单位/h)",must:true},
+    				] 
                 } 
 				htmlObj={
 					title:_tit,