Browse Source

Merge branch 'dev3.1.2' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.1.2

unknown 6 years ago
parent
commit
accacd04ff

+ 6 - 0
.idea/vcs.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="VcsDirectoryMappings">
+    <mapping directory="$PROJECT_DIR$" vcs="Git" />
+  </component>
+</project>

+ 14 - 12
src/jy/cluster/distributed.go

@@ -35,6 +35,7 @@ func IdsRange(table, sdate, edate string) int {
 					ids[fmt.Sprint(k)][1],
 					ids[fmt.Sprint(k)][2],
 					qu.ObjToString(v["InstanceId"]),
+					ids[fmt.Sprint(k)][3],
 				},
 			},
 		})
@@ -82,44 +83,45 @@ func RangeIdsByDate(escnum int, start, end time.Time) map[string][]string {
 	total_back := DB.Count("bidding_back", bson.M{"_id": bson.M{"$gte": bson.NewObjectIdWithTime(start), "$lt": bson.NewObjectIdWithTime(end)}})
 	total += total_back
 	pagesize := (total + escnum - 1) / escnum
-	log.Printf("total:%d total_back:%d pagesize:%d escnum:%d", total, total_back, pagesize, escnum)
+	log.Printf("total:%d pagesize:%d escnum:%d", total, pagesize, escnum)
 	nums := 0
+	table := "bidding_back"
 	for i := 0; i < escnum; i++ {
 		log.Println("escnum", i)
 		sid := bson.NewObjectIdWithTime(start)
 		var eid bson.ObjectId
 		var idsnum = 0
-		table := "bidding_back"
 		for {
 			tmpsid := bson.NewObjectIdWithTime(start)
-			end := start.Add(4 * time.Hour)
-			if end.Unix() > end.Unix() {
+			endi := start.Add(4 * time.Hour)
+			if endi.Unix() > end.Unix() {
 				eid = bson.NewObjectIdWithTime(end)
 			} else {
-				eid = bson.NewObjectIdWithTime(end)
+				eid = bson.NewObjectIdWithTime(endi)
 			}
-			start = end
+			start = endi
 			query := bson.M{"_id": bson.M{"$gte": tmpsid, "$lt": eid}}
 			count := DB.Count(table, query)
-			log.Println(count, table, query)
+			//log.Println(count, table, query)
 			if count < 1 { //校验是否切换table
-				tmpnum := DB.Count(table, bson.M{"_id": bson.M{"$gte": tmpsid, "$lt": bson.NewObjectIdWithTime(end.Add(24 * 10 * time.Hour) /*连续10天无数据*/)}})
-				if tmpnum < 1 && table != "bidding" {
+				tmpnum := DB.Count(table, bson.M{"_id": bson.M{"$gte": tmpsid, "$lt": bson.NewObjectIdWithTime(endi.Add(30 * 24 * time.Hour))}})
+				if tmpnum < 1 && table == "bidding_back" {
 					table = "bidding"
 					start = start.Add(-4 * time.Hour)
+					log.Println("切换table,bidding", start)
 					continue
 				}
 			} else {
 				idsnum += count
 			}
-			log.Printf("i:%d count:%d,date:%s", i, idsnum, end.Format(qu.Date_Full_Layout))
+			//log.Printf("i:%d count:%d,date:%s", i, idsnum, end.Format(qu.Date_Full_Layout))
 			if idsnum >= pagesize || start.Unix() > time.Now().Unix() || count > 5000000 { //测试数据count > 5000000
 				break
 			}
 		}
 		nums += idsnum
-		ids[fmt.Sprint(i)] = []string{qu.BsonIdToSId(sid), qu.BsonIdToSId(eid), fmt.Sprint(idsnum)}
-		log.Println("nums", nums)
+		ids[fmt.Sprint(i)] = []string{qu.BsonIdToSId(sid), qu.BsonIdToSId(eid), fmt.Sprint(idsnum), table}
+		log.Println("nums", nums, table)
 	}
 	return ids
 }

+ 111 - 105
src/jy/extract/extpackage.go

@@ -4,138 +4,144 @@ package extract
 import (
 	"jy/clear"
 	ju "jy/util"
+	"log"
 	qu "qfw/util"
 	"reflect"
 )
 
 //处理分包信息
 func PackageDetail(j *ju.Job, e *ExtractTask) {
-	if len(j.BlockPackage) > 0 {
-		packageResult := map[string]map[string]interface{}{}
-		packagenum := len(j.BlockPackage)
-		for pkName, pkg := range j.BlockPackage {
-			//是否清理标记
-			clearmap := map[string]bool{}
-			sonJobResult := map[string]interface{}{}
-			sonJobResult["text"] = pkg.Text
-			sonJobResult["origin"] = pkg.Origin
-			sonJobResult["type"] = pkg.Type
-			sonJobResult["winnerorder"] = pkg.WinnerOrder
-			for k, tags := range e.Tag {
-			L:
-				for _, tag := range tags {
-					if pkg.TableKV != nil {
-						for key, val := range pkg.TableKV.Kv {
-							if tag.Key == key {
-								clearmap[k] = false
-								var tmpval interface{}
-								if len(e.ClearFn[k]) > 0 {
-									data := clear.DoClearFn(e.ClearFn[k], []interface{}{val, j.Content})
-									tmpval = data[0]
-								} else {
-									tmpval = val
-								}
-								sonJobResult[k] = tmpval
-								if packagenum == 1 {
-									field := &ju.ExtField{
-										Field:     k,
-										Code:      "package",
-										RuleText:  "package",
-										Type:      "table",
-										MatchType: "tag_string",
-										ExtFrom:   "package",
-										Value:     tmpval,
-										Score:     0,
+	qu.Try(func() {
+		if len(j.BlockPackage) > 0 {
+			packageResult := map[string]map[string]interface{}{}
+			packagenum := len(j.BlockPackage)
+			for pkName, pkg := range j.BlockPackage {
+				//是否清理标记
+				clearmap := map[string]bool{}
+				sonJobResult := map[string]interface{}{}
+				sonJobResult["text"] = pkg.Text
+				sonJobResult["origin"] = pkg.Origin
+				sonJobResult["type"] = pkg.Type
+				sonJobResult["winnerorder"] = pkg.WinnerOrder
+				for k, tags := range e.Tag {
+				L:
+					for _, tag := range tags {
+						if pkg.TableKV != nil {
+							for key, val := range pkg.TableKV.Kv {
+								if tag.Key == key {
+									clearmap[k] = false
+									var tmpval interface{}
+									if len(e.ClearFn[k]) > 0 {
+										data := clear.DoClearFn(e.ClearFn[k], []interface{}{val, j.Content})
+										tmpval = data[0]
+									} else {
+										tmpval = val
+									}
+									sonJobResult[k] = tmpval
+									if packagenum == 1 {
+										field := &ju.ExtField{
+											Field:     k,
+											Code:      "package",
+											RuleText:  "package",
+											Type:      "table",
+											MatchType: "tag_string",
+											ExtFrom:   "package",
+											Value:     tmpval,
+											Score:     0,
+										}
+										j.Result[k] = append(j.Result[k], field)
 									}
-									j.Result[k] = append(j.Result[k], field)
+									break L
 								}
-								break L
 							}
 						}
-					}
-					if pkg.ColonKV != nil {
-						for key, val := range pkg.ColonKV.Kv {
-							if tag.Key == key {
-								clearmap[k] = true
-								var tmpval interface{}
-								if len(e.ClearFn[k]) > 0 {
-									data := clear.DoClearFn(e.ClearFn[k], []interface{}{val, j.Content})
-									tmpval = data[0]
-								} else {
-									tmpval = val
-								}
-								sonJobResult[k] = tmpval
-								if packagenum == 1 {
-									field := &ju.ExtField{
-										Field:     k,
-										Code:      "package",
-										RuleText:  "package",
-										Type:      "colon",
-										MatchType: "tag_string",
-										ExtFrom:   "package",
-										Value:     tmpval,
-										Score:     0,
+						if pkg.ColonKV != nil {
+							for key, val := range pkg.ColonKV.Kv {
+								if tag.Key == key {
+									clearmap[k] = true
+									var tmpval interface{}
+									if len(e.ClearFn[k]) > 0 {
+										data := clear.DoClearFn(e.ClearFn[k], []interface{}{val, j.Content})
+										tmpval = data[0]
+									} else {
+										tmpval = val
 									}
-									j.Result[k] = append(j.Result[k], field)
+									sonJobResult[k] = tmpval
+									if packagenum == 1 {
+										field := &ju.ExtField{
+											Field:     k,
+											Code:      "package",
+											RuleText:  "package",
+											Type:      "colon",
+											MatchType: "tag_string",
+											ExtFrom:   "package",
+											Value:     tmpval,
+											Score:     0,
+										}
+										j.Result[k] = append(j.Result[k], field)
+									}
+									break L
 								}
-								break L
 							}
 						}
-					}
-					if pkg.SpaceKV != nil {
-						for key, val := range pkg.SpaceKV.Kv {
-							if tag.Key == key {
-								clearmap[k] = true
-								var tmpval interface{}
-								if len(e.ClearFn[k]) > 0 {
-									data := clear.DoClearFn(e.ClearFn[k], []interface{}{val, j.Content})
-									tmpval = data[0]
-								} else {
-									tmpval = val
-								}
-								sonJobResult[k] = tmpval
-								if packagenum == 1 {
-									field := &ju.ExtField{
-										Field:     k,
-										Code:      "package",
-										RuleText:  "package",
-										Type:      "space",
-										MatchType: "tag_string",
-										ExtFrom:   "package",
-										Value:     tmpval,
-										Score:     0,
+						if pkg.SpaceKV != nil {
+							for key, val := range pkg.SpaceKV.Kv {
+								if tag.Key == key {
+									clearmap[k] = true
+									var tmpval interface{}
+									if len(e.ClearFn[k]) > 0 {
+										data := clear.DoClearFn(e.ClearFn[k], []interface{}{val, j.Content})
+										tmpval = data[0]
+									} else {
+										tmpval = val
+									}
+									sonJobResult[k] = tmpval
+									if packagenum == 1 {
+										field := &ju.ExtField{
+											Field:     k,
+											Code:      "package",
+											RuleText:  "package",
+											Type:      "space",
+											MatchType: "tag_string",
+											ExtFrom:   "package",
+											Value:     tmpval,
+											Score:     0,
+										}
+										j.Result[k] = append(j.Result[k], field)
 									}
-									j.Result[k] = append(j.Result[k], field)
+									break L
 								}
-								break L
 							}
 						}
 					}
 				}
-			}
-			//如果有中标候选人排序,优先用第一中标候选人的中标单位和中标金额覆盖该包里面相应的字段的值
-			if pkg.WinnerOrder != nil && len(pkg.WinnerOrder) > 0 {
-				firstWinnerOrder := pkg.WinnerOrder[0]
-				if qu.ObjToString(sonJobResult["winner"]) == "" || (!pkg.Accuracy && qu.ObjToString(firstWinnerOrder["entname"]) != "" && qu.Int64All(firstWinnerOrder["sort"]) == 1) {
-					sonJobResult["winner"] = firstWinnerOrder["entname"]
-				}
-				if qu.Float64All(sonJobResult["bidamount"]) == 0 || (!pkg.Accuracy && qu.Float64All(firstWinnerOrder["price"]) > 0 && qu.Int64All(firstWinnerOrder["sort"]) == 1) {
-					sonJobResult["bidamount"] = firstWinnerOrder["price"]
+				//如果有中标候选人排序,优先用第一中标候选人的中标单位和中标金额覆盖该包里面相应的字段的值
+				if pkg.WinnerOrder != nil && len(pkg.WinnerOrder) > 0 {
+					firstWinnerOrder := pkg.WinnerOrder[0]
+					if qu.ObjToString(sonJobResult["winner"]) == "" || (!pkg.Accuracy && qu.ObjToString(firstWinnerOrder["entname"]) != "" && qu.Int64All(firstWinnerOrder["sort"]) == 1) {
+						sonJobResult["winner"] = firstWinnerOrder["entname"]
+					}
+					if qu.Float64All(sonJobResult["bidamount"]) == 0 || (!pkg.Accuracy && qu.Float64All(firstWinnerOrder["price"]) > 0 && qu.Int64All(firstWinnerOrder["sort"]) == 1) {
+						sonJobResult["bidamount"] = firstWinnerOrder["price"]
+					}
 				}
+				//log.Println(pkName, sonJobResult)
+				sonJobResult["clear"] = clearmap
+				packageResult[pkName] = sonJobResult
+			}
+			if len(packageResult) > 0 {
+				j.PackageInfo = packageResult
 			}
-			//log.Println(pkName, sonJobResult)
-			sonJobResult["clear"] = clearmap
-			packageResult[pkName] = sonJobResult
-		}
-		if len(packageResult) > 0 {
-			j.PackageInfo = packageResult
 		}
-	}
-	extRegBackPack(j, e)
+		extRegBackPack(j, e)
+	}, func(err interface{}) {
+		log.Println("PackageDetail err", err)
+	})
 }
 
 //清理分包信息
 func extRegBackPack(j *ju.Job, e *ExtractTask) {
+	defer qu.Catch()
 	//正则清理
 	for _, rc := range e.RuleCores {
 		for pk, pack := range j.PackageInfo {

+ 78 - 62
src/jy/extract/extract.go

@@ -89,6 +89,7 @@ func RunExtractTestTask(ext *ExtractTask, startId, num string) bool {
 
 //启动抽取
 func StartExtractTaskId(taskId string) bool {
+	defer qu.Catch()
 	isgo := false
 	ext := TaskList[taskId]
 	if ext == nil {
@@ -133,6 +134,7 @@ func StartExtractTaskId(taskId string) bool {
 
 //停止抽取
 func StopExtractTaskId(taskId string) bool {
+	defer qu.Catch()
 	ext := TaskList[taskId]
 	if ext != nil {
 		ext.IsRun = false
@@ -145,6 +147,7 @@ func StopExtractTaskId(taskId string) bool {
 
 //开始抽取
 func RunExtractTask(taskId string) {
+	defer qu.Catch()
 	ext := TaskList[taskId]
 	query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(ext.TaskInfo.LastExtId)}}
 	count := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
@@ -182,6 +185,7 @@ func RunExtractTask(taskId string) {
 
 //信息预处理
 func PreInfo(doc map[string]interface{}) *ju.Job {
+	defer qu.Catch()
 	detail := ""
 	d1, _ := doc["detail"].(string)
 	d2, _ := doc["contenthtml"].(string)
@@ -312,15 +316,16 @@ func (e *ExtractTask) ExtractProcess(j *ju.Job) {
 		//		log.Println("抽取结果", j.Title, j.SourceMid, string(bs))
 		//分析抽取结果并保存 todo
 		AnalysisSaveResult(j, e)
+		<-e.TaskInfo.ProcessPool
 	}, func(err interface{}) {
-		log.Println((*j.Data)["_id"], err)
+		log.Println("ExtractProcess err", err, (*j.Data)["_id"])
 		<-e.TaskInfo.ProcessPool
 	})
-	<-e.TaskInfo.ProcessPool
 }
 
 //前置过滤
 func ExtRegPre(doc map[string]interface{}, j *ju.Job, in *RegLuaInfo, t *TaskInfo) map[string]interface{} {
+	defer qu.Catch()
 	before := ju.DeepCopy(doc).(map[string]interface{})
 	extinfo := map[string]interface{}{}
 	if in.IsLua {
@@ -345,6 +350,7 @@ func ExtRegPre(doc map[string]interface{}, j *ju.Job, in *RegLuaInfo, t *TaskInf
 
 //抽取-规则
 func ExtRegCore(extfrom string, doc map[string]interface{}, j *ju.Job, in *RegLuaInfo, et *ExtractTask) {
+	defer qu.Catch()
 	//废标、流标、ppp等跳过
 	b := IsExtract(in.Field, j.Title, j.Content)
 	if !b {
@@ -385,6 +391,7 @@ func ExtRegCore(extfrom string, doc map[string]interface{}, j *ju.Job, in *RegLu
 
 //lua脚本根据属性设置提取kv值
 func getKvByLuaFields(extfrom string, j *ju.Job, in *RegLuaInfo, t map[string][]*Tag) map[string][]map[string]interface{} {
+	defer qu.Catch()
 	kvmap := map[string][]map[string]interface{}{}
 	for fieldname, field := range in.LFields {
 		lock.Lock()
@@ -547,6 +554,7 @@ func getKvByLuaFields(extfrom string, j *ju.Job, in *RegLuaInfo, t map[string][]
 
 //正则提取结果
 func extRegCoreToResult(extfrom, text string, j *ju.Job, v *RegLuaInfo) map[string][]map[string]interface{} {
+	defer qu.Catch()
 	extinfo := map[string][]map[string]interface{}{}
 	if v.RegCore.Bextract { //正则是两部分的,可以直接抽取的(含下划线)
 		apos := v.RegCore.Reg.FindAllStringSubmatchIndex(text, -1)
@@ -614,6 +622,7 @@ func extRegCoreToResult(extfrom, text string, j *ju.Job, v *RegLuaInfo) map[stri
 
 //后置过滤
 func ExtRegBack(j *ju.Job, in *RegLuaInfo, t *TaskInfo) {
+	defer qu.Catch()
 	if in.IsLua {
 		result := GetResultMapForLua(j)
 		lua := ju.LuaScript{Code: in.Code, Name: in.Name, Result: result, Script: in.RuleText}
@@ -695,6 +704,7 @@ func ExtRegBack(j *ju.Job, in *RegLuaInfo, t *TaskInfo) {
 
 //获取抽取结果map[string][]interface{},lua脚本使用
 func GetResultMapForLua(j *ju.Job) map[string][]map[string]interface{} {
+	defer qu.Catch()
 	result := map[string][]map[string]interface{}{}
 	for key, val := range j.Result {
 		if result[key] == nil {
@@ -718,6 +728,7 @@ func GetResultMapForLua(j *ju.Job) map[string][]map[string]interface{} {
 
 //抽取日志
 func AddExtLog(ftype, sid string, before interface{}, extinfo interface{}, v *RegLuaInfo, t *TaskInfo) {
+	defer qu.Catch()
 	if !t.IsEtxLog {
 		return
 	}
@@ -742,6 +753,7 @@ func AddExtLog(ftype, sid string, before interface{}, extinfo interface{}, v *Re
 
 //保存抽取日志
 func SaveExtLog() {
+	defer qu.Catch()
 	tmpLogs := map[*TaskInfo][]map[string]interface{}{}
 	lock.Lock()
 	tmpLogs = ExtLogs
@@ -773,77 +785,78 @@ type FieldValue struct {
 
 //分析抽取结果并保存
 func AnalysisSaveResult(j *ju.Job, e *ExtractTask) {
-	doc := j.Data
-	result := j.Result
-	_id := qu.BsonIdToSId((*doc)["_id"])
-	iscore, _ := ju.Config["fieldscore"].(bool)
-	if iscore { //打分
-		result = ScoreFields(j)
-	}
-	//结果排序
-	values := map[string][]*ju.SortObject{}
-	for key, val := range result {
-		fieldValue := map[string][]interface{}{}
-		if iscore { //走打分
-			for _, v := range val {
-				if len(fmt.Sprint(v.Value)) < 1 {
-					continue //去除空串
+	qu.Try(func() {
+		doc := j.Data
+		result := j.Result
+		_id := qu.BsonIdToSId((*doc)["_id"])
+		iscore, _ := ju.Config["fieldscore"].(bool)
+		if iscore { //打分
+			result = ScoreFields(j)
+		}
+		//结果排序
+		values := map[string][]*ju.SortObject{}
+		for key, val := range result {
+			fieldValue := map[string][]interface{}{}
+			if iscore { //走打分
+				for _, v := range val {
+					if len(fmt.Sprint(v.Value)) < 1 {
+						continue //去除空串
+					}
+					fieldValue[fmt.Sprint(v.Value)+v.Type] = []interface{}{v.Score, v.Value}
+				}
+			} else { //不走打分,按出现频次
+				for _, v := range val {
+					if len(fmt.Sprint(v.Value)) < 1 {
+						continue //去除空串
+					}
+					if fieldValue[fmt.Sprint(v.Value)] == nil {
+						fieldValue[fmt.Sprint(v.Value)] = []interface{}{0, v.Value}
+					} else {
+						fieldValue[fmt.Sprint(v.Value)][0] = qu.IntAll(fieldValue[fmt.Sprint(v.Value)][0]) + 1
+					}
 				}
-				fieldValue[fmt.Sprint(v.Value)+v.Type] = []interface{}{v.Score, v.Value}
 			}
-		} else { //不走打分,按出现频次
-			for _, v := range val {
-				if len(fmt.Sprint(v.Value)) < 1 {
-					continue //去除空串
+			objects := []*ju.SortObject{}
+			for k, v := range fieldValue {
+				ValueStr := "" //第二排序
+				if reflect.TypeOf(v[1]).String() == "string" {
+					ValueStr = qu.ObjToString(v[1])
 				}
-				if fieldValue[fmt.Sprint(v.Value)] == nil {
-					fieldValue[fmt.Sprint(v.Value)] = []interface{}{0, v.Value}
-				} else {
-					fieldValue[fmt.Sprint(v.Value)][0] = qu.IntAll(fieldValue[fmt.Sprint(v.Value)][0]) + 1
+				tmp := &ju.SortObject{
+					Key:      k,
+					Value:    qu.IntAll(v[0]),
+					Object:   v[1],
+					ValueStr: ValueStr,
 				}
+				objects = append(objects, tmp)
 			}
+			values[key] = ju.ExtSort(objects)
 		}
-		objects := []*ju.SortObject{}
-		for k, v := range fieldValue {
-			ValueStr := "" //第二排序
-			if reflect.TypeOf(v[1]).String() == "string" {
-				ValueStr = qu.ObjToString(v[1])
-			}
-			tmp := &ju.SortObject{
-				Key:      k,
-				Value:    qu.IntAll(v[0]),
-				Object:   v[1],
-				ValueStr: ValueStr,
+		//从排序结果中取值
+		tmp := map[string]interface{}{} //抽取值
+		for key, val := range values {
+			for _, v := range val { //取第一个非负数
+				if v.Key != "" && v.Value > -1 {
+					tmp[key] = v.Object
+					break
+				}
 			}
-			objects = append(objects, tmp)
 		}
-		values[key] = ju.ExtSort(objects)
-	}
-	//从排序结果中取值
-	tmp := map[string]interface{}{} //抽取值
-	for key, val := range values {
-		for _, v := range val { //取第一个非负数
-			if v.Key != "" && v.Value > -1 {
-				tmp[key] = v.Object
-				break
-			}
+		if len(j.PackageInfo) > 0 { //分包信息
+			tmp["package"] = j.PackageInfo
 		}
-	}
-	if len(j.PackageInfo) > 0 { //分包信息
-		tmp["package"] = j.PackageInfo
-	}
-	if len(j.Winnerorder) > 0 { //候选人信息
-		tmp["winnerorder"] = j.Winnerorder
-	}
-	for k, v := range *doc {
-		//去重冗余字段
-		if k == "detail" || k == "contenthtml" || k == "site" || k == "spidercode" {
-			continue
+		if len(j.Winnerorder) > 0 { //候选人信息
+			tmp["winnerorder"] = j.Winnerorder
 		}
-		if tmp[k] == nil {
-			tmp[k] = v
+		for k, v := range *doc {
+			//去重冗余字段
+			if k == "detail" || k == "contenthtml" || k == "site" || k == "spidercode" {
+				continue
+			}
+			if tmp[k] == nil {
+				tmp[k] = v
+			}
 		}
-	}
 
 	//质量审核
 	if ju.Config["qualityaudit"].(bool) {
@@ -905,6 +918,9 @@ func AnalysisSaveResult(j *ju.Job, e *ExtractTask) {
 			log.Println(e.TaskInfo.TestColl, _id)
 		}
 	}
+	}, func(err interface{}) {
+		log.Println("AnalysisSaveResult err", err)
+	})
 }
 
 func (e *ExtractTask) QualityAudit(resulttmp map[string]interface{}) {

+ 1 - 0
src/jy/extract/extractudp.go

@@ -84,6 +84,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 
 //根据id区间抽取
 func ExtractByUdp(sid, eid string, instanceId ...string) {
+	defer qu.Catch()
 	ext := &ExtractTask{}
 	ext.Id = qu.ObjToString(ju.Config["udptaskid"])
 	ext.InitTaskInfo()

+ 1 - 0
src/jy/extract/isextract.go

@@ -22,6 +22,7 @@ func init() {
 }
 
 func IsExtract(filed, title, content string) bool {
+	defer qu.Catch()
 	b := true
 	if N_extract[filed] != nil {
 		nregs := N_extract[filed]

+ 97 - 93
src/jy/extract/score.go

@@ -35,113 +35,117 @@ func init() {
 //结果打分
 func ScoreFields(j *ju.Job) map[string][]*ju.ExtField {
 	result := j.Result
-	//打分
-	for field, tmps := range result {
-		scoreRule := SoreConfig[field]
-		if scoreRule == nil {
-			continue
-		}
-		extractype := SoreConfig["extractype"]
-		fieldtype := scoreRule["type"]
-		for _, v := range tmps {
-			if len(fmt.Sprint(v.Value)) < 1 {
-				continue //空串跳过
+	qu.Try(func() {
+		//打分
+		for field, tmps := range result {
+			scoreRule := SoreConfig[field]
+			if scoreRule == nil {
+				continue
 			}
-			//长度超过100个字,直接负分
-			vlen := len([]rune(qu.ObjToString(v.Value)))
-			if vlen > 100 && field != "projectscope" {
-				v.Score = -1
-			} else {
-				//类型打分
-				if v.ExtFrom == "title" {
-					v.Score += qu.IntAll(extractype["title"])
+			extractype := SoreConfig["extractype"]
+			fieldtype := scoreRule["type"]
+			for _, v := range tmps {
+				if len(fmt.Sprint(v.Value)) < 1 {
+					continue //空串跳过
+				}
+				//长度超过100个字,直接负分
+				vlen := len([]rune(qu.ObjToString(v.Value)))
+				if vlen > 100 && field != "projectscope" {
+					v.Score = -1
 				} else {
-					if strings.Contains(v.Type, "table") {
-						v.Score += qu.IntAll(extractype["table"])
-					} else if strings.Contains(v.Type, "colon") {
-						v.Score += qu.IntAll(extractype["colon"])
-					} else if strings.Contains(v.Type, "space") {
-						v.Score += qu.IntAll(extractype["space"])
-					} else if strings.Contains(v.Type, "regexp") {
-						v.Score += qu.IntAll(extractype["regexp"])
-					} else if strings.Contains(v.Type, "winnerorder") {
-						v.Score += qu.IntAll(extractype["winnerorder"])
+					//类型打分
+					if v.ExtFrom == "title" {
+						v.Score += qu.IntAll(extractype["title"])
+					} else {
+						if strings.Contains(v.Type, "table") {
+							v.Score += qu.IntAll(extractype["table"])
+						} else if strings.Contains(v.Type, "colon") {
+							v.Score += qu.IntAll(extractype["colon"])
+						} else if strings.Contains(v.Type, "space") {
+							v.Score += qu.IntAll(extractype["space"])
+						} else if strings.Contains(v.Type, "regexp") {
+							v.Score += qu.IntAll(extractype["regexp"])
+						} else if strings.Contains(v.Type, "winnerorder") {
+							v.Score += qu.IntAll(extractype["winnerorder"])
+						}
 					}
-				}
-				//字符型打分
-				if fieldtype == "string" {
-					//位置打分
-					if positions, ok := scoreRule["position"].([]interface{}); ok {
-						for _, position := range positions {
-							if p, ok := position.(map[string]interface{}); ok {
-								qu.Try(func() {
-									if p["regexp"] != nil {
-										reg := p["regexp"].(*regexp.Regexp)
-										if reg.MatchString(qu.ObjToString(v.Value)) {
-											v.Score += qu.IntAll(p["score"])
+					//字符型打分
+					if fieldtype == "string" {
+						//位置打分
+						if positions, ok := scoreRule["position"].([]interface{}); ok {
+							for _, position := range positions {
+								if p, ok := position.(map[string]interface{}); ok {
+									qu.Try(func() {
+										if p["regexp"] != nil {
+											reg := p["regexp"].(*regexp.Regexp)
+											if reg.MatchString(qu.ObjToString(v.Value)) {
+												v.Score += qu.IntAll(p["score"])
+											}
 										}
-									}
-								}, func(err interface{}) {
-									log.Println(err)
-								})
+									}, func(err interface{}) {
+										log.Println(err)
+									})
+								}
 							}
 						}
-					}
-					//长度打分
-					if lengths, ok := scoreRule["length"].([]interface{}); ok {
-						for _, tmp := range lengths {
-							if length, ok := tmp.(map[string]interface{}); ok {
-								min := qu.IntAll(length["min"])
-								max := qu.IntAll(length["max"])
-								scores, _ := length["score"].([]interface{})
-								if len(scores) < 3 {
-									continue
-								}
-								if vlen < min {
-									v.Score += qu.IntAll(scores[0])
-								} else if vlen > max {
-									v.Score += qu.IntAll(scores[2])
-								} else {
-									v.Score += qu.IntAll(scores[1])
+						//长度打分
+						if lengths, ok := scoreRule["length"].([]interface{}); ok {
+							for _, tmp := range lengths {
+								if length, ok := tmp.(map[string]interface{}); ok {
+									min := qu.IntAll(length["min"])
+									max := qu.IntAll(length["max"])
+									scores, _ := length["score"].([]interface{})
+									if len(scores) < 3 {
+										continue
+									}
+									if vlen < min {
+										v.Score += qu.IntAll(scores[0])
+									} else if vlen > max {
+										v.Score += qu.IntAll(scores[2])
+									} else {
+										v.Score += qu.IntAll(scores[1])
+									}
 								}
 							}
 						}
 					}
-				}
-				//float类型打分
-				if fieldtype == "float" {
-					min := qu.IntAll(scoreRule["min"])
-					max := qu.IntAll(scoreRule["max"])
-					val := qu.IntAll(v.Value)
-					scores, _ := scoreRule["score"].([]interface{})
-					if len(scores) < 3 {
-						continue
-					}
-					if val < min && 0 < val {
-						v.Score += qu.IntAll(scores[0])
-					} else if val > max {
-						v.Score += qu.IntAll(scores[2])
-					} else if val <= max && val >= min {
-						v.Score += qu.IntAll(scores[1])
-					}
-				}
-				//decimal
-				if fieldtype == "decimal" {
-					min := qu.IntAll(scoreRule["min"])
-					max := qu.IntAll(scoreRule["max"])
-					val := qu.IntAll(v.Value)
-					scores, _ := scoreRule["score"].([]interface{})
-					if len(scores) < 3 {
-						continue
+					//float类型打分
+					if fieldtype == "float" {
+						min := qu.IntAll(scoreRule["min"])
+						max := qu.IntAll(scoreRule["max"])
+						val := qu.IntAll(v.Value)
+						scores, _ := scoreRule["score"].([]interface{})
+						if len(scores) < 3 {
+							continue
+						}
+						if val < min && 0 < val {
+							v.Score += qu.IntAll(scores[0])
+						} else if val > max {
+							v.Score += qu.IntAll(scores[2])
+						} else if val <= max && val >= min {
+							v.Score += qu.IntAll(scores[1])
+						}
 					}
-					if val > max {
-						v.Score += qu.IntAll(scores[2])
-					} else if val <= max && val > min {
-						v.Score += qu.IntAll(scores[1])
+					//decimal
+					if fieldtype == "decimal" {
+						min := qu.IntAll(scoreRule["min"])
+						max := qu.IntAll(scoreRule["max"])
+						val := qu.IntAll(v.Value)
+						scores, _ := scoreRule["score"].([]interface{})
+						if len(scores) < 3 {
+							continue
+						}
+						if val > max {
+							v.Score += qu.IntAll(scores[2])
+						} else if val <= max && val > min {
+							v.Score += qu.IntAll(scores[1])
+						}
 					}
 				}
 			}
 		}
-	}
+	}, func(err interface{}) {
+		log.Println("ScoreFields err", err)
+	})
 	return result
 }

+ 2 - 2
src/udpfileserver/config.json

@@ -7,9 +7,9 @@
   "mongodb_one_db": "spider",
   "mongodb_one_c": "data_bak",
   "mongodb_one_filefiled": "projectinfo",
-  "mongodb_two_ip": "127.0.0.1:27017",
+  "mongodb_two_ip": "192.168.3.207:27082",
   "mongodb_two_db": "spider",
   "mongodb_two_c": "data_bak",
   "mongodb_two_filefiled": "projectinfo",
-  "file2text": "127.0.0.1:1234"
+  "file2text": "192.168.3.207:1234"
 }

+ 145 - 58
src/udpfileserver/main.go

@@ -2,12 +2,13 @@ package main
 
 import (
 	"encoding/json"
-	"fmt"
+	"gopkg.in/mgo.v2"
 	"jy/mongodbutil"
 	"log"
 	mu "mfw/util"
 	"net"
 	"net/rpc"
+	"path"
 	qu "qfw/util"
 	"strings"
 
@@ -52,10 +53,18 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			log.Println("json err :", err, string(data))
 			return
 		}
+		log.Println(mapInfo)
 		gid := strings.TrimSpace(mapInfo["gtid"].(string))
 		lid := strings.TrimSpace(mapInfo["lteid"].(string))
 		if bson.IsObjectIdHex(gid) && bson.IsObjectIdHex(lid) {
-			if findAll, b := mongodbutil.Mgo.Find(MgoC,
+			MgoSession, err := mgo.Dial(MgoIP)
+			defer MgoSession.Close()
+			if err != nil {
+				log.Println("mongo err:",err)
+				return
+			}
+
+			iter := MgoSession.DB(MgoDB).C(MgoC).Find(
 				bson.M{
 					"_id": bson.M{
 						"$gte": bson.ObjectIdHex(gid),
@@ -64,34 +73,51 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					MgoFileFiled: bson.M{
 						"$ne": nil,
 					},
-				},
-				//if findAll, b := mongodbutil.Mgo.Find(MgoC, bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(gid), "$lte": bson.ObjectIdHex(lid)}},
-				nil, `{"_id":"1",`+MgoFileFiled+`:"1"}`, false, -1, -1); !b {
-				log.Println("查询数据失败 :", string(data))
-			} else {
-				fmt.Println(len(*findAll))
-				if len(*findAll) <= 0 {
-					log.Println("查询数据为空 :", string(data))
-					return
-				}
-				for _, v := range *findAll {
-					qmap := *qu.ObjToMap(v)
-					mid := qmap["_id"]
-					if v, ok := qmap[MgoFileFiled].(map[string]interface{}); !ok {
-						log.Println(mid, "mgo 转换异常", MgoFileFiled)
+				},).Select(bson.M{"_id": 1,MgoFileFiled:1}).Iter()
+
+			//if findAll, b := mongodbutil.Mgo.Find(MgoC,
+			//	bson.M{
+			//		"_id": bson.M{
+			//			"$gte": bson.ObjectIdHex(gid),
+			//			"$lte": bson.ObjectIdHex(lid),
+			//		},
+			//		MgoFileFiled: bson.M{
+			//			"$ne": nil,
+			//		},
+			//	},
+			//	//if findAll, b := mongodbutil.Mgo.Find(MgoC, bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(gid), "$lte": bson.ObjectIdHex(lid)}},
+			//	nil, `{"_id":"1",`+MgoFileFiled+`:"1"}`, false, -1, -1); !b {
+			//	log.Println("查询数据失败 :", string(data))
+			//} else {
+			var result *map[string]interface{}
+			log.Println("处理查询数据...")
+			for iter.Next(&result){
+				//for _, v := range *result {
+					qmap := qu.ObjToMap(result)
+					mid := (*qmap)["_id"]
+					if v, ok := (*qmap)[MgoFileFiled].(map[string]interface{}); !ok {
+						mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
+							"$set": bson.M{
+								"updatefileErr": 1,
+							},})
+						//log.Println(mid, "mgo 转换异常", MgoFileFiled)
 						continue
 					} else {
 						switch v["attachments"].(type) {
 						case map[string]interface{}:
 							att := v["attachments"].(map[string]interface{})
-							for _, vaatt := range att {
+							for attk, vaatt := range att {
 								if fileinfo, ok := vaatt.(map[string]interface{}); !ok {
-									log.Println(mid, "mgo 结构体转换失败", vaatt)
+									mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
+										"$set": bson.M{
+											"updatefileErr": 1,
+										},})
+									//log.Println(mid, "mgo 结构体转换失败", vaatt)
 									continue
 								} else {
 									ChanB <- true
-									go save(mid, qmap, fileinfo)
-
+									save(mid,attk, qmap, &fileinfo)
+									<-ChanB
 								}
 							}
 						}
@@ -99,7 +125,40 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					//fileMap := *qu.ObjToMap(qmap["projectinfo"])
 					//fmt.Println(fileMap["attachments"])
 				}
-			}
+			//}
+			defer iter.Close()
+			log.Println("处理查询数据结束...")
+			//fmt.Println(len(*findAll))
+				//if len(*findAll) <= 0 {
+				//	log.Println("查询数据为空 :", string(data))
+				//	return
+				//}
+				//for _, v := range *findAll {
+				//	qmap := *qu.ObjToMap(v)
+				//	mid := qmap["_id"]
+				//	if v, ok := qmap[MgoFileFiled].(map[string]interface{}); !ok {
+				//		log.Println(mid, "mgo 转换异常", MgoFileFiled)
+				//		continue
+				//	} else {
+				//		switch v["attachments"].(type) {
+				//		case map[string]interface{}:
+				//			att := v["attachments"].(map[string]interface{})
+				//			for _, vaatt := range att {
+				//				if fileinfo, ok := vaatt.(map[string]interface{}); !ok {
+				//					log.Println(mid, "mgo 结构体转换失败", vaatt)
+				//					continue
+				//				} else {
+				//					ChanB <- true
+				//					go save(mid, qmap, fileinfo)
+				//
+				//				}
+				//			}
+				//		}
+				//	}
+				//	//fileMap := *qu.ObjToMap(qmap["projectinfo"])
+				//	//fmt.Println(fileMap["attachments"])
+				//}
+			//}
 		} else {
 			log.Println("开始id或结束id参数错误:", string(data))
 		}
@@ -110,12 +169,10 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	}
 
 }
-func save(mid interface{}, qmap, fileinfo map[string]interface{}) {
+func save(mid interface{},attk string, qmap, fileinfo *map[string]interface{}) {
 	defer qu.Catch()
-	defer func() {
-		<-ChanB
-	}()
 	type FileData struct {
+		OrgUrl  string //源下载地址
 		Fid     string
 		Name    string
 		Type    string //文件类型png、jpg、tif、swf(ocr识别);pdf,doc,docx,xls
@@ -123,20 +180,36 @@ func save(mid interface{}, qmap, fileinfo map[string]interface{}) {
 	}
 	client, err := rpc.DialHTTP("tcp", qu.ObjToString(Sysconfig["file2text"]))
 	if err != nil {
+		mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
+			"$set": bson.M{
+				"updatefileErr": 1,
+			},})
 		log.Println(mid, "rpc err :", err)
 		return
 	}
 	defer client.Close()
 	var reply []byte
 	//bs, _ := ioutil.ReadFile("1.docx")
+	var fffpath string
+	fffpath = path.Ext(qu.ObjToString((*fileinfo)["filename"]))
+	if strings.TrimSpace(fffpath) == ""{
+		fffpath  = qu.ObjToString((*fileinfo)["ftype"])
+	}else {
+		fffpath = fffpath[1:]
+	}
 	fileData := &FileData{
-		Name: qu.ObjToString(fileinfo["filename"]),
-		Fid:  qu.ObjToString(fileinfo["fid"]), //附件id
-		Type: qu.ObjToString(fileinfo["ftype"]),
+		OrgUrl: qu.ObjToString((*fileinfo)["org_url"]),
+		Name: qu.ObjToString((*fileinfo)["filename"]),
+		Fid:  qu.ObjToString((*fileinfo)["fid"]), //附件id
+		Type: fffpath,
 	}
-	log.Println(mid, fileData)
+	//log.Println(mid, fileData)
 	err = client.Call("FileToText.FileToContext", fileData, &reply)
 	if err != nil {
+		mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
+			"$set": bson.M{
+				"updatefileErr": 1,
+			},})
 		log.Println(mid, "call ocr error:", err)
 		return
 	}
@@ -151,51 +224,65 @@ func save(mid interface{}, qmap, fileinfo map[string]interface{}) {
 	//}
 	//reply, _ = json.Marshal(testfiles)
 	if len(reply) == 0{
+		mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
+			"$set": bson.M{
+				"updatefileErr": 1,
+			},})
 		log.Println(mid, "rpc返回数据为空:", string(reply))
 		return
 	}
-	log.Println(mid, string(reply))
+	log.Println(mid, string(reply)[:23])
 	rdata := make(map[string]interface{})
 	if err := json.Unmarshal(reply, &rdata); err != nil {
+		mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
+			"$set": bson.M{
+				"updatefileErr": 1,
+			},})
 		log.Println(mid, "rpc返回数据解析失败:", err)
 		return
 	}
 	if rdata["err"] == nil || rdata["err"] == "null" || rdata["err"] == "" {
-		if qu.ObjToString(fileinfo["ftype"]) == "rar" || qu.ObjToString(fileinfo["ftype"]) == "zip" {
-			fileinfo["content"] = rdata["contextc"]
+		if qu.ObjToString((*fileinfo)["ftype"]) == "rar" || qu.ObjToString((*fileinfo)["ftype"]) == "zip" {
+			(*fileinfo)["content"] = rdata["contextc"]
 		} else {
-			fileinfo["content"] = rdata["context"]
+			(*fileinfo)["content"] = rdata["context"]
 		}
-		if !mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
+		//log.Println((*fileinfo))
+
+		asdf := (*qmap)[MgoFileFiled].(map[string]interface{})
+		qwer := asdf["attachments"].(map[string]interface{})
+		qwer[attk] =*fileinfo
+		//log.Println((*qmap)[MgoFileFiled])
+
+		updateBool := mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
 			"$set": bson.M{
-				MgoFileFiled: qmap[MgoFileFiled],
+				MgoFileFiled: (*qmap)[MgoFileFiled],
+				//MgoFileFiled: bson.M{
+				//	"attachments":bson.M{
+				//		attk:(*fileinfo),
+				//	},
+				//},
 			},
-		}) {
-			log.Println(mid, "mongo更新数据失败")
-		} else {
+		})
+		if updateBool{
+			mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
+				"$set": bson.M{
+					"updatefileErr": 0,
+				},})
 			log.Println(mid, "mongo更新数据成功")
+		}else {
+			mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
+				"$set": bson.M{
+					"updatefileErr": 1,
+				},})
+			log.Println(mid, "mongo更新数据失败")
 		}
 	} else {
+		mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
+			"$set": bson.M{
+				"updatefileErr": 1,
+			},})
 		log.Println(mid, "调用rpc服务解析异常:", rdata["err"])
 	}
-	//if qu.ObjToString(fileinfo["ftype"]) == "zip" || qu.ObjToString(fileinfo["ftype"]) == "rar" {
-	//	fileDatas := make([]map[string]interface{}, 0)
-	//	if err := json.Unmarshal(reply, &fileDatas); err != nil {
-	//		log.Println("json转换错误", mid, err)
-	//		return
-	//	}
-	//	fileinfo["content"] = fileDatas
-	//} else {
-	//	fileinfo["content"] = string(reply)
-	//}
-	//if !mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
-	//	"$set": bson.M{
-	//		MgoFileFiled: qmap[MgoFileFiled],
-	//	},
-	//}) {
-	//	log.Println(mid, "更新数据失败")
-	//} else {
-	//	log.Println(mid, "更新数据成功")
-	//}
 
 }

+ 10 - 7
src/udpfileserver/main_test.go → src/udpfileserver/maintest.go

@@ -5,17 +5,17 @@ import (
 	"log"
 	"mfw/util"
 	"net"
-	"testing"
 )
 
-func TestName(t *testing.T) {
-	udpclient = util.UdpClient{Local: "127.0.0.1:8889", BufSize: 1024}
-	udpclient.Listen(processUdpMsg)
+func main() {
+	udpclient := util.UdpClient{Local: "127.0.0.1:8889", BufSize: 1024}
+	udpclient.Listen(processUdpMsg2)
 	m := map[string]string{
-		"gtid":"5cac41a6dff41f3b20b3b99c",
-		"lteid":"5cac41a6dff41f3b20b3b99c",
+		"gtid":"5cb682c9ed1d910046aca2eb",
+		"lteid":"5cb683f2ed1d9100570abbc3",
 	}
 	b, _ := json.Marshal(m)
+	//for  range time.Tick(time.Second){
 	err := udpclient.WriteUdp(b, util.OP_TYPE_DATA, &net.UDPAddr{
 		IP:   net.ParseIP("127.0.0.1"),
 		Port: 8888,
@@ -25,5 +25,8 @@ func TestName(t *testing.T) {
 		return
 	}
 	log.Println("发送成功")
-
+	//}
 }
+func processUdpMsg2(act byte, data []byte, ra *net.UDPAddr) {
+	// No-op handler given to udpclient.Listen in main; act, data and ra are ignored.
+}