123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- package main
- import (
- . "app.yhyue.com/moapp/jybase/mongodb"
- osr "app.yhyue.com/moapp/jybase/overseer"
- "flag"
- "fmt"
- "github.com/gogf/gf/v2/frame/g"
- "github.com/gogf/gf/v2/os/gctx"
- "github.com/gogf/gf/v2/os/gfsnotify"
- "google.golang.org/grpc"
- "gopkg.in/natefinch/lumberjack.v2"
- "io"
- "jygit.jydev.jianyu360.cn/BaseService/ossClient/constant"
- "jygit.jydev.jianyu360.cn/BaseService/ossClient/pb"
- "jygit.jydev.jianyu360.cn/BaseService/ossService/config"
- ossService "jygit.jydev.jianyu360.cn/BaseService/ossService/oss"
- "jygit.jydev.jianyu360.cn/BaseService/ossService/util"
- "log"
- "net"
- "net/http"
- "net/rpc"
- "os"
- "strings"
- "sync"
- "time"
- )
// Version is the service version string; the /ossservice/version handler writes it to the response.
- const Version = "V1.3"
- func main() {
- ctx := gctx.New()
- osr.Run(osr.Config{
- Addresses: []string{g.Config().MustGet(ctx, "port").String(), g.Config().MustGet(ctx, "grpcPort").String()}, // 多个监听地址
- Program: program,
- })
- }
- func program(state osr.State) {
- pid := os.Getpid()
- log.Println("ppid", os.Getppid(), "子进程", pid, "启动,监听端口", strings.Join(state.Addresses, " "))
- ctx := gctx.New()
- var logger *lumberjack.Logger
- g.Config().MustGet(ctx, "logger").Struct(&logger)
- writers := []io.Writer{logger}
- if g.Config().MustGet(ctx, "logger.console").Bool() {
- writers = append(writers, os.Stdout)
- }
- log.SetOutput(io.MultiWriter(writers...))
- config.InitDb()
- // 初始化OSS帐号与bucket信息
- ossService.LoadOSSAccounts()
- // 注册一个回调函数,当配置发生变更时会被调用
- downloadPoolSize := g.Config().MustGet(gctx.New(), "downloadPoolSize").Int()
- getDetailFromEsPoolSize := g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int()
- getDetailFromMgoPoolSize := g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int()
- gfsnotify.Add("./config.yaml", func(event *gfsnotify.Event) {
- log.Println(event.String())
- if event.IsWrite() || event.IsChmod() || event.IsRename() {
- log.Println("配置文件有变化,更新内存。。。")
- ossService.LoadOSSAccounts()
- if poolSize := g.Config().MustGet(gctx.New(), "downloadPoolSize").Int(); downloadPoolSize != poolSize {
- downloadPoolSize = poolSize
- ossService.DownloadPool = make(chan bool, downloadPoolSize)
- }
- if poolSize := g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int(); getDetailFromEsPoolSize != poolSize {
- getDetailFromEsPoolSize = poolSize
- ossService.GetDetailFromEsPool = make(chan bool, getDetailFromEsPoolSize)
- }
- if poolSize := g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int(); getDetailFromMgoPoolSize != poolSize {
- getDetailFromMgoPoolSize = poolSize
- ossService.GetDetailFromMgoPool = make(chan bool, getDetailFromMgoPoolSize)
- }
- }
- })
- // 启动心跳检测协程,每5秒发送一次心跳,同时检查在线节点数量
- go func() {
- ticker := time.NewTicker(5 * time.Second)
- var onlineNodesPrevWarn, downloadQueuePrevWarn, uploadQueuePrevWarn, getDetailQueuePrevWarn int64
- for range ticker.C {
- util.SendHeartbeat(ctx)
- util.CheckOnlineNodes(ctx, &onlineNodesPrevWarn)
- util.CheckDownloadQueue(ctx, &downloadQueuePrevWarn)
- util.CheckUploadQueue(ctx, &uploadQueuePrevWarn)
- util.CheckGetDetailQueue(ctx, &getDetailQueuePrevWarn)
- }
- }()
- // state.Listeners 与配置的 Addresses 顺序一致
- servers := make([]osr.Server, len(state.Listeners))
- // 启动RPC服务:注册OSSService,实现接口调用
- rpcService := new(ossService.OSSService)
- rpc.Register(rpcService)
- rpc.HandleHTTP()
- mux := http.DefaultServeMux
- mux.HandleFunc(constant.UploadUrl, ossService.UploadHandler)
- mux.HandleFunc(constant.DownloadUrl, ossService.DownloadHandler)
- mux.HandleFunc(constant.DeleteUrl, ossService.DeleteHandler)
- mux.HandleFunc("/ossservice/nodes", ossService.NodesHandler)
- mux.HandleFunc(constant.GetBidDetailUrl, ossService.BidDetailHandler)
- mux.HandleFunc("/ossservice/version", func(w http.ResponseWriter, r *http.Request) {
- fmt.Fprint(w, "版本号:", Version)
- })
- servers[0] = &http.Server{Handler: mux}
- //创建一个grpc 服务器
- grpcServer := grpc.NewServer()
- defer grpcServer.GracefulStop()
- //注册事件
- pb.RegisterServiceServer(grpcServer, &ossService.Grpc{})
- servers[1] = grpcServer
- // 启动所有服务(每个监听对应一个服务器)
- for i, listener := range state.Listeners {
- go func(srv osr.Server, l net.Listener) {
- log.Println("子进程", pid, "启动端口", l.Addr().String())
- if err := srv.Serve(l); err != nil {
- log.Println("端口", l.Addr().String(), "服务启动失败", err)
- }
- }(servers[i], listener)
- }
- // 监听关闭信号,优雅关闭所有服务器
- <-state.GracefulShutdown
- log.Println("子进程", pid, "停止运行。。。")
- }
- // /////////////////
- func main11() {
- start := flag.Int64("s", 0, "")
- end := flag.Int64("e", 0, "")
- poolSize := flag.Int("p", 5, "")
- flag.Parse()
- // 初始化OSS帐号与bucket信息
- ossService.LoadOSSAccounts()
- sess := config.Mgo.GetMgoConn()
- defer config.Mgo.DestoryMongoConn(sess)
- query := map[string]interface{}{}
- idQuery := map[string]interface{}{}
- if *start != 0 && *start == *end {
- query["comeintime"] = *start
- } else {
- if *start != 0 {
- idQuery["$gte"] = *start
- }
- if *end != 0 {
- idQuery["$lte"] = *end
- }
- }
- if len(idQuery) > 0 {
- query["comeintime"] = idQuery
- }
- log.Println("start。。。", query)
- it := sess.DB(config.Mgo.DbName).C(g.Config().MustGet(gctx.New(), "mongodb.collection").String()).Find(query).Select(map[string]interface{}{
- "_id": 1,
- "detail": 1,
- "contenthtml": 1,
- "comeintime": 1,
- }).Sort("-comeintime").Iter()
- pool := make(chan bool, *poolSize)
- wait := &sync.WaitGroup{}
- index := 0
- for m := make(map[string]interface{}); it.Next(&m); {
- pool <- true
- wait.Add(1)
- index++
- if index%5000 == 0 {
- log.Println(index, m["_id"], m["comeintime"])
- }
- go func(mm map[string]interface{}) {
- defer func() {
- <-pool
- wait.Done()
- }()
- objectName := BsonIdToSId(mm["_id"]) + ".txt"
- detail, _ := mm["detail"].(string)
- for {
- if err1 := ossService.Upload("detail", objectName, strings.NewReader(detail), false); err1 != nil {
- log.Println("detail", objectName, err1)
- time.Sleep(3 * time.Second)
- } else {
- break
- }
- }
- contenthtml, _ := mm["contenthtml"].(string)
- for {
- if err2 := ossService.Upload("contenthtml", objectName, strings.NewReader(contenthtml), false); err2 != nil {
- log.Println("contenthtml", objectName, err2)
- time.Sleep(3 * time.Second)
- } else {
- break
- }
- }
- }(m)
- m = make(map[string]interface{})
- }
- wait.Wait()
- log.Println("over。。。", index)
- }
|