Parcourir la source

更新配置,支持oss获取detail,以及dbname_old 配置大模型流程更新qfw库下面dataprocess_ai 状态

wcc il y a 9 mois
Parent
commit
e72e2a9e1e
7 fichiers modifiés avec 211 ajouts et 45 suppressions
  1. 12 1
      src/README.md
  2. 8 0
      src/config.json
  3. 65 23
      src/task/task.go
  4. 16 1
      src/task/updatetask.go
  5. 73 8
      src/tools/tools.go
  6. 23 11
      src/udptask/udptask.go
  7. 14 1
      src/util/charge_rule.go

+ 12 - 1
src/README.md

@@ -6,4 +6,15 @@ dev1.0分支
 dev2.0.1分支
 1.增加数据监控
 2.config.json中的招标(zhaobiao)配置项只是为了测试时使用
-/opt/mnt/rz_createindex/classification2.0
+/opt/mnt/rz_createindex/classification2.0
+
+
+------------------------
+
+配置文件:
+-dbname_old: 是部署第二套ai 时,需要更新老的qfw.bidding_processing_ids 使用;第一套时,不需要这个配置
+-is_oss:     是为了配置oss,判断是否需要从Oss 获取附件正文内容,分类的时候需要使用
+-oss 是具体的配置参数, endpoint、access_key、access_secret、bucket_name 都是oss获取文件内容的具体参数; 其中 bucket_name 分为:"jy-datadetail" 和 "jy-datahtml" 二个库
+
+
+

+ 8 - 0
src/config.json

@@ -2,7 +2,15 @@
     "mgoaddr": "127.0.0.1:27017",
     "mgosize": 10,
     "dbname": "wcc",
+    "dbname_old": "wcc",
     "isbidding": false,
+    "is_oss": true,
+    "oss": {
+        "endpoint": "oss-cn-beijing-internal.aliyuncs.com",
+        "access_key": "LTAI4G5x9aoZx8dDamQ7vfZi",
+        "access_secret": "Bk98FsbPYXcJe72n1bG3Ssf73acuNh",
+        "bucket_name": "jy-datadetail"
+    },
     "dbname_dis": "classfication",
     "dbinfo": {
     	"bidding":{

+ 65 - 23
src/task/task.go

@@ -789,8 +789,12 @@ func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
 											da := dataa[0]
 											if len(util.ObjToString(da)) > 0 {
 												cs := strings.Split(util.ObjToString(da), "-")
-												SMap.Map["toptype"] = cs[0]
-												SMap.Map["subtype"] = cs[1]
+												if cs[0] != "" {
+													SMap.Map["toptype"] = cs[0]
+												}
+												if cs[1] != "" {
+													SMap.Map["subtype"] = cs[1]
+												}
 											}
 										}
 									}
@@ -803,7 +807,11 @@ func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
 										resa := ReSub(tt, tmp, "结果")
 										subtype := resa.Map["subtype"]
 										delete(SMap.Map, "subtype")
-										SMap.Map["subtype"] = subtype
+										if util.ObjToString(subtype) != "" {
+											SMap.Map["subtype"] = subtype
+										} else {
+											SMap.Map["subtype"] = "其它"
+										}
 									}
 								}
 							}
@@ -821,7 +829,7 @@ func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
 							}
 							//一级分类成功,但是二级没有分类成功,并且原值里有subtype
 							if _, ok := SMap.Map["toptype"]; ok {
-								if _, ok2 := SMap.Map["subtype"]; !ok2 {
+								if subtype, ok2 := SMap.Map["subtype"]; !ok2 || util.ObjToString(subtype) == "" {
 									//没有二级分类,直接设置为 其它
 									SMap.Map["subtype"] = "其它"
 								}
@@ -1219,8 +1227,12 @@ func UdpTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}, stype s
 										}
 									}
 									newTops, newSubs := u.ProcessTopscopeclass(resultTobs, resultSubs)
-									SMap.Map["topscopeclass"] = newTops
-									SMap.Map["subscopeclass"] = newSubs
+									if len(newTops) > 0 {
+										SMap.Map["topscopeclass"] = newTops
+									}
+									if len(newSubs) > 0 {
+										SMap.Map["subscopeclass"] = newSubs
+									}
 								} else {
 									SMap.Map["topscopeclass"] = []string{"其它"}
 									SMap.Map["subscopeclass"] = []string{"其它"}
@@ -1407,20 +1419,39 @@ func FindId_back(coll string) (gtid, lteid string) {
 }
 
 func FindId(coll string) (gtid, lteid string) {
-	data, _ := tools.MgoClass.Find(coll, map[string]interface{}{"dataprocess": 0}, `{"_id":1}`, nil, false, -1, -1)
-	for _, d := range *data {
-		gtid = d["gtid"].(string)
-		lteid = d["lteid"].(string)
-		set := map[string]interface{}{
-			"$set": map[string]interface{}{
-				"dataprocess": 2,
-				"updatetime":  time.Now().Unix(),
-			},
+	// 使用老表更新dataprocess 时
+	if util.ObjToString(tools.Config["dbname_old"]) != "" {
+		data, _ := tools.MgoClassOld.Find(coll, map[string]interface{}{"dataprocess_ai": 0}, `{"_id":1}`, nil, false, -1, -1)
+		for _, d := range *data {
+			gtid = d["gtid"].(string)
+			lteid = d["lteid"].(string)
+			set := map[string]interface{}{
+				"$set": map[string]interface{}{
+					"dataprocess_ai": 2,
+					"updatetime":     time.Now().Unix(),
+				},
+			}
+			tools.MgoClassOld.Update(coll, map[string]interface{}{"_id": d["_id"]}, set, false, false)
+			break
+		}
+		return gtid, lteid
+	} else {
+		data, _ := tools.MgoClass.Find(coll, map[string]interface{}{"dataprocess": 0}, `{"_id":1}`, nil, false, -1, -1)
+		for _, d := range *data {
+			gtid = d["gtid"].(string)
+			lteid = d["lteid"].(string)
+			set := map[string]interface{}{
+				"$set": map[string]interface{}{
+					"dataprocess": 2,
+					"updatetime":  time.Now().Unix(),
+				},
+			}
+			tools.MgoClass.Update(coll, map[string]interface{}{"_id": d["_id"]}, set, false, false)
+			break
 		}
-		tools.MgoClass.Update(coll, map[string]interface{}{"_id": d["_id"]}, set, false, false)
-		break
+		return gtid, lteid
 	}
-	return gtid, lteid
+	return
 }
 
 // NewLoadTestTask 测试任务
@@ -1626,12 +1657,23 @@ func InitTimeTask() *TTask {
 func StartTask(t *TTask) {
 	defer util.Catch()
 	logger.Debug("开始执行定时任务")
-	query := map[string]interface{}{
-		"_id": map[string]interface{}{
-			"$gt": u.StringTOBsonId(tools.IdCollSid),
-		},
-		"dataprocess": 8,
+	query := make(map[string]interface{})
+	if util.ObjToString(tools.Config["dbname_old"]) != "" {
+		query = map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt": u.StringTOBsonId(tools.IdCollSid),
+			},
+			"dataprocess_ai": 8,
+		}
+	} else {
+		query = map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt": u.StringTOBsonId(tools.IdCollSid),
+			},
+			"dataprocess": 8,
+		}
 	}
+
 	order := map[string]interface{}{"_id": -1}
 	logger.Debug("query:", query)
 	list, _ := tools.MgoClass.Find(t.S_idcoll, query, order, nil, false, -1, -1)

+ 16 - 1
src/task/updatetask.go

@@ -1,6 +1,7 @@
 package task
 
 import (
+	"common_utils/mongodb"
 	//"classification"
 	"encoding/json"
 	"fmt"
@@ -286,7 +287,21 @@ func NewClassificationRun(tt *TTask, tmp map[string]interface{}) *tools.SortMap
 							}
 						}
 					} else {
-						val = util.ObjToString(tmp[f]) //取识别内容
+						//如果配置了OSS 配置
+						if tools.IsOss {
+							// 针对detail内容,需要从oss 获取
+							if f == "detail" {
+								id := mongodb.BsonIdToSId(tmp["_id"])
+								val = tools.OssGetObject(id, tools.BucketName)
+								if val == "" {
+									log.Println("获取oss 内容为空", id)
+								}
+							} else {
+								val = util.ObjToString(tmp[f]) //取识别内容
+							}
+						} else {
+							val = util.ObjToString(tmp[f]) //取识别内容
+						}
 					}
 					val = PreFilter(val, tt.Task_PreRule) //任务的前置过滤
 					rulval[f] = val                       //整个任务仅过滤一次,将其存储

+ 73 - 8
src/tools/tools.go

@@ -2,6 +2,9 @@ package tools
 
 import (
 	"encoding/json"
+	"github.com/aliyun/aliyun-oss-go-sdk/oss"
+	"io"
+	"qfw/util"
 	//"fmt"
 	"fmt"
 	"io/ioutil"
@@ -36,14 +39,18 @@ var (
 )
 
 var (
-	Config    map[string]interface{}
-	MgoClass  *u.MongodbSim //规则库
-	MgoDcs    *u.MongodbSim //分布式保存库
-	NoAutoRun int
-	Fields    []string
+	Config      map[string]interface{}
+	MgoClass    *u.MongodbSim //规则库
+	MgoClassOld *u.MongodbSim //规则库,ai 第二套时,需要使用qfw 老的数据库连接,更新bidding_processing_ids
+	MgoDcs      *u.MongodbSim //分布式保存库
+	NoAutoRun   int
+	Fields      []string
+	ossClient   *oss.Client
+	BucketName  string
+	IsOss       bool
 )
 
-//定时任务
+// 定时任务
 var (
 	IsStart      bool   //是否执行
 	IdCollSid    string //ocr_flie_over id
@@ -99,6 +106,19 @@ func init() {
 		MgoClass.Password = DbInfo["bidding"][1]
 	}
 	MgoClass.InitPool()
+	// qfw 老数据库
+	if util.ObjToString(Config["dbname_old"]) != "" {
+		MgoClassOld = &u.MongodbSim{
+			MongodbAddr: Config["mgoaddr"].(string),
+			Size:        IntAll(Config["mgosize"]),
+			DbName:      Config["dbname_old"].(string),
+		}
+		if Config["isbidding"].(bool) { //目前分类规则表在招标bidding库需要账号密码
+			MgoClassOld.UserName = DbInfo["bidding"][0]
+			MgoClassOld.Password = DbInfo["bidding"][1]
+		}
+		MgoClassOld.InitPool()
+	}
 
 	MgoDcs = &u.MongodbSim{ //分布式目前只在跑历史数据时应用,不连招标库不需要配账号密码
 		MongodbAddr: Config["mgoaddr"].(string),
@@ -150,6 +170,51 @@ func init() {
 			}
 		}
 	*/
+
+	//初始化oss
+	InitOss()
+}
+
+// InitOss 初始化OSS
+func InitOss() {
+	IsOss = Config["is_oss"].(bool)
+	if IsOss {
+		ossConf, ok := Config["oss"].(map[string]interface{})
+		if ok {
+			client, err := oss.New(util.ObjToString(ossConf["endpoint"]), util.ObjToString(ossConf["access_key"]), util.ObjToString(ossConf["access_secret"]))
+			if err != nil {
+				fmt.Println("Error:", err)
+				os.Exit(-1)
+			}
+			BucketName = util.ObjToString(ossConf["bucket_name"])
+			ossClient = client
+		}
+	}
+}
+
+// OssGetObject 获取oss 数据,objectName 默认标讯id,ossBucketName 默认 jy-datadetail 获取详情detail
+func OssGetObject(objectName, ossBucketName string) string {
+	util.Catch()
+	// 获取存储空间。
+	bucket, err := ossClient.Bucket(ossBucketName)
+	if err != nil {
+		fmt.Println("Error:", err)
+		return ""
+	}
+
+	// 下载文件到流。
+	body, err := bucket.GetObject(objectName)
+	if err != nil {
+		fmt.Println("Error:", err)
+		return ""
+	}
+	defer body.Close()
+	data, err := io.ReadAll(body)
+	if err != nil {
+		fmt.Println("Error:", err)
+		return ""
+	}
+	return string(data)
 }
 
 func Catch() {
@@ -227,7 +292,7 @@ func Int64All(num interface{}) int64 {
 	}
 }
 
-//读取配置文件
+// 读取配置文件
 func ReadConfig(config ...interface{}) {
 	var r *os.File
 	filepath := "./config.json"
@@ -254,7 +319,7 @@ func ReadConfig(config ...interface{}) {
 // 	}
 // }
 
-//对象数组转成string数组
+// 对象数组转成string数组
 func ObjArrToStringArr(old []interface{}) []string {
 	defer Catch()
 	if old != nil {

+ 23 - 11
src/udptask/udptask.go

@@ -1,6 +1,7 @@
 package udptask
 
 import (
+	util "common_utils"
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
@@ -83,13 +84,24 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 						"gtid":  gtid,
 						"lteid": lteid,
 					}
-					set := map[string]interface{}{
-						"$set": map[string]interface{}{
-							"dataprocess": 5,
-							"updatetime":  time.Now().Unix(),
-						},
+					// 使用老表更新dataprocess 时
+					if util.ObjToString(Config["dbname_old"]) != "" {
+						set := map[string]interface{}{
+							"$set": map[string]interface{}{
+								"dataprocess_ai": 4,
+								"updatetime":     time.Now().Unix(),
+							},
+						}
+						MgoClassOld.Update("bidding_processing_ids", query, set, false, false)
+					} else {
+						set := map[string]interface{}{
+							"$set": map[string]interface{}{
+								"dataprocess": 5,
+								"updatetime":  time.Now().Unix(),
+							},
+						}
+						MgoClass.Update("bidding_processing_ids", query, set, false, false)
 					}
-					MgoClass.Update("bidding_processing_ids", query, set, false, false)
 				} else if stype == "monitor" { //程序监听类型
 					fmt.Println("stype :monitor")
 				} else if stype != "" {
@@ -105,7 +117,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	}
 }
 
-//行业分类udp任务执行
+// 行业分类udp任务执行
 func RunningHangyeClass() {
 	defer qutil.Catch()
 	go func() {
@@ -122,7 +134,7 @@ func RunningHangyeClass() {
 	}
 }
 
-//UdpTask udp 任务
+// UdpTask udp 任务
 func UdpTask(stype string, mapInfo map[string]interface{}) int {
 	total := 0
 	defer qutil.Catch()
@@ -195,7 +207,7 @@ func UdpTask(stype string, mapInfo map[string]interface{}) int {
 	return total
 }
 
-//分布式抽取-执行
+// 分布式抽取-执行
 func ExtractByUdp(ra *net.UDPAddr, instanceId ...string) {
 	if len(instanceId) > 0 { //分布式抽取进度
 		go Udpclient.WriteUdp([]byte(fmt.Sprintf("IP=%s,数据接收成功", instanceId[1])), mu.OP_NOOP, ra)
@@ -268,7 +280,7 @@ func ExtractByUdp(ra *net.UDPAddr, instanceId ...string) {
 	}
 }
 
-//LastUdpJob 处理UDP 没有接受数据
+// LastUdpJob 处理UDP 没有接受数据
 func LastUdpJob() {
 	for {
 		responselock.Lock()
@@ -282,7 +294,7 @@ func LastUdpJob() {
 	}
 }
 
-//sendErrMailApi 发送邮件
+// sendErrMailApi 发送邮件
 func sendErrMailApi(title, body string) {
 	jkmail, _ := Config["jkmail"].(map[string]interface{})
 	if jkmail != nil {

+ 14 - 1
src/util/charge_rule.go

@@ -3,6 +3,7 @@ package util
 import (
 	"go.mongodb.org/mongo-driver/bson/primitive"
 	"regexp"
+	"sort"
 	"strings"
 )
 
@@ -211,7 +212,9 @@ func ProcessTopscopeclass(tops, subs []string) ([]string, []string) {
 	for _, top := range tops {
 		parts := strings.Split(top, "")
 		cleanedTop := strings.Join(parts[:len(parts)-1], "")
-		cleanedTops = append(cleanedTops, cleanedTop)
+		if !IsInStringArray(cleanedTop, cleanedTops) {
+			cleanedTops = append(cleanedTops, cleanedTop)
+		}
 	}
 
 	// 用于标记 cleanedTops 中已存在于 subs 的元素
@@ -235,3 +238,13 @@ func ProcessTopscopeclass(tops, subs []string) ([]string, []string) {
 
 	return tops, subs
 }
+
+// IsInStringArray 判断数组中是否存在字符串
+func IsInStringArray(str string, arr []string) bool {
+	// 先对字符串数组进行排序
+	sort.Strings(arr)
+	// 使用二分查找算法查找字符串
+	pos := sort.SearchStrings(arr, str)
+	// 如果找到了则返回 true,否则返回 false
+	return pos < len(arr) && arr[pos] == str
+}