Browse Source

数仓新调整
1、基本信息
2、附件信息

zhengkun 1 year ago
parent
commit
a61c46caef

+ 58 - 66
data_mgo_to_tidb/bidding.go

@@ -69,16 +69,13 @@ func doBiddingTask(gtid, lteid string, mapInfo map[string]interface{}) {
 func taskB() {
 	sess := MongoB.GetMgoConn()
 	defer MongoB.DestoryMongoConn(sess)
-
 	ch := make(chan bool, 10)
 	wg := &sync.WaitGroup{}
-
-	//q := map[string]interface{}{"_id": mongodb.StringTOBsonId("634eac71911e1eb345b2d861")}
-	q := map[string]interface{}{"_id": map[string]interface{}{"$gt": mongodb.StringTOBsonId("632d42d667a6b0a2861eef92")}}
+	q := map[string]interface{}{"_id": map[string]interface{}{"$gt": mongodb.StringTOBsonId("132d42d667a6b0a2861eef92")}}
 	query := sess.DB(config.Conf.DB.MongoB.Dbname).C("bidding").Find(q).Sort("_id").Iter()
 	count := 0
 	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
-		if count%20000 == 0 {
+		if count%10000 == 0 {
 			log.Info(fmt.Sprintf("current --- %d", count))
 		}
 		ch <- true
@@ -88,24 +85,23 @@ func taskB() {
 				<-ch
 				wg.Done()
 			}()
-
 			if util.IntAll(tmp["extracttype"]) != -1 {
-				taskBase(tmp)
-				taskTags(tmp)
-				taskExpand(tmp)
-				taskAtts(tmp)
-				taskInfoformat(tmp)
-				taskIntent(tmp)
-				taskWinner(tmp)
+				//taskBase(tmp) 			//基础标讯数据
+				//taskTags(tmp)     		//标签信息
+				//taskExpand(tmp) 		//扩展数据
+				taskDetail(tmp) //正文信息
+				//taskAtts(tmp) 			//附件信息
+				//taskInfoformat(tmp) 	//拟建
+				//taskIntent(tmp) 	  	//采购意向
+				//taskWinner(tmp)
 				//taskPackage(tmp)
-				taskPur(tmp)
+				//taskPur(tmp)
 			}
-
 		}(tmp)
 		tmp = make(map[string]interface{})
 	}
 	wg.Wait()
-	log.Info(fmt.Sprintf("over --- %d", count))
+	log.Info(fmt.Sprintf("is over --- %d", count))
 }
 
 // @Description 基本信息
@@ -113,79 +109,71 @@ func taskB() {
 func taskBase(tmp map[string]interface{}) {
 	saveM := make(map[string]interface{})
 	var errf []string // 异常字段
-	for _, f := range BaseField {
-		if f == "infoid" {
-			saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
+	for _, nf := range BaseField {
+		f := nf[2:]
+		if f == "info_id" {
+			saveM[nf] = mongodb.BsonIdToSId(tmp["_id"])
 		} else if f == "area_code" {
 			if tmp["area"] != nil {
-				saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
+				saveM[nf] = AreaCode[util.ObjToString(tmp["area"])]
 			}
 		} else if f == "city_code" {
 			if tmp["area"] != nil && tmp["city"] != nil {
 				c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
-				saveM[f] = AreaCode[c]
+				saveM[nf] = AreaCode[c]
 			}
 		} else if f == "district_code" {
 			if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil {
 				c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"])
-				saveM[f] = AreaCode[c]
+				saveM[nf] = AreaCode[c]
 			}
 		} else if f == "toptype_code" {
 			if obj := util.ObjToString(tmp["toptype"]); obj != "" {
-				saveM[f] = TopTypeCode[obj]
+				saveM[nf] = TopTypeCode[obj]
 			}
 		} else if f == "subtype_code" {
 			if obj := util.ObjToString(tmp["subtype"]); obj != "" {
-				saveM[f] = SubTypeCode[obj]
+				saveM[nf] = SubTypeCode[obj]
 			}
 		} else if f == "buyerclass_code" {
 			if obj := util.ObjToString(tmp["buyerclass"]); obj != "" {
-				saveM[f] = BuyerCode[obj]
+				saveM[nf] = BuyerCode[obj]
 			}
 		} else if f == "createtime" || f == "updatetime" {
-			saveM[f] = time.Now().Format(util.Date_Full_Layout)
-		} else if f == "comeintime" || f == "publishtime" || f == "bidopentime" {
+			saveM[nf] = time.Now().Format(util.Date_Full_Layout)
+		} else if f == "comeintime" || f == "publishtime" || f == "bidopentime" || f == "bidendtime" {
 			if tmp[f] != nil && util.IntAll(tmp[f]) > 0 {
 				t := util.Int64All(tmp[f])
-				saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
+				saveM[nf] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
 			}
 		} else if f == "multipackage" || f == "isValidFile" {
 			if tmp[f] == nil {
-				saveM[f] = 0
+				saveM[nf] = 0
 			} else {
-				saveM[f] = tmp[f]
+				saveM[nf] = tmp[f]
 			}
 		} else if f == "buyer_id" {
 			if b := util.ObjToString(tmp["buyer"]); b != "" {
-				saveM["buyer"] = b
-				//if code := redis.GetStr("qyxy_id", b); code != "" {
 				if code := getNameId(b); code != "" {
-					saveM[f] = code
+					saveM[nf] = code
 				}
 			}
 		} else if f == "agency_id" {
 			if b := util.ObjToString(tmp["agency"]); b != "" {
-				saveM["agency"] = b
-				//if code := redis.GetStr("qyxy_id", b); code != "" {
 				if code := getNameId(b); code != "" {
-					saveM[f] = code
-				}
-			} else {
-				if tmp[f] != nil {
-					saveM[f] = tmp[f]
+					saveM[nf] = code
 				}
 			}
 		} else {
 			if tmp[f] != nil {
 				if BaseVMap[f] != nil {
 					var b bool
-					saveM[f], b = verifyF(f, tmp[f], BaseVMap[f])
-					// 保存异常字段数据
-					if b {
+					saveM[nf], b = verifyF(f, tmp[f], BaseVMap[f])
+					if b { // 保存异常字段数据
 						errf = append(errf, f)
 					}
 				} else {
-					saveM[f] = tmp[f]
+					saveM[nf] = tmp[f]
 				}
 			}
 		}
@@ -198,8 +186,8 @@ func taskBase(tmp map[string]interface{}) {
 
 func getNameId(name string) string {
 	info := MysqlTool.FindOne("dws_f_ent_baseinfo", map[string]interface{}{"name": name}, "name_id", "")
-	if info != nil && (*info)["name_Id"] != nil {
-		return util.ObjToString((*info)["name_Id"])
+	if info != nil && (*info)["name_id"] != nil {
+		return util.ObjToString((*info)["name_id"])
 	} else {
 		return ""
 	}
@@ -210,68 +198,68 @@ func getNameId(name string) string {
 func taskExpand(tmp map[string]interface{}) {
 	saveM := make(map[string]interface{})
 	var errf []string // 异常字段
-	for _, f := range ExpandField {
-		if f == "infoid" {
-			saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
+	for _, nf := range ExpandField {
+		f := nf[2:]
+		if f == "info_id" {
+			saveM[nf] = mongodb.BsonIdToSId(tmp["_id"])
 		} else if f == "project_startdate" || f == "project_completedate" || f == "signstarttime" || f == "bidendtime" || f == "bidstarttime" || f == "docstarttime" ||
 			f == "docendtime" || f == "signaturedate" || f == "signendtime" {
 			if tmp[f] != nil && util.IntAll(tmp[f]) > 0 {
 				t := util.Int64All(tmp[f])
-				saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
+				saveM[nf] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
 			}
 		} else if f == "createtime" || f == "updatetime" {
-			saveM[f] = time.Now().Format(util.Date_Full_Layout)
+			saveM[nf] = time.Now().Format(util.Date_Full_Layout)
 		} else if f == "bidway" {
 			if util.ObjToString(tmp[f]) == "电子投标" {
-				saveM[f] = 1
+				saveM[nf] = 1
 			} else if util.ObjToString(tmp[f]) == "纸质投标" {
-				saveM[f] = 0
+				saveM[nf] = 0
 			}
-		} else if f == "review_experts" {
+		} else if f == "review_experts" { //评标专家
 			if tmp[f] != nil {
 				if reflect.TypeOf(tmp[f]).String() == "string" {
-					saveM[f] = tmp[f]
+					saveM[nf] = tmp[f]
 				} else if reflect.TypeOf(tmp[f]).String() == "[]interface {}" {
 					if arr, ok := tmp[f].([]interface{}); ok {
-						saveM[f] = strings.Join(util.ObjArrToStringArr(arr), ",")
+						saveM[nf] = strings.Join(util.ObjArrToStringArr(arr), ",")
 					}
 				}
 			}
 		} else if f == "bid_guarantee" || f == "contract_guarantee" {
 			if tmp[f] != nil {
 				if tmp[f].(bool) {
-					saveM[f] = 1
+					saveM[nf] = 1
 				} else {
-					saveM[f] = 0
+					saveM[nf] = 0
 				}
 			}
-		} else if f == "supervisorrate" || f == "agencyfee" {
+		} else if f == "agencyfee" {
 			if tmp[f] != nil {
 				if reflect.TypeOf(tmp[f]).String() == "string" {
 					v2, err := strconv.ParseFloat(strings.ReplaceAll(util.ObjToString(tmp[f]), "%", ""), 64)
 					if err != nil {
 						v, _ := decimal.NewFromFloat(v2).Div(decimal.NewFromFloat(float64(100))).Float64()
-						saveM[f] = v
+						saveM[nf] = v
 					}
 				} else {
-					saveM[f], _ = util.FormatFloat(util.Float64All(tmp[f]), 4)
+					saveM[nf], _ = util.FormatFloat(util.Float64All(tmp[f]), 4)
 				}
 			}
 		} else if f == "project_duration" {
 			if tmp[f] != nil {
-				tmp[f] = util.IntAll(tmp[f])
+				saveM[nf] = util.IntAll(tmp[f])
 			}
 		} else {
 			if tmp[f] != nil {
 				if ExpandVMap[f] != nil {
 					var b bool
-					saveM[f], b = verifyF(f, tmp[f], ExpandVMap[f])
-					// 保存异常字段数据
-					if b {
+					saveM[nf], b = verifyF(f, tmp[f], ExpandVMap[f])
+					if b { // 保存异常字段数据
 						errf = append(errf, f)
 					}
 				} else {
-					saveM[f] = tmp[f]
+					saveM[nf] = tmp[f]
 				}
 			}
 		}
@@ -282,6 +270,10 @@ func taskExpand(tmp map[string]interface{}) {
 	}
 }
 
+func taskDetail(tmp map[string]interface{}) {
+
+}
+
 // @Description 标签记录
 // @Author J 2022/9/22 11:13
 func taskTags(tmp map[string]interface{}) {

+ 9 - 9
data_mgo_to_tidb/common.toml

@@ -4,24 +4,24 @@ locport = ":1681"
 [db]
 
 [db.mysql]
-addr = "192.168.3.14:4000"
+addr = "192.168.3.217:4000"
 dbname = "global_common_data"
 size = 5
 user = "root"
 password = "=PDT49#80Z!RVv52_z"
 
 [db.mongob]
-addr = "192.168.3.207:27001"
-dbname = "qfw_data"
+addr = "127.0.0.1:27017"
+dbname = "qfw"
 size = 5
-user = "root"
-password = "root"
+user = ""
+password = ""
 [db.mongop]
-addr = "192.168.3.207:27001"
-dbname = "qfw_data"
+addr = "127.0.0.1:27017"
+dbname = "zhengkun"
 size = 5
-user = "root"
-password = "root"
+user = ""
+password = ""
 
 [mail]
 send = false

BIN
data_mgo_to_tidb/data_tidb


+ 95 - 33
data_mgo_to_tidb/field-criteria.json

@@ -1,25 +1,50 @@
 {
   "dws_f_bid_baseinfo": {
-    "field_array": ["infoid", "area_code", "city_code", "district_code", "budget", "bidamount", "biddiscount", "title", "toptype_code",
-      "subtype_code", "projectname", "projectcode", "buyerclass_code", "publishtime", "bidopentime", "comeintime", "isValidFile", "multipackage",
-      "projectscope_id", "detail_id", "href", "purchasing", "site", "updatetime", "createtime", "buyer_id", "agency_id"],
+    "field_array": [
+      "s_info_id",
+      "s_area_code",
+      "s_city_code",
+      "s_district_code",
+      "f_budget",
+      "f_bidamount",
+      "f_biddiscount",
+      "s_title",
+      "s_toptype_code",
+      "s_subtype_code",
+      "s_projectname",
+      "s_projectcode",
+      "s_buyerclass_code",
+      "d_publishtime",
+      "d_comeintime",
+      "d_bidopentime",
+      "d_bidendtime",
+      "i_isValidFile",
+      "i_multipackage",
+      "s_purchasing",
+      "s_site",
+      "s_href",
+      "d_updatetime",
+      "d_createtime",
+      "s_buyer_id",
+      "s_agency_id"
+    ],
     "field_criteria": {
       "budget": {
         "stype": "float",
         "min": 0,
-        "max": 100000000000,
+        "max": 1000000000,
         "decimal": 4
       },
       "bidamount": {
         "stype": "float",
         "min": 0,
-        "max": 100000000000,
+        "max": 1000000000,
         "decimal": 4
       },
       "biddiscount": {
         "stype": "float",
         "min": 0,
-        "max": 1000000,
+        "max": 100,
         "decimal": 4
       },
       "title": {
@@ -37,16 +62,6 @@
         "length": 100,
         "intercept": true
       },
-      "href": {
-        "stype": "string",
-        "length": 5000,
-        "intercept": false
-      },
-      "purchasing": {
-        "stype": "string",
-        "length": 2000,
-        "intercept": true
-      },
       "site": {
         "stype": "string",
         "length": 100,
@@ -55,14 +70,52 @@
     }
   },
   "dws_f_bid_expand_baseinfo": {
-    "field_array": ["infoid", "projectperiod", "project_duration", "project_timeunit", "project_startdate", "project_completedate", "signstarttime",
-      "signendtime", "docstarttime", "docendtime", "bidendtime", "bidstarttime", "signaturedate", "bidway", "docamount", "agencyrate", "agencyfee",
-      "currency", "funds", "payway", "bidamounttype", "review_experts", "bidmethod", "bid_bond", "bid_guarantee", "contract_bond", "contract_guarantee",
-      "bidopenaddress", "contractcode", "supervisorrate", "getdocmethod", "projectaddr", "project_scale", "purchasing_tag", "updatetime", "createtime"],
+    "field_array": [
+      "s_info_id",
+      "s_projectperiod",
+      "s_project_scale",
+      "i_project_duration",
+      "s_project_timeunit",
+      "d_project_startdate",
+      "d_project_completedate",
+      "d_signstarttime",
+      "d_signendtime",
+      "d_docstarttime",
+      "d_docendtime",
+      "d_bidendtime",
+      "d_bidstarttime",
+      "d_signaturedate",
+      "s_review_experts",
+      "s_bidmethod",
+      "s_getdocmethod",
+      "i_bidway",
+      "f_docamount",
+      "f_agencyrate",
+      "f_agencyfee",
+      "s_currency",
+      "s_funds",
+      "s_payway",
+      "s_bid_bond",
+      "i_bid_guarantee",
+      "s_contract_bond",
+      "i_contract_guarantee",
+      "s_contractcode",
+      "s_buyerzipcode"   ,
+      "s_bidopenaddress",
+      "s_buyeraddr",
+      "s_agencyaddr",
+      "s_projectaddr",
+      "s_enterprise_qualification",
+      "s_personnel_qualification",
+      "s_performance_qualification",
+      "s_enterprise_credit",
+      "d_updatetime",
+      "d_createtime"
+    ],
     "field_criteria": {
       "projectperiod": {
         "stype": "string",
-        "length": 500,
+        "length": 1000,
         "intercept": true
       },
       "project_timeunit": {
@@ -72,7 +125,7 @@
       },
       "funds": {
         "stype": "string",
-        "length": 100,
+        "length": 5000,
         "intercept": true
       },
       "currency": {
@@ -82,7 +135,7 @@
       },
       "payway": {
         "stype": "string",
-        "length": 255,
+        "length": 500,
         "intercept": true
       },
       "bidmethod": {
@@ -102,7 +155,7 @@
       },
       "bidopenaddress": {
         "stype": "string",
-        "length": 500,
+        "length": 1000,
         "intercept": true
       },
       "contractcode": {
@@ -117,24 +170,33 @@
       },
       "projectaddr": {
         "stype": "string",
-        "length": 500,
+        "length": 1000,
         "intercept": true
       },
       "project_scale": {
         "stype": "string",
-        "length": 500,
+        "length": 5000,
         "intercept": true
       },
-      "purchasing_tag": {
+      "enterprise_qualification": {
         "stype": "string",
-        "length": 500,
+        "length": 20000,
         "intercept": true
       },
-      "biddiscount": {
-        "stype": "float",
-        "min": 0,
-        "max": 999999,
-        "decimal": 4
+      "personnel_qualification": {
+        "stype": "string",
+        "length": 20000,
+        "intercept": true
+      },
+      "performance_qualification": {
+        "stype": "string",
+        "length": 20000,
+        "intercept": true
+      },
+      "enterprise_credit": {
+        "stype": "string",
+        "length": 20000,
+        "intercept": true
       },
       "docamount": {
         "stype": "float",

+ 21 - 17
data_mgo_to_tidb/init.go

@@ -22,8 +22,12 @@ var (
 	saveBaseSp     = make(chan bool, 1)
 	saveExpandPool = make(chan map[string]interface{}, 5000)
 	saveExpandSp   = make(chan bool, 1)
-	saveTagPool    = make(chan map[string]interface{}, 5000)
-	saveTagSp      = make(chan bool, 1)
+	saveDetailPool = make(chan map[string]interface{}, 5000)
+	saveDetailSp   = make(chan bool, 1)
+
+	saveTagPool = make(chan map[string]interface{}, 5000)
+	saveTagSp   = make(chan bool, 1)
+
 	saveAttrPool   = make(chan map[string]interface{}, 5000)
 	saveAttrSp     = make(chan bool, 1)
 	saveIfmPool    = make(chan map[string]interface{}, 5000)
@@ -85,7 +89,6 @@ var (
 // @Author J 2022/7/26 15:30
 func InitLog() {
 	logcfg := config.Conf.Log
-
 	err := log.InitLog(
 		log.Path(logcfg.LogPath),
 		log.Level(logcfg.LogLevel),
@@ -190,20 +193,21 @@ func InitField() {
 			log.Error("InitField", zap.String("field_array", "dws_f_bid_expand_baseinfo"))
 			panic("dws_f_bid_expand_baseinfo")
 		}
-		if m, o := FCriteria["dws_f_project_baseinfo"].(map[string]interface{}); o {
-			ProField = util.ObjArrToStringArr(m["field_array"].([]interface{}))
-			ProVMap = m["field_criteria"].(map[string]interface{})
-		} else {
-			log.Error("InitField", zap.String("field_array", "dws_f_project_baseinfo"))
-			panic("dws_f_project_baseinfo")
-		}
-		if m, o := FCriteria["dws_f_project_business"].(map[string]interface{}); o {
-			ProBusField = util.ObjArrToStringArr(m["field_array"].([]interface{}))
-			ProBusVMap = m["field_criteria"].(map[string]interface{})
-		} else {
-			log.Error("InitField", zap.String("field_array", "dws_f_project_business"))
-			panic("dws_f_project_business")
-		}
+		//项目相关-字段限制暂时注释
+		//if m, o := FCriteria["dws_f_project_baseinfo"].(map[string]interface{}); o {
+		//	ProField = util.ObjArrToStringArr(m["field_array"].([]interface{}))
+		//	ProVMap = m["field_criteria"].(map[string]interface{})
+		//} else {
+		//	log.Error("InitField", zap.String("field_array", "dws_f_project_baseinfo"))
+		//	panic("dws_f_project_baseinfo")
+		//}
+		//if m, o := FCriteria["dws_f_project_business"].(map[string]interface{}); o {
+		//	ProBusField = util.ObjArrToStringArr(m["field_array"].([]interface{}))
+		//	ProBusVMap = m["field_criteria"].(map[string]interface{})
+		//} else {
+		//	log.Error("InitField", zap.String("field_array", "dws_f_project_business"))
+		//	panic("dws_f_project_business")
+		//}
 	} else {
 		log.Error("InitField, 未找到field-criteria.json文件")
 		panic("InitField, 未找到field-criteria.json文件")

+ 64 - 28
data_mgo_to_tidb/main.go

@@ -27,14 +27,17 @@ func init() {
 	InitMgo()
 	InitMysql()
 	InitField()
-
-	redis.InitRedis1("qyxy_id=127.0.0.1:8379", 1)
+	//redis.InitRedis1("qyxy_id=127.0.0.1:8379", 1)
 	//redis.InitRedis1("qyxy_id=192.168.3.166:4379", 1)
 	log.Info("init success")
+
+	//启动 nohup ./MTB bidding &
 }
 
 func main() {
-	//
+
+	taskB()
+
 	//go SaveFunc()
 	//go SaveTagFunc()
 	//go SaveExpandFunc()
@@ -46,13 +49,13 @@ func main() {
 	//go SavePurFunc()
 	//go saveErrMethod()
 
-	rootCmd := &cobra.Command{Use: "my cmd"}
-	rootCmd.AddCommand(bidding())
-	rootCmd.AddCommand(project())
-	rootCmd.AddCommand(projectAdd())
-	if err := rootCmd.Execute(); err != nil {
-		fmt.Println("rootCmd.Execute failed", err.Error())
-	}
+	//rootCmd := &cobra.Command{Use: "my cmd"}
+	//rootCmd.AddCommand(bidding())
+	//rootCmd.AddCommand(project())
+	//rootCmd.AddCommand(projectAdd())
+	//if err := rootCmd.Execute(); err != nil {
+	//	fmt.Println("rootCmd.Execute failed", err.Error())
+	//}
 
 	//go SaveRelationFunc()
 	//taskMysql()
@@ -86,7 +89,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 				UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
 			}
 			log.Info("start sync ...")
-			doBiddingTask(gtid, lteid, mapInfo)
+			//doBiddingTask(gtid, lteid, mapInfo)
 		}
 	}
 }
@@ -298,18 +301,16 @@ func bidding() *cobra.Command {
 		Use:   "bidding",
 		Short: "Start processing bidding data",
 		Run: func(cmd *cobra.Command, args []string) {
-
 			go SaveFunc()
-			go SaveTagFunc()
-			go SaveExpandFunc()
-			go SaveAttrFunc()
-			go SaveImfFunc()
-			go SaveIntentFunc()
-			go SaveWinnerFunc()
-			go SavePackageFunc()
-			go SavePurFunc()
-
-			go saveErrMethod()
+			//go SaveTagFunc()
+			//go SaveExpandFunc()
+			//go SaveAttrFunc()
+			//go SaveImfFunc()
+			//go SaveIntentFunc()
+			//go SaveWinnerFunc()
+			//go SavePackageFunc()
+			//go SavePurFunc()
+			//go saveErrMethod()
 			taskB()
 		},
 	}
@@ -368,7 +369,7 @@ func SaveFunc() {
 					defer func() {
 						<-saveBaseSp
 					}()
-					MysqlTool.InsertBulk("dws_f_bid_baseinfo", BaseField, arru...)
+					MysqlTool.InsertBulk("dwd_f_bid_baseinfo", BaseField, arru...)
 				}(arru)
 				arru = make([]map[string]interface{}, saveSize)
 				indexu = 0
@@ -380,7 +381,7 @@ func SaveFunc() {
 					defer func() {
 						<-saveBaseSp
 					}()
-					MysqlTool.InsertBulk("dws_f_bid_baseinfo", BaseField, arru...)
+					MysqlTool.InsertBulk("dwd_f_bid_baseinfo", BaseField, arru...)
 				}(arru[:indexu])
 				arru = make([]map[string]interface{}, saveSize)
 				indexu = 0
@@ -403,7 +404,42 @@ func SaveExpandFunc() {
 					defer func() {
 						<-saveExpandSp
 					}()
-					MysqlTool.InsertBulk("dws_f_bid_expand_baseinfo", ExpandField, arru...)
+					MysqlTool.InsertBulk("dwd_f_bid_expand_baseinfo", ExpandField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveExpandSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveExpandSp
+					}()
+					MysqlTool.InsertBulk("dwd_f_bid_expand_baseinfo", ExpandField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SaveDetailFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveExpandPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveExpandSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveExpandSp
+					}()
+					MysqlTool.InsertBulk("dwd_f_bid_detail", ExpandField, arru...)
 				}(arru)
 				arru = make([]map[string]interface{}, saveSize)
 				indexu = 0
@@ -415,7 +451,7 @@ func SaveExpandFunc() {
 					defer func() {
 						<-saveExpandSp
 					}()
-					MysqlTool.InsertBulk("dws_f_bid_expand_baseinfo", ExpandField, arru...)
+					MysqlTool.InsertBulk("dwd_f_bid_detail", ExpandField, arru...)
 				}(arru[:indexu])
 				arru = make([]map[string]interface{}, saveSize)
 				indexu = 0
@@ -824,7 +860,7 @@ func saveErrMethod() {
 					defer func() {
 						<-saveErrSp
 					}()
-					MongoB.SaveBulk("bidding_tidb_f_err", arru...)
+					MongoB.SaveBulk("bidding_mgo_to_tidb_f_err", arru...)
 				}(arru)
 				arru = make([]map[string]interface{}, saveSize)
 				indexu = 0
@@ -836,7 +872,7 @@ func saveErrMethod() {
 					defer func() {
 						<-saveErrSp
 					}()
-					MongoB.SaveBulk("bidding_tidb_f_err", arru...)
+					MongoB.SaveBulk("bidding_mgo_to_tidb_f_err", arru...)
 				}(arru[:indexu])
 				arru = make([]map[string]interface{}, saveSize)
 				indexu = 0