瀏覽代碼

Merge branch 'dev3.2' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.2

maxiaoshan 6 年之前
父節點
當前提交
699cca364d

+ 7 - 4
src/config.json

@@ -1,7 +1,7 @@
 {
     "port": "9090",
     "mgodb": "192.168.3.207:27082",
-    "dbsize": 2,
+    "dbsize": 10,
     "dbname": "extract_kf",
     "redis": "buyer=192.168.3.207:1377,winner=192.168.3.207:1378,agency=192.168.3.207:1379",
     "elasticsearch": "http://192.168.3.11:9800",
@@ -10,11 +10,11 @@
     "mergetablealias": "projectset_v1",
     "saveresult": true,
     "qualityaudit": false,
-    "saveblock": false,
+    "saveblock": true,
     "filelength": 100000,
     "iscltlog": false,
     "brandgoods": false,
-    "udptaskid": "5cdd3025698414032c8322b1",
+    "udptaskid": "5d5d13cfd0fcef0e580d4fb9",
     "udpport": "1484",
     "nextNode": [
         {
@@ -56,5 +56,8 @@
             }
         ]
     },
-	"isSaveTag":false
+	"isSaveTag":false,
+    "tomail": "zhangjinkun@topnet.net.cn,chenmingzhu@topnet.net.cn,zhaolongyue@topnet.net.cn",
+    "api": "http://10.171.112.160:19281/_send/_mail",
+    "deleteInstanceTimeHour":1
 } 

+ 1 - 1
src/jy/cluster/distributed.go → src/jy/admin/distribution/distributed.go

@@ -1,7 +1,7 @@
 /**
 分布式抽取
 **/
-package cluster
+package distribution
 
 import (
 	"encoding/json"

+ 58 - 11
src/jy/admin/distribution/distribution.go

@@ -2,13 +2,17 @@
 package distribution
 
 import (
+	"encoding/json"
+	"github.com/gin-gonic/gin"
 	. "jy/admin"
-	ecs "jy/cluster"
+	"jy/cluster"
+	"jy/extract"
 	db "jy/mongodbutil"
+	ju "jy/util"
 	"log"
+	mu "mfw/util"
+	"net"
 	qu "qfw/util"
-
-	"github.com/gin-gonic/gin"
 )
 
 func init() {
@@ -30,13 +34,13 @@ func init() {
 			sdate, _ := c.GetPostForm("s_date")
 			edate, _ := c.GetPostForm("e_date")
 			log.Println(s_table, sdate, edate)
-			ecs.IdsRange(s_table, sdate, edate)
+			IdsRange(s_table, sdate, edate)
 		} else if instanceid != "" { //实例自动释放时间
 			hours, _ := c.GetPostForm("hours")
 			hour := qu.IntAll(hours)
 			if hour > 0 {
 				log.Println(instanceid, hour)
-				ecs.ModifyInstanceAutoReleaseTime(instanceid, hour)
+				cluster.ModifyInstanceAutoReleaseTime(instanceid, hour)
 			}
 		} else { //申请ecs实例
 			TaskName, _ := c.GetPostForm("s_name")
@@ -44,23 +48,66 @@ func init() {
 			s_flow, _ := c.GetPostForm("s_flow")
 			num, _ := c.GetPostForm("i_num")
 			hours, _ := c.GetPostForm("i_hours")
-			ecs.RunInstances(TaskName, s_computer, s_flow, qu.IntAll(num), qu.IntAll(hours))
+			cluster.RunInstances(TaskName, s_computer, s_flow, qu.IntAll(num), qu.IntAll(hours))
 		}
 		c.JSON(200, gin.H{"rep": true})
 	})
 
 	//更新ecs实例状态
 	Admin.POST("/distribution/upstatus", func(c *gin.Context) {
-		ecs.DescribeInstances()
+		cluster.DescribeInstances()
 		c.JSON(200, gin.H{"rep": true})
 	})
-
+	//继续执行实例id区间段
+	Admin.POST("/distribution/continueExecution", func(c *gin.Context) {
+		// Catch has to be deferred (as it is everywhere else in this code base):
+		// invoked inline at the top of the handler it runs before anything can
+		// go wrong and therefore cannot intercept a later panic.
+		defer qu.Catch()
+		InstanceId, _ := c.GetPostForm("InstanceId")
+		// JSON-encode the posted id instead of splicing it raw between quotes,
+		// so a value containing `"` or `\` cannot alter the query document.
+		idJSON, err := json.Marshal(InstanceId)
+		if err != nil {
+			log.Println(err)
+			c.JSON(200, gin.H{"rep": false})
+			return
+		}
+		tmp, b := db.Mgo.FindOneByField("ecs", `{"InstanceId":`+string(idJSON)+`}`, `{"ip_nw":1,"lastId":1,"extask":1}`)
+		if !b || tmp == nil {
+			c.JSON(200, gin.H{"rep": false})
+			return
+		}
+		// extask must hold at least two entries: [0] is used as the start id, [1] as the end id.
+		extasktmp, ok := (*tmp)["extask"].([]interface{})
+		if !ok || len(extasktmp) < 2 {
+			c.JSON(200, gin.H{"rep": false})
+			return
+		}
+		log.Println(extasktmp)
+		ip := qu.ObjToString((*tmp)["ip_nw"])
+		// Prefer the stored lastId as the start id; fall back to the first
+		// extask entry when no lastId has been recorded.
+		sid := qu.ObjToString((*tmp)["lastId"])
+		if sid == "" {
+			sid = qu.ObjToString(extasktmp[0])
+		}
+		eid := qu.ObjToString(extasktmp[1])
+		// Marshalling a map of plain strings cannot fail, so the error is ignored.
+		by, _ := json.Marshal(map[string]interface{}{
+			"ip":         ip,
+			"gtid":       sid,
+			"lteid":      eid,
+			"InstanceId": InstanceId,
+			"stype":      "distributed",
+		})
+		err = extract.Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+			IP:   net.ParseIP(ip),
+			Port: qu.IntAll(ju.Config["udpport"]),
+		})
+		if err != nil {
+			// The notification was not sent, so report failure to the caller
+			// (previously this branch answered "rep": true, same as success).
+			log.Println(err)
+			c.JSON(200, gin.H{"rep": false})
+			return
+		}
+		c.JSON(200, gin.H{"rep": true})
+	})
 	//释放ecs实例
 	Admin.POST("/distribution/deleteInstance", func(c *gin.Context) {
 		InstanceId, _ := c.GetPostForm("InstanceId")
-		ecs.DeleteInstance(InstanceId)
+		cluster.DeleteInstance(InstanceId)
 		c.JSON(200, gin.H{"rep": true})
 	})
+	//数据库删除ecs实例
+	Admin.POST("/distribution/deleteInstancedb", func(c *gin.Context) {
+		InstanceId, _ := c.GetPostForm("InstanceId")
+		b := db.Mgo.Del("ecs", `{"InstanceId":"`+InstanceId+`"}`)
+		c.JSON(200, gin.H{"rep": b})
+	})
 
 	//部署
 	Admin.POST("/distribution/deploy", func(c *gin.Context) {
@@ -69,7 +116,7 @@ func init() {
 			ip := qu.ObjToString(v["ip_nw"])
 			id := qu.ObjToString(v["InstanceId"])
 			log.Println("部署", ip, id)
-			ecs.RunSsh(ip, ecs.Password, 22)
+			cluster.RunSsh(ip, cluster.Password, 22)
 			db.Mgo.Update("ecs", `{"ip_nw":"`+ip+`"}`,
 				map[string]interface{}{
 					"$set": map[string]interface{}{
@@ -82,7 +129,7 @@ func init() {
 
 	//分发任务
 	Admin.POST("/distribution/rangetask", func(c *gin.Context) {
-		num := ecs.RunEcsTask()
+		num := RunEcsTask()
 		log.Println("任务分发完成", num)
 		c.JSON(200, gin.H{"rep": true})
 	})

+ 1 - 1
src/jy/admin/track/track.go

@@ -58,7 +58,7 @@ func GetTrackPath(_id, field, table, resultcoll string) map[string]interface{} {
 	//针对lua抽取kv路径拆分
 	luatracks := map[string][]map[string]interface{}{}
 	for k, v := range tracks {
-		if strings.Contains(k, "CL") {
+		if strings.Contains(k, "CL") ||strings.Contains(k, "JsonData"){
 			track, _ := v.([]map[string]interface{})
 			for _, val := range track { //按code,kvtype,matchtype分组
 				code := k + "_" + qu.ObjToString(val["kvtype"]) + "_" + qu.ObjToString(val["matchtype"])

+ 4 - 3
src/jy/cluster/ssh.go

@@ -49,11 +49,12 @@ var sshstr = `
 cd /opt
 kill -9 $(pidof extract_v3)
 rm -rf extract_v3*
-wget http://10.170.187.34:8300/upload/extract_v3.tgz
-tar -xzvf extract_v3.tgz
+mkdir extract_v3
 cd /opt/extract_v3
+wget http://10.171.112.160:9090/res/extract_v3.tgz
+tar -xzvf extract_v3.tgz
 chmod 777 extract_v3
-nohup ./extract_v3 >/dev/null 2>&1 &
+nohup ./extract_v3 >/opt/extract_v3/nohup 2>&1 &
 exit
 `
 

+ 28 - 8
src/jy/extract/clearesult.go

@@ -63,9 +63,9 @@ func (c *ClearTask) ClearProcess(doc *map[string]interface{}) {
 				"$set": set,
 			},
 		}
-		lock.Lock()
+		c.RWMutex.Lock()
 		c.UpdateResult = append(c.UpdateResult, updatearr)
-		lock.Unlock()
+		c.RWMutex.Unlock()
 	}, func(err interface{}) {
 		log.Debug((*doc)["_id"], err)
 		<-c.ClearTaskInfo.ProcessPool
@@ -96,21 +96,25 @@ func SaveCltLog() {
 			}
 		}
 	}
-	time.AfterFunc(10*time.Second, SaveCltLog)
+	time.AfterFunc(3*time.Second, SaveCltLog)
 }
 
 //批量更新抽取结果的值
 func (c *ClearTask) UpdateResultVal(init bool) {
 	defer qu.Catch()
+	c.RWMutex.Lock()
 	if c.UpdateResult == nil {
 		c.UpdateResult = [][]map[string]interface{}{}
 	}
+	c.RWMutex.Unlock()
 	if init {
 		go func() {
 			for {
-				if len(c.UpdateResult) > 500 {
-					arr := c.UpdateResult[:500]
-					c.UpdateResult = c.UpdateResult[500:]
+				c.RWMutex.Lock()
+				if len(c.UpdateResult) > 50 {
+					arr := c.UpdateResult[:50]
+					c.UpdateResult = c.UpdateResult[50:]
+					c.RWMutex.Unlock()
 					qu.Try(func() {
 						c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr...)
 					}, func(err interface{}) {
@@ -119,20 +123,36 @@ func (c *ClearTask) UpdateResultVal(init bool) {
 				} else {
 					arr := c.UpdateResult
 					c.UpdateResult = [][]map[string]interface{}{}
+					c.RWMutex.Unlock()
 					qu.Try(func() {
 						c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr...)
 					}, func(err interface{}) {
 						log.Debug(err)
 					})
-					time.Sleep(10 * time.Second)
+					time.Sleep(3 * time.Second)
 				}
 			}
 		}()
 	} else {
+		c.RWMutex.Lock()
 		arr := c.UpdateResult
 		c.UpdateResult = [][]map[string]interface{}{}
+		c.RWMutex.Unlock()
 		qu.Try(func() {
-			c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr...)
+			lenarr := len(arr)
+			for {
+				if lenarr > 50 {
+					arr2 := arr[:50]
+					arr = arr[50:]
+					lenarr = len(arr)
+					c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr2...)
+					time.Sleep(1*time.Second)
+				} else {
+					c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr...)
+					break
+				}
+			}
+			//c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr...)
 		}, func(err interface{}) {
 			log.Debug(err)
 		})

+ 36 - 9
src/jy/extract/extract.go

@@ -17,6 +17,8 @@ import (
 	"time"
 	"unicode/utf8"
 
+	"github.com/PuerkitoBio/goquery"
+
 	log "github.com/donnie4w/go-logger/logger"
 	"gopkg.in/mgo.v2/bson"
 )
@@ -28,7 +30,7 @@ var (
 	ExtLogs       map[*TaskInfo][]map[string]interface{} //抽取日志
 	TaskList      map[string]*ExtractTask                //任务列表
 	ClearTaskList map[string]*ClearTask                  //清理任务列表
-	saveLimit     = 200                                  //抽取日志批量保存
+	saveLimit     = 100                                  //抽取日志批量保存
 	PageSize      = 5000                                 //查询分页
 	Fields        = `{"title":1,"summary":1,"detail":1,"contenthtml":1,"site":1,"spidercode":1,"toptype":1,"subtype":1,"area":1,"city":1,"comeintime":1,"publishtime":1,"sensitive":1,"projectinfo":1,"jsondata":1}`
 	Fields2       = `{"budget":1,"bidamount":1,"title":1,"projectname":1,"winner":1}`
@@ -246,6 +248,22 @@ func (e *ExtractTask) PreInfo(doc map[string]interface{}) (j, jf *ju.Job) {
 	if isextFile {
 		file2text(&doc) //附件文本堆一起(后期可以考虑,分开处理),方法里修改了doc["detailfile"]结果
 	}
+	//正文小于50个字,有附件把附件内容加到正文
+	tmpDeatil := detail
+	tmpdocument, err := goquery.NewDocumentFromReader(strings.NewReader(tmpDeatil))
+	if err == nil {
+		conlen := utf8.RuneCountInString(strings.Trim(tmpdocument.Text(), " "))
+		if conlen < 50 {
+			if isextFile {
+				detail += qu.ObjToString(doc["detailfile"])
+				doc["detail"] = detail
+			}
+		} else if conlen > qu.IntAllDef(ju.Config["filelength"], 100000) {
+			//防止文本过长,造成抽取阻塞
+			log.Debug("文本太长", doc["_id"], conlen)
+			doc["detail"] = d3
+		}
+	}
 	toptype := qu.ObjToString(doc["toptype"])
 	subtype := qu.ObjToString(doc["subtype"])
 	if qu.ObjToString(doc["type"]) == "bid" {
@@ -707,7 +725,7 @@ func ExtRuleCoreByReg(extfrom string, doc map[string]interface{}, j *ju.Job, in
 //lua脚本根据属性设置提取kv值
 func getKvByLuaFields(vc *RuleCore, j *ju.Job, et *ExtractTask) (map[string][]map[string]interface{}, bool) {
 	kvmap := map[string][]map[string]interface{}{}
-	if len(j.Winnerorder) > 0 {
+	if len(j.Winnerorder) > 1 {
 		if vc.Field == "bidamount" {
 			for _, v := range j.Winnerorder {
 				kvmap[vc.Field] = append(kvmap[vc.Field], map[string]interface{}{
@@ -1150,7 +1168,7 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 	qu.Try(func() {
 		//重新取出清理过后的中标候选人
 		resetWinnerorder(j)
-		doc, result, _id := funcAnalysis(j, e.Tag)
+		doc, result, _id := funcAnalysis(j, e)
 		if isSaveTag, _ := ju.Config["isSaveTag"].(bool); isSaveTag {
 			go otherNeedSave(j, result, e)
 		}
@@ -1178,7 +1196,7 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 		//处理附件
 		var resultf map[string][]*ju.ExtField
 		if jf != nil {
-			_, resultf, _ = funcAnalysis(jf, e.Tag)
+			_, resultf, _ = funcAnalysis(jf, e)
 			auxinfof := auxInfo(jf)
 			tmp["fieldallf"] = auxinfof
 			ffield := map[string]interface{}{}
@@ -1272,7 +1290,11 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 			tmp["kvtext"] = kvtext.String()
 		}
 		if len(blocks) > 0 {
-			tmp["blocks"] = blocks
+			if blocksBytes, err := json.Marshal(blocks); err == nil {
+				if utf8.RuneCount(blocksBytes) < 100000 {
+					tmp["blocks"] = string(blocksBytes)
+				}
+			}
 		}
 		//tmp["extract_content"] = j.Content
 		if e.TaskInfo.TestColl == "" {
@@ -1289,8 +1311,10 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 					},
 					map[string]interface{}{"$set": tmp},
 				}
+				e.RWMutex.Lock()
 				e.BidArr = append(e.BidArr, tmparr)
 				e.BidTotal++
+				e.RWMutex.Unlock()
 			}
 			if b, ok := ju.Config["saveresult"].(bool); ok && b {
 				id := tmp["_id"]
@@ -1303,7 +1327,9 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 					},
 					map[string]interface{}{"$set": tmp},
 				}
+				e.RWMutex.Lock()
 				e.ResultArr = append(e.ResultArr, tmparr)
+				e.RWMutex.Unlock()
 			}
 		} else { //测试结果
 			delete(tmp, "_id")
@@ -1359,7 +1385,7 @@ func otherNeedSave(j *ju.Job, result map[string][]*ju.ExtField, e *ExtractTask)
 				"firstid":    j.SourceMid,
 				"createtime": now,
 			})
-			if len(datas) == 200 {
+			if len(datas) == saveLimit {
 				db.Mgo.SaveBulk(coll, datas...)
 				datas = []map[string]interface{}{}
 			}
@@ -1373,7 +1399,7 @@ func otherNeedSave(j *ju.Job, result map[string][]*ju.ExtField, e *ExtractTask)
 			"firstid":    j.SourceMid,
 			"createtime": now,
 		})
-		if len(datas) == 200 {
+		if len(datas) == saveLimit {
 			db.Mgo.SaveBulk(coll, datas...)
 			datas = []map[string]interface{}{}
 		}
@@ -1415,17 +1441,18 @@ func delFiled(k string) bool {
 	return k == "summary" || k == "detail" || k == "contenthtml" || k == "site" || k == "spidercode" || k == "projectinfo" || k == "jsondata"
 }
 
-func funcAnalysis(j *ju.Job, ftag map[string][]*Tag) (*map[string]interface{}, map[string][]*ju.ExtField, string) {
+func funcAnalysis(j *ju.Job, e *ExtractTask) (*map[string]interface{}, map[string][]*ju.ExtField, string) {
 	defer qu.Catch()
 	doc := j.Data
 	result := j.Result
 	_id := qu.BsonIdToSId((*doc)["_id"])
-	result = ScoreFields(j, ftag)
+	result = ScoreFields(j, e.Tag)
 
 	//结果排序
 	for _, val := range result {
 		ju.Sort(val)
 	}
+	j.Result = JsonDataMergeProcessing(j, e)
 	return doc, result, _id
 }
 

+ 61 - 19
src/jy/extract/extractInit.go

@@ -16,7 +16,8 @@ import (
 	jb "github.com/yanyiwu/gojieba"
 )
 
-type RegLuaInfo struct { //正则或脚本信息
+type RegLuaInfo struct {
+	//正则或脚本信息
 	Code, Name, Field string  //
 	RuleText          string  //
 	IsLua             bool    //
@@ -76,11 +77,12 @@ type ExtractTask struct {
 	IsFileField bool      //是否开启附件抽取
 	FileFields  *sync.Map //抽取附件属性组
 
-	ResultChanel chan bool                  //抽取结果详情
-	ResultArr    [][]map[string]interface{} //抽取结果详情
-	BidChanel    chan bool                  //抽取结果
-	BidArr       [][]map[string]interface{} //抽取结果
-	BidTotal     int                        //结果数量
+	ResultChanel chan bool //抽取结果详情
+	sync.RWMutex
+	ResultArr [][]map[string]interface{} //抽取结果详情
+	BidChanel chan bool                  //抽取结果
+	BidArr    [][]map[string]interface{} //抽取结果
+	BidTotal  int                        //结果数量
 
 	RecogFieldMap map[string]map[string]interface{}   //识别字段
 	FidClassMap   map[string][]map[string]interface{} //分类
@@ -155,6 +157,7 @@ type ClearLua struct {
 }
 
 type ClearTask struct {
+	sync.RWMutex
 	Id            string                 //任务id
 	Content       string                 //信息内容
 	ClearTaskInfo *ClearTaskInfo         //任务信息
@@ -579,7 +582,8 @@ func (e *ExtractTask) InitTag() {
 				tab.Items[k] = &ju.Tag{"", key.(string), 0 - k, nil, false}
 			}
 			sort.Sort(tab.Items)
-			ju.TagdbTable[fname] = &tab
+			//ju.TagdbTable[fname] = &tab
+			ju.TagdbTable.Store(fname, &tab)
 		}
 	}
 	//正则标签库
@@ -596,7 +600,8 @@ func (e *ExtractTask) InitTag() {
 				tab.Items[k] = &ju.Tag{"", key.(string), 0 - k, regexp.MustCompile(key.(string)), false}
 			}
 			sort.Sort(tab.Items)
-			ju.TagdbTable[fname+"_reg"] = &tab
+			//ju.TagdbTable[fname+"_reg"] = &tab
+			ju.TagdbTable.Store(fname+"_reg", &tab)
 		}
 	}
 }
@@ -1040,15 +1045,19 @@ func (e *ExtractTask) InitAreaCode() {
 //保存抽取详情数据
 func (e *ExtractTask) ResultSave(init bool) {
 	defer qu.Catch()
+	e.RWMutex.Lock()
 	if e.ResultArr == nil {
 		e.ResultArr = [][]map[string]interface{}{}
 	}
+	e.RWMutex.Unlock()
 	if init {
 		go func() {
 			for {
-				if len(e.ResultArr) > 500 {
-					arr := e.ResultArr[:500]
-					e.ResultArr = e.ResultArr[500:]
+				e.RWMutex.Lock()
+				if len(e.ResultArr) > saveLimit {
+					arr := e.ResultArr[:saveLimit]
+					e.ResultArr = e.ResultArr[saveLimit:]
+					e.RWMutex.Unlock()
 					qu.Try(func() {
 						db.Mgo.UpSertBulk("extract_result", arr...)
 					}, func(err interface{}) {
@@ -1057,21 +1066,37 @@ func (e *ExtractTask) ResultSave(init bool) {
 				} else {
 					arr := e.ResultArr
 					e.ResultArr = [][]map[string]interface{}{}
+					e.RWMutex.Unlock()
 					qu.Try(func() {
 						db.Mgo.UpSertBulk("extract_result", arr...)
 					}, func(err interface{}) {
 						log.Debug(err)
 					})
 				}
-				time.Sleep(10 * time.Second)
+
+				time.Sleep(3 * time.Second)
 			}
 		}()
 	} else {
+		e.RWMutex.Lock()
 		arr := e.ResultArr
 		e.ResultArr = [][]map[string]interface{}{}
+		e.RWMutex.Unlock()
 		qu.Try(func() {
-			e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr...)
+			lenarr := len(arr)
+			for {
+				if lenarr > saveLimit {
+					arr2 := arr[:saveLimit]
+					arr = arr[saveLimit:]
+					lenarr = len(arr)
+					e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr2...)
+				} else {
+					e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr...)
+					break
+				}
+			}
 		}, func(err interface{}) {
+			defer e.RWMutex.Unlock()
 			log.Debug(err)
 		})
 	}
@@ -1080,15 +1105,19 @@ func (e *ExtractTask) ResultSave(init bool) {
 //保存抽取数据
 func (e *ExtractTask) BidSave(init bool) {
 	defer qu.Catch()
+	e.RWMutex.Lock()
 	if e.BidArr == nil {
 		e.BidArr = [][]map[string]interface{}{}
 	}
+	e.RWMutex.Unlock()
 	if init {
 		go func() {
 			for {
-				if len(e.BidArr) > 500 {
-					arr := e.BidArr[:500]
-					e.BidArr = e.BidArr[500:]
+				e.RWMutex.Lock()
+				if len(e.BidArr) > saveLimit {
+					arr := e.BidArr[:saveLimit]
+					e.BidArr = e.BidArr[saveLimit:]
+					e.RWMutex.Unlock()
 					qu.Try(func() {
 						e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr...)
 					}, func(err interface{}) {
@@ -1097,21 +1126,34 @@ func (e *ExtractTask) BidSave(init bool) {
 				} else {
 					arr := e.BidArr
 					e.BidArr = [][]map[string]interface{}{}
+					e.RWMutex.Unlock()
 					qu.Try(func() {
 						e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr...)
 					}, func(err interface{}) {
 						log.Debug(err)
 					})
-
 				}
-				time.Sleep(10 * time.Second)
+				time.Sleep(2 * time.Second)
 			}
 		}()
 	} else {
+		e.RWMutex.Lock()
 		arr := e.BidArr
 		e.BidArr = [][]map[string]interface{}{}
+		e.RWMutex.Unlock()
 		qu.Try(func() {
-			e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr...)
+			lenarr := len(arr)
+			for {
+				if lenarr > saveLimit {
+					arr2 := arr[:saveLimit]
+					arr = arr[saveLimit:]
+					lenarr = len(arr)
+					e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr2...)
+				} else {
+					e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr...)
+					break
+				}
+			}
 		}, func(err interface{}) {
 			log.Debug(err)
 		})

+ 66 - 13
src/jy/extract/extractudp.go

@@ -4,11 +4,16 @@ package extract
 import (
 	"encoding/json"
 	"fmt"
+	"io/ioutil"
+	"jy/cluster"
 	db "jy/mongodbutil"
 	ju "jy/util"
+	log2 "log"
 	mu "mfw/util"
 	"net"
+	"net/http"
 	qu "qfw/util"
+	"strings"
 	"sync"
 
 	log "github.com/donnie4w/go-logger/logger"
@@ -40,7 +45,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 				log.Debug("err", "sid=", sid, ",eid=", eid)
 			} else {
 				if stype == "distributed" { //分布式抽取分支
-					go Udpclient.WriteUdp([]byte("udpok"), mu.OP_NOOP, ra)
+					go Udpclient.WriteUdp([]byte("发送分布式抽取分支"+qu.ObjToString(rep["ip"])+"udpok"), mu.OP_NOOP, ra)
 					log.Debug("分布式抽取id段", sid, " ", eid)
 					InstanceId := qu.ObjToString(rep["InstanceId"])
 					db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
@@ -49,13 +54,15 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 								"extstatus": "running",
 							},
 						}, true, false)
-					ExtractByUdp(sid, eid, qu.ObjToString(rep["InstanceId"]))
+					ExtractByUdp(sid, eid, ra, qu.ObjToString(rep["InstanceId"]), qu.ObjToString(rep["ip"]))
 					db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
 						map[string]interface{}{
 							"$set": map[string]interface{}{
 								"extstatus": "ok",
 							},
 						}, true, false)
+					//<-time.NewTimer(time.Minute * time.Duration(qu.IntAll(ju.Config["DeleteInstanceTimeMinute"]))).C
+					//cluster.DeleteInstance("instanceId[0]")
 					log.Debug("分布式抽取完成", sid, " ", eid, "释放esc实例", qu.ObjToString(rep["ip"]))
 				} else {
 					udpinfo, _ := rep["key"].(string)
@@ -64,7 +71,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					}
 					go Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
 					log.Debug("udp通知抽取id段", sid, " ", eid)
-					ExtractByUdp(sid, eid)
+					ExtractByUdp(sid, eid, ra)
 					log.Debug("udp通知抽取完成,eid=", eid)
 					for _, m := range nextNodes {
 						by, _ := json.Marshal(map[string]interface{}{
@@ -85,13 +92,50 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		}
 	case mu.OP_NOOP: //下个节点回应
 		log.Debug(string(data))
+		log2.Println(string(data))
+	case mu.OP_SEND_EMAIL:
+		log.Debug("实例抽取完成,发送邮件:", string(data), ra.IP)
+		log2.Println("实例抽取完成,发送邮件:", string(data), ra.IP)
+		rep := make(map[string]interface{})
+		err := json.Unmarshal(data, &rep)
+		if err != nil {
+			log.Debug(err)
+			log2.Println(string(data), ra.IP)
+		} else {
+			tmpstr := ""
+			for k, v := range rep {
+				switch k {
+				case "desc":
+					tmpstr += fmt.Sprint(v) + ","
+				case "count":
+					tmpstr += "实际抽取数据量" + fmt.Sprint(v) + ","
+				case "index":
+					tmpstr += "区间数据量为" + fmt.Sprint(v) + ","
+				case "instanceId":
+					tmpstr += "实例" + fmt.Sprint(v) + ","
+				}
+			}
+			tmpstr = strings.TrimRight(tmpstr, ",")
+			sendMail(tmpstr)
+			cluster.ModifyInstanceAutoReleaseTime(qu.ObjToString(rep["instanceId"]), qu.IntAll(ju.Config["deleteInstanceTimeHour"]))
+		}
 	}
 }
+// sendMail reports a finished extraction run through the HTTP mail API.
+//
+// It issues a GET request against ju.Config["api"], passing ju.Config["tomail"]
+// as the recipients, a fixed title, and "抽取完成:"+content as the body.
+// Nothing is returned; the API response and any error are only logged.
+func sendMail(content string) {
+	log2.Println(ju.Config["api"], ju.Config["tomail"], "jy-data-extract_3.2", "抽取完成:"+content)
+	res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", ju.Config["api"], ju.Config["tomail"], "jy-data-extract_3.2", "抽取完成:"+content))
+	if err != nil {
+		// On failure res is nil; the previous unconditional
+		// `defer res.Body.Close()` dereferenced it and panicked here.
+		log2.Println("api email:", err)
+		return
+	}
+	defer res.Body.Close()
+	read, readErr := ioutil.ReadAll(res.Body)
+	log2.Println("邮件发送:", string(read), readErr)
+	// Kept from the original: the request error (nil on this path) is always logged last.
+	log2.Println("api email:", err)
+}
 
 var ext *ExtractTask
 
 //根据id区间抽取
-func ExtractByUdp(sid, eid string, instanceId ...string) {
+func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
 	defer qu.Catch()
 	if ext == nil {
 		ext = &ExtractTask{}
@@ -138,7 +182,7 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 		if count < PageSize {
 			limit = count
 		}
-		fmt.Printf("count=%d,pageNum=%d,query=%v", count, pageNum, query)
+		fmt.Printf("count=%d,pageNum=%d,query=%v\n", count, pageNum, query)
 
 		startI := 0 //接着上次任务执行
 		sidback := sid
@@ -150,10 +194,10 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 		if qu.ObjToString((*esc)["lastIdback"]) != "" {
 			sidback = qu.ObjToString((*esc)["lastIdback"])
 		}
-
+		go Udpclient.WriteUdp([]byte(fmt.Sprintf("IP=%s,数据接收成功,count=%d,pageNum=%d,query=%v\n", instanceId[1], count, pageNum, query)), mu.OP_NOOP, ra)
 		for i := startI; i < pageNum; i++ {
 			query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
-			fmt.Printf("page=%d,query=%v", i+1, query)
+			fmt.Printf("page=%d,query=%v\n", i+1, query)
 			if ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query) > 0 {
 				list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
 				for _, v := range *list {
@@ -161,7 +205,6 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 						continue
 					}
 					_id := qu.BsonIdToSId(v["_id"])
-					log.Debug(_id)
 					var j, jf *ju.Job
 					if ext.IsFileField && v["projectinfo"] != nil {
 						v["isextFile"] = true
@@ -180,7 +223,7 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 					}}, true, false)
 			}
 			queryback := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sidback)}}
-			fmt.Printf("page=%d,queryback=%v", i+1, queryback)
+			fmt.Printf("page=%d,queryback=%v\n", i+1, queryback)
 			if ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl+"_back", queryback) > 0 {
 				list2, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl+"_back", queryback, nil, Fields, false, 0, limit)
 				for _, v := range *list2 {
@@ -188,7 +231,6 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 						continue
 					}
 					_id := qu.BsonIdToSId(v["_id"])
-					log.Debug(_id)
 					var j, jf *ju.Job
 					if ext.IsFileField && v["projectinfo"] != nil {
 						v["isextFile"] = true
@@ -212,10 +254,19 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 					"pagecurrent": i + 1,
 				}}, true, false)
 		}
+		des := make(map[string]interface{})
+		des["desc"] = "分布式抽取完成,一小时后释放"
+		des["count"] = count
+		des["index"] = index
+		des["instanceId"] = instanceId[0]
+		des["instanceIP"] = instanceId[1]
+		udpbytes, _ := json.Marshal(des)
+		go Udpclient.WriteUdp(udpbytes, mu.OP_SEND_EMAIL, ra)
 		log.Debug("抽取完成", "count:", count, "index:", index, "bidtotal:", ext.BidTotal)
 	} else { //普通抽取
 		query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
 		count := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
+		log.Debug("查询条件为:", query, "查询条数:", count)
 		pageNum := (count + PageSize - 1) / PageSize
 		limit := PageSize
 		if count < PageSize {
@@ -224,10 +275,11 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 		wg := sync.WaitGroup{}
 		for i := 0; i < pageNum; i++ {
 			query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid)}}
-			fmt.Printf("page=%d,query=%v", i+1, query)
+			fmt.Printf("page=%d,query=%v\n", i+1, query)
 			list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
 			for _, v := range *list {
 				if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
+					log.Debug(index, qu.BsonIdToSId(v["_id"]), "//去除含敏感词数据")
 					continue
 				}
 				_id := qu.BsonIdToSId(v["_id"])
@@ -240,10 +292,11 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 				}
 				ext.TaskInfo.ProcessPool <- true
 				wg.Add(1)
-				go func() {
+				go func(wg *sync.WaitGroup, j, jf *ju.Job) {
 					defer wg.Done()
+					//log.Debug(index,j.SourceMid,)
 					ext.ExtractProcess(j, jf)
-				}()
+				}(&wg, j, jf)
 				index++
 				if index%1000 == 0 {
 					log.Debug("index:", index, ",页码:", i+1, ",_id:", _id)

+ 12 - 1
src/jy/extract/score.go

@@ -114,6 +114,8 @@ func ScoreFields(j *ju.Job, ftag map[string][]*Tag) map[string][]*ju.ExtField {
 		for tmpsindex, tmpsvalue := range tmps {
 			//没有抽取到值,不打分
 			if string_value := fmt.Sprint(tmpsvalue.Value); string_value == "" || string_value == "0" || string_value == "<nil>" {
+				tmps[tmpsindex].Score = -10
+				tmps[tmpsindex].ScoreItem = append(tmps[tmpsindex].ScoreItem, &ju.ScoreItem{Des: `value结果为空直接-10分`, Code: field, Value: tmpsvalue.Value, Score: -10})
 				continue
 			}
 			lockscore.Lock()
@@ -166,6 +168,9 @@ func ScoreFields(j *ju.Job, ftag map[string][]*Tag) map[string][]*ju.ExtField {
 			//kv权重打分
 			if fieldscore != nil { //指定抽取属性打分配置
 				if tmpsvalue.Type == "colon" || tmpsvalue.Type == "space" || tmpsvalue.Type == "table" {
+					if taglength == 0{
+						continue
+					}
 					weightscore := ju.FloatFormat(float64(qu.Float64All(fieldscore["kvweight"]))+float64(tmps[tmpsindex].Weight)/float64(taglength), 4)
 					tmps[tmpsindex].Score += weightscore
 					tmps[tmpsindex].ScoreItem = append(tmps[tmpsindex].ScoreItem, &ju.ScoreItem{Des: "kv权重打分", Code: "kv-weight", RuleText: describe, ScoreFrom: "fieldscore.json", Value: tmpsvalue.Value, Score: weightscore})
@@ -174,6 +179,9 @@ func ScoreFields(j *ju.Job, ftag map[string][]*Tag) map[string][]*ju.ExtField {
 				}
 			} else {
 				if tmpsvalue.Type == "colon" || tmpsvalue.Type == "space" || tmpsvalue.Type == "table" {
+					if taglength == 0{
+						continue
+					}
 					weightscore := ju.FloatFormat(float64(qu.Float64All(CommonScore["kvweight"]))+float64(tmps[tmpsindex].Weight)/float64(taglength), 4)
 					tmps[tmpsindex].Score += weightscore
 					tmps[tmpsindex].ScoreItem = append(tmps[tmpsindex].ScoreItem, &ju.ScoreItem{Des: "kv权重打分", Code: "kv-weight", RuleText: describe, ScoreFrom: "fieldscore.json", Value: tmpsvalue.Value, Score: weightscore})
@@ -192,6 +200,8 @@ func ScoreFields(j *ju.Job, ftag map[string][]*Tag) map[string][]*ju.ExtField {
 				//1.长度打分
 				valueLen := utf8.RuneCountInString(fmt.Sprint(tmpsvalue.Value))
 				if valueLen < 1 {
+					tmps[tmpsindex].Score = -10
+					tmps[tmpsindex].ScoreItem = append(tmps[tmpsindex].ScoreItem, &ju.ScoreItem{Des: `valueLen < 1 && field != "projectscope"直接-10分`, Code: field, Value: tmpsvalue.Value, Score: -10})
 					continue
 				}
 				if valueLen > 100 && field != "projectscope" {
@@ -260,7 +270,7 @@ func ScoreFields(j *ju.Job, ftag map[string][]*Tag) map[string][]*ju.ExtField {
 				max := qu.IntAll(scoreRule["max"])
 				val := qu.IntAll(tmpsvalue.Value)
 				scores, _ := scoreRule["score"].([]interface{})
-				if len(scores) < 3 {
+				if len(scores) < 3 || val == 0{
 					continue
 				}
 				if val < min && 0 < val {
@@ -317,6 +327,7 @@ func projectWeightClear(tmps []*ju.ExtField) []*ju.ExtField {
 	if len(tmps)<1{
 		return newList
 	}
+	ju.Sort(tmps)
 	tmpWeight := -999 //记录最大权重
 	tmpIndex := -999  //记录最大权重下标
 	for i, v := range tmps {

+ 162 - 0
src/jy/extract/score_jsondata.go

@@ -0,0 +1,162 @@
+package extract
+
+import (
+	"fmt"
+	"jy/clear"
+	"jy/util"
+	"log"
+	util2 "qfw/util"
+	"regexp"
+	"strings"
+	"unicode/utf8"
+)
+
+// JsonDataMergeProcessing merges the values carried in j.Jsondata into
+// j.Result for every field listed in util.JsonData, then sorts each result
+// list with util.Sort and returns j.Result.
+//
+// Per field:
+//   - an absent or empty jsondata value is ignored;
+//   - "budget" and "bidamount" are run through the field's clear function and,
+//     when the cleaned number is non-zero, replace whatever was extracted;
+//   - when nothing was extracted, the trimmed jsondata value becomes the only
+//     candidate (score 0.1);
+//   - when the extracted value differs from the jsondata value, both are
+//     re-scored with the field's rule in SoreConfig (length, negative words,
+//     positive words) and appended to j.Result[field].
+//
+// If j.Result is empty it is returned untouched.
+func JsonDataMergeProcessing(j *util.Job, e *ExtractTask) map[string][]*util.ExtField {
+	if len(j.Result) <= 0 {
+		return j.Result
+	}
+	// newJsonField builds the candidate that carries a jsondata value.
+	newJsonField := func(field string, source, value interface{}, score float64) *util.ExtField {
+		return &util.ExtField{Code: "JsonData_" + field, Field: field, ExtFrom: "JsonData_" + field, SourceValue: source, Value: value, Score: score}
+	}
+	// pairs holds, per field, exactly two candidates: [extracted, jsondata].
+	pairs := make(map[string][]*util.ExtField)
+	for _, field := range util.JsonData {
+		// Skip fields for which jsondata has no value.
+		if j.Jsondata == nil {
+			continue
+		}
+		raw := (*j.Jsondata)[field]
+		if raw == nil || raw == "" {
+			continue
+		}
+		// Amount fields: the cleaned jsondata number wins regardless of what
+		// was extracted (the original code repeated this in both branches).
+		if field == "budget" || field == "bidamount" {
+			lockclear.Lock()
+			cfn := e.ClearFn[field]
+			lockclear.Unlock()
+			cleaned := clear.DoClearFn(cfn, []interface{}{fmt.Sprint(raw), ""})
+			// Guard against an empty clear result before indexing it.
+			if len(cleaned) > 0 && util2.IntAll(cleaned[0]) != 0 {
+				j.Result[field] = []*util.ExtField{newJsonField(field, raw, cleaned[0], 0.1)}
+			}
+			continue
+		}
+		text := strings.Trim(util2.ObjToString(raw), " ")
+		existing := j.Result[field]
+		// len() rather than a nil check: an empty non-nil slice must not be
+		// indexed below.
+		if len(existing) == 0 {
+			j.Result[field] = []*util.ExtField{newJsonField(field, raw, text, 0.1)}
+			continue
+		}
+		// Extracted value and jsondata value disagree: keep both for scoring,
+		// starting the jsondata candidate at the extracted candidate's score.
+		if strings.Trim(util2.ObjToString(existing[0].Value), " ") != text {
+			pairs[field] = []*util.ExtField{existing[0], newJsonField(field, raw, text, existing[0].Score)}
+		}
+	}
+	for k, v := range pairs {
+		lockscore.Lock()
+		scoreRule := SoreConfig[k]
+		lockscore.Unlock()
+		switch k {
+		case "projectname", "projectcode", "buyer", "winner", "agency", "buyerperson", "buyertel":
+		default:
+			// Only the fields above are re-scored.
+			continue
+		}
+		// applyWords adds the configured score of every word rule under
+		// ruleKey whose compiled regexp matches the candidate's value.
+		applyWords := func(cand *util.ExtField, ruleKey string) {
+			words, ok := scoreRule[ruleKey].([]interface{})
+			if !ok {
+				return
+			}
+			for _, word := range words {
+				p, ok := word.(map[string]interface{})
+				if !ok {
+					continue
+				}
+				// Try recovers from a bad "regexp" entry and logs it.
+				util2.Try(func() {
+					if p["regexp"] != nil {
+						reg := p["regexp"].(*regexp.Regexp)
+						if reg.MatchString(util2.ObjToString(cand.Value)) {
+							cand.Score += util2.Float64All(p["score"])
+						}
+					}
+				}, func(err interface{}) {
+					log.Println(err)
+				})
+			}
+		}
+		for _, cand := range v {
+			// 1. Length scoring.
+			valueLen := utf8.RuneCountInString(fmt.Sprint(cand.Value))
+			if valueLen < 1 {
+				cand.Score = -5
+				continue
+			}
+			if valueLen > 100 {
+				cand.Score = -99
+			}
+			if lengths, ok := scoreRule["length"].([]interface{}); ok {
+				for _, item := range lengths {
+					length, ok := item.(map[string]interface{})
+					if !ok {
+						continue
+					}
+					// A range is [gt, lte, score]; skip malformed entries
+					// instead of panicking on a short slice.
+					ranges, ok := length["range"].([]interface{})
+					if !ok || len(ranges) < 3 {
+						continue
+					}
+					gt := util2.IntAll(ranges[0])
+					lte := util2.IntAll(ranges[1])
+					if lte < 0 { // a negative upper bound means "unbounded"
+						lte = 999999
+					}
+					if valueLen > gt && valueLen <= lte {
+						cand.Score += util2.Float64All(ranges[2])
+						break
+					}
+				}
+			}
+			// 2. Negative words, then 3. positive words.
+			applyWords(cand, "negativewords")
+			applyWords(cand, "positivewords")
+		}
+	}
+	// Put the re-scored candidates back into the result; each v has two items.
+	// NOTE(review): v[0] is already j.Result[k][0], so both branches below
+	// append it a second time - confirm that the duplicate is intended.
+	for k, v := range pairs {
+		if v[0].Score == v[1].Score { // on a tie prefer the extracted value
+			if v[0].ExtFrom == "JsonData_"+k {
+				j.Result[k] = append(j.Result[k], v[1])
+			} else {
+				j.Result[k] = append(j.Result[k], v[0])
+			}
+			continue
+		}
+		j.Result[k] = append(j.Result[k], v...) // scores differ: let sorting decide
+	}
+	// Sort every result list.
+	for _, val := range j.Result {
+		util.Sort(val)
+	}
+	return j.Result
+}

+ 1 - 1
src/jy/mongodbutil/mongodbutil.go

@@ -225,7 +225,7 @@ func (m *Pool) FindById(c string, query string, fields interface{}) (*map[string
 		var err error
 		err = coll.FindId(ObjectIdHex(query)).Select(ObjToOth(fields)).One(&res)
 		if nil != err {
-			log.Println("FindByIdError", err)
+			log.Println("FindByIdError", err,query)
 		}
 		b = true
 	}

+ 10 - 1
src/jy/pretreated/analykv.go

@@ -148,7 +148,7 @@ func FindKv(con, tag string, from int) (m *SortMap) {
 	s1 := []string{}
 	//断行
 	strings.IndexFunc(con, func(r rune) bool {
-		if r == 10 {
+		if r == 10 || r == 59 {
 			if len(s1) > 0 {
 				str := strings.Join(s1, "")
 				str = u.TrimLRSpace(str, "")
@@ -409,6 +409,15 @@ func keydetail(k, v string, m *SortMap, tag string, pos int, strs [][]string, ma
 		//u.Debug(k, v)
 		if m.Map[k] == nil {
 			m.AddKey(k, v)
+		}else {
+			vals := []string{}
+			if vvv ,ok := m.Map[k].([]string);ok{
+				vals = append(vals, vvv...)
+			}else {
+				vals = append(vals,v)
+			}
+			vals = append(vals, v)
+			m.AddKey(k,vals)
 		}
 	}
 }

+ 5 - 6
src/jy/pretreated/analystep.go

@@ -6,6 +6,8 @@ package pretreated
 import (
 	"encoding/json"
 	"jy/util"
+
+	//"log"
 	"strings"
 
 	"github.com/PuerkitoBio/goquery"
@@ -63,9 +65,6 @@ func AnalyStart(job *util.Job) {
 				tabres := AnalyTableV2(tabs[i], job.Category, "", con, 1, job.SourceMid, job.RuleBlock) //解析表格入口 返回:汇总表格对象
 				processTableResult(tabres, bl, job)
 			}
-			//			for k, v := range bl.TableKV.Kv {
-			//				//log.Println("bl.TableKV.Kv", k, v)
-			//			}
 		} else {
 			//从正文里面找分包
 			job.BlockPackage = FindPackageFromText(job.Title, newCon)
@@ -179,8 +178,8 @@ func processTableResult(tabres *TableResult, block *util.Block, job *util.Job) {
 	tablePackage := map[string]*util.BlockPackage{}
 	if tabres.IsMultiPackage {
 		//分包中的map
-		for k, v := range tabres.PackageMap.Map {
-			blockPackage, ok := v.(*util.BlockPackage)
+		for _, v := range tabres.PackageMap.Keys {
+			blockPackage, ok := tabres.PackageMap.Map[v].(*util.BlockPackage)
 			if !ok {
 				continue
 			}
@@ -202,7 +201,7 @@ func processTableResult(tabres *TableResult, block *util.Block, job *util.Job) {
 				blockPackage.TableKV = util.NewJobKv()
 			}
 			MergeKvTags(blockPackage.TableKV.KvTags, GetKvTags(labelKVs, "", nil))
-			tablePackage[k] = blockPackage
+			tablePackage[v] = blockPackage
 		}
 	}
 	//处理中标人排序

+ 61 - 36
src/jy/pretreated/analytable.go

@@ -68,7 +68,7 @@ var (
 	//包含以下字眼做标准化处理
 	filter_zbje_k = regexp.MustCompile("(中标|成交|总|拦标|合同|供[应货]商|报)[\\p{Han}、]{0,6}(价|额|[大小]写|[万亿]?元).{0,4}$")
 	//简单判断金额
-	filter_zbje_jd = regexp.MustCompile("^[^售]{0,4}(价|额).{0,4}$")
+	filter_zbje_jd = regexp.MustCompile("^[^(|保证)]{0,4}(价|额).{0,4}$")
 	//且排队以下字眼的key
 	filter_zbje_kn = regexp.MustCompile("得分|打分|时间|业绩|须知|分|要求$")
 	//且值包含以下字眼
@@ -93,7 +93,7 @@ var (
 	projectnameReg = regexp.MustCompile("((公开)?招标)*[((第]*[一二三四五六七八九十a-zA-Z0-9]+(标段|包|标|段)[))]*$")
 	MhSpilt        = regexp.MustCompile("[::]")
 	//识别采购单位联系人、联系电话、代理机构联系人、联系电话
-	ContactInfoVagueReg = regexp.MustCompile("邮政编码|邮编|名称|(征求意见|报名审核购买)?((联系人?(及|和)?|办公|单位)?((电话([//]传真|及手机)?|手机)(号码)?|邮箱(地址)?|(地(址|点)))|(联系|收料)(人(姓名)?|方式)|传真|电子邮件|(主要负责|项目(负责|联系)|(项目)?经办)人)|采购方代表")
+	ContactInfoVagueReg = regexp.MustCompile("邮政编码|邮编|名称|(征求意见|报名审核购买)?((联系人?(及|和)?|办公|单位)?((电话([//]传真|及手机)?|手机)(号码)?|邮箱(地址)?|(地(址|点)))|(联系|收料)(人(姓名)?|方式)|传真|电子邮件|(主要负责|项目(负责|联系)|经办)人)|采购方代表")
 	ContactInfoMustReg  = regexp.MustCompile("^(" + ContactInfoVagueReg.String() + ")$")
 	ContactType         = map[string]*regexp.Regexp{
 		"采购单位": regexp.MustCompile("(采购(项目.{2}|服务)?|比选|询价|招标(服务)?|甲|建设|委托|发包|业主|使用|谈判|本招标项目经办|征求意见联系|项目实施)(人|单位|部门|机构|机关|(执行)?方$)|(项目|建(库|设))单位|招标人信息|采购中心地址|业主|收料人|采购部"),
@@ -159,7 +159,9 @@ func CommonDataAnaly(k, tabletag, tabledesc string, v interface{}) (kvTags map[s
 			}
 			if winnerOrderAndBidResult.MatchString(tabletag) && t1.Value == "采购单位联系人" { //处理table中项目负责人
 				kvTags[k] = append(kvTags[k], &u.Tag{Key: k, Value: v1, IsInvalid: true})
-			} else {
+			} else if regexp.MustCompile("(中标候选人|名单及其排序|排序)").MatchString(tabletag) && t1.Value == "采购单位"{
+				kvTags[t1.Value] = append(kvTags[t1.Value], &u.Tag{Key: k1, Value: v1, Weight: t1.Weight-100})
+			}else{
 				kvTags[t1.Value] = append(kvTags[t1.Value], &u.Tag{Key: k1, Value: v1, Weight: t1.Weight})
 			}
 		}
@@ -275,12 +277,6 @@ func (table *Table) KVFilter() {
 		if len(table.WinnerOrder) > 0 {
 			//中标候选人合并
 			winnerOrderEntity.Merge(table.WinnerOrder, winnerOrder)
-			if len(table.StandKV["中标单位"]) == 0 {
-				ent := table.WinnerOrder[0]["entname"]
-				if ent != nil {
-					table.StandKV["中标单位"] = append(table.StandKV["中标单位"], &u.Tag{Key: "中标单位", Value: qutil.ObjToString(ent), Weight: -25})
-				}
-			}
 		} else if !table.BPackage { //没有table.WinnerOrder也没有分包 将td中的WinnerOrder赋值给table.WinnerOrder
 			if len(winnerOrder) > 1 {
 				table.WinnerOrder = winnerOrder
@@ -296,7 +292,7 @@ func (table *Table) KVFilter() {
 			onePkg, _ := table.BlockPackage.Map[onePkgKey].(*u.BlockPackage)
 			if onePkg != nil && onePkg.WinnerOrder != nil && len(onePkg.WinnerOrder) == 0 {
 				onePkg.WinnerOrder = table.WinnerOrder
-				table.BlockPackage.Map[onePkgKey] = onePkg
+				table.BlockPackage.AddKey(onePkgKey, onePkg)
 			}
 		}
 	}
@@ -511,23 +507,31 @@ func (table *Table) MergerToTableresult() {
 	//对多包表格的多包值的合并处理
 	if table.BPackage {
 		table.TableResult.IsMultiPackage = true
-		for k, v := range table.BlockPackage.Map {
-			package1 := table.TableResult.PackageMap.Map[k]
+		for _, v2 := range table.BlockPackage.Keys {
+			package1 := table.TableResult.PackageMap.Map[v2]
 			if package1 == nil {
-				table.TableResult.PackageMap.AddKey(k, v)
+				table.TableResult.PackageMap.AddKey(v2, table.BlockPackage.Map[v2])
+				if vvv, ok := table.BlockPackage.Map[v2].(*u.BlockPackage); ok {
+					if vvv.TableKV != nil && len(vvv.TableKV.KvTags) > 0 {
+						MergeKvTags(table.TableResult.KvTags, vvv.TableKV.KvTags)
+					}
+				}
 			} else {
 				bp := package1.(*u.BlockPackage)
 				if bp.TableKV == nil {
 					bp.TableKV = u.NewJobKv()
 				}
-				v1 := v.(*u.BlockPackage)
+				v1 := table.BlockPackage.Map[v2].(*u.BlockPackage)
 				if v1.TableKV != nil && len(v1.TableKV.KvTags) > 0 {
 					for k2, v2 := range v1.TableKV.KvTags {
-						if bp.TableKV == nil {
-							bp.TableKV = u.NewJobKv()
+						if k2 == "" {
+							continue
 						}
 						isExists := false
 						for _, v2v := range v2 {
+							if v2v.Value == "" {
+								continue
+							}
 							for _, v2vv := range bp.TableKV.KvTags[k2] {
 								if v2v.Value == v2vv.Value {
 									isExists = true
@@ -541,7 +545,9 @@ func (table *Table) MergerToTableresult() {
 						}
 					}
 				}
-				bp.WinnerOrder = v1.WinnerOrder
+				if len(v1.WinnerOrder) > 0 && len(bp.WinnerOrder) == 0 {
+					bp.WinnerOrder = v1.WinnerOrder
+				}
 				//table.TableResult.PackageMap.AddKey(k, v)
 			}
 		}
@@ -575,6 +581,25 @@ func (table *Table) MergerToTableresult() {
 			}
 		}
 	}
+	if table.BlockPackage != nil && len(table.BlockPackage.Keys) == 0 {
+		for _, v := range table.BlockPackage.Keys {
+			if table.BlockPackage.Map[v] != nil {
+				if vvv, ok := table.BlockPackage.Map[v].((*u.BlockPackage)); ok {
+					if vvv.TableKV != nil && len(vvv.TableKV.KvTags) > 0 {
+						for kk, vv := range vvv.TableKV.KvTags {
+							if kk == "" {
+								continue
+							}
+							if len(table.TableResult.KvTags[kk]) == 0 {
+								table.TableResult.KvTags[kk] = vv
+							}
+						}
+					}
+				}
+			}
+
+		}
+	}
 }
 
 /**
@@ -1327,9 +1352,9 @@ func (table *Table) ComputeRowColIsKeyRation() {
 					sv.AddKey(k, v)
 				}
 				if len(sv.Keys) > 0 {
-					for k1, v1 := range sv.Map {
-						if tr.TDs[0].SortKV.Map[k1] == nil {
-							table.SortKV.AddKey(k1, v1)
+					for _, v1 := range sv.Keys {
+						if tr.TDs[0].SortKV.Map[v1] == nil {
+							table.SortKV.AddKey(v1, sv.Map[v1])
 						}
 					}
 				} else if table.Tag == "" && k == 0 && len(tr.TDs[0].Val) > 11 {
@@ -1375,7 +1400,7 @@ func (table *Table) FindKV() {
 			if bcon {
 				continue
 			}
-			if tr.TDs[0].StartRow > 0 {
+			if tr.TDs[0].StartRow >= 0 {
 				numbh := 0
 				for _, td := range tr.TDs {
 					if td.BH {
@@ -1409,11 +1434,11 @@ func (table *Table) FindKV() {
 								bodirect = bo
 							}
 							if len(td.SortKV.Map) > 0 {
-								for tdk, tdv := range td.SortKV.Map {
-									if tdv == nil || tdv == "" { //value为空或者null不再添加到table.SortKV
+								for _, tdv := range td.SortKV.Keys {
+									if tdv == "" || td.SortKV.Map[tdv] == "" { //value为空或者null不再添加到table.SortKV
 										continue
 									}
-									table.SortKV.AddKey(tdk, tdv)
+									table.SortKV.AddKey(tdv, td.SortKV.Map[tdv])
 								}
 							}
 						}
@@ -1572,7 +1597,7 @@ func GetBidOrder(td *TD, direct, n int) (d int, res bool) {
 						"sort":    GetBidSort(td.Val, n),
 					})
 					res = true
-					td.TR.Table.SortKV.Map[NullTxtBid] = a1
+					td.TR.Table.SortKV.AddKey(NullTxtBid, a1)
 				}
 			}
 		}
@@ -1599,7 +1624,7 @@ func GetBidOrder(td *TD, direct, n int) (d int, res bool) {
 						"sort":    GetBidSort(td.Val, n),
 					})
 					res = true
-					td.TR.Table.SortKV.Map[NullTxtBid] = a1
+					td.TR.Table.SortKV.AddKey(NullTxtBid, a1)
 				}
 			}
 		}
@@ -1621,7 +1646,7 @@ func GetBidOrder(td *TD, direct, n int) (d int, res bool) {
 				"sort":    GetBidSort(td.Val, n),
 			})
 			res = true
-			td.TR.Table.SortKV.Map[NullTxtBid] = a1
+			td.TR.Table.SortKV.AddKey(NullTxtBid, a1)
 			//} else if ((btd != nil && !btd.BH && btd.Valtype == "BO") || direct == 2) && rtd != nil && filter_zbdw_v.MatchString(rtd.Val) {
 		} else if ((btd != nil && !btd.BH) || direct == 2) && rtd != nil && filter_zbdw_v2.MatchString(rtd.Val) {
 			d = 2
@@ -1637,7 +1662,7 @@ func GetBidOrder(td *TD, direct, n int) (d int, res bool) {
 				"sort":    GetBidSort(td.Val, n),
 			})
 			res = true
-			td.TR.Table.SortKV.Map[NullTxtBid] = a1
+			td.TR.Table.SortKV.AddKey(NullTxtBid, a1)
 		}
 	}
 	return
@@ -1944,9 +1969,9 @@ func (tn *Table) CheckMultiPackageByTable() (b bool, index []string) {
 		tn.isGoonNext()
 	}
 	//查找分包中的中标人排序
-	if tn.BlockPackage != nil && tn.BlockPackage.Map != nil && len(tn.BlockPackage.Map) > 0 {
-		for _, v := range tn.BlockPackage.Map {
-			vv := v.(*u.BlockPackage)
+	if tn.BlockPackage != nil && tn.BlockPackage.Keys != nil && len(tn.BlockPackage.Keys) > 0 {
+		for _, v := range tn.BlockPackage.Keys {
+			vv := tn.BlockPackage.Map[v].(*u.BlockPackage)
 			if vv.WinnerOrder == nil || len(vv.WinnerOrder) == 0 {
 				vv.WinnerOrder = winnerOrderEntity.Find(vv.Text, true, 2)
 			}
@@ -1960,8 +1985,8 @@ func (tn *Table) manyPackageProcessByIndex(index []string, standIndex_pos []int)
 	if len(index) == 1 { //是一个的情况
 		if len(tn.SortKV.Keys) < 10 && tn.ColNum < 10 && tn.RowNum < 4 { //table带排序的KV值小于10并且小于10列和小于4行
 			beq := true
-			for _, v2 := range tn.SortKV.Map {
-				if _, ok := v2.(string); !ok {
+			for _, v2 := range tn.SortKV.Keys {
+				if _, ok := tn.SortKV.Map[v2].(string); !ok {
 					beq = false
 					break
 				}
@@ -1969,8 +1994,8 @@ func (tn *Table) manyPackageProcessByIndex(index []string, standIndex_pos []int)
 			if beq { //统一处理为数组
 				td := tn.GetTdByRCNo(tn.RowNum-1, 0)
 				if !td.BH && FindVal2_1.MatchString(td.Val) {
-					for k2, v2 := range tn.SortKV.Map {
-						tn.SortKV.Map[k2] = []string{v2.(string)}
+					for _, v2 := range tn.SortKV.Keys {
+						tn.SortKV.AddKey(v2, []string{tn.SortKV.Map[v2].(string)})
 					}
 				} else {
 					//没有处理成数组的情况下,继续调用正文查找分包的方法
@@ -2293,7 +2318,7 @@ func (tn *Table) assemblePackage(k1, v1, key string) {
 	if !excludeKey.MatchString(k1) {
 		bp.Text += fmt.Sprintf("%v:%v\n", k1, v1)
 	}
-	tn.BlockPackage.Map[key] = bp
+	tn.BlockPackage.AddKey(key, bp)
 }
 
 /**

+ 9 - 9
src/jy/pretreated/tablev2.go

@@ -265,12 +265,12 @@ func (td *TD) tdHasTable(bsontable *bool, tr *TR) {
 			if sonts.IsMultiPackage {
 				td.TR.Table.BPackage = true
 				tb1 := td.TR.Table.BlockPackage
-				for k, v := range sonts.PackageMap.Map {
-					v1 := v.(*u.BlockPackage)
-					if tb1.Map[k] == nil {
-						tb1.AddKey(k, v)
+				for _, v := range sonts.PackageMap.Keys {
+					v1 := sonts.PackageMap.Map[v].(*u.BlockPackage)
+					if tb1.Map[v] == nil {
+						tb1.AddKey(v, sonts.PackageMap.Map[v])
 					} else {
-						bp := tb1.Map[k].(*u.BlockPackage)
+						bp := tb1.Map[v].(*u.BlockPackage)
 						if bp != nil && v1.TableKV != nil {
 							for k2, v2 := range v1.TableKV.KvTags {
 								if bp.TableKV == nil {
@@ -361,7 +361,7 @@ func (td *TD) tdIsHb(tr *TR, table *Table, bsontable bool) {
 				}
 				MergeKvTags(bp.TableKV.KvTags, bp_v.ColonKV.KvTags)
 				MergeKvTags(bp.TableKV.KvTags, bp_v.SpaceKV.KvTags)
-				table.TableResult.PackageMap.Map[bp_k] = bp
+				table.TableResult.PackageMap.AddKey(bp_k, bp)
 			}
 		}
 	}
@@ -430,8 +430,8 @@ func (td *TD) tdIsHb(tr *TR, table *Table, bsontable bool) {
 		*/
 
 		fSortKV := FindKv(td.Val, "", 2)
-		for k, v := range fSortKV.Map {
-			td.SortKV.AddKey(k, v)
+		for _, v := range fSortKV.Keys {
+			td.SortKV.AddKey(v, fSortKV.Map[v])
 		}
 		//		td.LeftNode.Val
 		//		for _, vvv := range *td.TR {
@@ -696,7 +696,7 @@ func (s *SortMap) RemoveKey(key string) {
 					}
 				} else if pos == len(s.Keys) {
 					newkeys = append(newkeys, s.Keys[:pos]...)
-				} else {
+				} else if len(s.Keys) > 1 {
 					tmp := s.Keys[pos+1:]
 					newkeys = append(append(newkeys, s.Keys[:pos]...), tmp...)
 					for _, v := range tmp {

+ 1 - 1
src/jy/pretreated/winnerorder.go

@@ -41,7 +41,7 @@ var (
 	findCompanyReg = regexp.MustCompile("[^::]+公司")
 	colonSpaceReg  = regexp.MustCompile("[::]\\s+")
 	findCandidate  = regexp.MustCompile("(^.{5,}(公司|集团|单位|机构|企业|厂|场|院|所|店|中心|市|局|站|城|处|行|部|队|联合(会|体))|工作室)")
-	findCandidate2 = regexp.MustCompile("(^.{5,}(公司|集团|单位|机构|企业|厂|场|院|所|店|中心|市|局|站|城|处|行|部|队|联合(会|体)|工作室)$)")
+	findCandidate2 = regexp.MustCompile("(^.{5,}(公司|集团|单位|机构|企业|厂|场|院|所|店|中心|局|站|城|处|行|部|队|联合(会|体)|工作室)$)")
 	clearSpace1    = regexp.MustCompile("([((][\\d一二三四五六七八九十][))][\\s\u3000\u2003\u00a0\\t]*|<[^>].+?>)")
 	offerReg       = regexp.MustCompile("(中标|磋商|投标|报|单|成交)总?(价|金额)")
 )

+ 2 - 1
src/jy/util/config.go

@@ -10,11 +10,12 @@ import (
 
 //全局配置
 var FormatTextMap map[string][]map[string]interface{}
-
+var JsonData []string
 func init() {
 	loadFormatText()
 	//LoadTagDb("./res/tagdb")
 	LoadTagDb("./res/blocktagdb")
+	util.ReadConfig("./res/jsondata.json",&JsonData)
 }
 
 //加载格式化正文配置

+ 22 - 9
src/jy/util/tagmatch.go

@@ -1,6 +1,7 @@
 package util
 
 import (
+	"fmt"
 	"io/ioutil"
 	"os"
 	"path/filepath"
@@ -40,7 +41,8 @@ func (s Tags) Less(i, j int) bool {
 }
 
 //
-var TagdbTable = make(map[string]*TagFile)
+//var TagdbTable = make(map[string]*TagFile)
+var TagdbTable = sync.Map{}
 var blocktagdb = make(map[string]*TagFile)
 
 //加载
@@ -138,16 +140,27 @@ func GetAppointTags(src string, array []string) Tags {
 			m[v] = true
 		}
 	}
-	lock.Lock()
-	for k, v := range TagdbTable {
-		if len(m) > 0 && !m[k] {
-			continue
+	//lock.Lock()
+	//for k, v := range TagdbTable {
+	//	if len(m) > 0 && !m[k] {
+	//		continue
+	//	}
+	//	if ok, tag := v.Match(src); ok {
+	//		ret = append(ret, &Tag{src, v.Name, tag.Weight, tag.TagReg, false})
+	//	}
+	//}
+	//lock.Unlock()
+	TagdbTable.Range(func(key, value interface{}) bool {
+		if len(m) > 0 && !m[fmt.Sprint(key)] {
+			return true
 		}
-		if ok, tag := v.Match(src); ok {
-			ret = append(ret, &Tag{src, v.Name, tag.Weight, tag.TagReg, false})
+		if v,ok := value.(*TagFile);ok {
+			if ok, tag := v.Match(src); ok {
+				ret = append(ret, &Tag{src, v.Name, tag.Weight, tag.TagReg, false})
+			}
 		}
-	}
-	lock.Unlock()
+		return true
+	})
 	//sort.Sort(ret)
 	return ret
 }

+ 1 - 1
src/main.go

@@ -11,7 +11,7 @@ import (
 	"jy/util"
 	qu "qfw/util"
 	//"qfw/util/elastic"
-	redis "qfw/util/redis"
+	"qfw/util/redis"
 
 	log "github.com/donnie4w/go-logger/logger"
 )

+ 1 - 1
src/res/fieldscore.json

@@ -142,7 +142,7 @@
         "negativewords": [
             {
                 "describe": "包含负分",
-                "regstr": "(附件|委托|代理|咨询|管理有限公司|管理顾问|招标失败|交易中心|不足|公告|变更|招标|废标|废止|流标|中标|评标|开标|供应商|金额|万元|元整|预算|报价|单价|第(\\d|一|二|三|四|五)(名|包)|排名|候选|确定|标段|(标|一|二|三|四|五)包|中选|成交|包号|(A|B|C|D|E|F|G)包|地址|详情|要求|推荐|名称|评审|得分|合同|平方米|公示期|结果|备注|说明|单位|代表|委托|工作日|营业(执|期)|通过|代码|电话|联系|条件|合理|费率|以上|以下|拟定|为|注:|\\d[\\s]{0,10}(\\.|元|包|米|平米|平方米|吨|辆|千克|克|毫克|毫升|公升|套|件|瓶|箱|只|台|年|月|日|天|号)|(:|:|;|;|?|¥|\\*|%)|^[a-zA-Z0-9-]{5,100}|^[a-zA-Z0-9-]{1,100}$|[a-zA-Z0-9-]{10,100})",
+                "regstr": "(附件|委托|代理|咨询|顾问|管理有限公司|管理顾问|招标失败|交易中心|不足|公告|变更|招标|废标|废止|流标|中标|评标|开标|供应商|金额|万元|元整|预算|报价|单价|第(\\d|一|二|三|四|五)(名|包)|排名|候选|确定|标段|(标|一|二|三|四|五)包|中选|成交|包号|(A|B|C|D|E|F|G)包|地址|详情|要求|推荐|名称|评审|得分|合同|平方米|公示期|结果|备注|说明|单位|代表|委托|工作日|营业(执|期)|通过|代码|电话|联系|条件|合理|费率|以上|以下|拟定|为|注:|\\d[\\s]{0,10}(\\.|元|包|米|平米|平方米|吨|辆|千克|克|毫克|毫升|公升|套|件|瓶|箱|只|台|年|月|日|天|号)|(:|:|;|;|?|¥|\\*|%)|^[a-zA-Z0-9-]{5,100}|^[a-zA-Z0-9-]{1,100}$|[a-zA-Z0-9-]{10,100})",
                 "score": -5
             },
             {

+ 28 - 0
src/res/jsondata.json

@@ -0,0 +1,28 @@
+[
+  "area_city_district",
+  "projectname",
+  "projectcode",
+  "approvalno",
+  "projectscope",
+  "item",
+  "buyer",
+  "agency",
+  "budget",
+  "buyerperson",
+  "buyertel",
+  "buyeraddr",
+  "projectaddr",
+  "publishdept",
+  "funds",
+  "paymenttype",
+  "projectscale",
+  "bidmethod",
+  "bidopendate",
+  "bidopentime",
+  "agencyperson",
+  "agencytel",
+  "agencyaddr",
+  "supervisorrate",
+  "bidamount",
+  "winner"
+]

+ 53 - 16
src/web/templates/admin/distribution.html

@@ -49,7 +49,7 @@
 	</div>
   </section>
 </div>
-	
+
 <!-- footer -->
 {{template "mask"}}
 {{template "dialog"}}
@@ -109,8 +109,13 @@ $(function () {
                 }
             }},
 			{"data":"InstanceId",render:function(val,a,row,pos){
-				return '<a class="btn btn-sm btn-warning" onclick="del(\''+row.InstanceId+'\')">释放</a>';
-			}}
+				return  '<div class="btn-group">'+
+                    '<a class="btn btn-sm btn-primary opr" onclick="ContinueExecution(\''+row.InstanceId+'\')">执行</a>'+
+                    '<a class="btn btn-sm btn-warning" onclick="del(\''+row.InstanceId+'\')">释放</a>'+
+                    '<a class="btn btn-sm btn-danger" href="#" onclick="delInstanceId(\''+row.InstanceId+'\')">删&nbsp;&nbsp;除</a>'
+                    '</div>';
+			},
+			}
        	]
 	});
 	ttable.on('init.dt', function () {
@@ -136,14 +141,14 @@ $(function () {
     					{label:"表名",s_label:"s_table",placeholder:"信息表名",must:true},
                         {label:"开始日期",s_label:"s_date",placeholder:"2018-01-01",must:true},
                         {label:"截止日期",s_label:"e_date",placeholder:"2019-01-01",must:true},
-    				]   
+    				]
                 }else if (n=="releasetime"){
                     _tit="释放设置"
                     tag=[
     					{label:"实例ID",s_label:"instanceid",placeholder:"实例ID",must:true},
                         {label:"顺延时间",s_label:"hours",placeholder:"当前时间起(单位/h)",must:true},
-    				] 
-                } 
+    				]
+                }
 				htmlObj={
 					title:_tit,
 					tag:tag,
@@ -159,12 +164,12 @@ $(function () {
 										bcon=false
 										return false
 									}
-								}) 
-								if (bcon){	
-                                    showConfirm("确认执行?", function() {							
+								})
+								if (bcon){
+                                    showConfirm("确认执行?", function() {
     									$.post("/admin/distribution/runInstances",obj,function(data){
     										if(data&&data.rep){
-    											window.location.href="/admin/distribution"			
+    											window.location.href="/admin/distribution"
     										}else{
     											showTip(data.msg,1000)
     										}
@@ -183,6 +188,22 @@ $(function () {
 		});
 	})
 })
+function ContinueExecution(_id){
+    showConfirm("确定继续抽取?", function() {
+        $.ajax({
+            url:"/admin/distribution/continueExecution",
+            type:"post",
+            data:{"InstanceId":_id},
+            success:function(r){
+                if(r.rep){
+                    ttable.ajax.reload();
+                }else{
+                    showTip("执行失败", 1000, function() {});
+                }
+            }
+        })
+    });
+}
 function del(_id){
 	showConfirm("确定释放实例?", function() {
         $.ajax({
@@ -190,7 +211,7 @@ function del(_id){
 			type:"post",
 			data:{"InstanceId":_id},
 			success:function(r){
-				if(r.rep){				
+				if(r.rep){
 					ttable.ajax.reload();
 				}else{
 					showTip("释放失败", 1000, function() {});
@@ -199,6 +220,22 @@ function del(_id){
 		})
 	});
 }
+function delInstanceId(_id){
+    showConfirm("确定删除实例(数据库的)?", function() {
+        $.ajax({
+            url:"/admin/distribution/deleteInstancedb",
+            type:"post",
+            data:{"InstanceId":_id},
+            success:function(r){
+                if(r.rep){
+                    ttable.ajax.reload();
+                }else{
+                    showTip("删除实例失败", 1000, function() {});
+                }
+            }
+        })
+    });
+}
 function upInstanceIds(){
     showConfirm("更新实例?", function() {
     	com.maskShow("正在更新实例...");
@@ -206,7 +243,7 @@ function upInstanceIds(){
     		url:"/admin/distribution/upstatus",
     		type:"post",
     		success:function(r){
-    			if(r.rep){			
+    			if(r.rep){
     				ttable.ajax.reload();
                     com.maskHide();
     			}
@@ -222,14 +259,14 @@ function deploy(){
     		url:"/admin/distribution/deploy",
     		type:"post",
     		success:function(r){
-    			if(r.rep){				
+    			if(r.rep){
     				ttable.ajax.reload();
                     com.maskHide();
     			}else{
     				showTip("部署失败", 1000, function() {});
     			}
     		}
-    	}) 
+    	})
     })
 }
 
@@ -240,7 +277,7 @@ function rangetask(){
     		url:"/admin/distribution/rangetask",
     		type:"post",
     		success:function(r){
-    			if(r.rep){				
+    			if(r.rep){
     				ttable.ajax.reload();
                     com.maskHide();
     			}else{
@@ -248,6 +285,6 @@ function rangetask(){
     			}
     		}
     	})
-    }) 
+    })
 }
 </script>

+ 13 - 0
udp_ocr_conter/config.json

@@ -0,0 +1,13 @@
+{
+  "udpip": "127.0.0.1",
+  "udpport": "1990",
+  "dbsize": "5",
+  "mongodb_ip": "192.168.3.207:27081",
+  "mongodb_db": "qfw",
+  "mongodb_c": "bidding",
+  "mongodb_filefiled": "projectinfo",
+  "json_sidfiled": "gtid",
+  "json_eidfiled": "lteid",
+  "toudpip": "127.0.0.1",
+  "toudpport": "1481"
+}

+ 111 - 0
udp_ocr_conter/main.go

@@ -0,0 +1,111 @@
+package main
+
+import (
+	"encoding/json"
+	"gopkg.in/mgo.v2/bson"
+	"jy/mongodbutil"
+	"log"
+	mu "mfw/util"
+	"net"
+	qu "qfw/util"
+	"strings"
+	"sync"
+)
+
+var udpclient mu.UdpClient //udp对象
+var Sysconfig map[string]interface{}
+var MgoIP, MgoDB, MgoC, MgoFileFiled, SidField, EidField string
+var sys sync.RWMutex
+
+// init reads the configuration into Sysconfig, copies the MongoDB and field
+// settings into the package variables and, when the MongoDB address, database
+// and collection are all present, creates the connection pool and logs the
+// result of a ping. With any of the three missing it logs the configuration
+// and leaves mongodbutil.Mgo unset.
+func init() {
+	qu.ReadConfig(&Sysconfig)
+	cfg := func(key string) string { return qu.ObjToString(Sysconfig[key]) }
+	MgoIP, MgoDB, MgoC = cfg("mongodb_ip"), cfg("mongodb_db"), cfg("mongodb_c")
+	SidField, EidField = cfg("json_sidfiled"), cfg("json_eidfiled")
+	MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_filefiled"], "projectinfo")
+	for _, required := range []string{MgoIP, MgoDB, MgoC} {
+		if strings.TrimSpace(required) == "" {
+			log.Println("获取配置文件参数失败", Sysconfig)
+			return
+		}
+	}
+	poolSize := qu.IntAllDef(Sysconfig["dbsize"], 5)
+	mongodbutil.Mgo = mongodbutil.MgoFactory(poolSize, 10, 120, MgoIP, MgoDB)
+	log.Println(mongodbutil.Mgo.Get().Ping())
+}
+// main binds the UDP client to the configured udpip:udpport, registers
+// processUdpMsg as the message handler and then blocks forever.
+func main() {
+	host := Sysconfig["udpip"].(string)
+	port := Sysconfig["udpport"].(string)
+	udpclient = mu.UdpClient{Local: host + ":" + port, BufSize: 1024}
+	udpclient.Listen(processUdpMsg)
+	log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
+	// Nothing else to do on this goroutine: park it for the process lifetime.
+	select {}
+}
+// processUdpMsg handles one UDP message received by the OCR task center.
+//
+//   - mu.OP_TYPE_DATA: acknowledges the message, parses data as a JSON task,
+//     records the task's start id under "start" and saves it to "ocr_task".
+//   - mu.OP_NOOP: logs the acknowledgement sent by another node.
+//   - mu.OP_GET_DOWNLOADERCODE: after checking the permission payload, hands
+//     the next document id of the pending task to the requester.
+//
+// Panics are recovered by qu.Catch.
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
+	defer qu.Catch()
+	switch act {
+	case mu.OP_TYPE_DATA: // store a task
+		go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)), mu.OP_NOOP, ra)
+		task := make(map[string]interface{})
+		if err := json.Unmarshal(data, &task); err != nil {
+			go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)+",data解析失败,"+err.Error()), mu.OP_NOOP, ra)
+			return
+		}
+		// Remember where the task started; SidField itself is advanced as
+		// ids are handed out.
+		task["start"] = task[SidField]
+		encoded, err := json.Marshal(task)
+		if err != nil {
+			log.Println("ocr_task marshal error:", err)
+			return
+		}
+		b := mongodbutil.Mgo.Save("ocr_task", string(encoded))
+		log.Println("保存id:", b)
+	case mu.OP_NOOP: // acknowledgement from another node
+		log.Println("节点接收成功", string(data), ra.String())
+	case mu.OP_GET_DOWNLOADERCODE: // hand out work
+		if `{"permission":"get_ocr_task"}` != string(data) {
+			log.Println("没有权限:", string(data), ra)
+			go udpclient.WriteUdp([]byte("没有权限"), mu.OP_NOOP, ra)
+			return
+		}
+		dispatchOcrTask(ra)
+	}
+}
+
+// dispatchOcrTask sends the requester at ra the id of the next document after
+// the pending task's current position and advances that position. When the
+// next id reaches the task's end id it also notifies the configured next node
+// and deletes the task.
+//
+// The whole operation runs under sys. The unlock is deferred so that a panic
+// raised while the lock is held (for example bson.ObjectIdHex on a malformed
+// id, recovered by the caller's qu.Catch) cannot leave the mutex locked and
+// stall every later request.
+func dispatchOcrTask(ra *net.UDPAddr) {
+	sys.Lock()
+	defer sys.Unlock()
+	datas, _ := mongodbutil.Mgo.Find("ocr_task", nil, `{"_id":1}`, nil, false, -1, -1)
+	if datas == nil || len(*datas) == 0 {
+		go udpclient.WriteUdp([]byte("没有新数据"), mu.OP_TYPE_DATA, ra)
+		return
+	}
+	task := (*datas)[0]
+	taskID := task["_id"]
+	sid := qu.ObjToString(task[SidField])
+	eid := qu.ObjToString(task[EidField])
+	rdata, _ := mongodbutil.Mgo.FindOneByField(MgoC, bson.M{"_id": bson.M{
+		"$gt": bson.ObjectIdHex(sid),
+	}}, `{"_id":1,"`+MgoFileFiled+`":1}`)
+	if rdata == nil || len(*rdata) == 0 {
+		go udpclient.WriteUdp([]byte("没有获取到最新数据"), mu.OP_TYPE_DATA, ra)
+		return
+	}
+	newID, ok := (*rdata)["_id"].(bson.ObjectId)
+	if !ok {
+		// A document without a usable ObjectId is treated as "no data".
+		go udpclient.WriteUdp([]byte("没有获取到最新数据"), mu.OP_TYPE_DATA, ra)
+		return
+	}
+	next := []byte(`{"id":"` + newID.Hex() + `","permission":"ocr_task"}`)
+	if newID.Hex() >= eid {
+		// End of the task: send the start position, then the final id.
+		go udpclient.WriteUdp([]byte(`{"id":"`+qu.ObjToString(task["start"])+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra)
+		go udpclient.WriteUdp(next, mu.OP_TYPE_DATA, ra)
+		totmp := make(map[string]string)
+		totmp["sid"] = qu.ObjToString(task["start"])
+		totmp["eid"] = qu.ObjToString(task[EidField])
+		tobyte, _ := json.Marshal(totmp) // a map[string]string always marshals
+		go udpclient.WriteUdp(tobyte, mu.OP_TYPE_DATA, &net.UDPAddr{
+			IP:   net.ParseIP(qu.ObjToString(Sysconfig["toudpip"])),
+			Port: qu.IntAll(Sysconfig["toudpport"]),
+		})
+		log.Println("ocr_task处理完成,发送下个节点", string(tobyte))
+		mongodbutil.Mgo.Del("ocr_task", bson.M{"_id": taskID.(bson.ObjectId)})
+		return
+	}
+	go udpclient.WriteUdp(next, mu.OP_TYPE_DATA, ra)
+	// Advance the task's position to the id just handed out.
+	task[SidField] = newID.Hex()
+	mongodbutil.Mgo.Update("ocr_task",
+		bson.M{"_id": taskID.(bson.ObjectId)}, task, false, false)
+}

+ 6 - 8
udpfileocr/config.json

@@ -1,14 +1,12 @@
 {
   "udpip": "127.0.0.1",
   "udpport": "1490",
-  "channelsize": "1",
   "dbsize": "5",
-  "mongodb_one_ip": "192.168.3.207:27082",
-  "mongodb_one_db": "spider",
-  "mongodb_one_c": "bidding_file",
-  "mongodb_one_filefiled": "projectinfo",
+  "mongodb_ip": "192.168.3.207:27081",
+  "mongodb_db": "qfw",
+  "mongodb_c": "bidding",
+  "mongodb_filefiled": "projectinfo",
   "file2text": "192.168.3.207:1234",
-  "PageSize":5000,
-  "toudpip": "127.0.0.1",
-  "toudpport": "1481"
+  "get_data_ip": "127.0.0.1",
+  "get_data_port": "1990"
 }

+ 111 - 190
udpfileocr/main.go

@@ -2,8 +2,6 @@ package main
 
 import (
 	"encoding/json"
-	"fmt"
-	"github.com/go-gomail/gomail"
 	"gopkg.in/mgo.v2/bson"
 	"jy/mongodbutil"
 	"log"
@@ -11,34 +9,34 @@ import (
 	"net"
 	"net/rpc"
 	"path"
-	"qfw/common/src/qfw/util"
 	qu "qfw/util"
-	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 )
 
 var udpclient mu.UdpClient // UDP client object used to listen for and send messages
 var Sysconfig map[string]interface{} // configuration map filled by qu.ReadConfig in init
-var MgoIP, MgoDB, MgoC, MgoFileFiled string
-var ChanB chan bool
-var PageSize int
+var MgoIP, MgoDB, MgoC, MgoFileFiled, GetDataIp, GetDataPort string // MongoDB address/db/collection/attachment field and the task-source UDP ip/port, all read from Sysconfig in init
+var sys sync.RWMutex // locked by processUdpMsg while it validates an incoming message
+var ChanA,ChanB = make(chan bool),make(chan bool,1) // ChanA: sent to and then closed on the first OP_TYPE_DATA message, which ends main's polling loop; ChanB: one-slot channel held around each save call
+var tmpNUM int32 // number of OP_TYPE_DATA messages received, updated through sync/atomic
 
 func init() { // init reads the configuration into Sysconfig, copies the settings into package variables and creates the MongoDB pool
 	qu.ReadConfig(&Sysconfig)
-	MgoIP = qu.ObjToString(Sysconfig["mongodb_one_ip"])
-	MgoDB = qu.ObjToString(Sysconfig["mongodb_one_db"])
-	MgoC = qu.ObjToString(Sysconfig["mongodb_one_c"])
-	PageSize = qu.IntAllDef(Sysconfig["PageSize"],2000)
+	MgoIP = qu.ObjToString(Sysconfig["mongodb_ip"])
+	MgoDB = qu.ObjToString(Sysconfig["mongodb_db"])
+	MgoC = qu.ObjToString(Sysconfig["mongodb_c"])
+	GetDataIp = qu.ObjToString(Sysconfig["get_data_ip"]) // address that main polls with get_ocr_task requests
+	GetDataPort = qu.ObjToString(Sysconfig["get_data_port"])
 	MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_one_filefiled"], "projectinfo") // NOTE(review): still reads "mongodb_one_filefiled" although config.json in this diff renames the key to "mongodb_filefiled"; presumably the "projectinfo" default is what takes effect - confirm
-	if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" ||PageSize <=0{
+	if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" { // a blank address, database or collection aborts init
 		log.Println("获取配置文件参数失败", Sysconfig)
 		return // mongodbutil.Mgo is not assigned on this path
 	}
 	mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB) // first argument comes from "dbsize", defaulting to 5
 	log.Println(mongodbutil.Mgo.Get().Ping()) // logs the result of Ping()
-	ChanB = make(chan bool, qu.IntAllDef(Sysconfig["channelsize"], 5))
 }
 
 func main() {
@@ -46,121 +44,106 @@ func main() {
 	udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024}
 	udpclient.Listen(processUdpMsg)
 	log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
+	ticker := time.NewTicker(time.Second * 30)
+	var num int
+	task: for {
+		select {
+		case <-ticker.C:
+			num++
+			go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, &net.UDPAddr{
+				IP:   net.ParseIP(GetDataIp),
+				Port: qu.IntAll(GetDataPort),
+			})
+			log.Println("程序启动,开启循环请求数据信息", num)
+		case abc, ok := <-ChanA:
+			log.Println("abc,ok:", abc, ok)
+			if !ok {
+				log.Println("主动循环请求数据已关闭")
+				break task
+			}
+		}
+	}
 	b := make(chan bool, 1)
 	<-b
 }
+
 //  "file2text": "192.168.3.207:1234",
 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { // processUdpMsg is the callback passed to udpclient.Listen: act selects the branch, data is the payload, ra is the address replies are written to
 	defer qu.Catch()
 	switch act {
 	case mu.OP_TYPE_DATA:
+		atomic.AddInt32(&tmpNUM, 1)
+		v := atomic.LoadInt32(&tmpNUM) // NOTE(review): Add and Load are separate steps; if this handler can run concurrently two calls could both miss v == 1 - AddInt32's return value would avoid that
+		if v == 1 { // first data message: hand one value to main's select, then close ChanA so main leaves its polling loop
+			ChanA <- true
+			close(ChanA)
+		}
+		log.Println("data:",string(data))
+		sys.Lock() // held until validation below finishes (or an early return unlocks it)
 		var mapInfo map[string]interface{}
 		err := json.Unmarshal(data, &mapInfo)
 		if err != nil {
-			log.Println("json err :", err, string(data))
+			log.Println("json err :", err, string(data),ra.String())
+			time.Sleep(time.Second * 30) // sleeps 30s while still holding sys, then asks ra for another task
+			go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
+			sys.Unlock()
 			return
 		}
-		log.Println("updocr接收数据:",mapInfo)
-		stime :=time.Now()
-		gid := strings.TrimSpace(mapInfo["gtid"].(string))
-		rgid := gid
-		lid := strings.TrimSpace(mapInfo["lteid"].(string))
-		//err = udpclient.WriteUdp([]byte("updocr接收数据成功"), mu.OP_TYPE_DATA, &net.UDPAddr{
-		//	IP:   net.ParseIP(Sysconfig["toudpip"].(string)),
-		//	Port: qu.IntAll(Sysconfig["toudpport"]),
-		//})
-		////forfunc(lid)
-		//log.Println("接收数据成功,发送到:",Sysconfig["toudpip"].(string),Sysconfig["toudpport"],err)
-		if bson.IsObjectIdHex(gid) && bson.IsObjectIdHex(lid) {
-			var jsq int64
-			query := bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(gid),"$lte": bson.ObjectIdHex(lid),}}
-			log.Println("query---:", query)
-			sum :=mongodbutil.Mgo.Count(MgoC,query)
-			log.Println("sum:", sum)
-			pageNum := (sum + PageSize - 1) / PageSize
-			limit := PageSize
-			if sum < PageSize {
-				limit = sum
-			}
-			for i := 0; i < pageNum; i++ {
-				query = bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(gid), "$lte": bson.ObjectIdHex(lid)}}
-				log.Println("page=", i+1,"query=", query,limit)
-				list, b := mongodbutil.Mgo.Find(MgoC,query,nil,bson.M{"_id": 1,MgoFileFiled:1},false,0, limit)
-				if !b{
-					log.Println("查询失败")
-					continue
-				}
-
-				for _,v:=range *list {
-					gid = qu.BsonIdToSId(v["_id"])
-					jsq++
-					updateNum :=0
-					qmap := qu.ObjToMap(v)
-					mid := (*qmap)["_id"]
-					if v, ok := (*qmap)[MgoFileFiled].(map[string]interface{}); !ok {
-						//log.Println(mid, "mgo 没有字段", MgoFileFiled)
+		if qu.ObjToString(mapInfo["permission"])!="ocr_task"{ // only payloads whose "permission" is "ocr_task" are processed further
+			log.Println("数据异常 :", string(data),ra.String())
+			time.Sleep(time.Second * 3) // 3s pause, again with sys held, before re-requesting a task
+			go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
+			sys.Unlock()
+			return
+		}
+		ObjectId := qu.ObjToString(mapInfo["id"])
+		if ObjectId == "" || !bson.IsObjectIdHex(ObjectId) { // "id" must be a non-empty ObjectId hex string
+			log.Println("获取数据id错误", mapInfo,ra.String())
+			time.Sleep(time.Second * 3)
+			go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
+			sys.Unlock()
+			return
+		}
+		sys.Unlock() // validation done; the lookup and save calls below run without the lock
+		log.Println("获取数据成功:", mapInfo,ra.String())
+		data, _ := mongodbutil.Mgo.FindById(MgoC, ObjectId, bson.M{"_id": 1, MgoFileFiled: 1}) // this data shadows the []byte parameter; only _id and the MgoFileFiled field are requested
+		if len(*data) == 0 { // nothing found: request another task
+			go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
+			return
+		}
+		if v, ok := (*data)[MgoFileFiled].(map[string]interface{}); !ok { // MgoFileFiled missing or not a map: request another task
+			go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
+			return
+		} else {
+			switch v["attachments"].(type) { // only a map-typed "attachments" value is walked
+			case map[string]interface{}:
+				att := v["attachments"].(map[string]interface{})
+				updateNum := 0 // passed by pointer to every save call for this document
+				for attk, vaatt := range att {
+					if fileinfo, ok := vaatt.(map[string]interface{}); !ok {
+						//log.Println(mid, "mgo 结构体转换失败", vaatt)
 						continue
 					} else {
-						switch v["attachments"].(type) {
-						case map[string]interface{}:
-							att := v["attachments"].(map[string]interface{})
-							for attk, vaatt := range att {
-								if fileinfo, ok := vaatt.(map[string]interface{}); !ok {
-									//log.Println(mid, "mgo 结构体转换失败", vaatt)
-									continue
-								} else {
-									ChanB <- true
-									if qu.ObjToString(fileinfo["fid"]) ==""{
-										<-ChanB
-										//log.Println(mid, "mgo ", MgoFileFiled,"没有fid ")
-										continue
-									}
-									//if (strings.Contains(qu.ObjToString(fileinfo["url"]),"fs.qmx.top")|| strings.Contains(qu.ObjToString(fileinfo["url"]),"fj1.jianyu360.com"))&& (strings.TrimSpace(qu.ObjToString(fileinfo["content"]))==""||strings.Contains(qu.ObjToString(fileinfo["content"]),"error") ){
-									//	save(mid,attk, qmap, &fileinfo,&updateNum)
-									//	<-ChanB
-									//}else {
-									//	<-ChanB
-									//}
-									//if qu.ObjToString(fileinfo["update"]) ==""{
-									//	<-ChanB
-									//	log.Println(mid, "mgo ", MgoFileFiled,"没有update ")
-									//	continue
-									//}
-									save(mid,attk, qmap, &fileinfo,&updateNum)
-									<-ChanB
-								}
-							}
+						ChanB <- true // take the single ChanB slot for this attachment
+						if qu.ObjToString(fileinfo["fid"]) == "" { // attachments without a fid are skipped
+							//log.Println(mid, "mgo ", MgoFileFiled,"没有fid ")
+							<-ChanB
+							continue
 						}
+						save(bson.ObjectIdHex(ObjectId), attk, &v, &fileinfo, &updateNum) // NOTE(review): qmap is now &v, the MgoFileFiled sub-map, whereas the removed call passed the whole record; save indexes (*qmap)[MgoFileFiled] - verify this still resolves
+						<-ChanB // release the ChanB slot
 					}
 				}
 			}
-			//发送udp信号
-			by, _ := json.Marshal(map[string]interface{}{
-				"gtid":  rgid,
-				"lteid": lid,
-				"stype": "fujian",
-			})
-			err := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
-				IP:   net.ParseIP(Sysconfig["toudpip"].(string)),
-				Port: qu.IntAll(Sysconfig["toudpport"]),
-			})
-			//识别完以后再次查询数据库,进行下一轮识别
-			log.Println("处理查询数据结束...",jsq,time.Now().Sub(stime))
-			SendMail(rgid+"--->"+lid+"处理完成")
-			//进行下一轮识别
-			forfunc(lid)
-			log.Println("发送到:",Sysconfig["toudpip"].(string),Sysconfig["toudpport"],err)
-		} else {
-			log.Println("开始id或结束id参数错误:", string(data))
+			go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra) // document handled (whatever the attachments type was): request the next task from ra
 		}
-
 	case mu.OP_NOOP: // reply from the next node
 		log.Println("接收成功", string(data))
 
 	}
 
 }
-func save(mid interface{},attk string, qmap, fileinfo *map[string]interface{},updatenum *int) {
+func save(mid interface{}, attk string, qmap, fileinfo *map[string]interface{}, updatenum *int) {
 	defer qu.Catch()
 	type FileData struct {
 		ObjId   string //Id
@@ -180,17 +163,17 @@ func save(mid interface{},attk string, qmap, fileinfo *map[string]interface{},up
 	//bs, _ := ioutil.ReadFile("1.docx")
 	var fffpath string
 	fffpath = path.Ext(qu.ObjToString((*fileinfo)["filename"]))
-	if strings.TrimSpace(fffpath) == ""{
-		fffpath  = qu.ObjToString((*fileinfo)["ftype"])
-	}else {
+	if strings.TrimSpace(fffpath) == "" {
+		fffpath = qu.ObjToString((*fileinfo)["ftype"])
+	} else {
 		fffpath = fffpath[1:]
 	}
 	fileData := &FileData{
-		ObjId:mid.(bson.ObjectId).String(),
+		ObjId:  mid.(bson.ObjectId).String(),
 		OrgUrl: qu.ObjToString((*fileinfo)["url"]),
-		Name: qu.ObjToString((*fileinfo)["filename"]),
-		Fid:  qu.ObjToString((*fileinfo)["fid"]), //附件id
-		Type: fffpath,
+		Name:   qu.ObjToString((*fileinfo)["filename"]),
+		Fid:    qu.ObjToString((*fileinfo)["fid"]), //附件id
+		Type:   fffpath,
 	}
 	//log.Println(mid, fileData)
 	err = client.Call("FileToText.FileToContext", fileData, &reply)
@@ -208,14 +191,14 @@ func save(mid interface{},attk string, qmap, fileinfo *map[string]interface{},up
 	//	{"Name": "test6.xlsx", "Content": "test6context", "Type": "xlsx", "Size": "60M"},
 	//}
 	//reply, _ = json.Marshal(testfiles)
-	if len(reply) == 0{
-		log.Println(mid, "rpc返回数据为空:",qu.ObjToString((*fileinfo)["fid"]), string(reply))
+	if len(reply) == 0 {
+		log.Println(mid, "rpc返回数据为空:", qu.ObjToString((*fileinfo)["fid"]), string(reply))
 		return
 	}
 	//log.Println(mid, string(reply))
 	rdata := make(map[string]interface{})
 	if err := json.Unmarshal(reply, &rdata); err != nil {
-		log.Println(mid, "rpc返回数据解析失败:",qu.ObjToString((*fileinfo)["fid"]), err)
+		log.Println(mid, "rpc返回数据解析失败:", qu.ObjToString((*fileinfo)["fid"]), err)
 		return
 	}
 	if rdata["err"] == nil || rdata["err"] == "null" || rdata["err"] == "" {
@@ -225,10 +208,10 @@ func save(mid interface{},attk string, qmap, fileinfo *map[string]interface{},up
 			(*fileinfo)["content"] = rdata["context"]
 		}
 		(*fileinfo)["expend"] = rdata["expend"]
-		delete(*fileinfo,"update")
+		delete(*fileinfo, "update")
 		//log.Println((*fileinfo))
 
-		(*qmap)[MgoFileFiled].(map[string]interface{})["attachments"].(map[string]interface{})[attk]=*fileinfo
+		(*qmap)[MgoFileFiled].(map[string]interface{})["attachments"].(map[string]interface{})[attk] = *fileinfo
 		//asdf := (*qmap)[MgoFileFiled].(map[string]interface{})
 		//qwer := asdf["attachments"].(map[string]interface{})
 		//qwer[attk] =*fileinfo
@@ -239,91 +222,29 @@ func save(mid interface{},attk string, qmap, fileinfo *map[string]interface{},up
 				MgoFileFiled: (*qmap)[MgoFileFiled],
 			},
 		})
-		if updateBool{
+		if updateBool {
 			*updatenum++
 			mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
 				"$set": bson.M{
 					"updatefileNum": &updatenum,
 				},})
 			log.Println(mid, "mongo更新数据成功")
-		}else {
-			log.Println(mid, "mongo更新数据失败",qu.ObjToString((*fileinfo)["fid"]))
-		}
-		nowHour := time.Now().Hour()
-		rdlock.Lock()
-		if nowHour != hourNum{
-			log.Println("send email:",SendMail(fmt.Sprint(updateBool,mid)))
-			hourNum = nowHour
+		} else {
+			log.Println(mid, "mongo更新数据失败", qu.ObjToString((*fileinfo)["fid"]))
 		}
-		rdlock.Unlock()
+		//nowHour := time.Now().Hour()
+		//rdlock.Lock()
+		//if nowHour != hourNum {
+		//	log.Println("send email:", SendMail(fmt.Sprint(updateBool, mid)))
+		//	hourNum = nowHour
+		//}
+		//rdlock.Unlock()
 	} else {
-		log.Println(mid, "调用rpc服务解析异常:",mid,qu.ObjToString((*fileinfo)["fid"]), rdata["err"])
+		log.Println(mid, "调用rpc服务解析异常:", mid, qu.ObjToString((*fileinfo)["fid"]), rdata["err"])
 	}
 
-}
-var hourNum int
-var rdlock sync.RWMutex
-func SendMail( body string ) error {
-	//定义邮箱服务器连接信息,如果是阿里邮箱 pass填密码,qq邮箱填授权码
-	mailConn := map[string]string {
-		"user": "550838476@qq.com",
-		"pass": "",
-		"host": "smtp.qq.com",
-		"port": "465",
-	}
-
-	port, _ := strconv.Atoi(mailConn["port"]) //转换端口类型为int
-
-	m := gomail.NewMessage()
-	m.SetHeader("From","Get to" + "<" + mailConn["user"] + ">")  //这种方式可以添加别名,即“XD Game”, 也可以直接用<code>m.SetHeader("From",mailConn["user"])</code> 读者可以自行实验下效果
-	m.SetHeader("To", []string{"550838476@qq.com"}...)  //发送给多个用户
-	m.SetHeader("Subject", "MongoId")  //设置邮件主题
-	m.SetBody("text/html","服务器:"+ body)     //设置邮件正文
-
-	d := gomail.NewDialer(mailConn["host"], port, mailConn["user"], mailConn["pass"])
-
-	err := d.DialAndSend(m)
-	return err
-
 }
 
-func forfunc(lid string) {
-	for {
-		//查询最后一个id
-		lastObjectId, _ := mongodbutil.Mgo.Find(MgoC,nil,"-_id",bson.M{"_id":1},true,-1,-1)
-		lastId,ok := (*lastObjectId)[0]["_id"].(bson.ObjectId)
-		log.Println("lastID:",lastId)
-		//查询最后一个id出错重新查询
-		if!ok{//转换失败
-			log.Println("查询异常",*lastObjectId)
-			time.Sleep(time.Minute)
-			continue
-		}
-		//查询最后一个id等于上一轮的id就重新查询
-		if lastId.Hex() == lid {
-			log.Println("没有新数据",lastId.Hex())
-			SendMail(time.Now().String()+"没有最新数据,当前最后一条数据id:"+lastId.Hex())
-			time.Sleep(time.Hour)
-			continue
-		}
-		//不相等说明有新数据,进行下次处理
-		m := map[string]string{
-			"gtid":lid,//上一轮结束的最后id
-			"lteid":lastId.Hex(),//新一轮查询出来的id
-		}
-		bytes, _ := json.Marshal(m)
-		//发送udp
-		err := udpclient.WriteUdp(bytes,mu.OP_TYPE_DATA,&net.UDPAddr{
-			IP:   net.ParseIP( util.ObjToString(Sysconfig["udpip"])),
-			Port:  util.IntAll(Sysconfig["udpport"]),
-		})
-		if err != nil{
-			log.Println("发送udp失败",err,string(bytes))
-			time.Sleep(time.Minute)
-			continue
-		}
-		SendMail(time.Now().String()+fmt.Sprint("发送udp成功,gtid:",lid,",lteid:",lastId.Hex()))
-		log.Println("发送udp成功,gtid:",lid,",lteid:",lastId.Hex())
-		break//发送完后终止循环
-	}
-}
+//
+//var hourNum int
+//var rdlock sync.RWMutex