|
@@ -89,6 +89,7 @@ func RunExtractTestTask(ext *ExtractTask, startId, num string) bool {
|
|
|
|
|
|
// StartExtractTaskId starts extraction for the task identified by taskId.
|
|
|
func StartExtractTaskId(taskId string) bool {
|
|
|
+ defer qu.Catch()
|
|
|
isgo := false
|
|
|
ext := TaskList[taskId]
|
|
|
if ext == nil {
|
|
@@ -133,6 +134,7 @@ func StartExtractTaskId(taskId string) bool {
|
|
|
|
|
|
// StopExtractTaskId stops extraction for the task identified by taskId.
|
|
|
func StopExtractTaskId(taskId string) bool {
|
|
|
+ defer qu.Catch()
|
|
|
ext := TaskList[taskId]
|
|
|
if ext != nil {
|
|
|
ext.IsRun = false
|
|
@@ -145,6 +147,7 @@ func StopExtractTaskId(taskId string) bool {
|
|
|
|
|
|
// RunExtractTask runs the extraction for the task identified by taskId.
|
|
|
func RunExtractTask(taskId string) {
|
|
|
+ defer qu.Catch()
|
|
|
ext := TaskList[taskId]
|
|
|
query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(ext.TaskInfo.LastExtId)}}
|
|
|
count := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
|
|
@@ -182,6 +185,7 @@ func RunExtractTask(taskId string) {
|
|
|
|
|
|
// PreInfo preprocesses a source document and returns it as a job.
|
|
|
func PreInfo(doc map[string]interface{}) *ju.Job {
|
|
|
+ defer qu.Catch()
|
|
|
detail := ""
|
|
|
d1, _ := doc["detail"].(string)
|
|
|
d2, _ := doc["contenthtml"].(string)
|
|
@@ -312,15 +316,16 @@ func (e *ExtractTask) ExtractProcess(j *ju.Job) {
|
|
|
// log.Println("抽取结果", j.Title, j.SourceMid, string(bs))
|
|
|
//分析抽取结果并保存 todo
|
|
|
AnalysisSaveResult(j, e)
|
|
|
+ <-e.TaskInfo.ProcessPool
|
|
|
}, func(err interface{}) {
|
|
|
- log.Println((*j.Data)["_id"], err)
|
|
|
+ log.Println("ExtractProcess err", err, (*j.Data)["_id"])
|
|
|
<-e.TaskInfo.ProcessPool
|
|
|
})
|
|
|
- <-e.TaskInfo.ProcessPool
|
|
|
}
|
|
|
|
|
|
// ExtRegPre applies the pre-extraction filter rule to doc.
|
|
|
func ExtRegPre(doc map[string]interface{}, j *ju.Job, in *RegLuaInfo, t *TaskInfo) map[string]interface{} {
|
|
|
+ defer qu.Catch()
|
|
|
before := ju.DeepCopy(doc).(map[string]interface{})
|
|
|
extinfo := map[string]interface{}{}
|
|
|
if in.IsLua {
|
|
@@ -345,6 +350,7 @@ func ExtRegPre(doc map[string]interface{}, j *ju.Job, in *RegLuaInfo, t *TaskInf
|
|
|
|
|
|
// ExtRegCore performs rule-based extraction.
|
|
|
func ExtRegCore(extfrom string, doc map[string]interface{}, j *ju.Job, in *RegLuaInfo, et *ExtractTask) {
|
|
|
+ defer qu.Catch()
|
|
|
//废标、流标、ppp等跳过
|
|
|
b := IsExtract(in.Field, j.Title, j.Content)
|
|
|
if !b {
|
|
@@ -385,6 +391,7 @@ func ExtRegCore(extfrom string, doc map[string]interface{}, j *ju.Job, in *RegLu
|
|
|
|
|
|
// getKvByLuaFields extracts key-value pairs according to the Lua script's field settings.
|
|
|
func getKvByLuaFields(extfrom string, j *ju.Job, in *RegLuaInfo, t map[string][]*Tag) map[string][]map[string]interface{} {
|
|
|
+ defer qu.Catch()
|
|
|
kvmap := map[string][]map[string]interface{}{}
|
|
|
for fieldname, field := range in.LFields {
|
|
|
lock.Lock()
|
|
@@ -547,6 +554,7 @@ func getKvByLuaFields(extfrom string, j *ju.Job, in *RegLuaInfo, t map[string][]
|
|
|
|
|
|
// extRegCoreToResult collects the results of regular-expression extraction.
|
|
|
func extRegCoreToResult(extfrom, text string, j *ju.Job, v *RegLuaInfo) map[string][]map[string]interface{} {
|
|
|
+ defer qu.Catch()
|
|
|
extinfo := map[string][]map[string]interface{}{}
|
|
|
if v.RegCore.Bextract { //正则是两部分的,可以直接抽取的(含下划线)
|
|
|
apos := v.RegCore.Reg.FindAllStringSubmatchIndex(text, -1)
|
|
@@ -614,6 +622,7 @@ func extRegCoreToResult(extfrom, text string, j *ju.Job, v *RegLuaInfo) map[stri
|
|
|
|
|
|
// ExtRegBack applies the post-extraction filter rule to the job.
|
|
|
func ExtRegBack(j *ju.Job, in *RegLuaInfo, t *TaskInfo) {
|
|
|
+ defer qu.Catch()
|
|
|
if in.IsLua {
|
|
|
result := GetResultMapForLua(j)
|
|
|
lua := ju.LuaScript{Code: in.Code, Name: in.Name, Result: result, Script: in.RuleText}
|
|
@@ -695,6 +704,7 @@ func ExtRegBack(j *ju.Job, in *RegLuaInfo, t *TaskInfo) {
|
|
|
|
|
|
// GetResultMapForLua returns the extraction results as a map[string][]map[string]interface{} for use by Lua scripts.
|
|
|
func GetResultMapForLua(j *ju.Job) map[string][]map[string]interface{} {
|
|
|
+ defer qu.Catch()
|
|
|
result := map[string][]map[string]interface{}{}
|
|
|
for key, val := range j.Result {
|
|
|
if result[key] == nil {
|
|
@@ -718,6 +728,7 @@ func GetResultMapForLua(j *ju.Job) map[string][]map[string]interface{} {
|
|
|
|
|
|
// AddExtLog records an extraction log entry; it returns immediately when t.IsEtxLog is false.
|
|
|
func AddExtLog(ftype, sid string, before interface{}, extinfo interface{}, v *RegLuaInfo, t *TaskInfo) {
|
|
|
+ defer qu.Catch()
|
|
|
if !t.IsEtxLog {
|
|
|
return
|
|
|
}
|
|
@@ -742,6 +753,7 @@ func AddExtLog(ftype, sid string, before interface{}, extinfo interface{}, v *Re
|
|
|
|
|
|
// SaveExtLog saves the accumulated extraction logs.
|
|
|
func SaveExtLog() {
|
|
|
+ defer qu.Catch()
|
|
|
tmpLogs := map[*TaskInfo][]map[string]interface{}{}
|
|
|
lock.Lock()
|
|
|
tmpLogs = ExtLogs
|
|
@@ -773,77 +785,78 @@ type FieldValue struct {
|
|
|
|
|
|
// AnalysisSaveResult analyzes the extraction results of a job and saves them.
|
|
|
func AnalysisSaveResult(j *ju.Job, e *ExtractTask) {
|
|
|
- doc := j.Data
|
|
|
- result := j.Result
|
|
|
- _id := qu.BsonIdToSId((*doc)["_id"])
|
|
|
- iscore, _ := ju.Config["fieldscore"].(bool)
|
|
|
- if iscore { //打分
|
|
|
- result = ScoreFields(j)
|
|
|
- }
|
|
|
- //结果排序
|
|
|
- values := map[string][]*ju.SortObject{}
|
|
|
- for key, val := range result {
|
|
|
- fieldValue := map[string][]interface{}{}
|
|
|
- if iscore { //走打分
|
|
|
- for _, v := range val {
|
|
|
- if len(fmt.Sprint(v.Value)) < 1 {
|
|
|
- continue //去除空串
|
|
|
+ qu.Try(func() {
|
|
|
+ doc := j.Data
|
|
|
+ result := j.Result
|
|
|
+ _id := qu.BsonIdToSId((*doc)["_id"])
|
|
|
+ iscore, _ := ju.Config["fieldscore"].(bool)
|
|
|
+ if iscore { //打分
|
|
|
+ result = ScoreFields(j)
|
|
|
+ }
|
|
|
+ //结果排序
|
|
|
+ values := map[string][]*ju.SortObject{}
|
|
|
+ for key, val := range result {
|
|
|
+ fieldValue := map[string][]interface{}{}
|
|
|
+ if iscore { //走打分
|
|
|
+ for _, v := range val {
|
|
|
+ if len(fmt.Sprint(v.Value)) < 1 {
|
|
|
+ continue //去除空串
|
|
|
+ }
|
|
|
+ fieldValue[fmt.Sprint(v.Value)+v.Type] = []interface{}{v.Score, v.Value}
|
|
|
+ }
|
|
|
+ } else { //不走打分,按出现频次
|
|
|
+ for _, v := range val {
|
|
|
+ if len(fmt.Sprint(v.Value)) < 1 {
|
|
|
+ continue //去除空串
|
|
|
+ }
|
|
|
+ if fieldValue[fmt.Sprint(v.Value)] == nil {
|
|
|
+ fieldValue[fmt.Sprint(v.Value)] = []interface{}{0, v.Value}
|
|
|
+ } else {
|
|
|
+ fieldValue[fmt.Sprint(v.Value)][0] = qu.IntAll(fieldValue[fmt.Sprint(v.Value)][0]) + 1
|
|
|
+ }
|
|
|
}
|
|
|
- fieldValue[fmt.Sprint(v.Value)+v.Type] = []interface{}{v.Score, v.Value}
|
|
|
}
|
|
|
- } else { //不走打分,按出现频次
|
|
|
- for _, v := range val {
|
|
|
- if len(fmt.Sprint(v.Value)) < 1 {
|
|
|
- continue //去除空串
|
|
|
+ objects := []*ju.SortObject{}
|
|
|
+ for k, v := range fieldValue {
|
|
|
+ ValueStr := "" //第二排序
|
|
|
+ if reflect.TypeOf(v[1]).String() == "string" {
|
|
|
+ ValueStr = qu.ObjToString(v[1])
|
|
|
}
|
|
|
- if fieldValue[fmt.Sprint(v.Value)] == nil {
|
|
|
- fieldValue[fmt.Sprint(v.Value)] = []interface{}{0, v.Value}
|
|
|
- } else {
|
|
|
- fieldValue[fmt.Sprint(v.Value)][0] = qu.IntAll(fieldValue[fmt.Sprint(v.Value)][0]) + 1
|
|
|
+ tmp := &ju.SortObject{
|
|
|
+ Key: k,
|
|
|
+ Value: qu.IntAll(v[0]),
|
|
|
+ Object: v[1],
|
|
|
+ ValueStr: ValueStr,
|
|
|
}
|
|
|
+ objects = append(objects, tmp)
|
|
|
}
|
|
|
+ values[key] = ju.ExtSort(objects)
|
|
|
}
|
|
|
- objects := []*ju.SortObject{}
|
|
|
- for k, v := range fieldValue {
|
|
|
- ValueStr := "" //第二排序
|
|
|
- if reflect.TypeOf(v[1]).String() == "string" {
|
|
|
- ValueStr = qu.ObjToString(v[1])
|
|
|
- }
|
|
|
- tmp := &ju.SortObject{
|
|
|
- Key: k,
|
|
|
- Value: qu.IntAll(v[0]),
|
|
|
- Object: v[1],
|
|
|
- ValueStr: ValueStr,
|
|
|
+ //从排序结果中取值
|
|
|
+ tmp := map[string]interface{}{} //抽取值
|
|
|
+ for key, val := range values {
|
|
|
+ for _, v := range val { //取第一个非负数
|
|
|
+ if v.Key != "" && v.Value > -1 {
|
|
|
+ tmp[key] = v.Object
|
|
|
+ break
|
|
|
+ }
|
|
|
}
|
|
|
- objects = append(objects, tmp)
|
|
|
}
|
|
|
- values[key] = ju.ExtSort(objects)
|
|
|
- }
|
|
|
- //从排序结果中取值
|
|
|
- tmp := map[string]interface{}{} //抽取值
|
|
|
- for key, val := range values {
|
|
|
- for _, v := range val { //取第一个非负数
|
|
|
- if v.Key != "" && v.Value > -1 {
|
|
|
- tmp[key] = v.Object
|
|
|
- break
|
|
|
- }
|
|
|
+ if len(j.PackageInfo) > 0 { //分包信息
|
|
|
+ tmp["package"] = j.PackageInfo
|
|
|
}
|
|
|
- }
|
|
|
- if len(j.PackageInfo) > 0 { //分包信息
|
|
|
- tmp["package"] = j.PackageInfo
|
|
|
- }
|
|
|
- if len(j.Winnerorder) > 0 { //候选人信息
|
|
|
- tmp["winnerorder"] = j.Winnerorder
|
|
|
- }
|
|
|
- for k, v := range *doc {
|
|
|
- //去重冗余字段
|
|
|
- if k == "detail" || k == "contenthtml" || k == "site" || k == "spidercode" {
|
|
|
- continue
|
|
|
+ if len(j.Winnerorder) > 0 { //候选人信息
|
|
|
+ tmp["winnerorder"] = j.Winnerorder
|
|
|
}
|
|
|
- if tmp[k] == nil {
|
|
|
- tmp[k] = v
|
|
|
+ for k, v := range *doc {
|
|
|
+ //去重冗余字段
|
|
|
+ if k == "detail" || k == "contenthtml" || k == "site" || k == "spidercode" {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if tmp[k] == nil {
|
|
|
+ tmp[k] = v
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
//质量审核
|
|
|
if ju.Config["qualityaudit"].(bool) {
|
|
@@ -905,6 +918,9 @@ func AnalysisSaveResult(j *ju.Job, e *ExtractTask) {
|
|
|
log.Println(e.TaskInfo.TestColl, _id)
|
|
|
}
|
|
|
}
|
|
|
+ }, func(err interface{}) {
|
|
|
+ log.Println("AnalysisSaveResult err", err)
|
|
|
+ })
|
|
|
}
|
|
|
|
|
|
func (e *ExtractTask) QualityAudit(resulttmp map[string]interface{}) {
|