Przeglądaj źródła

Merge branch 'dev3.2' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.2

zhangjinkun 6 lat temu
rodzic
commit
55117d3de7

+ 75 - 25
src/jy/extract/extract.go

@@ -24,12 +24,12 @@ import (
 var (
 	lock, lockrule, lockclear sync.RWMutex
 
-	cut           = ju.NewCut()                          // extracts the body text and cleans it
-	ExtLogs       map[*TaskInfo][]map[string]interface{} // extraction logs
-	TaskList      map[string]*ExtractTask                // task list
-	ClearTaskList map[string]*ClearTask                  // clean-up task list
-	saveLimit     = 200                                  // batch-save threshold for extraction logs
-	PageSize      = 5000                                 // query page size
+	cut     = ju.NewCut()                          // extracts the body text and cleans it
+	ExtLogs map[*TaskInfo][]map[string]interface{} // extraction logs
+	TaskList      map[string]*ExtractTask          // task list
+	ClearTaskList map[string]*ClearTask            // clean-up task list
+	saveLimit     = 200                            // batch-save threshold for extraction logs
+	PageSize      = 5000                           // query page size
 	Fields        = `{"title":1,"detail":1,"contenthtml":1,"site":1,"spidercode":1,"toptype":1,"subtype":1,"area":1,"city":1,"comeintime":1,"publishtime":1,"sensitive":1,"projectinfo":1,"jsondata":1}`
 	Fields2       = `{"budget":1,"bidamount":1,"title":1,"projectname":1,"winner":1}`
 )
@@ -1250,23 +1250,29 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 				blocks = append(blocks, ju.BlockAndTag{v.Tag, tmpblock})
 			}
 			//把所有kv组装成一个字符串,存库
-			for ck, cv := range v.ColonKV.Kv {
-				kvtext.WriteString(ck)
-				kvtext.WriteString(":")
-				kvtext.WriteString(cv)
-				kvtext.WriteString(" ")
-			}
-			for sk, sv := range v.SpaceKV.Kv {
-				kvtext.WriteString(sk)
-				kvtext.WriteString(":")
-				kvtext.WriteString(sv)
-				kvtext.WriteString(" ")
-			}
-			for tk, tv := range v.TableKV.Kv {
-				kvtext.WriteString(tk)
-				kvtext.WriteString(":")
-				kvtext.WriteString(tv)
-				kvtext.WriteString(" ")
+			if v.ColonKV != nil {
+				for ck, cv := range v.ColonKV.Kv {
+					kvtext.WriteString(ck)
+					kvtext.WriteString(":")
+					kvtext.WriteString(cv)
+					kvtext.WriteString(" ")
+				}
+			}
+			if v.SpaceKV != nil {
+				for sk, sv := range v.SpaceKV.Kv {
+					kvtext.WriteString(sk)
+					kvtext.WriteString(":")
+					kvtext.WriteString(sv)
+					kvtext.WriteString(" ")
+				}
+			}
+			if v.TableKV != nil {
+				for tk, tv := range v.TableKV.Kv {
+					kvtext.WriteString(tk)
+					kvtext.WriteString(":")
+					kvtext.WriteString(tv)
+					kvtext.WriteString(" ")
+				}
 			}
 		}
 		if kvtext.Len() > 0 {
@@ -1328,13 +1334,57 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 // otherNeedSave saves tags found on kv pairs, tables and blocks (original note: every tag that is new gets stored). It writes to e.TaskInfo.TestColl+"_tag", or to "extract_tag_result" when TestColl is empty; the result parameter is not read.
 // Saved record fields: val, type, times, firstid, createtime (the original note also lists a "judged field", which this code does not write).
 func otherNeedSave(j *ju.Job, result map[string][]*ju.ExtField, e *ExtractTask) {
+	now := time.Now().Unix() // one timestamp shared by every record built in this call
 	coll := e.TaskInfo.TestColl
 	if coll == "" {
 		coll = "extract_tag_result"
 	} else {
 		coll += "_tag"
 	}
-	//for _,v := range j.ColonKV
+	datas := []map[string]interface{}{} // pending records, flushed in batches of 200
+	kv := map[string]int{} // key -> number of KvTag entries seen with weight ju.RetainKvWeight
+	for _, v := range j.Block {
+		// Tally, per key, the KvTag entries weighted ju.RetainKvWeight across this block's colon, table and space kv sets.
+		for _, vv := range []*ju.JobKv{v.ColonKV, v.TableKV, v.SpaceKV} {
+			if vv == nil || vv.KvTag == nil { // skip kv sets that are absent or carry no tags
+				continue
+			}
+			for kkk, vvv := range vv.KvTag {
+				if vvv.Weight == ju.RetainKvWeight {
+					kv[kkk] = kv[kkk] + 1
+				}
+			}
+		}
+		for _, vv := range v.NotClassifyTitles { // every entry in NotClassifyTitles becomes a "block" record
+			datas = append(datas, map[string]interface{}{
+				"val":        vv,
+				"times":      0,
+				"type":       "block",
+				"firstid":    j.SourceMid,
+				"createtime": now,
+			})
+			if len(datas) == 200 { // flush a full batch and start a fresh one
+				db.Mgo.SaveBulk(coll, datas...)
+				datas = []map[string]interface{}{}
+			}
+		}
+	}
+	for k, v := range kv { // one "kv" record per tallied key, with its count in "times"
+		datas = append(datas, map[string]interface{}{
+			"val":        k,
+			"times":      v,
+			"type":       "kv",
+			"firstid":    j.SourceMid,
+			"createtime": now,
+		})
+		if len(datas) == 200 { // flush a full batch and start a fresh one
+			db.Mgo.SaveBulk(coll, datas...)
+			datas = []map[string]interface{}{}
+		}
+	}
+	if len(datas) > 0 { // flush whatever is left after the last full batch
+		db.Mgo.SaveBulk(coll, datas...)
+	}
 }
 
 func rangeBlockToJson(j *ju.Block, tmpblock ju.TmpBlock) (b *ju.TmpBlock) {
@@ -1449,7 +1499,7 @@ func (e *ExtractTask) QualityAudit(resulttmp map[string]interface{}) {
 func (e *ExtractTask) RedisMatch(field, fv string, val map[string]interface{}) {
 	defer qu.Catch()
 	i := redis.GetInt(field, field+"_"+fv) //查找redis
-	if i == 0 {                            //reids未找到,执行规则匹配
+	if i == 0 { //reids未找到,执行规则匹配
 		val[field+"_isredis"] = false
 		e.RuleMatch(field, fv, val) //规则匹配
 	} else { //redis找到,打标识存库

+ 6 - 1
src/jy/extract/extractudp.go

@@ -39,8 +39,8 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			if sid == "" || eid == "" {
 				log.Debug("err", "sid=", sid, ",eid=", eid)
 			} else {
-				go Udpclient.WriteUdp([]byte("udpok"), mu.OP_NOOP, ra)
 				if stype == "distributed" { //分布式抽取分支
+					go Udpclient.WriteUdp([]byte("udpok"), mu.OP_NOOP, ra)
 					log.Debug("分布式抽取id段", sid, " ", eid)
 					InstanceId := qu.ObjToString(rep["InstanceId"])
 					db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
@@ -58,6 +58,11 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 						}, true, false)
 					log.Debug("分布式抽取完成", sid, " ", eid, "释放esc实例", qu.ObjToString(rep["ip"]))
 				} else {
+					udpinfo, _ := rep["key"].(string)
+					if udpinfo == "" {
+						udpinfo = "udpok"
+					}
+					go Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
 					log.Debug("udp通知抽取id段", sid, " ", eid)
 					ExtractByUdp(sid, eid)
 					log.Debug("udp通知抽取完成,eid=", eid)

+ 12 - 8
src/jy/pretreated/analystep.go

@@ -30,7 +30,7 @@ func AnalyStart(job *util.Job) {
 		}
 	}
 	blockArrays, _ := DivideBlock(job.CategorySecond, con, 1, job.RuleBlock) //分块
-	if len(blockArrays) > 0 { //有分块
+	if len(blockArrays) > 0 {                                                //有分块
 		//从块里面找分包
 		job.BlockPackage = FindPackageFromBlocks(&blockArrays, job.Title) //从块里面找分包
 		for _, bl := range blockArrays {
@@ -109,22 +109,22 @@ func FindProjectCode(newCon string, job *util.Job) {
 		ckv := GetKVAll(proCode, job.Title, nil, 1)
 		blCode.ColonKV = ckv
 		job.Block = append(job.Block, blCode)
-	}else if proCode = projectcodeReg2.FindString(newCon);proCode !=""{
+	} else if proCode = projectcodeReg2.FindString(newCon); proCode != "" {
 		ckv := GetKVAll(proCode, job.Title, nil, 1)
 		blCode.ColonKV = ckv
 		job.Block = append(job.Block, blCode)
-	}else if proCode = projectcodeReg3.FindString(newCon) ;proCode !=""{
+	} else if proCode = projectcodeReg3.FindString(newCon); proCode != "" {
 		ckv := GetKVAll(proCode, job.Title, nil, 1)
 		blCode.ColonKV = ckv
 		job.Block = append(job.Block, blCode)
 	}
-	if proCode = jsonReg.FindString(newCon);proCode != ""{
+	if proCode = jsonReg.FindString(newCon); proCode != "" {
 		jsonMap := make(map[string]string)
-		json.Unmarshal([]byte(proCode),&jsonMap)
+		json.Unmarshal([]byte(proCode), &jsonMap)
 		jobKv := util.NewJobKv()
-		for k,v := range jsonMap{
+		for k, v := range jsonMap {
 			tmpkv := new(util.Kv)
-			tmpkv.Line = k+v
+			tmpkv.Line = k + v
 			tmpkv.Key = k
 			tmpkv.Value = v
 			jobKv.Kvs = append(jobKv.Kvs, tmpkv)
@@ -146,7 +146,11 @@ func processTableResult(tabres *TableResult, block *util.Block, job *util.Job) {
 	for k, v := range tabres.SortKVWeight {
 		kvIndex[k] = v
 	}
-	block.TableKV = &util.JobKv{Kv: kv, KvIndex: kvIndex}
+	KvTag := map[string]*util.Tag{}
+	for k, _ := range tabres.SortKV.NotTagKey {
+		KvTag[k] = &util.Tag{Weight: util.RetainKvWeight}
+	}
+	block.TableKV = &util.JobKv{Kv: kv, KvIndex: kvIndex, KvTag: KvTag}
 
 	//分包
 	tablePackage := map[string]*util.BlockPackage{}

+ 21 - 9
src/jy/pretreated/analytable.go

@@ -125,8 +125,8 @@ func IsHide(g *goquery.Selection) (b bool) {
 
 //对表格的key进行标准化处理,多个k相同时,出现覆盖问题
 //待扩展,暂不支持正则标签库
-func CommonDataAnaly(k, tabletag, tabledesc string, v interface{}) (k1 []string, weight []int, v1, returntag string, b bool) {
-	k1 = []string{}
+func CommonDataAnaly(k, tabletag, tabledesc string, v interface{}) (k1, k2 []string, weight []int, v1, returntag string, b bool) {
+	k1, k2 = []string{}, []string{}
 	weight = []int{}
 	tk := k
 	if sv, sok := v.(string); sok { //取KV
@@ -177,6 +177,8 @@ func CommonDataAnaly(k, tabletag, tabledesc string, v interface{}) (k1 []string,
 				returntag = "中标情况"
 			}
 			b = true
+		} else {
+			k2 = append(k2, k)
 		}
 	}
 	//对上一步没有取到标准化key的进一步处理
@@ -227,7 +229,7 @@ func (table *Table) KVFilter() {
 		v := table.SortKV.Map[k]
 		if _, ok := v.(string); ok { //table.SortKV.Value为字符串,匹配抽取关键词table.SortKV.Key,匹配到添加k,v到table.StandKV,table.StandKVWeight
 			k = regSpliteSegment.ReplaceAllString(regReplAllSpace.ReplaceAllString(k, ""), "")
-			k1, w1, v1, tag, b := CommonDataAnaly(k, table.Tag, table.Desc, v) //对key标准化处理,没有找到会走中标
+			k1, n_k1, w1, v1, tag, b := CommonDataAnaly(k, table.Tag, table.Desc, v) //对key标准化处理,没有找到会走中标
 			//qutil.Debug(k, v, k1, w1, v1, tag, b)
 			if b {
 				//降低冒号值的权重
@@ -257,6 +259,9 @@ func (table *Table) KVFilter() {
 					table.StandKVWeight[k] = 0
 				}
 			}
+			for _, n_k2 := range n_k1 {
+				table.SortKV.NotTagKey[n_k2] = true
+			}
 		} else {
 			//u.Debug(k, v, "---------")
 			as.AddKey(k, v)
@@ -473,7 +478,7 @@ func (table *Table) sortKVArr(as *SortMap, winnertag bool) {
 					}
 				}
 			}
-			k1, w1, v1, tag, b := CommonDataAnaly(k, table.Tag, table.Desc, v)
+			k1, n_k1, w1, v1, tag, b := CommonDataAnaly(k, table.Tag, table.Desc, v)
 			if b {
 				if tag != "" && table.Tag == "" {
 					table.Tag = tag
@@ -489,6 +494,10 @@ func (table *Table) sortKVArr(as *SortMap, winnertag bool) {
 					//					}
 					//				}
 				}
+			} else {
+				for _, n_k2 := range n_k1 {
+					table.SortKV.NotTagKey[n_k2] = true
+				}
 			}
 		}
 	}
@@ -659,7 +668,7 @@ func (ts *TableResult) Analy() {
 		//核心模块
 		ts := tn.Analy(contactFormat)
 		for _, tab := range ts {
-			if len(tab.TRs) > 0{
+			if len(tab.TRs) > 0 {
 				tabs = append(tabs, tab)
 			}
 			//fmt.Println("tab.SortKV.Map", tab.SortKV.Keys)
@@ -840,11 +849,14 @@ func (tn *Table) AnalyTables(contactFormat *u.ContactFormat) []*Table {
 				table.KVFilter()
 			}
 			for k, v := range table.StandKV { //过滤后的标准化kv
-				if table.TableResult.SortKV.Map[k] == nil {
+				if table.TableResult.SortKV.Map[k] == nil || table.StandKVWeight[k] > table.TableResult.SortKVWeight[k] {
 					table.TableResult.SortKV.AddKey(k, v)
 					table.TableResult.SortKVWeight[k] = table.StandKVWeight[k]
 				}
 			}
+			for k, v := range table.SortKV.NotTagKey {
+				table.TableResult.SortKV.NotTagKey[k] = v
+			}
 			//u.Debug(str)
 		}
 	}
@@ -2079,7 +2091,7 @@ func (tn *Table) manyPackageProcessByIndex(index []string, standIndex_pos []int)
 			}
 		} else if val, bvs := v1.(string); bvs && len(index) == 1 {
 			//删除子包的kv
-			k1tags, _, _, _, _ := CommonDataAnaly(k1, "", "", val)
+			k1tags, _, _, _, _, _ := CommonDataAnaly(k1, "", "", val)
 			if len(k1tags) > 0 && regexp.MustCompile("^(项目|开标|采购单位|招标机构)").MatchString(k1tags[0]) { //(k1tags[0].Value == "采购单位" || k1tags[0].Value == "项目编号")) {
 				//log.Println("remove", k1, val)
 				tn.SortKV.RemoveKey(k1)
@@ -2301,7 +2313,7 @@ func (tn *Table) assemblePackage(k1, v1, key string) {
 		bp.TableKV = u.NewJobKv()
 	}
 	if v1 != "" {
-		k2, w1, v2, _, bf := CommonDataAnaly(k1, "中标情况", "", v1) //匹配抽取关键词
+		k2, _, w1, v2, _, bf := CommonDataAnaly(k1, "中标情况", "", v1) //匹配抽取关键词
 		if bf {
 			for pos, k3 := range k2 {
 				if bp.TableKV.Kv != nil && bp.TableKV.KvTag[k3] != nil && (bp.TableKV.Kv[k3] == "" || w1[pos] > bp.TableKV.KvTag[k3].Weight) {
@@ -3169,7 +3181,7 @@ func initLineMapLineMapArr(table *Table) (lineMapArr map[string]*SortMap, lineMa
 	for _, key := range table.SortKV.Keys { //遍历table.SortKV.Keys而不是直接遍历table.SortKV.Map是为了得到table头的顺序
 		val := table.SortKV.Map[key]
 		key = regReplAllSpace.ReplaceAllString(key, "")
-		key = strings.Replace(key, "", "", -1) //处理一个特殊的采购量 经上层处理空格后未处理掉
+		key = strings.Replace(key, "", "", -1)    //处理一个特殊的采购量 经上层处理空格后未处理掉
 		if realTypeVal, ok := val.([]string); ok { //val为数组 {"数量":["1","2","3"]}
 			/*
 				{

+ 32 - 27
src/jy/pretreated/tablev2.go

@@ -126,7 +126,7 @@ func NewTD(Goquery *goquery.Selection, tr *TR, table *Table) *TD {
 		td.tdHasTable(&bsontable, tr, table) //处理td中的table,块标签处理,子表解析集处理
 		//处理table外内容
 		var ub []*u.Block
-		ub, _ = DivideBlock("",txt, 2, table.TableResult.RuleBlock)
+		ub, _ = DivideBlock("", txt, 2, table.TableResult.RuleBlock)
 		//看是否划块
 		if len(ub) > 0 {
 			colonKvWeight := map[string]int{}
@@ -156,31 +156,31 @@ func NewTD(Goquery *goquery.Selection, tr *TR, table *Table) *TD {
 	td.Text = txt //原始串
 	//调用kv解析
 	cKV := GetKVAll(text, "", nil, 1)
-	for k,v :=range cKV.Kv{
-		td.SortKV.AddKey(k,v)
+	for k, v := range cKV.Kv {
+		td.SortKV.AddKey(k, v)
 	}
 	sKV := SspacekvEntity.Entrance(text, "", nil)
-	for k,v :=range sKV.Kv{
-		td.SortKV.AddKey(k,v)
+	for k, v := range sKV.Kv {
+		td.SortKV.AddKey(k, v)
 	}
 	//抽取不到走正则抽
 	proCode := projectcodeReg.FindString(text)
 	if proCode != "" {
 		ckv := GetKVAll(proCode, "", nil, 1)
-		for k,v :=range ckv.Kv{
-			td.SortKV.AddKey(k,v)
+		for k, v := range ckv.Kv {
+			td.SortKV.AddKey(k, v)
 		}
-	}else if proCode = projectcodeReg2.FindString(text);proCode !=""{
+	} else if proCode = projectcodeReg2.FindString(text); proCode != "" {
 		ckv := GetKVAll(proCode, "", nil, 1)
-		for k,v :=range ckv.Kv{
-			td.SortKV.AddKey(k,v)
+		for k, v := range ckv.Kv {
+			td.SortKV.AddKey(k, v)
 		}
 	}
-	if proCode = jsonReg.FindString(text);proCode != ""{
+	if proCode = jsonReg.FindString(text); proCode != "" {
 		jsonMap := make(map[string]string)
-		json.Unmarshal([]byte(proCode),&jsonMap)
-		for k,v := range jsonMap{
-			td.SortKV.AddKey(k,v)
+		json.Unmarshal([]byte(proCode), &jsonMap)
+		for k, v := range jsonMap {
+			td.SortKV.AddKey(k, v)
 		}
 	}
 	//对td单元格值判断是否是表头和根据td内容长度进行分块处理
@@ -229,11 +229,12 @@ func (td *TD) tdHasTable(bsontable *bool, tr *TR, table *Table) {
 			}
 			sonts := AnalyTableV2(tabs, ts.Toptype, stag, td.Html, 2, ts.Id, table.TableResult.RuleBlock) //又一次调用解析表格入口
 			td.BH = false
-			for k,v := range sonts.SortKV.Map{
-				if td.TR.Table.TableResult == nil{
-					td.TR.Table.TableResult = NewTableResult(sonts.Id,sonts.Toptype,sonts.BlockTag,sonts.Html,sonts.Itype,sonts.RuleBlock)
+			for k, v := range sonts.SortKV.Map {
+				if td.TR.Table.TableResult == nil {
+					td.TR.Table.TableResult = NewTableResult(sonts.Id, sonts.Toptype, sonts.BlockTag, sonts.Html, sonts.Itype, sonts.RuleBlock)
 				}
-				td.TR.Table.TableResult.SortKV.AddKey(k,v)
+				td.TR.Table.TableResult.SortKV.AddKey(k, v)
+				td.TR.Table.TableResult.SortKVWeight[k] = sonts.SortKVWeight[k]
 			}
 			//td.SonTableResult = sonts
 			//for _, k := range sonts.SortKV.Keys {
@@ -436,8 +437,8 @@ func (td *TD) tdIsHb(tr *TR, table *Table, bsontable bool) {
 		*/
 
 		fSortKV := FindKv(td.Val, "", 2)
-		for k,v := range fSortKV.Map{
-			td.SortKV.AddKey(k,v)
+		for k, v := range fSortKV.Map {
+			td.SortKV.AddKey(k, v)
 		}
 		//		td.LeftNode.Val
 		//		for _, vvv := range *td.TR {
@@ -639,18 +640,20 @@ func (t *Table) InsertTR(tr *TR) {
 
 // SortMap is a map that supports ordering: entries live in Map while Keys holds the key list.
 type SortMap struct {
-	Index map[string]int
-	Keys  []string
-	Map   map[string]interface{}
-	Lock  sync.Mutex
+	Index     map[string]int // presumably key -> position in Keys; confirm against AddKey/RemoveKey
+	Keys      []string // iterated by callers instead of Map to walk keys in table-header order
+	Map       map[string]interface{}
+	Lock      sync.Mutex
+	NotTagKey map[string]bool // keys flagged true by KVFilter/sortKVArr from CommonDataAnaly's second result (k2)
 }
 
 // NewSortMap is a quick constructor that returns a SortMap whose maps and key slice are initialized empty.
 func NewSortMap() *SortMap {
 	return &SortMap{
-		Index: map[string]int{},
-		Keys:  []string{},
-		Map:   map[string]interface{}{},
+		Index:     map[string]int{},
+		Keys:      []string{},
+		Map:       map[string]interface{}{},
+		NotTagKey: map[string]bool{},
 	}
 }
 
@@ -881,11 +884,13 @@ func ComputeConRatio(con string, strtype int) (tabs []*goquery.Selection, ratio
 	**/
 	return
 }
+
 // HtmlToText parses con as an HTML document with goquery and returns the document's plain text.
 func HtmlToText(con string) string {
 	doc2, _ := goquery.NewDocumentFromReader(strings.NewReader(con)) // the parse error is discarded
 	return doc2.Text()
 }
+
 //取出排除表格之外的文本
 func TextAfterRemoveTable(con string) string {
 	doc2, _ := goquery.NewDocumentFromReader(strings.NewReader(con))