瀏覽代碼

抽取结果保存调整

zhangjinkun 6 年之前
父節點
當前提交
61b3507716

+ 8 - 0
src/jy/admin/task/task.go

@@ -8,6 +8,7 @@ import (
 	. "jy/mongodbutil"
 	"net/http"
 	qu "qfw/util"
+	"strings"
 	"time"
 
 	"github.com/gin-contrib/sessions"
@@ -37,6 +38,13 @@ func init() {
 
 	//新增任务、编辑任务
 	Admin.POST("/task/save", func(c *gin.Context) {
+		//判断存储配置是否完整
+		s_mgosavecoll, _ := c.GetPostForm("s_mgosavecoll")
+		strs := strings.Split(s_mgosavecoll, "/")
+		if len(strs) < 3 {
+			c.JSON(200, gin.H{"rep": false, "msg": "保存表输入地址不正确!"})
+			return
+		}
 		data := GetPostForm(c)
 		_id, _ := c.GetPostForm("_id")
 		b := false

+ 2 - 2
src/jy/extract/exportask.go

@@ -49,7 +49,7 @@ func extractAndExport(v string, t map[string]interface{}) {
 		IsEtxLog:    false,
 		ProcessPool: make(chan bool, 5),
 	}
-	e.TaskInfo.DB = db.MgoFactory(1, 3, 120, fmt.Sprint(t["dbaddr"]), fmt.Sprint(t["dbname"]))
+	e.TaskInfo.FDB = db.MgoFactory(1, 3, 120, fmt.Sprint(t["dbaddr"]), fmt.Sprint(t["dbname"]))
 	e.InitRulePres()
 	e.InitRuleBacks()
 	e.InitRuleCore()
@@ -57,7 +57,7 @@ func extractAndExport(v string, t map[string]interface{}) {
 	e.InitClearFn()
 	query := t["query"]
 	limit := qu.IntAll(t["limit"])
-	list, _ := e.TaskInfo.DB.Find(e.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
+	list, _ := e.TaskInfo.FDB.Find(e.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
 	for _, v := range *list {
 		j := PreInfo(v)
 		e.TaskInfo.ProcessPool <- true

+ 17 - 27
src/jy/extract/extract.go

@@ -13,7 +13,6 @@ import (
 	"reflect"
 	"regexp"
 	"strconv"
-	"strings"
 	"sync"
 	"time"
 
@@ -27,7 +26,7 @@ var (
 	TaskList  map[string]*ExtractTask                //任务列表
 	saveLimit = 200                                  //抽取日志批量保存
 	PageSize  = 5000                                 //查询分页
-	Fields    = `{"title":1,"detail":1,"contenthtml":1,"href":1,"site":1,"spidercode":1,"toptype":1,"subtype":1,"area":1,"city":1,"comeintime":1,"publishtime":1}`
+	Fields    = `{"title":1,"detail":1,"contenthtml":1,"site":1,"spidercode":1,"toptype":1,"subtype":1,"area":1,"city":1,"comeintime":1,"publishtime":1}`
 )
 
 //启动测试抽取
@@ -37,7 +36,7 @@ func StartExtractTestTask(taskId, startId, num, resultcoll, trackcoll string) bo
 	ext.Id = taskId
 	ext.IsRun = true
 	ext.InitTestTaskInfo(resultcoll, trackcoll)
-	ext.TaskInfo.DB = db.MgoFactory(1, 3, 120, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
+	ext.TaskInfo.FDB = db.MgoFactory(1, 3, 120, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
 	ext.InitRulePres()
 	ext.InitRuleBacks()
 	ext.InitRuleCore()
@@ -67,7 +66,7 @@ func RunExtractTestTask(ext *ExtractTask, startId, num string) bool {
 	id := IdTrans(startId)
 	if id.Valid() {
 		query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(startId)}}
-		list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, n)
+		list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, n)
 		for _, v := range *list {
 			//log.Println(v["_id"])
 			j := PreInfo(v)
@@ -93,7 +92,8 @@ func StartExtractTaskId(taskId string) bool {
 		ext.Id = taskId
 		ext.InitTaskInfo()
 	}
-	ext.TaskInfo.DB = db.MgoFactory(2, 3, 120, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
+	ext.TaskInfo.FDB = db.MgoFactory(2, 3, 120, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
+	ext.TaskInfo.TDB = db.MgoFactory(1, 3, 120, ext.TaskInfo.ToDbAddr, ext.TaskInfo.ToDB)
 	ext.InitRulePres()
 	ext.InitRuleBacks()
 	ext.InitRuleCore()
@@ -136,7 +136,7 @@ func StopExtractTaskId(taskId string) bool {
 func RunExtractTask(taskId string) {
 	ext := TaskList[taskId]
 	query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(ext.TaskInfo.LastExtId)}}
-	count := ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl, query)
+	count := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
 	pageNum := (count + PageSize - 1) / PageSize
 	limit := PageSize
 	if count < PageSize {
@@ -146,7 +146,7 @@ func RunExtractTask(taskId string) {
 	for i := 0; i < pageNum; i++ {
 		query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(ext.TaskInfo.LastExtId)}}
 		log.Printf("page=%d,query=%v", i+1, query)
-		list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
+		list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
 		for _, v := range *list {
 			//log.Println(v["_id"])
 			if !ext.IsRun {
@@ -179,17 +179,6 @@ func PreInfo(doc map[string]interface{}) *ju.Job {
 	detail = ju.CutLableStr(detail)
 	detail = cut.ClearHtml(detail)
 	doc["detail"] = detail
-	href := qu.ObjToString(doc["href"])
-	if strings.HasPrefix(href, "http://") {
-		href = href[7:]
-	} else if strings.HasPrefix(href, "https://") {
-		href = href[8:]
-	}
-	pos := strings.Index(href, "/")
-	if pos > 0 {
-		href = href[:pos]
-	}
-	doc["domain"] = href
 	toptype := qu.ObjToString(doc["toptype"])
 	if qu.ObjToString(doc["type"]) == "bid" {
 		toptype = "结果"
@@ -202,14 +191,14 @@ func PreInfo(doc map[string]interface{}) *ju.Job {
 		Category:   toptype,
 		Content:    qu.ObjToString(doc["detail"]),
 		SpiderCode: qu.ObjToString(doc["spidercode"]),
-		Domain:     qu.ObjToString(doc["domain"]),
-		Href:       qu.ObjToString(doc["href"]),
-		Title:      qu.ObjToString(doc["title"]),
-		Data:       &doc,
-		City:       qu.ObjToString(doc["city"]),
-		Province:   qu.ObjToString(doc["area"]),
-		Result:     map[string][]*ju.ExtField{},
-		BuyerAddr:  qu.ObjToString(doc["buyeraddr"]),
+		//Domain:     qu.ObjToString(doc["domain"]),
+		//Href:       qu.ObjToString(doc["href"]),
+		Title:     qu.ObjToString(doc["title"]),
+		Data:      &doc,
+		City:      qu.ObjToString(doc["city"]),
+		Province:  qu.ObjToString(doc["area"]),
+		Result:    map[string][]*ju.ExtField{},
+		BuyerAddr: qu.ObjToString(doc["buyeraddr"]),
 	}
 	qu.Try(func() {
 		pretreated.AnalyStart(j)
@@ -826,7 +815,8 @@ func AnalysisSaveResult(j *ju.Job, e *ExtractTask) {
 		tmp["winnerorder"] = j.Winnerorder
 	}
 	for k, v := range *doc {
-		if k == "detail" || k == "contenthtml" {
+		//去重冗余字段
+		if k == "detail" || k == "contenthtml" || k == "site" || k == "spidercode" {
 			continue
 		}
 		if tmp[k] == nil {

+ 29 - 19
src/jy/extract/extractInit.go

@@ -37,8 +37,10 @@ type RuleCore struct {
 type TaskInfo struct {
 	Name, Version, VersionId, TrackColl string    //名称、版本、版本id、追踪记录表
 	FromDbAddr, FromDB, FromColl        string    //抽取数据库地址、库名、表名
-	SaveColl, TestColl, LastExtId       string    //抽取结果表、测试结果表、上次抽取信息id
-	DB                                  *db.Pool  //数据库连接池
+	ToDbAddr, ToDB, ToColl              string    //结果数据库地址、库名、表名
+	TestColl, LastExtId                 string    //测试结果表、上次抽取信息id
+	FDB                                 *db.Pool  //抽取源数据库连接池(FromDbAddr/FromDB)
+	TDB                                 *db.Pool  //结果数据库连接池(ToDbAddr/ToDB)
 	IsEtxLog                            bool      //是否开启抽取日志
 	ProcessPool                         chan bool //任务进程池
 	TestLua                             bool      //检查测试用
@@ -121,21 +123,29 @@ func (e *ExtractTask) InitTaskInfo() {
 	log.Println("task", task)
 	if len(*task) > 1 {
 		v, _ := db.Mgo.FindOne("version", `{"version":"`+(*task)["s_version"].(string)+`","delete":false}`)
-		e.TaskInfo = &TaskInfo{
-			Name:      (*task)["s_taskname"].(string),
-			Version:   (*task)["s_version"].(string),
-			VersionId: qu.BsonIdToSId((*v)["_id"]),
-			//TrackColl:   (*task)["s_trackcoll"].(string),
-			FromDbAddr:  (*task)["s_mgoaddr"].(string),
-			FromDB:      (*task)["s_mgodb"].(string),
-			FromColl:    (*task)["s_mgocoll"].(string),
-			SaveColl:    (*task)["s_mgosavecoll"].(string),
-			IsEtxLog:    false, //qu.If(qu.IntAll((*task)["i_track"]) == 1, true, false).(bool),
-			LastExtId:   qu.ObjToString((*task)["s_extlastid"]),
-			ProcessPool: make(chan bool, qu.IntAllDef((*task)["i_process"], 1)),
-		}
-		if (*v)["isextractcity"] != nil {
-			e.IsExtractCity = (*v)["isextractcity"].(bool)
+		strs := strings.Split((*task)["s_mgosavecoll"].(string), "/")
+		log.Println("s_mgosavecoll", strs)
+		if len(strs) < 3 {
+			return
+		} else {
+			e.TaskInfo = &TaskInfo{
+				Name:      (*task)["s_taskname"].(string),
+				Version:   (*task)["s_version"].(string),
+				VersionId: qu.BsonIdToSId((*v)["_id"]),
+				//TrackColl:   (*task)["s_trackcoll"].(string),
+				FromDbAddr:  (*task)["s_mgoaddr"].(string),
+				FromDB:      (*task)["s_mgodb"].(string),
+				FromColl:    (*task)["s_mgocoll"].(string),
+				ToDbAddr:    strs[0],
+				ToDB:        strs[1],
+				ToColl:      strs[2],
+				IsEtxLog:    false, //qu.If(qu.IntAll((*task)["i_track"]) == 1, true, false).(bool),
+				LastExtId:   qu.ObjToString((*task)["s_extlastid"]),
+				ProcessPool: make(chan bool, qu.IntAllDef((*task)["i_process"], 1)),
+			}
+			if (*v)["isextractcity"] != nil {
+				e.IsExtractCity = (*v)["isextractcity"].(bool)
+			}
 		}
 		log.Println(e.TaskInfo.Name, "thread:", qu.IntAllDef((*task)["i_process"], 1))
 	} else {
@@ -711,7 +721,7 @@ func (e *ExtractTask) BidSave() {
 			arr := e.BidArr[:500]
 			go func(tmp *[][]map[string]interface{}) {
 				qu.Try(func() {
-					db.Mgo.UpSertBulk(e.TaskInfo.SaveColl, *tmp...)
+					db.Mgo.UpSertBulk(e.TaskInfo.ToColl, *tmp...)
 					<-e.BidChanel
 				}, func(err interface{}) {
 					log.Println(err)
@@ -724,7 +734,7 @@ func (e *ExtractTask) BidSave() {
 			arr := e.BidArr
 			go func(tmp *[][]map[string]interface{}) {
 				qu.Try(func() {
-					db.Mgo.UpSertBulk(e.TaskInfo.SaveColl, *tmp...)
+					db.Mgo.UpSertBulk(e.TaskInfo.ToColl, *tmp...)
 					<-e.BidChanel
 				}, func(err interface{}) {
 					log.Println(err)

+ 8 - 8
src/jy/extract/extractudp.go

@@ -87,7 +87,7 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 	ext := &ExtractTask{}
 	ext.Id = qu.ObjToString(ju.Config["udptaskid"])
 	ext.InitTaskInfo()
-	ext.TaskInfo.DB = db.MgoFactory(2, 3, 120, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
+	ext.TaskInfo.FDB = db.MgoFactory(2, 3, 120, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
 	ext.InitRulePres()
 	ext.InitRuleBacks()
 	ext.InitRuleCore()
@@ -108,8 +108,8 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 
 	if len(instanceId) > 0 { //分布式抽取进度
 		query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
-		count1 := ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl, query)
-		count2 := ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl+"_back", query)
+		count1 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
+		count2 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl+"_back", query)
 		count := count1 + count2
 		pageNum := (count + PageSize - 1) / PageSize
 		limit := PageSize
@@ -132,8 +132,8 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 		for i := startI; i < pageNum; i++ {
 			query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
 			log.Printf("page=%d,query=%v", i+1, query)
-			if ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl, query) > 0 {
-				list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
+			if ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query) > 0 {
+				list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
 				for _, v := range *list {
 					//log.Println(v["_id"])
 					j := PreInfo(v)
@@ -148,8 +148,8 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 			}
 			queryback := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sidback)}}
 			log.Printf("page=%d,queryback=%v", i+1, queryback)
-			if ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl+"_back", queryback) > 0 {
-				list2, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl+"_back", queryback, nil, Fields, false, 0, limit)
+			if ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl+"_back", queryback) > 0 {
+				list2, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl+"_back", queryback, nil, Fields, false, 0, limit)
 				for _, v := range *list2 {
 					//log.Println(v["_id"])
 					j := PreInfo(v)
@@ -170,7 +170,7 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 		}
 	} else { //普通抽取
 		query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
-		list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, -1, -1)
+		list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, -1, -1)
 		for _, v := range *list {
 			//log.Println(v["_id"])
 			j := PreInfo(v)

+ 4 - 4
src/web/templates/admin/task_list.html

@@ -96,10 +96,10 @@ $(function () {
 				/*表单*/
 				addtask=[
 					{label:"任务名称",s_label:"s_taskname",placeholder:"剑鱼抽取",must:true},
-					{label:"数据库连接",s_label:"s_mgoaddr",must:true},
-					{label:"数据库",s_label:"s_mgodb",must:true},
-					{label:"数据库表",s_label:"s_mgocoll",must:true},
-					{label:"保存表",s_label:"s_mgosavecoll",must:true},
+					{label:"库连接",s_label:"s_mgoaddr",must:true},
+					{label:"数据库",s_label:"s_mgodb",must:true},
+					{label:"表",s_label:"s_mgocoll",must:true},
+					{label:"保存表",s_label:"s_mgosavecoll",placeholder:"127.0.0.1:27080/extract/bidding(数据库地址/数据库/表)",must:true},
 					{label:"描述",s_label:"s_descript",type:"tpl_text"},
 					/*
 					{label:"是否追踪",s_label:"i_track",type:"tpl_list_local",must:true,list:[{"s_name":"是","_id":1},{"s_name":"否","_id":0}],default:0,fun:function(){