Explorar o código

Merge branch 'dev3.2' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.2

maxiaoshan %!s(int64=6) %!d(string=hai) anos
pai
achega
24ef5f7f92
Modificáronse 4 ficheiros con 50 adicións e 25 borrados
  1. 2 2
      src/jy/extract/exportask.go
  2. 26 9
      src/jy/extract/extract.go
  3. 6 6
      src/jy/extract/extractudp.go
  4. 16 8
      udps/main.go

+ 2 - 2
src/jy/extract/exportask.go

@@ -66,9 +66,9 @@ func extractAndExport(v string, t map[string]interface{}) {
 		if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
 			continue
 		}
-		j := PreInfo(v)
+		j, jf := PreInfo(v, false)
 		e.TaskInfo.ProcessPool <- true
-		go e.ExtractProcess(j)
+		go e.ExtractProcess(j, jf)
 	}
 }
 

+ 26 - 9
src/jy/extract/extract.go

@@ -78,9 +78,9 @@ func RunExtractTestTask(ext *ExtractTask, startId, num string) bool {
 				continue
 			}
 			//log.Println(v["_id"])
-			j := PreInfo(v)
+			j, jf := PreInfo(v, false)
 			ext.TaskInfo.ProcessPool <- true
-			go ext.ExtractProcess(j)
+			go ext.ExtractProcess(j, jf)
 		}
 		return true
 	} else {
@@ -171,9 +171,9 @@ func RunExtractTask(taskId string) {
 			if !ext.IsRun {
 				break
 			}
-			j := PreInfo(v)
+			j, jf := PreInfo(v, false)
 			ext.TaskInfo.ProcessPool <- true
-			go ext.ExtractProcess(j)
+			go ext.ExtractProcess(j, jf)
 			ext.TaskInfo.LastExtId = _id
 		}
 		db.Mgo.UpdateById("task", ext.Id, `{"$set":{"s_extlastid":"`+ext.TaskInfo.LastExtId+`"}}`)
@@ -186,7 +186,7 @@ func RunExtractTask(taskId string) {
 }
 
 //信息预处理
-func PreInfo(doc map[string]interface{}) *ju.Job {
+func PreInfo(doc map[string]interface{}, isextFile bool) (j, jf *ju.Job) {
 	defer qu.Catch()
 	detail := ""
 	d1, _ := doc["detail"].(string)
@@ -199,6 +199,7 @@ func PreInfo(doc map[string]interface{}) *ju.Job {
 	detail = ju.CutLableStr(detail)
 	detail = cut.ClearHtml(detail)
 	doc["detail"] = detail
+	doc["detailfile"] = "" //附件文本堆一起(后期可以考虑,分开处理)
 	toptype := qu.ObjToString(doc["toptype"])
 	if qu.ObjToString(doc["type"]) == "bid" {
 		toptype = "结果"
@@ -206,7 +207,7 @@ func PreInfo(doc map[string]interface{}) *ju.Job {
 	if toptype == "" {
 		toptype = "*"
 	}
-	j := &ju.Job{
+	j = &ju.Job{
 		SourceMid:  qu.BsonIdToSId(doc["_id"]),
 		Category:   toptype,
 		Content:    qu.ObjToString(doc["detail"]),
@@ -220,17 +221,33 @@ func PreInfo(doc map[string]interface{}) *ju.Job {
 		Result:    map[string][]*ju.ExtField{},
 		BuyerAddr: qu.ObjToString(doc["buyeraddr"]),
 	}
+	if isextFile {
+		jf = &ju.Job{
+			SourceMid:  qu.BsonIdToSId(doc["_id"]),
+			Category:   toptype,
+			Content:    qu.ObjToString(doc["detailfile"]),
+			SpiderCode: qu.ObjToString(doc["spidercode"]),
+			Title:      qu.ObjToString(doc["title"]),
+			Data:       &doc,
+			City:       qu.ObjToString(doc["city"]),
+			Province:   qu.ObjToString(doc["area"]),
+			Result:     map[string][]*ju.ExtField{},
+			BuyerAddr:  qu.ObjToString(doc["buyeraddr"]),
+		}
+	}
 	qu.Try(func() {
-
 		pretreated.AnalyStart(j)
+		if isextFile {
+			pretreated.AnalyStart(jf)
+		}
 	}, func(err interface{}) {
 		log.Println("pretreated.AnalyStart", err)
 	})
-	return j
+	return j, jf
 }
 
 //抽取
-func (e *ExtractTask) ExtractProcess(j *ju.Job) {
+func (e *ExtractTask) ExtractProcess(j, jf *ju.Job) {
 	qu.Try(func() {
 		doc := *j.Data
 		//全局前置规则,结果覆盖doc属性

+ 6 - 6
src/jy/extract/extractudp.go

@@ -145,9 +145,9 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 					}
 					_id := qu.BsonIdToSId(v["_id"])
 					log.Println(_id)
-					j := PreInfo(v)
+					j, jf := PreInfo(v, false)
 					ext.TaskInfo.ProcessPool <- true
-					go ext.ExtractProcess(j)
+					go ext.ExtractProcess(j, jf)
 					sid = _id
 				}
 				db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
@@ -165,9 +165,9 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 					}
 					_id := qu.BsonIdToSId(v["_id"])
 					log.Println(_id)
-					j := PreInfo(v)
+					j, jf := PreInfo(v, false)
 					ext.TaskInfo.ProcessPool <- true
-					go ext.ExtractProcess(j)
+					go ext.ExtractProcess(j, jf)
 					sidback = _id
 				}
 				db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
@@ -199,9 +199,9 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 				}
 				_id := qu.BsonIdToSId(v["_id"])
 				log.Println(_id)
-				j := PreInfo(v)
+				j, jf := PreInfo(v, false)
 				ext.TaskInfo.ProcessPool <- true
-				go ext.ExtractProcess(j)
+				go ext.ExtractProcess(j, jf)
 				sid = _id
 			}
 

+ 16 - 8
udps/main.go

@@ -15,29 +15,37 @@ import (
 var udpclient mu.UdpClient //udp对象
 var nextNodes []map[string]interface{}
 
-var startDate, endDate, ip, port, stype string
+var startDate, endDate, ip, port, stype, sid, eid string
 
 func main() {
 	//2015-11-03,2017-04-01
 	//2017-04-01,2017-06-01
 	//2017-06-01,2018-06-01
 	//2018-06-01,2019-02-20
+	flag.StringVar(&sid, "sid", "", "开始id")
+	flag.StringVar(&eid, "eid", "", "结束id")
 	flag.StringVar(&startDate, "start", "", "开始日期2006-01-02")
 	flag.StringVar(&endDate, "end", "", "结束日期2006-01-02")
 	flag.StringVar(&ip, "ip", "127.0.0.1", "ip")
 	flag.StringVar(&port, "port", "", "dup端口")
 	flag.StringVar(&stype, "stype", "", "stype")
 	flag.Parse()
-	log.Println(startDate, endDate, ip, port, stype)
-	start, _ := time.ParseInLocation(qu.Date_Short_Layout, startDate, time.Local)
-	end, _ := time.ParseInLocation(qu.Date_Short_Layout, endDate, time.Local)
-	sid := bson.NewObjectIdWithTime(start)
-	eid := bson.NewObjectIdWithTime(end)
+	var startid, endid bson.ObjectId
+	if sid != "" && eid != "" {
+		startid = qu.StringTOBsonId(sid)
+		endid = qu.StringTOBsonId(eid)
+	} else {
+		start, _ := time.ParseInLocation(qu.Date_Short_Layout, startDate, time.Local)
+		end, _ := time.ParseInLocation(qu.Date_Short_Layout, endDate, time.Local)
+		startid = bson.NewObjectIdWithTime(start)
+		endid = bson.NewObjectIdWithTime(end)
+	}
+	log.Println(startid, startid, ip, port, stype)
 	udpclient = mu.UdpClient{Local: ":1470", BufSize: 1024}
 	udpclient.Listen(processUdpMsg)
 	by, _ := json.Marshal(map[string]interface{}{
-		"gtid":  sid,
-		"lteid": eid,
+		"gtid":  startid,
+		"lteid": endid,
 		"stype": stype,
 	})
 	udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{