Parcourir la source

Merge branch 'develop' of 192.168.3.17:zhanghongbo/qfw into develop

李广朋 il y a 9 ans
Parent
commit
1943f14c1e

+ 14 - 0
common/src/qfw/util/common.go

@@ -317,6 +317,13 @@ func SubString(str string, begin, length int) (substr string) {
 func Try(fun func(), handler func(interface{})) {
 	defer func() {
 		if err := recover(); err != nil {
+			for skip := 1; ; skip++ {
+				_, file, line, ok := runtime.Caller(skip)
+				if !ok {
+					break
+				}
+				go log.Printf("%v,%v\n", file, line)
+			}
 			handler(err)
 		}
 	}()
@@ -356,3 +363,10 @@ func InterfaceArrTointArr(arr []interface{}) []int {
 	}
 	return tmp
 }
+func InterfaceArrToint64Arr(arr []interface{}) []int64 {
+	tmp := make([]int64, 0)
+	for _, v := range arr {
+		tmp = append(tmp, int64(v.(float64)))
+	}
+	return tmp
+}

+ 2 - 1
common/src/qfw/util/credit/credit.go

@@ -107,7 +107,8 @@ func InCreditB(userId, code string, param map[string]interface{}) (bool, int, er
 	err := Rc.InCreadit(&rpc.CreditData{Code: code, Uid: userId, Num: 0, OtherParam: param}, &Replay)
 	if err == nil && Replay != 0 {
 		b = true
-	} else {
+	}
+	if err != nil {
 		log.Println("调用rpc出错", err)
 	}
 	return b, Replay, err

+ 1 - 1
core/src/config.json

@@ -1,6 +1,6 @@
 {
     "webServerPort": "80",
-    "redisServers": "enterprise=192.168.3.14:1379,service=192.168.3.14:2379,other=192.168.3.14:3379,sso=192.168.3.14:1379",
+    "redisServers": "enterprise=192.168.3.14:1379,service=192.168.3.14:2379,other=192.168.3.14:3379,sso=192.168.3.14:1379,credit=192.168.3.14:3379",
     "useRedis": false,
     "mongodbServers": "192.168.3.18:27080",
     "elasticsearch": "http://192.168.3.18:9800",

+ 1 - 1
core/src/qfw/member/credit/credit.go

@@ -11,6 +11,6 @@ import (
 var RedisDB string
 
 func init() {
-	RedisDB = "other"
+	RedisDB = "credit"
 	xweb.AddAction(&credit{})
 }

+ 1 - 1
core/src/qfw/member/credit/creditdetail.go

@@ -111,7 +111,7 @@ func (c *credit) InCreditAjx() error {
 			}
 		}
 		if param == "qd" {
-			b := cd.UpuserCreditSession(userId, cd.B_QD, "A", nil, c.Action)
+			b := cd.UpuserCreditSession(userId, cd.B_QD, "B", nil, c.Action)
 			if b {
 				c.Session().UpdateByCustomField("id", userId, "credit_qd", "y")
 				result["result"] = "y"

+ 1 - 1
core/src/timetask.json

@@ -1 +1 @@
-{"comment":{"c_rate":720,"commentrate":900},"market":{"demand":{"attr":["i_hits","i_bids","i_status"],"timepoint":"2016-01-20 08:18:24"},"service":{"attr":["i_hits","i_sales","i_comments","i_score","i_appcounts"],"timepoint":"2016-01-20 08:18:24"}},"marketisstart":true,"marketrate":300}
+{"comment":{"c_rate":720,"commentrate":900},"market":{"demand":{"attr":["i_hits","i_bids","i_status"],"timepoint":"2016-01-20 09:32:14"},"service":{"attr":["i_hits","i_sales","i_comments","i_score","i_appcounts"],"timepoint":"2016-01-20 09:32:14"}},"marketisstart":true,"marketrate":300}

+ 1 - 1
credit/src/qfw/creditrpc/creditrpc.go

@@ -104,7 +104,7 @@ func (c *CreditRpc) InCreadit(param *qrpc.CreditData, replay *int) error {
 					obj := redis.Get(consts.RedisDB, key)
 					bcon := true
 					if obj != nil {
-						newobj := obj.([]int64)
+						newobj := util.InterfaceArrToint64Arr(obj.([]interface{}))
 						//比较日期是不是连续签到,并且在有效次数内
 						if newobj[1] < newobj[2] && time.Unix(newobj[0], 0).AddDate(0, 0, 1).Day() == now.Day() {
 							newobj[0] = now.Unix()

BIN
etl/client/src/trsclient/trsclient


+ 61 - 68
etl/client/src/trsclient/trsclient.go

@@ -40,20 +40,16 @@ func main() {
 
 //定时上传文件
 func transferTime() {
-	if bcon {
-		bcon = false
-		hour := time.Now().Hour()
-		if hour > startHour && hour < endHour {
-			_, err := rpc.DialHTTP("tcp", rpcaddr)
-			if err != nil {
-				log.Println(err.Error())
-			} else {
+	util.Try(func() {
+		if bcon {
+			bcon = false
+			hour := time.Now().Hour()
+			if hour > startHour && hour < endHour {
 				FileTransfer()
 			}
+			bcon = true
 		}
-		bcon = true
-	}
-
+	}, func(e interface{}) {})
 	time.AfterFunc(uploadduration*time.Minute, transferTime)
 }
 
@@ -64,77 +60,74 @@ type TrsDataParam struct {
 }
 
 func FileTransfer() {
-	fi, err := os.Stat(jsonpath)
-	if err != nil {
-		log.Println(err.Error())
-		return
-	}
-	if !fi.IsDir() {
-		log.Println(jsonpath, "不是一个目录")
-		return
-	}
-	filepath.Walk(jsonpath, func(path string, info os.FileInfo, err error) error {
-		if info.IsDir() || strings.LastIndex(info.Name(), "sync") > 0 {
-			return nil
+	util.Try(func() {
+		fi, err := os.Stat(jsonpath)
+		if err != nil {
+			log.Println(err.Error())
+			return
 		}
-		filename := info.Name()
-		fi, _ := os.Open(path)
-		bs, _ := ioutil.ReadAll(fi)
-		fi.Close()
-		go transfer(path, filename, bs)
-		//TODO 删除本地文件
-		return nil
-	})
+		if !fi.IsDir() {
+			log.Println(jsonpath, "不是一个目录")
+			return
+		}
+		filepath.Walk(jsonpath, func(path string, info os.FileInfo, err error) error {
+			if info.IsDir() || strings.LastIndex(info.Name(), "sync") > 0 {
+				return nil
+			}
+			filename := info.Name()
+			fi, _ := os.Open(path)
+			bs, _ := ioutil.ReadAll(fi)
+			fi.Close()
+			go transfer(path, filename, bs)
+			//TODO 删除本地文件
+			return nil
+		})
+	}, func(e interface{}) {})
 }
 
 //发文件,失败后并尝试重试发送
 func transfer(path string, filename string, data []byte) {
-	pool <- true
-	defer func() {
-		<-pool
-	}()
-	i := 0
-	for i < 2 {
-		i++
+	util.Try(func() {
+		pool <- true
+		defer func() {
+			<-pool
+		}()
 		client, err := rpc.DialHTTP("tcp", rpcaddr)
 		defer client.Close()
-		if err != nil {
-			log.Println(err.Error())
-			time.Sleep(2 * time.Minute)
-			continue
-		}
-		param := &TrsDataParam{FileName: filename, FileData: data}
-		reply := 0
-		err = client.Call("TrsDataTransfer.TransferFile", param, &reply)
-		if err != nil {
-			log.Println(err.Error())
-			time.Sleep(2 * time.Minute)
-			continue
-		} else {
-			if reply == 1 {
-				move(path, jsonpath+"_back/"+filename)
+		if err == nil {
+			param := &TrsDataParam{FileName: filename, FileData: data}
+			reply := 0
+			err = client.Call("TrsDataTransfer.TransferFile", param, &reply)
+			if err != nil {
+				log.Println(err.Error())
+			} else {
+				if reply == 1 {
+					move(path, jsonpath+"_back/"+filename)
+				}
 			}
 		}
-		break
-	}
+	}, func(e interface{}) {})
 }
 
 func delFile() {
-	client, err := rpc.DialHTTP("tcp", rpcaddr)
-	defer client.Close()
-	if err == nil {
-		param := &TrsDataParam{}
-		reply := make([]string, 10)
-		err := client.Call("TrsDataTransfer.DelFileList", param, &reply)
-		if err != nil {
-			log.Println(err.Error())
-			time.Sleep(1 * time.Minute)
-		} else {
-			for _, v := range reply {
-				del(jsonpath + "_back/" + v)
+	util.Try(func() {
+		client, err := rpc.DialHTTP("tcp", rpcaddr)
+		defer client.Close()
+		if err == nil {
+			param := &TrsDataParam{}
+			reply := make([]string, 10)
+			err := client.Call("TrsDataTransfer.DelFileList", param, &reply)
+			if err != nil {
+				log.Println(err.Error())
+				time.Sleep(1 * time.Minute)
+			} else {
+				for _, v := range reply {
+					del(jsonpath + "_back/" + v)
+				}
 			}
 		}
-	}
+	}, func(e interface{}) {})
+
 	time.AfterFunc(delduration*time.Minute, delFile)
 }
 

BIN
etl/server/src/trsserver/trsserver


+ 5 - 7
etl/server/src/trsserver/trsserver.go

@@ -31,8 +31,7 @@ func (i *TrsDataTransfer) TransferFile(param *TrsDataParam, reply *int) error {
 	log.Println(param.FileName)
 	tmp := []interface{}{}
 	err := json.Unmarshal([]byte(param.FileData), &tmp)
-	if err == nil {
-		c <- true
+	if err == nil && len(tmp) > 0 {
 		go TrsZtbSave(param.FileName, tmp)
 		*reply = 1
 	} else {
@@ -54,6 +53,7 @@ func (i *TrsDataTransfer) DelFileList(param *TrsDataParam, replys *[]string) err
 
 func TrsZtbSave(filename string, tmp []interface{}) {
 	util.Try(func() {
+		c <- true
 		session := mongodb.GetMgoConn()
 		defer mongodb.DestoryMongoConn(session)
 		defer func() {
@@ -79,11 +79,9 @@ func TrsZtbSave(filename string, tmp []interface{}) {
 	}, func(e interface{}) {
 		log.Println(e)
 	})
-	go func() {
-		Lock.Lock()
-		delStrings = append(delStrings, filename)
-		Lock.Unlock()
-	}()
+	Lock.Lock()
+	delStrings = append(delStrings, filename)
+	Lock.Unlock()
 }
 
 func main() {