Переглянути джерело

数据迁移新增控制开启关闭功能

maxiaoshan 1 рік тому
батько
коміт
5d1cbde1d3
2 змінених файлів з 42 додано та 13 видалено
  1. 2 1
      src/saveServer/src/config.json
  2. 40 12
      src/saveServer/src/main.go

+ 2 - 1
src/saveServer/src/config.json

@@ -1,5 +1,6 @@
 {
-	"webport": "8002",
+	"webport": "8010",
+    "listenport": "8011",
 	"address": "192.168.3.166:27082",
     "dbname": "qfw",
     "coll": "ocr_flie_over",

+ 40 - 12
src/saveServer/src/main.go

@@ -3,18 +3,19 @@ package main
 import (
 	"fmt"
 	"github.com/cron"
+	"github.com/go-xweb/xweb"
 	"mongodb"
+	"net/http"
 	qu "qfw/util"
 	"sync"
 	"time"
 
 	"go.mongodb.org/mongo-driver/bson/primitive"
-
-	"github.com/go-xweb/xweb"
 )
 
 var (
 	Webport       string
+	ListenPort    string
 	Config        map[string]interface{}
 	Mgo           *mongodb.MongodbSim
 	Coll          string //ocr_flie_over
@@ -26,6 +27,7 @@ var (
 	SaveMgoCache  = make(chan map[string]interface{}, 1000) //
 	SP            = make(chan bool, 5)
 	//StartId  string
+	Stop bool
 )
 
 func init() {
@@ -33,6 +35,7 @@ func init() {
 	InitFileInfo() //初始化附件解析信息
 	InitOss()      //oss
 	Webport = qu.ObjToString(Config["webport"])
+	ListenPort = qu.ObjToString(Config["listenport"])
 	Coll = qu.ObjToString(Config["coll"])
 	MoveCollFile = qu.ObjToString(Config["movecollfile"])
 	MoveCollNomal = qu.ObjToString(Config["movecollnomal"])
@@ -54,13 +57,27 @@ func main() {
 	c := cron.New()
 	c.Start()
 	c.AddFunc("0 0 0 ? * *", DeleteData)
+	go monitor()
 	go SaveData()                //数据保存
 	go FileDataMoveToBidding()   //附件数据迁移至bidding
 	go NormalDataMoveToBidding() //正常数据迁移至bidding
-	//go DataMoveToBidding()
 	xweb.Run(":" + Webport)
-	// ch := make(chan bool, 1)
-	// <-ch
+	//ch := make(chan bool, 1)
+	//<-ch
+}
+func monitor() {
+	//最好是单实例调用
+	http.HandleFunc("/movebidding/stop", func(w http.ResponseWriter, r *http.Request) {
+		fmt.Println("停止数据迁移...")
+		Stop = true
+		w.Write([]byte("ok"))
+	})
+	http.HandleFunc("/movebidding/run", func(w http.ResponseWriter, r *http.Request) {
+		fmt.Println("启动数据迁移...")
+		Stop = false
+		w.Write([]byte("ok"))
+	})
+	http.ListenAndServe(":"+ListenPort, nil)
 }
 
 func NormalDataMoveToBidding() {
@@ -75,8 +92,8 @@ func NormalDataMoveToBidding() {
 			},
 		}
 		count := Mgo.Count(MoveCollNomal, query)
-		qu.Debug("bidding_nomal,本轮查询数据量:", count)
-		if count > 0 {
+		qu.Debug("bidding_nomal,本轮查询数据量:", count, Stop)
+		if count > 0 && !Stop {
 			NormalDataMove(query)
 		} else {
 			time.Sleep(time.Duration(TaskTime) * time.Second)
@@ -91,9 +108,10 @@ func NormalDataMove(query map[string]interface{}) {
 	fields := map[string]interface{}{
 		"bid_completetime": 0,
 		"bid_starttime":    0,
+		"goods_read":       0,
 	}
 	lock := sync.Mutex{}
-	ch := make(chan bool, 3)
+	ch := make(chan bool, 10)
 	wg := sync.WaitGroup{}
 	updateArr := [][]map[string]interface{}{}
 	it := sess.DB("qfw").C(MoveCollNomal).Find(&query).Select(&fields).Iter()
@@ -111,6 +129,10 @@ func NormalDataMove(query map[string]interface{}) {
 			update = append(update, map[string]interface{}{"_id": _id})
 			newId := primitive.NewObjectID()
 			tmp["_id"] = newId
+			if competehref := qu.ObjToString(tmp["competehref"]); competehref != "" && competehref != "#" { //根据id重新生成href
+				strid := mongodb.BsonIdToSId(newId)
+				tmp["href"] = `https://www.jianyu360.cn/article/content/` + qu.CommonEncodeArticle("content", strid) + `.html`
+			}
 			//更新
 			set := map[string]interface{}{"moveok": true}
 			set["biddingid"] = mongodb.BsonIdToSId(newId)
@@ -147,6 +169,9 @@ func FileDataMoveToBidding() {
 	defer qu.Catch()
 	for {
 		query := map[string]interface{}{ //查询附件解析完成,未迁移的数据
+			"bid_completetime": map[string]interface{}{
+				"$exists": true,
+			},
 			"file_completetime": map[string]interface{}{
 				"$exists": true,
 			},
@@ -155,8 +180,8 @@ func FileDataMoveToBidding() {
 			},
 		}
 		count := Mgo.Count(MoveCollFile, query)
-		qu.Debug("bidding_file,本轮查询数据量:", count)
-		if count > 0 {
+		qu.Debug("bidding_file,本轮查询数据量:", count, Stop)
+		if count > 0 && !Stop {
 			FileDataMove(query)
 		} else {
 			time.Sleep(time.Duration(TaskTime) * time.Second)
@@ -172,9 +197,12 @@ func FileDataMove(query map[string]interface{}) {
 		"attach_time":       0,
 		"file_completetime": 0,
 		"file_starttime":    0,
+		"bid_completetime":  0,
+		"bid_starttime":     0,
+		"file_read":         0,
 	}
 	lock := sync.Mutex{}
-	ch := make(chan bool, 3)
+	ch := make(chan bool, 10)
 	wg := sync.WaitGroup{}
 	updateArr := [][]map[string]interface{}{}
 	it := sess.DB("qfw").C(MoveCollFile).Find(&query).Select(&fields).Iter()
@@ -191,6 +219,7 @@ func FileDataMove(query map[string]interface{}) {
 			update := []map[string]interface{}{}
 			update = append(update, map[string]interface{}{"_id": _id})
 			newId := primitive.NewObjectID()
+			tmp["_id"] = newId                                                                              //新id
 			if competehref := qu.ObjToString(tmp["competehref"]); competehref != "" && competehref != "#" { //根据id重新生成href
 				strid := mongodb.BsonIdToSId(newId)
 				tmp["href"] = `https://www.jianyu360.cn/article/content/` + qu.CommonEncodeArticle("content", strid) + `.html`
@@ -214,7 +243,6 @@ func FileDataMove(query map[string]interface{}) {
 			update = append(update, map[string]interface{}{
 				"$set": set,
 			})
-			tmp["_id"] = newId
 			//save
 			SaveMgoCache <- tmp
 			lock.Lock()