|
- package main
- import (
- elastic "app.yhyue.com/moapp/jybase/es"
- "app.yhyue.com/moapp/jybase/mongodb"
- "dataflow/util"
- "fmt"
- "github.com/nats-io/nats.go"
- "go.mongodb.org/mongo-driver/bson"
- "jygit.jydev.jianyu360.cn/BP/jynats/jnats"
- u "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
- "log"
- "regexp"
- "time"
- )
- const MGO_SAVE, MGO_UPDATE = "s", "u"
- var (
- conf util.Conf
- ThreadsLimit int
- MgoB *mongodb.MongodbSim
- MgoQ *mongodb.MongodbSim
- MgoP *mongodb.MongodbSim
- jn *jnats.Jnats
- NatsThreads chan bool
- Es elastic.Es
- //mgo
- DataSaveCache = make(chan map[string]interface{}, 1000)
- DataUpdateCache = make(chan []map[string]interface{}, 1000)
- DataSaveThreads = make(chan bool, 5)
- DataUpdateThreads = make(chan bool, 5)
- //other
- regLetter = regexp.MustCompile("[a-z]*")
- filterFileType = regexp.MustCompile("(jpg|jpeg|png|pdf)")
- )
- func init() {
- conf = util.GetConf()
- jn = jnats.NewJnats(conf.Config.Natsurl)
- MgoB = mongodb.NewMgoWithUser(conf.Config.Mongodb.Addr, conf.Config.Mongodb.Dbname, conf.Config.Mongodb.Username, conf.Config.Mongodb.Password, conf.Config.Mongodb.Dbsize)
- MgoQ = mongodb.NewMgoWithUser(conf.Config.MongodbQ.Addr, conf.Config.MongodbQ.Dbname, conf.Config.MongodbQ.Username, conf.Config.MongodbQ.Password, conf.Config.MongodbQ.Dbsize)
- MgoP = mongodb.NewMgoWithUser(conf.Config.MongodbP.Addr, conf.Config.MongodbP.Dbname, conf.Config.MongodbP.Username, conf.Config.MongodbP.Password, conf.Config.MongodbP.Dbsize)
- redis.InitRedis1(conf.Config.Redis.Addr, conf.Config.Redis.DbIndex)
- InitOss()
- InitFileInfo()
- InitKeywordClient()
- InitEs()
- NatsThreads = make(chan bool, conf.Config.Threads)
- }
- func main() {
- go SaveBulkData()
- go UpdateBulkData()
- SubscribeNats()
- select {}
- }
- func SubscribeNats() {
- //先消费,带压缩
- jn.SubZip(conf.Config.Process.Subject+"."+conf.Config.Process.Step, func(msg *nats.Msg) {
- NatsThreads <- true
- go func(msg *nats.Msg) {
- defer func() {
- <-NatsThreads
- }()
- data := &util.MsgInfo{}
- err := bson.Unmarshal(msg.Data, &data)
- if err != nil {
- log.Println("解析数据失败:", err)
- data.Err = err.Error()
- //SaveErrData()//保存异常数据
- } else {
- //处理数据
- data.Stime = time.Now().Unix()
- data.CurrSetp = conf.Config.Process.Step
- if data.Extend.MgoSave.SType == MGO_SAVE { //保存
- SaveDealData(data.Data)
- } else if data.Extend.MgoSave.SType == MGO_UPDATE { //更新
- UpdateDealData(data.Id, data.Data)
- }
- data.Etime = time.Now().Unix()
- }
- //消息回写
- bs, _ := bson.Marshal(data)
- err = msg.Respond(bs)
- if err != nil {
- fmt.Println("回执失败:", data.Id, data.Data["_id"])
- //SaveErrData()//保存异常数据
- }
- }(msg)
- })
- }
- // 保存
- func SaveDealData(data map[string]interface{}) {
- //分类及部分字段处理
- fieldFun(data)
- //补充publishtime
- if u.IntAll(data["publishtime"]) == -1 {
- methodPb(data) //修正发布时间
- }
- //keyword关键词
- DealInfo(map[string]interface{}{}, data)
- // entidlist
- if s_winner, ok := data["s_winner"].(string); ok && s_winner != "" {
- cid := companyFun(s_winner)
- if len(cid) > 0 {
- data["entidlist"] = cid
- }
- }
- //剑鱼发布信息分类处理
- //typeFunc(data)//单独数据流处理
- // 附件有效字段
- if r := validFile(data); r != 0 {
- if r == -1 {
- data["isValidFile"] = false
- } else {
- data["isValidFile"] = true
- }
- }
- //情报标签字段
- if data["tag_topinformation"] != nil {
- data["tag_set"] = getTagSet(data)
- }
- //放入通道
- DataSaveCache <- data
- }
- // 更新
- func UpdateDealData(id string, data map[string]interface{}) {
- fields := map[string]interface{}{
- "detail": 0,
- "contenthtml": 0,
- }
- bid, _ := MgoB.FindById("bidding", id, fields)
- if bid != nil && len(*bid) > 0 { //找到数据
- modifyinfo := make(map[string]bool)
- if tmpmodifyinfo, ok := (*bid)["modifyinfo"].(map[string]interface{}); ok && tmpmodifyinfo != nil {
- for k := range tmpmodifyinfo {
- modifyinfo[k] = true
- }
- }
- update := map[string]interface{}{}
- del := map[string]interface{}{}
- for _, k := range conf.Config.Fields {
- tmpV := data[k] //extract v1
- bidV := (*bid)[k] //bidding v2
- if bidV == nil && tmpV != nil {
- update[k] = tmpV
- } else if bidV != nil && tmpV != nil && !modifyinfo[k] {
- update[k] = tmpV
- } else if bidV != nil && tmpV == nil && !modifyinfo[k] {
- if k == "s_subscopeclass" && del["subscopeclass"] == nil {
- continue
- } else if k == "s_topscopeclass" && del["topscopeclass"] == nil {
- continue
- }
- del[k] = 1
- }
- }
- //分类及部分字段处理
- fieldFun(data)
- //publishtime
- if u.IntAll(data["publishtime"]) == -1 {
- methodPb(data) //修正发布时间
- }
- // entidlist
- if s_winner, ok := data["s_winner"].(string); ok && s_winner != "" {
- cid := companyFun(s_winner)
- if len(cid) > 0 {
- data["entidlist"] = cid
- }
- }
- //剑鱼发布信息分类处理
- //typeFunc(data)
- // 附件有效字段
- //if r := validFile(data); r != 0 {
- // if r == -1 {
- // data["isValidFile"] = false
- // } else {
- // data["isValidFile"] = true
- // }
- //}
- //情报标签字段
- //if data["tag_topinformation"] != nil {
- // data["tag_set"] = getTagSet(data)
- //}
- //放入通道
- if len(del) > 0 {
- DataUpdateCache <- []map[string]interface{}{
- {"_id": mongodb.StringTOBsonId(id)},
- {"$set": update, "$unset": del},
- }
- } else {
- DataUpdateCache <- []map[string]interface{}{
- {"_id": mongodb.StringTOBsonId(id)},
- {"$set": update, "$unset": del},
- }
- }
- } else { //未找到数据
- log.Println("未找到bidding数据:", id)
- }
- }
- // 批量保存data_bak
- func SaveBulkData() {
- log.Println("Save Data...")
- savearr := make([]map[string]interface{}, 200)
- index_save := 0
- for {
- select {
- case v := <-DataSaveCache:
- savearr[index_save] = v
- index_save++
- if index_save == 20 {
- DataSaveThreads <- true
- go func(tmp []map[string]interface{}) {
- defer func() {
- <-DataSaveThreads
- }()
- MgoB.SaveBulk("bidding", tmp...)
- }(savearr)
- savearr = make([]map[string]interface{}, 200)
- index_save = 0
- }
- case <-time.After(5 * time.Second):
- if index_save > 0 {
- DataSaveThreads <- true
- go func(tmp []map[string]interface{}) {
- defer func() {
- <-DataSaveThreads
- }()
- MgoB.SaveBulk("bidding", tmp...)
- }(savearr[:index_save])
- savearr = make([]map[string]interface{}, 200)
- index_save = 0
- }
- }
- }
- }
- // 批量更新心跳信息
- func UpdateBulkData() {
- log.Println("Update Data...")
- heartarr := make([][]map[string]interface{}, 200)
- index_update := 0
- for {
- select {
- case v := <-DataUpdateCache:
- heartarr[index_update] = v
- index_update++
- if index_update == 20 {
- DataUpdateThreads <- true
- go func(tmp [][]map[string]interface{}) {
- defer func() {
- <-DataUpdateThreads
- }()
- MgoB.UpdateBulk("bidding", tmp...)
- }(heartarr)
- heartarr = make([][]map[string]interface{}, 200)
- index_update = 0
- }
- case <-time.After(5 * time.Second):
- if index_update > 0 {
- DataUpdateThreads <- true
- go func(tmp [][]map[string]interface{}) {
- defer func() {
- <-DataUpdateThreads
- }()
- MgoB.UpSertBulk("bidding", tmp...)
- }(heartarr[:index_update])
- heartarr = make([][]map[string]interface{}, 200)
- index_update = 0
- }
- }
- }
- }
|