Bladeren bron

更新索引,支持同时二个集群

wcc 1 jaar geleden
bovenliggende
commit
9bbe37b837

+ 883 - 0
createEsIndex/bidding_es.go.back

@@ -0,0 +1,883 @@
+package main
+
+import (
+	"encoding/json"
+	"esindex/config"
+	"esindex/oss"
+	"fmt"
+	"github.com/spf13/viper"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.uber.org/zap"
+	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
+	"reflect"
+	"regexp"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+	"unicode/utf8"
+)
+
+var (
+	TimeV1      = regexp.MustCompile("(\\d{4})[年.]?$")
+	TimeV2      = regexp.MustCompile("(\\d{4}[年.\\-/]?)(\\d{1,2}[月.\\-/]?$)")
+	TimeV3      = regexp.MustCompile("(\\d{4}[年.\\-/]?)(\\d{1,2}[月.\\-/]?)(\\d{1,2}日?$)")
+	TimeClear   = regexp.MustCompile("[年|月|日|/|.|-]")
+	filterSpace = regexp.MustCompile("<[^>]*?>|[\\s\u3000\u2003\u00a0]")
+	date1       = regexp.MustCompile("20[0-2][0-9][年|\\-/.][0-9]{1,2}[月|\\-/.][0-9]{1,2}[日]?")
+	HtmlReg     = regexp.MustCompile("<[^>]+>")
+)
+
+func biddingTask(mapInfo map[string]interface{}) {
+	defer util.Catch()
+
+	stype := util.ObjToString(mapInfo["stype"])
+	if stype == "bidding" {
+		uq := bson.M{"gtid": bson.M{"$gte": util.ObjToString(mapInfo["gtid"])},
+			"lteid": bson.M{"$lte": util.ObjToString(mapInfo["lteid"])}}
+		MgoB.Update("bidding_processing_ids", uq, bson.M{"$set": bson.M{"dataprocess": 8, "updatetime": time.Now().Unix()}}, false, true)
+	}
+	q, _ := mapInfo["query"].(map[string]interface{})
+	if q == nil {
+		q = map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt":  mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
+				"$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
+			},
+		}
+	} else {
+		//针对gte/lte,单独转换
+		q = convertToMongoID(q)
+	}
+
+	ch := make(chan bool, 10)
+	wg := &sync.WaitGroup{}
+
+	//bidding库
+	biddingConn := MgoB.GetMgoConn()
+	count, _ := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(&q).Count()
+	log.Info("bidding表", zap.Int64("同步总数:", count))
+	it := biddingConn.DB(config.Conf.DB.MongoB.Dbname).C(config.Conf.DB.MongoB.Coll).Find(&q).Select(map[string]interface{}{
+		"contenthtml": 0,
+	}).Iter()
+	c1, index := 0, 0
+	var indexLock sync.Mutex
+	for tmp := make(map[string]interface{}); it.Next(tmp); c1++ {
+		if c1%1000 == 0 {
+			log.Info("biddingTask", zap.Int("current:", c1))
+			log.Info("biddingTask", zap.Any("current:_id =>", tmp["_id"]))
+		}
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
+				tmp = make(map[string]interface{})
+				return
+			}
+			//只针对增量数据处理;全量数据 需要用extracttype字段判断
+			//7:	重复数据
+			//8:	不重复
+			if util.IntAll(tmp["dataprocess"]) != 8 {
+				return
+			}
+			//// 增量数据使用上面判断;全量数据使用下面配置
+			//-1:重复 ,1:不重复 ,0:入库 9:分类
+			//if util.IntAll(tmp["extracttype"]) != 1 {
+			//	return
+			//}
+
+			//针对产权数据,暂时不入es 索引库
+			if util.IntAll(tmp["infoformat"]) == 3 {
+				return
+			}
+
+			/**
+			数据抽取时,有的数据的发布时间是之前的,属于增量历史数据,在判重和同步到bidding表是,会添加history_updatetime
+			字段,所以下面判断才会处理
+			*/
+			if stype == "bidding_history" && tmp["history_updatetime"] == nil {
+				return
+			}
+			indexLock.Lock()
+			index++
+			indexLock.Unlock()
+			newTmp, update := GetEsField(tmp, stype)
+			newTmp["dataweight"] = 0 //索引数据新增 jy置顶字段
+
+			//针对中国政府采购网,单独处理
+			if util.ObjToString(tmp["site"]) == "中国政府采购网" {
+				objectType := MatchService(tmp)
+				if objectType != "" {
+					newTmp["object_type"] = objectType
+				}
+			}
+
+			if len(update) > 0 {
+				updateBiddingPool <- []map[string]interface{}{{
+					"_id": tmp["_id"],
+				},
+					{"$set": update},
+				}
+			}
+			if util.ObjToString(newTmp["spidercode"]) == "a_jyxxfbpt_gg" {
+				// 剑鱼信息发布数据 通过udp通知信息发布程序
+				go UdpMethod(mongodb.BsonIdToSId(newTmp["_id"]))
+			}
+			saveEsPool <- newTmp
+		}(tmp)
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	log.Info("biddingTask over", zap.Int("count", c1), zap.Int("index", index))
+
+	////发送udp,附件补采 才需要
+	//data := map[string]interface{}{
+	//	"stype": "update",
+	//	"gtid":  mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
+	//	"lteid": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
+	//}
+	//target := &net.UDPAddr{
+	//	Port: 1782,
+	//	IP:   net.ParseIP("127.0.0.1"),
+	//}
+	//bytes, _ := json.Marshal(data)
+	//err := UdpClient.WriteUdp(bytes, udp.OP_TYPE_DATA, target)
+	//if err != nil {
+	//	log.Info("biddingTask ", zap.Any("WriteUdp err", err), zap.Any("target", target))
+	//}
+	//
+	//log.Info("biddingTask ", zap.Any("target", target), zap.Any("data", data))
+	//
+
+	//
+	//重采平台需要
+	//mapInfo["stype"] = ""
+	//datas, _ := json.Marshal(mapInfo)
+	//var next = &net.UDPAddr{
+	//	IP:   net.ParseIP("127.0.0.1"),
+	//	Port: 1910,
+	//}
+	//log.Info("bidding index es over", zap.Any("es", next), zap.String("mapinfo", string(datas)))
+}
+
+//biddingAllTask 补充存量数据
+func biddingAllTask(mapInfo map[string]interface{}) {
+	defer util.Catch()
+
+	stype := util.ObjToString(mapInfo["stype"])
+	q, _ := mapInfo["query"].(map[string]interface{})
+	if q == nil {
+		q = map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt":  mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
+				"$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
+			},
+		}
+	} else {
+		//针对gte/lte,单独转换
+		q = convertToMongoID(q)
+	}
+
+	ch := make(chan bool, 50)
+	wg := &sync.WaitGroup{}
+
+	//bidding库
+	biddingConn := MgoB.GetMgoConn()
+	it := biddingConn.DB(config.Conf.DB.MongoB.Dbname).C(config.Conf.DB.MongoB.Coll).Find(&q).Select(map[string]interface{}{
+		"contenthtml": 0,
+	}).Iter()
+	c1, index := 0, 0
+	var indexLock sync.Mutex
+	for tmp := make(map[string]interface{}); it.Next(tmp); c1++ {
+		if c1%20000 == 0 {
+			log.Info("biddingAllTask", zap.Int("current:", c1))
+			log.Info("biddingAllTask", zap.Any("current:_id =>", tmp["_id"]))
+		}
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
+				tmp = make(map[string]interface{})
+				return
+			}
+			// 针对存量数据,重复数据不进索引
+			if util.IntAll(tmp["extracttype"]) == -1 {
+				return
+			}
+
+			//针对产权数据,暂时不入es 索引库
+			if util.IntAll(tmp["infoformat"]) == 3 {
+				return
+			}
+
+			indexLock.Lock()
+			index++
+			indexLock.Unlock()
+			newTmp, update := GetEsField(tmp, stype)
+			//针对中国政府采购网,单独处理
+			if util.ObjToString(tmp["site"]) == "中国政府采购网" {
+				objectType := MatchService(tmp)
+				if objectType != "" {
+					newTmp["object_type"] = objectType
+				}
+			}
+			newTmp["dataweight"] = 0 //索引数据新增 jy置顶字段
+			if len(update) > 0 {
+				updateBiddingPool <- []map[string]interface{}{{
+					"_id": tmp["_id"],
+				},
+					{"$set": update},
+				}
+			}
+			saveEsPool <- newTmp
+		}(tmp)
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	log.Info("biddingAllTask over", zap.Int("count", c1), zap.Int("index", index))
+
+}
+
+//biddingAllDataTask 处理配置文件的存量数据
+func biddingAllDataTask() {
+	type Biddingall struct {
+		Coll  string
+		Gtid  string
+		Lteid string
+		Start int
+		End   int
+	}
+	type RoutinesConf struct {
+		Num int
+	}
+	type AllConf struct {
+		All      map[string]Biddingall
+		Routines RoutinesConf
+	}
+	var all AllConf
+
+	viper.SetConfigFile("biddingall.toml")
+	viper.SetConfigName("biddingall") // 配置文件名称(无扩展名)
+	viper.SetConfigType("toml")       // 如果配置文件的名称中没有扩展名,则需要配置此项
+	viper.AddConfigPath("./")
+	err := viper.ReadInConfig() // 查找并读取配置文件
+	if err != nil {             // 处理读取配置文件的错误
+		fmt.Println("ReadInConfig err =>", err)
+		return
+	}
+	err = viper.Unmarshal(&all)
+	if err != nil {
+		fmt.Println("biddingAllDataTask Unmarshal err =>", err)
+		return
+	}
+
+	for k, conf := range all.All {
+		go dealData(conf.Coll, conf.Gtid, conf.Lteid, k, all.Routines.Num, conf.Start)
+	}
+
+}
+
+func dealData(coll, gtid, lteid, kword string, routines int, startID int) {
+	ch := make(chan bool, routines)
+	wg := &sync.WaitGroup{}
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gt":  mongodb.StringTOBsonId(gtid),
+			"$lte": mongodb.StringTOBsonId(lteid),
+		},
+	}
+	biddingConn := MgoB.GetMgoConn()
+	it := biddingConn.DB(config.Conf.DB.MongoB.Dbname).C(coll).Find(&q).Select(map[string]interface{}{
+		"contenthtml": 0,
+	}).Iter()
+	c1, index := 0, 0
+	var indexLock sync.Mutex
+	for tmp := make(map[string]interface{}); it.Next(tmp); c1++ {
+		if c1%20000 == 0 {
+			log.Info(kword, zap.Int("current:", c1))
+			log.Info(kword, zap.Any("current:_id =>", tmp["_id"]))
+		}
+		autoid := startID + c1
+		tmp["autoid"] = autoid
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			insert := map[string]interface{}{
+				"autoid": tmp["autoid"],
+				"_id":    tmp["_id"],
+			}
+			MgoB.SaveByOriID("wcc_bidding_id_tmp", insert)
+
+			if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
+				tmp = make(map[string]interface{})
+				return
+			}
+			// 针对存量数据,重复数据不进索引
+			if util.IntAll(tmp["extracttype"]) == -1 {
+				return
+			}
+
+			//针对产权数据,暂时不入es 索引库
+			if util.IntAll(tmp["infoformat"]) == 3 {
+				return
+			}
+
+			indexLock.Lock()
+			index++
+			indexLock.Unlock()
+			newTmp, update := GetEsField(tmp, "biddingall")
+			//针对中国政府采购网,单独处理
+			if util.ObjToString(tmp["site"]) == "中国政府采购网" {
+				objectType := MatchService(tmp)
+				if objectType != "" {
+					newTmp["object_type"] = objectType
+				}
+			}
+			newTmp["autoid"] = tmp["autoid"]
+			newTmp["dataweight"] = 0 //索引数据新增 jy置顶字段
+			if len(update) > 0 {
+				updateBiddingPool <- []map[string]interface{}{{
+					"_id": tmp["_id"],
+				},
+					{"$set": update},
+				}
+			}
+			saveEsPool <- newTmp
+		}(tmp)
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	log.Info(fmt.Sprintf("%s over", kword), zap.Int("count", c1), zap.Int("index", index))
+}
+
+func biddingTaskById(mapInfo map[string]interface{}) {
+	defer util.Catch()
+
+	stype := util.ObjToString(mapInfo["stype"])
+	infoid := util.ObjToString(mapInfo["infoid"])
+	tmp, _ := MgoB.FindById(config.Conf.DB.MongoB.Coll, infoid, map[string]interface{}{"contenthtml": 0})
+	if sensitive := util.ObjToString((*tmp)["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
+		return
+	}
+	if util.IntAll((*tmp)["extracttype"]) == 1 {
+		newTmp, update := GetEsField(*tmp, stype)
+		newTmp["dataweight"] = 0 //索引数据新增 jy置顶字段
+		if len(update) > 0 {
+			//updateBiddingPool <- []map[string]interface{}{{
+			//	"_id": mongodb.StringTOBsonId(infoid),
+			//},
+			//	{"$set": update},
+			//}
+		}
+		saveEsPool <- newTmp
+	}
+	log.Info("biddingTaskById over", zap.Any("mapInfo", mapInfo))
+}
+
+// GetEsField @Description ES字段
+// @Author J 2022/6/7 11:34 AM
+func GetEsField(tmp map[string]interface{}, stype string) (map[string]interface{}, map[string]interface{}) {
+	newTmp := make(map[string]interface{})
+	update := make(map[string]interface{}) // bidding 修改字段
+	saveErr := make(map[string]interface{})
+	//for field, ftype := range config.Conf.DB.Es.FieldEs {
+	for field, ftype := range BiddingField {
+		if tmp[field] != nil { //
+			if field == "purchasinglist" { //标的物处理
+				purchasinglist_new := []map[string]interface{}{}
+				if pcl, _ := tmp[field].([]interface{}); len(pcl) > 0 {
+					for _, ls := range pcl {
+						lsm_new := make(map[string]interface{})
+						lsm := ls.(map[string]interface{})
+						for pf, pftype := range BiddingLevelField[field] {
+							lsmv := lsm[pf]
+							if lsmv != nil && reflect.TypeOf(lsmv).String() == pftype {
+								lsm_new[pf] = lsm[pf]
+							}
+						}
+						if lsm_new != nil && len(lsm_new) > 0 {
+							purchasinglist_new = append(purchasinglist_new, lsm_new)
+						}
+					}
+				}
+				if len(purchasinglist_new) > 0 {
+					newTmp[field] = purchasinglist_new
+				}
+			} else if field == "procurementlist" {
+				if tmp["procurementlist"] != nil {
+					var arr []interface{}
+					plist := tmp["procurementlist"].([]interface{})
+					for _, p := range plist {
+						p1 := p.(map[string]interface{})
+						p2 := make(map[string]interface{})
+						for k, v := range BiddingLevelField[field] {
+							if k == "projectname" && util.ObjToString(p1[k]) == "" {
+								p2[k] = util.ObjToString(tmp["projectname"])
+							} else if k == "buyer" && util.ObjToString(p1[k]) == "" && util.ObjToString(tmp["buyer"]) != "" {
+								p2[k] = util.ObjToString(tmp["buyer"])
+							} else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" {
+								res := getMethod(util.ObjToString(p1[k]))
+								if res != 0 {
+									p2[k] = res
+								}
+							} else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v {
+								p2[k] = p1[k]
+							}
+
+						}
+						arr = append(arr, p2)
+					}
+					if len(arr) > 0 {
+						newTmp[field] = arr
+					}
+				}
+			} else if field == "projectscope" {
+				ps, _ := tmp["projectscope"].(string)
+				newTmp["projectscope"] = ps
+				//新版本已无需记录长度
+				//if len(ps) > pscopeLength {
+				//	saveErr["projectscope"] = ps
+				//	saveErr["projectscope_length"] = len(ps)
+				//}
+			} else if field == "winnerorder" { //中标候选
+				winnerorder_new := []map[string]interface{}{}
+				if winnerorder, _ := tmp[field].([]interface{}); len(winnerorder) > 0 {
+					for _, win := range winnerorder {
+						winMap_new := make(map[string]interface{})
+						winMap := win.(map[string]interface{})
+						for wf, wftype := range BiddingLevelField[field] {
+							wfv := winMap[wf]
+							if wfv != nil && reflect.TypeOf(wfv).String() == wftype {
+								if wf == "sort" && util.Int64All(wfv) > 100 {
+									continue
+								}
+								winMap_new[wf] = winMap[wf]
+							}
+						}
+						if winMap_new != nil && len(winMap_new) > 0 {
+							winnerorder_new = append(winnerorder_new, winMap_new)
+						}
+					}
+				}
+				if len(winnerorder_new) > 0 {
+					newTmp[field] = winnerorder_new
+				}
+			} else if field == "qualifies" {
+				//项目资质
+				qs := []string{}
+				if q, _ := tmp[field].([]interface{}); len(q) > 0 {
+					for _, v := range q {
+						v1 := v.(map[string]interface{})
+						qs = append(qs, util.ObjToString(v1["key"]))
+					}
+				}
+				if len(qs) > 0 {
+					newTmp[field] = strings.Join(qs, ",")
+				}
+			} else if field == "bidopentime" {
+				if tmp[field] != nil && tmp["bidendtime"] == nil {
+					newTmp["bidendtime"] = tmp[field]
+					newTmp[field] = tmp[field]
+				} else if tmp[field] == nil && tmp["bidendtime"] != nil {
+					newTmp["bidendtime"] = tmp[field]
+					newTmp[field] = tmp["bidendtime"]
+				} else {
+					if tmp["bidopentime"] != nil {
+						newTmp[field] = tmp["bidopentime"]
+					}
+				}
+			} else if field == "detail" { //过滤
+				detail, _ := tmp[field].(string)
+				detail = filterSpace.ReplaceAllString(detail, "")
+				// 不需要再保存记录长度
+				//if len(detail) > pscopeLength {
+				//	saveErr["detail"] = detail
+				//	saveErr["detail_length"] = len(detail)
+				//}
+				if tmp["cleartag"] != nil {
+					if tmp["cleartag"].(bool) {
+						text, _ := FilterDetail(detail)
+						newTmp[field] = util.ObjToString(tmp["title"]) + " " + text
+					} else {
+						newTmp[field] = util.ObjToString(tmp["title"]) + " " + detail
+					}
+				} else {
+					text, b := FilterDetail(detail)
+					newTmp[field] = util.ObjToString(tmp["title"]) + " " + text
+					update["cleartag"] = b
+				}
+			} else if field == "topscopeclass" || field == "entidlist" {
+				newTmp[field] = tmp[field]
+			} else if field == "_id" {
+				newTmp["_id"] = mongodb.BsonIdToSId(tmp["_id"])
+				newTmp["id"] = mongodb.BsonIdToSId(tmp["_id"])
+			} else if field == "publishtime" || field == "comeintime" {
+				//字段类型不正确,特别处理
+				if tmp[field] != nil && util.Int64All(tmp[field]) > 0 {
+					newTmp[field] = util.Int64All(tmp[field])
+				}
+			} else if field == "package" {
+				//分包信息处理
+				packages := dealPackage(tmp)
+				if len(packages) > 0 {
+					newTmp["package"] = packages
+					newTmp["subpackage"] = 1
+				}
+			} else if field == "infoformat" {
+				newTmp[field] = tmp[field]
+			} else { //其它字段判断数据类型,不正确舍弃
+				if fieldval := tmp[field]; reflect.TypeOf(fieldval).String() != ftype {
+					continue
+				} else {
+					if fieldval != "" {
+						newTmp[field] = fieldval
+					}
+				}
+			}
+		}
+	}
+	// 附件内容长度不做限制,大于长度限制做记录
+	filetext := getFileText(tmp)
+	if len([]rune(filetext)) > 10 {
+		newTmp["filetext"] = filetext
+		if len([]rune(filetext)) > fileLength {
+			//saveErr["filetext"] = filetext
+			saveErr["filetext_length"] = len([]rune(filetext))
+		}
+	}
+	YuceEndtime(newTmp) // 预测结果时间
+	if stype == "bidding" || stype == "bidding_history" || stype == "index-by-id" {
+		newTmp["createtime"] = time.Now().Unix() // es库数据创建时间,只有增量数据有
+		newTmp["pici"] = time.Now().Unix()       //createtime跟pici一样,为了剑鱼功能需要,并行存在一段时间,之后可以删掉createtime
+		update["pici"] = time.Now().Unix()
+	}
+
+	if len(saveErr) > 0 {
+		saveErr["infoid"] = mongodb.BsonIdToSId(tmp["_id"])
+		saveErr["time"] = time.Now().Unix()
+		saveErrBidPool <- saveErr
+	}
+	return newTmp, update
+}
+
+// @Description 采购意向 预计采购时间处理
+// @Author J 2022/6/7 8:04 PM
+func getMethod(str string) int64 {
+	if TimeV1.MatchString(str) {
+		arr := TimeV1.FindStringSubmatch(str)
+		st := arr[1] + "0000"
+		parseInt, err := strconv.ParseInt(st, 10, 64)
+		if err == nil {
+			return parseInt
+		}
+	} else if TimeV2.MatchString(str) {
+		arr := TimeV2.FindStringSubmatch(str)
+		str1 := arr[2]
+		if len(str1) == 1 {
+			str1 = "0" + str1
+		}
+		str2 := TimeClear.ReplaceAllString(arr[1], "") + TimeClear.ReplaceAllString(str1, "") + "00"
+		parseInt, err := strconv.ParseInt(str2, 10, 64)
+		if err == nil {
+			return parseInt
+		}
+	} else if TimeV3.MatchString(str) {
+		match := TimeV3.FindStringSubmatch(str)
+		if len(match) >= 4 {
+			year, _ := strconv.Atoi(match[1])
+			month, _ := strconv.Atoi(match[2])
+			day, _ := strconv.Atoi(match[3])
+
+			dateInt64 := int64(year*10000 + month*100 + day)
+			return dateInt64
+		}
+	}
+	return 0
+}
+
+func FilterDetail(text string) (string, bool) {
+	b := false // 清理标记
+	for _, s := range config.Conf.DB.Es.DetailFilter {
+		reg := regexp.MustCompile(s)
+		if reg.MatchString(text) {
+			text = reg.ReplaceAllString(text, "")
+			if !b {
+				b = true
+			}
+		}
+	}
+	return text, b
+}
+
+// @Description 附件内容
+// @Author J 2022/6/7 1:54 PM
+func getFileText(tmp map[string]interface{}) (filetext string) {
+	if attchMap, ok := tmp["attach_text"].(map[string]interface{}); attchMap != nil && ok {
+		for _, tmpData1 := range attchMap {
+			if tmpData2, ok := tmpData1.(map[string]interface{}); tmpData2 != nil && ok {
+				for _, result := range tmpData2 {
+					if resultMap, ok := result.(map[string]interface{}); resultMap != nil && ok {
+						if attach_url := util.ObjToString(resultMap["attach_url"]); attach_url != "" {
+							bs := oss.OssGetObject(attach_url, mongodb.BsonIdToSId(tmp["_id"])) //oss读数据
+							//附件总长度限制550000,其中最后一个文件长度限制50000
+							size := config.Conf.DB.Oss.Filesize
+							if size <= 0 {
+								size = 500000
+							}
+							if utf8.RuneCountInString(filetext+bs) < 50000+size {
+								filetext += bs + "\n"
+							} else {
+								if len(bs) > 50000 {
+									filetext += bs[0:50000]
+								} else {
+									filetext += bs
+								}
+							}
+							//附件总长度限制550000
+							if utf8.RuneCountInString(filetext) >= 50000+size {
+								return
+							}
+
+							//正式环境
+							//if utf8.RuneCountInString(filetext+bs) < fileLength {
+							//	filetext += bs + "\n"
+							//} else {
+							//	if utf8.RuneCountInString(bs) > fileLength {
+							//		filetext = bs[0:fileLength]
+							//	} else {
+							//		filetext = bs
+							//	}
+							//	break
+							//}
+
+						}
+					}
+				}
+			}
+		}
+	}
+	return
+}
+
+// 预测结果时间
+func YuceEndtime(tmp map[string]interface{}) {
+	flag := false
+	flag2 := false
+	scope := []string{"信息技术_运维服务", "信息技术_软件开发", "信息技术_系统集成及安全", "信息技术_其他"}
+	titles := []string{"短信服务", "短信发送服务"}
+	details := []string{"短信发送服务", "短信服务平台", "短信服务项目"}
+	subscopeclass := util.ObjToString(tmp["s_subscopeclass"])
+	//先判断满足 s_subscopeclass 条件
+	for _, v := range scope {
+		if strings.Contains(subscopeclass, v) {
+			flag = true
+			break
+		}
+	}
+	//满足 s_subscopeclass ,再去判断title  detail
+	if flag {
+		title := util.ObjToString(tmp["title"])
+		for _, v := range titles {
+			if strings.Contains(title, v) {
+				flag2 = true
+			}
+		}
+		if !flag2 {
+			detail := util.ObjToString(tmp["detail"])
+			for _, v := range details {
+				if strings.Contains(detail, v) {
+					flag2 = true
+				}
+			}
+		}
+	}
+
+	if !flag2 {
+		return
+	}
+
+	subtype := util.ObjToString(tmp["subtype"])
+	if subtype == "成交" || subtype == "合同" {
+		// yucestarttime、yuceendtime
+		yucestarttime, yuceendtime := int64(0), int64(0)
+		// 项目周期中
+		if util.ObjToString(tmp["projectperiod"]) != "" {
+			dateStr := date1.FindStringSubmatch(util.ObjToString(tmp["projectperiod"]))
+			if len(dateStr) == 2 {
+				sdate := FormatDateStr(dateStr[0])
+				edate := FormatDateStr(dateStr[1])
+				if sdate < edate && sdate != 0 && edate != 0 {
+					yucestarttime = sdate
+					yuceendtime = edate
+				}
+			}
+		}
+		if yucestarttime > 0 && yuceendtime > yucestarttime {
+			tmp["yuceendtime"] = yuceendtime
+			return
+		}
+		// 预测开始时间 合同签订日期
+		if yucestarttime == 0 {
+			if util.IntAll(tmp["signaturedate"]) <= 0 {
+				if util.IntAll(tmp["publishtime"]) <= 0 {
+					return
+				} else {
+					yucestarttime = util.Int64All(tmp["publishtime"])
+				}
+			} else {
+				yucestarttime = util.Int64All(tmp["signaturedate"])
+			}
+		}
+		// 预测结束时间
+		if yucestarttime > 0 && yuceendtime == 0 {
+			if util.IntAll(tmp["project_duration"]) > 0 && util.ObjToString(tmp["project_timeunit"]) != "" {
+				yuceendtime = YcEndTime(yucestarttime, util.IntAll(tmp["project_duration"]), util.ObjToString(tmp["project_timeunit"]))
+				tmp["yuceendtime"] = yuceendtime
+			}
+		}
+	}
+}
+
+func FormatDateStr(ds string) int64 {
+	ds = strings.Replace(ds, "年", "-", -1)
+	ds = strings.Replace(ds, "月", "-", -1)
+	ds = strings.Replace(ds, "日", "", -1)
+	ds = strings.Replace(ds, "/", "-", -1)
+	ds = strings.Replace(ds, ".", "-", -1)
+
+	location, err := time.ParseInLocation(util.Date_Short_Layout, ds, time.Local)
+	if err != nil {
+		log.Error("FormatDateStr", zap.Error(err))
+		return 0
+	} else {
+		return location.Unix()
+	}
+}
+
+func YcEndTime(starttime int64, num int, unit string) int64 {
+	yuceendtime := int64(0)
+	if unit == "日历天" || unit == "天" || unit == "日" {
+		yuceendtime = starttime + int64(num*86400)
+	} else if unit == "周" {
+		yuceendtime = time.Unix(starttime, 0).AddDate(0, 0, num*7).Unix()
+	} else if unit == "月" {
+		yuceendtime = time.Unix(starttime, 0).AddDate(0, num, 0).Unix()
+	} else if unit == "年" {
+		yuceendtime = time.Unix(starttime, 0).AddDate(num, 0, 0).Unix()
+	} else if unit == "工作日" {
+		n := num / 7 * 2
+		yuceendtime = time.Unix(starttime, 0).AddDate(0, 0, num+n).Unix()
+	}
+	return yuceendtime
+}
+
+// UdpMethod @Description rpc调用信息发布程序接口
+// @Author J 2022/4/13 9:13 AM
+func UdpMethod(id string) {
+	mapinfo := map[string]interface{}{
+		"infoid": id,
+		"stype":  "jyfb_data_over",
+	}
+	datas, _ := json.Marshal(mapinfo)
+	log.Info("UdpMethod", zap.Any("JyUdpAddr", JyUdpAddr), zap.String("mapinfo", string(datas)))
+	_ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, JyUdpAddr)
+}
+
+//MatchService 针对中国招标网,匹配关键词打标签,object_type,货物、服务、工程,jsondata.item
+func MatchService(tmp map[string]interface{}) (res string) {
+	if jsondata, ok := tmp["jsondata"]; ok {
+		if da, ok := jsondata.(map[string]interface{}); ok {
+			if item, ok := da["item"]; ok {
+				services := []string{"货物", "服务", "工程"}
+				for _, v := range services {
+					if strings.Contains(util.ObjToString(item), v) {
+						return v
+					}
+				}
+			}
+		}
+	}
+
+	return
+}
+
+//dealPackage 处理package 字段
+func dealPackage(tmp map[string]interface{}) (newpackages []map[string]interface{}) {
+	package1, ok1 := tmp["package"]
+	s_winner, ok2 := tmp["s_winner"]
+	bidamount, ok3 := tmp["bidamount"]
+	var innerWinners = make([]string, 0)
+	var biaoAmounts = make([]float64, 0)
+	// 三个字段都存在
+	if ok3 && ok2 && ok1 {
+		packageMap, ok := package1.(map[string]interface{})
+		if ok {
+			if len(packageMap) >= 2 {
+				var packages = make([]map[string]interface{}, 0)
+				//var newTmp = make(map[string]interface{})
+				winner_amount_count := 0
+				for _, pack := range packageMap {
+					var newPackage = make(map[string]interface{})
+					pac, okk := pack.(map[string]interface{})
+					if okk {
+						_, okk1 := pac["winner"]
+						_, okk2 := pac["bidamount"]
+						_, okk3 := pac["name"]
+
+						if okk1 {
+							innerWinners = append(innerWinners, util.ObjToString(pac["winner"]))
+						}
+						if okk2 {
+							biaoAmounts = append(biaoAmounts, util.Float64All(pac["bidamount"]))
+						}
+						//winner bidamount 二个字段都存在
+						if okk1 && okk2 {
+							winner_amount_count++
+							newPackage["winner"] = pac["winner"]
+							newPackage["bidamount"] = pac["bidamount"]
+							if okk3 {
+								newPackage["name"] = pac["name"]
+							}
+							packages = append(packages, newPackage)
+						}
+
+					}
+
+				}
+
+				//出现次数大于1
+				if winner_amount_count > 1 {
+					swinner := util.ObjToString(s_winner)
+					swinners := strings.Split(swinner, ",")
+					//判断里外 winner 是否相等
+					eq := StringSliceValuesEqual(swinners, innerWinners)
+					if eq {
+						//判断金额相等
+						if Float64Equal1Precision(Float64SliceSum(biaoAmounts), util.Float64All(bidamount)) {
+							newpackages = packages
+						}
+					}
+				}
+
+			}
+
+		}
+	}
+
+	return
+}

+ 124 - 58
createEsIndex/biddingall.toml

@@ -5,62 +5,128 @@ num = 50
 
 [[all]]
 
-    [all.bidding_back]
-    coll = "bidding_back"
-    gtid = "0"
-    lteid = "5a862e7040d2d9bbe88e3b1f" ## bidding_back 最后一个ID
+#    [all.bidding_back]
+#    coll = "bidding_back"
+##    gtid = "0"
+#    gtid = "5847fad761a0721f159cdc8d"
+#    lteid = "5a862e7040d2d9bbe88e3b1f" ## bidding_back 最后一个ID,21516712条
+##    start = 1
+#    start = 3974150
+#    end = 21516713
+#
+#
+#    [all.02]
+#    coll = "bidding"
+#    gtid = "5afbeefda5cb26b9b715da0d"
+##    gtid = "0"
+#    lteid = "5c531b800000000000000000" ## 2019.2.1  15493432 15493430
+#    start = 25010291
+##    start = 21516714
+#    end = 37010146
+#
+#    [all.03]
+#    coll = "bidding"
+##    gtid = "5c531b800000000000000000"
+#    gtid = "5ccffddca5cb26b9b75edab2"
+#    lteid = "5e0b70800000000000000000" ## 2020.1.1  17995862
+#    start = 40880433
+##    start = 37010147
+#    end = 55006009
+#
+#    [all.04]
+#    coll = "bidding"
+##    gtid = "5e0b70800000000000000000"
+#    gtid = "5e817fbb50b5ea296ee8abf6"
+#    lteid = "5f74ab800000000000000000" ## 2020.10.1 17611742 17611742
+#    start = 58490789
+##    start = 55006010
+#    end = 72617752
+#
+#
+#    [all.05]
+#    coll = "bidding"
+#    gtid = "5fb72cc7f0f9d716c15d262f"
+##    gtid = "5f74ab800000000000000000"
+#    lteid = "608c29800000000000000000" ## 2021.5.1  17135203 17135203
+#    start = 76374373
+##    start = 72617753
+#    end = 89752956
+#
+#    [all.06]
+#    coll = "bidding"
+##    gtid = "608c29800000000000000000"
+#    gtid = "60b29f4b8a2adb30a592ac44"
+#    lteid = "6155df000000000000000000" ## 2021.10.1  20316855 20316855
+#    start = 93477557
+##    start = 89752957
+#    end = 110069812
+#
+#
+#    [all.07]
+#    coll = "bidding"
+##    gtid = "6155df000000000000000000"
+#    gtid = "6183d6614074d195df39ad3f"
+#    lteid = "621cf1800000000000000000" ## 2022.3.1  18930270 18930269
+#    start = 113936468
+##    start = 110069813
+#    end = 129000083
+#
+#
+#    [all.08]
+#    coll = "bidding"
+##    gtid = "621cf1800000000000000000"
+#    gtid = "62455fc8923488e172478f23"
+#    lteid = "62bdc8800000000000000000" ## 2022.7.1 18373938 18373937
+#    start = 132706094
+##    start = 129000084
+#    end = 147374022
+#
+#
+#[all.09]
+#    coll = "bidding"
+##    gtid = "62bdc8800000000000000000"
+#    gtid = "62d8e3cc487b1e446ecc17d3"
+#    lteid = "633712800000000000000000" ## 2022.10.1  19093157
+#    start = 151344287
+##    start = 147374023
+#    end = 166467180
+#
+#
+#[all.10]
+#    coll = "bidding"
+##    gtid = "633712800000000000000000"
+#    gtid = "635b4f42c994e88c25144a17"
+#    lteid = "63b05c800000000000000000" ## 2023.1.1   20198847
+#    start = 170612203
+##    start = 166467181
+#    end = 186666028
+#
+#
+#
+#[all.11]
+#    coll = "bidding"
+##    gtid = "63b05c800000000000000000"
+#    gtid = "63dd6be38aea8786d1baa874"
+#    lteid = "644e90800000000000000000" ##  2023.5.1  18038591
+#    start = 190960080
+##    start = 186666029
+#    end = 204704620
+#
+#
+#[all.12]
+#    coll = "bidding"
+##    gtid = "644e90800000000000000000"  ##  2023.5.1
+#    gtid = "646b48b60ebbbcdcb5c5d172"  ##  2023.5.1
+##    lteid = "64baab800000000000000000" ##  2023.7.22
+#    lteid = "64f0b9000000000000000000" ##  2023.09.01  26169882
+#    start = 208718561
+##    start = 204704621
+#    end = 230874503
 
-    [all.02]
-    coll = "bidding"
-    gtid = "0"
-    lteid = "5c531b800000000000000000" ## 2019.2.1  15493432
-
-    [all.03]
-    coll = "bidding"
-    gtid = "5c531b800000000000000000"
-    lteid = "5e0b70800000000000000000" ## 2020.1.1  17995862
-
-    [all.04]
-    coll = "bidding"
-    gtid = "5e0b70800000000000000000"
-    lteid = "5f74ab800000000000000000" ## 2020.10.1 17611742
-
-    [all.05]
-    coll = "bidding"
-    gtid = "5f74ab800000000000000000"
-    lteid = "608c29800000000000000000" ## 2021.5.1  17135203
-
-    [all.06]
-    coll = "bidding"
-    gtid = "608c29800000000000000000"
-    lteid = "6155df000000000000000000" ## 2021.10.1  20316855
-
-    [all.07]
-    coll = "bidding"
-    gtid = "6155df000000000000000000"
-    lteid = "621cf1800000000000000000" ## 2022.3.1  18930270
-
-    [all.08]
-    coll = "bidding"
-    gtid = "621cf1800000000000000000"
-    lteid = "62bdc8800000000000000000" ## 2022.7.1 18373938
-
-    [all.09]
-    coll = "bidding"
-    gtid = "62bdc8800000000000000000"
-    lteid = "633712800000000000000000" ## 2022.10.1  19093157
-
-    [all.10]
-    coll = "bidding"
-    gtid = "633712800000000000000000"
-    lteid = "63b05c800000000000000000" ## 2023.1.1   20198847
-
-    [all.11]
-    coll = "bidding"
-    gtid = "63b05c800000000000000000"
-    lteid = "644e90800000000000000000" ##  2023.5.1  18038591
-
-    [all.12]
-    coll = "bidding"
-    gtid = "644e90800000000000000000"  ##  2023.5.1
-    lteid = "64baab800000000000000000" ##  2023.7.22
+[all.13]
+coll = "bidding"
+gtid = "64f0b9000000000000000000"  ##  2023.09.01
+lteid = "64f0b900e2d7d34fa00f4f12"
+#lteid = "650407f7e2d7d34fa02edeca" ##  2023.09.01  26169882
+start = 230874513
+end = 240000000

+ 17 - 11
createEsIndex/common.toml

@@ -5,14 +5,14 @@
 
 [db]
 [db.mongoB] ## bidding标讯数据
-#    addr = "127.0.0.1:27017"
-    addr = "192.168.3.206:27002"    ## 测试环境
+    addr = "127.0.0.1:27083"
+#    addr = "192.168.3.206:27002"    ## 测试环境
 #    dbname = "wcc"
-    dbname = "qfw_data"
+    dbname = "qfw"
     coll = "bidding"
     size = 15
-    user = "root"
-    password = "root"
+    user = "SJZY_RWbid_ES"
+    password = "SJZY@B4i4D5e6S"
 
 [db.mongoP] ## projectset 项目信息
     addr = "192.168.3.206:27002"
@@ -54,13 +54,14 @@
     bucketname = "topjy"
     filesize = 500000  ## 单位字节,附件总字节长度限制;超过就不再读取
 [db.es]
-    addr = "http://192.168.3.149:9201"      ## 正常bidding 链接
-    addrp = "http://192.168.3.149:9201"   ## 采集使用的单机版地址
-    username = "es_all"
-    password = "TopJkO2E_d1x"
+    addr = "http://127.0.0.1:19805"      ## 正常bidding 链接
+#    addr = "http://192.168.3.149:9201"      ## 正常bidding 链接
+#    addrp = "http://192.168.3.149:9201"   ## 采集使用的单机版地址
+username = "es_all"
+password = "TopJkO2E_d1x"
     size = 5
-    indexb = "bidding_v1"
-    indextmp = "bidding_temporary"       ## 临时索引,其他程序需要
+    indexb = "bidding"
+#    indextmp = "bidding_temporary"       ## 临时索引,其他程序需要
     indexp = "projectset_v1"
     indexwinner = "winner"
     indexbuyer = "buyer_v2"
@@ -71,6 +72,11 @@ detailfilter = ["(招标网|千里马|采招网|招标采购导航网|招标与
     "[\\((]?(网址)?(::)?(http|https|htpps)*(:|:)?\\/\\/www.bidcenter.com.cn\\/",
     "千里马(平台|网站)+", "[“\"]?优质采(平台|电子交易平台|云采购平台|交易平台)?[”\"]?", "《?(中国采购与|中国)?招(投)?标(与采购|采购导航)?网》?",
     "《?元博网(采购与招标网)?》?", "《?(中国)?招标采购导航网》?", "中\\W{0,3}国采\\W{0,3}招\\W{0,3}网\\W*[((]?(bidcenter.com.cn)?[))]?", "已方宝", "中国招标与采购"]
+addr2 = "http://127.0.0.1:19905"
+username2 = "jybid"
+password2 = "Top2023_JEB01i@31"
+indexb2 = "bidding"
+
 
 [mail]
 send = false

+ 4 - 0
createEsIndex/config/conf.go

@@ -120,6 +120,10 @@ type es struct {
 	//FieldProcurementList map[string]interface{}
 	//FieldWinnerOrder     map[string]interface{}
 	//Package              map[string]interface{}
+	Addr2     string
+	Username2 string
+	Password2 string
+	Indexb2   string
 }
 
 type duration struct {

+ 38 - 31
createEsIndex/es_test.go

@@ -200,28 +200,28 @@ func TestIsHanStart(t *testing.T) {
 
 func TestParseTime(t *testing.T) {
 
-	dateString := "2023年12月"
-
-	// 正则表达式匹配
-	pattern := `(\d{4})[年.\-/]?(\d{1,2})[月.\-/]?(\d{1,2})日?$`
-	re := regexp.MustCompile(pattern)
-	match := re.FindStringSubmatch(dateString)
-
-	if len(match) >= 4 {
-		year, _ := strconv.Atoi(match[1])
-		month, _ := strconv.Atoi(match[2])
-		day, _ := strconv.Atoi(match[3])
-
-		dateInt64 := int64(year*10000 + month*100 + day)
-		fmt.Println(dateInt64)
-	} else {
-		fmt.Println("Date string does not match the pattern.")
-	}
-
-	str := "2023年09月24日"
-	arr := getMethod(str)
-
-	fmt.Println(arr)
+	//dateString := "2023年12月"
+	//
+	//// 正则表达式匹配
+	//pattern := `(\d{4})[年.\-/]?(\d{1,2})[月.\-/]?(\d{1,2})日?$`
+	//re := regexp.MustCompile(pattern)
+	//match := re.FindStringSubmatch(dateString)
+	//
+	//if len(match) >= 4 {
+	//	year, _ := strconv.Atoi(match[1])
+	//	month, _ := strconv.Atoi(match[2])
+	//	day, _ := strconv.Atoi(match[3])
+	//
+	//	dateInt64 := int64(year*10000 + month*100 + day)
+	//	fmt.Println(dateInt64)
+	//} else {
+	//	fmt.Println("Date string does not match the pattern.")
+	//}
+	//
+	//str := "2023年09月24日"
+	//arr := getMethod(str)
+	//
+	//fmt.Println(arr)
 	dateStrings := []string{
 		"2022年3月",
 		"2022-03",
@@ -233,6 +233,11 @@ func TestParseTime(t *testing.T) {
 		"2022年10月",
 		"2022年10月12日",
 		"2022-10",
+		"2023/4/28 0:12:12",
+		"[2023/8/28/]",
+		"2023-8-28T12:12:12",
+		"2023.8/28",
+		"8/28",
 	}
 
 	for _, dateString := range dateStrings {
@@ -249,15 +254,17 @@ func TestParseTime(t *testing.T) {
 func parseDateString(dateString string) (int64, error) {
 	// Regular expressions for different date formats
 	regexPatterns := []string{
-		`^(\d{4})年(\d{1,2})月(\d{1,2})日$`,
-		`^(\d{4})年(\d{1,2})月$`,
-		`^(\d{4})-(\d{1,2})-(\d{1,2})$`,
-		`^(\d{4})年(\d{1,2})$`,
-		`^(\d{4})\-(\d{1,2})$`,
-		`^(\d{4})\.(\d{1,2})\.(\d{1,2})$`,
-		`^(\d{4})\.(\d{1,2})$`,
-		`^(\d{4})年$`,
-		`^(\d{4})$`,
+		//`^(\d{4})年(\d{1,2})月(\d{1,2})日?$`,
+		//`^(\d{4})年(\d{1,2})月$`,
+		//`^(\d{4})-(\d{1,2})-(\d{1,2})$`,
+		//`^(\d{4})年(\d{1,2})$`,
+		//`^(\d{4})\-(\d{1,2})$`,
+		//`^(\d{4})\.(\d{1,2})\.(\d{1,2})$`,
+		//`^(\d{4})\.(\d{1,2})$`,
+		//`^(\d{4})年$`,
+		//`^(\d{4})$`,
+		`(\d{4})[年.\-/]?(\d{1,2})[月.\-/]?(\d{1,2})日?`,
+		`(\d{1,2})[月.\-/]?(\d{1,2})日?`,
 	}
 
 	for _, pattern := range regexPatterns {

+ 1 - 1
createEsIndex/go.mod

@@ -10,5 +10,5 @@ require (
 	github.com/spf13/viper v1.15.0
 	go.mongodb.org/mongo-driver v1.10.2
 	go.uber.org/zap v1.23.0
-	jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20230712115659-b418d6181de3
+	jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20230919095552-4cb37e2b9caf
 )

+ 2 - 0
createEsIndex/go.sum

@@ -1373,6 +1373,8 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9
 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
 jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20230712115659-b418d6181de3 h1:kgtSaRR/hRunxM6Kxi66REk7f2PqN1u56j/V+8FfPW8=
 jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20230712115659-b418d6181de3/go.mod h1:1Rp0ioZBhikjXHYYXmnzL6RNfvTDM/2XvRB+vuPLurI=
+jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20230919095552-4cb37e2b9caf h1:7RYFbRUmw5Yug9x85AgcFfGuRsdePk635j5BA+VBE2U=
+jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20230919095552-4cb37e2b9caf/go.mod h1:1Rp0ioZBhikjXHYYXmnzL6RNfvTDM/2XvRB+vuPLurI=
 rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
 rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
 rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=

+ 12 - 0
createEsIndex/init.go

@@ -54,6 +54,7 @@ func InitMgo() {
 		Size:        config.Conf.DB.MongoB.Size,
 		UserName:    config.Conf.DB.MongoB.User,
 		Password:    config.Conf.DB.MongoB.Password,
+		//Direct:      true,
 	}
 	MgoB.InitPool()
 	if config.Conf.DB.MongoB.Addr == "" || config.Conf.DB.MongoB.Dbname == "" {
@@ -162,6 +163,17 @@ func InitEs() {
 	}
 	Es1.InitElasticSize()
 
+	if config.Conf.DB.Es.Addr2 != "" {
+		Es2 = &elastic.Elastic{
+			S_esurl:  config.Conf.DB.Es.Addr2,
+			I_size:   config.Conf.DB.Es.Size,
+			Username: config.Conf.DB.Es.Username2,
+			Password: config.Conf.DB.Es.Password2,
+		}
+		Es2.InitElasticSize()
+		fmt.Println("es2", Es2)
+	}
+
 	log.Info("InitEs", zap.Any("duration", time.Since(now).Seconds()))
 }
 

+ 8 - 1
createEsIndex/main.go

@@ -27,7 +27,7 @@ var (
 	MgoS  *mongodb.MongodbSim
 	Mysql *mysqldb.Mysql
 
-	Es, Es1 *elastic.Elastic
+	Es, Es1, Es2 *elastic.Elastic
 
 	UdpClient  udp.UdpClient
 	UdpTaskMap = &sync.Map{}
@@ -334,6 +334,10 @@ func SaveEsMethod() {
 					if config.Conf.DB.Es.IndexTmp != "" {
 						Es.BulkSave(config.Conf.DB.Es.IndexTmp, arru)
 					}
+					if config.Conf.DB.Es.Addr2 != "" {
+						Es2.BulkSave(config.Conf.DB.Es.Indexb2, arru)
+					}
+
 				}(arru)
 				arru = make([]map[string]interface{}, EsBulkSize)
 				indexu = 0
@@ -349,6 +353,9 @@ func SaveEsMethod() {
 					if config.Conf.DB.Es.IndexTmp != "" {
 						Es.BulkSave(config.Conf.DB.Es.IndexTmp, arru)
 					}
+					if config.Conf.DB.Es.Addr2 != "" {
+						Es2.BulkSave(config.Conf.DB.Es.Indexb2, arru)
+					}
 				}(arru[:indexu])
 				arru = make([]map[string]interface{}, EsBulkSize)
 				indexu = 0