Browse Source

添加 索引类型检测,projectset 添加存量 更新

wcc 2 years ago
parent
commit
1ef35dcba6

+ 1 - 0
createEsIndex/bidding_es.go

@@ -146,6 +146,7 @@ func biddingTask(mapInfo map[string]interface{}) {
 	//log.Info("bidding index es over", zap.Any("es", next), zap.String("mapinfo", string(datas)))
 }
 
+//biddingAllTask 补充存量数据
 func biddingAllTask(mapInfo map[string]interface{}) {
 	defer util.Catch()
 

+ 1 - 1
createEsIndex/biddingall.toml

@@ -42,7 +42,7 @@ num = 50
 
     [all.08]
     coll = "bidding"
-    gtid = "6155df000000000000000000"
+    gtid = "621cf1800000000000000000"
     lteid = "62bdc8800000000000000000" ## 2022.7.1 18373938
 
     [all.09]

+ 11 - 9
createEsIndex/common.toml

@@ -6,8 +6,9 @@
 [db]
 [db.mongoB]
     addr = "127.0.0.1:27017"
-#    addr = "192.168.3.207:29099"    ## 测试环境
+#    addr = "192.168.3.206:27002"    ## 测试环境
     dbname = "wcc"
+#    dbname = "qfw_data"
     coll = "bidding"
     size = 15
 #    user = "root"
@@ -36,12 +37,13 @@
     accesssecret = "Bk98FsbPYXcJe72n1bG3Ssf73acuNh"
     bucketname = "topjy"
 [db.es]
-    addr = "http://192.168.3.149:9200" ## 正常bidding 链接
-    addrp = "http://127.0.0.1:9200"  ## biddingall 链接
-#    username = "elastic"
-#    password = "123456"
+    addr = "http://127.0.0.1:19805" ## 正常bidding 链接
+    addrp = "http://172.17.145.178:9200" ## 采集使用的单机版地址
+    username = "es_all"
+    password = "TopJkO2E_d1x"
     size = 5
-    indexb = "wcc_test"
+    indexb = "bidding"
+    indextmp = "bidding_temporary" ## 临时索引,其他程序需要
 #    typeb = "bidding"
     indexp = "projectset"
 #    typep = "projectset"
@@ -107,7 +109,7 @@ format = "text"
 #"projectname" = "string"
 #"bidstatus" = "string"
 #"buyerclass" = "string"
-#"topscopeclass" = ""
+"topscopeclass" = ""
 #"s_topscopeclass" = "string"
 #"s_subscopeclass" = "string"
 #"area" = "string"
@@ -125,9 +127,9 @@ format = "text"
 #"spidercode" = "string"
 #"subtype" = "string"
 #"toptype" = "string"
-#"projectinfo" = ""
+#"projectinfo" = "" ## 废弃
 #"purchasing" = "string"
-#"purchasinglist" = ""
+"purchasinglist" = ""
 #"channel" = "string"
 #"winnerorder" = ""
 #"project_scale" = "string"

+ 1 - 0
createEsIndex/config/conf.go

@@ -98,6 +98,7 @@ type es struct {
 	IndexBuyer   string
 	TypeBuyer    string
 	DetailFilter []string
+	IndexTmp     string
 	//FieldEs              map[string]interface{}
 	//FieldPurchasingist  map[string]interface{}
 	//FieldProcurementList map[string]interface{}

+ 73 - 58
createEsIndex/es_test.go

@@ -1,11 +1,14 @@
 package main
 
 import (
+	"app.yhyue.com/data_processing/common_utils/log"
 	"context"
 	"encoding/json"
+	"esindex/config"
 	"fmt"
 	"github.com/olivere/elastic/v7"
-	"reflect"
+	"go.uber.org/zap"
+	"strings"
 	"testing"
 )
 
@@ -37,11 +40,12 @@ func TestMatchService(t *testing.T) {
 
 func TestGetMappting(t *testing.T) {
 	client, _ := elastic.NewClient(
-		elastic.SetURL("http://192.168.3.149:9200"),
+		elastic.SetURL(config.Conf.DB.Es.Addr),
+		elastic.SetBasicAuth(config.Conf.DB.Es.Username, config.Conf.DB.Es.Password),
 		elastic.SetSniff(false),
 	)
 
-	index := "wcc_test"
+	index := config.Conf.DB.Es.IndexB
 	// 获取 Elasticsearch 索引的 mapping 信息
 	mapping, err := client.GetMapping().Index(index).Do(context.Background())
 	if err != nil {
@@ -49,74 +53,85 @@ func TestGetMappting(t *testing.T) {
 		return
 	}
 
-	properties := mapping[index].(map[string]interface{})["mappings"].(map[string]interface{})["properties"].(map[string]interface{})
-	fmt.Println(properties)
+	indexName, _ := GetIndexName(client, index)
+	properties := mapping[indexName].(map[string]interface{})["mappings"].(map[string]interface{})["properties"].(map[string]interface{})
 
 	var errField = make([]string, 0)
-
-	//for _, v := range BiddingField {
-	//	if v != "" {
+	var okField = make([]string, 0)
+	var analyzerMap = make(map[string]string) // 分词信息
+	var esMap = make(map[string]string)       //存储es 字段类型
 	//
-	//	}
-	//}
-	for k, v := range properties {
-		b, ok := BiddingField[k]
-		if ok {
-			mappingType := v.(map[string]interface{})["type"].(string)
-			analyzer := v.(map[string]interface{})["analyzer"].(string)
-			//分词器不为空
-			if analyzer != "" {
-
-			}
-
-			//="",表示二级
-			if b == "" {
+	for field, ftype := range BiddingField {
+		eftypeMap, _ := properties[field].(map[string]interface{})
+		var etype string
+		var analyzer string
+		if fftype, ok := eftypeMap["type"]; ok {
+			etype = fftype.(string)
+			esMap[field] = etype
+		}
+		if ffanalyzer, ok := eftypeMap["analyzer"]; ok {
+			analyzer = ffanalyzer.(string)
+			analyzerMap[field] = analyzer
+		}
 
-				fmt.Println(111)
+		if ftype != "" {
+			if chargeType(ftype, etype) {
+				okField = append(okField, field)
 			} else {
-				if mappingType == "keyword" || mappingType == "text" {
-					if b != "string" {
-						errField = append(errField, k)
-					}
-				} else if mappingType == "boolean" {
-					if b != "bool" {
-						errField = append(errField, k)
+				errField = append(errField, field)
+			}
+		} else {
+			if field == "_id" {
+				continue
+			} else if field == "purchasinglist" || field == "package" || field == "winnerorder" || field == "procurementlist" {
+				if eproperties, ok := eftypeMap["properties"]; ok {
+					if eproMap, ok := eproperties.(map[string]interface{}); ok {
+						for k, v := range eproMap {
+							if innerMap, ok := v.(map[string]interface{}); ok {
+								if innerType, ok := innerMap["type"]; ok {
+									innerLevel := BiddingLevelField[field]
+									esMap[fmt.Sprintf("%s.%s", field, k)] = innerType.(string)
+									if chargeType(innerLevel[k], innerType.(string)) {
+										okField = append(okField, fmt.Sprintf("%s.%s", field, k))
+									} else {
+										errField = append(errField, fmt.Sprintf("%s.%s", field, k))
+									}
+								}
+							}
+						}
 					}
-				} else if mappingType == "" {
-
 				}
-				fmt.Println(222)
 			}
+		}
+	}
 
+	if len(errField) > 0 {
+		log.Info("test", zap.Int("错误字段数量", len(errField)))
+		for _, field := range errField {
+			if strings.Contains(field, ".") {
+				fe := strings.Split(field, ".")
+				log.Info(fmt.Sprintf("%s 字段类型错误", field), zap.String(fmt.Sprintf("数据库类型为:%s,但是es字段类型是:", BiddingLevelField[fe[0]][fe[1]]), esMap[field]))
+			} else {
+				log.Info(fmt.Sprintf("%s 字段类型错误", field), zap.String(fmt.Sprintf("数据库类型为:%s,但是es字段类型是:", BiddingField[field]), esMap[field]))
+			}
 		}
+	} else {
+		log.Info("es 字段类型检测结束,", zap.Int("所有字段都符合,检测字段数量为:", len(okField)))
 	}
+
 }
 
-func getGoVarType(esMappingType string) reflect.Type {
-	var goVarType reflect.Type
+func TestGetIndexName(t *testing.T) {
+	client, _ := elastic.NewClient(
+		elastic.SetURL(config.Conf.DB.Es.Addr),
+		elastic.SetBasicAuth(config.Conf.DB.Es.Username, config.Conf.DB.Es.Password),
+		elastic.SetSniff(false),
+	)
 
-	switch esMappingType {
-	case "text", "keyword", "ip", "geo_point", "date", "binary":
-		goVarType = reflect.TypeOf("")
-	case "long":
-		goVarType = reflect.TypeOf(int64(0))
-	case "integer":
-		goVarType = reflect.TypeOf(int32(0))
-	case "short":
-		goVarType = reflect.TypeOf(int16(0))
-	case "byte":
-		goVarType = reflect.TypeOf(int8(0))
-	case "double":
-		goVarType = reflect.TypeOf(0.0)
-	case "float":
-		goVarType = reflect.TypeOf(float32(0.0))
-	case "boolean":
-		goVarType = reflect.TypeOf(false)
-	case "nested":
-		goVarType = reflect.TypeOf(make(map[string]interface{}))
-	default:
-		goVarType = reflect.TypeOf(nil)
-	}
+	index := "bidding_v2"
+	//index := config.Conf.DB.Es.IndexB
 
-	return goVarType
+	name, _ := GetIndexName(client, index)
+	fmt.Println("name ->", name)
+	fmt.Println(name)
 }

+ 1 - 0
createEsIndex/go.mod

@@ -6,6 +6,7 @@ require (
 	app.yhyue.com/data_processing/common_utils v0.0.0-20230427103005-4289580ee061
 	github.com/BurntSushi/toml v1.2.0
 	github.com/aliyun/aliyun-oss-go-sdk v2.2.5+incompatible
+	github.com/olivere/elastic/v7 v7.0.32
 	github.com/robfig/cron v1.2.0
 	github.com/spf13/viper v1.15.0
 	go.mongodb.org/mongo-driver v1.10.2

+ 122 - 0
createEsIndex/init.go

@@ -5,10 +5,13 @@ import (
 	"app.yhyue.com/data_processing/common_utils/elastic"
 	"app.yhyue.com/data_processing/common_utils/log"
 	"app.yhyue.com/data_processing/common_utils/mongodb"
+	"context"
 	"esindex/config"
 	"fmt"
+	es7 "github.com/olivere/elastic/v7"
 	"go.uber.org/zap"
 	"os"
+	"strings"
 )
 
 var (
@@ -97,6 +100,7 @@ func InitField() {
 	log.Info("InitField", zap.Int("ProjectField", len(ProjectField)), zap.Int("ProjectListF", len(ProjectListF)))
 }
 
+//InitEsBiddingField 初始化 bidding 索引字段
 func InitEsBiddingField() {
 	info, _ := MgoB.Find("bidding_processing_field", `{"stype": "bidding"}`, nil, nil, false, -1, -1)
 	if len(*info) > 0 {
@@ -117,3 +121,121 @@ func InitEsBiddingField() {
 	log.Info("InitEsBiddingField", zap.Int("BiddingField es 一级字段数量", len(BiddingField)))
 	log.Info("InitEsBiddingField", zap.Int("BiddingLevelField es 二级字段数量", len(BiddingLevelField)))
 }
+
+// verifyESFields checks that the field types declared in BiddingField and BiddingLevelField
+// agree with the live mapping of the index named by config.Conf.DB.Es.IndexB, and logs every
+// mismatch. It only reports and never fails hard: init() calls it at startup, so when the
+// client cannot be created, the index cannot be resolved, or the mapping has an unexpected
+// shape, it logs and returns instead of panicking.
+func verifyESFields() {
+	log.Info("verifyESFields", zap.String("开始类型检测", ""))
+	client, err := es7.NewClient(
+		es7.SetURL(config.Conf.DB.Es.Addr),
+		es7.SetBasicAuth(config.Conf.DB.Es.Username, config.Conf.DB.Es.Password),
+		es7.SetSniff(false),
+	)
+	if err != nil {
+		// The client must not be used when creation failed.
+		log.Info("verifyESFields", zap.Any("creating Elasticsearch client:", err))
+		return
+	}
+	index := config.Conf.DB.Es.IndexB // name (or alias) of the bidding index
+	// Fetch the mapping of the index from Elasticsearch.
+	mapping, err := client.GetMapping().Index(index).Do(context.Background())
+	if err != nil {
+		log.Info("verifyESFields", zap.Any("getting Elasticsearch mapping:", err))
+	}
+
+	// The mapping is looked up under the resolved index name, so an alias is resolved first.
+	indexName, err := GetIndexName(client, index)
+	if err != nil {
+		log.Info("verifyESFields", zap.Any("resolving index name:", err))
+	}
+	if indexName == "" {
+		log.Info("verifyESFields", zap.String("索引不存在,请检查索引", index))
+		return
+	}
+
+	// Walk mapping[indexName]["mappings"]["properties"] with checked assertions; reading a nil
+	// map is safe, so a failed GetMapping above simply ends up in the !ok branch.
+	indexMapping, _ := mapping[indexName].(map[string]interface{})
+	mappings, _ := indexMapping["mappings"].(map[string]interface{})
+	properties, ok := mappings["properties"].(map[string]interface{})
+	if !ok {
+		log.Info("verifyESFields", zap.String("mapping has no properties for index:", indexName))
+		return
+	}
+
+	var errField []string           // fields whose ES type does not match
+	var okField []string            // fields whose ES type matches
+	esMap := make(map[string]string) // ES type per checked field, for the report below
+
+	for field, ftype := range BiddingField {
+		eftypeMap, _ := properties[field].(map[string]interface{})
+		etype, hasType := eftypeMap["type"].(string)
+		if hasType {
+			esMap[field] = etype
+		}
+
+		if ftype != "" {
+			// First-level field with a declared type.
+			if chargeType(ftype, etype) {
+				okField = append(okField, field)
+			} else {
+				errField = append(errField, field)
+			}
+			continue
+		}
+
+		// An empty declared type marks a field with second-level fields; only these are checked.
+		switch field {
+		case "purchasinglist", "package", "winnerorder", "procurementlist":
+		default:
+			continue
+		}
+		innerLevel := BiddingLevelField[field]
+		eproMap, _ := eftypeMap["properties"].(map[string]interface{})
+		for k, v := range eproMap {
+			innerMap, _ := v.(map[string]interface{})
+			innerType, ok := innerMap["type"].(string)
+			if !ok {
+				continue
+			}
+			name := fmt.Sprintf("%s.%s", field, k)
+			esMap[name] = innerType
+			if chargeType(innerLevel[k], innerType) {
+				okField = append(okField, name)
+			} else {
+				errField = append(errField, name)
+			}
+		}
+	}
+
+	if len(errField) == 0 {
+		log.Info("es 字段类型检测结束,", zap.Int("所有字段都符合,检测字段数量为:", len(okField)))
+		return
+	}
+
+	log.Info("verifyESFields", zap.Int("错误字段数量", len(errField)))
+	for _, field := range errField {
+		if strings.Contains(field, ".") {
+			// "parent.child" entries come from the second-level check above.
+			fe := strings.SplitN(field, ".", 2)
+			log.Info(fmt.Sprintf("%s 字段类型错误", field), zap.String(fmt.Sprintf("数据库类型为:%s,但是es字段类型是:", BiddingLevelField[fe[0]][fe[1]]), esMap[field]))
+		} else {
+			log.Info(fmt.Sprintf("%s 字段类型错误", field), zap.String(fmt.Sprintf("数据库类型为:%s,但是es字段类型是:", BiddingField[field]), esMap[field]))
+		}
+	}
+}
+
+// GetIndexName resolves name to an index name.
+// If name is an alias, the name of an index carrying that alias is returned; if name is itself
+// an existing index, name is returned unchanged; if it is neither, the result is "" with a nil
+// error. A non-nil error is returned only when a request to Elasticsearch fails.
+func GetIndexName(client *es7.Client, name string) (string, error) {
+	ctx := context.Background()
+
+	// First check whether name is an alias.
+	res, err := client.Aliases().Alias(name).Do(ctx)
+	switch {
+	case err == nil:
+		for indexName, info := range res.Indices {
+			for _, alias := range info.Aliases {
+				if alias.AliasName == name {
+					return indexName, nil
+				}
+			}
+		}
+	case es7.IsNotFound(err):
+		// "No such alias" is not a failure: name may still be a concrete index, so fall
+		// through to the existence check instead of returning the 404 to the caller.
+	default:
+		return "", err
+	}
+
+	// Then check whether name is a concrete index.
+	exists, err := client.IndexExists(name).Do(ctx)
+	if err != nil {
+		return "", err
+	}
+	if exists {
+		return name, nil
+	}
+
+	// Neither an alias nor an index.
+	return "", nil
+}

+ 18 - 3
createEsIndex/main.go

@@ -37,7 +37,7 @@ var (
 	saveEsSp          = make(chan bool, 5)
 	saveProjectEsPool = make(chan map[string]interface{}, 5000) //保存project数据到es
 	saveProjectSp     = make(chan bool, 5)
-	saveEsAllPool     = make(chan map[string]interface{}, 5000)
+	saveEsAllPool     = make(chan map[string]interface{}, 5000) //存储单机版es,爬虫采集判重使用
 	saveEsAllSp       = make(chan bool, 5)
 
 	//saveErrBidPool = make(chan map[string]interface{}, 5000)
@@ -57,6 +57,7 @@ func init() {
 	InitField()
 	InitEsBiddingField()
 	oss.InitOss()
+	verifyESFields() //检测es 字段类型
 
 	JyUdpAddr = &net.UDPAddr{
 		IP:   net.ParseIP(config.Conf.Udp.JyAddr),
@@ -120,7 +121,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					}()
 					biddingTask(mapInfo)
 				}()
-			case "biddingall":
+			case "biddingall": //补充存量数据
 				pool <- true
 				go func() {
 					defer func() {
@@ -153,7 +154,15 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					}()
 					projectTask(data, mapInfo)
 				}()
-			case "biddingdata": //bidding全量数据
+			case "project_all_data": //存量 projectset 数据
+				pool <- true
+				go func() {
+					defer func() {
+						<-pool
+					}()
+					projectAllData()
+				}()
+			case "biddingdata": //es 单机版,采集判重
 				pool <- true
 				go func() {
 					defer func() {
@@ -299,6 +308,9 @@ func SaveEsMethod() {
 						<-saveEsSp
 					}()
 					Es.BulkSave(config.Conf.DB.Es.IndexB, arru)
+					if config.Conf.DB.Es.IndexTmp != "" {
+						Es.BulkSave(config.Conf.DB.Es.IndexTmp, arru)
+					}
 				}(arru)
 				arru = make([]map[string]interface{}, EsBulkSize)
 				indexu = 0
@@ -311,6 +323,9 @@ func SaveEsMethod() {
 						<-saveEsSp
 					}()
 					Es.BulkSave(config.Conf.DB.Es.IndexB, arru)
+					if config.Conf.DB.Es.IndexTmp != "" {
+						Es.BulkSave(config.Conf.DB.Es.IndexTmp, arru)
+					}
 				}(arru[:indexu])
 				arru = make([]map[string]interface{}, EsBulkSize)
 				indexu = 0

+ 257 - 0
createEsIndex/projectall.go

@@ -0,0 +1,257 @@
+package main
+
+import (
+	util "app.yhyue.com/data_processing/common_utils"
+	"app.yhyue.com/data_processing/common_utils/log"
+	"app.yhyue.com/data_processing/common_utils/mongodb"
+	"fmt"
+	"github.com/spf13/viper"
+	"go.uber.org/zap"
+	"math"
+	"reflect"
+	"strconv"
+	"sync"
+)
+
+// projectAllData loads projectall.toml from the working directory and starts one dealProject
+// goroutine per entry of its "all" table, passing the configured collection, _id range and
+// the [routines] num value. It returns right after starting the goroutines and does not wait
+// for them.
+func projectAllData() {
+	// Biddingall is one slice of stock data: a collection and an _id range.
+	type Biddingall struct {
+		Coll  string
+		Gtid  string
+		Lteid string
+	}
+	type RoutinesConf struct {
+		Num int
+	}
+	type AllConf struct {
+		All      map[string]Biddingall
+		Routines RoutinesConf
+	}
+	var all AllConf
+
+	// Use a private viper instance so that reading this file does not replace whatever the
+	// process-wide viper singleton currently holds.
+	v := viper.New()
+	v.SetConfigFile("projectall.toml")
+	v.SetConfigType("toml")
+	if err := v.ReadInConfig(); err != nil {
+		fmt.Println("ReadInConfig err =>", err)
+		return
+	}
+	if err := v.Unmarshal(&all); err != nil {
+		fmt.Println("projectAllData Unmarshal err =>", err)
+		return
+	}
+
+	for k, conf := range all.All {
+		go dealProject(conf.Coll, conf.Gtid, conf.Lteid, k, all.Routines.Num)
+	}
+}
+
+// dealProject reads every document of coll whose _id is in (gtid, lteid], converts it with
+// projectAllDoc and pushes the result onto saveProjectEsPool.
+// kword only labels the log lines; routines caps how many documents are converted
+// concurrently (values below 1 are treated as 1). It returns after all started conversions
+// have finished.
+func dealProject(coll, gtid, lteid, kword string, routines int) {
+	if routines < 1 {
+		// A zero-capacity semaphore would block forever on the first send below, because the
+		// matching receive only happens inside the goroutine started after it.
+		routines = 1
+	}
+	ch := make(chan bool, routines) // semaphore bounding in-flight conversions
+	wg := &sync.WaitGroup{}
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gt":  mongodb.StringTOBsonId(gtid),
+			"$lte": mongodb.StringTOBsonId(lteid),
+		},
+	}
+
+	conn := MgoP.GetMgoConn()
+	defer MgoP.DestoryMongoConn(conn)
+
+	count, err := conn.DB(MgoP.DbName).C(coll).Find(&q).Count()
+	if err != nil {
+		log.Info("dealProject", zap.Any("count err", err))
+	}
+	log.Info("dealProject", zap.Int64(kword, count))
+
+	query := conn.DB(MgoP.DbName).C(coll).Find(q).Iter()
+	c1, index := 0, 0
+
+	for tmp := make(map[string]interface{}); query.Next(tmp); c1++ {
+		if c1%20000 == 0 {
+			log.Info(kword, zap.Int("current:", c1))
+			log.Info(kword, zap.Any("current:_id =>", tmp["_id"]))
+		}
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			saveProjectEsPool <- projectAllDoc(tmp)
+		}(tmp)
+		// Fresh map for the next document: the goroutine above still reads the previous one.
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	log.Info(fmt.Sprintf("%s over", kword), zap.Int("count", c1), zap.Int("index", index))
+}
+
+// projectAllDoc converts one document read from MongoDB into the document that is sent to
+// the project ES pool. Only fields listed in ProjectField are copied; a few fields get
+// special treatment, and project_rate / prate_flag / bidcycle are derived at the end.
+func projectAllDoc(tmp map[string]interface{}) map[string]interface{} {
+	newTmp := make(map[string]interface{})
+	newTmp["s_projectname"] = tmp["projectname"]
+	for f, ftype := range ProjectField {
+		val := tmp[f]
+		if val == nil {
+			continue
+		}
+		switch f {
+		case "package":
+			if pk1 := projectAllPackage1(tmp); len(pk1) > 0 {
+				newTmp["package1"] = pk1
+			}
+		case "topscopeclass":
+			if topscopeclass, ok := val.([]interface{}); ok {
+				tc := []string{}
+				seen := map[string]bool{}
+				for _, v := range topscopeclass {
+					str := regLetter.ReplaceAllString(util.ObjToString(v), "") // strip letters
+					if !seen[str] {
+						seen[str] = true
+						tc = append(tc, str)
+					}
+				}
+				newTmp["topscopeclass"] = tc
+			}
+		case "list":
+			if list, ok := val.([]interface{}); ok {
+				newTmp[f] = projectAllList(list)
+			}
+		case "budget", "bidamount", "sortprice":
+			// Amounts above 1,000,000,000 are not indexed.
+			if util.Float64All(val) <= 1000000000 {
+				newTmp[f] = val
+			}
+		case "projectscope":
+			// The limit is counted in runes, so truncate by runes as well: slicing the string
+			// by bytes could cut a multi-byte character in half.
+			if r := []rune(util.ObjToString(val)); len(r) > 1000 {
+				newTmp[f] = string(r[:1000])
+			} else {
+				newTmp[f] = val
+			}
+		case "ids", "mpc", "mpn", "review_experts", "winnerorder",
+			"entidlist", "first_cooperation", "subscopeclass":
+			newTmp[f] = val
+		case "_id":
+			newTmp["_id"] = mongodb.BsonIdToSId(tmp["_id"])
+			newTmp["id"] = mongodb.BsonIdToSId(tmp["_id"])
+		default:
+			// Copy only values whose Go type matches the configured type; skip empty strings.
+			if reflect.TypeOf(val).String() != ftype {
+				continue
+			}
+			if val != "" {
+				newTmp[f] = val
+			}
+		}
+	}
+
+	budget := util.Float64All(newTmp["budget"])
+	bidamount := util.Float64All(newTmp["bidamount"])
+	if float64(budget) > 0 && float64(bidamount) > 0 {
+		rate := float64(1) - float64(bidamount)/float64(budget)
+		f, _ := strconv.ParseFloat(strconv.FormatFloat(rate, 'f', 4, 64), 64)
+		// Outside 0~0.6: emit no rate, keep only the budget (drop bidamount) and flag the
+		// abnormal discount rate in the index.
+		if f < 0 || f > 0.6 {
+			delete(newTmp, "bidamount")
+			newTmp["prate_flag"] = 1
+		} else {
+			newTmp["project_rate"] = f
+		}
+	}
+
+	// The computed cycle belongs in the emitted document, not in the discarded source map.
+	if days, ok := projectAllBidcycle(tmp); ok {
+		newTmp["bidcycle"] = days
+	}
+	return newTmp
+}
+
+// projectAllPackage1 builds the "package1" value: one {winner, bidamount} entry per distinct
+// winner found in tmp["package"], or taken from the top-level winner/bidamount when
+// tmp["package"] is not a map. Entries need a winner longer than 4 bytes and a positive amount.
+func projectAllPackage1(tmp map[string]interface{}) []map[string]interface{} {
+	byWinner := map[string]map[string]interface{}{}
+	add := func(src map[string]interface{}) {
+		winner := util.ObjToString(src["winner"])
+		bidamount := util.Float64All(src["bidamount"])
+		if len(winner) > 4 && bidamount > 0 {
+			byWinner[winner] = map[string]interface{}{
+				"winner":    winner,
+				"bidamount": bidamount,
+			}
+		}
+	}
+
+	if packages, ok := tmp["package"].(map[string]interface{}); ok {
+		for _, pks := range packages {
+			pk, _ := pks.([]interface{})
+			for _, v := range pk {
+				if p, ok := v.(map[string]interface{}); ok {
+					add(p)
+				}
+			}
+		}
+	} else {
+		add(tmp)
+	}
+
+	pk1 := make([]map[string]interface{}, 0, len(byWinner))
+	for _, v := range byWinner {
+		pk1 = append(pk1, v)
+	}
+	return pk1
+}
+
+// projectAllList rebuilds the "list" field, keeping for every item only the fields listed in
+// ProjectListF. Items that are not maps are skipped instead of panicking.
+func projectAllList(list []interface{}) []map[string]interface{} {
+	var newList []map[string]interface{}
+	for _, item := range list {
+		item1, ok := item.(map[string]interface{})
+		if !ok {
+			continue
+		}
+		listm := make(map[string]interface{})
+		for f1, ftype1 := range ProjectListF {
+			fieldval := item1[f1]
+			if fieldval == nil {
+				continue
+			}
+			// The class fields are copied without a type check. The original tested the
+			// outer field name here (always "list"), so this branch could never run.
+			if f1 == "topscopeclass" || f1 == "subscopeclass" {
+				listm[f1] = fieldval
+				continue
+			}
+			// NOTE(review): a list item whose bidamount is stored as a string fails this type
+			// check unless ProjectListF declares it as "string"; confirm whether such values
+			// should be converted to float64 before being indexed.
+			if reflect.TypeOf(fieldval).String() == ftype1 && fieldval != "" {
+				listm[f1] = fieldval
+			}
+		}
+		newList = append(newList, listm)
+	}
+	return newList
+}
+
+// projectAllBidcycle computes the bid-document preparation cycle in days, rounded up.
+// It uses the first "招标" item of tmp["list"] whose bidopentime is later than its positive
+// publishtime; failing that, the top-level bidopentime and the publishtime of the first
+// "招标" item that has one. ok is false when no cycle can be computed.
+func projectAllBidcycle(tmp map[string]interface{}) (days int, ok bool) {
+	bidopentime := util.Int64All(tmp["bidopentime"]) // bid opening time
+	if bidopentime <= 0 {
+		// Without a top-level bidopentime no cycle is computed at all.
+		return 0, false
+	}
+	list, _ := tmp["list"].([]interface{}) // a missing or malformed list yields no items
+	firstPublish := int64(0)               // publishtime of the first "招标" item
+	for _, m := range list {
+		item, isMap := m.(map[string]interface{})
+		if !isMap || util.ObjToString(item["toptype"]) != "招标" {
+			continue
+		}
+		publish := util.Int64All(item["publishtime"])
+		if publish <= 0 {
+			continue
+		}
+		if open := util.Int64All(item["bidopentime"]); open > publish {
+			return int(math.Ceil(float64(open-publish) / float64(86400))), true
+		}
+		if firstPublish == 0 {
+			firstPublish = publish
+		}
+	}
+	if firstPublish > 0 && bidopentime > firstPublish {
+		return int(math.Ceil(float64(bidopentime-firstPublish) / float64(86400))), true
+	}
+	return 0, false
+}

+ 31 - 0
createEsIndex/projectall.toml

@@ -0,0 +1,31 @@
+[routines]  ## 开启协程个数
+num = 20
+
+
+
+[[all]]
+
+[all.01]
+coll = "projectset_20230407"
+gtid = "0"
+lteid = "5d839796a5cb26b9b770bc27" ##
+
+[all.02]
+coll = "projectset_20230407"
+gtid = "5d839796a5cb26b9b770bc27"
+lteid = "60e28e641a75b8f446ee805d" ##
+
+[all.03]
+coll = "projectset_20230407"
+gtid = "60e28e641a75b8f446ee805d"
+lteid = "62d9519d4d0d9b2bc2b402fa" ##
+
+[all.04]
+coll = "projectset_20230407"
+gtid = "62d9519d4d0d9b2bc2b402fa"
+lteid = "6476e4b7eb01e8efa62a676e" ## mongo表最新ID
+
+
+
+
+

+ 34 - 0
createEsIndex/utils.go

@@ -57,3 +57,37 @@ func Float64SliceSum(nums []float64) float64 {
 func Float64Equal1Precision(a, b float64) bool {
 	return int(math.Round(a*10)) == int(math.Round(b*10))
 }
+
+// chargeType reports whether the Elasticsearch mapping type etype is an accepted counterpart
+// of the MongoDB field type ftype:
+//   string            -> keyword, text
+//   bool              -> boolean
+//   int64, int32, int -> long, integer
+//   float64, float32  -> double, float
+// Any other ftype, including the empty string, yields false.
+func chargeType(ftype, etype string) bool {
+	switch ftype {
+	case "string":
+		return etype == "keyword" || etype == "text"
+	case "bool":
+		return etype == "boolean"
+	case "int64", "int32", "int":
+		return etype == "long" || etype == "integer"
+	case "float64", "float32":
+		return etype == "double" || etype == "float"
+	default:
+		return false
+	}
+}