xuzhiheng 9 сар өмнө
parent
commit
599a096b39

+ 0 - 1
customerdata/src/datamodel.go

@@ -28,7 +28,6 @@ type Customer struct {
 	IdRanges     bson.M
 	DataSave     string
 	Exact        int
-	Sbidding     string
 }
 
 // 部门模型

+ 21 - 18
customerdata/src/main.go

@@ -21,6 +21,7 @@ type sysconfig struct {
 	TaskTime               int64                  `json:"tasktime"`
 	LatestId               string                 `json:"latestid"`
 	LatestTime             int64                  `json:"latesttime"`
+	LatestTimes            int64                  `json:"latesttimes"`
 	BuyerInfo              map[string]interface{} `json:"buyerinfo"`
 	Enterprise             map[string]interface{} `json:"enterprise"`
 	Save                   map[string]interface{} `json:"save"`
@@ -50,23 +51,23 @@ type sysconfig struct {
 }
 
 var (
-	Sysconfig    sysconfig
-	LatestId     string              //起始id
-	LatestTime   int64               //createtime
-	MgoTag       *mongodb.MongodbSim //标签库连接
-	MgoSave      *mongodb.MongodbSim //数据保存库连接
-	MgoBuyer     *mongodb.MongodbSim //医院等级信息
-	MgoEnps      *mongodb.MongodbSim //企业信息
-	MgoExtract   *mongodb.MongodbSim //抽取
-	MgoBidding   *mongodb.MongodbSim //bidding表
-	SaveColl     string
-	HospColl     string
-	BuyerEntColl string
-	EnpsColl     string
-	Es           elastic.Es //es
-	Index        string
-	Itype        string
-	TaskTime     int64 //定时任务时间
+	Sysconfig               sysconfig
+	LatestId                string              //起始id
+	LatestTime, LatestTimes int64               //createtime
+	MgoTag                  *mongodb.MongodbSim //标签库连接
+	MgoSave                 *mongodb.MongodbSim //数据保存库连接
+	MgoBuyer                *mongodb.MongodbSim //医院等级信息
+	MgoEnps                 *mongodb.MongodbSim //企业信息
+	MgoExtract              *mongodb.MongodbSim //抽取
+	MgoBidding              *mongodb.MongodbSim //bidding表
+	SaveColl                string
+	HospColl                string
+	BuyerEntColl            string
+	EnpsColl                string
+	Es, Es2                 elastic.Es //es
+	Index                   string
+	Itype                   string
+	TaskTime                int64 //定时任务时间
 	//历史数据
 	SId                    string
 	EId                    string
@@ -168,6 +169,7 @@ func init() {
 	// }
 	// Es.InitElasticSize()
 	Es = elastic.NewEs(qu.ObjToString(es["version"]), qu.ObjToString(es["addr"]), qu.IntAllDef(es["pool"], 15), qu.ObjToString(es["userName"]), qu.ObjToString(es["password"]))
+	Es2 = elastic.NewEs(qu.ObjToString(es["version"]), "http://172.17.4.184:19908", qu.IntAllDef(es["pool"], 15), qu.ObjToString(es["userName"]), qu.ObjToString(es["password"]))
 	redis.InitRedis(Sysconfig.RedisAddrs)
 	Index = qu.ObjToString(es["index"])
 	Itype = qu.ObjToString(es["itype"])
@@ -175,6 +177,7 @@ func init() {
 	TaskTime = Sysconfig.TaskTime
 	LatestId = Sysconfig.LatestId
 	LatestTime = Sysconfig.LatestTime
+	LatestTimes = Sysconfig.LatestTimes
 	IsNewSql = Sysconfig.IsNewSql
 	// tmp := map[string]interface{}{
 	// 	"buyer":    "四川大学华西第二医院",
@@ -229,7 +232,7 @@ func main() {
 	if CustomerName != "" {
 		go HistoryTask(CustomerName)
 	} else {
-		go TimeTask() //定时任务
+		TimeTask() //定时任务
 	}
 	ch := make(chan bool, 1)
 	<-ch

+ 88 - 10
customerdata/src/task.go

@@ -21,19 +21,22 @@ import (
 	esv "es"
 
 	"github.com/antonmedv/expr"
-	"github.com/donnie4w/go-logger/logger"
 	"go.mongodb.org/mongo-driver/bson/primitive"
 
 	esV7 "github.com/olivere/elastic"
 )
 
 func TimeTask() {
-	StartTask()
+	// StartTask()
 	c := cron.New()
 	//cronstr := "0 */" + fmt.Sprint(TaskTime) + " * * * ?"
 	cronstr := "0 5 */" + fmt.Sprint(TaskTime) + " * * ?" //每TaskTime小时执行一次
 	c.AddFunc(cronstr, func() { StartTask() })
 	c.Start()
+	d := cron.New()
+	cronstrs := "0 45 */" + fmt.Sprint(TaskTime) + " * * ?" //每TaskTime小时执行一次
+	d.AddFunc(cronstrs, func() { GetCustomerDatas() })
+	d.Start()
 }
 func StartTask() {
 	GetCustomerData()
@@ -47,7 +50,7 @@ func GetCustomerData() {
 	if !ok {
 		return
 	}
-	logger.Debug("此次任务区间:", idRange, idRange2)
+	log.Println("此次任务区间:", idRange, idRange2)
 	//查询企业库开启推送的客户
 	customers, _ := MgoTag.Find("euser", map[string]interface{}{"i_push": 1, "b_delete": false}, nil, nil)
 	for _, c := range customers {
@@ -58,7 +61,11 @@ func GetCustomerData() {
 		pushModel := qu.IntAll(c["i_pushmodel"])  //推送模式
 		dataSave := qu.ObjToString(c["s_dataSave"])
 		exact := qu.IntAll(c["i_exact"])
-		log.Println("当前客户 ", customer)
+		s_bidding := qu.ObjToString(c["s_bidding"])
+		log.Println("当前客户 ", customer, s_bidding)
+		if s_bidding == "high" {
+			continue
+		}
 		cus := &Customer{}
 		cus.SaveDataMap = map[string]map[string]interface{}{}
 		cus.SaveDataArr = map[string]map[string]interface{}{}
@@ -100,7 +107,70 @@ func GetCustomerData() {
 	Sysconfig.LatestId = LatestId
 	Sysconfig.LatestTime = endTime
 	qu.WriteSysConfig(Sysconfig)
-	logger.Debug("定时任务结束-endId-Sysconfig.LatestTime ", endTime)
+	log.Println("定时任务结束-endId-Sysconfig.LatestTime ", endTime)
+}
+
+func GetCustomerDatas() {
+	defer qu.Catch()
+	log.Println("高质量定时任务开始")
+	idRange, idRange2, ok, endTime := GetIdRanges() //获取id区间
+	if !ok {
+		return
+	}
+	log.Println("此次任务区间:", idRange, idRange2)
+	//查询企业库开启推送的客户
+	customers, _ := MgoTag.Find("euser", map[string]interface{}{"i_push": 1, "b_delete": false, "s_bidding": "high"}, nil, nil)
+	for _, c := range customers {
+		customerId := mgoutil.BsonTOStringId(c["_id"])
+		customer := qu.ObjToString(c["s_name"])   //客户名称
+		appId := qu.ObjToString(c["s_appid"])     //appid
+		extends := qu.ObjToString(c["s_extends"]) //扩展信息
+		pushModel := qu.IntAll(c["i_pushmodel"])  //推送模式
+		dataSave := qu.ObjToString(c["s_dataSave"])
+		exact := qu.IntAll(c["i_exact"])
+		log.Println("当前客户 ", customer)
+		cus := &Customer{}
+		cus.SaveDataMap = map[string]map[string]interface{}{}
+		cus.SaveDataArr = map[string]map[string]interface{}{}
+		cus.IdRange = idRange
+		cus.IdRanges = idRange2
+		cus.ID = customerId
+		cus.Name = customer
+		cus.PushModel = pushModel
+		cus.AppId = appId
+		cus.DataSave = dataSave
+		cus.Exact = exact
+
+		for _, v := range strings.Split(extends, ",") {
+			if v == "hospitalgrade" {
+				cus.IsSearchHosp = true
+			} else if v == "enterpise" {
+				cus.IsSearchEnps = true
+			}
+		}
+		//
+		if projectAppidMap[appId] {
+			start := time.Now().Unix()
+			log.Println("加载projectId---开始")
+			InitProjectId(appId)
+			end := time.Now().Unix()
+			log.Println("加载projectId---结束,耗时", end-start, "秒")
+		} else {
+			projectIdMap = sync.Map{}
+		}
+		//
+		cus.GetTagRules()      //获取客户打标签规则
+		cus.GetDepartments("") //获取客户信息
+		//PrintLog(cus)        //打印查看初始化的信息
+		qu.Debug("customer:", cus.ID, cus.Name, cus.PushModel, cus.AppId, cus.IsTagRule, cus.IsTagRule2, cus.IsTagRule3, cus.IsSearchHosp, cus.IsSearchEnps, len(cus.TagRules), len(cus.TagRules2), len(cus.TagRules3), len(cus.Departments))
+		cus.GetData("high")       //获取数据
+		cus.RemoveRepeatData()    //数据去重
+		cus.AssembelAndSaveData() //组装、保存数据
+	}
+	Sysconfig.LatestId = LatestId
+	Sysconfig.LatestTimes = endTime
+	qu.WriteSysConfig(Sysconfig)
+	log.Println("高质量定时任务结束-endId-Sysconfig.LatestTimes ", endTime)
 }
 
 // 获取客户打标签规则
@@ -218,8 +288,13 @@ func (c *Customer) GetData(stype string) {
 	esversion := qu.ObjToString(esConfig["version"])
 	if esversion == "v1" {
 	} else {
-		esCon := esv.VarEs.(*esv.EsV7)
-		c.EsConGetDataV7(stype, esCon)
+		if stype == "high" {
+			esCon := Es2.(*esv.EsV7)
+			c.EsConGetDataV7(stype, esCon)
+		} else {
+			esCon := Es.(*esv.EsV7)
+			c.EsConGetDataV7(stype, esCon)
+		}
 	}
 }
 
@@ -257,11 +332,14 @@ func (c *Customer) EsConGetDataV7(stype string, esCon *esv.EsV7) {
 			// ch := make(chan bool, 10)
 			// wg := &sync.WaitGroup{}
 			esindex := Index
-			if c.Sbidding == "high" {
+			escount := int64(0)
+			if stype == "high" {
 				esindex = "bidding_ai"
+				escount = Es2.Count(esindex, Itype, sr.EsQuery)
+			} else {
+				escount = Es.Count(esindex, Itype, sr.EsQuery)
 			}
-			escount := Es.Count(esindex, Itype, sr.EsQuery)
-			log.Println("查询总数:", escount, "规则ID:", sr.ID, "EsQuery:", sr.EsQuery)
+			log.Println("查询总数:", escount, "规则ID:", sr.ID, "EsQuery:", sr.EsQuery, stype)
 			if escount == 0 {
 				continue
 			}

+ 61 - 0
customerdata/src/util.go

@@ -275,6 +275,67 @@ func GetIdRange() (bson.M, bson.M, bool, int64) {
 	}
 }
 
+func GetIdRanges() (bson.M, bson.M, bool, int64) {
+	defer qu.Catch()
+	// now := time.Now().Unix()
+	// for { //当前时间一直向前推半小时,直到取到数据
+	// now = now - 600 //10分钟前
+	// endTime := time.Unix(now, 0)
+	// endId := bson.NewObjectIdWithTime(endTime).Hex()
+	// if (now1 - now) > 3*600 {
+	// 	return bson.M{"range": bson.M{
+	// 		"id": bson.M{
+	// 			"lte": endId,
+	// 			"gt":  LatestId,
+	// 		},
+	// 	}}, true
+	// }
+	idQuery, endTime := "", int64(0)
+	esquery := `{"query":{"bool":{"must":{"range":{"pici":{"gt":"%d"}}}}},"_source":["id","pici"],"sort":{"pici":"desc"},"from":0,"size":1}`
+	if LatestId == "" {
+		idQuery = strings.Replace(fmt.Sprintf(esquery, LatestTime), `"gt"`, `"gte"`, -1)
+	} else {
+		esquerys := `{"query":{"bool":{"must":{"range":{"id":{"gt":"%s"}}}}},"_source":["id","pici"],"sort":{"pici":"desc"},"from":0,"size":1}`
+		idQuery = fmt.Sprintf(esquerys, LatestId)
+	}
+	resId := Es2.Get("bidding_ai", "", idQuery)
+	if resId != nil && *resId != nil && len(*resId) == 1 {
+		endTime = qu.Int64All((*resId)[0]["pici"]) - 600
+	} else {
+		logger.Debug("本次任务未查找到数据...", idQuery)
+		return bson.M{}, bson.M{}, false, endTime
+	}
+	tmpRange1 := bson.M{
+		"range": bson.M{
+			"pici": bson.M{
+				"lte": endTime,
+				"gt":  LatestTime,
+			},
+		},
+	}
+	tmpRange2 := bson.M{
+		"range": bson.M{
+			"pici": bson.M{
+				"lte": endTime,
+			},
+		},
+	}
+	tmpRange3 := bson.M{
+		"range": bson.M{
+			"id": bson.M{
+				"gt": LatestId,
+			},
+		},
+	}
+	LatestTimes = endTime
+	if LatestId == "" {
+		return tmpRange1, bson.M{}, true, endTime
+	} else {
+		LatestId = ""
+		return tmpRange2, tmpRange3, true, endTime
+	}
+}
+
 // 处理文本
 func ProcessData(text string) string {
 	defer qu.Catch()