ソースを参照

支持配置第二套ai流程;支持oss 获取detail内容

wcc 8 ヶ月 前
コミット
0f3a8d11bd

+ 40 - 1
createEsIndex/bidding_es.go

@@ -109,6 +109,13 @@ func biddingTask(mapInfo map[string]interface{}) {
 			if stype == "bidding_history" && tmp["history_updatetime"] == nil {
 				return
 			}
+			//开启OSS时,detail需要重新获取
+			if config.Conf.Env.Oss {
+				id := mongodb.BsonIdToSId(tmp["_id"])
+				val := oss.OssGetObject(id, config.Conf.DB.Oss.DetailBucket)
+				tmp["detail"] = val
+			}
+
 			indexLock.Lock()
 			index++
 			indexLock.Unlock()
@@ -162,7 +169,12 @@ func biddingTask(mapInfo map[string]interface{}) {
 	if stype == "bidding" {
 		uq := bson.M{"gtid": bson.M{"$gte": util.ObjToString(mapInfo["gtid"])},
 			"lteid": bson.M{"$lte": util.ObjToString(mapInfo["lteid"])}}
-		MgoB.Update("bidding_processing_ids", uq, bson.M{"$set": bson.M{"dataprocess": 8, "updatetime": time.Now().Unix(), "index_num": index}}, false, true)
+
+		if config.Conf.Env.Ai {
+			MgoBOld.Update("bidding_processing_ids", uq, bson.M{"$set": bson.M{"dataprocess_ai": 7, "updatetime": time.Now().Unix(), "index_num": index}}, false, true)
+		} else {
+			MgoB.Update("bidding_processing_ids", uq, bson.M{"$set": bson.M{"dataprocess": 8, "updatetime": time.Now().Unix(), "index_num": index}}, false, true)
+		}
 	}
 
 	////发送udp,附件补采 才需要
@@ -251,6 +263,13 @@ func biddingAllTask(mapInfo map[string]interface{}) {
 				return
 			}
 
+			//开启OSS时,detail需要重新获取
+			if config.Conf.Env.Oss {
+				id := mongodb.BsonIdToSId(tmp["_id"])
+				val := oss.OssGetObject(id, config.Conf.DB.Oss.DetailBucket)
+				tmp["detail"] = val
+			}
+
 			indexLock.Lock()
 			index++
 			indexLock.Unlock()
@@ -262,6 +281,19 @@ func biddingAllTask(mapInfo map[string]interface{}) {
 					newTmp["object_type"] = objectType
 				}
 			}
+
+			//todo 处理中国移动定制标签
+			if len(globalRegs) > 0 && len(MatchArr) > 0 {
+				gs, _, _ := TaskTags(tmp, globalRegs)
+				if len(gs) > 0 {
+					tags, match, add := TaskTags(tmp, MatchArr)
+					if len(tags) > 0 {
+						newTmp["mobile_tag"] = tags
+						update["mobile_tag"] = tags
+						log.Info("biddingTask", zap.Any(mongodb.BsonIdToSId(tmp["_id"]), match+","+add))
+					}
+				}
+			}
 			newTmp["dataweight"] = 0 //索引数据新增 jy置顶字段
 			if len(update) > 0 {
 				updateBiddingPool <- map[string]interface{}{
@@ -402,6 +434,13 @@ func biddingTaskById(mapInfo map[string]interface{}) {
 	if sensitive := util.ObjToString((*tmp)["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
 		return
 	}
+	//开启OSS时,detail需要重新获取
+	if config.Conf.Env.Oss {
+		id := mongodb.BsonIdToSId((*tmp)["_id"])
+		val := oss.OssGetObject(id, config.Conf.DB.Oss.DetailBucket)
+		(*tmp)["detail"] = val
+	}
+
 	if util.IntAll((*tmp)["extracttype"]) == 1 {
 		newTmp, update := GetEsField(*tmp, stype)
 		newTmp["dataweight"] = 0 //索引数据新增 jy置顶字段

+ 4 - 1
createEsIndex/common.toml

@@ -74,7 +74,8 @@
     accesskey = "LTAI4G5x9aoZx8dDamQ7vfZi"
     accesssecret = "Bk98FsbPYXcJe72n1bG3Ssf73acuNh"
     bucketname = "topjy"
-    filesize = 500000  ## 单位字节,附件总字节长度限制;超过就不再读取
+    filesize = 500000        ## 单位字节,附件总字节长度限制;超过就不再读取
+    detailbucket = "jy-datadetail" ## 获取详情时,oss的配置
 [db.es]
     addr = "http://127.0.0.1:19908"      ## 正常bidding 链接
 #    addr = "http://192.168.3.149:9201"      ## 测试环境 bidding 链接
@@ -134,6 +135,8 @@ api = "http://172.17.162.36:19281/_send/_mail"
     dbfile = "./db"
 #    openpre = false      ## 是否开启预处理流程
 #    spectype = "day"    ## 定时任务类型;正式环境应该是 month 。day 表示每天创建一个索引;month 表示每个月创建一个
+    ai = false  ## ai 模型测试;true 时,更新 bidding_processing_ids的dataprocess_ai字段值,数据第二套代码
+    oss = false ## 是否针对详情,使用oss 获取内容;更新后的需要单独使用oss 获取detail值
 
 
 [[others]]

+ 3 - 0
createEsIndex/config/conf.go

@@ -102,6 +102,7 @@ type oss struct {
 	AccessKey    string
 	AccessSecret string
 	BucketName   string
+	DetailBucket string
 	Filesize     int
 }
 type mgo struct {
@@ -121,6 +122,8 @@ type env struct {
 	Alias    string
 	SpecType string
 	Dbfile   string //配置文件,读取bitmap
+	Ai       bool   //是否Ai m模型测试
+	Oss      bool   //是否针对详情,使用oss 获取内容
 }
 
 type es struct {

+ 13 - 0
createEsIndex/init.go

@@ -75,6 +75,19 @@ func InitMgo() {
 		log.Error("InitMgo", zap.String("MgoB", "查询表为空"))
 	}
 	log.Info("InitMgo", zap.Any("MgoB duration", time.Since(now).Seconds()))
+	// 判断是否接入大模型
+	if config.Conf.Env.Ai {
+		MgoBOld = &mongodb.MongodbSim{
+			MongodbAddr: config.Conf.DB.MongoB.Addr,
+			DbName:      "qfw",
+			Size:        config.Conf.DB.MongoB.Size,
+			UserName:    config.Conf.DB.MongoB.User,
+			Password:    config.Conf.DB.MongoB.Password,
+			Direct:      config.Conf.DB.MongoB.Direct,
+		}
+		MgoBOld.InitPool()
+		log.Info("InitMgo", zap.Any("MgoBOLD duration", time.Since(now).Seconds()))
+	}
 	//项目信息
 	MgoP = &mongodb.MongodbSim{
 		MongodbAddr: config.Conf.DB.MongoP.Addr,

+ 6 - 5
createEsIndex/main.go

@@ -24,11 +24,12 @@ import (
 )
 
 var (
-	MgoB  *mongodb.MongodbSim
-	MgoP  *mongodb.MongodbSim
-	MgoQ  *mongodb.MongodbSim
-	MgoS  *mongodb.MongodbSim
-	Mysql *mysqldb.Mysql
+	MgoB    *mongodb.MongodbSim
+	MgoBOld *mongodb.MongodbSim
+	MgoP    *mongodb.MongodbSim
+	MgoQ    *mongodb.MongodbSim
+	MgoS    *mongodb.MongodbSim
+	Mysql   *mysqldb.Mysql
 
 	Es, Es2, Es3 *elastic.Elastic //Es3 迁移华为云新集群地址
 	//PreEs        = make(map[string]*elastic.Elastic, 0) //预处理 索引客户端