|
@@ -1,4 +1,4 @@
|
|
|
-package util
|
|
|
+package main
|
|
|
|
|
|
import (
|
|
|
"log"
|
|
@@ -7,7 +7,10 @@ import (
|
|
|
"qfw/util/elastic"
|
|
|
"regexp"
|
|
|
"strings"
|
|
|
+ "sync"
|
|
|
+ "sync/atomic"
|
|
|
"time"
|
|
|
+ //"gopkg.in/mgo.v2/bson"
|
|
|
)
|
|
|
|
|
|
//匹配方式map
|
|
@@ -21,22 +24,30 @@ var task_export_matchtype = map[string]interface{}{
|
|
|
"7": "s_winner",
|
|
|
}
|
|
|
var LetterCase = regexp.MustCompile("[A-Za-z]")
|
|
|
+var FilteReg = regexp.MustCompile("[()(){}]*")
|
|
|
+var TaskList = make(map[string]*Task) //存储启动任务
|
|
|
+var Fields = map[string]interface{}{"title": 1, "detail": 1, "tagname": 1}
|
|
|
|
|
|
//任务模型
|
|
|
type Task struct {
|
|
|
//任务信息
|
|
|
- Id string //任务id
|
|
|
- StartId string //起始id
|
|
|
- From string //数据出处(es mongodb)
|
|
|
- To string //数据更新去处(es mongodb)
|
|
|
- Index string //es index
|
|
|
- Itype string //es type
|
|
|
- MgoColl string //mgo coll
|
|
|
- Rules []*Tag_Rule //任务相关规则(对数据打标签)
|
|
|
- IsRun bool //是否运行
|
|
|
+ Id string //任务id
|
|
|
+ StartId string //起始id
|
|
|
+ From string //数据出处(es mongodb)
|
|
|
+ //To string //数据更新去处(es mongodb)
|
|
|
+ Index string //es index
|
|
|
+ Itype string //es type
|
|
|
+ MgoDb string //mgo db
|
|
|
+ MgoColl string //mgo coll
|
|
|
+ Rules []*Rule //任务相关规则(对数据打标签)
|
|
|
+ IsRun bool //是否运行
|
|
|
+ IsIndex bool //是否同步es
|
|
|
//存储相关
|
|
|
+ Wg *sync.WaitGroup
|
|
|
+ Lock *sync.Mutex
|
|
|
Mgo *mongo.MongodbSim //mgo
|
|
|
Es *elastic.Elastic //es
|
|
|
+ DataChan chan bool //
|
|
|
EsUpdateCache chan map[string]string //es更新集合
|
|
|
MgoUpdataCache chan []map[string]interface{} //mgo更新集合
|
|
|
SP chan bool //批量更新时的线程控制
|
|
@@ -44,7 +55,7 @@ type Task struct {
|
|
|
}
|
|
|
|
|
|
//规则
|
|
|
-type Tag_Rule struct {
|
|
|
+type Rule struct {
|
|
|
KW *KeyWord
|
|
|
AW *AddWord
|
|
|
TagName string
|
|
@@ -64,113 +75,194 @@ type AddWord struct {
|
|
|
AddWordMap map[int]bool //记录KeyReg中字母规则
|
|
|
}
|
|
|
|
|
|
-//更新es
|
|
|
-func (t *Task) UpdateEs() {
|
|
|
- log.Println("Es Save...")
|
|
|
- arru := make([]map[string]string, 200)
|
|
|
- indexu := 0
|
|
|
- for {
|
|
|
- select {
|
|
|
- case v := <-t.EsUpdateCache:
|
|
|
- arru[indexu] = v
|
|
|
- indexu++
|
|
|
- if indexu == 200 {
|
|
|
- t.SP <- true
|
|
|
- go func(arru []map[string]string) {
|
|
|
- defer func() {
|
|
|
- <-t.SP
|
|
|
- }()
|
|
|
- elastic.BulkUpdateArr(t.Index, t.Itype, arru)
|
|
|
- }(arru)
|
|
|
- arru = make([]map[string]string, 200)
|
|
|
- indexu = 0
|
|
|
- }
|
|
|
- case <-time.After(1000 * time.Millisecond):
|
|
|
- if indexu > 0 {
|
|
|
- t.SP <- true
|
|
|
- go func(arru []map[string]string) {
|
|
|
- defer func() {
|
|
|
- <-t.SP
|
|
|
- }()
|
|
|
- elastic.BulkUpdateArr(t.Index, t.Itype, arru)
|
|
|
- }(arru[:indexu])
|
|
|
- arru = make([]map[string]string, 200)
|
|
|
- indexu = 0
|
|
|
- }
|
|
|
- }
|
|
|
+func StartTask(taskid string) {
|
|
|
+ t := &Task{}
|
|
|
+ t.InitTask(taskid)
|
|
|
+ t.IsRun = true //更新任务状态
|
|
|
+ qu.Debug(t.Id, t.From, t.Index, t.Itype, t.IsRun, t.IsIndex)
|
|
|
+ TaskList[taskid] = t //加入map
|
|
|
+ if t.From == "mongodb" {
|
|
|
+ t.RunMgo() //增量
|
|
|
+ //go t.UpdateMgo() //开启mgo保存
|
|
|
+ } else {
|
|
|
+ t.RunEs()
|
|
|
+ go t.UpdateEs() //开启es保存
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-//更新mongo
|
|
|
-func (t *Task) UpdateMgo() {
|
|
|
- log.Println("Mgo Save...")
|
|
|
- arru := make([][]map[string]interface{}, 200)
|
|
|
- indexu := 0
|
|
|
- for {
|
|
|
- select {
|
|
|
- case v := <-t.MgoUpdataCache:
|
|
|
- arru[indexu] = v
|
|
|
- indexu++
|
|
|
- if indexu == 200 {
|
|
|
- t.SP <- true
|
|
|
- go func(arru [][]map[string]interface{}) {
|
|
|
- defer func() {
|
|
|
- <-t.SP
|
|
|
- }()
|
|
|
- t.Mgo.UpdateBulk(t.MgoColl, arru...)
|
|
|
- }(arru)
|
|
|
- arru = make([][]map[string]interface{}, 200)
|
|
|
- indexu = 0
|
|
|
+//mgo增量
|
|
|
+func (t *Task) RunMgo() {
|
|
|
+ if TaskList[t.Id] == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ //oldId := t.StartId //记录起始id
|
|
|
+ queryOne := map[string]interface{}{
|
|
|
+ "_id": map[string]interface{}{
|
|
|
+ "$gte": qu.StringTOBsonId(t.StartId),
|
|
|
+ },
|
|
|
+ }
|
|
|
+ data, _ := t.Mgo.Find(t.MgoColl, queryOne, `{"_id":-1}`, nil, false, 0, 1) //找最后一条数据
|
|
|
+ endId := qu.BsonIdToSId((*data)[0]["_id"])
|
|
|
+ if endId <= t.StartId { //判断id
|
|
|
+ return
|
|
|
+ }
|
|
|
+ sid := t.StartId
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "_id": map[string]interface{}{
|
|
|
+ "$gt": qu.StringTOBsonId(sid),
|
|
|
+ "$lte": qu.StringTOBsonId(endId),
|
|
|
+ },
|
|
|
+ }
|
|
|
+ sess := t.Mgo.GetMgoConn()
|
|
|
+ defer t.Mgo.DestoryMongoConn(sess)
|
|
|
+ count, _ := sess.DB(t.MgoDb).C(t.MgoColl).Find(&query).Count()
|
|
|
+ log.Println("查询语句:", query, "查询总数:", count)
|
|
|
+ it := sess.DB(t.MgoDb).C(t.MgoColl).Find(&query).Select(Fields).Sort("_id").Iter()
|
|
|
+
|
|
|
+ t.Wg = &sync.WaitGroup{}
|
|
|
+ t.Lock = &sync.Mutex{}
|
|
|
+ t.DataChan = make(chan bool, 20)
|
|
|
+ update := [][]map[string]interface{}{}
|
|
|
+ //遍历
|
|
|
+ index := 0
|
|
|
+ n := int64(0)
|
|
|
+ for tmp := map[string]interface{}{}; it.Next(&tmp); index++ {
|
|
|
+ if index%500 == 0 {
|
|
|
+ log.Println("current:", index)
|
|
|
+ }
|
|
|
+ tid := qu.BsonIdToSId(tmp["_id"])
|
|
|
+ t.Wg.Add(1)
|
|
|
+ t.DataChan <- true
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-t.DataChan
|
|
|
+ t.Wg.Done()
|
|
|
+ }()
|
|
|
+ tmpTagName := map[string]bool{}
|
|
|
+ for _, r := range t.Rules {
|
|
|
+ //关键词匹配
|
|
|
+ L:
|
|
|
+ for _, kwm := range r.KW.MatchType {
|
|
|
+ if text := qu.ObjToString(tmp[kwm]); text != "" {
|
|
|
+ text = ProcessData(text)
|
|
|
+ for _, kw_reg := range r.KW.KeyReg {
|
|
|
+ if kw_reg.MatchString(text) { //关键词匹配成功
|
|
|
+ //qu.Debug(kwm, kw_reg)
|
|
|
+ //kwMatch = true
|
|
|
+ //关键词匹配成功后,匹配附加词
|
|
|
+ //qu.Debug(len(r.AW.KeyReg), r.AW.KeyReg)
|
|
|
+ if len(r.AW.KeyReg) == 0 { //无附加词
|
|
|
+ //adMatch = true
|
|
|
+ tmpTagName[r.TagName] = true
|
|
|
+ break L
|
|
|
+ } else {
|
|
|
+ for _, awm := range r.AW.MatchType {
|
|
|
+ if text := qu.ObjToString(tmp[awm]); text != "" {
|
|
|
+ for _, aw_reg := range r.AW.KeyReg {
|
|
|
+ if aw_reg.MatchString(text) { //附加词匹配成功
|
|
|
+ //adMatch = true
|
|
|
+ tmpTagName[r.TagName] = true
|
|
|
+ break L
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- case <-time.After(1000 * time.Millisecond):
|
|
|
- if indexu > 0 {
|
|
|
- t.SP <- true
|
|
|
- go func(arru [][]map[string]interface{}) {
|
|
|
- defer func() {
|
|
|
- <-t.SP
|
|
|
- }()
|
|
|
- t.Mgo.UpdateBulk(t.MgoColl, arru...)
|
|
|
- }(arru[:indexu])
|
|
|
- arru = make([][]map[string]interface{}, 200)
|
|
|
- indexu = 0
|
|
|
+ if len(tmpTagName) > 0 {
|
|
|
+ tagname_arr := []string{}
|
|
|
+ for tagname, _ := range tmpTagName {
|
|
|
+ tagname_arr = append(tagname_arr, tagname)
|
|
|
+ }
|
|
|
+
|
|
|
+ atomic.AddInt64(&n, +1) //n++
|
|
|
+
|
|
|
+ t.Lock.Lock()
|
|
|
+ idAndSet := []map[string]interface{}{}
|
|
|
+ _id := map[string]interface{}{
|
|
|
+ "_id": tmp["_id"],
|
|
|
+ }
|
|
|
+ set := map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "tagname": strings.Join(tagname_arr, ","),
|
|
|
+ },
|
|
|
+ }
|
|
|
+ idAndSet = append(idAndSet, _id) //第一个为查询条件
|
|
|
+ idAndSet = append(idAndSet, set) //第二个为更新内容
|
|
|
+
|
|
|
+ update = append(update, idAndSet)
|
|
|
+ if len(update) > 500 {
|
|
|
+ t.Mgo.UpdateBulk(t.MgoColl, update...)
|
|
|
+ update = [][]map[string]interface{}{} //更新后把数据置空
|
|
|
+ }
|
|
|
+ t.Lock.Unlock()
|
|
|
}
|
|
|
+ }(tmp)
|
|
|
+ if tid > t.StartId {
|
|
|
+ t.StartId = tid
|
|
|
}
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ t.Wg.Wait()
|
|
|
+ t.Lock.Lock()
|
|
|
+ if len(update) > 0 {
|
|
|
+ t.Mgo.UpdateBulk(t.MgoColl, update...)
|
|
|
+ update = [][]map[string]interface{}{} //更新后把数据置空
|
|
|
}
|
|
|
+ t.Lock.Unlock()
|
|
|
+ log.Println("Update Count:", n)
|
|
|
+ //更新起始id
|
|
|
+ setid := map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "s_startid": t.StartId,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ go Mgo.Update("taskinfo", `{"_id":"`+t.Id+`"}`, setid, false, false)
|
|
|
+ //是否同步es
|
|
|
+ if t.IsIndex {
|
|
|
+ go processEs(sid, endId)
|
|
|
+ }
|
|
|
+ time.AfterFunc(time.Minute*5, t.RunMgo)
|
|
|
+}
|
|
|
+
|
|
|
+func (t *Task) RunEs() {
|
|
|
+
|
|
|
}
|
|
|
|
|
|
//初始化任务信息
|
|
|
-func InitTask(taskid string) {
|
|
|
- t := &Task{}
|
|
|
+func (t *Task) InitTask(taskid string) {
|
|
|
data, _ := Mgo.FindById("taskinfo", taskid, nil)
|
|
|
t.Id = taskid
|
|
|
t.StartId = qu.ObjToString((*data)["s_startid"])
|
|
|
from := qu.ObjToString((*data)["s_fromtype"])
|
|
|
t.From = from
|
|
|
- to := qu.ObjToString((*data)["s_totype"])
|
|
|
- t.To = to
|
|
|
- if from == to { //同库
|
|
|
- url := qu.ObjToString((*data)["s_fromdburl"])
|
|
|
- dbname := qu.ObjToString((*data)["s_fromdbname"])
|
|
|
- coll := qu.ObjToString((*data)["s_fromdbcoll"])
|
|
|
- if from == "es" { //es
|
|
|
- t.InitEs(url, dbname, coll)
|
|
|
- } else { //mgo
|
|
|
- t.InitMgo(url, dbname, coll)
|
|
|
+ url := qu.ObjToString((*data)["s_fromdburl"])
|
|
|
+ dbname := qu.ObjToString((*data)["s_fromdbname"])
|
|
|
+ coll := qu.ObjToString((*data)["s_fromdbcoll"])
|
|
|
+ if from == "mongodb" { //初始化mgo
|
|
|
+ t.Mgo = &mongo.MongodbSim{
|
|
|
+ MongodbAddr: url,
|
|
|
+ Size: 10,
|
|
|
+ DbName: dbname,
|
|
|
}
|
|
|
- } else { //异库
|
|
|
- fromdburl := qu.ObjToString((*data)["s_fromdburl"])
|
|
|
- fromdbname := qu.ObjToString((*data)["s_fromdbname"])
|
|
|
- fromdbcoll := qu.ObjToString((*data)["s_fromdbcoll"])
|
|
|
- todburl := qu.ObjToString((*data)["s_todburl"])
|
|
|
- todbname := qu.ObjToString((*data)["s_todbname"])
|
|
|
- todbcoll := qu.ObjToString((*data)["s_todbcoll"])
|
|
|
- if from == "es" {
|
|
|
- t.InitEs(fromdburl, fromdbname, fromdbcoll)
|
|
|
- t.InitMgo(todburl, todbname, todbcoll)
|
|
|
- } else {
|
|
|
- t.InitMgo(fromdburl, fromdbname, fromdbcoll)
|
|
|
- t.InitEs(todburl, todbname, todbcoll)
|
|
|
+ t.Mgo.InitPool()
|
|
|
+ t.MgoDb = dbname
|
|
|
+ t.MgoColl = coll
|
|
|
+ if s_synces := qu.ObjToString((*data)["s_synces"]); s_synces == "1" {
|
|
|
+ t.IsIndex = true //mgo打标签是否同步es
|
|
|
}
|
|
|
+ } else { //初始化es
|
|
|
+ t.Es = &elastic.Elastic{
|
|
|
+ S_esurl: url,
|
|
|
+ I_size: 15,
|
|
|
+ }
|
|
|
+ t.Es.InitElasticSize()
|
|
|
+ t.Index = dbname
|
|
|
+ t.Itype = coll
|
|
|
}
|
|
|
t.EsUpdateCache = make(chan map[string]string, 500)
|
|
|
t.MgoUpdataCache = make(chan []map[string]interface{}, 500)
|
|
@@ -273,7 +365,7 @@ func (t *Task) InitRules(tasktype string) {
|
|
|
//组合
|
|
|
for _, tk := range tmp_kw {
|
|
|
for _, aw := range tmp_aw {
|
|
|
- rule := &Tag_Rule{}
|
|
|
+ rule := &Rule{}
|
|
|
rule.KW = tk
|
|
|
rule.AW = aw
|
|
|
rule.TagName = tagname
|
|
@@ -284,30 +376,88 @@ func (t *Task) InitRules(tasktype string) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // for i, r := range t.Rules {
|
|
|
- // qu.Debug(i, r.TagName, r.KW.KeyReg, len(r.KW.KeyReg), r.KW.MatchType, len(r.KW.MatchType), r.KW.KeyWordMap, "---", r.AW.KeyReg, len(r.AW.KeyReg), r.AW.MatchType, len(r.AW.MatchType), r.AW.AddWordMap)
|
|
|
- // }
|
|
|
- // qu.Debug(t.Id, t.From, t.To, t.StartId)
|
|
|
+ for i, r := range t.Rules {
|
|
|
+ qu.Debug(i, r.TagName, r.KW.KeyReg, len(r.KW.KeyReg), r.KW.MatchType, len(r.KW.MatchType), r.KW.KeyWordMap, "---", r.AW.KeyReg, len(r.AW.KeyReg), r.AW.MatchType, len(r.AW.MatchType), r.AW.AddWordMap)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
-//初始化mgo
|
|
|
-func (t *Task) InitMgo(url, dbname, coll string) {
|
|
|
- t.Mgo = &mongo.MongodbSim{
|
|
|
- MongodbAddr: url,
|
|
|
- Size: 10,
|
|
|
- DbName: dbname,
|
|
|
+//处理文本
|
|
|
+func ProcessData(text string) string {
|
|
|
+ text = strings.ToUpper(text) //文本中的英文全转为大写
|
|
|
+ text = FilteReg.ReplaceAllString(text, "") //去除一些特殊符号
|
|
|
+ return text
|
|
|
+}
|
|
|
+
|
|
|
+//更新es
|
|
|
+func (t *Task) UpdateEs() {
|
|
|
+ log.Println("Es Save...")
|
|
|
+ arru := make([]map[string]string, 200)
|
|
|
+ indexu := 0
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case v := <-t.EsUpdateCache:
|
|
|
+ arru[indexu] = v
|
|
|
+ indexu++
|
|
|
+ if indexu == 200 {
|
|
|
+ t.SP <- true
|
|
|
+ go func(arru []map[string]string) {
|
|
|
+ defer func() {
|
|
|
+ <-t.SP
|
|
|
+ }()
|
|
|
+ elastic.BulkUpdateArr(t.Index, t.Itype, arru)
|
|
|
+ }(arru)
|
|
|
+ arru = make([]map[string]string, 200)
|
|
|
+ indexu = 0
|
|
|
+ }
|
|
|
+ case <-time.After(1000 * time.Millisecond):
|
|
|
+ if indexu > 0 {
|
|
|
+ t.SP <- true
|
|
|
+ go func(arru []map[string]string) {
|
|
|
+ defer func() {
|
|
|
+ <-t.SP
|
|
|
+ }()
|
|
|
+ elastic.BulkUpdateArr(t.Index, t.Itype, arru)
|
|
|
+ }(arru[:indexu])
|
|
|
+ arru = make([]map[string]string, 200)
|
|
|
+ indexu = 0
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- t.Mgo.InitPool()
|
|
|
- t.MgoColl = coll
|
|
|
}
|
|
|
|
|
|
-//初始化es
|
|
|
-func (t *Task) InitEs(url, dbname, coll string) {
|
|
|
- t.Es = &elastic.Elastic{
|
|
|
- S_esurl: url,
|
|
|
- I_size: 15,
|
|
|
+//更新mongo
|
|
|
+func (t *Task) UpdateMgo() {
|
|
|
+ log.Println("Mgo Save...")
|
|
|
+ arru := make([][]map[string]interface{}, 200)
|
|
|
+ indexu := 0
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case v := <-t.MgoUpdataCache:
|
|
|
+ arru[indexu] = v
|
|
|
+ indexu++
|
|
|
+ if indexu == 200 {
|
|
|
+ t.SP <- true
|
|
|
+ go func(arru [][]map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-t.SP
|
|
|
+ }()
|
|
|
+ t.Mgo.UpdateBulk(t.MgoColl, arru...)
|
|
|
+ }(arru)
|
|
|
+ arru = make([][]map[string]interface{}, 200)
|
|
|
+ indexu = 0
|
|
|
+ }
|
|
|
+ case <-time.After(1000 * time.Millisecond):
|
|
|
+ if indexu > 0 {
|
|
|
+ t.SP <- true
|
|
|
+ go func(arru [][]map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-t.SP
|
|
|
+ }()
|
|
|
+ t.Mgo.UpdateBulk(t.MgoColl, arru...)
|
|
|
+ }(arru[:indexu])
|
|
|
+ arru = make([][]map[string]interface{}, 200)
|
|
|
+ indexu = 0
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- t.Es.InitElasticSize()
|
|
|
- t.Index = dbname
|
|
|
- t.Itype = coll
|
|
|
}
|