Browse Source

批量保存...

zhengkun 1 year ago
parent
commit
82e62ac43a
1 changed files with 50 additions and 0 deletions
  1. 50 0
      data_mgo_to_tidb/util/save.go

+ 50 - 0
data_mgo_to_tidb/util/save.go

@@ -0,0 +1,50 @@
+package util
+
+import "time"
+
+var (
+	saveSize     = 200
+	saveBasePool = make(chan map[string]interface{}, 5000)
+	saveBaseSp   = make(chan bool, 1)
+
+	BaseField = []string{"s_info_id", "s_area_code", "s_city_code", "s_district_code", "f_budget", "f_bidamount", "f_biddiscount", "s_title", "s_toptype_code", "s_subtype_code", "s_projectname", "s_projectcode", "s_buyerclass_code", "d_publishtime", "d_comeintime", "d_bidopentime", "d_bidendtime", "i_isvalidfile", "s_href", "s_purchasing", "i_multipackage", "s_site", "s_buyer_id", "s_agency_id", "d_updatetime", "d_createtime"}
+)
+
+func InitSaveService() {
+	go SaveBaseFunc()
+}
+
+func SaveBaseFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveBasePool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveBaseSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveBaseSp
+					}()
+					MysqlTool.InsertBulk(T_dwd_f_bid_baseinfo, BaseField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveBaseSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveBaseSp
+					}()
+					MysqlTool.InsertBulk(T_dwd_f_bid_baseinfo, BaseField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}