zhengkun vor 1 Jahr
Ursprung
Commit
cfa3cb7448
3 geänderte Dateien mit 18 neuen und 15 gelöschten Zeilen
  1. 4 2
      data_mgo_to_tidb/bidding/bidding.go
  2. 2 1
      data_mgo_to_tidb/main.go
  3. 12 12
      data_mgo_to_tidb/util/save.go

+ 4 - 2
data_mgo_to_tidb/bidding/bidding.go

@@ -26,8 +26,8 @@ func TaskBidding() {
 	wg := &sync.WaitGroup{}
 	q := map[string]interface{}{
 		"_id": map[string]interface{}{
-			"$gt":  mongodb.StringTOBsonId("6558df800000000000000000"),
-			"$lte": mongodb.StringTOBsonId("655a31000000000000000000"),
+			"$gt":  mongodb.StringTOBsonId("100000000000000000000000"),
+			"$lte": mongodb.StringTOBsonId("900000000000000000000000"),
 		},
 	}
 	query := sess.DB(config.Conf.DB.MongoB.Dbname).C("bidding").Find(q).Sort("_id").Iter()
@@ -96,7 +96,9 @@ func taskBase(tmp map[string]interface{}) {
 	}
 	info["d_updatetime"] = time.Now().Format(util.Date_Full_Layout)
 	info["d_createtime"] = time.Now().Format(util.Date_Full_Layout)
+
 	u.InsertGlobalMysqlData(u.T_dwd_f_bid_baseinfo, info, mongodb.BsonIdToSId(tmp["_id"]))
+	//u.SaveBasePool <- info
 }
 
 func taskExpand(tmp map[string]interface{}) {

+ 2 - 1
data_mgo_to_tidb/main.go

@@ -7,10 +7,11 @@ import (
 
 func init() {
 	u.InitInfo() //初始化...
+	//u.InitSaveService()//批量保存...
 }
 
 func main() {
-	//标讯信息-正式版先测试一天的数据
+	//标讯信息
 	bidding.TaskBidding()
 
 	c := make(chan bool, 1)

+ 12 - 12
data_mgo_to_tidb/util/save.go

@@ -3,9 +3,9 @@ package util
 import "time"
 
 var (
-	saveSize     = 200
-	saveBasePool = make(chan map[string]interface{}, 5000)
-	saveBaseSp   = make(chan bool, 1)
+	SaveSize     = 200
+	SaveBasePool = make(chan map[string]interface{}, 5000)
+	SaveBaseSp   = make(chan bool, 1)
 
 	BaseField = []string{"s_info_id", "s_area_code", "s_city_code", "s_district_code", "f_budget", "f_bidamount", "f_biddiscount", "s_title", "s_toptype_code", "s_subtype_code", "s_projectname", "s_projectcode", "s_buyerclass_code", "d_publishtime", "d_comeintime", "d_bidopentime", "d_bidendtime", "i_isvalidfile", "s_href", "s_purchasing", "i_multipackage", "s_site", "s_buyer_id", "s_agency_id", "d_updatetime", "d_createtime"}
 )
@@ -15,34 +15,34 @@ func InitSaveService() {
 }
 
 func SaveBaseFunc() {
-	arru := make([]map[string]interface{}, saveSize)
+	arru := make([]map[string]interface{}, SaveSize)
 	indexu := 0
 	for {
 		select {
-		case v := <-saveBasePool:
+		case v := <-SaveBasePool:
 			arru[indexu] = v
 			indexu++
-			if indexu == saveSize {
-				saveBaseSp <- true
+			if indexu == SaveSize {
+				SaveBaseSp <- true
 				go func(arru []map[string]interface{}) {
 					defer func() {
-						<-saveBaseSp
+						<-SaveBaseSp
 					}()
 					MysqlTool.InsertBulk(T_dwd_f_bid_baseinfo, BaseField, arru...)
 				}(arru)
-				arru = make([]map[string]interface{}, saveSize)
+				arru = make([]map[string]interface{}, SaveSize)
 				indexu = 0
 			}
 		case <-time.After(1000 * time.Millisecond):
 			if indexu > 0 {
-				saveBaseSp <- true
+				SaveBaseSp <- true
 				go func(arru []map[string]interface{}) {
 					defer func() {
-						<-saveBaseSp
+						<-SaveBaseSp
 					}()
 					MysqlTool.InsertBulk(T_dwd_f_bid_baseinfo, BaseField, arru...)
 				}(arru[:indexu])
-				arru = make([]map[string]interface{}, saveSize)
+				arru = make([]map[string]interface{}, SaveSize)
 				indexu = 0
 			}
 		}