|
@@ -0,0 +1,313 @@
|
|
|
+package util
|
|
|
+
|
|
|
+import (
|
|
|
+ "log"
|
|
|
+ mongo "qfw/mongodb"
|
|
|
+ qu "qfw/util"
|
|
|
+ "qfw/util/elastic"
|
|
|
+ "regexp"
|
|
|
+ "strings"
|
|
|
+ "time"
|
|
|
+)
|
|
|
+
|
|
|
+//匹配方式map
|
|
|
+var task_export_matchtype = map[string]interface{}{
|
|
|
+ "1": "title",
|
|
|
+ "2": "detail",
|
|
|
+ "3": "purchasing",
|
|
|
+ "4": "filetext",
|
|
|
+ "5": "projectname",
|
|
|
+ "6": "buyer",
|
|
|
+ "7": "s_winner",
|
|
|
+}
|
|
|
+var LetterCase = regexp.MustCompile("[A-Za-z]")
|
|
|
+
|
|
|
+//任务模型
|
|
|
+type Task struct {
|
|
|
+ //任务信息
|
|
|
+ Id string //任务id
|
|
|
+ StartId string //起始id
|
|
|
+ From string //数据出处(es mongodb)
|
|
|
+ To string //数据更新去处(es mongodb)
|
|
|
+ Index string //es index
|
|
|
+ Itype string //es type
|
|
|
+ MgoColl string //mgo coll
|
|
|
+ Rules []*Tag_Rule //任务相关规则(对数据打标签)
|
|
|
+ IsRun bool //是否运行
|
|
|
+ //存储相关
|
|
|
+ Mgo *mongo.MongodbSim //mgo
|
|
|
+ Es *elastic.Elastic //es
|
|
|
+ EsUpdateCache chan map[string]string //es更新集合
|
|
|
+ MgoUpdataCache chan []map[string]interface{} //mgo更新集合
|
|
|
+ SP chan bool //批量更新时的线程控制
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+//规则
|
|
|
+type Tag_Rule struct {
|
|
|
+ KW *KeyWord
|
|
|
+ AW *AddWord
|
|
|
+ TagName string
|
|
|
+}
|
|
|
+
|
|
|
+//关键词类型
|
|
|
+type KeyWord struct {
|
|
|
+ KeyReg []*regexp.Regexp
|
|
|
+ MatchType []string //关键词的匹配方式
|
|
|
+ KeyWordMap map[int]bool //记录KeyReg中字母规则
|
|
|
+}
|
|
|
+
|
|
|
+//附加词类型
|
|
|
+type AddWord struct {
|
|
|
+ KeyReg []*regexp.Regexp
|
|
|
+ MatchType []string //附加词的匹配方式
|
|
|
+ AddWordMap map[int]bool //记录KeyReg中字母规则
|
|
|
+}
|
|
|
+
|
|
|
+//更新es
|
|
|
+func (t *Task) UpdateEs() {
|
|
|
+ log.Println("Es Save...")
|
|
|
+ arru := make([]map[string]string, 200)
|
|
|
+ indexu := 0
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case v := <-t.EsUpdateCache:
|
|
|
+ arru[indexu] = v
|
|
|
+ indexu++
|
|
|
+ if indexu == 200 {
|
|
|
+ t.SP <- true
|
|
|
+ go func(arru []map[string]string) {
|
|
|
+ defer func() {
|
|
|
+ <-t.SP
|
|
|
+ }()
|
|
|
+ elastic.BulkUpdateArr(t.Index, t.Itype, arru)
|
|
|
+ }(arru)
|
|
|
+ arru = make([]map[string]string, 200)
|
|
|
+ indexu = 0
|
|
|
+ }
|
|
|
+ case <-time.After(1000 * time.Millisecond):
|
|
|
+ if indexu > 0 {
|
|
|
+ t.SP <- true
|
|
|
+ go func(arru []map[string]string) {
|
|
|
+ defer func() {
|
|
|
+ <-t.SP
|
|
|
+ }()
|
|
|
+ elastic.BulkUpdateArr(t.Index, t.Itype, arru)
|
|
|
+ }(arru[:indexu])
|
|
|
+ arru = make([]map[string]string, 200)
|
|
|
+ indexu = 0
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+//更新mongo
|
|
|
+func (t *Task) UpdateMgo() {
|
|
|
+ log.Println("Mgo Save...")
|
|
|
+ arru := make([][]map[string]interface{}, 200)
|
|
|
+ indexu := 0
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case v := <-t.MgoUpdataCache:
|
|
|
+ arru[indexu] = v
|
|
|
+ indexu++
|
|
|
+ if indexu == 200 {
|
|
|
+ t.SP <- true
|
|
|
+ go func(arru [][]map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-t.SP
|
|
|
+ }()
|
|
|
+ t.Mgo.UpdateBulk(t.MgoColl, arru...)
|
|
|
+ }(arru)
|
|
|
+ arru = make([][]map[string]interface{}, 200)
|
|
|
+ indexu = 0
|
|
|
+ }
|
|
|
+ case <-time.After(1000 * time.Millisecond):
|
|
|
+ if indexu > 0 {
|
|
|
+ t.SP <- true
|
|
|
+ go func(arru [][]map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-t.SP
|
|
|
+ }()
|
|
|
+ t.Mgo.UpdateBulk(t.MgoColl, arru...)
|
|
|
+ }(arru[:indexu])
|
|
|
+ arru = make([][]map[string]interface{}, 200)
|
|
|
+ indexu = 0
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+//初始化任务信息
|
|
|
+func InitTask(taskid string) {
|
|
|
+ t := &Task{}
|
|
|
+ data, _ := Mgo.FindById("taskinfo", taskid, nil)
|
|
|
+ t.Id = taskid
|
|
|
+ t.StartId = qu.ObjToString((*data)["s_startid"])
|
|
|
+ from := qu.ObjToString((*data)["s_fromtype"])
|
|
|
+ t.From = from
|
|
|
+ to := qu.ObjToString((*data)["s_totype"])
|
|
|
+ t.To = to
|
|
|
+ if from == to { //同库
|
|
|
+ url := qu.ObjToString((*data)["s_fromdburl"])
|
|
|
+ dbname := qu.ObjToString((*data)["s_fromdbname"])
|
|
|
+ coll := qu.ObjToString((*data)["s_fromdbcoll"])
|
|
|
+ if from == "es" { //es
|
|
|
+ t.InitEs(url, dbname, coll)
|
|
|
+ } else { //mgo
|
|
|
+ t.InitMgo(url, dbname, coll)
|
|
|
+ }
|
|
|
+ } else { //异库
|
|
|
+ fromdburl := qu.ObjToString((*data)["s_fromdburl"])
|
|
|
+ fromdbname := qu.ObjToString((*data)["s_fromdbname"])
|
|
|
+ fromdbcoll := qu.ObjToString((*data)["s_fromdbcoll"])
|
|
|
+ todburl := qu.ObjToString((*data)["s_todburl"])
|
|
|
+ todbname := qu.ObjToString((*data)["s_todbname"])
|
|
|
+ todbcoll := qu.ObjToString((*data)["s_todbcoll"])
|
|
|
+ if from == "es" {
|
|
|
+ t.InitEs(fromdburl, fromdbname, fromdbcoll)
|
|
|
+ t.InitMgo(todburl, todbname, todbcoll)
|
|
|
+ } else {
|
|
|
+ t.InitMgo(fromdburl, fromdbname, fromdbcoll)
|
|
|
+ t.InitEs(todburl, todbname, todbcoll)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ t.EsUpdateCache = make(chan map[string]string, 500)
|
|
|
+ t.MgoUpdataCache = make(chan []map[string]interface{}, 500)
|
|
|
+ t.SP = make(chan bool, 5)
|
|
|
+ t.InitRules(qu.ObjToString((*data)["s_tasktype"])) //rules
|
|
|
+}
|
|
|
+
|
|
|
+//初始化Rules
|
|
|
+func (t *Task) InitRules(tasktype string) {
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "i_isuse": 1, //启用状态
|
|
|
+ "s_tasktype": tasktype,
|
|
|
+ "b_delete": false,
|
|
|
+ }
|
|
|
+ list, _ := Mgo.Find("tagrule", query, nil, `{"o_list":1,"s_tagname":1}`, false, -1, -1)
|
|
|
+ for _, l := range *list {
|
|
|
+ tagname := qu.ObjToString(l["s_tagname"])
|
|
|
+ o_list := l["o_list"].([]interface{})
|
|
|
+ for _, o := range o_list {
|
|
|
+ o_map := o.(map[string]interface{})
|
|
|
+ //附加词匹配方式
|
|
|
+ awm := qu.ObjToString(o_map["s_addkeymatch"])
|
|
|
+ awmArr := []string{}
|
|
|
+ for _, av := range strings.Split(awm, ",") {
|
|
|
+ if field := qu.ObjToString(task_export_matchtype[av]); field != "" {
|
|
|
+ awmArr = append(awmArr, field)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //附加词
|
|
|
+ tmp_aw := []*AddWord{}
|
|
|
+ addword := qu.ObjToString(o_map["s_addkey"])
|
|
|
+ aw_commaArr := strings.Split(addword, ",")
|
|
|
+ for _, comma := range aw_commaArr {
|
|
|
+ aw := &AddWord{}
|
|
|
+ aw.AddWordMap = make(map[int]bool)
|
|
|
+ aw.MatchType = awmArr
|
|
|
+ aw_addArr := strings.Split(comma, "&&")
|
|
|
+ if len(aw_addArr) == 1 { //,
|
|
|
+ tmp_aw := aw_addArr[0]
|
|
|
+ if tmp_aw != "" {
|
|
|
+ if LetterCase.MatchString(tmp_aw) { //判断附加词中是否有英文
|
|
|
+ tmp_aw = strings.ToUpper(tmp_aw) //附加词中有英文全部转为大写
|
|
|
+ aw.AddWordMap[len(aw.KeyReg)] = true
|
|
|
+ }
|
|
|
+ aw.KeyReg = append(aw.KeyReg, regexp.MustCompile(tmp_aw))
|
|
|
+ }
|
|
|
+ } else { //&&
|
|
|
+ for _, and := range aw_addArr {
|
|
|
+ if and != "" {
|
|
|
+ if LetterCase.MatchString(and) { //判断附加词中是否有英文
|
|
|
+ and = strings.ToUpper(and) //附加词中有英文全部转为大写
|
|
|
+ aw.AddWordMap[len(aw.KeyReg)] = true
|
|
|
+ }
|
|
|
+ aw.KeyReg = append(aw.KeyReg, regexp.MustCompile(and))
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ tmp_aw = append(tmp_aw, aw)
|
|
|
+ }
|
|
|
+ //关键词匹配方式
|
|
|
+ kwm := qu.ObjToString(o_map["s_keymatch"])
|
|
|
+ kwmArr := []string{}
|
|
|
+ for _, kv := range strings.Split(kwm, ",") {
|
|
|
+ if field := qu.ObjToString(task_export_matchtype[kv]); field != "" {
|
|
|
+ kwmArr = append(kwmArr, field)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //关键词
|
|
|
+ tmp_kw := []*KeyWord{}
|
|
|
+ keyword := qu.ObjToString(o_map["s_matchkey"])
|
|
|
+ kw_commaArr := strings.Split(keyword, ",")
|
|
|
+ for _, comma := range kw_commaArr {
|
|
|
+ kw := &KeyWord{}
|
|
|
+ kw.KeyWordMap = make(map[int]bool)
|
|
|
+ kw.MatchType = kwmArr
|
|
|
+ kw_addArr := strings.Split(comma, "&&")
|
|
|
+ if len(kw_addArr) == 1 { //,
|
|
|
+ tmp_kw := kw_addArr[0]
|
|
|
+ if tmp_kw != "" {
|
|
|
+ if LetterCase.MatchString(tmp_kw) {
|
|
|
+ tmp_kw = strings.ToUpper(tmp_kw)
|
|
|
+ kw.KeyWordMap[len(kw.KeyReg)] = true
|
|
|
+ }
|
|
|
+ kw.KeyReg = append(kw.KeyReg, regexp.MustCompile(tmp_kw))
|
|
|
+ }
|
|
|
+ } else { //&&
|
|
|
+ for _, and := range kw_addArr {
|
|
|
+ if and != "" {
|
|
|
+ if LetterCase.MatchString(and) {
|
|
|
+ and = strings.ToUpper(and)
|
|
|
+ kw.KeyWordMap[len(kw.KeyReg)] = true
|
|
|
+ }
|
|
|
+ kw.KeyReg = append(kw.KeyReg, regexp.MustCompile(and))
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ tmp_kw = append(tmp_kw, kw)
|
|
|
+ }
|
|
|
+
|
|
|
+ //组合
|
|
|
+ for _, tk := range tmp_kw {
|
|
|
+ for _, aw := range tmp_aw {
|
|
|
+ rule := &Tag_Rule{}
|
|
|
+ rule.KW = tk
|
|
|
+ rule.AW = aw
|
|
|
+ rule.TagName = tagname
|
|
|
+ t.Rules = append(t.Rules, rule)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // for i, r := range t.Rules {
|
|
|
+ // qu.Debug(i, r.TagName, r.KW.KeyReg, len(r.KW.KeyReg), r.KW.MatchType, len(r.KW.MatchType), r.KW.KeyWordMap, "---", r.AW.KeyReg, len(r.AW.KeyReg), r.AW.MatchType, len(r.AW.MatchType), r.AW.AddWordMap)
|
|
|
+ // }
|
|
|
+ // qu.Debug(t.Id, t.From, t.To, t.StartId)
|
|
|
+}
|
|
|
+
|
|
|
+//初始化mgo
|
|
|
+func (t *Task) InitMgo(url, dbname, coll string) {
|
|
|
+ t.Mgo = &mongo.MongodbSim{
|
|
|
+ MongodbAddr: url,
|
|
|
+ Size: 10,
|
|
|
+ DbName: dbname,
|
|
|
+ }
|
|
|
+ t.Mgo.InitPool()
|
|
|
+ t.MgoColl = coll
|
|
|
+}
|
|
|
+
|
|
|
+//初始化es
|
|
|
+func (t *Task) InitEs(url, dbname, coll string) {
|
|
|
+ t.Es = &elastic.Elastic{
|
|
|
+ S_esurl: url,
|
|
|
+ I_size: 15,
|
|
|
+ }
|
|
|
+ t.Es.InitElasticSize()
|
|
|
+ t.Index = dbname
|
|
|
+ t.Itype = coll
|
|
|
+}
|