renzheng 9 vuotta sitten
vanhempi
commit
9ae40cfe6f

+ 7 - 0
common/src/qfw/util/common.go

@@ -317,6 +317,13 @@ func SubString(str string, begin, length int) (substr string) {
 func Try(fun func(), handler func(interface{})) {
 	defer func() {
 		if err := recover(); err != nil {
+			for skip := 1; ; skip++ {
+				_, file, line, ok := runtime.Caller(skip)
+				if !ok {
+					break
+				}
+				go log.Printf("%v,%v\n", file, line)
+			}
 			handler(err)
 		}
 	}()

+ 1 - 2
core/src/timetask.json

@@ -1,2 +1 @@
-{"comment":{"c_rate":720,"commentrate":900},"market":{"demand":{"attr":["i_hits","i_bids","i_status"],"timepoint":"2016-01-18 18:09:04"},"service":{"attr":["i_hits","i_sales","i_comments","i_score","i_appcounts"],"timepoint":"2016-01-18 18:09:04"}},"marketisstart":true,"marketrate":300}
-
+{"comment":{"c_rate":720,"commentrate":900},"market":{"demand":{"attr":["i_hits","i_bids","i_status"],"timepoint":"2016-01-19 18:18:18"},"service":{"attr":["i_hits","i_sales","i_comments","i_score","i_appcounts"],"timepoint":"2016-01-19 18:18:18"}},"marketisstart":true,"marketrate":300}

BIN
etl/client/src/trsclient/trsclient


+ 61 - 68
etl/client/src/trsclient/trsclient.go

@@ -40,20 +40,16 @@ func main() {
 
 //定时上传文件
 func transferTime() {
-	if bcon {
-		bcon = false
-		hour := time.Now().Hour()
-		if hour > startHour && hour < endHour {
-			_, err := rpc.DialHTTP("tcp", rpcaddr)
-			if err != nil {
-				log.Println(err.Error())
-			} else {
+	util.Try(func() {
+		if bcon {
+			bcon = false
+			hour := time.Now().Hour()
+			if hour > startHour && hour < endHour {
 				FileTransfer()
 			}
+			bcon = true
 		}
-		bcon = true
-	}
-
+	}, func(e interface{}) {})
 	time.AfterFunc(uploadduration*time.Minute, transferTime)
 }
 
@@ -64,77 +60,74 @@ type TrsDataParam struct {
 }
 
 func FileTransfer() {
-	fi, err := os.Stat(jsonpath)
-	if err != nil {
-		log.Println(err.Error())
-		return
-	}
-	if !fi.IsDir() {
-		log.Println(jsonpath, "不是一个目录")
-		return
-	}
-	filepath.Walk(jsonpath, func(path string, info os.FileInfo, err error) error {
-		if info.IsDir() || strings.LastIndex(info.Name(), "sync") > 0 {
-			return nil
+	util.Try(func() {
+		fi, err := os.Stat(jsonpath)
+		if err != nil {
+			log.Println(err.Error())
+			return
 		}
-		filename := info.Name()
-		fi, _ := os.Open(path)
-		bs, _ := ioutil.ReadAll(fi)
-		fi.Close()
-		go transfer(path, filename, bs)
-		//TODO 删除本地文件
-		return nil
-	})
+		if !fi.IsDir() {
+			log.Println(jsonpath, "不是一个目录")
+			return
+		}
+		filepath.Walk(jsonpath, func(path string, info os.FileInfo, err error) error {
+			if info.IsDir() || strings.LastIndex(info.Name(), "sync") > 0 {
+				return nil
+			}
+			filename := info.Name()
+			fi, _ := os.Open(path)
+			bs, _ := ioutil.ReadAll(fi)
+			fi.Close()
+			go transfer(path, filename, bs)
+			//TODO 删除本地文件
+			return nil
+		})
+	}, func(e interface{}) {})
 }
 
 //发文件,失败后并尝试重试发送
 func transfer(path string, filename string, data []byte) {
-	pool <- true
-	defer func() {
-		<-pool
-	}()
-	i := 0
-	for i < 2 {
-		i++
+	util.Try(func() {
+		pool <- true
+		defer func() {
+			<-pool
+		}()
 		client, err := rpc.DialHTTP("tcp", rpcaddr)
 		defer client.Close()
-		if err != nil {
-			log.Println(err.Error())
-			time.Sleep(2 * time.Minute)
-			continue
-		}
-		param := &TrsDataParam{FileName: filename, FileData: data}
-		reply := 0
-		err = client.Call("TrsDataTransfer.TransferFile", param, &reply)
-		if err != nil {
-			log.Println(err.Error())
-			time.Sleep(2 * time.Minute)
-			continue
-		} else {
-			if reply == 1 {
-				move(path, jsonpath+"_back/"+filename)
+		if err == nil {
+			param := &TrsDataParam{FileName: filename, FileData: data}
+			reply := 0
+			err = client.Call("TrsDataTransfer.TransferFile", param, &reply)
+			if err != nil {
+				log.Println(err.Error())
+			} else {
+				if reply == 1 {
+					move(path, jsonpath+"_back/"+filename)
+				}
 			}
 		}
-		break
-	}
+	}, func(e interface{}) {})
 }
 
 func delFile() {
-	client, err := rpc.DialHTTP("tcp", rpcaddr)
-	defer client.Close()
-	if err == nil {
-		param := &TrsDataParam{}
-		reply := make([]string, 10)
-		err := client.Call("TrsDataTransfer.DelFileList", param, &reply)
-		if err != nil {
-			log.Println(err.Error())
-			time.Sleep(1 * time.Minute)
-		} else {
-			for _, v := range reply {
-				del(jsonpath + "_back/" + v)
+	util.Try(func() {
+		client, err := rpc.DialHTTP("tcp", rpcaddr)
+		defer client.Close()
+		if err == nil {
+			param := &TrsDataParam{}
+			reply := make([]string, 10)
+			err := client.Call("TrsDataTransfer.DelFileList", param, &reply)
+			if err != nil {
+				log.Println(err.Error())
+				time.Sleep(1 * time.Minute)
+			} else {
+				for _, v := range reply {
+					del(jsonpath + "_back/" + v)
+				}
 			}
 		}
-	}
+	}, func(e interface{}) {})
+
 	time.AfterFunc(delduration*time.Minute, delFile)
 }
 

BIN
etl/server/src/trsserver/trsserver


+ 5 - 7
etl/server/src/trsserver/trsserver.go

@@ -31,8 +31,7 @@ func (i *TrsDataTransfer) TransferFile(param *TrsDataParam, reply *int) error {
 	log.Println(param.FileName)
 	tmp := []interface{}{}
 	err := json.Unmarshal([]byte(param.FileData), &tmp)
-	if err == nil {
-		c <- true
+	if err == nil && len(tmp) > 0 {
 		go TrsZtbSave(param.FileName, tmp)
 		*reply = 1
 	} else {
@@ -54,6 +53,7 @@ func (i *TrsDataTransfer) DelFileList(param *TrsDataParam, replys *[]string) err
 
 func TrsZtbSave(filename string, tmp []interface{}) {
 	util.Try(func() {
+		c <- true
 		session := mongodb.GetMgoConn()
 		defer mongodb.DestoryMongoConn(session)
 		defer func() {
@@ -79,11 +79,9 @@ func TrsZtbSave(filename string, tmp []interface{}) {
 	}, func(e interface{}) {
 		log.Println(e)
 	})
-	go func() {
-		Lock.Lock()
-		delStrings = append(delStrings, filename)
-		Lock.Unlock()
-	}()
+	Lock.Lock()
+	delStrings = append(delStrings, filename)
+	Lock.Unlock()
 }
 
 func main() {