浏览代码

Merge branch 'dev0.3' of http://192.168.3.207:10080/qmx/datatag into dev0.3

# Conflicts:
#	src/service/task_rule.go
apple 5 年之前
父节点
当前提交
f079fee4ef

+ 2 - 2
src/config.json

@@ -1,5 +1,6 @@
 {
   "port": "7000",
+  "udpport": ":11116",
   "mgodb": "192.168.3.207:27092",
   "dbsize": 5,
   "dbname": "datatag",
@@ -18,6 +19,5 @@
   "jkmail": {
     "to": "wangjianghan@topnet.net.cn",
     "api": "http://10.171.112.160:19281/_send/_mail"
-  },
-  "udpport": ":11116"
+  }
 }

+ 3 - 3
src/main.go

@@ -7,18 +7,17 @@ import (
 	qu "qfw/util"
 	"service"
 	"time"
+	. "udptask"
 	"util"
 
 	"github.com/go-xweb/xweb"
 )
 
-
-
 func init() {
 	qu.ReadConfig(&util.Sysconfig) //初始化配置
 	util.InitMgoPool()             //初始化连接
 	util.InitOther()
-
+	InitUdp()
 	//xweb框架配置
 	xweb.Config.RecoverPanic = true
 	xweb.Config.Profiler = true
@@ -42,6 +41,7 @@ func init() {
 	xweb.RootApp().Logger.SetOutputLevel(4)
 
 	//xweb.AddTmplVar("add", func(a, b int) int { return a + b })
+	//InitTask("5ecf56ed92b4ee16ffd7e21e")
 }
 
 func main() {

+ 47 - 101
src/service/task_rule.go

@@ -2,70 +2,28 @@ package service
 
 import (
 	"encoding/json"
-	"github.com/go-xweb/xweb"
-	"gopkg.in/mgo.v2/bson"
 	"log"
+	mu "mfw/util"
 	"net"
 	qu "qfw/util"
 	"strings"
 	"time"
+	. "udptask"
 	. "util"
-	mu "mfw/util"
+
+	"github.com/go-xweb/xweb"
+	"gopkg.in/mgo.v2/bson"
 )
 
 type TaskRule struct {
 	*xweb.Action
-	taskList     xweb.Mapper `xweb:"/service/task/list"`         //任务列表
-	taskCreate   xweb.Mapper `xweb:"/service/task/create"`       //任务创建
-	taskEdit  	 xweb.Mapper `xweb:"/service/task/edit"`         //任务编辑
-	taskSave   	 xweb.Mapper `xweb:"/service/task/save"`       	 //任务保存
-	taskDelete   xweb.Mapper `xweb:"/service/task/delete"`       //任务保存
-	taskStart    xweb.Mapper `xweb:"/service/task/start"`    //任务保存
-	taskEnd	 	 xweb.Mapper `xweb:"/service/task/end"`      //任务保存
-}
-var (
-	udpclient    	mu.UdpClient             //udp对象
-	taskConfig		map[string]interface{} //配置文件
-	isTaskOK		bool
-	timeout = 3
-)
-func init() {
-	qu.ReadConfig(&taskConfig) //初始化配置
-	updport := taskConfig["udpport"].(string)
-	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
-	log.Println("Udp服务监听", updport)
-	udpclient.Listen(processUdpMsg)
-}
-func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
-	switch act {
-	case mu.OP_TYPE_DATA:
-		var rep map[string]interface{}
-		err := json.Unmarshal(data, &rep)
-		if err != nil { //测试接收
-			go udpclient.WriteUdp([]byte{}, mu.OP_NOOP, ra) //回应上一个节点
-		} else {
-			by, _ := json.Marshal(map[string]interface{}{
-				"taskid":  rep["taskid"],
-				"stype":   rep["stype"],
-			})
-			go udpclient.WriteUdp(by, mu.OP_NOOP, ra) //回应上一个节点
-
-		}
-	case mu.OP_NOOP: //下个节点回应
-		log.Println("接收回应:",string(data))
-		var rep map[string]interface{}
-		err := json.Unmarshal(data, &rep)
-		if err != nil {//空数据
-			//走超时失败
-		}else {//正确
-			if qu.ObjToString(rep["stype"])=="startTask" {
-				updateMgoIsuse("1",qu.ObjToString(rep["taskid"]))
-			}else if rep["stype"]=="stopTask"{
-				updateMgoIsuse("0",qu.ObjToString(rep["taskid"]))
-			}else {
-
-			}
-		}}
+	taskList   xweb.Mapper `xweb:"/service/task/list"`   //任务列表
+	taskCreate xweb.Mapper `xweb:"/service/task/create"` //任务创建
+	taskEdit   xweb.Mapper `xweb:"/service/task/edit"`   //任务编辑
+	taskSave   xweb.Mapper `xweb:"/service/task/save"`   //任务保存
+	taskDelete xweb.Mapper `xweb:"/service/task/delete"` //任务保存
+	taskStart  xweb.Mapper `xweb:"/service/task/start"`  //任务保存
+	taskEnd    xweb.Mapper `xweb:"/service/task/end"`    //任务保存
 }
 
 //展示列表
@@ -87,7 +45,7 @@ func (task *TaskRule) TaskList() {
 			"recordsFiltered": count,
 			"recordsTotal":    count,
 		})
-	}else {
+	} else {
 		task.Render("task/task_list.html")
 	}
 }
@@ -133,14 +91,14 @@ func (task *TaskRule) TaskSave() {
 			data["s_createuser"] = user["name"]
 			data["i_updatetime"] = curtime
 
-		}else {
-			log.Println("更新-数据",id)
+		} else {
+			log.Println("更新-数据", id)
 			data["i_updatetime"] = curtime
 			data["s_updateuser"] = user["name"]
 		}
-		id, rep := saveTaskMongo(id,data)
+		id, rep := saveTaskMongo(id, data)
 		task.ServeJson(map[string]interface{}{
-			"rep":       rep,
+			"rep": rep,
 		})
 	}
 }
@@ -149,8 +107,8 @@ func (task *TaskRule) TaskSave() {
 func (task *TaskRule) TaskDelete() {
 	defer qu.Catch()
 	_id := task.GetString("_id")
-	set := bson.M{"_id":qu.StringTOBsonId(_id)}
-	b :=Mgo.Del("taskinfo",set)
+	set := bson.M{"_id": qu.StringTOBsonId(_id)}
+	b := Mgo.Del("taskinfo", set)
 	task.ServeJson(map[string]interface{}{
 		"rep": b,
 	})
@@ -164,42 +122,42 @@ func (task *TaskRule) TaskStart() {
 	//发送udp 开启
 	by, _ := json.Marshal(map[string]interface{}{
 		"taskid": id,
-		"stype": "startTask",
+		"stype":  "startTask",
 	})
 	addr := &net.UDPAddr{
 		IP:   net.ParseIP(data["addr"].(string)),
 		Port: qu.IntAll(data["port"]),
 	}
 
-	err :=udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
-	if err!=nil {
+	err := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+	if err != nil {
 		task.ServeJson(map[string]interface{}{
 			"rep": false,
-			"msg":"任务开始失败-udp",
+			"msg": "任务开始失败-udp",
 		})
 	}
 
-	n:=0
+	n := 0
 	for {
-		if !isTaskOK && n < timeout {
+		if !IsTaskOK && n < Timeout {
 			time.Sleep(1 * time.Second)
 			n++
 		} else {
 			break
 		}
 	}
-	log.Println("循环结束",isTaskOK,n,"处理开启post回调")
-	if isTaskOK {
-		isTaskOK = false
+	log.Println("循环结束", IsTaskOK, n, "处理开启post回调")
+	if IsTaskOK {
+		IsTaskOK = false
 		task.ServeJson(map[string]interface{}{
 			"rep": true,
-			"msg":"任务开始成功",
+			"msg": "任务开始成功",
 		})
-	}else {
-		isTaskOK = false
+	} else {
+		IsTaskOK = false
 		task.ServeJson(map[string]interface{}{
 			"rep": false,
-			"msg":"任务开始失败-超时",
+			"msg": "任务开始失败-超时",
 		})
 	}
 }
@@ -211,59 +169,47 @@ func (task *TaskRule) TaskEnd() {
 	id := qu.ObjToString(data["id"])
 	//发送udp 关闭
 	by, _ := json.Marshal(map[string]interface{}{
-		"taskid":  id,
-		"stype": "stopTask",
+		"taskid": id,
+		"stype":  "stopTask",
 	})
 	addr := &net.UDPAddr{
 		IP:   net.ParseIP(data["addr"].(string)),
 		Port: qu.IntAll(data["port"]),
 	}
-	err :=udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
-	if err!=nil {
+	err := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+	if err != nil {
 		task.ServeJson(map[string]interface{}{
 			"rep": false,
-			"msg":"任务开始失败-udp",
+			"msg": "任务开始失败-udp",
 		})
 	}
 
-	n:=0
+	n := 0
 	for {
-		if !isTaskOK && n < timeout {
+		if !IsTaskOK && n < Timeout {
 			time.Sleep(1 * time.Second)
 			n++
 		} else {
 			break
 		}
 	}
-	log.Println("循环结束",isTaskOK,n,"处理关闭post回调")
-	if isTaskOK {
-		isTaskOK = false
+	log.Println("循环结束", IsTaskOK, n, "处理关闭post回调")
+	if IsTaskOK {
+		IsTaskOK = false
 		task.ServeJson(map[string]interface{}{
 			"rep": true,
-			"msg":"任务开始成功",
+			"msg": "任务开始成功",
 		})
-	}else {
-		isTaskOK = false
+	} else {
+		IsTaskOK = false
 		task.ServeJson(map[string]interface{}{
 			"rep": false,
-			"msg":"任务开始失败-超时",
+			"msg": "任务开始失败-超时",
 		})
 	}
 }
 
-
 //******方法
-func updateMgoIsuse(isu string,id string)  {
-	set := bson.M{
-		"$set": bson.M{
-			"s_isuse": isu,
-		},
-	}
-	log.Println(id)
-	Mgo.UpdateById("taskinfo", id, set)
-	isTaskOK = true
-}
-
 func saveTaskMongo(id string, rdata map[string]interface{}) (rid string, rep bool) {
 	defer qu.Catch()
 	if id == "" {
@@ -281,4 +227,4 @@ func saveTaskMongo(id string, rdata map[string]interface{}) (rid string, rep boo
 		rep = Mgo.Update("taskinfo", query, bson.M{"$set": rdata}, false, false)
 	}
 	return
-}
+}

+ 0 - 0
src/tagging/src/config.json → src/tagservice/src/config.json


+ 19 - 10
src/tagging/src/data.go → src/tagservice/src/data.go

@@ -39,14 +39,16 @@ type Rule struct {
 
 //关键词类型
 type KeyWord struct {
-	KeyReg    []*regexp.Regexp
-	MatchType []string //关键词的匹配方式
+	KeyReg     []*regexp.Regexp
+	MatchType  []string     //关键词的匹配方式
+	KeyWordMap map[int]bool //记录KeyReg中字母规则
 }
 
 //附加词类型
 type AddWord struct {
-	KeyReg    []*regexp.Regexp
-	MatchType []string //关键词的匹配方式
+	KeyReg     []*regexp.Regexp
+	MatchType  []string     //附加词的匹配方式
+	AddWordMap map[int]bool //记录KeyReg中字母规则
 }
 
 func InitTags() {
@@ -90,19 +92,25 @@ func InitTags() {
 		aw_commaArr := strings.Split(addword, ",")
 		for _, comma := range aw_commaArr {
 			aw := &AddWord{}
+			aw.AddWordMap = make(map[int]bool)
 			aw.MatchType = awmArr
 			aw_addArr := strings.Split(comma, "&&")
 			if len(aw_addArr) == 1 { //,
 				tmp_aw := aw_addArr[0]
 				if tmp_aw != "" {
 					if LetterCase.MatchString(tmp_aw) {
-						tmp_aw = strings.ToUpper(tmp_aw)
+						tmp_aw = strings.ToUpper(tmp_aw) //附加词中有英文全部转为大写
+						aw.AddWordMap[len(aw.KeyReg)] = true
 					}
 					aw.KeyReg = append(aw.KeyReg, regexp.MustCompile(tmp_aw))
 				}
 			} else { //&&
 				for _, and := range aw_addArr {
 					if and != "" {
+						if LetterCase.MatchString(and) {
+							and = strings.ToUpper(and) //附加词中有英文全部转为大写
+							aw.AddWordMap[len(aw.KeyReg)] = true
+						}
 						aw.KeyReg = append(aw.KeyReg, regexp.MustCompile(and))
 					}
 				}
@@ -125,6 +133,7 @@ func InitTags() {
 		kw_commaArr := strings.Split(keyword, ",")
 		for _, comma := range kw_commaArr {
 			kw := &KeyWord{}
+			kw.KeyWordMap = make(map[int]bool)
 			kw.MatchType = kwmArr
 			kw_addArr := strings.Split(comma, "&&")
 			if len(kw_addArr) == 1 { //,
@@ -132,6 +141,7 @@ func InitTags() {
 				if tmp_kw != "" {
 					if LetterCase.MatchString(tmp_kw) {
 						tmp_kw = strings.ToUpper(tmp_kw)
+						kw.KeyWordMap[len(kw.KeyReg)] = true
 					}
 					kw.KeyReg = append(kw.KeyReg, regexp.MustCompile(tmp_kw))
 				}
@@ -139,9 +149,8 @@ func InitTags() {
 				for _, and := range kw_addArr {
 					if and != "" {
 						if LetterCase.MatchString(and) {
-							qu.Debug(and)
 							and = strings.ToUpper(and)
-							qu.Debug(and)
+							kw.KeyWordMap[len(kw.KeyReg)] = true
 						}
 						kw.KeyReg = append(kw.KeyReg, regexp.MustCompile(and))
 					}
@@ -161,7 +170,7 @@ func InitTags() {
 			}
 		}
 	}
-	// for i, r := range Rules {
-	// 	qu.Debug(i, r.TagName, r.KW.KeyReg, len(r.KW.KeyReg), r.KW.MatchType, len(r.KW.MatchType), r.AW.KeyReg, len(r.AW.KeyReg), r.AW.MatchType, len(r.AW.MatchType))
-	// }
+	for i, r := range Rules {
+		qu.Debug(i, r.TagName, r.KW.KeyReg, len(r.KW.KeyReg), r.KW.MatchType, len(r.KW.MatchType), r.KW.KeyWordMap, "---", r.AW.KeyReg, len(r.AW.KeyReg), r.AW.MatchType, len(r.AW.MatchType), r.AW.AddWordMap)
+	}
 }

+ 15 - 6
src/tagging/src/main.go → src/tagservice/src/main.go

@@ -3,11 +3,11 @@ package main
 
 import (
 	"encoding/json"
-	"errors"
 	"log"
 	mongo "qfw/mongodb"
 	qu "qfw/util"
 	"qfw/util/elastic"
+	"regexp"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -29,9 +29,10 @@ var (
 	sp                    chan bool
 )
 var lock *sync.Mutex
-var EOS = errors.New("EOS")
 
-var updatelock *sync.Mutex = new(sync.Mutex)                  //保存锁
+//var EOS = errors.New("EOS")
+//var updatelock *sync.Mutex = new(sync.Mutex)                  //保存锁
+var FilteReg = regexp.MustCompile("[()(){}-]*")
 var UpdateCache = make(chan map[string]string, 500)           //更新集合
 var UpdataMgoCache = make(chan []map[string]interface{}, 500) //更新集合
 
@@ -109,7 +110,7 @@ func main() {
 						L:
 							for _, kwm := range r.KW.MatchType {
 								if text := qu.ObjToString(tmp[kwm]); text != "" {
-									text = strings.ToUpper(text) //文本中的英文全转为大写
+									text = ProcessData(text) //过滤数据
 									for _, kw_reg := range r.KW.KeyReg {
 										if kw_reg.MatchString(text) { //关键词匹配成功
 											//关键词匹配成功后,匹配附加词
@@ -171,6 +172,12 @@ func main() {
 	<-w
 }
 
+func ProcessData(text string) string {
+	text = strings.ToUpper(text)               //文本中的英文全转为大写
+	text = FilteReg.ReplaceAllString(text, "") //去除一些特殊符号
+	return text
+}
+
 //更新es
 func TimeUpdate() {
 	log.Println("Save...")
@@ -293,6 +300,7 @@ func main_es_one() {
 						L:
 							for _, kwm := range r.KW.MatchType {
 								if text := qu.ObjToString(tmp[kwm]); text != "" {
+									text = ProcessData(text)
 									for _, kw_reg := range r.KW.KeyReg {
 										if kw_reg.MatchString(text) { //关键词匹配成功
 											kwMatch = true
@@ -372,6 +380,7 @@ func main_mongo_one() {
 			L:
 				for _, kwm := range r.KW.MatchType {
 					if text := qu.ObjToString(tmp[kwm]); text != "" {
+						text = ProcessData(text)
 						for _, kw_reg := range r.KW.KeyReg {
 							if kw_reg.MatchString(text) { //关键词匹配成功
 								//qu.Debug(kwm, kw_reg)
@@ -464,7 +473,7 @@ func main_mongo_many() {
 			L:
 				for _, kwm := range r.KW.MatchType {
 					if text := qu.ObjToString(tmp[kwm]); text != "" {
-						text = strings.ToUpper(text) //文本中的英文全转为大写
+						text = ProcessData(text)
 						for _, kw_reg := range r.KW.KeyReg {
 							if kw_reg.MatchString(text) { //关键词匹配成功
 								//qu.Debug(kwm, kw_reg)
@@ -567,7 +576,7 @@ func main_mongo_matchrecords() {
 				//L:
 				for _, kwm := range r.KW.MatchType {
 					if text := qu.ObjToString(tmp[kwm]); text != "" {
-						text = strings.ToUpper(text) //文本中的英文全转为大写
+						text = ProcessData(text)
 						for _, kw_reg := range r.KW.KeyReg {
 							if kw_reg.MatchString(text) { //关键词匹配成功
 								if len(r.AW.KeyReg) == 0 { //无附加词

+ 0 - 0
src/tagging/src/main.go1 → src/tagservice/src/main.go1


+ 0 - 0
src/tagging/src/tag.xlsx → src/tagservice/src/tag.xlsx


+ 71 - 0
src/udptask/udptask.go

@@ -0,0 +1,71 @@
+package udptask
+
+import (
+	"encoding/json"
+	"log"
+	mu "mfw/util"
+	"net"
+	qu "qfw/util"
+	"util"
+
+	"gopkg.in/mgo.v2/bson"
+)
+
+var (
+	Udpclient mu.UdpClient //udp对象
+	IsTaskOK  bool
+	Timeout   = 3
+)
+
+func InitUdp() {
+	updport := util.Sysconfig["udpport"].(string)
+	Udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
+	log.Println("Udp服务监听", updport)
+	Udpclient.Listen(processUdpMsg)
+}
+
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
+	switch act {
+	case mu.OP_TYPE_DATA:
+		var rep map[string]interface{}
+		err := json.Unmarshal(data, &rep)
+		if err != nil { //测试接收
+
+			go Udpclient.WriteUdp([]byte{}, mu.OP_NOOP, ra) //回应上一个节点
+		} else {
+			by, _ := json.Marshal(map[string]interface{}{
+				"taskid": rep["taskid"],
+				"stype":  rep["stype"],
+			})
+			go Udpclient.WriteUdp(by, mu.OP_NOOP, ra) //回应上一个节点
+
+		}
+	case mu.OP_NOOP: //下个节点回应
+		log.Println("接收回应:", string(data))
+		var rep map[string]interface{}
+		err := json.Unmarshal(data, &rep)
+		if err != nil { //空数据
+			//
+		} else { //正确
+			if qu.ObjToString(rep["stype"]) == "startTask" {
+				updateMgoIsuse("1", qu.ObjToString(rep["taskid"]))
+			} else if rep["stype"] == "stopTask" {
+				updateMgoIsuse("0", qu.ObjToString(rep["taskid"]))
+			} else {
+
+			}
+		}
+	}
+
+}
+
+func updateMgoIsuse(isu string, id string) {
+	set := bson.M{
+		"$set": bson.M{
+			"s_isuse": isu,
+		},
+	}
+	log.Println(id)
+	util.Mgo.UpdateById("taskinfo", id, set)
+	IsTaskOK = true
+}

+ 313 - 0
src/util/task.go

@@ -0,0 +1,313 @@
+package util
+
+import (
+	"log"
+	mongo "qfw/mongodb"
+	qu "qfw/util"
+	"qfw/util/elastic"
+	"regexp"
+	"strings"
+	"time"
+)
+
+//匹配方式map
+var task_export_matchtype = map[string]interface{}{
+	"1": "title",
+	"2": "detail",
+	"3": "purchasing",
+	"4": "filetext",
+	"5": "projectname",
+	"6": "buyer",
+	"7": "s_winner",
+}
+var LetterCase = regexp.MustCompile("[A-Za-z]")
+
+//任务模型
+type Task struct {
+	//任务信息
+	Id      string      //任务id
+	StartId string      //起始id
+	From    string      //数据出处(es mongodb)
+	To      string      //数据更新去处(es mongodb)
+	Index   string      //es index
+	Itype   string      //es type
+	MgoColl string      //mgo coll
+	Rules   []*Tag_Rule //任务相关规则(对数据打标签)
+	IsRun   bool        //是否运行
+	//存储相关
+	Mgo            *mongo.MongodbSim             //mgo
+	Es             *elastic.Elastic              //es
+	EsUpdateCache  chan map[string]string        //es更新集合
+	MgoUpdataCache chan []map[string]interface{} //mgo更新集合
+	SP             chan bool                     //批量更新时的线程控制
+
+}
+
+//规则
+type Tag_Rule struct {
+	KW      *KeyWord
+	AW      *AddWord
+	TagName string
+}
+
+//关键词类型
+type KeyWord struct {
+	KeyReg     []*regexp.Regexp
+	MatchType  []string     //关键词的匹配方式
+	KeyWordMap map[int]bool //记录KeyReg中字母规则
+}
+
+//附加词类型
+type AddWord struct {
+	KeyReg     []*regexp.Regexp
+	MatchType  []string     //附加词的匹配方式
+	AddWordMap map[int]bool //记录KeyReg中字母规则
+}
+
+//更新es
+func (t *Task) UpdateEs() {
+	log.Println("Es Save...")
+	arru := make([]map[string]string, 200)
+	indexu := 0
+	for {
+		select {
+		case v := <-t.EsUpdateCache:
+			arru[indexu] = v
+			indexu++
+			if indexu == 200 {
+				t.SP <- true
+				go func(arru []map[string]string) {
+					defer func() {
+						<-t.SP
+					}()
+					elastic.BulkUpdateArr(t.Index, t.Itype, arru)
+				}(arru)
+				arru = make([]map[string]string, 200)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				t.SP <- true
+				go func(arru []map[string]string) {
+					defer func() {
+						<-t.SP
+					}()
+					elastic.BulkUpdateArr(t.Index, t.Itype, arru)
+				}(arru[:indexu])
+				arru = make([]map[string]string, 200)
+				indexu = 0
+			}
+		}
+	}
+}
+
+//更新mongo
+func (t *Task) UpdateMgo() {
+	log.Println("Mgo Save...")
+	arru := make([][]map[string]interface{}, 200)
+	indexu := 0
+	for {
+		select {
+		case v := <-t.MgoUpdataCache:
+			arru[indexu] = v
+			indexu++
+			if indexu == 200 {
+				t.SP <- true
+				go func(arru [][]map[string]interface{}) {
+					defer func() {
+						<-t.SP
+					}()
+					t.Mgo.UpdateBulk(t.MgoColl, arru...)
+				}(arru)
+				arru = make([][]map[string]interface{}, 200)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				t.SP <- true
+				go func(arru [][]map[string]interface{}) {
+					defer func() {
+						<-t.SP
+					}()
+					t.Mgo.UpdateBulk(t.MgoColl, arru...)
+				}(arru[:indexu])
+				arru = make([][]map[string]interface{}, 200)
+				indexu = 0
+			}
+		}
+	}
+}
+
+//初始化任务信息
+func InitTask(taskid string) {
+	t := &Task{}
+	data, _ := Mgo.FindById("taskinfo", taskid, nil)
+	t.Id = taskid
+	t.StartId = qu.ObjToString((*data)["s_startid"])
+	from := qu.ObjToString((*data)["s_fromtype"])
+	t.From = from
+	to := qu.ObjToString((*data)["s_totype"])
+	t.To = to
+	if from == to { //同库
+		url := qu.ObjToString((*data)["s_fromdburl"])
+		dbname := qu.ObjToString((*data)["s_fromdbname"])
+		coll := qu.ObjToString((*data)["s_fromdbcoll"])
+		if from == "es" { //es
+			t.InitEs(url, dbname, coll)
+		} else { //mgo
+			t.InitMgo(url, dbname, coll)
+		}
+	} else { //异库
+		fromdburl := qu.ObjToString((*data)["s_fromdburl"])
+		fromdbname := qu.ObjToString((*data)["s_fromdbname"])
+		fromdbcoll := qu.ObjToString((*data)["s_fromdbcoll"])
+		todburl := qu.ObjToString((*data)["s_todburl"])
+		todbname := qu.ObjToString((*data)["s_todbname"])
+		todbcoll := qu.ObjToString((*data)["s_todbcoll"])
+		if from == "es" {
+			t.InitEs(fromdburl, fromdbname, fromdbcoll)
+			t.InitMgo(todburl, todbname, todbcoll)
+		} else {
+			t.InitMgo(fromdburl, fromdbname, fromdbcoll)
+			t.InitEs(todburl, todbname, todbcoll)
+		}
+	}
+	t.EsUpdateCache = make(chan map[string]string, 500)
+	t.MgoUpdataCache = make(chan []map[string]interface{}, 500)
+	t.SP = make(chan bool, 5)
+	t.InitRules(qu.ObjToString((*data)["s_tasktype"])) //rules
+}
+
+//初始化Rules
+func (t *Task) InitRules(tasktype string) {
+	query := map[string]interface{}{
+		"i_isuse":    1, //启用状态
+		"s_tasktype": tasktype,
+		"b_delete":   false,
+	}
+	list, _ := Mgo.Find("tagrule", query, nil, `{"o_list":1,"s_tagname":1}`, false, -1, -1)
+	for _, l := range *list {
+		tagname := qu.ObjToString(l["s_tagname"])
+		o_list := l["o_list"].([]interface{})
+		for _, o := range o_list {
+			o_map := o.(map[string]interface{})
+			//附加词匹配方式
+			awm := qu.ObjToString(o_map["s_addkeymatch"])
+			awmArr := []string{}
+			for _, av := range strings.Split(awm, ",") {
+				if field := qu.ObjToString(task_export_matchtype[av]); field != "" {
+					awmArr = append(awmArr, field)
+				}
+			}
+			//附加词
+			tmp_aw := []*AddWord{}
+			addword := qu.ObjToString(o_map["s_addkey"])
+			aw_commaArr := strings.Split(addword, ",")
+			for _, comma := range aw_commaArr {
+				aw := &AddWord{}
+				aw.AddWordMap = make(map[int]bool)
+				aw.MatchType = awmArr
+				aw_addArr := strings.Split(comma, "&&")
+				if len(aw_addArr) == 1 { //,
+					tmp_aw := aw_addArr[0]
+					if tmp_aw != "" {
+						if LetterCase.MatchString(tmp_aw) { //判断附加词中是否有英文
+							tmp_aw = strings.ToUpper(tmp_aw) //附加词中有英文全部转为大写
+							aw.AddWordMap[len(aw.KeyReg)] = true
+						}
+						aw.KeyReg = append(aw.KeyReg, regexp.MustCompile(tmp_aw))
+					}
+				} else { //&&
+					for _, and := range aw_addArr {
+						if and != "" {
+							if LetterCase.MatchString(and) { //判断附加词中是否有英文
+								and = strings.ToUpper(and) //附加词中有英文全部转为大写
+								aw.AddWordMap[len(aw.KeyReg)] = true
+							}
+							aw.KeyReg = append(aw.KeyReg, regexp.MustCompile(and))
+						}
+					}
+				}
+				tmp_aw = append(tmp_aw, aw)
+			}
+			//关键词匹配方式
+			kwm := qu.ObjToString(o_map["s_keymatch"])
+			kwmArr := []string{}
+			for _, kv := range strings.Split(kwm, ",") {
+				if field := qu.ObjToString(task_export_matchtype[kv]); field != "" {
+					kwmArr = append(kwmArr, field)
+				}
+			}
+			//关键词
+			tmp_kw := []*KeyWord{}
+			keyword := qu.ObjToString(o_map["s_matchkey"])
+			kw_commaArr := strings.Split(keyword, ",")
+			for _, comma := range kw_commaArr {
+				kw := &KeyWord{}
+				kw.KeyWordMap = make(map[int]bool)
+				kw.MatchType = kwmArr
+				kw_addArr := strings.Split(comma, "&&")
+				if len(kw_addArr) == 1 { //,
+					tmp_kw := kw_addArr[0]
+					if tmp_kw != "" {
+						if LetterCase.MatchString(tmp_kw) {
+							tmp_kw = strings.ToUpper(tmp_kw)
+							kw.KeyWordMap[len(kw.KeyReg)] = true
+						}
+						kw.KeyReg = append(kw.KeyReg, regexp.MustCompile(tmp_kw))
+					}
+				} else { //&&
+					for _, and := range kw_addArr {
+						if and != "" {
+							if LetterCase.MatchString(and) {
+								and = strings.ToUpper(and)
+								kw.KeyWordMap[len(kw.KeyReg)] = true
+							}
+							kw.KeyReg = append(kw.KeyReg, regexp.MustCompile(and))
+						}
+					}
+				}
+				tmp_kw = append(tmp_kw, kw)
+			}
+
+			//组合
+			for _, tk := range tmp_kw {
+				for _, aw := range tmp_aw {
+					rule := &Tag_Rule{}
+					rule.KW = tk
+					rule.AW = aw
+					rule.TagName = tagname
+					t.Rules = append(t.Rules, rule)
+				}
+			}
+
+		}
+	}
+
+	// for i, r := range t.Rules {
+	// 	qu.Debug(i, r.TagName, r.KW.KeyReg, len(r.KW.KeyReg), r.KW.MatchType, len(r.KW.MatchType), r.KW.KeyWordMap, "---", r.AW.KeyReg, len(r.AW.KeyReg), r.AW.MatchType, len(r.AW.MatchType), r.AW.AddWordMap)
+	// }
+	// qu.Debug(t.Id, t.From, t.To, t.StartId)
+}
+
+//初始化mgo
+func (t *Task) InitMgo(url, dbname, coll string) {
+	t.Mgo = &mongo.MongodbSim{
+		MongodbAddr: url,
+		Size:        10,
+		DbName:      dbname,
+	}
+	t.Mgo.InitPool()
+	t.MgoColl = coll
+}
+
+//初始化es
+func (t *Task) InitEs(url, dbname, coll string) {
+	t.Es = &elastic.Elastic{
+		S_esurl: url,
+		I_size:  15,
+	}
+	t.Es.InitElasticSize()
+	t.Index = dbname
+	t.Itype = coll
+}