|
@@ -1,793 +0,0 @@
|
|
|
-package job
|
|
|
-
|
|
|
-import (
|
|
|
- "encoding/json"
|
|
|
- "fmt"
|
|
|
- . "match/config"
|
|
|
- . "match/matcher"
|
|
|
- . "match/util"
|
|
|
- . "public"
|
|
|
- "qfw/util"
|
|
|
- "qfw/util/elastic"
|
|
|
- "qfw/util/mongodb"
|
|
|
- "qfw/util/redis"
|
|
|
- "sort"
|
|
|
- "strings"
|
|
|
- "sync"
|
|
|
- "time"
|
|
|
-
|
|
|
- "github.com/donnie4w/go-logger/logger"
|
|
|
- "gopkg.in/mgo.v2/bson"
|
|
|
-)
|
|
|
-
|
|
|
-const (
|
|
|
- DB = "bidding"
|
|
|
- MaxId = `{"query":{"filtered":{"filter":{"bool":{"must":{"range":{"id":{"gt":"%s"}}}}}}},"_source":["_id","comeintime"],"sort":{"id":"desc"},"from":0,"size":1}`
|
|
|
- ProjectQuery = `{"query":{"filtered":{"filter":{"term":{"list.infoid":"%s"}}}},"_source":["_id","list.infoid"],"sort":{"id":"desc"},"from":0,"size":1}`
|
|
|
-)
|
|
|
-
|
|
|
-type Project struct {
|
|
|
- Id string
|
|
|
- List_last_infoid string
|
|
|
-}
|
|
|
-
|
|
|
-type MatchJob struct {
|
|
|
- allProject *sync.Map
|
|
|
-}
|
|
|
-
|
|
|
-//定时任务,匹配数据,存库
|
|
|
-func (m *MatchJob) Execute() {
|
|
|
- defer util.Catch()
|
|
|
- logger.Info("开始匹配数据任务。。。", TaskConfig.LastId, TaskConfig.LastTime)
|
|
|
- startId := util.ObjToString(TaskConfig.LastId)
|
|
|
- //获取本次查询的最大id
|
|
|
- idQuery := ""
|
|
|
- if startId == "" {
|
|
|
- idQuery = strings.Replace(fmt.Sprintf(MaxId, startId), `"gt"`, `"gte"`, -1)
|
|
|
- } else {
|
|
|
- idQuery = fmt.Sprintf(MaxId, startId)
|
|
|
- }
|
|
|
- resId := elastic.Get(DB, DB, idQuery)
|
|
|
- endId := ""
|
|
|
- var endTime interface{}
|
|
|
- if resId != nil && *resId != nil && len(*resId) == 1 {
|
|
|
- endId = util.ObjToString((*resId)[0]["_id"])
|
|
|
- endTime = (*resId)[0]["endTime"]
|
|
|
- } else {
|
|
|
- logger.Info("获取本次查询的最大id的时候,未查找到数据!", idQuery)
|
|
|
- return
|
|
|
- }
|
|
|
- st, _ := time.ParseInLocation(util.Date_Full_Layout, TaskConfig.LastTime, time.Local)
|
|
|
- datas := m.LoadBidding(startId, endId, st.Unix())
|
|
|
- if datas == nil || len(*datas) == 0 {
|
|
|
- return
|
|
|
- }
|
|
|
- m.Start(datas)
|
|
|
- m.allProject = &sync.Map{}
|
|
|
- if endTime == nil {
|
|
|
- endTime = time.Now().Unix()
|
|
|
- }
|
|
|
- TaskConfig.LastTime = util.FormatDateWithObj(&endTime, util.Date_Full_Layout)
|
|
|
- TaskConfig.LastId = endId
|
|
|
- logger.Info("匹配数据任务结束。。。", TaskConfig.LastId, TaskConfig.LastTime)
|
|
|
- //
|
|
|
-}
|
|
|
-
|
|
|
-//加载数据到内存中
|
|
|
-func (m *MatchJob) LoadBidding(lastId, newId string, lastTime int64) *[]map[string]interface{} {
|
|
|
- defer util.Catch()
|
|
|
- c_query := map[string]interface{}{
|
|
|
- "publishtime": map[string]interface{}{
|
|
|
- "$gt": lastTime - 7*86400,
|
|
|
- },
|
|
|
- "extracttype": 1,
|
|
|
- }
|
|
|
- idQuery := map[string]interface{}{}
|
|
|
- if lastId != "" {
|
|
|
- idQuery["$gt"] = bson.ObjectIdHex(lastId)
|
|
|
- }
|
|
|
- if newId != "" {
|
|
|
- idQuery["$lte"] = bson.ObjectIdHex(newId)
|
|
|
- }
|
|
|
- if len(idQuery) > 0 {
|
|
|
- c_query["_id"] = idQuery
|
|
|
- }
|
|
|
- //c_query = map[string]interface{}{
|
|
|
- //"_id": bson.ObjectIdHex("5da702bfa5cb26b9b773e4d7"),
|
|
|
- //}
|
|
|
- logger.Info("开始加载", Bidding, "数据", c_query)
|
|
|
- var res []map[string]interface{}
|
|
|
- sess := mongodb.GetMgoConn()
|
|
|
- defer mongodb.DestoryMongoConn(sess)
|
|
|
- it := sess.DB(DbName).C(Bidding).Find(c_query).Select(map[string]interface{}{
|
|
|
- "title": 1,
|
|
|
- "detail": 1,
|
|
|
- "projectscope": 1,
|
|
|
- "publishtime": 1,
|
|
|
- "toptype": 1,
|
|
|
- "subtype": 1,
|
|
|
- "type": 1,
|
|
|
- "area": 1,
|
|
|
- "s_subscopeclass": 1,
|
|
|
- "city": 1,
|
|
|
- "buyerclass": 1,
|
|
|
- "jsondata": 1,
|
|
|
- }).Sort("_id").Iter()
|
|
|
- index := 0
|
|
|
- for temp := make(map[string]interface{}); it.Next(&temp); {
|
|
|
- _id := util.BsonIdToSId(temp["_id"])
|
|
|
- temp["_id"] = _id
|
|
|
- if util.ObjToString(temp["area"]) == "A" {
|
|
|
- temp["area"] = "全国"
|
|
|
- }
|
|
|
- res = append(res, temp)
|
|
|
- //信息缓存3天
|
|
|
- info := map[string]interface{}{}
|
|
|
- for _, v := range InfoSaveFields {
|
|
|
- if v == "_id" || temp[v] == nil {
|
|
|
- continue
|
|
|
- }
|
|
|
- info[v] = temp[v]
|
|
|
- }
|
|
|
- redis.Put(Pushcache_1, "info_"+_id, info, 259200)
|
|
|
- temp = make(map[string]interface{})
|
|
|
- index++
|
|
|
- if index%500 == 0 {
|
|
|
- logger.Info("加载", Bidding, "数据:", index)
|
|
|
- }
|
|
|
- }
|
|
|
- logger.Info(Bidding, "数据已经加载结束。。。", index)
|
|
|
- return &res
|
|
|
-}
|
|
|
-
|
|
|
-func (m *MatchJob) InitEnt() {
|
|
|
- //初始化企业
|
|
|
- InitEnts(Mysql)
|
|
|
- //初始化部门
|
|
|
- InitEntDepts(Mysql)
|
|
|
- InitEntDeptParents(Mysql)
|
|
|
- //初始化人员
|
|
|
- InitEntUsers(Mysql)
|
|
|
- //初始化分发规则
|
|
|
- m.InitEntDistribute()
|
|
|
-}
|
|
|
-func (m *MatchJob) ClearEnt() {
|
|
|
- Ents = map[int]*Ent{}
|
|
|
- EntDepts = map[int]*EntDept{}
|
|
|
- EntParentDept = map[int][]*EntDeptParent{}
|
|
|
- EntChildDept = map[int][]*EntDeptParent{}
|
|
|
- EntUsers = map[int]*EntUser{}
|
|
|
- EntDeptDis = map[int][]*EntDistribute{}
|
|
|
-}
|
|
|
-func (m *MatchJob) Start(datas *[]map[string]interface{}) {
|
|
|
- defer util.Catch()
|
|
|
- defer m.ClearEnt()
|
|
|
- m.InitEnt()
|
|
|
- subSet := m.LoadSubSet()
|
|
|
- if subSet != nil && len(subSet.EntRules) > 0 {
|
|
|
- m.ToMatch(subSet, datas)
|
|
|
- }
|
|
|
- m.allProject = &sync.Map{}
|
|
|
-}
|
|
|
-
|
|
|
-func (m *MatchJob) LoadSubSet() *EntSubSet {
|
|
|
- defer util.Catch()
|
|
|
- entSubSet := NewEntSubSet()
|
|
|
- //标题匹配
|
|
|
- title_key := make(map[string]*[]*EntRule)
|
|
|
- title_notkey := make(map[string]*[]*EntRule)
|
|
|
- //正文匹配
|
|
|
- detail_key := make(map[string]*[]*EntRule)
|
|
|
- detail_notkey := make(map[string]*[]*EntRule)
|
|
|
- //
|
|
|
- sess := mongodb.GetMgoConn()
|
|
|
- defer mongodb.DestoryMongoConn(sess)
|
|
|
- query := map[string]interface{}{}
|
|
|
- if len(Config.TestQuery) > 0 {
|
|
|
- for k, v := range Config.TestQuery {
|
|
|
- query[k] = v
|
|
|
- }
|
|
|
- }
|
|
|
- logger.Info("加载订阅设置", query)
|
|
|
- it := sess.DB(DbName).C(Entniche_rule).Find(query).Select(map[string]interface{}{
|
|
|
- "i_entid": 1,
|
|
|
- "i_deptid": 1,
|
|
|
- "i_userid": 1,
|
|
|
- "o_entniche": 1,
|
|
|
- }).Iter()
|
|
|
- for _temp := make(map[string]interface{}); it.Next(&_temp); {
|
|
|
- func(temp map[string]interface{}) {
|
|
|
- entId := util.IntAll(temp["i_entid"])
|
|
|
- deptId := util.IntAll(temp["i_deptid"])
|
|
|
- userId := util.IntAll(temp["i_userid"])
|
|
|
- if entId == 0 || (deptId == 0 && userId == 0) {
|
|
|
- return
|
|
|
- } else if Ents[entId] == nil {
|
|
|
- logger.Info("没有找到该企业", entId)
|
|
|
- return
|
|
|
- }
|
|
|
- entName := Ents[entId].Name
|
|
|
- if deptId > 0 && EntDepts[deptId] == nil {
|
|
|
- logger.Info("没有找到该部门", entName, entId, deptId)
|
|
|
- return
|
|
|
- }
|
|
|
- deptName := ""
|
|
|
- if EntDepts[deptId] != nil {
|
|
|
- deptName = EntDepts[deptId].Name
|
|
|
- }
|
|
|
- subSet, _ := temp["o_entniche"].(map[string]interface{})
|
|
|
- if subSet == nil || len(subSet) == 0 {
|
|
|
- logger.Info("订阅设置为空,过滤掉", entName, entId, deptName, deptId, userId)
|
|
|
- return
|
|
|
- }
|
|
|
- //
|
|
|
- if deptId > 0 { //部门订阅
|
|
|
- if Ents[entId].Model != 1 {
|
|
|
- logger.Info("不是统一订阅模式,过滤掉", entName, entId, deptName, deptId)
|
|
|
- return
|
|
|
- } else if EntDepts[deptId] == nil {
|
|
|
- logger.Info("没有找到该部门,过滤掉", entName, entId, deptName, deptId)
|
|
|
- return
|
|
|
- } else if EntDepts[deptId].Subdis == 0 {
|
|
|
- logger.Info("该部门的订阅分发没有开启,过滤掉", entName, entId, deptName, deptId)
|
|
|
- return
|
|
|
- } else if EntDepts[deptId].Nodiff == 0 && EntDeptDis[deptId] == nil {
|
|
|
- logger.Info("该部门开启了订阅分发,没有开启全员无差别接收,但是没有分发规则,过滤掉", entName, entId, deptName, deptId)
|
|
|
- return
|
|
|
- }
|
|
|
- //如果我的上级部门打开了订阅分发,并且开启了全员无差别接收,本部门规则无效
|
|
|
- for _, dept := range EntParentDept[deptId] {
|
|
|
- if EntDepts[dept.Pid].Subdis == 1 && EntDepts[dept.Pid].Nodiff == 1 {
|
|
|
- logger.Info("我的上级部门", dept.Pid, "开启了订阅分发和全员无差别,过滤掉", entName, entId, deptName, deptId)
|
|
|
- return
|
|
|
- }
|
|
|
- }
|
|
|
- } else if userId > 0 { //个人订阅
|
|
|
- if Ents[entId].Model != 2 {
|
|
|
- logger.Info("不是个人订阅模式,过滤掉", entName, entId, userId)
|
|
|
- return
|
|
|
- } else if EntUsers[userId] == nil {
|
|
|
- logger.Info("没有找到该用户,过滤掉", entName, entId, userId)
|
|
|
- return
|
|
|
- } else if EntUsers[userId].Power == 0 {
|
|
|
- logger.Info("该用户没有权限,过滤掉", entName, entId, userId)
|
|
|
- return
|
|
|
- }
|
|
|
- }
|
|
|
- rule := &EntRule{
|
|
|
- EntId: entId,
|
|
|
- EntName: entName,
|
|
|
- DeptId: deptId,
|
|
|
- UserId: userId,
|
|
|
- AppPush: util.IntAllDef(subSet["i_apppush"], 1),
|
|
|
- MailPush: util.IntAll(subSet["i_mailpush"]),
|
|
|
- RateMode: util.IntAllDef(subSet["i_ratemode"], 2),
|
|
|
- MatchType: util.IntAllDef(subSet["i_matchway"], 1),
|
|
|
- }
|
|
|
- logger.Info("加载订阅设置", "entId", entId, "entName", entName, "deptId", deptId, "deptName", deptName, "userId", userId, "appPush", rule.AppPush, "mailPush", rule.MailPush, "rateMode", rule.RateMode, "matchType", rule.MatchType)
|
|
|
- if (deptId > 0 && EntDepts[deptId].Nodiff == 1) || userId > 0 {
|
|
|
- if m.InitSubSet(entSubSet, subSet, rule, &title_key, &title_notkey, &detail_key, &detail_notkey) {
|
|
|
- entSubSet.EntRules[rule] = true
|
|
|
- }
|
|
|
- }
|
|
|
- if deptId <= 0 {
|
|
|
- return
|
|
|
- }
|
|
|
- //分发规则
|
|
|
- if EntDepts[deptId].Nodiff == 1 {
|
|
|
- logger.Info("该部门开启了全员无差别,分发规则无效", entName, entId, deptName, deptId)
|
|
|
- return
|
|
|
- } else if EntDeptDis[deptId] == nil {
|
|
|
- logger.Info("该部门没有分发规则", entName, entId, deptName, deptId)
|
|
|
- return
|
|
|
- }
|
|
|
- itemMap := map[string]interface{}{}
|
|
|
- items, _ := subSet["a_items"].([]interface{})
|
|
|
- for _, v := range items {
|
|
|
- item, _ := v.(map[string]interface{})
|
|
|
- if item == nil {
|
|
|
- continue
|
|
|
- }
|
|
|
- item_name, _ := item["s_item"].(string)
|
|
|
- if item_name == "" {
|
|
|
- continue
|
|
|
- }
|
|
|
- itemMap[item_name] = item
|
|
|
- }
|
|
|
- for _, dis := range EntDeptDis[deptId] {
|
|
|
- child_rule := &EntRule{
|
|
|
- EntId: rule.EntId,
|
|
|
- EntName: rule.EntName,
|
|
|
- DeptId: rule.DeptId,
|
|
|
- AppPush: rule.AppPush,
|
|
|
- MailPush: rule.MailPush,
|
|
|
- RateMode: rule.RateMode,
|
|
|
- MatchType: rule.MatchType,
|
|
|
- DisId: dis.Id,
|
|
|
- }
|
|
|
- child_items := []interface{}{}
|
|
|
- for _, item_name := range dis.Items {
|
|
|
- if itemMap[item_name] == nil {
|
|
|
- continue
|
|
|
- }
|
|
|
- child_items = append(child_items, itemMap[item_name])
|
|
|
- }
|
|
|
- child_subSet := map[string]interface{}{}
|
|
|
- if len(dis.Buyerclass) > 0 {
|
|
|
- child_subSet["a_buyerclass"] = dis.Buyerclass
|
|
|
- }
|
|
|
- if len(dis.Area) > 0 {
|
|
|
- child_subSet["o_area"] = dis.Area
|
|
|
- }
|
|
|
- if len(child_items) > 0 {
|
|
|
- child_subSet["a_items"] = child_items
|
|
|
- }
|
|
|
- if len(child_subSet) == 0 {
|
|
|
- continue
|
|
|
- }
|
|
|
- if m.InitSubSet(entSubSet, child_subSet, child_rule, &title_key, &title_notkey, &detail_key, &detail_notkey) {
|
|
|
- entSubSet.EntRules[child_rule] = true
|
|
|
- }
|
|
|
- }
|
|
|
- }(_temp)
|
|
|
- _temp = make(map[string]interface{})
|
|
|
- }
|
|
|
- //
|
|
|
- title_pjob := &KeyDfa{
|
|
|
- Key_entRule: &title_key,
|
|
|
- Notkey_entRule: &title_notkey,
|
|
|
- }
|
|
|
- title_pjob.CreateDaf()
|
|
|
- detail_pjob := &KeyDfa{
|
|
|
- Key_entRule: &detail_key,
|
|
|
- Notkey_entRule: &detail_notkey,
|
|
|
- }
|
|
|
- detail_pjob.CreateDaf()
|
|
|
- entSubSet.Title_KeyDfa = title_pjob
|
|
|
- entSubSet.Detail_KeyDfa = detail_pjob
|
|
|
- return entSubSet
|
|
|
-}
|
|
|
-func (m *MatchJob) InitSubSet(entSubSet *EntSubSet, subSet map[string]interface{}, entRule *EntRule, title_key, title_notkey, detail_key, detail_notkey *map[string]*[]*EntRule) bool {
|
|
|
- keys := []string{} //过滤后的关键词
|
|
|
- notkeys := []string{}
|
|
|
- var allKeySet []*KeySet
|
|
|
- var err error
|
|
|
- items, _ := subSet["a_items"].([]interface{})
|
|
|
- for _, v := range items {
|
|
|
- item, _ := v.(map[string]interface{})
|
|
|
- var keySet []*KeySet
|
|
|
- keySet, err = m.GetKeySet(item["a_key"])
|
|
|
- if err != nil {
|
|
|
- break
|
|
|
- }
|
|
|
- allKeySet = append(allKeySet, keySet...)
|
|
|
- }
|
|
|
- if err != nil {
|
|
|
- logger.Error("订阅设置错误!", entRule.EntName, "entId", entRule.EntId, "deptId", entRule.DeptId, "userId", entRule.UserId, err)
|
|
|
- return false
|
|
|
- }
|
|
|
- key_notkey := map[string]map[string]bool{} //关键词所对应的排除词
|
|
|
- originalKeys := []string{} //原始关键词
|
|
|
- ////////////////
|
|
|
- for _, vs := range allKeySet {
|
|
|
- if vs == nil {
|
|
|
- logger.Error("订阅设置异常", entRule.EntName, "entId", entRule.EntId, "deptId", entRule.DeptId, "userId", entRule.UserId)
|
|
|
- continue
|
|
|
- }
|
|
|
- var vs_keys []string
|
|
|
- for _, vs_v := range [][]string{vs.Keys, vs.AppendKeys} {
|
|
|
- for _, vs_vv := range vs_v {
|
|
|
- vs_vv = strings.TrimSpace(vs_vv)
|
|
|
- if vs_vv == "" {
|
|
|
- continue
|
|
|
- }
|
|
|
- vs_keys = append(vs_keys, vs_vv)
|
|
|
- }
|
|
|
- }
|
|
|
- if len(vs_keys) == 0 {
|
|
|
- continue
|
|
|
- }
|
|
|
- key := strings.Join(vs_keys, "+")
|
|
|
- originalKeys = append(originalKeys, key)
|
|
|
- if FilterReg.String() != "" && FilterReg.MatchString(key) {
|
|
|
- continue
|
|
|
- } else if RetainReg.String() != "" && !RetainReg.MatchString(key) {
|
|
|
- continue
|
|
|
- }
|
|
|
- keys = append(keys, key)
|
|
|
- notkeys = append(notkeys, vs.NotKeys...)
|
|
|
- //转大写
|
|
|
- upperKey := strings.ToUpper(key)
|
|
|
- //建立与排除词的对应关系
|
|
|
- for _, notkey := range vs.NotKeys {
|
|
|
- upperNotkey := strings.ToUpper(notkey)
|
|
|
- if key_notkey[upperKey] == nil {
|
|
|
- key_notkey[upperKey] = map[string]bool{}
|
|
|
- }
|
|
|
- key_notkey[upperKey][upperNotkey] = true
|
|
|
- }
|
|
|
- }
|
|
|
- buyerclass, _ := subSet["a_buyerclass"].([]interface{})
|
|
|
- infotype, _ := subSet["a_infotype"].([]interface{})
|
|
|
- area, _ := subSet["o_area"].(map[string]interface{})
|
|
|
- if len(keys) == 0 && len(buyerclass) == 0 && len(infotype) == 0 && len(area) == 0 {
|
|
|
- return false
|
|
|
- }
|
|
|
- entRule.Keys = originalKeys
|
|
|
- entRule.Key_notkey = key_notkey
|
|
|
- entRule.O_entniche = &O_entniche{
|
|
|
- ProjectMatch: util.IntAllDef(subSet["i_projectmatch"], 1),
|
|
|
- }
|
|
|
- //采购单位行业
|
|
|
- //如果有关键词 有采购单位行业,行业加上“其它”
|
|
|
- if len(buyerclass) == 0 {
|
|
|
- entSubSet.Add("", entRule, &entSubSet.Buyerclass)
|
|
|
- } else {
|
|
|
- if len(keys) > 0 {
|
|
|
- buyerclass = append(buyerclass, "其它")
|
|
|
- }
|
|
|
- for _, v := range buyerclass {
|
|
|
- s_v, _ := v.(string)
|
|
|
- if s_v == "" {
|
|
|
- continue
|
|
|
- }
|
|
|
- entSubSet.Add(s_v, entRule, &entSubSet.Buyerclass)
|
|
|
- }
|
|
|
- }
|
|
|
- //区域
|
|
|
- if len(area) == 0 {
|
|
|
- entSubSet.Add("", entRule, &entSubSet.Area)
|
|
|
- } else {
|
|
|
- for k, v := range area {
|
|
|
- if k == "" {
|
|
|
- continue
|
|
|
- }
|
|
|
- vs, _ := v.([]interface{})
|
|
|
- if len(vs) == 0 {
|
|
|
- entSubSet.Add(k, entRule, &entSubSet.Area)
|
|
|
- } else {
|
|
|
- for _, vv := range vs {
|
|
|
- s_vv, _ := vv.(string)
|
|
|
- if s_vv == "" {
|
|
|
- continue
|
|
|
- }
|
|
|
- entSubSet.Add(s_vv, entRule, &entSubSet.City)
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- //信息类型
|
|
|
- if len(infotype) == 0 {
|
|
|
- entSubSet.Add("", entRule, &entSubSet.InfoType)
|
|
|
- } else {
|
|
|
- for _, v := range infotype {
|
|
|
- s_v, _ := v.(string)
|
|
|
- if s_v == "" {
|
|
|
- continue
|
|
|
- }
|
|
|
- entRule.O_entniche.SubTypes = append(entRule.O_entniche.SubTypes, s_v)
|
|
|
- entSubSet.Add(s_v, entRule, &entSubSet.InfoType)
|
|
|
- }
|
|
|
- }
|
|
|
- m.MakeKeyUser(keys, entRule, title_key)
|
|
|
- m.MakeKeyUser(notkeys, entRule, title_notkey)
|
|
|
- if entRule.MatchType == 2 {
|
|
|
- m.MakeKeyUser(keys, entRule, detail_key)
|
|
|
- m.MakeKeyUser(notkeys, entRule, detail_notkey)
|
|
|
- }
|
|
|
- return true
|
|
|
-}
|
|
|
-func (m *MatchJob) InitEntDistribute() {
|
|
|
- sess := mongodb.GetMgoConn()
|
|
|
- defer mongodb.DestoryMongoConn(sess)
|
|
|
- it := sess.DB(DbName).C(Entniche_distribute).Find(map[string]interface{}{
|
|
|
- "i_status": map[string]interface{}{"$ne": 1},
|
|
|
- }).Select(map[string]interface{}{
|
|
|
- "_id": 1,
|
|
|
- "i_entid": 1,
|
|
|
- "i_deptid": 1,
|
|
|
- "a_items": 1,
|
|
|
- "o_area": 1,
|
|
|
- "a_buyerclass": 1,
|
|
|
- }).Iter()
|
|
|
- for temp := make(map[string]interface{}); it.Next(&temp); {
|
|
|
- if temp["i_deptid"] == nil {
|
|
|
- continue
|
|
|
- }
|
|
|
- deptId := util.IntAll(temp["i_deptid"])
|
|
|
- a_items, _ := temp["a_items"].([]interface{})
|
|
|
- o_area, _ := temp["o_area"].(map[string]interface{})
|
|
|
- a_buyerclass, _ := temp["a_buyerclass"].([]interface{})
|
|
|
- EntDeptDis[deptId] = append(EntDeptDis[deptId], &EntDistribute{
|
|
|
- Id: util.BsonIdToSId(temp["_id"]),
|
|
|
- DeptId: deptId,
|
|
|
- Area: o_area,
|
|
|
- Buyerclass: a_buyerclass,
|
|
|
- Items: util.ObjArrToStringArr(a_items),
|
|
|
- })
|
|
|
- temp = make(map[string]interface{})
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-//把规则挂在词下面
|
|
|
-func (m *MatchJob) MakeKeyUser(keys []string, entRule *EntRule, key_entRule *map[string]*[]*EntRule) {
|
|
|
- mp := map[string]bool{}
|
|
|
- for _, key := range keys {
|
|
|
- v := strings.ToUpper(key)
|
|
|
- if v == "" || mp[v] {
|
|
|
- continue
|
|
|
- }
|
|
|
- mp[v] = true
|
|
|
- arr := (*key_entRule)[v]
|
|
|
- if arr == nil {
|
|
|
- arr = &[]*EntRule{}
|
|
|
- }
|
|
|
- *arr = append(*arr, entRule)
|
|
|
- (*key_entRule)[v] = arr
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-//遍历数据并执行推送操作
|
|
|
-func (m *MatchJob) EachAllBidInfo(matcher Matcher, datas *[]map[string]interface{}) (*map[*EntRule]*SortList, *map[*EntRule]*[]string) {
|
|
|
- defer util.Catch()
|
|
|
- logger.Info("开始匹配数据。。。")
|
|
|
- userMap := map[*EntRule]*SortList{}
|
|
|
- projectUserMap := map[*EntRule]*[]string{}
|
|
|
- lock := &sync.Mutex{}
|
|
|
- var index int
|
|
|
- matchPool := make(chan bool, Config.MatchPoolSize)
|
|
|
- matchWaitGroup := &sync.WaitGroup{}
|
|
|
- for _, temp := range *datas {
|
|
|
- matchPool <- true
|
|
|
- matchWaitGroup.Add(1)
|
|
|
- go func(info map[string]interface{}) {
|
|
|
- defer util.Catch()
|
|
|
- defer func() {
|
|
|
- matchWaitGroup.Done()
|
|
|
- <-matchPool
|
|
|
- }()
|
|
|
- entRules, projectEntRules := matcher.Match(&info)
|
|
|
- lock.Lock()
|
|
|
- defer lock.Unlock()
|
|
|
- if entRules != nil && len(*entRules) > 0 {
|
|
|
- for k, v := range *entRules {
|
|
|
- l := userMap[k]
|
|
|
- if l == nil {
|
|
|
- l = &SortList{}
|
|
|
- }
|
|
|
- *l = append(*l, &MatchInfo{
|
|
|
- Info: &info,
|
|
|
- Keys: v.Keys,
|
|
|
- })
|
|
|
- userMap[k] = l
|
|
|
- }
|
|
|
- }
|
|
|
- if projectEntRules != nil && len(*projectEntRules) > 0 {
|
|
|
- for _, v := range *projectEntRules {
|
|
|
- l := projectUserMap[v]
|
|
|
- if l == nil {
|
|
|
- l = &[]string{}
|
|
|
- }
|
|
|
- *l = append(*l, util.ObjToString(info["_id"]))
|
|
|
- projectUserMap[v] = l
|
|
|
- }
|
|
|
- }
|
|
|
- }(temp)
|
|
|
- index++
|
|
|
- if index%500 == 0 {
|
|
|
- logger.Info("匹配数据", index)
|
|
|
- }
|
|
|
- }
|
|
|
- matchWaitGroup.Wait()
|
|
|
- logger.Info("匹配数据结束。。。", index)
|
|
|
- return &userMap, &projectUserMap
|
|
|
-}
|
|
|
-
|
|
|
-func (m *MatchJob) ToMatch(matcher Matcher, datas *[]map[string]interface{}) {
|
|
|
- logger.Info("开始匹配商机管理订阅规则。。。")
|
|
|
- entRuleMap, projectentRuleMap := m.EachAllBidInfo(matcher, datas)
|
|
|
- logger.Info("开始匹配商机管理订阅规则。。。匹配结束。。。")
|
|
|
- logger.Info("开始保存到", Pushspace_entniche_temp, "表。。。")
|
|
|
- index := 0
|
|
|
- var saveBatch []map[string]interface{}
|
|
|
- myMatchId := map[string]map[string]bool{}
|
|
|
- myFilterId := map[string]map[string]bool{}
|
|
|
- //
|
|
|
- savePool := make(chan bool, Config.SavePoolSize)
|
|
|
- saveWaitGroup := &sync.WaitGroup{}
|
|
|
- lock := &sync.Mutex{}
|
|
|
- for k, v := range *entRuleMap {
|
|
|
- savePool <- true
|
|
|
- saveWaitGroup.Add(1)
|
|
|
- go func(entRule *EntRule, infos *SortList) {
|
|
|
- defer util.Catch()
|
|
|
- defer func() {
|
|
|
- <-savePool
|
|
|
- saveWaitGroup.Done()
|
|
|
- }()
|
|
|
- //取最新50条
|
|
|
- sort.Sort(infos)
|
|
|
- var array []*MatchInfo
|
|
|
- matchTitle := map[string]bool{}
|
|
|
- matchId := map[string]bool{}
|
|
|
- filterId := map[string]bool{}
|
|
|
- size := 0
|
|
|
- for _, v2 := range *infos {
|
|
|
- title := util.ObjToString((*v2.Info)["title"])
|
|
|
- _id := util.ObjToString((*v2.Info)["_id"])
|
|
|
- if matchTitle[title] {
|
|
|
- filterId[_id] = true
|
|
|
- continue
|
|
|
- }
|
|
|
- matchTitle[title] = true
|
|
|
- info := map[string]interface{}{}
|
|
|
- for _, field := range InfoSaveFields {
|
|
|
- if (*v2.Info)[field] == nil {
|
|
|
- continue
|
|
|
- }
|
|
|
- info[field] = (*v2.Info)[field]
|
|
|
- }
|
|
|
- matchId[_id] = true
|
|
|
- array = append(array, &MatchInfo{
|
|
|
- Info: &info,
|
|
|
- Keys: v2.Keys,
|
|
|
- })
|
|
|
- size++
|
|
|
- if size >= Config.MaxMatchSize {
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
- if size == 0 {
|
|
|
- return
|
|
|
- }
|
|
|
- lock.Lock()
|
|
|
- defer lock.Unlock()
|
|
|
- unique := GetUnique(entRule.EntId, entRule.DeptId, entRule.UserId, entRule.DisId)
|
|
|
- myMatchId[unique] = matchId
|
|
|
- myFilterId[unique] = filterId
|
|
|
- saveBatch = append(saveBatch, map[string]interface{}{
|
|
|
- "entid": entRule.EntId,
|
|
|
- "entname": entRule.EntName,
|
|
|
- "deptid": entRule.DeptId,
|
|
|
- "userid": entRule.UserId,
|
|
|
- "distributeid": entRule.DisId,
|
|
|
- "ratemode": entRule.RateMode,
|
|
|
- "apppush": entRule.AppPush,
|
|
|
- "mailpush": entRule.MailPush,
|
|
|
- "list": array,
|
|
|
- "size": size,
|
|
|
- "words": entRule.Keys,
|
|
|
- "timestamp": time.Now().Unix(),
|
|
|
- })
|
|
|
- if len(saveBatch) == Config.BulkSize {
|
|
|
- mongodb.SaveBulk(Pushspace_entniche_temp, saveBatch...)
|
|
|
- saveBatch = []map[string]interface{}{}
|
|
|
- }
|
|
|
- }(k, v)
|
|
|
- index++
|
|
|
- if index%500 == 0 {
|
|
|
- logger.Info("保存到", Pushspace_entniche_temp, "表:", index)
|
|
|
- }
|
|
|
- }
|
|
|
- saveWaitGroup.Wait()
|
|
|
- if len(saveBatch) > 0 {
|
|
|
- mongodb.SaveBulk(Pushspace_entniche_temp, saveBatch...)
|
|
|
- saveBatch = []map[string]interface{}{}
|
|
|
- }
|
|
|
- logger.Info("保存到", Pushspace_entniche_temp, "表结束。。。", index)
|
|
|
- m.ToRelationProject(projectentRuleMap, &myMatchId, &myFilterId)
|
|
|
-}
|
|
|
-
|
|
|
-//关联项目
|
|
|
-func (m *MatchJob) ToRelationProject(projectEntRule *map[*EntRule]*[]string, myMatchId, myFilterId *map[string]map[string]bool) {
|
|
|
- logger.Info("开始关联项目。。。")
|
|
|
- index := 0
|
|
|
- var updateproject [][]map[string]interface{}
|
|
|
- //
|
|
|
- savePool := make(chan bool, Config.SavePoolSize)
|
|
|
- saveWaitGroup := &sync.WaitGroup{}
|
|
|
- lock := &sync.Mutex{}
|
|
|
- for k, v := range *projectEntRule {
|
|
|
- savePool <- true
|
|
|
- saveWaitGroup.Add(1)
|
|
|
- go func(entRule *EntRule, _ids *[]string) {
|
|
|
- defer util.Catch()
|
|
|
- defer func() {
|
|
|
- <-savePool
|
|
|
- saveWaitGroup.Done()
|
|
|
- }()
|
|
|
- unique := GetUnique(entRule.EntId, entRule.DeptId, entRule.UserId, entRule.DisId)
|
|
|
- needLength := 50 - len((*myMatchId)[unique])
|
|
|
- for _, _id := range *_ids {
|
|
|
- if (*myFilterId)[unique] != nil && (*myFilterId)[unique][_id] {
|
|
|
- continue
|
|
|
- }
|
|
|
- //如果有信息类型,优先用订阅匹配上的信息,然后最多关联50条
|
|
|
- if len(entRule.O_entniche.SubTypes) > 0 {
|
|
|
- if (*myMatchId)[unique] == nil || !(*myMatchId)[unique][_id] {
|
|
|
- if needLength > 0 {
|
|
|
- needLength--
|
|
|
- } else {
|
|
|
- continue
|
|
|
- }
|
|
|
- }
|
|
|
- } else { //如果没有信息类型,只关联订阅匹配上的信息
|
|
|
- if (*myMatchId)[unique] == nil || !(*myMatchId)[unique][_id] {
|
|
|
- continue
|
|
|
- }
|
|
|
- }
|
|
|
- list_last_infoid := ""
|
|
|
- projectId := ""
|
|
|
- if value, ok := m.allProject.Load(_id); ok {
|
|
|
- project, _ := value.(*Project)
|
|
|
- projectId = project.Id
|
|
|
- list_last_infoid = project.List_last_infoid
|
|
|
- } else {
|
|
|
- projects := elastic.Get(Projectset, Projectset, fmt.Sprintf(ProjectQuery, _id))
|
|
|
- if projects == nil || len(*projects) == 0 {
|
|
|
- continue
|
|
|
- }
|
|
|
- list := (*projects)[0]["list"].([]interface{})
|
|
|
- if len(list) == 0 {
|
|
|
- continue
|
|
|
- }
|
|
|
- list_last, _ := list[len(list)-1].(map[string]interface{})
|
|
|
- list_last_infoid = util.ObjToString(list_last["infoid"])
|
|
|
- projectId, _ = (*projects)[0]["_id"].(string)
|
|
|
- m.allProject.Store(_id, &Project{
|
|
|
- Id: projectId,
|
|
|
- List_last_infoid: list_last_infoid,
|
|
|
- })
|
|
|
- }
|
|
|
- if projectId == "" || list_last_infoid == "" {
|
|
|
- continue
|
|
|
- }
|
|
|
- lock.Lock()
|
|
|
- updateproject = append(updateproject, []map[string]interface{}{
|
|
|
- map[string]interface{}{
|
|
|
- "projectid": projectId,
|
|
|
- "unique": unique,
|
|
|
- },
|
|
|
- map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "projectid": projectId,
|
|
|
- "infoid": _id,
|
|
|
- "unique": unique,
|
|
|
- "entid": entRule.EntId,
|
|
|
- "deptid": entRule.DeptId,
|
|
|
- "userid": entRule.UserId,
|
|
|
- "distributeid": entRule.DisId,
|
|
|
- "maxid": list_last_infoid,
|
|
|
- "subtypes": entRule.O_entniche.SubTypes,
|
|
|
- "createtime": time.Now().Unix(),
|
|
|
- },
|
|
|
- },
|
|
|
- })
|
|
|
- if len(updateproject) == Config.BigBulkSize {
|
|
|
- mongodb.NewUpdateBulk(Pushspace_entniche_project, true, true, updateproject...)
|
|
|
- updateproject = [][]map[string]interface{}{}
|
|
|
- }
|
|
|
- lock.Unlock()
|
|
|
- }
|
|
|
- }(k, v)
|
|
|
- index++
|
|
|
- if index%500 == 0 {
|
|
|
- logger.Info("关联项目:", index)
|
|
|
- }
|
|
|
- }
|
|
|
- saveWaitGroup.Wait()
|
|
|
- if len(updateproject) > 0 {
|
|
|
- mongodb.NewUpdateBulk(Pushspace_entniche_project, true, true, updateproject...)
|
|
|
- updateproject = [][]map[string]interface{}{}
|
|
|
- }
|
|
|
- logger.Info("关联项目结束。。。", index)
|
|
|
-}
|
|
|
-
|
|
|
-func (m *MatchJob) GetKeySet(a_key interface{}) ([]*KeySet, error) {
|
|
|
- var keySet []*KeySet
|
|
|
- _bs, err := json.Marshal(a_key)
|
|
|
- if err == nil {
|
|
|
- err = json.Unmarshal(_bs, &keySet)
|
|
|
- }
|
|
|
- return keySet, err
|
|
|
-}
|