Browse Source

更新 处理es 程序,支持 开始结束时间

wcc 2 years ago
parent
commit
d0fbb01968
4 changed files with 67 additions and 38 deletions
  1. 36 20
      qyxy_es_new/main.go
  2. 11 1
      qyxy_es_new/stdall.toml
  3. 5 6
      qyxy_es_new/task.go
  4. 15 11
      udp/main.go

+ 36 - 20
qyxy_es_new/main.go

@@ -20,12 +20,12 @@ var (
 	Dbcoll    string
 	Es        *elastic.Elastic
 	Index     string
-	Itype     string
-	EsFields  []string
+	//Itype     string
+	EsFields []string
 
-	Updatetime int64
-	localPort  string // 本地监听端口
-	UdpClient  udp.UdpClient
+	//Updatetime int64
+	localPort string // 本地监听端口
+	UdpClient udp.UdpClient
 )
 var EsSaveCache = make(chan map[string]interface{}, 5000)
 var SP = make(chan bool, 5)
@@ -46,7 +46,7 @@ func init() {
 	//es
 	econf := Sysconfig["elastic"].(map[string]interface{})
 	Index = econf["index"].(string)
-	Itype = econf["itype"].(string)
+	//Itype = econf["itype"].(string)
 	Es = &elastic.Elastic{
 		S_esurl:  econf["addr"].(string),
 		I_size:   utils.IntAllDef(econf["pool"], 12),
@@ -57,7 +57,7 @@ func init() {
 
 	EsFields = utils.ObjArrToStringArr(econf["esfields"].([]interface{}))
 
-	Updatetime = utils.Int64All(Sysconfig["updatetime"])
+	//Updatetime = utils.Int64All(Sysconfig["updatetime"])
 
 	localPort = Sysconfig["local_port"].(string) //udp 本地监听地址
 	UdpClient = udp.UdpClient{Local: localPort, BufSize: 1024}
@@ -97,15 +97,10 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	case udp.OP_TYPE_DATA:
 		var mapInfo map[string]interface{}
 		err := json.Unmarshal(data, &mapInfo)
-		log.Info("processUdpMsg", zap.Any("mapinfo", mapInfo))
-		//updatetime,配置文件有值就用配置文件的,否则使用udp 传递的数据
-		if _, ok2 := mapInfo["start_time"]; ok2 {
-			if utils.Int64All(Sysconfig["updatetime"]) > 0 {
-				Updatetime = utils.Int64All(Sysconfig["updatetime"])
-			} else {
-				Updatetime = utils.Int64All(mapInfo["start_time"])
-			}
+		if err != nil {
+			log.Info("processUdpMsg", zap.Any("Unmarshal err", err))
 		}
+		log.Info("processUdpMsg", zap.Any("mapinfo", mapInfo))
 
 		if tasktype, ok := mapInfo["stype"].(string); ok {
 			switch tasktype {
@@ -120,13 +115,34 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		} else {
 			//拿到同步信号,开始同步数据
 			if _, ok := mapInfo["start"]; ok {
-				//go SaveEs() // 写入es
-				go StdAdd() //读取qyxy_std 数据,放入es 数组
+				var start_time, end_time int64
+				if _, ok2 := mapInfo["start_time"]; ok2 {
+					start_time = utils.Int64All(mapInfo["start_time"])
+					end_time = utils.Int64All(mapInfo["end_time"])
+				}
+
+				var q map[string]interface{}
+				if start_time > 0 {
+					if end_time > 0 {
+						q = map[string]interface{}{
+							"updatetime": map[string]interface{}{
+								"$gte": start_time,
+								"$lte": end_time,
+							},
+						}
+					} else {
+						q = map[string]interface{}{
+							"updatetime": map[string]interface{}{
+								"$gte": start_time,
+							},
+						}
+					}
+					go StdAdd(q) //读取qyxy_std 数据,放入es 数组
+				} else {
+					fmt.Println("参数 start_time 为0")
+				}
 			}
 
-			if err != nil {
-				utils.Debug("Unmarshal err :=>", err)
-			}
 		}
 
 	default:

+ 11 - 1
qyxy_es_new/stdall.toml

@@ -54,6 +54,16 @@ ltime = 1682955869  ##  31993374
 [all.10]
 coll = "qyxy_std"
 gtime = 1682955869
-ltime = 1683529778 ##  31546549
+ltime = 1683529778 ## 5.8凌晨 31546549
+
+[all.11]
+coll = "qyxy_std"
+gtime = 1683529778
+ltime = 1683907200 ##  5.13 凌晨
+
+[all.12]
+coll = "qyxy_std"
+gtime = 1683907200 ## 5.13 凌晨
+ltime = 1684512000 ## 5.20 凌晨
 
 

+ 5 - 6
qyxy_es_new/task.go

@@ -5,7 +5,6 @@ import (
 	"app.yhyue.com/data_processing/common_utils/log"
 	"fmt"
 	"github.com/spf13/viper"
-	"go.mongodb.org/mongo-driver/bson"
 	"go.uber.org/zap"
 	"strconv"
 	"strings"
@@ -32,7 +31,7 @@ var (
 )
 
 // StdAdd 增量数据
-func StdAdd() {
+func StdAdd(q interface{}) {
 	defer util.Catch()
 	sess := Mgo.GetMgoConn()
 	defer Mgo.DestoryMongoConn(sess)
@@ -40,15 +39,15 @@ func StdAdd() {
 	pool := make(chan bool, 5)
 	wg := &sync.WaitGroup{}
 	//q := bson.M{"_id": "affe29f8d061f3faa4170cafba41f316"}
-	q := bson.M{"updatetime": bson.M{"$gt": Updatetime}}
-	util.Debug(q)
+	//q := bson.M{"updatetime": bson.M{"$gt": Updatetime}}
+	log.Info("StdAdd", zap.Any("q", q))
 	it := sess.DB(Dbname).C(Dbcoll).Find(q).Iter()
 	count := 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
 		if count%20000 == 0 {
 			//log.Println("current:", count)
 			log.Info("StdAdd", zap.Int("current:", count))
-			log.Info("StdAdd", zap.Int64("Updatetime:", Updatetime))
+			log.Info("StdAdd", zap.Any("q", q), zap.Any("updatetime", tmp["updatetime"]))
 		}
 		if util.IntAll(tmp["use_flag"]) > 5 {
 			continue
@@ -210,7 +209,7 @@ func StdAdd() {
 		tmp = make(map[string]interface{})
 	}
 	wg.Wait()
-	log.Info("StdAdd", zap.Int("Run Over...Count::", count))
+	log.Info("StdAdd", zap.Any("q", q), zap.Int("Run Over...Count::", count))
 }
 
 // StdAll 分段处理存量数据

+ 15 - 11
udp/main.go

@@ -15,21 +15,21 @@ import (
 var startDate, endDate string
 
 func main() {
-	ip, gtime, ltime, p, tmptime, tmpkey, id1, id2, stype, q, bkey, param, ids, path, start := "", 0, 0, 0, 0, "", "", "", "", "", "", "", "", "", ""
+	ip, start_time, end_time, p, tmptime, tmpkey, id1, id2, stype, q, bkey, param, ids, path, start := "", 0, 0, 0, 0, "", "", "", "", "", "", "", "", "", ""
 	flag.StringVar(&startDate, "startDate", "", "开始日期2006-01-02")
 	flag.StringVar(&endDate, "endDate", "", "结束日期2006-01-02")
 	flag.StringVar(&ip, "ip", "127.0.0.1", "ip")
 	flag.IntVar(&p, "p", 6601, "端口")
 	flag.IntVar(&tmptime, "tmptime", 0, "时间查询")
-	flag.IntVar(&gtime, "gtime", 0, "开始时间戳")
-	flag.IntVar(&ltime, "ltime", 0, "结束时间戳")
+	flag.IntVar(&start_time, "start_time", 0, "开始时间戳")
+	flag.IntVar(&end_time, "end_time", 0, "结束时间戳")
 	flag.StringVar(&tmpkey, "tmpkey", "", "时间字段")
 	flag.StringVar(&id1, "gtid", "", "gtid")
 	flag.StringVar(&id2, "lteid", "", "lteid")
 	flag.StringVar(&path, "path", "", "path")    // 指定路径
 	flag.StringVar(&start, "start", "", "start") // 开始标志
 	flag.StringVar(&ids, "ids", "", "id1,id2")
-	flag.StringVar(&stype, "stype", "biddingall", "stype,传递类型")
+	flag.StringVar(&stype, "stype", "", "stype,传递类型")
 	flag.StringVar(&bkey, "bkey", "", "bkey,加上此参数表示不生关键词和摘要")
 	flag.StringVar(&q, "q", "", "q查询语句\"{'':''}\",有q就不要gtid,lteid")
 	flag.StringVar(&param, "param", "", "param,生信息发布或其他索引时用双引号套单引号\"{'mgoaddr':'','d':'','c':'','index':'','type':''}\"")
@@ -54,11 +54,13 @@ func main() {
 				os.Exit(0)
 			}
 		})
-		m1 := map[string]interface{}{
-			//"gtid":  id1,
-			//"lteid": id2,
-			"stype": stype,
+
+		m1 := map[string]interface{}{}
+
+		if stype != "" {
+			m1["stype"] = stype
 		}
+
 		if id1 != "" {
 			m1["gtid"] = id1
 		}
@@ -74,11 +76,13 @@ func main() {
 		}
 
 		// 针对qyxy_es 企业数据,传递时间戳
-		if ltime > 0 {
-			m1["ltime"] = ltime
-			m1["gtime"] = gtime
+		if start_time > 0 {
+			m1["start_time"] = start_time
 		}
 
+		if end_time > 0 {
+			m1["end_time"] = end_time
+		}
 		if bkey != "" {
 			m1["bkey"] = bkey
 		}