|
@@ -0,0 +1,446 @@
|
|
|
+package extract
|
|
|
+
|
|
|
+import (
|
|
|
+ db "jy/mongodbutil"
|
|
|
+ ju "jy/util"
|
|
|
+ "log"
|
|
|
+ qu "qfw/util"
|
|
|
+ "regexp"
|
|
|
+ "strings"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
+
|
|
|
+ "gopkg.in/mgo.v2/bson"
|
|
|
+)
|
|
|
+
|
|
|
+type RegLuaInfo struct { //正则或脚本信息
|
|
|
+ Code, Name, Field string
|
|
|
+ RuleText string
|
|
|
+ IsLua bool
|
|
|
+ RegPreBac *ExtReg
|
|
|
+ RegCore *ExtReg
|
|
|
+}
|
|
|
+type ExtReg struct {
|
|
|
+ Reg *regexp.Regexp
|
|
|
+ Replace string
|
|
|
+ Bextract bool
|
|
|
+ ExtractPos int
|
|
|
+}
|
|
|
+type RuleCore struct {
|
|
|
+ RulePres []*RegLuaInfo //前置规则
|
|
|
+ RuleBacks []*RegLuaInfo //后置规则
|
|
|
+ RuleCores []*RegLuaInfo //抽取规则
|
|
|
+}
|
|
|
+type TaskInfo struct {
|
|
|
+ Name, Version, TrackColl string //名称、版本、追踪记录表
|
|
|
+ FromDbAddr, FromDB, FromColl string //抽取数据库地址、库名、表名
|
|
|
+ SaveColl, LastExtId string //抽取结果表、上次抽取信息id
|
|
|
+ DB *db.Pool //数据库连接池
|
|
|
+ IsEtxLog bool //是否开启抽取日志
|
|
|
+ ProcessPool chan bool //任务进程池
|
|
|
+}
|
|
|
+type ExtField struct {
|
|
|
+ Field string //属性
|
|
|
+ Value map[string]int //属性值:出现次数
|
|
|
+ ExtNum int //抽取次数
|
|
|
+}
|
|
|
+type ExtractTask struct {
|
|
|
+ Id string //任务id
|
|
|
+ IsRun bool //是否启动
|
|
|
+ TaskInfo *TaskInfo //任务信息
|
|
|
+ RulePres []*RegLuaInfo //前置规则
|
|
|
+ RuleBacks []*RegLuaInfo //后置规则
|
|
|
+ RuleCores []*RuleCore //抽取规则
|
|
|
+}
|
|
|
+
|
|
|
+var lock sync.RWMutex
|
|
|
+var ExtLogs map[*TaskInfo][]map[string]interface{} //抽取日志
|
|
|
+var saveLimit = 200 //抽取日志批量保存
|
|
|
+var TaskList map[string]*ExtractTask //任务列表
|
|
|
+
|
|
|
+func init() {
|
|
|
+ TaskList = make(map[string]*ExtractTask)
|
|
|
+ go SaveExtLog()
|
|
|
+}
|
|
|
+
|
|
|
+//启动抽取
|
|
|
+func StartExtractTaskId(taskId string) bool {
|
|
|
+ ext := TaskList[taskId]
|
|
|
+ if ext == nil {
|
|
|
+ ext = &ExtractTask{}
|
|
|
+ ext.Id = taskId
|
|
|
+ ext.IsRun = true
|
|
|
+ ext.InitTaskInfo()
|
|
|
+ ext.TaskInfo.DB = db.MgoFactory(1, 3, 120, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
|
|
|
+ ext.InitRulePres()
|
|
|
+ ext.InitRuleBacks()
|
|
|
+ ext.InitRuleCore()
|
|
|
+ //只启动一次taskId
|
|
|
+ go RunExtractTask(ext)
|
|
|
+ }
|
|
|
+ ext.IsRun = true
|
|
|
+ TaskList[taskId] = ext
|
|
|
+ return true
|
|
|
+}
|
|
|
+
|
|
|
+//停止抽取
|
|
|
+func StopExtractTaskId(taskId string) bool {
|
|
|
+ ext := TaskList[taskId]
|
|
|
+ if ext != nil {
|
|
|
+ ext.IsRun = false
|
|
|
+ TaskList[taskId] = ext
|
|
|
+ }
|
|
|
+ return true
|
|
|
+}
|
|
|
+
|
|
|
+//开始抽取
|
|
|
+func RunExtractTask(ext *ExtractTask) {
|
|
|
+ if !ext.IsRun {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ query := bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(ext.TaskInfo.LastExtId)}}
|
|
|
+ list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, `{"title":1,"detail":1,"contenthtml":1}`, false, -1, -1)
|
|
|
+ for _, v := range *list {
|
|
|
+ if !ext.IsRun {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ ext.TaskInfo.ProcessPool <- true
|
|
|
+ go ext.ExtractProcess(v)
|
|
|
+ }
|
|
|
+ time.AfterFunc(30*time.Minute, func() { RunExtractTask(ext) })
|
|
|
+}
|
|
|
+
|
|
|
+//加载任务信息
|
|
|
+func (e *ExtractTask) InitTaskInfo() {
|
|
|
+ task, _ := db.Mgo.FindById("task", e.Id, nil)
|
|
|
+ if len(*task) > 1 {
|
|
|
+ e.TaskInfo = &TaskInfo{
|
|
|
+ Name: (*task)["s_taskname"].(string),
|
|
|
+ Version: (*task)["s_version"].(string),
|
|
|
+ TrackColl: (*task)["s_trackcoll"].(string),
|
|
|
+ FromDbAddr: (*task)["s_mgoaddr"].(string),
|
|
|
+ FromDB: (*task)["s_mgodb"].(string),
|
|
|
+ FromColl: (*task)["s_mgocoll"].(string),
|
|
|
+ SaveColl: (*task)["s_mgosavecoll"].(string),
|
|
|
+ IsEtxLog: qu.If(qu.IntAll((*task)["i_track"]) == 1, true, false).(bool),
|
|
|
+ LastExtId: qu.ObjToString((*task)["s_extlastid"]),
|
|
|
+ ProcessPool: make(chan bool, qu.IntAllDef((*task)["i_process"], 1)),
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ return
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+//加载前置规则
|
|
|
+func (e *ExtractTask) InitRulePres() {
|
|
|
+ defer qu.Catch()
|
|
|
+ list, _ := db.Mgo.Find("rule_pre", `{"s_version":"`+e.TaskInfo.Version+`"}`, `{"_id":-1}`, nil, false, -1, -1)
|
|
|
+ for _, v := range *list {
|
|
|
+ rinfo := &RegLuaInfo{
|
|
|
+ Code: v["s_code"].(string),
|
|
|
+ Name: v["s_name"].(string),
|
|
|
+ IsLua: qu.If(v["s_type"].(string) == "1", true, false).(bool),
|
|
|
+ }
|
|
|
+ if rinfo.IsLua {
|
|
|
+ rinfo.RuleText = v["s_luascript"].(string)
|
|
|
+ } else {
|
|
|
+ rinfo.RuleText = v["s_rule"].(string)
|
|
|
+ tmp := strings.Split(rinfo.RuleText, "__")
|
|
|
+ if len(tmp) == 2 {
|
|
|
+ rinfo.RegPreBac = &ExtReg{Reg: regexp.MustCompile(tmp[0]), Replace: tmp[1]}
|
|
|
+ } else {
|
|
|
+ rinfo.RegPreBac = &ExtReg{Reg: regexp.MustCompile(tmp[0]), Replace: ""}
|
|
|
+ }
|
|
|
+ }
|
|
|
+ e.RulePres = append(e.RulePres, rinfo)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+//加载后置规则
|
|
|
+func (e *ExtractTask) InitRuleBacks() {
|
|
|
+ defer qu.Catch()
|
|
|
+ list, _ := db.Mgo.Find("rule_back", `{"s_version":"`+e.TaskInfo.Version+`"}`, `{"_id":-1}`, nil, false, -1, -1)
|
|
|
+ for _, v := range *list {
|
|
|
+ rinfo := &RegLuaInfo{
|
|
|
+ Code: v["s_code"].(string),
|
|
|
+ Name: v["s_name"].(string),
|
|
|
+ IsLua: qu.If(v["s_type"].(string) == "1", true, false).(bool),
|
|
|
+ }
|
|
|
+ if rinfo.IsLua {
|
|
|
+ rinfo.RuleText = v["s_luascript"].(string)
|
|
|
+ } else {
|
|
|
+ rinfo.RuleText = v["s_rule"].(string)
|
|
|
+ tmp := strings.Split(rinfo.RuleText, "__")
|
|
|
+ if len(tmp) == 2 {
|
|
|
+ rinfo.RegPreBac = &ExtReg{Reg: regexp.MustCompile(tmp[0]), Replace: tmp[1]}
|
|
|
+ } else {
|
|
|
+ rinfo.RegPreBac = &ExtReg{Reg: regexp.MustCompile(tmp[0]), Replace: ""}
|
|
|
+ }
|
|
|
+ }
|
|
|
+ e.RuleBacks = append(e.RuleBacks, rinfo)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+//加载抽取规则
|
|
|
+func (e *ExtractTask) InitRuleCore() {
|
|
|
+ defer qu.Catch()
|
|
|
+ list, _ := db.Mgo.Find("rule_logic", `{"s_version":"`+e.TaskInfo.Version+`"}`, `{"_id":-1}`, nil, false, -1, -1)
|
|
|
+ for _, vv := range *list {
|
|
|
+ if b, _ := vv["isuse"].(bool); !b {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ rcore := &RuleCore{}
|
|
|
+ //前置规则
|
|
|
+ rulePres := []*RegLuaInfo{}
|
|
|
+ plist, _ := db.Mgo.Find("rule_logicpre", `{"sid":"`+qu.BsonIdToSId(vv["_id"])+`","s_version":"`+e.TaskInfo.Version+`"}`, `{"_id":-1}`, nil, false, -1, -1)
|
|
|
+ for _, v := range *plist {
|
|
|
+ rinfo := &RegLuaInfo{
|
|
|
+ Code: v["s_code"].(string),
|
|
|
+ Name: v["s_name"].(string),
|
|
|
+ IsLua: qu.If(v["s_type"].(string) == "1", true, false).(bool),
|
|
|
+ }
|
|
|
+ if rinfo.IsLua {
|
|
|
+ rinfo.RuleText = v["s_luascript"].(string)
|
|
|
+ } else {
|
|
|
+ rinfo.RuleText = v["s_rule"].(string)
|
|
|
+ rinfo.Field = v["s_field"].(string)
|
|
|
+ tmp := strings.Split(rinfo.RuleText, "__")
|
|
|
+ if len(tmp) == 2 {
|
|
|
+ rinfo.RegPreBac = &ExtReg{Reg: regexp.MustCompile(tmp[0]), Replace: tmp[1]}
|
|
|
+ } else {
|
|
|
+ rinfo.RegPreBac = &ExtReg{Reg: regexp.MustCompile(tmp[0]), Replace: ""}
|
|
|
+ }
|
|
|
+ }
|
|
|
+ rulePres = append(rulePres, rinfo)
|
|
|
+ }
|
|
|
+ rcore.RulePres = rulePres
|
|
|
+
|
|
|
+ //后置规则
|
|
|
+ ruleBacks := []*RegLuaInfo{}
|
|
|
+ blist, _ := db.Mgo.Find("rule_logicback", `{"sid":"`+qu.BsonIdToSId(vv["_id"])+`","s_version":"`+e.TaskInfo.Version+`"}`, `{"_id":-1}`, nil, false, -1, -1)
|
|
|
+ for _, v := range *blist {
|
|
|
+ rinfo := &RegLuaInfo{
|
|
|
+ Code: v["s_code"].(string),
|
|
|
+ Name: v["s_name"].(string),
|
|
|
+ IsLua: qu.If(v["s_type"].(string) == "1", true, false).(bool),
|
|
|
+ }
|
|
|
+ if rinfo.IsLua {
|
|
|
+ rinfo.RuleText = v["s_luascript"].(string)
|
|
|
+ } else {
|
|
|
+ rinfo.RuleText = v["s_rule"].(string)
|
|
|
+ rinfo.Field = v["s_field"].(string)
|
|
|
+ tmp := strings.Split(rinfo.RuleText, "__")
|
|
|
+ if len(tmp) == 2 {
|
|
|
+ rinfo.RegPreBac = &ExtReg{Reg: regexp.MustCompile(tmp[0]), Replace: tmp[1]}
|
|
|
+ } else {
|
|
|
+ rinfo.RegPreBac = &ExtReg{Reg: regexp.MustCompile(tmp[0]), Replace: ""}
|
|
|
+ }
|
|
|
+ }
|
|
|
+ ruleBacks = append(ruleBacks, rinfo)
|
|
|
+ }
|
|
|
+ rcore.RuleBacks = ruleBacks
|
|
|
+
|
|
|
+ //抽取规则
|
|
|
+ ruleCores := []*RegLuaInfo{}
|
|
|
+ clist, _ := db.Mgo.Find("rule_logicore", `{"sid":"`+qu.BsonIdToSId(vv["_id"])+`","s_version":"`+e.TaskInfo.Version+`"}`, `{"_id":-1}`, nil, false, -1, -1)
|
|
|
+ for _, v := range *clist {
|
|
|
+ if b, _ := v["isuse"].(bool); !b {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ rinfo := &RegLuaInfo{
|
|
|
+ Code: v["s_code"].(string),
|
|
|
+ Name: v["s_name"].(string),
|
|
|
+ IsLua: qu.If(v["s_type"].(string) == "1", true, false).(bool),
|
|
|
+ }
|
|
|
+ if rinfo.IsLua {
|
|
|
+ rinfo.RuleText = v["s_luascript"].(string)
|
|
|
+ } else {
|
|
|
+ rinfo.RuleText = v["s_rule"].(string)
|
|
|
+ rinfo.Field = v["s_field"].(string)
|
|
|
+ tmp := strings.Split(rinfo.RuleText, "__")
|
|
|
+ if len(tmp) == 2 {
|
|
|
+ rinfo.RegCore = &ExtReg{Reg: regexp.MustCompile(tmp[0]), Bextract: true, ExtractPos: qu.IntAll(tmp[1])}
|
|
|
+ } else {
|
|
|
+ rinfo.RegCore = &ExtReg{Reg: regexp.MustCompile(tmp[0]), Bextract: false, ExtractPos: 0}
|
|
|
+ }
|
|
|
+ }
|
|
|
+ ruleCores = append(ruleCores, rinfo)
|
|
|
+ }
|
|
|
+ rcore.RuleCores = ruleCores
|
|
|
+
|
|
|
+ //
|
|
|
+ e.RuleCores = append(e.RuleCores, rcore)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+//抽取
|
|
|
+func (e *ExtractTask) ExtractProcess(doc map[string]interface{}) {
|
|
|
+ qu.Catch()
|
|
|
+ qu.Try(func() {
|
|
|
+ result := map[string]*ExtField{} //抽取结果
|
|
|
+ //前置规则,结果覆盖doc属性
|
|
|
+ for _, v := range e.RulePres {
|
|
|
+ doc = ExtRegPre(doc, v, e.TaskInfo)
|
|
|
+ }
|
|
|
+ log.Println("前置规则,detail", doc["detail"])
|
|
|
+ //抽取规则
|
|
|
+ for _, vc := range e.RuleCores {
|
|
|
+ data := map[string]interface{}{}
|
|
|
+ //抽取-前置规则
|
|
|
+ tmpdoc := map[string]interface{}{}
|
|
|
+ for _, v := range vc.RulePres {
|
|
|
+ tmpdoc = ExtRegPre(doc, v, e.TaskInfo)
|
|
|
+ }
|
|
|
+ log.Println("抽取-前置规则", tmpdoc)
|
|
|
+ //抽取-规则
|
|
|
+ for _, v := range vc.RuleCores {
|
|
|
+ data = ExtRegCore(tmpdoc, v, e.TaskInfo)
|
|
|
+ }
|
|
|
+ log.Println("抽取-规则", data)
|
|
|
+
|
|
|
+ //抽取-后置规则
|
|
|
+ for _, v := range vc.RuleBacks {
|
|
|
+ data = ExtRegBack(data, v, e.TaskInfo)
|
|
|
+ }
|
|
|
+ log.Println("抽取-后置规则", data)
|
|
|
+
|
|
|
+ //抽取结果赋值
|
|
|
+ for k, v := range data {
|
|
|
+ if k == "_id" {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if result[k] == nil {
|
|
|
+ result[k] = &ExtField{Field: k, Value: map[string]int{qu.ObjToString(v): 1}, ExtNum: 1}
|
|
|
+ } else {
|
|
|
+ ef := result[k]
|
|
|
+ ef.Value[qu.ObjToString(v)] += 1
|
|
|
+ ef.ExtNum += 1
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //抽取结果保存 todo
|
|
|
+ }
|
|
|
+ }, func(err interface{}) {
|
|
|
+ log.Println(err)
|
|
|
+ <-e.TaskInfo.ProcessPool
|
|
|
+ })
|
|
|
+ <-e.TaskInfo.ProcessPool
|
|
|
+}
|
|
|
+
|
|
|
+//前置过滤
|
|
|
+func ExtRegPre(doc map[string]interface{}, v *RegLuaInfo, t *TaskInfo) map[string]interface{} {
|
|
|
+ if v.IsLua {
|
|
|
+ lua := ju.LuaScript{Code: v.Code, Name: v.Name, Doc: doc, Script: v.RuleText}
|
|
|
+ data := lua.RunScript()
|
|
|
+ AddExtLog(doc, data, v, t) //抽取日志
|
|
|
+ for k, v := range data {
|
|
|
+ doc[k] = v
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ tmp := doc
|
|
|
+ key := qu.If(v.Field == "", "detail", v.Field).(string)
|
|
|
+ text := qu.ObjToString(doc[key])
|
|
|
+ doc[key] = v.RegPreBac.Reg.ReplaceAllString(text, "")
|
|
|
+ AddExtLog(tmp, doc, v, t) //抽取日志
|
|
|
+ }
|
|
|
+ return doc
|
|
|
+}
|
|
|
+
|
|
|
+//抽取-规则
|
|
|
+func ExtRegCore(doc map[string]interface{}, v *RegLuaInfo, t *TaskInfo) map[string]interface{} {
|
|
|
+ if v.IsLua {
|
|
|
+ lua := ju.LuaScript{Code: v.Code, Name: v.Name, Doc: doc, Script: v.RuleText}
|
|
|
+ data := lua.RunScript()
|
|
|
+ AddExtLog(doc, data, v, t) //抽取日志
|
|
|
+ for k, v := range data {
|
|
|
+ doc[k] = v
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if v.Field == "" {
|
|
|
+ return doc
|
|
|
+ }
|
|
|
+ tmp := doc
|
|
|
+ text := qu.ObjToString(doc["detail"])
|
|
|
+ if v.RegCore.Bextract { //正则是两部分的,可以直接抽取的(含下划线)
|
|
|
+ apos := v.RegCore.Reg.FindAllStringSubmatchIndex(text, -1)
|
|
|
+ if len(apos) > 0 {
|
|
|
+ pos := apos[0]
|
|
|
+ if len(pos)-1 > v.RegCore.ExtractPos {
|
|
|
+ doc[v.Field] = text[pos[v.RegCore.ExtractPos]:pos[v.RegCore.ExtractPos+1]]
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ doc[v.Field] = v.RegCore.Reg.ReplaceAllString(text, "")
|
|
|
+ }
|
|
|
+ AddExtLog(tmp, doc, v, t) //抽取日志
|
|
|
+ }
|
|
|
+ return doc
|
|
|
+}
|
|
|
+
|
|
|
+//后置过滤
|
|
|
+func ExtRegBack(doc map[string]interface{}, v *RegLuaInfo, t *TaskInfo) map[string]interface{} {
|
|
|
+ if v.IsLua {
|
|
|
+ lua := ju.LuaScript{Code: v.Code, Name: v.Name, Doc: doc, Script: v.RuleText}
|
|
|
+ data := lua.RunScript()
|
|
|
+ AddExtLog(doc, data, v, t) //抽取日志
|
|
|
+ for k, v := range data {
|
|
|
+ doc[k] = v
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ tmp := doc
|
|
|
+ if v.Field != "" && qu.ObjToString(doc[v.Field]) != "" {
|
|
|
+ doc[v.Field] = v.RegPreBac.Reg.ReplaceAllString(qu.ObjToString(doc[v.Field]), "")
|
|
|
+ } else {
|
|
|
+ for k, val := range doc {
|
|
|
+ doc[k] = v.RegPreBac.Reg.ReplaceAllString(qu.ObjToString(val), "")
|
|
|
+ }
|
|
|
+ }
|
|
|
+ AddExtLog(tmp, doc, v, t) //抽取日志
|
|
|
+ }
|
|
|
+ return doc
|
|
|
+}
|
|
|
+
|
|
|
+//抽取日志
|
|
|
+func AddExtLog(before, extifno map[string]interface{}, v *RegLuaInfo, t *TaskInfo) {
|
|
|
+ if !t.IsEtxLog {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ logdata := map[string]interface{}{
|
|
|
+ "code": v.Code,
|
|
|
+ "name": v.Name,
|
|
|
+ "ruletext": v.RuleText,
|
|
|
+ "islua": v.IsLua,
|
|
|
+ "version": t.Version,
|
|
|
+ "taskname": t.Name,
|
|
|
+ "before": before,
|
|
|
+ "extinfo": extifno,
|
|
|
+ "comeintime": time.Now().Unix(),
|
|
|
+ }
|
|
|
+ lock.Lock()
|
|
|
+ ExtLogs[t] = append(ExtLogs[t], logdata)
|
|
|
+ lock.Unlock()
|
|
|
+}
|
|
|
+
|
|
|
+//保存抽取日志
|
|
|
+func SaveExtLog() {
|
|
|
+ tmpLogs := map[*TaskInfo][]map[string]interface{}{}
|
|
|
+ lock.Lock()
|
|
|
+ tmpLogs = ExtLogs
|
|
|
+ ExtLogs = map[*TaskInfo][]map[string]interface{}{}
|
|
|
+ lock.Unlock()
|
|
|
+ for k, v := range tmpLogs {
|
|
|
+ if len(v) < saveLimit {
|
|
|
+ k.DB.SaveBulk(k.TrackColl, v...)
|
|
|
+ } else {
|
|
|
+ for {
|
|
|
+ if len(v) > saveLimit {
|
|
|
+ tmp := v[:saveLimit]
|
|
|
+ k.DB.SaveBulk(k.TrackColl, tmp...)
|
|
|
+ v = v[saveLimit:]
|
|
|
+ } else {
|
|
|
+ k.DB.SaveBulk(k.TrackColl, v...)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ time.AfterFunc(2*time.Minute, SaveExtLog)
|
|
|
+}
|