Browse Source

更新 预处理,修复bug

wcc 1 year ago
parent
commit
187f194fc7

+ 30 - 1
createEsIndex/bidding_es.go

@@ -72,6 +72,15 @@ func biddingTask(mapInfo map[string]interface{}) {
 				<-ch
 				wg.Done()
 			}()
+			//判断是否是预处理数据;pre_id 是标识
+			if config.Conf.Env.OpenPre {
+				if pre_id, ok := tmp["pre_id"]; ok {
+					preID := util.ObjToString(pre_id)
+					if preID != "" {
+						deletePreEsData(preID)
+					}
+				}
+			}
 			if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
 				tmp = make(map[string]interface{})
 				return
@@ -209,6 +218,16 @@ func biddingAllTask(mapInfo map[string]interface{}) {
 				<-ch
 				wg.Done()
 			}()
+			//判断是否是预处理数据;pre_id 是标识
+			if config.Conf.Env.OpenPre {
+				if pre_id, ok := tmp["pre_id"]; ok {
+					preID := util.ObjToString(pre_id)
+					if preID != "" {
+						deletePreEsData(preID)
+					}
+				}
+			}
+
 			if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
 				tmp = make(map[string]interface{})
 				return
@@ -362,6 +381,15 @@ func biddingTaskById(mapInfo map[string]interface{}) {
 	stype := util.ObjToString(mapInfo["stype"])
 	infoid := util.ObjToString(mapInfo["infoid"])
 	tmp, _ := MgoB.FindById(config.Conf.DB.MongoB.Coll, infoid, map[string]interface{}{"contenthtml": 0})
+	//判断是否是预处理数据;pre_id 是标识
+	if config.Conf.Env.OpenPre {
+		if pre_id, ok := (*tmp)["pre_id"]; ok {
+			preID := util.ObjToString(pre_id)
+			if preID != "" {
+				deletePreEsData(preID)
+			}
+		}
+	}
 	if sensitive := util.ObjToString((*tmp)["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
 		return
 	}
@@ -545,7 +573,8 @@ func GetEsField(tmp map[string]interface{}, stype string) (map[string]interface{
 	// 附件内容长度不做限制,大于长度限制做记录
 	filetext := getFileText(tmp)
 	if len([]rune(filetext)) > 10 {
-		newTmp["filetext"] = filetext
+		//去除 空格
+		newTmp["filetext"] = strings.Replace(filetext, " ", "", -1)
 		if len([]rune(filetext)) > fileLength {
 			//saveErr["filetext"] = filetext
 			saveErr["filetext_length"] = len([]rune(filetext))

+ 68 - 53
createEsIndex/common-pro.toml

@@ -1,35 +1,34 @@
-
 [udp]
-locport = ":1783"
-jyaddr = "172.17.145.178"
-jyport = 11118
+    locport = ":1783"
+    jyaddr = "172.17.162.35"
+    jyport = 11118
 # neaddr = "127.0.0.1" ## 下个 es地址
 # neport = 1784
 
 [db]
 [db.mongoB]
-addr = "172.17.4.187:27082,172.17.145.163:27083"
-dbname = "qfw"
-coll = "bidding"
-size = 15
-user = "SJZY_RWbid_ES"
-password = "SJZY@B4i4D5e6S"
+    addr = "172.17.189.140:27080,172.17.189.141:27081"
+    dbname = "qfw"
+    coll = "bidding"
+    size = 15
+    user = "SJZY_RWbid_ES"
+    password = "SJZY@B4i4D5e6S"
 
 [db.mongoP]
-addr = "172.17.4.85:27080"
-dbname = "qfw"
-coll = "projectset_20230904"
-size = 15
-user = ""
-password = ""
+    addr = "172.17.4.85:27080"
+    dbname = "qfw"
+    coll = "projectset_20230904"
+    size = 15
+    user = ""
+    password = ""
 
 [db.mongoQ]
-addr = "172.17.4.187:27082,172.17.145.163:27083"
-dbname = "mixdata"
-coll = ""
-size = 15
-user = "SJZY_RWbid_ES"
-password = "SJZY@B4i4D5e6S"
+    addr = "172.17.189.140:27080,172.17.189.141:27081"
+    dbname = "mixdata"
+    coll = ""
+    size = 15
+    user = "SJZY_RWbid_ES"
+    password = "SJZY@B4i4D5e6S"
 
 [db.mongoS] ##  181 特殊企业,采购单位验证
     # addr = "127.0.0.1:27001"
@@ -48,49 +47,65 @@ password = "SJZY@B4i4D5e6S"
     password = "Wcc#20221209P"
 
 [db.oss]
-   endpoint = "oss-cn-beijing-internal.aliyuncs.com"## 正式环境
+    endpoint = "oss-cn-beijing-internal.aliyuncs.com"## 正式环境
     # endpoint = "oss-cn-beijing.aliyuncs.com"## 测试环境
     accesskey = "LTAI4G5x9aoZx8dDamQ7vfZi"
     accesssecret = "Bk98FsbPYXcJe72n1bG3Ssf73acuNh"
     bucketname = "topjy"
     filesize = 500000  ## 单位字节,附件总字节长度限制;超过就不再读取。不配置默认500000
 
-    
+
 [db.es]
-addr = "http://172.17.4.184:19805"
-# addr = "http://172.17.4.184:19805,http://172.17.148.50:19805,http://172.17.145.164:19805"
-addrp = "http://172.17.145.178:9200" ## 9200单机版,已经切换7.1
-size = 5
-username = "es_all"
-password = "TopJkO2E_d1x"
-indexb = "bidding"
-indextmp = "bidding_temporary" ## 临时索引,其他程序需要
-typeb = "bidding"
-indexp = "projectset"
-typep = "projectset"
-indexwinner = "winner"
-typewinner = "winner"
-indexbuyer = "buyer"
-typebuyer = "buyer"
-detailfilter = ["(招标网|千里马|采招网|招标采购导航网|招标与采购网|中国招投标网|中国采购与招标网|中国采购与招标|优质采)[\\w\\W]{0,15}[http|https|htpps]?[a-z0-9:\\/\\/.]{0,20}(qianlima|zhaobiao|okcis|zbytb|infobidding|bidcenter|youzhicai|chinabidding|Chinabidding|CHINABIDDING)[a-z0-9.\\/\\/]{0,40}",
-    "招标网[\\w\\W]{0,15}[http|https|htpps]?[a-z0-9:\\/\\/.]{0,20}zhaobiao[a-z0-9.\\/\\/]{0,40}",
-    "千里马[\\w\\W]{0,15}[a-z0-9:\\/\\/.]{0,20}qianlima[a-z0-9.\\/\\/]{0,10}",
-    "[\\((]?(网址)?[::;;]?(http|https|htpps)*[::]?(\\/\\/)?(www|jinan|WWW)?.(zhaobiao|chinabidding|Chinabidding|CHINABIDDING|infobidding|zbytb|okcis|qianlima|youzhicai).(com|cn|COM|CN)?(.cn|.CN)?\\/?[\\))]?",
-    "[\\((]?(网址)?(::)?(http|https|htpps)*(:|:)?\\/\\/www.bidcenter.com.cn\\/",
-    "千里马(平台|网站)+", "[“\"]?优质采(平台|电子交易平台|云采购平台|交易平台)?[”\"]?", "《?(中国采购与|中国)?招(投)?标(与采购|采购导航)?网》?",
-    "《?元博网(采购与招标网)?》?", "《?(中国)?招标采购导航网》?", "中\\W{0,3}国采\\W{0,3}招\\W{0,3}网\\W*[((]?(bidcenter.com.cn)?[))]?", "已方宝", "中国招标与采购"]
-addr2 = "http://172.17.4.184:19905"
-username2 = "jybid"
-password2 = "Top2023_JEB01i@31"
-indexb2 = "bidding"
+    addr = "http://172.17.4.184:19805"
+    # addr = "http://172.17.4.184:19805,http://172.17.148.50:19805,http://172.17.145.164:19805"
+    addrp = "http://172.17.145.178:9200" ## 9200单机版,已经切换7.1
+    size = 5
+    username = "es_all"
+    password = "TopJkO2E_d1x"
+    indexb = "bidding"
+    indextmp = "bidding_temporary" ## 临时索引,其他程序需要
+    typeb = "bidding"
+    indexp = "projectset"
+    typep = "projectset"
+    indexwinner = "winner"
+    typewinner = "winner"
+    indexbuyer = "buyer"
+    typebuyer = "buyer"
+    detailfilter = ["(招标网|千里马|采招网|招标采购导航网|招标与采购网|中国招投标网|中国采购与招标网|中国采购与招标|优质采)[\\w\\W]{0,15}[http|https|htpps]?[a-z0-9:\\/\\/.]{0,20}(qianlima|zhaobiao|okcis|zbytb|infobidding|bidcenter|youzhicai|chinabidding|Chinabidding|CHINABIDDING)[a-z0-9.\\/\\/]{0,40}",
+        "招标网[\\w\\W]{0,15}[http|https|htpps]?[a-z0-9:\\/\\/.]{0,20}zhaobiao[a-z0-9.\\/\\/]{0,40}",
+        "千里马[\\w\\W]{0,15}[a-z0-9:\\/\\/.]{0,20}qianlima[a-z0-9.\\/\\/]{0,10}",
+        "[\\((]?(网址)?[::;;]?(http|https|htpps)*[::]?(\\/\\/)?(www|jinan|WWW)?.(zhaobiao|chinabidding|Chinabidding|CHINABIDDING|infobidding|zbytb|okcis|qianlima|youzhicai).(com|cn|COM|CN)?(.cn|.CN)?\\/?[\\))]?",
+        "[\\((]?(网址)?(::)?(http|https|htpps)*(:|:)?\\/\\/www.bidcenter.com.cn\\/",
+        "千里马(平台|网站)+", "[“\"]?优质采(平台|电子交易平台|云采购平台|交易平台)?[”\"]?", "《?(中国采购与|中国)?招(投)?标(与采购|采购导航)?网》?",
+        "《?元博网(采购与招标网)?》?", "《?(中国)?招标采购导航网》?", "中\\W{0,3}国采\\W{0,3}招\\W{0,3}网\\W*[((]?(bidcenter.com.cn)?[))]?", "已方宝", "中国招标与采购"]
+    addr2 = "http://172.17.4.184:19905"
+    username2 = "jybid"
+    password2 = "Top2023_JEB01i@31"
+    indexb2 = "bidding"
 
 [mail]
-send = true
-to = "wangjianghan@topnet.net.cn,wangchengcheng@topnet.net.cn,zhangjinkun@topnet.net.cn,zhengkun@topnet.net.cn"
-api = "http://172.17.145.179:19281/_send/_mail"
+    send = true
+    to = "wangjianghan@topnet.net.cn,wangchengcheng@topnet.net.cn,zhangjinkun@topnet.net.cn,zhengkun@topnet.net.cn"
+    api = "http://172.17.145.179:19281/_send/_mail"
+
+[[pre]]  ## 预处理数据 索引配置
+    [pre.19905] ## 华为云集群
+    addr = "http://172.17.4.184:19905"
+    username = "jybid"
+    password = "Top2023_JEB01i@31"
+    #        index = ["bidding"]  ## 生索引的表名,支持多个
+
+    [pre.19805] ## 老集群
+    addr = "http://172.17.4.184:19805"
+    username = "es_all"
+    password = "TopJkO2E_d1x"
+    #        index = ["bidding"]  ## 生索引的表名,支持多个
 
 [env]
     stype = 0 ## 默认0 正式环境;1测试环境。测试环境不会执行定时任务更新采购单位、中标单位、数据检测
+    openpre = true      ## 是否开启预处理流程
+    spectype = "hour"    ## 定时任务类型;正式环境应该是 month 。day 表示每天创建一个索引;month 表示每个月创建一个
+    alias = "bidding_pre"   ## 根据别名删除数据;正式环境应使用bidding
 
 # 日志
 [log]

+ 22 - 3
createEsIndex/common.toml

@@ -1,6 +1,6 @@
 [udp]
     locport = ":17834"
-    jyaddr = "127.0.0.1"
+    jyaddr = "127.0.0.1"  ## 剑鱼信息发布数据
     jyport = 11118
 #    neaddr = "127.0.0.1" ## 转发下个 es地址,
 #    neport = 1784
@@ -9,7 +9,6 @@
 [db.mongoB] ## bidding标讯数据
     addr = "127.0.0.1:27083"
 #    addr = "192.168.3.206:27002"    ## 测试环境
-#    dbname = "wcc"
     dbname = "qfw"
     coll = "bidding"
     size = 15
@@ -80,6 +79,23 @@ username2 = "jybid"
 password2 = "Top2023_JEB01i@31"
 indexb2 = "bidding"
 
+## 预处理数据 索引配置
+[[pre]]
+    [pre.19905] ## 华为云集群
+        addr = "http://127.0.0.1:19905"
+#        addr = "http://172.17.4.184:19905"
+        username = "jybid"
+        password = "Top2023_JEB01i@31"
+#        index = ["bidding"]  ## 生索引的表名,支持多个
+
+    [pre.19805] ## 老集群
+        addr = "http://127.0.0.1:19805"
+#        addr = "http://172.17.4.184:19805"
+        username = "es_all"
+        password = "TopJkO2E_d1x"
+#        index = ["bidding"]  ## 生索引的表名,支持多个
+
+
 
 [mail]
 send = false
@@ -87,7 +103,10 @@ to = "wangjianghan@topnet.net.cn"
 api = "http://172.17.145.179:19281/_send/_mail"
 
 [env]
-    stype = 1 ## 默认0 正式环境;1测试环境。测试环境不会执行定时任务更新采购单位、中标单位、数据检测
+    stype = 1           ## 默认0 正式环境;1测试环境。测试环境不会执行定时任务更新采购单位、中标单位、数据检测
+    openpre = true      ## 是否开启预处理流程
+    spectype = "day"    ## 定时任务类型;正式环境应该是 month 。day 表示每天创建一个索引;month 表示每个月创建一个
+    alias = "bidding_pre"   ## 根据别名删除数据;正式环境应使用bidding
 
 
 [[others]]

+ 13 - 1
createEsIndex/config/conf.go

@@ -45,6 +45,7 @@ type conf struct {
 	Log    log
 	Env    env
 	Others map[string]OthersData
+	Pre    map[string]PreConf
 }
 
 type udp struct {
@@ -115,7 +116,10 @@ type mgo struct {
 
 //env 全局的相关配置
 type env struct {
-	Stype int //默认0,正式环境;1是测试环境,不会执行定时任务更新采购单位、中标单位、数据检测
+	Stype    int  //默认0,正式环境;1是测试环境,不会执行定时任务更新采购单位、中标单位、数据检测
+	OpenPre  bool //默认关闭,不开启预处理流程
+	Alias    string
+	SpecType string
 }
 
 type es struct {
@@ -145,6 +149,14 @@ type es struct {
 	Indexb2   string
 }
 
+//PreConf 预处理 配置
+type PreConf struct {
+	Addr     string
+	Username string
+	Password string
+	Index    []string
+}
+
 type OthersData struct {
 	MgoAddr     string
 	MgoDB       string

+ 1 - 2
createEsIndex/go.mod

@@ -3,12 +3,11 @@ module esindex
 go 1.16
 
 require (
-	github.com/BurntSushi/toml v1.2.0
 	github.com/aliyun/aliyun-oss-go-sdk v2.2.5+incompatible
 	github.com/olivere/elastic/v7 v7.0.32
 	github.com/robfig/cron v1.2.0
 	github.com/spf13/viper v1.15.0
 	go.mongodb.org/mongo-driver v1.10.2
 	go.uber.org/zap v1.23.0
-	jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20230919095552-4cb37e2b9caf
+	jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20231122020338-4956718a7e9e
 )

+ 4 - 4
createEsIndex/go.sum

@@ -1371,10 +1371,10 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
 honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
 honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
-jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20230712115659-b418d6181de3 h1:kgtSaRR/hRunxM6Kxi66REk7f2PqN1u56j/V+8FfPW8=
-jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20230712115659-b418d6181de3/go.mod h1:1Rp0ioZBhikjXHYYXmnzL6RNfvTDM/2XvRB+vuPLurI=
-jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20230919095552-4cb37e2b9caf h1:7RYFbRUmw5Yug9x85AgcFfGuRsdePk635j5BA+VBE2U=
-jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20230919095552-4cb37e2b9caf/go.mod h1:1Rp0ioZBhikjXHYYXmnzL6RNfvTDM/2XvRB+vuPLurI=
+jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20231118071819-0a80eb1ae4cc h1:LSYmjPaV6QGcjLW+r2AqOpFlupug+m0BOOWBtZw9+As=
+jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20231118071819-0a80eb1ae4cc/go.mod h1:1Rp0ioZBhikjXHYYXmnzL6RNfvTDM/2XvRB+vuPLurI=
+jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20231122020338-4956718a7e9e h1:oXoOPTtZIfnvce0ulokHBSjXqhLiN6DPVOYTm3V6z8U=
+jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20231122020338-4956718a7e9e/go.mod h1:1Rp0ioZBhikjXHYYXmnzL6RNfvTDM/2XvRB+vuPLurI=
 rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
 rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
 rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=

+ 32 - 0
createEsIndex/init.go

@@ -21,6 +21,7 @@ var (
 	ProjectListF      = make(map[string]string, 200)
 	BiddingField      = make(map[string]string, 200)       //bidding_processing_field, level=1 最外层字段,
 	BiddingLevelField = make(map[string]map[string]string) //level=2 的第二层字段
+	PreProcessField   = make(map[string]string, 500)       //预处理流程 bidding字段
 )
 
 // InitLog @Description
@@ -204,6 +205,37 @@ func InitField() {
 	log.Info("InitField", zap.Any("duration", time.Since(now).Seconds()))
 }
 
+//InitPreProcessField 预处理阶段字段
+func InitPreProcessField() {
+	now := time.Now()
+	info, _ := MgoB.Find("bidding_processing_field", `{"stype": "pre_process"}`, nil, nil, false, -1, -1)
+	if len(*info) > 0 {
+		for _, m := range *info {
+			if util.IntAll(m["level"]) == 1 {
+				PreProcessField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
+			}
+		}
+	}
+	log.Info("InitPreProcessField", zap.Int("PreProcessField", len(ProjectField)))
+	log.Info("InitPreProcessField", zap.Any("duration", time.Since(now).Seconds()))
+}
+
+//InitPreEsClient 实例化预处理 索引客户端
+func InitPreEsClient() {
+	if len(config.Conf.Pre) > 0 {
+		for k, v := range config.Conf.Pre {
+			cli := &elastic.Elastic{
+				S_esurl:  v.Addr,
+				I_size:   30,
+				Username: v.Username,
+				Password: v.Password,
+			}
+			cli.InitElasticSize()
+			PreEs[k] = cli
+		}
+	}
+}
+
 //InitEsBiddingField 初始化 bidding 索引字段
 func InitEsBiddingField() {
 	now := time.Now()

+ 13 - 1
createEsIndex/main.go

@@ -28,6 +28,7 @@ var (
 	Mysql *mysqldb.Mysql
 
 	Es, Es1, Es2 *elastic.Elastic
+	PreEs        = make(map[string]*elastic.Elastic, 0) //预处理 索引客户端
 
 	UdpClient  udp.UdpClient
 	UdpTaskMap = &sync.Map{}
@@ -63,9 +64,15 @@ func init() {
 	InitMgo()
 	InitEs()
 	InitField()
+
+	if config.Conf.Env.OpenPre {
+		InitPreProcessField()
+		InitPreEsClient()
+	}
+
 	InitEsBiddingField()
 	oss.InitOss()
-	//verifyESFields() //检测es 字段类型
+	verifyESFields() //检测es 字段类型
 
 	JyUdpAddr = &net.UDPAddr{
 		IP:   net.ParseIP(config.Conf.Udp.JyAddr),
@@ -99,6 +106,11 @@ func main() {
 	go SaveProjectEs()
 
 	go SaveBidErr()
+	//添加预处理函数
+	if config.Conf.Env.OpenPre {
+		go SavePreEsMethod()
+		go dealPreProcess()
+	}
 
 	UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
 	UdpClient.Listen(processUdpMsg)

+ 145 - 0
createEsIndex/pre_bidding.go

@@ -0,0 +1,145 @@
+package main
+
+import (
+	"esindex/config"
+	"fmt"
+	"go.uber.org/zap"
+	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
+	"strconv"
+	"time"
+)
+
+var (
+	savePreEsPool = make(chan map[string]interface{}, 5000) //保存binding_pre 数据到es
+	savePreEsSp   = make(chan bool, 5)
+)
+
+/*
+	预处理数据,先读取bidding_nomal bidding_file 数据生索引,先不判重、读取附件等
+	读取 bidding_extract 数据,读取完打标签,is_pre = 1,表示已经生了索引
+*/
+
+//dealPreProcess 处理预处理数据
+func dealPreProcess() {
+	sess := MgoB.GetMgoConn()
+	defer MgoB.DestoryMongoConn(sess)
+
+	where := map[string]interface{}{
+		"is_pre": map[string]interface{}{
+			"$exists": 0,
+		},
+	}
+
+	for {
+		it := sess.DB(config.Conf.DB.MongoB.Dbname).C("bidding_extract").Find(&where).Select(nil).Iter()
+		c1, index := 0, 0
+		for tmp := make(map[string]interface{}); it.Next(tmp); c1++ {
+			if len(tmp) == 0 {
+				continue
+			}
+			id := mongodb.BsonIdToSId(tmp["_id"])
+			newTmp, update := GetEsField(tmp, "pre_process")
+
+			insert := make(map[string]interface{}, 0)
+			for k, _ := range PreProcessField {
+				if data, ok := newTmp[k]; ok {
+					insert[k] = data
+				}
+			}
+
+			if len(insert) > 0 {
+				insert["dataweight"] = 0 //索引数据新增 jy置顶字段
+				insert["_id"] = insert["id"]
+				//针对中国政府采购网,单独处理
+				if util.ObjToString(tmp["site"]) == "中国政府采购网" {
+					objectType := MatchService(tmp)
+					if objectType != "" {
+						insert["object_type"] = objectType
+					}
+				}
+				update["is_pre"] = 1
+
+				bs := MgoB.UpdateById("bidding_extract", id, map[string]interface{}{
+					"$set": update,
+				})
+
+				if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
+					tmp = make(map[string]interface{})
+					continue
+				}
+				//针对产权数据,暂时不入es 索引库
+				if util.IntAll(tmp["infoformat"]) == 3 {
+					continue
+				}
+
+				if bs {
+					index++
+					savePreEsPool <- insert
+				}
+			}
+
+		}
+
+		if c1 > 0 {
+			log.Info("dealPreProcess - over", zap.Int("count", c1), zap.Int("index", index))
+		}
+
+		time.Sleep(time.Second * 10)
+
+	}
+
+}
+
+//SavePreEsMethod 保存到bidding_pre
+func SavePreEsMethod() {
+	arru := make([]map[string]interface{}, EsBulkSize)
+	indexu := 0
+	for {
+		month := int(time.Now().Month())
+		monthStr := strconv.Itoa(month)
+		year := time.Now().Year()
+		yearStr := strconv.Itoa(year)
+		//预处理索引名称
+		preBiddingIndex := fmt.Sprintf("bidding_%s%s", yearStr, monthStr)
+		if config.Conf.Env.SpecType == "day" {
+			preBiddingIndex = preBiddingIndex + strconv.Itoa(time.Now().Day())
+		} else if config.Conf.Env.SpecType == "hour" {
+			preBiddingIndex = preBiddingIndex + strconv.Itoa(time.Now().Day()) + strconv.Itoa(time.Now().Hour())
+		}
+
+		select {
+		case v := <-savePreEsPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == EsBulkSize {
+				savePreEsSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-savePreEsSp
+					}()
+					for _, client := range PreEs {
+						client.BulkSave(preBiddingIndex, arru)
+					}
+				}(arru)
+				arru = make([]map[string]interface{}, EsBulkSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				savePreEsSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-savePreEsSp
+					}()
+					for _, client := range PreEs {
+						client.BulkSave(preBiddingIndex, arru)
+					}
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, EsBulkSize)
+				indexu = 0
+			}
+		}
+	}
+}

+ 75 - 0
createEsIndex/utils.go

@@ -2,6 +2,7 @@ package main
 
 import (
 	"encoding/json"
+	"esindex/config"
 	"fmt"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.uber.org/zap"
@@ -12,7 +13,9 @@ import (
 	"net"
 	"regexp"
 	"sort"
+	"strconv"
 	"strings"
+	"time"
 	"unicode"
 	"unicode/utf8"
 )
@@ -426,3 +429,75 @@ func SendUdpMsg(data map[string]interface{}, target *net.UDPAddr) {
 	UdpClient.WriteUdp(bytes, udp.OP_TYPE_DATA, target)
 	log.Info("SendUdpMsg", zap.Any("data", data), zap.Any("target", target))
 }
+
+//deletePreEsData 删除预处理索引数据
+func deletePreEsData(preId string) {
+	now := time.Now()
+	month := int(time.Now().Month())
+	monthStr := strconv.Itoa(month)
+	year := time.Now().Year()
+	yearStr := strconv.Itoa(year)
+	//当前处理索引名称
+	preBiddingIndex := fmt.Sprintf("bidding_%s%s", yearStr, monthStr)
+	lastIndex := ""
+
+	//按小时创建
+	if config.Conf.Env.SpecType == "hour" {
+		preBiddingIndex = preBiddingIndex + strconv.Itoa(now.Day()) + strconv.Itoa(now.Hour())
+		last := now.Add(-time.Hour)
+		month2 := int(last.Month())
+		monthStr2 := strconv.Itoa(month2)
+		year2 := last.Year()
+		yearStr2 := strconv.Itoa(year2)
+		dayStr2 := strconv.Itoa(last.Day())
+		//上个索引名称
+		lastIndex = "bidding_" + yearStr2 + monthStr2
+		lastIndex = lastIndex + dayStr2 + strconv.Itoa(last.Hour())
+
+	} else if config.Conf.Env.SpecType == "day" {
+		//按天创建
+		preBiddingIndex = preBiddingIndex + strconv.Itoa(time.Now().Day())
+		last := now.AddDate(0, 0, -1)
+		month2 := int(last.Month())
+		monthStr2 := strconv.Itoa(month2)
+		year2 := last.Year()
+		yearStr2 := strconv.Itoa(year2)
+		dayStr2 := strconv.Itoa(last.Day())
+		//上个索引名称
+		lastIndex = "bidding_" + yearStr2 + monthStr2
+		lastIndex = lastIndex + dayStr2
+
+	} else if config.Conf.Env.SpecType == "month" {
+		// 月份;
+		last := now.AddDate(0, -1, 0)
+		month2 := int(last.Month())
+		monthStr2 := strconv.Itoa(month2)
+		year2 := last.Year()
+		yearStr2 := strconv.Itoa(year2)
+		//上个索引名称
+		lastIndex = "bidding_" + yearStr2 + monthStr2
+	}
+
+	//删除预处理 索引数据
+	if len(PreEs) == 0 {
+		time.Sleep(time.Second)
+	}
+	for _, client := range PreEs {
+		if client == nil {
+			continue
+		}
+		// 老索引有数据
+		if client.Count(lastIndex, nil) > 0 {
+			err := client.DeleteByID(lastIndex, preId)
+			if err != nil {
+				fmt.Println("deletePreEsData: ", preId, err)
+			}
+		}
+
+		err := client.DeleteByID(preBiddingIndex, preId)
+		if err != nil {
+			fmt.Println("deletePreEsData: ", preId, err)
+		}
+	}
+
+}