Pārlūkot izejas kodu

支持其他来源数据入库 索引ES

wcc 1 gadu atpakaļ
vecāks
revīzija
3cf43324d1

+ 152 - 0
createEsIndex/attachment.go

@@ -0,0 +1,152 @@
+package main
+
+import (
+	"encoding/json"
+	"esindex/config"
+	"go.uber.org/zap"
+	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
+	"net"
+	"sync"
+)
+
+// attachmentBiddingTask indexes re-collected attachment bidding records into ES.
+//
+// It opens a MongoDB pool from the settings in other, iterates the collection
+// other.MgoColl, and for every record that passes the filters pushes the ES
+// document to saveEsPool and any field updates to updateBiddingPool. When
+// other.NextAddr is set, it finally notifies the next UDP node.
+//
+// mapInfo keys read here:
+//   - "stype": passed to GetEsField and used for the bidding_history filter.
+//   - "query": optional explicit Mongo query (converted by convertToMongoID).
+//   - "gtid"/"lteid": hex ids bounding the _id range (gtid, lteid]; required
+//     only when "query" is absent, forwarded to the next node when present.
+func attachmentBiddingTask(mapInfo map[string]interface{}, other config.OthersData) {
+	defer util.Catch()
+
+	stype := util.ObjToString(mapInfo["stype"])
+	// Read the ids with the comma-ok form: a task driven by "query" may not
+	// carry them, and a bare type assertion would panic.
+	gtid, hasGtid := mapInfo["gtid"].(string)
+	lteid, hasLteid := mapInfo["lteid"].(string)
+
+	q, _ := mapInfo["query"].(map[string]interface{})
+	if q == nil {
+		if !hasGtid || !hasLteid {
+			log.Info("attachmentBiddingTask", zap.Any("missing gtid/lteid and no query", mapInfo))
+			return
+		}
+		q = map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt":  mongodb.StringTOBsonId(gtid),
+				"$lte": mongodb.StringTOBsonId(lteid),
+			},
+		}
+	} else {
+		// gte/lte id bounds inside the query are converted separately.
+		q = convertToMongoID(q)
+	}
+
+	// Initialise the MongoDB pool for this data source.
+	// NOTE(review): neither this pool nor biddingConn below is released in this
+	// function; confirm whether the mongodb helper requires an explicit release.
+	MgoOther := &mongodb.MongodbSim{
+		MongodbAddr: other.MgoAddr,
+		DbName:      other.MgoDB,
+		Size:        10,
+		UserName:    other.MgoUsername,
+		Password:    other.MgoPassword,
+	}
+	MgoOther.InitPool()
+
+	// Log only the connection target; the struct itself carries the password.
+	log.Info("attachmentBiddingTask",
+		zap.String("mgoaddr", other.MgoAddr),
+		zap.String("mgodb", other.MgoDB),
+		zap.String("mgocoll", other.MgoColl))
+
+	ch := make(chan bool, 10) // caps the number of in-flight workers at 10
+	wg := &sync.WaitGroup{}
+
+	// Source collection (bidding-style data).
+	biddingConn := MgoOther.GetMgoConn()
+	count, countErr := biddingConn.DB(MgoOther.DbName).C(other.MgoColl).Find(&q).Count()
+	if countErr != nil {
+		log.Info(other.MgoColl, zap.Any("Count err", countErr))
+	}
+	log.Info(other.MgoColl, zap.Int64("同步总数:", count))
+	// contenthtml is excluded from the projection.
+	it := biddingConn.DB(MgoOther.DbName).C(other.MgoColl).Find(&q).Select(map[string]interface{}{
+		"contenthtml": 0,
+	}).Iter()
+
+	c1, index := 0, 0 // c1: records read; index: records that passed the filters
+	var indexLock sync.Mutex
+	for tmp := make(map[string]interface{}); it.Next(tmp); c1++ {
+		if c1%1000 == 0 {
+			log.Info("attachmentBiddingTask", zap.Int("current:", c1))
+			log.Info("attachmentBiddingTask", zap.Any("current:_id =>", tmp["_id"]))
+		}
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			// Records flagged with this sensitive marker are not indexed.
+			if util.ObjToString(tmp["sensitive"]) == "测试" {
+				return
+			}
+			// Incremental data only: dataprocess 7 = duplicate, 8 = not duplicate.
+			// (Full data would be judged by extracttype instead:
+			// -1 duplicate, 1 not duplicate, 0 stored, 9 classified.)
+			if util.IntAll(tmp["dataprocess"]) != 8 {
+				return
+			}
+			// Property-rights data (infoformat 3) is not indexed for now.
+			if util.IntAll(tmp["infoformat"]) == 3 {
+				return
+			}
+			// Incremental historical records get history_updatetime during
+			// dedup/sync; for the bidding_history task only those are handled.
+			if stype == "bidding_history" && tmp["history_updatetime"] == nil {
+				return
+			}
+
+			indexLock.Lock()
+			index++
+			indexLock.Unlock()
+
+			newTmp, update := GetEsField(tmp, stype)
+			newTmp["dataweight"] = 0 // pinned-weight field added to every index document
+
+			// Records from this site get an extra object_type classification.
+			if util.ObjToString(tmp["site"]) == "中国政府采购网" {
+				if objectType := MatchService(tmp); objectType != "" {
+					newTmp["object_type"] = objectType
+				}
+			}
+
+			if len(update) > 0 {
+				updateBiddingPool <- []map[string]interface{}{
+					{"_id": tmp["_id"]},
+					{"$set": update},
+				}
+			}
+
+			saveEsPool <- newTmp
+		}(tmp)
+		// The worker owns the previous map now; decode the next record into a fresh one.
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	log.Info("attachmentBiddingTask over", zap.Int("count", c1), zap.Int("index", index))
+
+	if other.NextAddr == "" {
+		return
+	}
+
+	// Notify the next UDP node (only configured for attachment re-collection).
+	data := make(map[string]interface{}, len(other.Data)+2)
+	if hasGtid {
+		data["gtid"] = mongodb.StringTOBsonId(gtid)
+	}
+	if hasLteid {
+		data["lteid"] = mongodb.StringTOBsonId(lteid)
+	}
+	// Extra payload configured for the next node; it may override the ids above.
+	for k, v := range other.Data {
+		data[k] = v
+	}
+
+	target := &net.UDPAddr{
+		Port: other.NextPort,
+		IP:   net.ParseIP(other.NextAddr),
+	}
+	payload, marshalErr := json.Marshal(data)
+	if marshalErr != nil {
+		log.Info("attachmentBiddingTask ", zap.Any("Marshal err", marshalErr), zap.Any("data", data))
+		return
+	}
+	if err := UdpClient.WriteUdp(payload, udp.OP_TYPE_DATA, target); err != nil {
+		log.Info("attachmentBiddingTask ", zap.Any("WriteUdp err", err), zap.Any("target", target))
+		return
+	}
+
+	log.Info("attachmentBiddingTask ", zap.Any("target", target), zap.Any("data", data))
+}

+ 20 - 0
createEsIndex/common.toml

@@ -89,6 +89,26 @@ api = "http://172.17.145.179:19281/_send/_mail"
 [env]
     stype = 1 ## 默认0 正式环境;1测试环境。测试环境不会执行定时任务更新采购单位、中标单位、数据检测
 
+
+[[others]]
+[others.attachment] ## 附件补充程序,更新入库 bidding ES
+ mgoaddr = "127.0.0.1:27083"       ## bidding查询地址
+ mgocoll = "bidding_downloadfile_log"       ## bidding表名
+ mgodb = "qfw"         ## 库名
+ mgousername="SJZY_RWbid_ES"     ## 用户名
+ mgopassword="SJZY@B4i4D5e6S"     ## 密码
+# esaddr = "127.0.0.1:19805"        ## 存入的es 地址
+# esindex= "bidding"        ## 存入的es 表
+# esusername="es_all"      ## es 用户名
+# espassword=""      ## es 密码
+ nextaddr ="172.17.145.179"       ## udp 下个地址
+ nextport =1782      ## udp 下个地址 的端口
+ [others.attachment.data]             ## 下个udp 节点传递的数据
+  stype="update"
+
+
+
+
 # 日志
 [log]
 # 日志路径,为空将输出控制台

+ 42 - 11
createEsIndex/config/conf.go

@@ -2,10 +2,9 @@ package config
 
 import (
 	"fmt"
+	"github.com/spf13/viper"
 	"os"
 	"time"
-
-	"github.com/BurntSushi/toml"
 )
 
 var (
@@ -13,22 +12,39 @@ var (
 	Conf *conf
 )
 
-// Init Config
-func Init(conf string) {
-	_, err := toml.DecodeFile(conf, &Conf)
+// InitConf reads the TOML configuration through viper and unmarshals it into Conf.
+func InitConf(conf string) {
+	//_, err := toml.DecodeFile(conf, &Conf)
+	//if err != nil {
+	//	fmt.Printf("Err %v", err)
+	//	os.Exit(1)
+	//}
+
+	viper.SetConfigFile("common.toml") // NOTE(review): the conf argument is never used; the file name is hard-coded here
+	viper.SetConfigName("common") // config file name (without extension)
+	viper.SetConfigType("toml")   // required when the config file name carries no extension
+	viper.AddConfigPath("./")
+	err := viper.ReadInConfig() // locate and read the config file
+	if err != nil {             // handle a failure to read the config file
+		fmt.Println("ReadInConfig err =>", err)
+		return // NOTE(review): unlike the Unmarshal failure below, this path returns without exiting
+	}
+	err = viper.Unmarshal(&Conf)
 	if err != nil {
-		fmt.Printf("Err %v", err)
+		fmt.Println("Init Unmarshal err =>", err)
 		os.Exit(1)
+		return // not reached: os.Exit does not return
 	}
 
 }
 
 type conf struct {
-	DB   db
-	Udp  udp
-	Mail mail
-	Log  log
-	Env  env
+	DB     db
+	Udp    udp
+	Mail   mail
+	Log    log
+	Env    env
+	Others map[string]OthersData // extra data sources, looked up by task type (e.g. "attachment")
 }
 
 type udp struct {
@@ -129,6 +145,21 @@ type es struct {
 	Indexb2   string
 }
 
+type OthersData struct { // settings for one additional data source fed into the ES index
+	MgoAddr     string // MongoDB address used to build the connection pool
+	MgoDB       string // MongoDB database name
+	MgoColl     string // collection that is queried and iterated
+	MgoUsername string // MongoDB user name
+	MgoPassword string // MongoDB password
+	EsAddr      string // presumably the target ES address; not read by the code shown here
+	EsIndex     string // presumably the target ES index; not read by the code shown here
+	EsUsername  string // presumably the ES user name; not read by the code shown here
+	EsPassword  string // presumably the ES password; not read by the code shown here
+	NextAddr    string // IP of the next UDP node; empty disables the notification
+	NextPort    int    // port of the next UDP node
+	Data        map[string]interface{} // extra key/values merged into the UDP payload for the next node
+}
+
 type duration struct {
 	time.Duration
 }

+ 16 - 2
createEsIndex/main.go

@@ -57,7 +57,7 @@ var (
 )
 
 func init() {
-	config.Init("./common.toml")
+	config.InitConf("./common.toml")
 	InitLog()
 	InitMysql()
 	InitMgo()
@@ -65,7 +65,7 @@ func init() {
 	InitField()
 	InitEsBiddingField()
 	oss.InitOss()
-	verifyESFields() //检测es 字段类型
+	//verifyESFields() //检测es 字段类型
 
 	JyUdpAddr = &net.UDPAddr{
 		IP:   net.ParseIP(config.Conf.Udp.JyAddr),
@@ -237,6 +237,19 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					}()
 					winnerEsAll()
 				}()
+			case "attachment": // 补充附件采集,对应bidding为bidding_downloadfile_log
+				pool <- true
+				go func() {
+					defer func() {
+						<-pool
+					}()
+					//有单独配置其他操作
+					if len(config.Conf.Others) > 0 {
+						if v, ok := config.Conf.Others[tasktype]; ok {
+							attachmentBiddingTask(mapInfo, v)
+						}
+					}
+				}()
 			default:
 				pool <- true
 				go func() {
@@ -246,6 +259,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					log.Info("err", zap.Any("mapInfo", mapInfo))
 				}()
 			}
+
 		}
 	case udp.OP_NOOP: //下个节点回应
 		ok := string(data)