|
@@ -1,15 +1,21 @@
|
|
|
package main
|
|
|
|
|
|
import (
|
|
|
- "log"
|
|
|
- "strings"
|
|
|
- "time"
|
|
|
-
|
|
|
+ "encoding/json"
|
|
|
+ "fmt"
|
|
|
"github.com/robfig/cron/v3"
|
|
|
"github.com/spf13/viper"
|
|
|
+ "go.uber.org/zap"
|
|
|
util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
|
|
|
es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
|
|
|
+ jlog "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
|
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
|
|
|
+ "log"
|
|
|
+ "os"
|
|
|
+ "regexp"
|
|
|
+ "strings"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
)
|
|
|
|
|
|
var (
|
|
@@ -21,8 +27,29 @@ var (
|
|
|
RuleB = make([]string, 0)
|
|
|
RuleC = make([]string, 0)
|
|
|
RuleD = make([]string, 0)
|
|
|
+ RuleE = make([]string, 0)
|
|
|
+ RuleF = make([]string, 0)
|
|
|
+ //更新es
|
|
|
+ updateEsPool = make(chan []map[string]interface{}, 5000)
|
|
|
+ updateEsSp = make(chan bool, 3) //保存协程
|
|
|
+ hasWyMap = sync.Map{}
|
|
|
+ titleMap = sync.Map{}
|
|
|
+ FawuMap = sync.Map{}
|
|
|
+ // 更新mongo
|
|
|
+ updatePool = make(chan []map[string]interface{}, 5000)
|
|
|
+ updateSp = make(chan bool, 5)
|
|
|
+ key = "4d5206b1b297c1e7b77f9578edcb2cf7.TNU2i8G1oUNdR02i"
|
|
|
+ model = "glm-4-flash"
|
|
|
+ re = regexp.MustCompile(`title:(.*?),projectname:(.*?),id:(.*?),class:(.*?)(?:\s*$|\n)`)
|
|
|
)
|
|
|
|
|
|
+type RequestData struct {
|
|
|
+ Title string
|
|
|
+ Projectname string
|
|
|
+ Class []string
|
|
|
+ ID string
|
|
|
+}
|
|
|
+
|
|
|
const timeTypeAll = 1
|
|
|
const timeTypeInc = 2
|
|
|
|
|
@@ -46,7 +73,12 @@ func InitConfig() (err error) {
|
|
|
}
|
|
|
|
|
|
func Init() {
|
|
|
- InitConfig()
|
|
|
+ err := InitConfig()
|
|
|
+ if err != nil {
|
|
|
+ log.Println("配置文件错误", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
Mgo = &mongodb.MongodbSim{
|
|
|
MongodbAddr: GF.Mongo.Host,
|
|
|
//MongodbAddr: "127.0.0.1:27083",
|
|
@@ -66,23 +98,56 @@ func Init() {
|
|
|
}
|
|
|
Esa.InitElasticSize()
|
|
|
|
|
|
- Esb = &es.Elastic{
|
|
|
- S_esurl: GF.Esb.URL,
|
|
|
- I_size: 5,
|
|
|
- Username: GF.Esb.Username,
|
|
|
- Password: GF.Esb.Password,
|
|
|
+ if GF.Esb.URL != "" {
|
|
|
+ Esb = &es.Elastic{
|
|
|
+ S_esurl: GF.Esb.URL,
|
|
|
+ I_size: 5,
|
|
|
+ Username: GF.Esb.Username,
|
|
|
+ Password: GF.Esb.Password,
|
|
|
+ }
|
|
|
+ Esb.InitElasticSize()
|
|
|
}
|
|
|
- Esb.InitElasticSize()
|
|
|
|
|
|
+ InitLog()
|
|
|
RuleA = strings.Split(GF.Env.Rulea, "\n")
|
|
|
RuleB = strings.Split(GF.Env.Ruleb, "\n")
|
|
|
RuleC = strings.Split(GF.Env.Rulec, "\n")
|
|
|
RuleD = strings.Split(GF.Env.Ruled, "\n")
|
|
|
+ RuleE = strings.Split(GF.Env.Rulee, "\n")
|
|
|
+ RuleF = strings.Split(GF.Env.Rulef, "\n")
|
|
|
+}
|
|
|
+
|
|
|
+func InitLog() {
|
|
|
+ now := time.Now()
|
|
|
+ err := jlog.InitLog(
|
|
|
+ jlog.Path(GF.Log.LogPath),
|
|
|
+ jlog.Level(GF.Log.LogLevel),
|
|
|
+ jlog.Compress(GF.Log.Compress),
|
|
|
+ jlog.MaxSize(GF.Log.MaxSize),
|
|
|
+ jlog.MaxBackups(GF.Log.MaxBackups),
|
|
|
+ jlog.MaxAge(GF.Log.MaxAge),
|
|
|
+ jlog.Format(GF.Log.Format),
|
|
|
+ )
|
|
|
+ if err != nil {
|
|
|
+ fmt.Printf("InitLog failed: %v\n", err)
|
|
|
+ os.Exit(1)
|
|
|
+ }
|
|
|
+
|
|
|
+ jlog.Info("InitLog", zap.Any("duration", time.Since(now).Seconds()))
|
|
|
}
|
|
|
|
|
|
func main() {
|
|
|
Init()
|
|
|
- // dealAll() //存量数据
|
|
|
+
|
|
|
+ go updateEsMethod()
|
|
|
+ go updateMethod()
|
|
|
+ //dealAll() //规则处理存量数据
|
|
|
+ //dealInc()
|
|
|
+ //dealAllAi() //大模型
|
|
|
+
|
|
|
+ //dealInc()
|
|
|
+
|
|
|
+ //dealTopInformationAi7(nil)
|
|
|
|
|
|
local, _ := time.LoadLocation("Asia/Shanghai")
|
|
|
c := cron.New(cron.WithLocation(local), cron.WithSeconds())
|
|
@@ -93,12 +158,13 @@ func main() {
|
|
|
|
|
|
c.Start()
|
|
|
defer c.Stop()
|
|
|
-
|
|
|
+ ////
|
|
|
select {}
|
|
|
}
|
|
|
|
|
|
// dealInc 处理增量数据
|
|
|
func dealInc() {
|
|
|
+ jlog.Info("dealInc", zap.String("开始处理增量数据", ""))
|
|
|
now := time.Now()
|
|
|
start := time.Date(now.Year(), now.Month(), now.Day()+GF.Env.Start, 0, 0, 0, 0, now.Location())
|
|
|
end := time.Date(now.Year(), now.Month(), now.Day()+GF.Env.End, 0, 0, 0, 0, now.Location())
|
|
@@ -109,105 +175,851 @@ func dealInc() {
|
|
|
"$lt": end.Unix(),
|
|
|
},
|
|
|
}
|
|
|
- dealTopInformation(where, timeTypeInc)
|
|
|
+ jlog.Info("dealInc", zap.Any("开始处理增量数据", where))
|
|
|
+
|
|
|
+ //dealTopInformation(where, timeTypeInc)
|
|
|
+ ////大模型处理,保存字段 tag_topinformation_zp
|
|
|
+ //if GF.Env.Doai {
|
|
|
+ // log.Println("开始处理大模型结果")
|
|
|
+ // go dealTopInformationAi4(where)
|
|
|
+ //}
|
|
|
+
|
|
|
+ // 暂时只使用大模型
|
|
|
+ //dealTopInformationAi6(where)
|
|
|
+ dealTopInformationAi7(where)
|
|
|
+
|
|
|
+ jlog.Info("dealInc", zap.Any("开始处理增量数据", "数据处理完毕"))
|
|
|
}
|
|
|
|
|
|
// dealAll 处理存量数据
|
|
|
func dealAll() {
|
|
|
where := map[string]interface{}{
|
|
|
"comeintime": map[string]interface{}{
|
|
|
- "$gte": 1726070400,
|
|
|
- "$lt": 1726156800,
|
|
|
+ "$gte": 1711900800,
|
|
|
+ "$lt": 1726851634,
|
|
|
},
|
|
|
+ //"comeintime": map[string]interface{}{
|
|
|
+ // //"$gte": 1704038400,
|
|
|
+ // "$lt": 1704038400,
|
|
|
+ //},
|
|
|
+
|
|
|
+ //"title": "门诊大楼中医特色诊疗中心改造项目结算审计招标公告",
|
|
|
}
|
|
|
|
|
|
dealTopInformation(where, timeTypeAll) //处理情报标签一级
|
|
|
}
|
|
|
|
|
|
-// dealTopInformation 处理情报标签一级
|
|
|
+// dealAllAi 大模型处理存量数据
|
|
|
+func dealAllAi() {
|
|
|
+ where := map[string]interface{}{
|
|
|
+ "comeintime": map[string]interface{}{
|
|
|
+ //"$gte": 1726070400,
|
|
|
+ "$lt": 1728316800,
|
|
|
+ },
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Println("开始处理数据")
|
|
|
+ //dealTopInformationAi(where) //处理情报标签一级
|
|
|
+ //dealTopInformationAi2(where) //处理情报标签一级;50个一组调用
|
|
|
+ //dealTopInformationAi3(where) //单个数据调用
|
|
|
+ dealTopInformationAi4(where) //5一组,开启携程
|
|
|
+ //dealTopInformationAi5(nil) //单条调用智普,开启携程
|
|
|
+ log.Println("存量数据处理完毕")
|
|
|
+}
|
|
|
+
|
|
|
+// dealTopInformation 处理情报标签一级;剑鱼码 处理方式
|
|
|
func dealTopInformation(where interface{}, timeType int) {
|
|
|
if where == nil {
|
|
|
log.Println("查询条件为空")
|
|
|
- return
|
|
|
}
|
|
|
+
|
|
|
+ log.Println(where)
|
|
|
defer util.Catch()
|
|
|
sess := Mgo.GetMgoConn()
|
|
|
defer Mgo.DestoryMongoConn(sess)
|
|
|
count := 0
|
|
|
- it := sess.DB("qfw").C("bidding").Find(where).Select(nil).Iter()
|
|
|
+ ch := make(chan bool, 15)
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ it := sess.DB(GF.Mongo.DB).C("bidding").Find(where).Select(nil).Iter()
|
|
|
for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
|
|
|
- if count%5000 == 0 {
|
|
|
- log.Println("current:", count, tmp["title"])
|
|
|
+ if count%1000 == 0 {
|
|
|
+ log.Println("dealTopInformation,current:", count, tmp["title"], tmp["_id"])
|
|
|
}
|
|
|
-
|
|
|
- if gov_classify, ok := tmp["gov_classify"]; !ok {
|
|
|
- continue
|
|
|
- } else {
|
|
|
+ ch <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ //
|
|
|
topinformation := make([]string, 0) //
|
|
|
- hasNew := false
|
|
|
+ //hasNew := false
|
|
|
if existTop, okk := tmp["tag_topinformation"]; okk {
|
|
|
if tops, ok2 := existTop.([]interface{}); ok2 {
|
|
|
for _, v := range tops {
|
|
|
- topinformation = append(topinformation, util.ObjToString(v))
|
|
|
+ if util.ObjToString(v) == "情报_安防" {
|
|
|
+ continue
|
|
|
+ } else {
|
|
|
+ topinformation = append(topinformation, util.ObjToString(v))
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if govMap, ok3 := gov_classify.(map[string]interface{}); ok3 {
|
|
|
+ if govMap, ok3 := tmp["gov_classify"].(map[string]interface{}); ok3 {
|
|
|
zc_code := util.ObjToString(govMap["zc_code"])
|
|
|
if IsInStringArray(zc_code, RuleA) {
|
|
|
- hasNew = true
|
|
|
+ //hasNew = true
|
|
|
topinformation = append(topinformation, "情报_环境采购")
|
|
|
- } else if IsInStringArray(zc_code, RuleB) {
|
|
|
- hasNew = true
|
|
|
+ }
|
|
|
+ if IsInStringArray(zc_code, RuleB) {
|
|
|
+ //hasNew = true
|
|
|
topinformation = append(topinformation, "情报_印务商机")
|
|
|
- } else if IsInStringArray(zc_code, RuleC) {
|
|
|
- hasNew = true
|
|
|
+ }
|
|
|
+ if IsInStringArray(zc_code, RuleC) {
|
|
|
+ //hasNew = true
|
|
|
topinformation = append(topinformation, "情报_家具招投标")
|
|
|
- } else if IsInStringArray(zc_code, RuleD) {
|
|
|
- hasNew = true
|
|
|
+ }
|
|
|
+ if IsInStringArray(zc_code, RuleD) {
|
|
|
+ //hasNew = true
|
|
|
topinformation = append(topinformation, "情报_车辆租赁")
|
|
|
}
|
|
|
+ if IsInStringArray(zc_code, RuleE) {
|
|
|
+ //hasNew = true
|
|
|
+ topinformation = append(topinformation, "情报_安防")
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// 有匹配新的标签,需要更新MongoDB以及ES 数据
|
|
|
- if hasNew {
|
|
|
- topinformation = removeDuplicates(topinformation) //去重
|
|
|
- //ToDo 1.更新MongoDB
|
|
|
- biddingID := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
- updateMgo := map[string]interface{}{
|
|
|
- "tag_topinformation": topinformation,
|
|
|
- "topinformation_time": time.Now().Unix(),
|
|
|
+ //if hasNew {
|
|
|
+ topinformation = removeDuplicates(topinformation) //去重
|
|
|
+ //ToDo 1.更新MongoDB
|
|
|
+ biddingID := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ updateMgo := map[string]interface{}{
|
|
|
+ "tag_topinformation": topinformation,
|
|
|
+ "topinformation_time": time.Now().Unix(),
|
|
|
+ }
|
|
|
+
|
|
|
+ updateEs := map[string]interface{}{
|
|
|
+ "tag_topinformation": topinformation,
|
|
|
+ }
|
|
|
+
|
|
|
+ //log.Println("hasNew", " ====== ", biddingID)
|
|
|
+ Mgo.UpdateById("bidding", biddingID, map[string]interface{}{"$set": updateMgo})
|
|
|
+ //ToDo 2.更新es
|
|
|
+
|
|
|
+ //// 更新es
|
|
|
+ updateEsPool <- []map[string]interface{}{
|
|
|
+ {"_id": biddingID},
|
|
|
+ updateEs,
|
|
|
+ }
|
|
|
+
|
|
|
+ //if GF.Esa.URL != "" {
|
|
|
+ //_ = Esa.UpdateDocument("bidding", biddingID, updateEs)
|
|
|
+ //}
|
|
|
+ //
|
|
|
+ //if GF.Esb.URL != "" {
|
|
|
+ // _ = Esb.UpdateDocument("bidding", biddingID, updateEs)
|
|
|
+ //}
|
|
|
+ //}
|
|
|
+
|
|
|
+ }(tmp)
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ }
|
|
|
+
|
|
|
+ wg.Wait()
|
|
|
+ log.Println("剑鱼码规则处理数据处理完毕")
|
|
|
+}
|
|
|
+
|
|
|
+// dealTopInformationAi2的基础上,开启协程
|
|
|
+func dealTopInformationAi4(where interface{}) {
|
|
|
+ defer util.Catch()
|
|
|
+ sess := Mgo.GetMgoConn()
|
|
|
+ defer Mgo.DestoryMongoConn(sess)
|
|
|
+ count := 0
|
|
|
+
|
|
|
+ var lines = make([]string, 0)
|
|
|
+ it := sess.DB(GF.Mongo.DB).C(GF.Mongo.Coll).Find(where).Select(nil).Sort("-_id").Iter()
|
|
|
+ sem := make(chan struct{}, 50) // 控制并发数量
|
|
|
+
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
|
|
|
+ if count%100 == 0 {
|
|
|
+ log.Println("current:", count, tmp["title"], tmp["_id"])
|
|
|
+ }
|
|
|
+ title := util.ObjToString(tmp["title"])
|
|
|
+ projectname := util.ObjToString(tmp["projectname"])
|
|
|
+ biddingID := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ lines = append(lines, fmt.Sprintf("title:%s,projectname:%s,id:%s", title, projectname, biddingID))
|
|
|
+ if len(lines) == 5 {
|
|
|
+ sem <- struct{}{} // 增加并发控制
|
|
|
+ go func(lines []string) {
|
|
|
+ defer func() { <-sem }() // 释放并发控制
|
|
|
+ report := strings.Join(lines, "\n")
|
|
|
+ resu, err := ZpAI4(key, model, report)
|
|
|
+ lines = make([]string, 0)
|
|
|
+ if err != nil {
|
|
|
+ log.Println("智普请求失败,", err)
|
|
|
+ return
|
|
|
}
|
|
|
- updateEs := map[string]interface{}{
|
|
|
- "tag_topinformation": topinformation,
|
|
|
- }
|
|
|
- //log.Println("hasNew", " ====== ", biddingID)
|
|
|
- Mgo.UpdateById("bidding", biddingID, map[string]interface{}{"$set": updateMgo})
|
|
|
- //ToDo 2.更新es
|
|
|
- // 存量数据
|
|
|
- if timeType == timeTypeAll {
|
|
|
- if util.IntAll(tmp["extracttype"]) != 1 {
|
|
|
- continue
|
|
|
+ splitLines := strings.Split(resu, `;`)
|
|
|
+ for _, line := range splitLines {
|
|
|
+ matches := re.FindStringSubmatch(line)
|
|
|
+ if len(matches) == 5 {
|
|
|
+ class := matches[4]
|
|
|
+ id := matches[3]
|
|
|
+ tags := make([]string, 0)
|
|
|
+ if class == "其他分类" || class == "" || id == "" {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ for _, v2 := range strings.Split(class, ",") {
|
|
|
+ if v2 == "车辆领域" {
|
|
|
+ tags = append(tags, "情报_车辆租赁")
|
|
|
+ } else if v2 == "安防领域" {
|
|
|
+ tags = append(tags, "情报_安防")
|
|
|
+ } else if v2 == "印务领域" {
|
|
|
+ tags = append(tags, "情报_印务商机")
|
|
|
+ } else if v2 == "环境领域" {
|
|
|
+ tags = append(tags, "情报_环境采购")
|
|
|
+ } else if v2 == "家具领域" {
|
|
|
+ tags = append(tags, "情报_家具招投标")
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if len(tags) > 0 {
|
|
|
+ updateMgo := map[string]interface{}{
|
|
|
+ "tag_topinformation_zp": tags,
|
|
|
+ }
|
|
|
+ Mgo.UpdateById("bidding", id, map[string]interface{}{"$set": updateMgo})
|
|
|
+ updateEs := map[string]interface{}{
|
|
|
+ "tag_topinformation_zp": tags,
|
|
|
+ }
|
|
|
+ if count%1000 == 0 {
|
|
|
+ log.Println("update es", id, tags)
|
|
|
+ }
|
|
|
+ // 更新es
|
|
|
+ updateEsPool <- []map[string]interface{}{
|
|
|
+ {"_id": id},
|
|
|
+ updateEs,
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }(lines)
|
|
|
+ // 清空 lines 切片而不是 reportBuilder
|
|
|
+ lines = make([]string, 0)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(lines) > 0 {
|
|
|
+ report := strings.Join(lines, "\n")
|
|
|
+ resu, err := ZpAI4(key, model, report)
|
|
|
+ lines = make([]string, 0)
|
|
|
+ if err != nil {
|
|
|
+ log.Println("智普请求失败,", err, report)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ splitLines := strings.Split(resu, `;`)
|
|
|
+ for _, line := range splitLines {
|
|
|
+ matches := re.FindStringSubmatch(line)
|
|
|
+ if len(matches) == 5 {
|
|
|
+ class := matches[4]
|
|
|
+ id := matches[3]
|
|
|
+ tags := make([]string, 0)
|
|
|
+ if class == "其他分类" || class == "" || id == "" {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ for _, v2 := range strings.Split(class, ",") {
|
|
|
+ if v2 == "车辆领域" {
|
|
|
+ tags = append(tags, "情报_车辆租赁")
|
|
|
+ } else if v2 == "安防领域" {
|
|
|
+ tags = append(tags, "情报_安防")
|
|
|
+ } else if v2 == "印务领域" {
|
|
|
+ tags = append(tags, "情报_印务商机")
|
|
|
+ } else if v2 == "环境领域" {
|
|
|
+ tags = append(tags, "情报_环境采购")
|
|
|
+ } else if v2 == "家具领域" {
|
|
|
+ tags = append(tags, "情报_家具招投标")
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if len(tags) > 0 {
|
|
|
+ updateMgo := map[string]interface{}{
|
|
|
+ "tag_topinformation_zp": tags,
|
|
|
+ }
|
|
|
+ Mgo.UpdateById("bidding", id, map[string]interface{}{"$set": updateMgo})
|
|
|
+ updateEs := map[string]interface{}{
|
|
|
+ "tag_topinformation_zp": tags,
|
|
|
+ }
|
|
|
+ if count%1000 == 0 {
|
|
|
+ log.Println("update es", id, tags)
|
|
|
+ }
|
|
|
+ // 更新es
|
|
|
+ updateEsPool <- []map[string]interface{}{
|
|
|
+ {"_id": id},
|
|
|
+ updateEs,
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Println("大模型处理数据处理完毕")
|
|
|
+}
|
|
|
+
|
|
|
+// dealTopInformationAi5 单个调用ZpAI4,多协程
|
|
|
+func dealTopInformationAi5(where interface{}) {
|
|
|
+ defer util.Catch()
|
|
|
+ sess := Mgo.GetMgoConn()
|
|
|
+ defer Mgo.DestoryMongoConn(sess)
|
|
|
+ count := 0
|
|
|
+
|
|
|
+ key := "4d5206b1b297c1e7b77f9578edcb2cf7.TNU2i8G1oUNdR02i"
|
|
|
+ model := "glm-4-flash"
|
|
|
+ re := regexp.MustCompile(`title:(.*?),projectname:(.*?),id:(.*?),class:(.*?)(?:\s*$|\n)`)
|
|
|
+ ch := make(chan bool, 50)
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ it := sess.DB(GF.Mongo.DB).C(GF.Mongo.Coll).Find(where).Select(nil).Sort("-_id").Iter()
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
|
|
|
+ if count%100 == 0 {
|
|
|
+ log.Println("current:", count, tmp["title"], tmp["_id"])
|
|
|
+ }
|
|
|
+
|
|
|
+ ch <- true
|
|
|
+ wg.Add(1)
|
|
|
+
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ ////
|
|
|
+ title := util.ObjToString(tmp["title"])
|
|
|
+ projectname := util.ObjToString(tmp["projectname"])
|
|
|
+ biddingID := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ text := fmt.Sprintf("title:%s,projectname:%s,id:%s\n", title, projectname, biddingID)
|
|
|
+ resu, err := ZpAI4(key, model, text)
|
|
|
+ if len(resu) > 0 && err != nil {
|
|
|
+ splitLines := strings.Split(resu, `;`)
|
|
|
+ for _, line := range splitLines {
|
|
|
+ matches := re.FindStringSubmatch(line)
|
|
|
+ if len(matches) == 5 {
|
|
|
+ class := matches[4]
|
|
|
+ if class != "" && class != "其他分类" {
|
|
|
+ tags := make([]string, 0)
|
|
|
+ for _, v2 := range strings.Split(class, ",") {
|
|
|
+ if v2 == "车辆领域" {
|
|
|
+ tags = append(tags, "情报_车辆租赁")
|
|
|
+ } else if v2 == "安防领域" {
|
|
|
+ tags = append(tags, "情报_安防")
|
|
|
+ } else if v2 == "印务领域" {
|
|
|
+ tags = append(tags, "情报_印务商机")
|
|
|
+ } else if v2 == "环境领域" {
|
|
|
+ tags = append(tags, "情报_环境采购")
|
|
|
+ } else if v2 == "家具领域" {
|
|
|
+ tags = append(tags, "情报_家具招投标")
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if len(tags) > 0 {
|
|
|
+ if biddingID != "" {
|
|
|
+ updateMgo := map[string]interface{}{
|
|
|
+ "tag_topinformation_zp": tags,
|
|
|
+ "topinformation_time": time.Now().Unix(),
|
|
|
+ }
|
|
|
+ Mgo.UpdateById("bidding", biddingID, map[string]interface{}{"$set": updateMgo})
|
|
|
+
|
|
|
+ if count%100 == 0 {
|
|
|
+ log.Println("update es", biddingID, tags)
|
|
|
+ }
|
|
|
+ updateEs := map[string]interface{}{
|
|
|
+ "tag_topinformation_zp": tags,
|
|
|
+ }
|
|
|
+ if GF.Esa.URL != "" {
|
|
|
+ log.Println("更新es ,", biddingID)
|
|
|
+ err := Esa.UpdateDocument("bidding", biddingID, updateEs)
|
|
|
+ if err != nil && err.Error() != "Document not updated: noop" {
|
|
|
+ log.Println("esa update err", biddingID, err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if GF.Esb.URL != "" {
|
|
|
+ err := Esb.UpdateDocument("bidding", biddingID, updateEs)
|
|
|
+ if err != nil && err.Error() != "Document not updated: noop" {
|
|
|
+ log.Println("esb update err", biddingID, err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- } else {
|
|
|
- // 增量数据
|
|
|
- if util.IntAll(tmp["dataprocess"]) != 8 {
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }(tmp)
|
|
|
+
|
|
|
+ wg.Wait()
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Println("数据处理完毕")
|
|
|
+}
|
|
|
+
|
|
|
+// dealTopInformationAi6 更新提示语后的大模型调用
|
|
|
+func dealTopInformationAi6(where interface{}) {
|
|
|
+ defer util.Catch()
|
|
|
+ sess := Mgo.GetMgoConn()
|
|
|
+ defer Mgo.DestoryMongoConn(sess)
|
|
|
+ count := 0
|
|
|
+ ch := make(chan bool, 50)
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ var lines = make([]interface{}, 0)
|
|
|
+ it := sess.DB(GF.Mongo.DB).C(GF.Mongo.Coll).Find(where).Select(nil).Sort("-_id").Iter()
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
|
|
|
+ if count%100 == 0 {
|
|
|
+ jlog.Info("dealTopInformationAi6", zap.Any("current", count), zap.Any("title", tmp["title"]), zap.Any("_id", tmp["_id"]))
|
|
|
+ }
|
|
|
+
|
|
|
+ da := map[string]interface{}{
|
|
|
+ "id": mongodb.BsonIdToSId(tmp["_id"]),
|
|
|
+ "title": util.ObjToString(tmp["title"]),
|
|
|
+ }
|
|
|
+ rs, err := json.Marshal(&da)
|
|
|
+ if err != nil {
|
|
|
+ log.Println(err, da)
|
|
|
+ }
|
|
|
+ ra := string(rs)
|
|
|
+ lines = append(lines, ra)
|
|
|
+ if len(lines) == 5 {
|
|
|
+ ch <- true
|
|
|
+ wg.Add(1)
|
|
|
+
|
|
|
+ go func(lines []interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ resu, err := ZpAI6(key, model, lines)
|
|
|
+ if err != nil {
|
|
|
+ jlog.Info("dealTopInformationAi6,第一次请求失败", zap.Error(err))
|
|
|
+ resu, err = ZpAI6(key, model, lines)
|
|
|
+ if err != nil {
|
|
|
+ jlog.Info("dealTopInformationAi6", zap.Any("再次请求失败", err))
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, item := range resu["result"] {
|
|
|
+ tags := make([]string, 0)
|
|
|
+ class := util.ObjToString(item["class"])
|
|
|
+ id := util.ObjToString(item["id"])
|
|
|
+ if class == "其他分类" || class == "" || id == "" {
|
|
|
continue
|
|
|
}
|
|
|
+ oldBid, _ := Mgo.FindById(GF.Mongo.Coll, id, nil)
|
|
|
+ if existTop, okk := (*oldBid)["tag_topinformation"]; okk {
|
|
|
+ if tops, ok2 := existTop.([]interface{}); ok2 {
|
|
|
+ for _, v := range tops {
|
|
|
+ if util.ObjToString(v) == "情报_物业" {
|
|
|
+ tags = append(tags, util.ObjToString(v))
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //
|
|
|
+ for _, v2 := range strings.Split(class, ",") {
|
|
|
+ if v2 == "车辆领域" {
|
|
|
+ tags = append(tags, "情报_车辆租赁")
|
|
|
+ } else if v2 == "安防领域" {
|
|
|
+ tags = append(tags, "情报_安防")
|
|
|
+ } else if v2 == "印务领域" {
|
|
|
+ tags = append(tags, "情报_印务商机")
|
|
|
+ } else if v2 == "环境领域" {
|
|
|
+ tags = append(tags, "情报_环境采购")
|
|
|
+ } else if v2 == "家具领域" {
|
|
|
+ tags = append(tags, "情报_家具招投标")
|
|
|
+ } else if v2 == "法务领域" {
|
|
|
+ tags = append(tags, "情报_法务")
|
|
|
+ } else if v2 == "财务审计领域" {
|
|
|
+ tags = append(tags, "情报_财务审计")
|
|
|
+ } else if v2 == "招标代理领域" {
|
|
|
+ tags = append(tags, "情报_招标代理")
|
|
|
+ } else if v2 == "管理咨询领域" {
|
|
|
+ tags = append(tags, "情报_管理咨询")
|
|
|
+ } else if v2 == "保险领域" {
|
|
|
+ tags = append(tags, "情报_保险")
|
|
|
+ } else if v2 == "工程设计咨询领域" {
|
|
|
+ tags = append(tags, "情报_工程设计咨询")
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(tags) > 0 {
|
|
|
+ updateMgo := map[string]interface{}{
|
|
|
+ "tag_topinformation": removeDuplicates(tags),
|
|
|
+ "tag_topinformation_time": time.Now().Unix(),
|
|
|
+ }
|
|
|
+ Mgo.UpdateById(GF.Mongo.Coll, id, map[string]interface{}{"$set": updateMgo})
|
|
|
+ updateEs := map[string]interface{}{
|
|
|
+ "tag_topinformation": tags,
|
|
|
+ }
|
|
|
+ updateEsPool <- []map[string]interface{}{
|
|
|
+ {"_id": id},
|
|
|
+ updateEs,
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }(lines)
|
|
|
+
|
|
|
+ lines = make([]interface{}, 0)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ wg.Wait()
|
|
|
+
|
|
|
+ if len(lines) > 0 {
|
|
|
+ resu, err := ZpAI6(key, model, lines)
|
|
|
+ if err != nil {
|
|
|
+ log.Println(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, item := range resu["result"] {
|
|
|
+ tags := make([]string, 0)
|
|
|
+ class := util.ObjToString(item["class"])
|
|
|
+ id := util.ObjToString(item["id"])
|
|
|
+ if class == "其他分类" || class == "" || id == "" {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ oldBid, _ := Mgo.FindById("bidding", id, nil)
|
|
|
+ if existTop, okk := (*oldBid)["tag_topinformation"]; okk {
|
|
|
+ if tops, ok2 := existTop.([]interface{}); ok2 {
|
|
|
+ for _, v := range tops {
|
|
|
+ if util.ObjToString(v) == "情报_物业" {
|
|
|
+ tags = append(tags, util.ObjToString(v))
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ for _, v2 := range strings.Split(class, ",") {
|
|
|
+ if v2 == "车辆领域" {
|
|
|
+ tags = append(tags, "情报_车辆租赁")
|
|
|
+ } else if v2 == "安防领域" {
|
|
|
+ tags = append(tags, "情报_安防")
|
|
|
+ } else if v2 == "印务领域" {
|
|
|
+ tags = append(tags, "情报_印务商机")
|
|
|
+ } else if v2 == "环境领域" {
|
|
|
+ tags = append(tags, "情报_环境采购")
|
|
|
+ } else if v2 == "家具领域" {
|
|
|
+ tags = append(tags, "情报_家具招投标")
|
|
|
+ } else if v2 == "法务领域" {
|
|
|
+ tags = append(tags, "情报_法务")
|
|
|
+ } else if v2 == "财务审计领域" {
|
|
|
+ tags = append(tags, "情报_财务审计")
|
|
|
+ } else if v2 == "招标代理领域" {
|
|
|
+ tags = append(tags, "情报_招标代理")
|
|
|
+ } else if v2 == "管理咨询领域" {
|
|
|
+ tags = append(tags, "情报_管理咨询")
|
|
|
+ } else if v2 == "保险领域" {
|
|
|
+ tags = append(tags, "情报_保险")
|
|
|
+ } else if v2 == "工程设计咨询领域" {
|
|
|
+ tags = append(tags, "情报_工程设计咨询")
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- err := Esa.UpdateDocument("bidding", biddingID, updateEs)
|
|
|
- if err != nil && err.Error() != "Document not updated: noop" {
|
|
|
- log.Println("esa update err", biddingID, err)
|
|
|
+ if len(tags) > 0 {
|
|
|
+ updateMgo := map[string]interface{}{
|
|
|
+ "tag_topinformation": removeDuplicates(tags),
|
|
|
+ }
|
|
|
+ Mgo.UpdateById(GF.Mongo.Coll, id, map[string]interface{}{"$set": updateMgo})
|
|
|
+ updateEs := map[string]interface{}{
|
|
|
+ "tag_topinformation": tags,
|
|
|
}
|
|
|
- err = Esb.UpdateDocument("bidding", biddingID, updateEs)
|
|
|
- if err != nil && err.Error() != "Document not updated: noop" {
|
|
|
- log.Println("esb update err", biddingID, err)
|
|
|
+ updateEsPool <- []map[string]interface{}{
|
|
|
+ {"_id": id},
|
|
|
+ updateEs,
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ jlog.Info("dealTopInformationAi6", zap.Any("数据处理完毕", count))
|
|
|
+}
|
|
|
+
|
|
|
+func dealTopInformationAi7(where interface{}) {
|
|
|
+ defer util.Catch()
|
|
|
+ sess := Mgo.GetMgoConn()
|
|
|
+ defer Mgo.DestoryMongoConn(sess)
|
|
|
+ count := 0
|
|
|
+ ch := make(chan bool, 50)
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ var lines = make([]interface{}, 0)
|
|
|
+ it := sess.DB(GF.Mongo.DB).C(GF.Mongo.Coll).Find(where).Select(nil).Sort("-_id").Iter()
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
|
|
|
+ if count%100 == 0 {
|
|
|
+ jlog.Info("dealTopInformationAi6", zap.Any("current", count), zap.Any("title", tmp["title"]), zap.Any("_id", tmp["_id"]))
|
|
|
+ }
|
|
|
+
|
|
|
+ // 判断之前物业标签是否有无
|
|
|
+ hasWy := 0
|
|
|
+ if existTop, okk := (tmp)["tag_topinformation"]; okk {
|
|
|
+ if tops, ok2 := existTop.([]interface{}); ok2 {
|
|
|
+ for _, v := range tops {
|
|
|
+ if util.ObjToString(v) == "情报_物业" {
|
|
|
+ hasWy = 1
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ title := util.ObjToString(tmp["title"])
|
|
|
+ projectName := util.ObjToString(tmp["projectname"])
|
|
|
+ winner := util.ObjToString(tmp["s_winner"])
|
|
|
+ for _, v := range RuleF {
|
|
|
+ if strings.Contains(winner, v) {
|
|
|
+ FawuMap.Store(title, 1)
|
|
|
+ break
|
|
|
+ }
|
|
|
+ if strings.Contains(title, v) {
|
|
|
+ FawuMap.Store(title, 1)
|
|
|
+ break
|
|
|
+ }
|
|
|
+ if strings.Contains(projectName, v) {
|
|
|
+ FawuMap.Store(title, 1)
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ hasWyMap.Store(title, hasWy)
|
|
|
+ titleMap.Store(title, mongodb.BsonIdToSId(tmp["_id"]))
|
|
|
+
|
|
|
+ da := map[string]interface{}{
|
|
|
+ "id": mongodb.BsonIdToSId(tmp["_id"]),
|
|
|
+ "title": util.ObjToString(tmp["title"]),
|
|
|
+ }
|
|
|
+ rs, err := json.Marshal(&da)
|
|
|
+ if err != nil {
|
|
|
+ log.Println(err, da)
|
|
|
+ }
|
|
|
+ ra := string(rs)
|
|
|
+ lines = append(lines, ra)
|
|
|
+ if len(lines) == 5 {
|
|
|
+ ch <- true
|
|
|
+ wg.Add(1)
|
|
|
+
|
|
|
+ go func(lines []interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ resu, err := ZpAI7(key, model, lines)
|
|
|
+ if err != nil {
|
|
|
+ jlog.Info("dealTopInformationAi7,第一次请求失败", zap.Error(err))
|
|
|
+ resu, err = ZpAI7(key, model, lines)
|
|
|
+ if err != nil {
|
|
|
+ jlog.Info("dealTopInformationAi7", zap.Any("再次请求失败", err))
|
|
|
+ }
|
|
|
+ }
|
|
|
+ dealResu(resu)
|
|
|
+
|
|
|
+ }(lines)
|
|
|
+
|
|
|
+ lines = make([]interface{}, 0)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ wg.Wait()
|
|
|
+
|
|
|
+ if len(lines) > 0 {
|
|
|
+ resu, err := ZpAI7(key, model, lines)
|
|
|
+ if err != nil {
|
|
|
+ log.Println(err)
|
|
|
+ }
|
|
|
+ dealResu(resu)
|
|
|
+ }
|
|
|
+ jlog.Info("dealTopInformationAi6", zap.Any("数据处理完毕", count))
|
|
|
+}
|
|
|
+
|
|
|
+func dealAi(datas []RequestData) {
|
|
|
+
|
|
|
+ // 有匹配新的标签,需要更新MongoDB以及ES 数据
|
|
|
+ //if hasNew {
|
|
|
+ // topinformation = removeDuplicates(topinformation) //去重
|
|
|
+ // //ToDo 1.更新MongoDB
|
|
|
+ // updateMgo := map[string]interface{}{
|
|
|
+ // "tag_topinformation": topinformation,
|
|
|
+ // "topinformation_time": time.Now().Unix(),
|
|
|
+ // }
|
|
|
+ // updateEs := map[string]interface{}{
|
|
|
+ // "tag_topinformation": topinformation,
|
|
|
+ // }
|
|
|
+ // //log.Println("hasNew", " ====== ", biddingID)
|
|
|
+ // Mgo.UpdateById("bidding", biddingID, map[string]interface{}{"$set": updateMgo})
|
|
|
+ // //ToDo 2.更新es
|
|
|
+ //
|
|
|
+ // if GF.Esa.URL != "" {
|
|
|
+ // err := Esa.UpdateDocument("bidding", biddingID, updateEs)
|
|
|
+ // if err != nil && err.Error() != "Document not updated: noop" {
|
|
|
+ // log.Println("esa update err", biddingID, err)
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ // if GF.Esb.URL != "" {
|
|
|
+ // err := Esb.UpdateDocument("bidding", biddingID, updateEs)
|
|
|
+ // if err != nil && err.Error() != "Document not updated: noop" {
|
|
|
+ // log.Println("esb update err", biddingID, err)
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ //}
|
|
|
+}
|
|
|
+
|
|
|
+// updateEsMethod 更新es
|
|
|
+func updateEsMethod() {
|
|
|
+ arru := make([][]map[string]interface{}, 200) //200条一组更新es
|
|
|
+ indexu := 0
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case v := <-updateEsPool:
|
|
|
+ arru[indexu] = v
|
|
|
+ indexu++
|
|
|
+ if indexu == 200 {
|
|
|
+ updateEsSp <- true
|
|
|
+ go func(arru [][]map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-updateEsSp
|
|
|
+ }()
|
|
|
+ Esa.UpdateBulk(GF.Env.Esindex, arru...)
|
|
|
+ if Esb.S_esurl != "" {
|
|
|
+ Esb.UpdateBulk(GF.Env.Esindex, arru...)
|
|
|
+ }
|
|
|
+ }(arru)
|
|
|
+ arru = make([][]map[string]interface{}, 200)
|
|
|
+ indexu = 0
|
|
|
+ }
|
|
|
+ case <-time.After(1000 * time.Millisecond):
|
|
|
+ if indexu > 0 {
|
|
|
+ updateEsSp <- true
|
|
|
+ go func(arru [][]map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-updateEsSp
|
|
|
+ }()
|
|
|
+ Esa.UpdateBulk(GF.Env.Esindex, arru...)
|
|
|
+ if Esb.S_esurl != "" {
|
|
|
+ Esb.UpdateBulk(GF.Env.Esindex, arru...)
|
|
|
+ }
|
|
|
+ }(arru[:indexu])
|
|
|
+ arru = make([][]map[string]interface{}, 200)
|
|
|
+ indexu = 0
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// updateMethod 更新MongoDB
|
|
|
+func updateMethod() {
|
|
|
+ arru := make([][]map[string]interface{}, 50)
|
|
|
+ indexu := 0
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case v := <-updatePool:
|
|
|
+ arru[indexu] = v
|
|
|
+ indexu++
|
|
|
+ if indexu == 50 {
|
|
|
+ updateSp <- true
|
|
|
+ go func(arru [][]map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-updateSp
|
|
|
+ }()
|
|
|
+ Mgo.UpdateBulk(GF.Mongo.Coll, arru...)
|
|
|
+ }(arru)
|
|
|
+ arru = make([][]map[string]interface{}, 50)
|
|
|
+ indexu = 0
|
|
|
+ }
|
|
|
+ case <-time.After(1000 * time.Millisecond):
|
|
|
+ if indexu > 0 {
|
|
|
+ updateSp <- true
|
|
|
+ go func(arru [][]map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-updateSp
|
|
|
+ }()
|
|
|
+ Mgo.UpdateBulk(GF.Mongo.Coll, arru...)
|
|
|
+ }(arru[:indexu])
|
|
|
+ arru = make([][]map[string]interface{}, 50)
|
|
|
+ indexu = 0
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// dealResu 处理大模型返回结果,放入通道更新
|
|
|
+func dealResu(resu map[string][]map[string]interface{}) {
|
|
|
+ for _, item := range resu["result"] {
|
|
|
+ tags := make([]string, 0)
|
|
|
+ class := util.ObjToString(item["class"])
|
|
|
+ title := util.ObjToString(item["title"])
|
|
|
+ if class == "" || title == "" {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ hasWy, _ := hasWyMap.Load(title)
|
|
|
+ id, _ := titleMap.Load(title)
|
|
|
+ fawu, _ := FawuMap.Load(title)
|
|
|
+ idStr := util.ObjToString(id)
|
|
|
+ if idStr == "" || title == "" {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ if util.IntAll(hasWy) > 0 {
|
|
|
+ tags = append(tags, "情报_物业")
|
|
|
+ }
|
|
|
+ if util.IntAll(fawu) > 0 {
|
|
|
+ tags = append(tags, "情报_法务")
|
|
|
+ }
|
|
|
+ hasWyMap.Delete(title)
|
|
|
+ titleMap.Delete(title)
|
|
|
+ FawuMap.Delete(title)
|
|
|
+
|
|
|
+ for _, v2 := range strings.Split(class, ",") {
|
|
|
+ if v2 == "车辆领域" {
|
|
|
+ tags = append(tags, "情报_车辆租赁")
|
|
|
+ } else if v2 == "安防领域" {
|
|
|
+ tags = append(tags, "情报_安防")
|
|
|
+ } else if v2 == "印务领域" {
|
|
|
+ tags = append(tags, "情报_印务商机")
|
|
|
+ } else if v2 == "环境领域" {
|
|
|
+ tags = append(tags, "情报_环境采购")
|
|
|
+ } else if v2 == "家具领域" {
|
|
|
+ tags = append(tags, "情报_家具招投标")
|
|
|
+ } else if v2 == "法务领域" {
|
|
|
+ tags = append(tags, "情报_法务")
|
|
|
+ } else if v2 == "财务审计领域" {
|
|
|
+ tags = append(tags, "情报_财务审计")
|
|
|
+ } else if v2 == "招标代理领域" {
|
|
|
+ tags = append(tags, "情报_招标代理")
|
|
|
+ } else if v2 == "管理咨询领域" {
|
|
|
+ tags = append(tags, "情报_管理咨询")
|
|
|
+ } else if v2 == "保险领域" {
|
|
|
+ tags = append(tags, "情报_保险")
|
|
|
+ } else if v2 == "工程设计咨询领域" {
|
|
|
+ tags = append(tags, "情报_工程设计咨询")
|
|
|
+ }
|
|
|
+ }
|
|
|
+ tags = removeDuplicates(tags)
|
|
|
+
|
|
|
+ if len(tags) > 0 {
|
|
|
+ updateMgo := map[string]interface{}{
|
|
|
+ "tag_topinformation": tags,
|
|
|
+ }
|
|
|
+ //更新MongoDB
|
|
|
+ updatePool <- []map[string]interface{}{
|
|
|
+ {"_id": mongodb.StringTOBsonId(idStr)},
|
|
|
+ {"$set": updateMgo},
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //更新es
|
|
|
+ updateES := map[string]interface{}{
|
|
|
+ "tag_topinformation": tags,
|
|
|
+ }
|
|
|
+ updateEsPool <- []map[string]interface{}{
|
|
|
+ {"_id": id},
|
|
|
+ updateES,
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- log.Println("数据处理完毕")
|
|
|
}
|