123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- package main
- import (
- . "app.yhyue.com/moapp/jybase/mongodb"
- "flag"
- "github.com/gogf/gf/v2/frame/g"
- "github.com/gogf/gf/v2/os/gctx"
- "github.com/gogf/gf/v2/os/gfsnotify"
- "google.golang.org/grpc"
- "gopkg.in/natefinch/lumberjack.v2"
- "jygit.jydev.jianyu360.cn/BaseService/ossClient/constant"
- "jygit.jydev.jianyu360.cn/BaseService/ossClient/pb"
- "jygit.jydev.jianyu360.cn/BaseService/ossService/config"
- ossService "jygit.jydev.jianyu360.cn/BaseService/ossService/oss"
- "jygit.jydev.jianyu360.cn/BaseService/ossService/util"
- "log"
- "net"
- "net/http"
- "net/rpc"
- "strings"
- "sync"
- "time"
- )
- func main() {
- ctx := gctx.New()
- var logger *lumberjack.Logger
- g.Config().MustGet(ctx, "logger").Struct(&logger)
- log.SetOutput(logger)
- // 初始化OSS帐号与bucket信息
- ossService.LoadOSSAccounts()
- // 注册一个回调函数,当配置发生变更时会被调用
- downloadPoolSize := g.Config().MustGet(gctx.New(), "downloadPoolSize").Int()
- getDetailFromEsPoolSize := g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int()
- getDetailFromMgoPoolSize := g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int()
- gfsnotify.Add("./config.yaml", func(event *gfsnotify.Event) {
- log.Println(event.String())
- if event.IsWrite() || event.IsChmod() || event.IsRename() {
- log.Println("配置文件有变化,更新内存。。。")
- ossService.LoadOSSAccounts()
- if poolSize := g.Config().MustGet(gctx.New(), "downloadPoolSize").Int(); downloadPoolSize != poolSize {
- downloadPoolSize = poolSize
- ossService.DownloadPool = make(chan bool, downloadPoolSize)
- }
- if poolSize := g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int(); getDetailFromEsPoolSize != poolSize {
- getDetailFromEsPoolSize = poolSize
- ossService.GetDetailFromEsPool = make(chan bool, getDetailFromEsPoolSize)
- }
- if poolSize := g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int(); getDetailFromMgoPoolSize != poolSize {
- getDetailFromMgoPoolSize = poolSize
- ossService.GetDetailFromMgoPool = make(chan bool, getDetailFromMgoPoolSize)
- }
- }
- })
- // 启动心跳检测协程,每5秒发送一次心跳,同时检查在线节点数量
- go func() {
- ticker := time.NewTicker(5 * time.Second)
- var onlineNodesPrevWarn, downloadQueuePrevWarn, uploadQueuePrevWarn, getDetailQueuePrevWarn int64
- ctx := gctx.New()
- for range ticker.C {
- util.SendHeartbeat(ctx)
- util.CheckOnlineNodes(ctx, &onlineNodesPrevWarn)
- util.CheckDownloadQueue(ctx, &downloadQueuePrevWarn)
- util.CheckUploadQueue(ctx, &uploadQueuePrevWarn)
- util.CheckGetDetailQueue(ctx, &getDetailQueuePrevWarn)
- }
- }()
- go func() {
- //创建一个grpc 服务器
- s := grpc.NewServer()
- //注册事件
- pb.RegisterServiceServer(s, &ossService.Grpc{})
- grpcPort := g.Config().MustGet(gctx.New(), "grpcPort").String()
- //处理链接
- listen, err := net.Listen("tcp", grpcPort)
- if err != nil {
- log.Println(err)
- } else {
- log.Println("grpc server is listening", grpcPort)
- }
- s.Serve(listen)
- }()
- // 启动RPC服务:注册OSSService,实现接口调用
- rpcService := new(ossService.OSSService)
- rpc.Register(rpcService)
- rpc.HandleHTTP()
- http.HandleFunc(constant.UploadUrl, ossService.UploadHandler)
- http.HandleFunc(constant.DownloadUrl, ossService.DownloadHandler)
- http.HandleFunc(constant.DeleteUrl, ossService.DeleteHandler)
- http.HandleFunc("/ossservice/nodes", ossService.NodesHandler)
- http.HandleFunc(constant.GetBidDetailUrl, ossService.BidDetailHandler)
- port := g.Config().MustGet(ctx, "port").String()
- log.Println("HTTP server started on " + port)
- if err := http.ListenAndServe(port, nil); err != nil {
- //if err := endless.ListenAndServe(port, nil, func() {}); err != nil {
- log.Fatalln("HTTP server error", err)
- }
- }
- // /////////////////
- func main11() {
- start := flag.Int64("s", 0, "")
- end := flag.Int64("e", 0, "")
- poolSize := flag.Int("p", 5, "")
- flag.Parse()
- // 初始化OSS帐号与bucket信息
- ossService.LoadOSSAccounts()
- sess := config.Mgo.GetMgoConn()
- defer config.Mgo.DestoryMongoConn(sess)
- query := map[string]interface{}{}
- idQuery := map[string]interface{}{}
- if *start != 0 && *start == *end {
- query["comeintime"] = *start
- } else {
- if *start != 0 {
- idQuery["$gte"] = *start
- }
- if *end != 0 {
- idQuery["$lte"] = *end
- }
- }
- if len(idQuery) > 0 {
- query["comeintime"] = idQuery
- }
- log.Println("start。。。", query)
- it := sess.DB(config.Mgo.DbName).C(g.Config().MustGet(gctx.New(), "mongodb.collection").String()).Find(query).Select(map[string]interface{}{
- "_id": 1,
- "detail": 1,
- "contenthtml": 1,
- "comeintime": 1,
- }).Sort("-comeintime").Iter()
- pool := make(chan bool, *poolSize)
- wait := &sync.WaitGroup{}
- index := 0
- for m := make(map[string]interface{}); it.Next(&m); {
- pool <- true
- wait.Add(1)
- index++
- if index%5000 == 0 {
- log.Println(index, m["_id"], m["comeintime"])
- }
- go func(mm map[string]interface{}) {
- defer func() {
- <-pool
- wait.Done()
- }()
- objectName := BsonIdToSId(mm["_id"]) + ".txt"
- detail, _ := mm["detail"].(string)
- for {
- if err1 := ossService.Upload("detail", objectName, strings.NewReader(detail), false); err1 != nil {
- log.Println("detail", objectName, err1)
- time.Sleep(3 * time.Second)
- } else {
- break
- }
- }
- contenthtml, _ := mm["contenthtml"].(string)
- for {
- if err2 := ossService.Upload("contenthtml", objectName, strings.NewReader(contenthtml), false); err2 != nil {
- log.Println("contenthtml", objectName, err2)
- time.Sleep(3 * time.Second)
- } else {
- break
- }
- }
- }(m)
- m = make(map[string]interface{})
- }
- wait.Wait()
- log.Println("over。。。", index)
- }
|