فهرست منبع

增存量连接池修改

apple 5 سال پیش
والد
کامیت
bb74e34aa3
2فایلهای تغییر یافته به همراه35 افزوده شده و 40 حذف شده
  1. 18 19
      udp_winner/timedTaskAgency.go
  2. 17 21
      udp_winner/timedTaskBuyer.go

+ 18 - 19
udp_winner/timedTaskAgency.go

@@ -37,8 +37,7 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 	// topscopeclass项目类型-industry行业类型&&topscopeclass联系人项目类型
 	// (area地区-province省份 city城市-city城市 district区县-district区县)
 	// agencyaddr-company_address企业地址
-	SourceClientcc := SourceClient.GetMgoConn()
-	defer SourceClient.DestoryMongoConn(SourceClientcc)
+	SourceClientcc := SourceClient.GetMgoConn(86400)
 	cursor := SourceClientcc.DB(Config["mgodb_bidding"]).C(Config["mgodb_mgoinit_c"]).Find(bson.M{
 		"_id": bson.M{
 			"$gte": GId,
@@ -47,6 +46,7 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 	}).Select(bson.M{"agency": 1, "agencytel": 1, "agencyperson": 1, "topscopeclass": 1,
 		"agencyaddr": 1}).Iter()
 	if cursor.Err() != nil {
+		SourceClientcc.Close()
 		log.Println(cursor.Err())
 		return
 	}
@@ -86,6 +86,9 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 				log.Println(err)
 			}
 		}
+
+		SourceClientcc.Close()
+
 		//遍历redis
 		if scan := conn.Scan(0, "", 100); scan.Err() != nil {
 			log.Println(scan.Err())
@@ -102,15 +105,11 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 				rdb.Do("SELECT", Config["redis_agency_db"])
 				if reply, err := redis.String(rdb.Do("GET", redisCName)); err != nil {
 					//redis不存在,存到临时表,定时任务处理
-					//FClient.DbName = Config["mgodb_extract_kf"]
-					//if tmpid := FClient.Save("agency_new", tmps); tmpid == nil {
-					//	log.Println("存量 FClient.Save err", tmpid)
-					//}
-					fsavec := FClient.GetMgoConn().DB(Config["mgodb_extract_kf"]).C("agency_new")
+					FClient.DbName = Config["mgodb_extract_kf"]
 					for _, vmap := range rValuesMaps {
 						vmap["_id"] = bson.ObjectIdHex(vmap["_id"].(string))
-						if err = fsavec.Insert(vmap); err != nil{
-							log.Println("存量 FClient.Save err", err)
+						if err = FClient.SaveForOld("agency_new", vmap); err != nil {
+							log.Println("存量 FClient.Save err", err,vmap)
 						}
 					}
 					//log.Println("get redis id err:定时任务处理", err, tmp)
@@ -222,8 +221,8 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 			if reply, err := redis.String(rdb.Do("GET", tmp["agency"])); err != nil {
 				//redis不存在存到临时表,定时任务处理
 				FClient.DbName = Config["mgodb_extract_kf"]
-				if tmpid := FClient.Save("agency_new", tmp); tmpid == "" {
-					log.Println("FClient.Save err", tmpid)
+				if err := FClient.SaveForOld("agency_new", tmp); err!=nil {
+					log.Println("FClient.Save err", err,tmp)
 				}
 				//log.Println("get redis id err:定时任务处理", err, tmp)
 				if err := rdb.Close(); err != nil {
@@ -332,8 +331,8 @@ func TimedTaskAgency() {
 	//time.Sleep(time.Hour*70)
 	t2 := time.NewTimer(time.Second * 5)
 	for range t2.C {
-		Fcconn := FClient.GetMgoConn()
-		defer FClient.DestoryMongoConn(Fcconn)
+		Fcconn := FClient.GetMgoConn(86400)
+		defer Fcconn.Close()
 		tmpLast := map[string]interface{}{}
 		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C("agency_new").Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
 			if !iter.Next(&tmpLast) {
@@ -343,8 +342,7 @@ func TimedTaskAgency() {
 				continue
 			} else {
 				log.Println("临时表有数据:", tmpLast)
-				fconn := FClient.GetMgoConn()
-				defer FClient.DestoryMongoConn(fconn)
+				fconn := FClient.GetMgoConn(86400)
 				cursor := fconn.DB(Config["mgodb_extract_kf"]).C("agency_new").Find(bson.M{
 					"_id": bson.M{
 						"$lte": tmpLast["_id"],
@@ -398,11 +396,11 @@ func TimedTaskAgency() {
 					if !b || (*resulttmp)["_id"] == nil {
 						//log.Println(r)
 						//匹配不到原始库,存入异常表删除临时表
-						fdmongo := FClient.GetMgoConn().DB(Config["mgodb_extract_kf"])
-						if err := fdmongo.C("agency_err").Insert( tmp); err != nil {
-							log.Println("存入异常表错误", err,tmp)
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if err := FClient.SaveForOld("agency_err", tmp); err != nil {
+							log.Println("存入异常表错误", err, tmp)
 						}
-						if deleteNum := fdmongo.C("agency_new").RemoveId( bson.ObjectIdHex(tmpId)); !b {
+						if deleteNum := FClient.Del("agency_new", bson.M{"_id": bson.ObjectIdHex(tmpId)}); !b {
 							log.Println("删除临时表错误", deleteNum)
 						}
 						continue
@@ -596,6 +594,7 @@ func TimedTaskAgency() {
 						}
 					}
 				}
+				fconn.Close()
 			}
 		}
 		t2.Reset(time.Minute)

+ 17 - 21
udp_winner/timedTaskBuyer.go

@@ -9,7 +9,6 @@ import (
 	mu "mfw/util"
 	"net"
 	"qfw/util"
-	"reflect"
 	"sort"
 	"strconv"
 	"strings"
@@ -38,8 +37,7 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 	// topscopeclass项目类型-industry行业类型&&topscopeclass联系人项目类型
 	// (area地区-province省份 city城市-city城市 district区县-district区县)
 	// buyeraddr-company_address企业地址
-	SourceClientcc := SourceClient.GetMgoConn()
-	defer SourceClient.DestoryMongoConn(SourceClientcc)
+	SourceClientcc := SourceClient.GetMgoConn(86400)
 	cursor := SourceClientcc.DB(Config["mgodb_bidding"]).C(Config["mgodb_mgoinit_c"]).Find(bson.M{
 		"_id": bson.M{
 			"$gte": GId,
@@ -48,6 +46,7 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 	}).Select(bson.M{"buyer": 1, "buyertel": 1, "buyerperson": 1, "topscopeclass": 1,
 		"buyeraddr": 1,"buyerclass":1}).Iter()
 	if cursor.Err() != nil {
+		SourceClientcc.Close()
 		log.Println(cursor.Err())
 		return
 	}
@@ -87,6 +86,9 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 				log.Println(err)
 			}
 		}
+
+		SourceClientcc.Close()
+
 		//遍历redis
 		if scan := conn.Scan(0, "", 100); scan.Err() != nil {
 			log.Println(scan.Err())
@@ -103,15 +105,11 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 				rdb.Do("SELECT", Config["redis_buyer_db"])
 				if reply, err := redis.String(rdb.Do("GET", redisCName)); err != nil {
 					//redis不存在,存到临时表,定时任务处理
-					//FClient.DbName = Config["mgodb_extract_kf"]
-					//if tmpid := FClient.Save("buyer_new", tmps); tmpid == nil {
-					//	log.Println("存量 FClient.Save err", tmpid)
-					//}
-					fsavec := FClient.GetMgoConn().DB(Config["mgodb_extract_kf"]).C("buyer_new")
+					FClient.DbName = Config["mgodb_extract_kf"]
 					for _, vmap := range rValuesMaps {
 						vmap["_id"] = bson.ObjectIdHex(vmap["_id"].(string))
-						if err = fsavec.Insert(vmap); err != nil{
-							log.Println("存量 FClient.Save err", err)
+						if err = FClient.SaveForOld("buyer_new", vmap); err != nil {
+							log.Println("存量 FClient.Save err", err,vmap)
 						}
 					}
 					//log.Println("get redis id err:定时任务处理", err, tmp)
@@ -242,8 +240,8 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 			if reply, err := redis.String(rdb.Do("GET", tmp["buyer"])); err != nil {
 				//redis不存在存到临时表,定时任务处理
 				FClient.DbName = Config["mgodb_extract_kf"]
-				if tmpid := FClient.Save("buyer_new", tmp); tmpid == "" {
-					log.Println("FClient.Save err", tmpid)
+				if err := FClient.SaveForOld("buyer_new", tmp); err!=nil {
+					log.Println("FClient.Save err", err,tmp)
 				}
 				//log.Println("get redis id err:定时任务处理", err, tmp)
 				if err := rdb.Close(); err != nil {
@@ -371,8 +369,8 @@ func TimedTaskBuyer() {
 	//time.Sleep(time.Hour*70)
 	t2 := time.NewTimer(time.Second * 5)
 	for range t2.C {
-		Fcconn := FClient.GetMgoConn()
-		defer FClient.DestoryMongoConn(Fcconn)
+		Fcconn := FClient.GetMgoConn(86400)
+		defer Fcconn.Close()
 		tmpLast := map[string]interface{}{}
 		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C("buyer_new").Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
 			if !iter.Next(&tmpLast) {
@@ -382,8 +380,7 @@ func TimedTaskBuyer() {
 				continue
 			} else {
 				log.Println("临时表有数据:", tmpLast)
-				fconn := FClient.GetMgoConn()
-				defer FClient.DestoryMongoConn(fconn)
+				fconn := FClient.GetMgoConn(86400)
 				cursor := fconn.DB(Config["mgodb_extract_kf"]).C("buyer_new").Find(bson.M{
 					"_id": bson.M{
 						"$lte": tmpLast["_id"],
@@ -437,12 +434,11 @@ func TimedTaskBuyer() {
 					if !b || (*resulttmp)["_id"] == nil {
 						//log.Println(r)
 						//匹配不到原始库,存入异常表删除临时表
-						log.Println(len(tmp),reflect.TypeOf(tmp))
-						fdmongo := FClient.GetMgoConn().DB(Config["mgodb_extract_kf"])
-						if err := fdmongo.C("buyer_err").Insert(tmp); err != nil {
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if err := FClient.SaveForOld("buyer_err", tmp); err != nil {
 							log.Println("存入异常表错误", err, tmp)
 						}
-						if deleteNum := fdmongo.C("buyer_new").RemoveId(bson.ObjectIdHex(tmpId)); !b {
+						if deleteNum := FClient.Del("buyer_new", bson.M{"_id": bson.ObjectIdHex(tmpId)}); !b {
 							log.Println("删除临时表错误", deleteNum)
 						}
 						continue
@@ -634,7 +630,7 @@ func TimedTaskBuyer() {
 					}
 
 				}
-
+				fconn.Close()
 			}
 		}
 		t2.Reset(time.Minute)