Selaa lähdekoodia

标讯相关数仓数据灌入,测试版...备份

zhengkun 1 vuosi sitten
vanhempi
commit
1a7c48d019

+ 61 - 48
data_mgo_to_tidb/bidding.go

@@ -40,14 +40,12 @@ func taskB() {
 				wg.Done()
 			}()
 			if util.IntAll(tmp["extracttype"]) != -1 {
-				//taskBase(tmp) 			//基础标讯数据
-				//taskExpand(tmp) 		//扩展数据
-				//taskDetail(tmp) //正文信息
-				//taskAtts(tmp) //附件信息
+				//taskBase(tmp)    //基础标讯数据
+				//taskExpand(tmp)  //扩展数据
+				//taskDetail(tmp)  //正文信息
+				//taskAtts(tmp)    //附件信息
 				//taskIntent(tmp) //采购意向
-				taskPackage(tmp) //分包
-
-				//taskWinner(tmp)
+				//taskPackage(tmp) //分包
 			}
 		}(tmp)
 		tmp = make(map[string]interface{})
@@ -280,11 +278,13 @@ func taskAttsAttach(att_info map[string]interface{}, tmpid string, f_id int64) {
 			info["s_file_id"] = f_id
 			info["d_createtime"] = time.Now().Format(util.Date_Full_Layout)
 			attach_url := util.ObjToString(att["attach_url"])
-			bs := OssGetObject(attach_url)
-			//if utf8.RuneCountInString(bs) > 100000 {
-			//	bs = string(([]rune(bs))[:100000])
-			//}
-			info["s_file_text"] = bs
+			if attach_url != "" {
+				//bs := OssGetObject(attach_url)
+				//if utf8.RuneCountInString(bs) > 100000 {
+				//	bs = string(([]rune(bs))[:100000])
+				//}
+				//info["s_file_text"] = bs
+			}
 			saveAttrPool <- info
 		}
 	}
@@ -292,43 +292,44 @@ func taskAttsAttach(att_info map[string]interface{}, tmpid string, f_id int64) {
 
 // @Description 采购意向
 func taskIntent(tmp map[string]interface{}) {
-	if arr, ok := tmp["procurementlist"].([]interface{}); ok {
-		for _, p := range arr {
-			p1 := p.(map[string]interface{})
-			saveM := map[string]interface{}{}
-			saveM["s_info_id"] = mongodb.BsonIdToSId(tmp["_id"])
-			if p1["itemname"] != nil {
-				saveM["s_intention_name"] = p1["itemname"]
-			}
-			if p1["projectscope"] != nil {
-				saveM["s_intention_demand"] = p1["projectscope"]
-			}
-			if p1["item"] != nil {
-				saveM["s_item"] = p1["item"]
-			}
-			if p1["totalprice"] != nil {
-				saveM["f_totalprice"] = p1["totalprice"]
-			}
-			if p1["expurasingtime"] != nil {
-				saveM["s_expurasingtime"] = p1["expurasingtime"]
-			}
-			if p1["reserved_amount"] != nil {
-				saveM["s_reserved_amount"] = p1["reserved_amount"]
-			}
-			if b := util.ObjToString(tmp["buyer"]); b != "" {
-				if code := getNameId(b); code != "" {
-					saveM["s_buyer_id"] = code
-				}
+	procurementlist := IsMarkInterfaceMap(tmp["procurementlist"])
+	tmpid := mongodb.BsonIdToSId(tmp["_id"])
+	for _, p1 := range procurementlist {
+		info := map[string]interface{}{}
+		info["s_info_id"] = tmpid
+		if p1["itemname"] != nil {
+			info["s_intention_name"] = p1["itemname"]
+		}
+		if p1["projectscope"] != nil {
+			info["s_intention_demand"] = p1["projectscope"]
+		}
+		if p1["item"] != nil {
+			info["s_item"] = p1["item"]
+		}
+		if p1["totalprice"] != nil {
+			info["f_totalprice"] = p1["totalprice"]
+		}
+		if p1["expurasingtime"] != nil {
+			info["s_expurasingtime"] = p1["expurasingtime"]
+		}
+		if p1["reserved_amount"] != nil {
+			info["s_reserved_amount"] = p1["reserved_amount"]
+		}
+		if b := util.ObjToString(tmp["buyer"]); b != "" {
+			if code := getNameId(b); code != "" {
+				info["s_buyer_id"] = code
 			}
-			saveM["d_createtime"] = time.Now().Format(util.Date_Full_Layout)
-			saveIntentPool <- saveM
 		}
+		info["d_createtime"] = time.Now().Format(util.Date_Full_Layout)
+		//InsertGlobalMysqlData("dwd_f_bid_intention_baseinfo", info, tmpid)
+		saveIntentPool <- info
 	}
 }
 
 // @Description 分包基本信息
 func taskPackage(tmp map[string]interface{}) {
 	tmpid := mongodb.BsonIdToSId(tmp["_id"])
+	//筛选分包
 	packages := filterPackageInfos(tmp)
 	if len(packages) <= 1 { //单包···标讯本身
 		baseInfo := CPBaseInfoFromBidding(tmp, tmpid)
@@ -337,15 +338,27 @@ func taskPackage(tmp map[string]interface{}) {
 			//投标人信息
 			CPBidderBiddingBaseInfo(tmp, tmpid, pid)
 			//标的物信息
-			//new_purlist := CPBiddingPackageGoodsBaseInfo(tmp, tmpid, pid)
-			//for _,v := range new_purlist {
-			//
-			//}
+			new_purlist := CPBiddingPackageGoodsBaseInfo(tmp, tmpid, pid)
+			for _, v := range new_purlist {
+				saveGoodsPool <- v
+			}
 		}
 	} else { //多包...具体源信息
-		//for k, v := range packages {
-		//	baseInfo := CPBaseInfoFromPackage(v, tmpid)
-		//}
+		for k, v := range packages {
+			baseInfo := CPBaseInfoFromPackage(v, tmpid)
+			pid := InsertGlobalMysqlData("dwd_f_bid_package_baseinfo", baseInfo, tmpid)
+			if pid > 0 { //投标人信息
+				if k == 0 { //标的物信息
+					CPBidderPackageBaseInfo(v, tmp, tmpid, pid, true)
+					new_purlist := CPBiddingPackageGoodsBaseInfo(tmp, tmpid, pid)
+					for _, v1 := range new_purlist {
+						saveGoodsPool <- v1
+					}
+				} else {
+					CPBidderPackageBaseInfo(v, tmp, tmpid, pid, false)
+				}
+			}
+		}
 	}
 }
 

+ 4 - 5
data_mgo_to_tidb/common.toml

@@ -1,6 +1,5 @@
-
 [udp]
-locport = ":1681"
+locport = ":1166"
 [db]
 
 [db.mysql]
@@ -11,14 +10,14 @@ user = "root"
 password = "=PDT49#80Z!RVv52_z"
 
 [db.mongob]
-addr = "127.0.0.1:27017"
+addr = "192.168.3.166:27082"
 dbname = "qfw"
 size = 5
 user = ""
 password = ""
 [db.mongop]
-addr = "127.0.0.1:27017"
-dbname = "zhengkun"
+addr = "192.168.3.166:27082"
+dbname = "qfw"
 size = 5
 user = ""
 password = ""

+ 3 - 5
data_mgo_to_tidb/main.go

@@ -17,7 +17,7 @@ func init() {
 	InitMgo()
 	InitMysql()
 	InitField()
-	InitOss(true) //线上部署记得关闭
+	//InitOss(false)
 	//redis.InitRedis1("qyxy_id=127.0.0.1:8379", 1)
 	//redis.InitRedis1("qyxy_id=192.168.3.166:4379", 1)
 	log.Info("init success")
@@ -30,11 +30,9 @@ func main() {
 	//go SaveDetailFunc()
 	//go SaveAttrFunc()
 	//go SaveIntentFunc()
-
-	//go SaveWinnerFunc()
-	//go SavePurFunc()
+	//go SaveBidderFunc()
+	//go SaveGoodsFunc()
 	//go saveErrMethod()
-
 	taskB()
 
 	//rootCmd := &cobra.Command{Use: "my cmd"}

+ 39 - 7
data_mgo_to_tidb/pkg.go

@@ -44,8 +44,7 @@ func CPBidderBiddingBaseInfo(tmp map[string]interface{}, tmpid string, pid int64
 		info["i_is_bidders"] = 1
 	}
 	//保存服务...
-	InsertGlobalMysqlData("dwd_f_bid_package_bidder_baseinfo", info, tmpid)
-
+	saveBidderPool <- info
 	//候选人相关情况
 	o_win_ids, o_win_names := getWinerorder(tmp, wins[0])
 	for k, v := range o_win_ids {
@@ -57,11 +56,11 @@ func CPBidderBiddingBaseInfo(tmp map[string]interface{}, tmpid string, pid int64
 		o_info["i_is_winner"] = 0
 		o_info["d_updatetime"] = time.Now().Format(util.Date_Full_Layout)
 		//保存服务...
-		InsertGlobalMysqlData("dwd_f_bid_package_bidder_baseinfo", info, tmpid)
+		saveBidderPool <- info
 	}
 }
 
-// 标讯分包标的物信息-记录唯一包
+// 标讯分包标的物信息
 func CPBiddingPackageGoodsBaseInfo(tmp map[string]interface{}, tmpid string, pid int64) []map[string]interface{} {
 	purchasinglist := IsMarkInterfaceMap(tmp["purchasinglist"])
 	new_purlist := []map[string]interface{}{}
@@ -91,7 +90,6 @@ func CPBiddingPackageGoodsBaseInfo(tmp map[string]interface{}, tmpid string, pid
 func CPBaseInfoFromPackage(pinfos map[string]interface{}, tmpid string) map[string]interface{} {
 	info := map[string]interface{}{}
 	info["s_info_id"] = tmpid
-	info["d_createtime"] = time.Now().Format(util.Date_Full_Layout)
 	for k, v := range pinfos {
 		info["s_packagecode"] = k
 		if pkg := util.ObjToMap(v); pkg != nil {
@@ -105,12 +103,46 @@ func CPBaseInfoFromPackage(pinfos map[string]interface{}, tmpid string) map[stri
 			return info
 		}
 	}
+	info["d_createtime"] = time.Now().Format(util.Date_Full_Layout)
 	return info
 }
 
 // 投标人相关信息-分包版
-func CPBidderPackageBaseInfo(pinfos map[string]interface{}, tmpid string) {
-
+func CPBidderPackageBaseInfo(pinfos map[string]interface{}, tmp map[string]interface{}, tmpid string, pid int64, isF bool) {
+	for _, v := range pinfos {
+		pinfo := *util.ObjToMap(v)
+		winner := util.ObjToString(pinfo["winner"])
+		info := map[string]interface{}{}
+		info["s_info_id"] = tmpid
+		info["i_package_id"] = pid
+		info["d_createtime"] = time.Now().Format(util.Date_Full_Layout)
+		wins := strings.Split(winner, ",")
+		bidder_id, bidder_name, other_bidder_id := getWinsNameId(wins)
+		info["s_bidder_id"] = bidder_id
+		info["i_is_winner"] = 1
+		if len(wins) > 1 {
+			info["s_bidder_name"] = bidder_name
+			info["s_other_bidder_name"] = winner
+			info["s_other_bidder_id"] = other_bidder_id
+			info["i_is_bidders"] = 1
+		}
+		//保存服务...
+		saveBidderPool <- info
+		//候选人相关情况
+		if isF {
+			o_win_ids, o_win_names := getWinerorder(tmp, wins[0])
+			for k, v := range o_win_ids {
+				o_info := map[string]interface{}{}
+				o_info["s_info_id"] = tmpid
+				o_info["s_bidder_id"] = v
+				o_info["s_bidder_name"] = o_win_names[k]
+				o_info["i_is_winner"] = 0
+				o_info["d_updatetime"] = time.Now().Format(util.Date_Full_Layout)
+				//保存服务...
+				saveBidderPool <- info
+			}
+		}
+	}
 }
 
 // 获取所有投标人的相关标识

+ 69 - 0
data_mgo_to_tidb/save.go

@@ -176,6 +176,75 @@ func SaveIntentFunc() {
 		}
 	}
 }
+func SaveBidderFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveBidderPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveBidderSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveBidderSp
+					}()
+					MysqlTool.InsertBulk("dwd_f_bid_package_bidder_baseinfo", BidderField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveBidderSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveBidderSp
+					}()
+					MysqlTool.InsertBulk("dwd_f_bid_package_bidder_baseinfo", BidderField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SaveGoodsFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveGoodsPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveGoodsSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveGoodsSp
+					}()
+					MysqlTool.InsertBulk("dwd_f_bid_package_goods_baseinfo", GoodsField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveGoodsSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveGoodsSp
+					}()
+					MysqlTool.InsertBulk("dwd_f_bid_package_goods_baseinfo", GoodsField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
 
 func SaveProFunc() {
 	arru := make([]map[string]interface{}, saveSize)