Prechádzať zdrojové kódy

读取数据库信息修改为每次单条读取

fengweiqiang 6 rokov pred
rodič
commit
9271582df9
1 zmenil súbory, kde vykonal 110 pridanie a 19 odobranie
  1. 110 19
      src/udpfileserver/main.go

+ 110 - 19
src/udpfileserver/main.go

@@ -2,12 +2,13 @@ package main
 
 import (
 	"encoding/json"
-	"fmt"
+	"gopkg.in/mgo.v2"
 	"jy/mongodbutil"
 	"log"
 	mu "mfw/util"
 	"net"
 	"net/rpc"
+	"path"
 	qu "qfw/util"
 	"strings"
 
@@ -52,10 +53,18 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			log.Println("json err :", err, string(data))
 			return
 		}
+		log.Println(mapInfo)
 		gid := strings.TrimSpace(mapInfo["gtid"].(string))
 		lid := strings.TrimSpace(mapInfo["lteid"].(string))
 		if bson.IsObjectIdHex(gid) && bson.IsObjectIdHex(lid) {
-			if findAll, b := mongodbutil.Mgo.Find(MgoC,
+			MgoSession, err := mgo.Dial(MgoIP)
+			defer MgoSession.Close()
+			if err != nil {
+				log.Println("mongo err:",err)
+				return
+			}
+
+			iter := MgoSession.DB(MgoDB).C(MgoC).Find(
 				bson.M{
 					"_id": bson.M{
 						"$gte": bson.ObjectIdHex(gid),
@@ -64,21 +73,34 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					MgoFileFiled: bson.M{
 						"$ne": nil,
 					},
-				},
-				//if findAll, b := mongodbutil.Mgo.Find(MgoC, bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(gid), "$lte": bson.ObjectIdHex(lid)}},
-				nil, `{"_id":"1",`+MgoFileFiled+`:"1"}`, false, -1, -1); !b {
-				log.Println("查询数据失败 :", string(data))
-			} else {
-				fmt.Println(len(*findAll))
-				if len(*findAll) <= 0 {
-					log.Println("查询数据为空 :", string(data))
-					return
-				}
-				for _, v := range *findAll {
-					qmap := *qu.ObjToMap(v)
+				},).Select(bson.M{"_id": 1,MgoFileFiled:1}).Iter()
+
+			//if findAll, b := mongodbutil.Mgo.Find(MgoC,
+			//	bson.M{
+			//		"_id": bson.M{
+			//			"$gte": bson.ObjectIdHex(gid),
+			//			"$lte": bson.ObjectIdHex(lid),
+			//		},
+			//		MgoFileFiled: bson.M{
+			//			"$ne": nil,
+			//		},
+			//	},
+			//	//if findAll, b := mongodbutil.Mgo.Find(MgoC, bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(gid), "$lte": bson.ObjectIdHex(lid)}},
+			//	nil, `{"_id":"1",`+MgoFileFiled+`:"1"}`, false, -1, -1); !b {
+			//	log.Println("查询数据失败 :", string(data))
+			//} else {
+			var result *map[string]interface{}
+			log.Println("处理查询数据...")
+			for iter.Next(&result){
+				//for _, v := range *result {
+					qmap := *qu.ObjToMap(result)
 					mid := qmap["_id"]
 					if v, ok := qmap[MgoFileFiled].(map[string]interface{}); !ok {
-						log.Println(mid, "mgo 转换异常", MgoFileFiled)
+						mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
+							"$set": bson.M{
+								"updatefileErr": 1,
+							},})
+						//log.Println(mid, "mgo 转换异常", MgoFileFiled)
 						continue
 					} else {
 						switch v["attachments"].(type) {
@@ -86,7 +108,11 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 							att := v["attachments"].(map[string]interface{})
 							for _, vaatt := range att {
 								if fileinfo, ok := vaatt.(map[string]interface{}); !ok {
-									log.Println(mid, "mgo 结构体转换失败", vaatt)
+									mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
+										"$set": bson.M{
+											"updatefileErr": 1,
+										},})
+									//log.Println(mid, "mgo 结构体转换失败", vaatt)
 									continue
 								} else {
 									ChanB <- true
@@ -99,7 +125,40 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					//fileMap := *qu.ObjToMap(qmap["projectinfo"])
 					//fmt.Println(fileMap["attachments"])
 				}
-			}
+			//}
+			defer iter.Close()
+			log.Println("处理查询数据结束...")
+			//fmt.Println(len(*findAll))
+				//if len(*findAll) <= 0 {
+				//	log.Println("查询数据为空 :", string(data))
+				//	return
+				//}
+				//for _, v := range *findAll {
+				//	qmap := *qu.ObjToMap(v)
+				//	mid := qmap["_id"]
+				//	if v, ok := qmap[MgoFileFiled].(map[string]interface{}); !ok {
+				//		log.Println(mid, "mgo 转换异常", MgoFileFiled)
+				//		continue
+				//	} else {
+				//		switch v["attachments"].(type) {
+				//		case map[string]interface{}:
+				//			att := v["attachments"].(map[string]interface{})
+				//			for _, vaatt := range att {
+				//				if fileinfo, ok := vaatt.(map[string]interface{}); !ok {
+				//					log.Println(mid, "mgo 结构体转换失败", vaatt)
+				//					continue
+				//				} else {
+				//					ChanB <- true
+				//					go save(mid, qmap, fileinfo)
+				//
+				//				}
+				//			}
+				//		}
+				//	}
+				//	//fileMap := *qu.ObjToMap(qmap["projectinfo"])
+				//	//fmt.Println(fileMap["attachments"])
+				//}
+			//}
 		} else {
 			log.Println("开始id或结束id参数错误:", string(data))
 		}
@@ -123,20 +182,35 @@ func save(mid interface{}, qmap, fileinfo map[string]interface{}) {
 	}
 	client, err := rpc.DialHTTP("tcp", qu.ObjToString(Sysconfig["file2text"]))
 	if err != nil {
+		mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
+			"$set": bson.M{
+				"updatefileErr": 1,
+			},})
 		log.Println(mid, "rpc err :", err)
 		return
 	}
 	defer client.Close()
 	var reply []byte
 	//bs, _ := ioutil.ReadFile("1.docx")
+	var fffpath string
+	fffpath = path.Ext(qu.ObjToString(fileinfo["filename"]))
+	if strings.TrimSpace(fffpath) == ""{
+		fffpath  = qu.ObjToString(fileinfo["ftype"])
+	}else {
+		fffpath = fffpath[1:]
+	}
 	fileData := &FileData{
 		Name: qu.ObjToString(fileinfo["filename"]),
 		Fid:  qu.ObjToString(fileinfo["fid"]), //附件id
-		Type: qu.ObjToString(fileinfo["ftype"]),
+		Type: fffpath,
 	}
-	log.Println(mid, fileData)
+	//log.Println(mid, fileData)
 	err = client.Call("FileToText.FileToContext", fileData, &reply)
 	if err != nil {
+		mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
+			"$set": bson.M{
+				"updatefileErr": 1,
+			},})
 		log.Println(mid, "call ocr error:", err)
 		return
 	}
@@ -151,12 +225,20 @@ func save(mid interface{}, qmap, fileinfo map[string]interface{}) {
 	//}
 	//reply, _ = json.Marshal(testfiles)
 	if len(reply) == 0{
+		mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
+			"$set": bson.M{
+				"updatefileErr": 1,
+			},})
 		log.Println(mid, "rpc返回数据为空:", string(reply))
 		return
 	}
 	log.Println(mid, string(reply))
 	rdata := make(map[string]interface{})
 	if err := json.Unmarshal(reply, &rdata); err != nil {
+		mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
+			"$set": bson.M{
+				"updatefileErr": 1,
+			},})
 		log.Println(mid, "rpc返回数据解析失败:", err)
 		return
 	}
@@ -169,13 +251,22 @@ func save(mid interface{}, qmap, fileinfo map[string]interface{}) {
 		if !mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
 			"$set": bson.M{
 				MgoFileFiled: qmap[MgoFileFiled],
+				"updatefileErr":0,
 			},
 		}) {
+			mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
+				"$set": bson.M{
+					"updatefileErr": 1,
+				},})
 			log.Println(mid, "mongo更新数据失败")
 		} else {
 			log.Println(mid, "mongo更新数据成功")
 		}
 	} else {
+		mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
+			"$set": bson.M{
+				"updatefileErr": 1,
+			},})
 		log.Println(mid, "调用rpc服务解析异常:", rdata["err"])
 	}
 	//if qu.ObjToString(fileinfo["ftype"]) == "zip" || qu.ObjToString(fileinfo["ftype"]) == "rar" {