wangchuanjin 7 лет назад
Родитель
Сommit
de7b56f206

+ 4 - 3
jyinfomatch/src/config.json

@@ -5,9 +5,10 @@
     "mgoSize": 50,
     "mgoAddr": "192.168.3.18:27080",
 	"mongodbName": "qyfw",
-	"collection": "datas",
+	"collection": "matchresult",
     "elasticPoolSize": 10,
     "elasticsearch": "http://192.168.3.18:9800",
-    "maxSearch": 2,
-	"saveSize": 200
+    "maxSearch": 500,
+	"saveSize": 200,
+	"poolSize": 100
 }

+ 105 - 0
jyinfomatch/src/luascript/dev.lua

@@ -0,0 +1,105 @@
+--用户唯一标识
+userId="1"
+--匹配词
+keys1={
+	{"政务云","不动产登记","涉密网","机要网","质量监督管理局","政法网","电子政务","警务云"},
+	{"三通两平台","云学堂","云课堂","云教室","薄改","改薄","校园网","教育云"},
+	{"卫生云","卫生信息平台","医疗卫生机构管理信息系统","医疗机构管理信息系统","工业4.0","智能制造","全民健康","中医馆","MES","HIS","HANA","高性能计算"},
+	{"交通一卡通","两网融合","智慧交通","智慧高速","智慧公交"}
+}
+keys1_1={"政府","教育","企业","公共事业"}
+keys2={"交换机","路由器","防火墙","网络安全","负载均衡","服务器","存储","小机","小型机","无线网","大数据","云计算","云平台","等保","等级保护","物联网"}
+--排除词
+notkey1={"监理","设计","施工","装修","维修","维护","维保","运维","打印","扫描","投影","数据整合","缆","土建","空调","电脑"}
+notkey2={"软件","办公","服务"}
+notkey2_2={{"数据中心","平台","网络"},{"办公网"},{"服务器","硬件集成","云平台建设","网络"}}
+--脚本主入口方法
+function filterValidate(data)
+	--匹配上的父节点,对应的子节点,是否成功
+	local y_p,y_k,y_ok = "","",false
+	--标题处理
+	local title = data["title"]
+	--print(title)
+	if title ~= nil and title ~= "" then
+		--标题匹配
+		y_p,y_k,y_ok = contain(title)
+		--print("title包含--",y_p,y_k,y_ok)
+		--如果标题匹配上,进行标题排除
+		if y_ok then
+			--排除匹配上的词,对应的保留词,是否成功
+			local n_p,n_k,n_ok = exclude(title)
+			if n_p ~= "" and n_k ~= "" then
+				--print("title排除--","排除",n_p,",保留",n_k,",",n_ok)
+			else
+				--print("title排除--",n_p,n_k,n_ok)
+			end
+			if n_ok then
+				return nil
+			end
+		end
+	end
+	--正文处理
+	local detail = data["detail"]
+	--print(detail)
+	if detail ~= nil and detail ~= "" then
+		--如果标题没有匹配上,匹配正文
+		if not y_ok then
+			--排除匹配上的词,对应的保留词,是否成功
+			y_p,y_k,y_ok = contain(detail)
+			--print("detail包含--",y_p,y_k,y_ok)
+		end
+		--匹配上标题或者正文,都要进行正文排除
+		--排除匹配上的词,对应的保留词,是否成功
+		if y_ok then
+			local n_p,n_k,n_ok = exclude(detail)
+			if n_p ~= "" and n_k ~= "" then
+				--print("detail排除--","排除",n_p,",保留",n_k,",",n_ok)
+			else
+				--print("detail排除--",n_p,n_k,n_ok)
+			end
+			if n_ok then
+				return nil
+			end
+		end
+	end
+	--没有匹配上
+	if not y_ok then
+		return nil
+	end
+	return data
+end
+--包含
+function contain(value)
+	for k,keys in pairs(keys1) do
+		for i,key in pairs(keys) do
+			if string.find(value,key) ~= nil then
+				return keys1_1[k],key,true
+			end
+		end
+	end
+	for k,key in pairs(keys2) do
+		if string.find(value,key) ~= nil then
+			return "",key,true
+		end
+	end
+	return "","",false
+end
+--排除
+function exclude(value)
+	for k,key in pairs(notkey1) do
+		if string.find(value,key) ~= nil then
+			return key,"",true
+		end
+	end
+	for k,key in pairs(notkey2) do
+		if string.find(value,key) ~= nil then
+			for i,n in pairs(notkey2_2[k]) do
+				if string.find(value,n) ~= nil then
+					return key,n,false
+				end
+			end
+			return key,"",true
+		end
+	end
+	return "","",false
+end

+ 0 - 4
jyinfomatch/src/luascript/my.lua

@@ -1,4 +0,0 @@
-userId="1";
-function analysis(data)
-	return data
-end

+ 1 - 0
jyinfomatch/src/main.go

@@ -19,6 +19,7 @@ func init() {
 	elastic.InitElasticSize(qyfw.SysConfig["elasticsearch"].(string), util.IntAllDef(qyfw.SysConfig["elasticPoolSize"], 20))
 	qyfw.Collection = qyfw.SysConfig["collection"].(string)
 	qyfw.SaveSize = util.IntAllDef(qyfw.SysConfig["saveSize"], 200)
+	qyfw.PoolSize = util.IntAllDef(qyfw.SysConfig["poolSize"], 100)
 }
 
 //主应用,定时任务

+ 27 - 0
jyinfomatch/src/main_test.go

@@ -0,0 +1,27 @@
+package main
+
+import (
+	"io/ioutil"
+	"log"
+	"qfw/util/mongodb"
+	"qyfw"
+	"testing"
+)
+
+func Test_main(t *testing.T) {
+	mongodb.InitMongodbPool(1, "192.168.3.18:27080", "qyfw")
+	list := []map[string]interface{}{}
+	data := mongodb.FindById("matchresult", "5a055045cbac61d2bcb1db12", nil)
+	list = append(list, *data)
+	qyfw.IsSave = false
+	job := &qyfw.Job{}
+	job.Results = &[]map[string]interface{}{}
+	job.Name = "test"
+	luafile, err := ioutil.ReadFile("luascript/dev.lua")
+	if err != nil {
+		log.Println(err)
+		return
+	}
+	job.ScriptFile = string(luafile)
+	job.Start(&list)
+}

+ 7 - 6
jyinfomatch/src/qyfw/handler.go

@@ -13,7 +13,7 @@ import (
 
 var (
 	AllJobs           = map[string]*Job{}
-	LoadLuaScripePool = make(chan bool, 100)
+	LoadLuaScripePool = make(chan bool, 1)
 	LoadLuaScripeWait = &sync.WaitGroup{}
 )
 
@@ -47,7 +47,7 @@ func LoadAllLuaScript() {
 func runJob() {
 	util.Try(func() {
 		LoadDatasByEs(&SysConfig)
-		//util.WriteSysConfig(SysConfig)
+		util.WriteSysConfig(SysConfig)
 	}, func(e interface{}) {})
 	time.AfterFunc(time.Duration(util.IntAll(SysConfig["durationMinutes"]))*time.Minute, runJob)
 }
@@ -58,10 +58,11 @@ func NewLuaScript(name, luafile string) *Job {
 	job := &Job{}
 	job.Results = &[]map[string]interface{}{}
 	job.Name = name
-	job.LoadScript(luafile)
-	job.UserId = job.GetVar("userId")
-	if job.UserId == "" {
-		log.Println("error:从脚本", name, "中获取到UserId为空!")
+	job.ScriptFile = luafile
+	job.EachListPool = make(chan bool, PoolSize)
+	//只是单纯的验证一下,lua脚本是否有问题
+	script := &Script{}
+	if !script.LoadScript(name, luafile, true) {
 		return nil
 	}
 	return job

+ 43 - 25
jyinfomatch/src/qyfw/job.go

@@ -5,6 +5,7 @@ package qyfw
 
 import (
 	"log"
+	qutil "qfw/util"
 	"qfw/util/mongodb"
 	"sync"
 	"util"
@@ -13,44 +14,61 @@ import (
 )
 
 var (
-	Collection   = "datas"
-	eachListPool = make(chan bool, 100)
-	SaveSize     = 200
+	Collection = "matchresult"
+	SaveSize   = 200
+	IsSave     = true
+	PoolSize   = 100
 )
 
 type Job struct {
-	Script
-	UserId    string                    //用户的唯一标识
-	Name      string                    //名称
-	Results   *[]map[string]interface{} //最终要存库的数据
-	Lock      sync.Mutex
-	WaitGroup sync.WaitGroup
+	Name         string                    //名称
+	Results      *[]map[string]interface{} //最终要存库的数据
+	Lock         sync.Mutex
+	WaitGroup    sync.WaitGroup
+	ScriptFile   string
+	EachListPool chan bool
 }
 
 //任务
 func (j *Job) Start(list *[]map[string]interface{}) {
+	count := 0
 	for _, v := range *list {
-		eachListPool <- true
+		j.EachListPool <- true
 		j.WaitGroup.Add(1)
-		go func() {
+		go func(info map[string]interface{}) {
 			defer func() {
-				<-eachListPool
+				<-j.EachListPool
 				j.WaitGroup.Done()
 			}()
+			script := &Script{}
+			if !script.LoadScript(j.Name, j.ScriptFile, false) {
+				return
+			}
+			userId := script.GetVar("userId")
+			if userId == "" {
+				log.Println("error:从脚本", j.Name, "中获取到UserId为空!")
+				return
+			}
 			//调用lua,匹配
-			result := j.ExecJob(&v)
+			result := j.ExecJob(script, &info)
 			//保存
-			if result != nil && len(*result) > 0 {
-				j.Save(result, false)
+			if result != nil && len(*result) > 0 && IsSave {
+				j.Save(result, userId, false)
 			}
-		}()
+		}(v)
+		if count%200 == 0 {
+			log.Println(j.Name, "index:", count)
+		}
+		count++
 	}
 	j.WaitGroup.Wait()
-	j.Save(nil, true)
+	j.Save(nil, "", true)
+	log.Println("脚本", j.Name, "执行完毕!")
 }
 
-func (j *Job) ExecJob(info *map[string]interface{}) *map[string]interface{} {
-	tab := j.L.NewTable()
+func (j *Job) ExecJob(script *Script, info *map[string]interface{}) *map[string]interface{} {
+	defer qutil.Catch()
+	tab := script.L.NewTable()
 	for k, v := range *info {
 		if val, ok := v.(string); ok {
 			tab.RawSet(lua.LString(k), lua.LString(val))
@@ -66,8 +84,8 @@ func (j *Job) ExecJob(info *map[string]interface{}) *map[string]interface{} {
 			tab.RawSet(lua.LString(k), lua.LBool(val))
 		}
 	}
-	err := j.L.CallByParam(lua.P{
-		Fn:      j.L.GetGlobal("analysis"),
+	err := script.L.CallByParam(lua.P{
+		Fn:      script.L.GetGlobal("filterValidate"),
 		NRet:    1,
 		Protect: true,
 	}, tab)
@@ -75,8 +93,8 @@ func (j *Job) ExecJob(info *map[string]interface{}) *map[string]interface{} {
 		log.Println(err)
 		return nil
 	}
-	lv := j.L.Get(-1)
-	j.L.Pop(1)
+	lv := script.L.Get(-1)
+	script.L.Pop(1)
 	if tbl, ok := lv.(*lua.LTable); ok {
 		return util.TableToMap(tbl)
 	} else {
@@ -85,11 +103,11 @@ func (j *Job) ExecJob(info *map[string]interface{}) *map[string]interface{} {
 }
 
 //保存到mongodb
-func (j *Job) Save(result *map[string]interface{}, flag bool) {
+func (j *Job) Save(result *map[string]interface{}, userId string, flag bool) {
 	j.Lock.Lock()
 	defer j.Lock.Unlock()
 	if result != nil {
-		(*result)["userid"] = j.UserId
+		(*result)["userid"] = userId
 		(*result)["id"] = (*result)["_id"]
 		delete(*result, "_id")
 		*j.Results = append(*j.Results, *result)

+ 8 - 10
jyinfomatch/src/qyfw/loadDatas.go

@@ -12,8 +12,7 @@ import (
 
 const (
 	//industry
-	ShowField = `"_id","title","detail","projectscope","publishtime","toptype","subtype","type","area","href","areaval","infoformat",` +
-		`"projectname","buyer","winner","buyer","budget","bidamount","bidopentime","s_subscopeclass"`
+	ShowField   = `"_id","title","detail","projectscope","publishtime","toptype","subtype","type","area","href","projectname","winner","buyer","budget","bidamount","bidopentime","s_subscopeclass"`
 	SortQuery   = `{"publishtime":"desc"}`
 	DB          = "bidding"
 	IDRange     = `{"range":{"id":{"gt":"%s","lte":"%s"}}},{"range":{"publishtime":{"gt": %d}}}`
@@ -36,7 +35,7 @@ func LoadDatasByEs(Config *map[string]interface{}) bool {
 	st, _ := time.ParseInLocation(util.Date_Full_Layout, (*Config)["StartTime"].(string), time.Local)
 	lastTime := st.Unix()
 	_id := util.ObjToString((*Config)["lastid"])
-	log.Println("开始执行任务-id-lasttime:", _id, lastTime)
+	log.Println("开始执行定时任务-id-lasttime:", _id, lastTime)
 	//获取本次查询的最大id
 	idQuery := ""
 	if _id == "" {
@@ -61,7 +60,7 @@ func LoadDatasByEs(Config *map[string]interface{}) bool {
 	}
 	//
 	go runLuaScript(&list)
-	log.Println("任务结束-comeintime-lastid", comeintime, lastid)
+	log.Println("定时任务结束-comeintime-lastid", comeintime, lastid)
 	(*Config)["StartTime"] = util.FormatDateWithObj(&comeintime, util.Date_Full_Layout)
 	(*Config)["lastid"] = lastid
 	return true
@@ -76,8 +75,7 @@ func initBiddingCache(_id, lastid string, lastTime int64, startTime, endTime int
 		c_query = fmt.Sprintf(FilterQuery, fmt.Sprintf(IDRange, _id, lastid, lastTime-7*86400))
 	}
 	log.Println("es query:", c_query)
-	//testquery := `{"terms":{"_id":["596f21935d11e1c7455ddc77"]}}`
-	//testquery := ``
+	//testquery := `{"terms":{"_id":["596f21935d11e1c7455ddc7f"]}}`
 	//c_query = fmt.Sprintf(FilterQuery, testquery)
 	count := int(elastic.Count(DB, DB, c_query))
 	log.Println("本次推送共查到数据", count, "条")
@@ -128,13 +126,13 @@ func initBiddingCache(_id, lastid string, lastTime int64, startTime, endTime int
 func runLuaScript(list *[]map[string]interface{}) bool {
 	for _, j := range AllJobs {
 		eachLuaPool <- true
-		go func() {
+		go func(job *Job) {
 			defer func() {
 				<-eachLuaPool
 			}()
-			log.Println("执行脚本", j.Name)
-			j.Start(list)
-		}()
+			log.Println("执行脚本", job.Name)
+			job.Start(list)
+		}(j)
 	}
 	return true
 }

+ 8 - 5
jyinfomatch/src/qyfw/script.go

@@ -15,14 +15,12 @@ import (
 
 //脚本
 type Script struct {
-	ScriptFile string
-	L          *lua.LState
+	L *lua.LState
 }
 
 //加载文件
-func (s *Script) LoadScript(script_file string) {
+func (s *Script) LoadScript(name, script_file string, isValidate bool) bool {
 	defer util.Catch()
-	s.ScriptFile = script_file
 	options := lua.Options{
 		RegistrySize:        256 * 20,
 		CallStackSize:       256,
@@ -31,8 +29,13 @@ func (s *Script) LoadScript(script_file string) {
 	s.L = lua.NewState(options)
 	s.L.PreloadModule("json", lujson.Loader)
 	if err := s.L.DoString(script_file); err != nil {
-		panic("加载lua脚本错误:" + err.Error())
+		panic("加载" + name + "脚本错误:" + err.Error())
+		return false
 	}
+	if isValidate {
+		return true
+	}
+	return true
 }
 
 //取得变量