package main import ( elastic "app.yhyue.com/moapp/jybase/es" "app.yhyue.com/moapp/jybase/mongodb" "dataflow/util" "fmt" "github.com/nats-io/nats.go" "go.mongodb.org/mongo-driver/bson" "jygit.jydev.jianyu360.cn/BP/jynats/jnats" u "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis" "log" "regexp" "time" ) const MGO_SAVE, MGO_UPDATE = "s", "u" var ( conf util.Conf ThreadsLimit int MgoB *mongodb.MongodbSim MgoQ *mongodb.MongodbSim MgoP *mongodb.MongodbSim jn *jnats.Jnats NatsThreads chan bool Es elastic.Es //mgo DataSaveCache = make(chan map[string]interface{}, 1000) DataUpdateCache = make(chan []map[string]interface{}, 1000) DataSaveThreads = make(chan bool, 5) DataUpdateThreads = make(chan bool, 5) //other regLetter = regexp.MustCompile("[a-z]*") filterFileType = regexp.MustCompile("(jpg|jpeg|png|pdf)") ) func init() { conf = util.GetConf() jn = jnats.NewJnats(conf.Config.Natsurl) MgoB = mongodb.NewMgoWithUser(conf.Config.Mongodb.Addr, conf.Config.Mongodb.Dbname, conf.Config.Mongodb.Username, conf.Config.Mongodb.Password, conf.Config.Mongodb.Dbsize) MgoQ = mongodb.NewMgoWithUser(conf.Config.MongodbQ.Addr, conf.Config.MongodbQ.Dbname, conf.Config.MongodbQ.Username, conf.Config.MongodbQ.Password, conf.Config.MongodbQ.Dbsize) MgoP = mongodb.NewMgoWithUser(conf.Config.MongodbP.Addr, conf.Config.MongodbP.Dbname, conf.Config.MongodbP.Username, conf.Config.MongodbP.Password, conf.Config.MongodbP.Dbsize) redis.InitRedis1(conf.Config.Redis.Addr, conf.Config.Redis.DbIndex) InitOss() InitFileInfo() InitKeywordClient() InitEs() NatsThreads = make(chan bool, conf.Config.Threads) } func main() { go SaveBulkData() go UpdateBulkData() SubscribeNats() select {} } func SubscribeNats() { //先消费,带压缩 jn.SubZip(conf.Config.Process.Subject+"."+conf.Config.Process.Step, func(msg *nats.Msg) { NatsThreads <- true go func(msg *nats.Msg) { defer func() { <-NatsThreads }() data := &util.MsgInfo{} err := bson.Unmarshal(msg.Data, &data) if err != nil { log.Println("解析数据失败:", err) data.Err = err.Error() //SaveErrData()//保存异常数据 } else { //处理数据 data.Stime = time.Now().Unix() data.CurrSetp = conf.Config.Process.Step if data.Extend.MgoSave.SType == MGO_SAVE { //保存 SaveDealData(data.Data) } else if data.Extend.MgoSave.SType == MGO_UPDATE { //更新 UpdateDealData(data.Id, data.Data) } data.Etime = time.Now().Unix() } //消息回写 bs, _ := bson.Marshal(data) err = msg.Respond(bs) if err != nil { fmt.Println("回执失败:", data.Id, data.Data["_id"]) //SaveErrData()//保存异常数据 } }(msg) }) } // 保存 func SaveDealData(data map[string]interface{}) { //分类及部分字段处理 fieldFun(data) //补充publishtime if u.IntAll(data["publishtime"]) == -1 { methodPb(data) //修正发布时间 } //keyword关键词 DealInfo(map[string]interface{}{}, data) // entidlist if s_winner, ok := data["s_winner"].(string); ok && s_winner != "" { cid := companyFun(s_winner) if len(cid) > 0 { data["entidlist"] = cid } } //剑鱼发布信息分类处理 //typeFunc(data)//单独数据流处理 // 附件有效字段 if r := validFile(data); r != 0 { if r == -1 { data["isValidFile"] = false } else { data["isValidFile"] = true } } //情报标签字段 if data["tag_topinformation"] != nil { data["tag_set"] = getTagSet(data) } //放入通道 DataSaveCache <- data } // 更新 func UpdateDealData(id string, data map[string]interface{}) { fields := map[string]interface{}{ "detail": 0, "contenthtml": 0, } bid, _ := MgoB.FindById("bidding", id, fields) if bid != nil && len(*bid) > 0 { //找到数据 modifyinfo := make(map[string]bool) if tmpmodifyinfo, ok := (*bid)["modifyinfo"].(map[string]interface{}); ok && tmpmodifyinfo != nil { for k := range tmpmodifyinfo { modifyinfo[k] = true } } update := map[string]interface{}{} del := map[string]interface{}{} for _, k := range conf.Config.Fields { tmpV := data[k] //extract v1 bidV := (*bid)[k] //bidding v2 if bidV == nil && tmpV != nil { update[k] = tmpV } else if bidV != nil && tmpV != nil && !modifyinfo[k] { update[k] = tmpV } else if bidV != nil && tmpV == nil && !modifyinfo[k] { if k == "s_subscopeclass" && del["subscopeclass"] == nil { continue } else if k == "s_topscopeclass" && del["topscopeclass"] == nil { continue } del[k] = 1 } } //分类及部分字段处理 fieldFun(data) //publishtime if u.IntAll(data["publishtime"]) == -1 { methodPb(data) //修正发布时间 } // entidlist if s_winner, ok := data["s_winner"].(string); ok && s_winner != "" { cid := companyFun(s_winner) if len(cid) > 0 { data["entidlist"] = cid } } //剑鱼发布信息分类处理 //typeFunc(data) // 附件有效字段 //if r := validFile(data); r != 0 { // if r == -1 { // data["isValidFile"] = false // } else { // data["isValidFile"] = true // } //} //情报标签字段 //if data["tag_topinformation"] != nil { // data["tag_set"] = getTagSet(data) //} //放入通道 if len(del) > 0 { DataUpdateCache <- []map[string]interface{}{ {"_id": mongodb.StringTOBsonId(id)}, {"$set": update, "$unset": del}, } } else { DataUpdateCache <- []map[string]interface{}{ {"_id": mongodb.StringTOBsonId(id)}, {"$set": update, "$unset": del}, } } } else { //未找到数据 log.Println("未找到bidding数据:", id) } } // 批量保存data_bak func SaveBulkData() { log.Println("Save Data...") savearr := make([]map[string]interface{}, 200) index_save := 0 for { select { case v := <-DataSaveCache: savearr[index_save] = v index_save++ if index_save == 20 { DataSaveThreads <- true go func(tmp []map[string]interface{}) { defer func() { <-DataSaveThreads }() MgoB.SaveBulk("bidding", tmp...) }(savearr) savearr = make([]map[string]interface{}, 200) index_save = 0 } case <-time.After(5 * time.Second): if index_save > 0 { DataSaveThreads <- true go func(tmp []map[string]interface{}) { defer func() { <-DataSaveThreads }() MgoB.SaveBulk("bidding", tmp...) }(savearr[:index_save]) savearr = make([]map[string]interface{}, 200) index_save = 0 } } } } // 批量更新心跳信息 func UpdateBulkData() { log.Println("Update Data...") heartarr := make([][]map[string]interface{}, 200) index_update := 0 for { select { case v := <-DataUpdateCache: heartarr[index_update] = v index_update++ if index_update == 20 { DataUpdateThreads <- true go func(tmp [][]map[string]interface{}) { defer func() { <-DataUpdateThreads }() MgoB.UpdateBulk("bidding", tmp...) }(heartarr) heartarr = make([][]map[string]interface{}, 200) index_update = 0 } case <-time.After(5 * time.Second): if index_update > 0 { DataUpdateThreads <- true go func(tmp [][]map[string]interface{}) { defer func() { <-DataUpdateThreads }() MgoB.UpSertBulk("bidding", tmp...) }(heartarr[:index_update]) heartarr = make([][]map[string]interface{}, 200) index_update = 0 } } } }