package extract

import (
	"bytes"
	"encoding/json"
	"fmt"
	"jy/clear"
	db "jy/mongodbutil"
	"jy/pretreated"
	ju "jy/util"
	qu "qfw/util"
	"qfw/util/redis"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"time"
	"unicode/utf8"

	log "github.com/donnie4w/go-logger/logger"
	"gopkg.in/mgo.v2/bson"
)

var (
	lock, lockrule, lockclear sync.RWMutex
	cut                       = ju.NewCut()                          // extracts and cleans the body text
	ExtLogs                   map[*TaskInfo][]map[string]interface{} // pending extraction logs, per task
	TaskList                  map[string]*ExtractTask                // extraction tasks, by task id
	ClearTaskList             map[string]*ClearTask                  // clear tasks, by task id
	saveLimit                 = 200                                  // batch size when saving extraction logs
	PageSize                  = 5000                                 // page size for source queries
	Fields                    = `{"title":1,"detail":1,"contenthtml":1,"site":1,"spidercode":1,"toptype":1,"subtype":1,"area":1,"city":1,"comeintime":1,"publishtime":1,"sensitive":1,"projectinfo":1,"jsondata":1}`
	Fields2                   = `{"budget":1,"bidamount":1,"title":1,"projectname":1,"winner":1}`

	// reLineRun matches a run of characters that are not \r, \n or \t. It is compiled once
	// here instead of on every extRegCoreToResult call.
	reLineRun = regexp.MustCompile("[^\r\n\t]+")
)

// StartExtractTestTask builds a one-off test ExtractTask for taskId, loads its rules and
// lookup tables, and extracts up to num documents starting at startId (see RunExtractTestTask).
// resultcoll and trackcoll are passed to InitTestTaskInfo. It returns false if setup panics
// or RunExtractTestTask reports failure.
func StartExtractTestTask(taskId, startId, num, resultcoll, trackcoll string) bool {
	defer qu.Catch()
	ext := &ExtractTask{}
	ext.Id = taskId
	ext.IsRun = true
	ext.InitTestTaskInfo(resultcoll, trackcoll)
	ext.TaskInfo.FDB = db.MgoFactory(1, 3, 120, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
	ext.initExtractRuntime()
	return RunExtractTestTask(ext, startId, num)
}

// initExtractRuntime loads every rule set and lookup table an ExtractTask needs before it
// can extract. It is shared by test tasks and regular tasks.
func (e *ExtractTask) initExtractRuntime() {
	e.InitRulePres()
	e.InitRuleBacks()
	e.InitRuleCore()
	e.InitPkgCore()
	e.InitBlockRule()
	e.InfoTypeList()
	e.InitTag()
	e.InitClearFn()
	if e.IsExtractCity { // city extraction is switched on per version
		// city DFA and code tables
		e.InitCityDFA()
		e.InitAreaCode()
		e.InitPostCode()
	}
	// quality audit
	e.InitAuditFields()
	e.InitAuditRule()
	e.InitAuditClass()
	e.InitAuditRecogField()
	// whether brand/goods extraction is enabled
	ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool)
	// whether attachment extraction is enabled
	e.InitFile()
}

// IdTrans converts a hex string into a bson.ObjectId. An invalid string yields the empty
// (invalid) ObjectId instead of panicking.
func IdTrans(startId string) bson.ObjectId {
	defer qu.Catch()
	if !bson.IsObjectIdHex(startId) {
		return bson.ObjectId("")
	}
	return bson.ObjectIdHex(startId)
}

// RunExtractTestTask queries up to num documents whose _id >= startId from the task's source
// collection and starts one ExtractProcess goroutine per document, skipping documents with
// a non-empty "sensitive" field. Each goroutine takes a slot in ext.TaskInfo.ProcessPool.
// It returns false when startId is not a valid ObjectId or the query returns no list.
func RunExtractTestTask(ext *ExtractTask, startId, num string) bool {
	n, _ := strconv.Atoi(num) // a malformed num yields 0 — NOTE(review): confirm how Find treats limit 0
	id := IdTrans(startId)
	if !id.Valid() {
		return false
	}
	query := bson.M{"_id": bson.M{"$gte": id}}
	list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, n)
	if list == nil {
		return false
	}
	for _, v := range *list {
		if qu.ObjToString(v["sensitive"]) != "" { // skip documents flagged as containing sensitive words
			continue
		}
		j, jf := ext.preInfoJobs(v)
		ext.TaskInfo.ProcessPool <- true
		go ext.ExtractProcess(j, jf)
	}
	return true
}

// preInfoJobs builds the body job for v. When attachment extraction is enabled and v has a
// projectinfo field, it also marks v with isextFile and returns the attachment job as jf;
// otherwise jf is nil.
func (e *ExtractTask) preInfoJobs(v map[string]interface{}) (j, jf *ju.Job) {
	if e.IsFileField && v["projectinfo"] != nil {
		v["isextFile"] = true
		return e.PreInfo(v)
	}
	j, _ = e.PreInfo(v)
	return j, nil
}

// StartExtractTaskId (re)initializes the extraction task taskId, starts its ResultSave and
// BidSave goroutines and marks it running. A RunExtractTask loop is launched only the first
// time a task id is seen; a known task keeps the loop it already has.
func StartExtractTaskId(taskId string) bool {
	defer qu.Catch()
	ext := TaskList[taskId]
	isgo := ext == nil
	if isgo {
		ext = &ExtractTask{}
	}
	ext.Id = taskId
	ext.InitTaskInfo()
	ext.TaskInfo.FDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
	ext.TaskInfo.TDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.ToDbAddr, ext.TaskInfo.ToDB)
	ext.initExtractRuntime()
	ext.IsRun = true
	go ext.ResultSave(true)
	go ext.BidSave(true)
	// Register the task before launching RunExtractTask: it looks the task up in TaskList,
	// and finding nil there would make it panic and never reschedule itself.
	TaskList[taskId] = ext
	if isgo {
		go RunExtractTask(taskId)
	}
	return true
}

// StopExtractTaskId marks task taskId as stopped and persists its resume point
// (task.s_extlastid). It returns false when the task is unknown.
func StopExtractTaskId(taskId string) bool {
	defer qu.Catch()
	ext := TaskList[taskId]
	if ext == nil {
		return false
	}
	ext.IsRun = false
	db.Mgo.UpdateById("task", taskId, `{"$set":{"s_extlastid":"`+ext.TaskInfo.LastExtId+`"}}`)
	return true
}

// RunExtractTask pages through the source collection from the task's LastExtId and starts
// one ExtractProcess goroutine per document, advancing LastExtId and saving it to
// task.s_extlastid after each page. It stops early when the task is no longer running and
// always schedules itself to run again one minute later (unless it panics).
func RunExtractTask(taskId string) {
	defer qu.Catch()
	ext := TaskList[taskId]
	query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(ext.TaskInfo.LastExtId)}}
	count := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
	pageNum := (count + PageSize - 1) / PageSize
	limit := PageSize
	if count < PageSize {
		limit = count
	}
	fmt.Printf("count=%d,pageNum=%d,query=%v\n", count, pageNum, query)
	for i := 0; i < pageNum; i++ {
		// NOTE(review): $gte re-reads the document at LastExtId on every page.
		query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(ext.TaskInfo.LastExtId)}}
		fmt.Printf("page=%d,query=%v\n", i+1, query)
		list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
		if list == nil {
			break // query failed; the next scheduled run retries from LastExtId
		}
		for _, v := range *list {
			if qu.ObjToString(v["sensitive"]) != "" { // skip documents flagged as containing sensitive words
				continue
			}
			_id := qu.BsonIdToSId(v["_id"])
			if !ext.IsRun {
				break
			}
			j, jf := ext.preInfoJobs(v)
			ext.TaskInfo.ProcessPool <- true
			go ext.ExtractProcess(j, jf)
			ext.TaskInfo.LastExtId = _id
		}
		// persist the resume point to task.s_extlastid
		db.Mgo.UpdateById("task", ext.Id, `{"$set":{"s_extlastid":"`+ext.TaskInfo.LastExtId+`"}}`)
		if !ext.IsRun {
			break
		}
	}
	time.AfterFunc(1*time.Minute, func() {
		RunExtractTask(taskId)
	})
}

// PreInfo preprocesses doc without a task version, using an empty ExtractTask (so no
// RuleBlock is attached to the jobs).
func PreInfo(doc map[string]interface{}) (j, jf *ju.Job) {
	return (&ExtractTask{}).PreInfo(doc)
}

// PreInfo preprocesses doc for this task version. It sets doc["detail"] to the cleaned text
// of the longer of detail and contenthtml, and builds the body job j. When
// doc["isextFile"] is true it also concatenates attachment text into doc["detailfile"]
// and builds the attachment job jf (IsFile set); otherwise jf is nil. Both jobs are split
// into blocks by pretreated.AnalyStart. Empty toptype/subtype become "all", and documents
// of type "bid" are classified as "结果".
func (e *ExtractTask) PreInfo(doc map[string]interface{}) (j, jf *ju.Job) {
	defer qu.Catch()
	// set by the caller when attachment text should be extracted as well
	isextFile, _ := doc["isextFile"].(bool)

	// use whichever of detail / contenthtml is longer as the body text
	detail, _ := doc["detail"].(string)
	if html, _ := doc["contenthtml"].(string); len(html) > len(detail) {
		detail = html
	}
	detail = ju.CutLableStr(detail)
	detail = cut.ClearHtml(detail)
	doc["detail"] = detail
	if isextFile {
		// piles all attachment text together into doc["detailfile"] (could be split per file later)
		file2text(&doc)
	}

	toptype := qu.ObjToString(doc["toptype"])
	subtype := qu.ObjToString(doc["subtype"])
	if qu.ObjToString(doc["type"]) == "bid" {
		toptype = "结果"
	}
	if toptype == "" {
		toptype = "all"
	}
	if subtype == "" {
		subtype = "all"
	}

	j = &ju.Job{
		SourceMid:      qu.BsonIdToSId(doc["_id"]),
		Category:       toptype,
		CategorySecond: subtype,
		Content:        detail,
		SpiderCode:     qu.ObjToString(doc["spidercode"]),
		Title:          qu.ObjToString(doc["title"]),
		Data:           &doc,
		City:           qu.ObjToString(doc["city"]),
		Province:       qu.ObjToString(doc["area"]),
		Jsondata:       qu.ObjToMap(doc["jsondata"]),
		Result:         map[string][]*ju.ExtField{},
		BuyerAddr:      qu.ObjToString(doc["buyeraddr"]),
		RuleBlock:      e.RuleBlock,
	}
	if isextFile {
		jf = &ju.Job{
			SourceMid: qu.BsonIdToSId(doc["_id"]),
			Category:  toptype,
			// Without the subtype, rule lookup by category pair in ExtractFile missed.
			CategorySecond: subtype,
			Content:        qu.ObjToString(doc["detailfile"]),
			SpiderCode:     qu.ObjToString(doc["spidercode"]),
			Title:          qu.ObjToString(doc["title"]),
			Data:           &doc,
			City:           qu.ObjToString(doc["city"]),
			Province:       qu.ObjToString(doc["area"]),
			Jsondata:       qu.ObjToMap(doc["jsondata"]),
			Result:         map[string][]*ju.ExtField{},
			BuyerAddr:      qu.ObjToString(doc["buyeraddr"]),
			RuleBlock:      e.RuleBlock,
			IsFile:         isextFile,
		}
	}
	qu.Try(func() {
		pretreated.AnalyStart(j) // split into blocks (job.Block)
		if isextFile {
			pretreated.AnalyStart(jf)
		}
	}, func(err interface{}) {
		log.Debug("pretreated.AnalyStart", err)
	})
	return j, jf
}

// file2text concatenates the text of every attachment in doc["projectinfo"]["attachments"],
// each piece followed by " \n", and stores it in doc["detailfile"] unless it reaches
// ju.Config["filelength"] runes (default 100000), in which case detailfile is left unset.
// An attachment's "content" may be a string, or a list of maps whose "context" entry
// holds the text. Attachment order follows map iteration and is therefore not stable.
func file2text(doc *map[string]interface{}) {
	var strfileinfo bytes.Buffer
	// appendContext writes fv["context"] when it is a string.
	appendContext := func(fv map[string]interface{}) {
		if s, ok := fv["context"].(string); ok {
			strfileinfo.WriteString(s + " \n")
		}
	}
	projectinfo, _ := (*doc)["projectinfo"].(map[string]interface{})
	attachments, _ := projectinfo["attachments"].(map[string]interface{})
	for _, att := range attachments {
		fileinfo, ok := att.(map[string]interface{})
		if !ok {
			continue
		}
		switch content := fileinfo["content"].(type) {
		case string:
			if content != "" {
				strfileinfo.WriteString(content + " \n")
			}
		case []map[string]interface{}:
			for _, fv := range content {
				appendContext(fv)
			}
		case []interface{}:
			// shape of a decoded BSON array — NOTE(review): confirm elements are plain maps
			for _, item := range content {
				if fv, ok := item.(map[string]interface{}); ok {
					appendContext(fv)
				}
			}
		}
	}
	text := strfileinfo.String()
	if utf8.RuneCountInString(text) < qu.IntAllDef(ju.Config["filelength"], 100000) {
		(*doc)["detailfile"] = text
	}
}

// ExtractProcess runs body extraction on j and, when jf is an attachment job, attachment
// extraction on jf, then scores and queues/saves the result. It releases the
// e.TaskInfo.ProcessPool slot that its caller filled before starting it.
func (e *ExtractTask) ExtractProcess(j, jf *ju.Job) {
	defer func() { <-e.TaskInfo.ProcessPool }()
	e.ExtractDetail(j)
	if jf != nil && jf.IsFile {
		e.ExtractFile(jf)
	}
	// analyse the extraction result and save it
	AnalysisSaveResult(j, jf, e)
}

// ruleCoresFor snapshots, under lockrule, the core rules for j's category pair
// ("<Category>_<CategorySecond>", or "all_all" when either part is "all"), falling back to
// "all_all" when the pair has no rules.
func (e *ExtractTask) ruleCoresFor(j *ju.Job) map[string][]*RuleCore {
	key := j.Category + "_" + j.CategorySecond
	if j.Category == "all" || j.CategorySecond == "all" {
		key = "all_all"
	}
	rules := map[string][]*RuleCore{}
	lockrule.Lock()
	for k, vc1 := range e.RuleCores[key] {
		rules[k] = vc1
	}
	if len(rules) == 0 { // category pair not covered by any rule
		for k, vc1 := range e.RuleCores["all_all"] {
			rules[k] = vc1
		}
	}
	lockrule.Unlock()
	return rules
}

// blockTagDescriptions maps every block-tag key in classify to its description in
// TagConfigDesc.
func blockTagDescriptions(classify map[string]bool) map[string]string {
	btag := make(map[string]string, len(classify))
	for k := range classify {
		btag[k] = TagConfigDesc[k]
	}
	return btag
}

// appendWinnerOrderCandidate adds the first ranked candidate (j.Winnerorder[0]["entname"])
// as a "winner" candidate. Names shorter than 4 characters get Score -5, which keeps them
// from being chosen by AnalysisSaveResult (it takes the first candidate with Score > -1).
func appendWinnerOrderCandidate(j *ju.Job) {
	if len(j.Winnerorder) == 0 {
		return
	}
	entname := j.Winnerorder[0]["entname"]
	winner := &ju.ExtField{
		Field:     "winner",
		Code:      "",
		RuleText:  "",
		Type:      "winnerorder",
		MatchType: "winnerorder",
		ExtFrom:   "",
		Value:     entname,
		Score:     0,
	}
	if utf8.RuneCountInString(qu.ObjToString(entname)) < 4 {
		winner.Score = -5
	}
	j.Result["winner"] = append(j.Result["winner"], winner)
}

// runClearFns applies each field's clear functions (e.ClearFn) to every candidate value and,
// for fields listed in clear.AsyField/SymField/MesField, clear.OtherClean. When OtherClean
// returns "", keepOnEmpty keeps the previous value (true) or blanks it (false).
func (e *ExtractTask) runClearFns(j *ju.Job, keepOnEmpty bool) {
	for key, val := range j.Result {
		for _, v := range val {
			lockclear.Lock()
			cfn := e.ClearFn[key]
			lockclear.Unlock()
			data := clear.DoClearFn(cfn, []interface{}{v.Value, j.Content})
			v.Value = data[0]
			// strip special symbols
			lockclear.Lock()
			if clear.AsyField[key] != nil || clear.SymField[key] != nil || clear.MesField[key] != nil {
				text := clear.OtherClean(key, qu.ObjToString(v.Value))
				if text != "" || !keepOnEmpty {
					v.Value = text
				}
			}
			lockclear.Unlock()
		}
	}
}

// ExtractDetail runs the core and back rules for j's category on the body text, adds the
// title as a projectname candidate, applies the global back rules, adds the ranked winner
// candidate, cleans all values, and processes package information (PackageDetail).
// Panics are logged and swallowed.
func (e *ExtractTask) ExtractDetail(j *ju.Job) {
	qu.Try(func() {
		doc := *j.Data
		// global pre-rules are currently disabled
		for _, vc1 := range e.ruleCoresFor(j) {
			for _, vc := range vc1 {
				tmp := ju.DeepCopy(doc).(map[string]interface{})
				// does this rule apply to the document?
				if !ju.Logic(vc.LuaLogic, tmp) {
					continue
				}
				for _, v := range vc.RuleCores {
					ExtRegCore(vc.ExtFrom, tmp, j, v, e)
				}
				// the title always joins the projectname candidates and is scored with them
				if vc.Field == "projectname" {
					field := &ju.ExtField{Field: vc.Field, Code: "title", RuleText: "title", Type: "title", ExtFrom: vc.ExtFrom, SourceValue: j.Title, Value: j.Title}
					if bt, ok := tmp["blocktag"].(map[string]bool); ok {
						field.BlockTag = blockTagDescriptions(bt)
					}
					j.Result[vc.Field] = append(j.Result[vc.Field], field)
				}
				for _, v := range vc.RuleBacks {
					ExtRegBack(j, v, e.TaskInfo)
				}
			}
		}
		// global back rules
		for _, v := range e.RuleBacks {
			ExtRegBack(j, v, e.TaskInfo)
		}
		appendWinnerOrderCandidate(j)
		e.runClearFns(j, true)
		PackageDetail(j, e) // package (lot) information
	}, func(err interface{}) {
		log.Debug("ExtractProcess err", err)
	})
}

// ExtractFile is ExtractDetail for an attachment job: only rules whose field is enabled in
// e.FileFields (value > 0) run, no title candidate is added, and a value that
// clear.OtherClean reduces to "" is blanked. Panics are logged and swallowed.
func (e *ExtractTask) ExtractFile(j *ju.Job) {
	qu.Try(func() {
		doc := *j.Data
		// enabled reports whether attachment extraction is switched on for field.
		enabled := func(field string) bool {
			value, ok := e.FileFields.Load(field)
			return ok && qu.IntAllDef(value, 1) > 0
		}
		for _, vc1 := range e.ruleCoresFor(j) {
			for _, vc := range vc1 {
				tmp := ju.DeepCopy(doc).(map[string]interface{})
				if !ju.Logic(vc.LuaLogic, tmp) {
					continue
				}
				for _, v := range vc.RuleCores {
					if enabled(v.Field) {
						ExtRegCore(vc.ExtFrom, tmp, j, v, e)
					}
				}
				for _, v := range vc.RuleBacks {
					if enabled(v.Field) {
						ExtRegBack(j, v, e.TaskInfo)
					}
				}
			}
		}
		// global back rules
		for _, v := range e.RuleBacks {
			if enabled(v.Field) {
				ExtRegBack(j, v, e.TaskInfo)
			}
		}
		appendWinnerOrderCandidate(j)
		e.runClearFns(j, false)
		PackageDetail(j, e) // package (lot) information
	}, func(err interface{}) {
		log.Debug("ExtractProcess err", err)
	})
}

// ExtRegPre applies a pre-rule to doc and returns it; the results overwrite doc in place.
// A Lua rule's output keys are copied into doc. A regexp rule deletes its matches from
// in.Field (default "detail", or "detailfile" for attachment jobs). The change is recorded
// with AddExtLog. j may be nil.
func ExtRegPre(doc map[string]interface{}, j *ju.Job, in *RegLuaInfo, t *TaskInfo) map[string]interface{} {
	defer qu.Catch()
	before := ju.DeepCopy(doc).(map[string]interface{})
	sid := ""
	if j != nil {
		sid = j.SourceMid
	}
	extinfo := map[string]interface{}{}
	if in.IsLua {
		lua := ju.LuaScript{Code: in.Code, Name: in.Name, Doc: doc, Script: in.RuleText}
		if j != nil {
			lua.Block = j.Block
		}
		extinfo = lua.RunScript("pre")
		for k, v := range extinfo {
			doc[k] = v
		}
	} else {
		key := in.Field
		if key == "" {
			key = "detail"
			if j != nil && j.IsFile {
				key = "detailfile"
			}
		}
		text := qu.ObjToString(doc[key])
		extinfo[key] = in.RegPreBac.Reg.ReplaceAllString(text, "")
		doc[key] = extinfo[key]
	}
	AddExtLog("prereplace", sid, before, extinfo, in, t) // extraction log
	return doc
}

// ExtRegCore runs one core rule and appends its candidates to j.Result. Fields that
// IsExtract rejects for this notice (e.g. failed bids, PPP) are skipped. A Lua rule
// receives the kv candidates from getKvByLuaFields; only its output for in.Field is kept.
// A regexp rule runs on the title when extfrom is "title", otherwise on every block's
// text, tagging candidates with the block's classification.
func ExtRegCore(extfrom string, doc map[string]interface{}, j *ju.Job, in *RegLuaInfo, et *ExtractTask) {
	defer qu.Catch()
	if !IsExtract(in.Field, j.Title, j.Content) {
		return
	}
	if in.IsLua {
		lua := ju.LuaScript{Code: in.Code, Name: in.Name, Doc: doc, Script: in.RuleText}
		lua.KvMap = getKvByLuaFields(extfrom, j, in, et.Tag)
		lua.Block = j.Block
		extinfo := lua.RunScript("core")
		if v, ok := extinfo[in.Field]; ok {
			if j.Result[in.Field] == nil {
				j.Result[in.Field] = [](*ju.ExtField){}
			}
			if tmps, ok := v.([]map[string]interface{}); ok {
				for _, tmp := range tmps {
					field := &ju.ExtField{Field: in.Field, Code: qu.ObjToString(tmp["code"]), RuleText: qu.ObjToString(tmp["ruletext"]), SourceValue: tmp["sourcevalue"], Value: tmp["value"]}
					if bt, ok := tmp["blocktag"].(map[string]bool); ok {
						field.BlockTag = blockTagDescriptions(bt)
					}
					j.Result[in.Field] = append(j.Result[in.Field], field)
				}
			}
		}
		if len(extinfo) > 0 {
			AddExtLog("extract", j.SourceMid, nil, extinfo, in, et.TaskInfo) // extraction log
		}
		return
	}
	// whole-text regexp extraction is disabled; regexps run per block (or on the title)
	if in.Field == "" {
		return
	}
	if extfrom == "title" {
		extinfo := extRegCoreToResult(extfrom, qu.ObjToString(doc[extfrom]), &map[string]string{}, j, in)
		if len(extinfo) > 0 {
			AddExtLog("extract", j.SourceMid, nil, extinfo, in, et.TaskInfo) // extraction log
		}
		return
	}
	for _, v := range j.Block {
		btag := make(map[string]string)
		for k := range v.Classify {
			btag[k] = TagConfigDesc[k]
		}
		extinfo := extRegCoreToResult(extfrom, v.Text, &btag, j, in)
		if len(extinfo) > 0 {
			AddExtLog("extract", j.SourceMid, nil, extinfo, in, et.TaskInfo) // extraction log
		}
	}
}

// getKvByLuaFields collects kv candidates from j's blocks for every field in in.LFields,
// for use by a Lua core rule. in.LFields maps a display name (used as the table key) to
// the field name, which also selects the tag library in t. For every block it checks
// colon kvs ("colon1" from Kvs, then "colon2" from Kvs_2), space kvs ("space") and table
// kvs ("table"). For colon/space kvs each tag takes the first kv whose key matches it
// (exact for "string" tags, regexp for "regexp" tags); a match with an empty value is
// consumed but not recorded.
func getKvByLuaFields(extfrom string, j *ju.Job, in *RegLuaInfo, t map[string][]*Tag) map[string][]map[string]interface{} {
	defer qu.Catch()
	kvmap := map[string][]map[string]interface{}{}
	// tagMatch reports whether key satisfies tag and, if so, the matchtype label to record.
	tagMatch := func(tag *Tag, key string) (bool, string) {
		switch tag.Type {
		case "string":
			return key == tag.Key, "tag_string"
		case "regexp":
			return tag.Reg.MatchString(key), "tag_regexp"
		}
		return false, ""
	}
	for fieldname, field := range in.LFields {
		lock.Lock()
		tags := t[field] // tag library for this field
		lock.Unlock()
		if tags == nil {
			continue
		}
		// record appends one candidate for field.
		record := func(ruletext, from, value, kvType, matchType string, blocktag interface{}) {
			kvmap[field] = append(kvmap[field], map[string]interface{}{
				"field":       field,
				"code":        in.Code,
				"ruletext":    ruletext,
				"extfrom":     from,
				"sourcevalue": value,
				"value":       value,
				"type":        kvType,
				"matchtype":   matchType,
				"blocktag":    blocktag,
			})
		}
		for _, bl := range j.Block {
			// colon kvs
			if bl.ColonKV != nil {
				kvs := bl.ColonKV.Kvs
				kvs2 := bl.ColonKV.Kvs_2
				for _, tag := range tags {
					for _, kv := range kvs {
						if ok, mt := tagMatch(tag, kv.Key); ok {
							if text := ju.TrimLRSpace(kv.Value, ""); text != "" {
								record(tag.Key, extfrom, text, "colon1", mt, bl.Classify)
							}
							break
						}
					}
					for _, kv := range kvs2 {
						if ok, mt := tagMatch(tag, kv.Key); ok {
							if text := ju.TrimLRSpace(kv.Value, ""); text != "" {
								record(tag.Key, extfrom, text, "colon2", mt, bl.Classify)
							}
							break
						}
					}
				}
			}
			// space kvs
			if bl.SpaceKV != nil {
				kvs := bl.SpaceKV.Kvs
				for _, tag := range tags {
					for _, kv := range kvs {
						if ok, mt := tagMatch(tag, kv.Key); ok {
							if text := ju.TrimLRSpace(kv.Value, ""); text != "" {
								record(tag.Key, extfrom, text, "space", mt, bl.Classify)
							}
							break
						}
					}
				}
			}
			// table kvs, looked up by display name
			if bl.TableKV != nil {
				tkv := bl.TableKV
				if v, ok := tkv.Kv[fieldname]; ok {
					idx := -tkv.KvIndex[fieldname]
					special := fieldname == "项目名称" && idx == -100
					// other negative indexes have no tag to name; they are skipped
					if len(tags) > idx && (special || idx >= 0) {
						ruletext := "项目名称"
						if !special {
							ruletext = tags[idx].Key
						}
						record(ruletext, "table", v, "table", "tag_string", bl.Classify)
					}
				}
			}
		}
	}
	return kvmap
}

// extRegCoreToResult runs regexp rule v on text and appends every extracted value to
// j.Result; tag is the block classification attached to each candidate. It returns the
// log records keyed by result field.
//
// Two-part rules (v.RegCore.Bextract) read the capture groups listed in
// v.RegCore.ExtractPos from the first match. If that yields nothing, the part of
// v.RuleText before the first "__" is compiled and all its non-empty submatches are used.
// One-part rules take the first run of non-\r\n\t text after the match.
func extRegCoreToResult(extfrom, text string, tag *map[string]string, j *ju.Job, v *RegLuaInfo) map[string][]map[string]interface{} {
	defer qu.Catch()
	extinfo := map[string][]map[string]interface{}{}
	// logEntry builds the extraction-log record for one value.
	logEntry := func(ruletext, value string) map[string]interface{} {
		return map[string]interface{}{
			"field":     v.Field,
			"code":      v.Code,
			"ruletext":  ruletext,
			"extfrom":   extfrom,
			"value":     value,
			"type":      "regexp",
			"matchtype": "regcontent",
			"blocktag":  *tag,
		}
	}

	if !v.RegCore.Bextract {
		pos := v.RegCore.Reg.FindStringIndex(text)
		val := ""
		if len(pos) == 2 {
			text = text[pos[1]:]
			val = reLineRun.FindString(text)
		}
		if val != "" {
			extinfo[v.Field] = []map[string]interface{}{logEntry(v.RuleText, val)}
			j.Result[v.Field] = append(j.Result[v.Field], &ju.ExtField{BlockTag: *tag, Field: v.Field, Code: v.Code, RuleText: v.RuleText, Type: "regexp", MatchType: "regcontent", ExtFrom: extfrom, SourceValue: text, Value: val})
		}
		return extinfo
	}

	pos := v.RegCore.Reg.FindStringSubmatchIndex(text)
	if pos == nil {
		return extinfo
	}
	for k, p := range v.RegCore.ExtractPos {
		if p < 0 || p+1 >= len(pos) || pos[p] == -1 || pos[p+1] == -1 {
			continue
		}
		val := text[pos[p]:pos[p+1]]
		sourcevalue := val
		if val == "招标公告" { // a notice type, not a value: abandon this rule
			return extinfo
		}
		if extfrom == "title" && utf8.RuneCountInString(val) < 2 {
			val = text
		}
		extinfo[k] = []map[string]interface{}{logEntry(v.RuleText, val)}
		if strings.TrimSpace(val) != "" {
			if v.RegCore.NumSign == -1 { // sign correction
				val = "-" + val
			}
			j.Result[k] = append(j.Result[k], &ju.ExtField{BlockTag: *tag, Field: k, Code: v.Code, RuleText: v.RuleText, Type: "regexp", MatchType: "regcontent", ExtFrom: extfrom, SourceValue: sourcevalue, Value: val})
		}
	}
	if len(extinfo) > 0 {
		return extinfo
	}

	// Fallback: retry with the part of the rule text before the first "__".
	prefix := strings.Split(v.RuleText, "__")[0]
	reg, err := regexp.Compile(prefix)
	if err != nil {
		return extinfo
	}
	tmps := []map[string]interface{}{}
	for _, value := range reg.FindStringSubmatch(text) {
		if value == "" {
			continue
		}
		tmps = append(tmps, logEntry(prefix, value))
		j.Result[v.Field] = append(j.Result[v.Field], &ju.ExtField{BlockTag: *tag, Field: v.Field, Code: v.Code + "去除__*后", RuleText: v.RuleText, Type: "regexp", MatchType: "regcontent", ExtFrom: extfrom, SourceValue: text, Value: value})
	}
	if len(tmps) > 0 {
		extinfo[v.Field] = tmps
	}
	return extinfo
}

// ExtRegBack applies a back (post-extraction) rule to j.Result. A Lua rule receives the
// current results and every list it returns replaces j.Result for that key. A regexp rule
// replaces matches in the values of in.Field, or, when in.Field is empty, in every
// field's values except those extracted from tables. Changes are recorded with AddExtLog.
func ExtRegBack(j *ju.Job, in *RegLuaInfo, t *TaskInfo) {
	defer qu.Catch()
	// replaced applies the rule's regexp replacement to a non-empty value.
	replaced := func(v *ju.ExtField) string {
		text := qu.ObjToString(v.Value)
		if text != "" {
			text = in.RegPreBac.Reg.ReplaceAllString(text, in.RegPreBac.Replace)
		}
		return text
	}
	// logEntry renders a field with its new value for the extraction log.
	logEntry := func(v *ju.ExtField, text string) map[string]interface{} {
		return map[string]interface{}{
			"field":     v.Field,
			"code":      v.Code,
			"ruletext":  v.RuleText,
			"type":      v.Type,
			"matchtype": v.MatchType,
			"extfrom":   v.ExtFrom,
			"value":     text,
		}
	}

	if in.IsLua {
		result := GetResultMapForLua(j)
		lua := ju.LuaScript{Code: in.Code, Name: in.Name, Result: result, Script: in.RuleText}
		if j != nil {
			lua.Block = j.Block
		}
		extinfo := lua.RunScript("back")
		for k, v := range extinfo {
			tmps, ok := v.([]map[string]interface{})
			if !ok {
				continue
			}
			fields := make([]*ju.ExtField, 0, len(tmps))
			for _, tmp := range tmps {
				field := &ju.ExtField{Field: k, Code: qu.ObjToString(tmp["code"]), RuleText: qu.ObjToString(tmp["ruletext"]), Type: qu.ObjToString(tmp["type"]), MatchType: qu.ObjToString(tmp["matchtype"]), ExtFrom: qu.ObjToString(tmp["extfrom"]), Value: tmp["value"], Score: 0}
				if bt, ok := tmp["blocktag"].(map[string]string); ok {
					field.BlockTag = bt
				}
				fields = append(fields, field)
			}
			j.Result[k] = fields
		}
		if len(extinfo) > 0 {
			AddExtLog("clear", j.SourceMid, result, extinfo, in, t) // extraction log
		}
		return
	}

	if in.Field != "" {
		tmp := j.Result[in.Field]
		if tmp == nil {
			return
		}
		exts := []interface{}{}
		for _, v := range tmp {
			text := replaced(v)
			v.Value = text
			exts = append(exts, logEntry(v, text))
		}
		// NOTE(review): tmp shares the *ExtField pointers updated above, so the logged
		// "before" already holds the replaced values.
		AddExtLog("clear", j.SourceMid, tmp, map[string]interface{}{in.Field: exts}, in, t) // extraction log
		return
	}

	extinfo := map[string]interface{}{}
	for key, tmp := range j.Result {
		exts := []interface{}{}
		for _, v := range tmp {
			if v.Type == "table" { // values extracted from tables are not cleaned
				continue
			}
			text := replaced(v)
			v.Value = text
			exts = append(exts, logEntry(v, text))
		}
		extinfo[key] = exts
	}
	if len(extinfo) > 0 {
		AddExtLog("clear", j.SourceMid, j.Result, extinfo, in, t) // extraction log
	}
}

// GetResultMapForLua converts j.Result into plain maps (field, code, ruletext, value, type,
// matchtype, extfrom) so it can be handed to a Lua script.
func GetResultMapForLua(j *ju.Job) map[string][]map[string]interface{} {
	defer qu.Catch()
	result := make(map[string][]map[string]interface{}, len(j.Result))
	for key, val := range j.Result {
		entries := make([]map[string]interface{}, 0, len(val))
		for _, v := range val {
			entries = append(entries, map[string]interface{}{
				"field":     v.Field,
				"code":      v.Code,
				"ruletext":  v.RuleText,
				"value":     v.Value,
				"type":      v.Type,
				"matchtype": v.MatchType,
				"extfrom":   v.ExtFrom,
			})
		}
		result[key] = entries
	}
	return result
}

// AddExtLog queues one extraction-log record for task t (under lock) when t.IsEtxLog is
// set; SaveExtLog writes the queue out.
func AddExtLog(ftype, sid string, before interface{}, extinfo interface{}, v *RegLuaInfo, t *TaskInfo) {
	defer qu.Catch()
	if !t.IsEtxLog {
		return
	}
	logdata := map[string]interface{}{
		"code":       v.Code,
		"name":       v.Name,
		"type":       ftype,
		"ruletext":   v.RuleText,
		"islua":      v.IsLua,
		"field":      v.Field,
		"version":    t.Version,
		"taskname":   t.Name,
		"before":     before,
		"extinfo":    extinfo,
		"sid":        sid,
		"comeintime": time.Now().Unix(),
	}
	lock.Lock()
	ExtLogs[t] = append(ExtLogs[t], logdata)
	lock.Unlock()
}

// SaveExtLog swaps out the queued extraction logs and bulk-saves each task's logs to its
// TrackColl in batches of saveLimit. It reschedules itself every 10 seconds, even after a
// failed save.
func SaveExtLog() {
	defer qu.Catch()
	defer time.AfterFunc(10*time.Second, SaveExtLog)
	lock.Lock()
	tmpLogs := ExtLogs
	ExtLogs = map[*TaskInfo][]map[string]interface{}{}
	lock.Unlock()
	for k, v := range tmpLogs {
		for len(v) > saveLimit {
			db.Mgo.SaveBulk(k.TrackColl, v[:saveLimit]...)
			v = v[saveLimit:]
		}
		db.Mgo.SaveBulk(k.TrackColl, v...)
	}
}

type FieldValue struct {
	Value interface{}
	Count int
}

// fillBestValues stores in dst, for each field of result, the value of the first candidate
// with Score > -1, keyed by the candidate's Field.
func fillBestValues(dst map[string]interface{}, result map[string][]*ju.ExtField) {
	for _, val := range result {
		for _, v := range val {
			if v.Score > -1 {
				dst[v.Field] = v.Value
				break
			}
		}
	}
}

// appendKvText writes every "key:value " pair of kv to buf; a nil kv writes nothing.
func appendKvText(buf *bytes.Buffer, kv *ju.JobKv) {
	if kv == nil {
		return
	}
	for k, v := range kv.Kv {
		buf.WriteString(k)
		buf.WriteString(":")
		buf.WriteString(v)
		buf.WriteString(" ")
	}
}

// AnalysisSaveResult scores and sorts the candidates of j (and of the attachment job jf),
// builds the output document, and either queues it in e.BidArr (and optionally
// e.ResultArr) or, for test tasks, upserts it into e.TaskInfo.TestColl.
//
// The document contains:
//   - the best value per field;
//   - "fieldall" / "fieldallf" with every candidate;
//   - package and winnerorder information, plus "ffield" for attachments;
//   - the remaining source fields (except those dropped by delFiled);
//   - quality-audit, city and brand data when enabled;
//   - "kvtext" and, when configured, "blocks".
//
// Panics are logged and swallowed.
func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
	qu.Try(func() {
		doc, result, _id := funcAnalysis(j)
		if isSaveTag, _ := ju.Config["isSaveTag"].(bool); isSaveTag {
			go otherNeedSave(j, result, e)
		}
		tmp := map[string]interface{}{}
		tmp["fieldall"] = auxInfo(j)
		fillBestValues(tmp, result)
		if len(j.PackageInfo) > 0 { // package information
			tmp["package"] = j.PackageInfo
		}
		if len(j.Winnerorder) > 0 { // ranked candidates
			tmp["winnerorder"] = j.Winnerorder
		}

		// attachment results
		var resultf map[string][]*ju.ExtField
		if jf != nil {
			_, resultf, _ = funcAnalysis(jf)
			tmp["fieldallf"] = auxInfo(jf)
			ffield := map[string]interface{}{}
			fillBestValues(ffield, resultf)
			if len(jf.PackageInfo) > 0 {
				ffield["package"] = jf.PackageInfo
			}
			if len(jf.Winnerorder) > 0 {
				ffield["winnerorder"] = jf.Winnerorder
			}
			tmp["ffield"] = ffield
		}

		// copy the source fields that were not extracted, dropping redundant ones
		for k, v := range *doc {
			if delFiled(k) {
				continue
			}
			if tmp[k] == nil {
				tmp[k] = v
			}
		}
		if ok, _ := ju.Config["qualityaudit"].(bool); ok {
			e.QualityAudit(tmp)
		}
		if e.IsExtractCity {
			e.ExtractCity(j, tmp, _id)
		}
		if ju.IsBrandGoods {
			tmp["checkhas"] = map[string]int{
				"hastable": j.HasTable,
				"hasgoods": j.HasGoods,
				"hasbrand": j.HasBrand,
				"haskey":   j.HasKey,
			}
			if len(j.BrandData) > 0 {
				tmp["tablebrand"] = j.BrandData
			}
		}

		// A missing or non-bool "saveblock" means "do not save blocks"; it used to panic
		// here and drop the whole document.
		saveBlock, _ := ju.Config["saveblock"].(bool)
		var kvtext bytes.Buffer // every kv of every block, concatenated
		blocks := make([]ju.BlockAndTag, 0)
		for _, v := range j.Block {
			if saveBlock {
				xx, _ := json.Marshal(v)
				tmpblock := new(ju.TmpBlock)
				if err := json.Unmarshal(xx, &tmpblock); err != nil {
					if v.BPackage != nil {
						bpb, _ := json.Marshal(v.BPackage)
						tmpblock.BPackage = string(bpb)
					}
					tmpblock = rangeBlockToJson(v, *tmpblock)
				}
				blocks = append(blocks, ju.BlockAndTag{v.Tag, tmpblock})
			}
			appendKvText(&kvtext, v.ColonKV)
			appendKvText(&kvtext, v.SpaceKV)
			appendKvText(&kvtext, v.TableKV)
		}
		if kvtext.Len() > 0 {
			tmp["kvtext"] = kvtext.String()
		}
		if len(blocks) > 0 {
			tmp["blocks"] = blocks
		}

		if e.TaskInfo.TestColl == "" {
			for field := range e.Fields {
				if tmp[field] == nil {
					tmp[field] = "" // overwrite values left by an earlier version
				}
			}
			tmp["repeat"] = 0
			// NOTE(review): this runs on many goroutines at once (ExtractProcess is started
			// with `go`), so these appends race unless BidArr/ResultArr are guarded elsewhere.
			e.BidArr = append(e.BidArr, []map[string]interface{}{
				{"_id": qu.StringTOBsonId(_id)},
				{"$set": tmp},
			})
			e.BidTotal++
			if b, ok := ju.Config["saveresult"].(bool); ok && b {
				// tmp is already queued in BidArr; build the result document separately so
				// it neither gains result/resultf nor loses _id.
				resDoc := make(map[string]interface{}, len(tmp)+2)
				for k, v := range tmp {
					resDoc[k] = v
				}
				id := resDoc["_id"]
				resDoc["result"] = result
				resDoc["resultf"] = resultf
				delete(resDoc, "_id")
				e.ResultArr = append(e.ResultArr, []map[string]interface{}{
					{"_id": id},
					{"$set": resDoc},
				})
			}
			return
		}

		// test task: upsert straight into the test collection
		delete(tmp, "_id")
		if len(j.BlockPackage) > 0 { // package details
			bs, _ := json.Marshal(j.BlockPackage)
			tmp["epackage"] = string(bs)
		}
		tmp["result"] = result
		tmp["resultf"] = resultf
		if b := db.Mgo.Update(e.TaskInfo.TestColl, `{"_id":"`+_id+`"}`, map[string]interface{}{"$set": tmp}, true, false); !b {
			log.Debug(e.TaskInfo.TestColl, _id)
		}
	}, func(err interface{}) {
		log.Debug("AnalysisSaveResult err", err)
	})
}

// otherNeedSave stores tag material found in j for later review:
//   - every unclassified block title, as type "block";
//   - every kv key whose tag weight is ju.RetainKvWeight, with its occurrence count, as type "kv".
//
// Records go to "extract_tag_result" (or "<TestColl>_tag" for test tasks), bulk-saved in
// batches of 200. result is unused.
func otherNeedSave(j *ju.Job, result map[string][]*ju.ExtField, e *ExtractTask) {
	now := time.Now().Unix()
	coll := e.TaskInfo.TestColl
	if coll == "" {
		coll = "extract_tag_result"
	} else {
		coll += "_tag"
	}
	datas := []map[string]interface{}{}
	// add appends a record and flushes once 200 are pending.
	add := func(val interface{}, times int, typ string) {
		datas = append(datas, map[string]interface{}{
			"val":        val,
			"times":      times,
			"type":       typ,
			"firstid":    j.SourceMid,
			"createtime": now,
		})
		if len(datas) == 200 {
			db.Mgo.SaveBulk(coll, datas...)
			datas = []map[string]interface{}{}
		}
	}
	kv := map[string]int{}
	for _, v := range j.Block {
		for _, vv := range []*ju.JobKv{v.ColonKV, v.TableKV, v.SpaceKV} {
			if vv == nil || vv.KvTag == nil {
				continue
			}
			for key, kt := range vv.KvTag {
				if kt.Weight == ju.RetainKvWeight {
					kv[key]++
				}
			}
		}
		for _, title := range v.NotClassifyTitles {
			add(title, 0, "block")
		}
	}
	for k, n := range kv {
		add(k, n, "kv")
	}
	if len(datas) > 0 {
		db.Mgo.SaveBulk(coll, datas...)
	}
}

// rangeBlockToJson fills tmpblock from block j: nested blocks are converted recursively
// into tmpblock.Block[i], and the ColonKV/SpaceKV/TableKV structures are stored as JSON
// strings. It returns nil when j is nil.
// NOTE(review): tmpblock.Block is assumed to already have an entry per nested block.
func rangeBlockToJson(j *ju.Block, tmpblock ju.TmpBlock) (b *ju.TmpBlock) {
	if j == nil {
		return nil
	}
	for i, v := range j.Block {
		rangetmp := new(ju.TmpBlock)
		vb, _ := json.Marshal(v)
		_ = json.Unmarshal(vb, &rangetmp) // best effort: fields that fail to decode are rebuilt below
		tmpblock.Block[i] = rangeBlockToJson(v, *rangetmp)
	}
	if j.ColonKV != nil {
		cb, _ := json.Marshal(j.ColonKV)
		tmpblock.ColonKV = string(cb)
	}
	if j.SpaceKV != nil {
		sb, _ := json.Marshal(j.SpaceKV)
		tmpblock.SpaceKV = string(sb)
	}
	if j.TableKV != nil {
		tb, _ := json.Marshal(j.TableKV)
		tmpblock.TableKV = string(tb)
	}
	return &tmpblock
}

// delFiled reports whether source field k is redundant and must not be copied into the
// extraction result.
func delFiled(k string) bool {
	switch k {
	case "detail", "contenthtml", "site", "spidercode", "projectinfo", "jsondata":
		return true
	}
	return false
}

// funcAnalysis scores j's candidates with ScoreFields, orders each field's list with
// ju.Sort, and returns the source document, the scored result and the document id.
func funcAnalysis(j *ju.Job) (*map[string]interface{}, map[string][]*ju.ExtField, string) {
	defer qu.Catch()
	doc := j.Data
	_id := qu.BsonIdToSId((*doc)["_id"])
	result := ScoreFields(j)
	for _, val := range result {
		ju.Sort(val)
	}
	return doc, result, _id
}

// auxInfo lists every candidate of j (value, type, score, blocktag, source value). For
// buyer/winner/agency, "standardized" is true when Redis holds a positive count for the value.
func auxInfo(j *ju.Job) map[string][]map[string]interface{} {
	fieldalls := map[string][]map[string]interface{}{}
	for field, val := range j.Result {
		sfields := []map[string]interface{}{}
		for _, v := range val {
			standardized := false
			if field == "buyer" || field == "winner" || field == "agency" {
				if redis.GetInt(field, field+"_"+qu.ObjToString(v.Value)) > 0 {
					standardized = true
				}
			}
			sfields = append(sfields, map[string]interface{}{
				"val":          v.Value,
				"type":         v.Type,
				"score":        v.Score,
				"blocktag":     v.BlockTag,
				"sourceval":    v.SourceValue,
				"standardized": standardized,
			})
		}
		fieldalls[field] = sfields
	}
	return fieldalls
}

// QualityAudit audits every field in e.AuditFields, first inside each package of
// resulttmp["package"], then at the top level of resulttmp, writing the audit markers back
// into the map that held the value.
func (e *ExtractTask) QualityAudit(resulttmp map[string]interface{}) {
	defer qu.Catch()
	for _, field := range e.AuditFields {
		// 1. per-package values
		if packagedata, ok := resulttmp["package"].(map[string]map[string]interface{}); ok {
			for _, val := range packagedata {
				e.auditFieldValue(field, val)
			}
		}
		// 2. top-level value
		e.auditFieldValue(field, resulttmp)
	}
}

// auditFieldValue audits m[field] when it is a non-empty string: buyer and winner are
// first looked up in Redis (RedisMatch), every other field goes straight to RuleMatch.
func (e *ExtractTask) auditFieldValue(field string, m map[string]interface{}) {
	if m[field] == nil {
		return
	}
	fv := qu.ObjToString(m[field])
	if fv == "" {
		return
	}
	if field == "buyer" || field == "winner" {
		e.RedisMatch(field, fv, m)
	} else {
		e.RuleMatch(field, fv, m)
	}
}

// RedisMatch sets val[field+"_isredis"] to whether Redis holds a non-zero count for
// field_fv; when it does not, the value goes through RuleMatch.
func (e *ExtractTask) RedisMatch(field, fv string, val map[string]interface{}) {
	defer qu.Catch()
	if redis.GetInt(field, field+"_"+fv) == 0 {
		val[field+"_isredis"] = false
		e.RuleMatch(field, fv, val)
		return
	}
	val[field+"_isredis"] = true
}

// RuleMatch copies the keys produced by StartMatch for fieldval into tmpMap and marks the
// field as not yet audited (field+"_isaudit" = false). Empty values are ignored.
func (e *ExtractTask) RuleMatch(field, fieldval string, tmpMap map[string]interface{}) {
	defer qu.Catch()
	if fieldval == "" {
		return
	}
	SMap := e.StartMatch(field, fieldval)
	if SMap == nil {
		return
	}
	for _, k := range SMap.Keys {
		tmpMap[k] = SMap.Map[k]
	}
	tmpMap[field+"_isaudit"] = false
}

// StartMatch applies the recognition configuration for field (e.RecogFieldMap) to text,
// collecting results in a pretreated.SortMap. Only the opening of this function is part of
// this section; the code that follows continues its body.
func (e *ExtractTask) StartMatch(field, text string) *pretreated.SortMap {
	defer qu.Catch()
	SMap := pretreated.NewSortMap()
	lock.Lock()
	f := e.RecogFieldMap[field]
	lock.Unlock()
	if len(f) > 0 {
		fid := qu.BsonIdToSId(f["_id"])
		recogFieldPreRule := qu.ObjToString(f["s_recogfield_prerule"])
		textAfterRecogFieldPrerule := ju.PreFilter(text, recogFieldPreRule) // pre-filter for the recognized field
		if textAfterRecogFieldPrerule != "" {
			lock.Lock()
			classMap := e.FidClassMap[fid]
			lock.Unlock()
		L:
			for
_, c := range classMap { //class classid := qu.BsonIdToSId(c["_id"]) classPrerule := qu.ObjToString(c["s_class_prerule"]) savefield := qu.ObjToString(c["s_savefield"]) //保存字段 textAfterClassPrerule := ju.PreFilter(textAfterRecogFieldPrerule, classPrerule) //class的前置过滤 if textAfterClassPrerule != "" { lock.Lock() ruleMap := e.CidRuleMap[classid] lock.Unlock() for _, r := range ruleMap { //rule rulePrerule := qu.ObjToString(r["s_rule_prerule"]) s_name := qu.ObjToString(r["s_name"]) rule := r["rule"].([]interface{}) textAfterRulePrerule := ju.PreFilter(textAfterClassPrerule, rulePrerule) //class的前置过滤 if textAfterRulePrerule != "" { b, _ := ju.RecogAnalyRules(textAfterRulePrerule, rule) if b { //匹配到一个分类下某个规则时,不再继续匹配 if savefield != "" { //保存字段不为空,存储代码信息 SMap.AddKey(field+"_"+savefield, s_name) } break L } } } } } } } return SMap }