Pārlūkot izejas kodu

添加biddingall.toml 配置文件,迁移存量数据

wcc 2 gadi atpakaļ
vecāks
revīzija
017828e148

+ 109 - 1
createEsIndex/bidding_es.go

@@ -8,6 +8,8 @@ import (
 	"encoding/json"
 	"esindex/config"
 	"esindex/oss"
+	"fmt"
+	"github.com/spf13/viper"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.uber.org/zap"
 	"reflect"
@@ -187,7 +189,7 @@ func biddingAllTask(mapInfo map[string]interface{}) {
 				tmp = make(map[string]interface{})
 				return
 			}
-			// 针对17833,需要单独屏蔽这个判断,不需要处理
+			// 针对存量数据,重复数据不进索引
 			if util.IntAll(tmp["extracttype"]) == -1 {
 				return
 			}
@@ -222,6 +224,112 @@ func biddingAllTask(mapInfo map[string]interface{}) {
 	}
 	wg.Wait()
 	log.Info("biddingAllTask over", zap.Int("count", c1), zap.Int("index", index))
+
+}
+
+// biddingAllDataTask migrates existing (stock) data as described by the
+// biddingall config file. Every entry of the config's "all" table names a
+// collection and an _id range, and is handed to dealData in its own goroutine.
+// The function returns only after every dealData call has finished, so the
+// caller's worker slot stays occupied for the whole migration.
+// Config read/unmarshal errors are printed and abort the task.
+func biddingAllDataTask() {
+	// Biddingall is one migration job: collection name plus the _id range
+	// bounds that are passed to dealData as gtid and lteid.
+	type Biddingall struct {
+		Coll  string
+		Gtid  string
+		Lteid string
+	}
+	// RoutinesConf holds the "routine" section of the config file.
+	// The field must be exported, otherwise viper.Unmarshal cannot fill it.
+	type RoutinesConf struct {
+		Num int
+	}
+	// AllConf mirrors the top-level layout of the config file.
+	type AllConf struct {
+		All     map[string]Biddingall
+		Routine RoutinesConf
+	}
+	var all AllConf
+
+	// NOTE(review): both SetConfigFile and SetConfigName/AddConfigPath are
+	// given; only one lookup strategy takes effect — confirm which is intended.
+	viper.SetConfigFile("biddingall.toml")
+	viper.SetConfigName("biddingall") // config file name (without extension)
+	viper.SetConfigType("toml")       // required when the config file name has no extension
+	viper.AddConfigPath("./")
+	err := viper.ReadInConfig() // find and read the config file
+	if err != nil {             // handle config read errors
+		fmt.Println("ReadInConfig err =>", err)
+		return
+	}
+	err = viper.Unmarshal(&all)
+	if err != nil {
+		fmt.Println("biddingAllDataTask Unmarshal err =>", err)
+		return
+	}
+
+	// Process all configured collections concurrently, and wait for them so
+	// the goroutines are not left running untracked after this task returns.
+	var wg sync.WaitGroup
+	for k, conf := range all.All {
+		wg.Add(1)
+		go func(coll, gtid, lteid, kword string) {
+			defer wg.Done()
+			dealData(coll, gtid, lteid, kword)
+		}(conf.Coll, conf.Gtid, conf.Lteid, k)
+	}
+	wg.Wait()
+}
+
+func dealData(coll, gtid, lteid, kword string) { // indexes documents of coll with gtid < _id <= lteid; kword labels the log lines
+	ch := make(chan bool, 50) // semaphore: at most 50 worker goroutines run at once
+	wg := &sync.WaitGroup{}
+	q := map[string]interface{}{ // _id range filter: greater than gtid, up to and including lteid
+		"_id": map[string]interface{}{
+			"$gt":  mongodb.StringTOBsonId(gtid),
+			"$lte": mongodb.StringTOBsonId(lteid),
+		},
+	}
+	biddingConn := MgoB.GetMgoConn() // NOTE(review): neither this connection nor the iterator below is released or error-checked in this function — confirm whether that is intended
+	it := biddingConn.DB(config.Conf.DB.MongoB.Dbname).C(coll).Find(&q).Select(map[string]interface{}{
+		"contenthtml": 0, // presumably a projection that leaves contenthtml out of the results — TODO confirm
+	}).Iter()
+	c1, index := 0, 0 // c1: documents read; index: documents that passed the filters below
+	var indexLock sync.Mutex // guards index, which the workers increment concurrently
+	for tmp := make(map[string]interface{}); it.Next(tmp); c1++ {
+		if c1%20000 == 0 { // progress log every 20000 documents (including the first)
+			log.Info(kword, zap.Int("current:", c1))
+			log.Info(kword, zap.Any("current:_id =>", tmp["_id"]))
+		}
+		ch <- true // blocks while 50 workers are already busy
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() { // free the semaphore slot and mark this worker done
+				<-ch
+				wg.Done()
+			}()
+			if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { // documents flagged with this sensitive word are not indexed
+				tmp = make(map[string]interface{}) // only rebinds the goroutine's own parameter
+				return
+			}
+			// For stock data: duplicate documents are not indexed
+			if util.IntAll(tmp["extracttype"]) == -1 {
+				return
+			}
+
+			// Property-rights data is not put into the ES index for now
+			if util.IntAll(tmp["infoformat"]) == 3 {
+				return
+			}
+
+			indexLock.Lock()
+			index++
+			indexLock.Unlock()
+			newTmp, update := GetEsField(tmp, "biddingall") // newTmp goes to saveEsPool; update, when non-empty, goes to updateBiddingPool
+			// Special handling for documents from the site 中国政府采购网 (China Government Procurement Network)
+			if util.ObjToString(tmp["site"]) == "中国政府采购网" {
+				objectType := MatchService(tmp)
+				if objectType != "" {
+					newTmp["object_type"] = objectType
+				}
+			}
+			newTmp["dataweight"] = 0 // index data gains the jy pin-to-top field
+			if len(update) > 0 { // queue an {_id} selector plus a $set of update on updateBiddingPool
+				updateBiddingPool <- []map[string]interface{}{{
+					"_id": tmp["_id"],
+				},
+					{"$set": update},
+				}
+			}
+			saveEsPool <- newTmp
+		}(tmp)
+		tmp = map[string]interface{}{} // fresh map, so the next it.Next does not write into the one a worker holds
+	}
+	wg.Wait() // wait for all in-flight workers before logging the totals
+	log.Info(fmt.Sprintf("%s over", kword), zap.Int("count", c1), zap.Int("index", index))
 }
 
 func biddingTaskById(mapInfo map[string]interface{}) {

+ 4 - 3
createEsIndex/common.toml

@@ -5,7 +5,8 @@
 
 [db]
 [db.mongoB]
-    addr = "127.0.0.1:27017"
+#    addr = "127.0.0.1:27017"
+    addr = "192.168.3.207:29099"    ## 测试环境
     dbname = "wcc"
     coll = "bidding_wcc_random2"
     size = 15
@@ -29,8 +30,8 @@
     password = "root"
 
 [db.oss]
-    endpoint = "oss-cn-beijing-internal.aliyuncs.com"## 正式环境
-  ##  endpoint = "oss-cn-beijing.aliyuncs.com"## 测试环境
+#    endpoint = "oss-cn-beijing-internal.aliyuncs.com"## 正式环境
+    endpoint = "oss-cn-beijing.aliyuncs.com"## 测试环境
     accesskey = "LTAI4G5x9aoZx8dDamQ7vfZi"
     accesssecret = "Bk98FsbPYXcJe72n1bG3Ssf73acuNh"
     bucketname = "topjy"

+ 0 - 1
createEsIndex/config/conf.go

@@ -21,7 +21,6 @@ func Init(conf string) {
 		os.Exit(1)
 	}
 
-	fmt.Println("aa", Conf.DB.Oss)
 }
 
 type conf struct {

+ 1 - 1
createEsIndex/go.mod

@@ -7,7 +7,7 @@ require (
 	github.com/BurntSushi/toml v1.2.0
 	github.com/aliyun/aliyun-oss-go-sdk v2.2.5+incompatible
 	github.com/robfig/cron v1.2.0
+	github.com/spf13/viper v1.15.0
 	go.mongodb.org/mongo-driver v1.10.2
 	go.uber.org/zap v1.23.0
-	golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 // indirect
 )

Failā izmaiņas netiks attēlotas, jo tās ir par lielu
+ 1148 - 15
createEsIndex/go.sum


+ 8 - 0
createEsIndex/main.go

@@ -128,6 +128,14 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					biddingAllTask(mapInfo)
 				}()
 
+			case "bidding_all_data": //根据biddingall配置文件,存量迁移数据
+				pool <- true
+				go func() {
+					defer func() {
+						<-pool
+					}()
+					biddingAllDataTask()
+				}()
 			case "bidding_history":
 				pool <- true
 				go func() {

Daži faili netika attēloti, jo izmaiņu fails ir pārāk liels