Browse Source

udp文件服务调用

fengweiqiang 6 years ago
parent
commit
61e33597ad
4 changed files with 252 additions and 0 deletions
  1. 11 0
      src/udpfileserver/README.md
  2. 15 0
      src/udpfileserver/config.json
  3. 197 0
      src/udpfileserver/main.go
  4. 29 0
      src/udpfileserver/main_test.go

+ 11 - 0
src/udpfileserver/README.md

@@ -0,0 +1,11 @@
+# udp文件服务调用
+##### 根据mongo开始id、结束id和文件字段mongodb_one_filefiled检索,再调用 ocr服务,识别内容后更新mongo数据信息。
+
+    udp文件服务请求参数:
+    gtid:开始id
+    lteid:结束id
+    
+    udp文件服务配置文件说明:
+    mongodb_one_c:mongo中操作的表
+    mongodb_one_filefiled:mongo结构体中的文件字段名称
+    file2text:解析文件内容服务地址

+ 15 - 0
src/udpfileserver/config.json

@@ -0,0 +1,15 @@
+{
+  "udpip": "127.0.0.1",
+  "udpport": "8888",
+  "channelsize": "5",
+  "dbsize": "5",
+  "mongodb_one_ip": "127.0.0.1:27017",
+  "mongodb_one_db": "spider",
+  "mongodb_one_c": "data_bak",
+  "mongodb_one_filefiled": "projectinfo",
+  "mongodb_two_ip": "127.0.0.1:27017",
+  "mongodb_two_db": "spider",
+  "mongodb_two_c": "data_bak",
+  "mongodb_two_filefiled": "projectinfo",
+  "file2text": "127.0.0.1:1234"
+}

+ 197 - 0
src/udpfileserver/main.go

@@ -0,0 +1,197 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"jy/mongodbutil"
+	"log"
+	mu "mfw/util"
+	"net"
+	"net/rpc"
+	qu "qfw/util"
+	"strings"
+
+	"gopkg.in/mgo.v2/bson"
+)
+
+var udpclient mu.UdpClient //udp对象
+var Sysconfig map[string]interface{}
+var MgoIP, MgoDB, MgoC, MgoFileFiled string
+var ChanB chan bool
+
+func init() {
+	qu.ReadConfig(&Sysconfig)
+	MgoIP = qu.ObjToString(Sysconfig["mongodb_two_ip"])
+	MgoDB = qu.ObjToString(Sysconfig["mongodb_two_db"])
+	MgoC = qu.ObjToString(Sysconfig["mongodb_two_c"])
+	MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_two_filefiled"], "projectinfo")
+	if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" {
+		log.Println("获取配置文件参数失败", Sysconfig)
+		return
+	}
+	mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB)
+	ChanB = make(chan bool, qu.IntAllDef(Sysconfig["channelsize"], 5))
+}
+
+func main() {
+	log.Println(Sysconfig)
+	udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024}
+	udpclient.Listen(processUdpMsg)
+	log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
+	b := make(chan bool, 1)
+	<-b
+}
+
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
+	defer qu.Catch()
+	switch act {
+	case mu.OP_TYPE_DATA:
+		var mapInfo map[string]interface{}
+		err := json.Unmarshal(data, &mapInfo)
+		if err != nil {
+			log.Println("json err :", err, string(data))
+			return
+		}
+		gid := strings.TrimSpace(mapInfo["gtid"].(string))
+		lid := strings.TrimSpace(mapInfo["lteid"].(string))
+		if bson.IsObjectIdHex(gid) && bson.IsObjectIdHex(lid) {
+			if findAll, b := mongodbutil.Mgo.Find(MgoC,
+				bson.M{
+					"_id": bson.M{
+						"$gte": bson.ObjectIdHex(gid),
+						"$lte": bson.ObjectIdHex(lid),
+					},
+					MgoFileFiled: bson.M{
+						"$ne": nil,
+					},
+				},
+				//if findAll, b := mongodbutil.Mgo.Find(MgoC, bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(gid), "$lte": bson.ObjectIdHex(lid)}},
+				nil, `{"_id":"1",`+MgoFileFiled+`:"1"}`, false, -1, -1); !b {
+				log.Println("查询数据失败 :", string(data))
+			} else {
+				fmt.Println(len(*findAll))
+				if len(*findAll) <= 0 {
+					log.Println("查询数据为空 :", string(data))
+					return
+				}
+				for _, v := range *findAll {
+					qmap := *qu.ObjToMap(v)
+					mid := qmap["_id"]
+					if v, ok := qmap[MgoFileFiled].(map[string]interface{}); !ok {
+						log.Println(mid, "mgo 转换异常", MgoFileFiled)
+						continue
+					} else {
+						switch v["attachments"].(type) {
+						case map[string]interface{}:
+							att := v["attachments"].(map[string]interface{})
+							for _, vaatt := range att {
+								if fileinfo, ok := vaatt.(map[string]interface{}); !ok {
+									log.Println(mid, "mgo 结构体转换失败", vaatt)
+									continue
+								} else {
+									ChanB <- true
+									go save(mid, qmap, fileinfo)
+
+								}
+							}
+						}
+					}
+					//fileMap := *qu.ObjToMap(qmap["projectinfo"])
+					//fmt.Println(fileMap["attachments"])
+				}
+			}
+		} else {
+			log.Println("开始id或结束id参数错误:", string(data))
+		}
+
+	case mu.OP_NOOP: //下个节点回应
+		log.Println("接收成功", string(data))
+
+	}
+
+}
+func save(mid interface{}, qmap, fileinfo map[string]interface{}) {
+	defer qu.Catch()
+	defer func() {
+		<-ChanB
+	}()
+	type FileData struct {
+		Fid     string
+		Name    string
+		Type    string //文件类型png、jpg、tif、swf(ocr识别);pdf,doc,docx,xls
+		Content string //识别内容
+	}
+	client, err := rpc.DialHTTP("tcp", qu.ObjToString(Sysconfig["file2text"]))
+	if err != nil {
+		log.Println(mid, "rpc err :", err)
+		return
+	}
+	defer client.Close()
+	var reply []byte
+	//bs, _ := ioutil.ReadFile("1.docx")
+	fileData := &FileData{
+		Name: qu.ObjToString(fileinfo["filename"]),
+		Fid:  qu.ObjToString(fileinfo["fid"]), //附件id
+		Type: qu.ObjToString(fileinfo["ftype"]),
+	}
+	log.Println(mid, fileData)
+	err = client.Call("FileToText.FileToContext", fileData, &reply)
+	if err != nil {
+		log.Println(mid, "call ocr error:", err)
+		return
+	}
+	//fileinfo["ftype"] = "doc"
+	//reply = []byte("jdsfkldasjflkj")
+	//fileinfo["ftype"] = "zip"
+	//testfiles := []map[string]interface {
+	//}{
+	//	{"Name": "test4.doc", "Content": "test4context", "Type": "doc", "Size": "40M"},
+	//	{"Name": "test5.pdf", "Content": "test5context", "Type": "pdf", "Size": "50M"},
+	//	{"Name": "test6.xlsx", "Content": "test6context", "Type": "xlsx", "Size": "60M"},
+	//}
+	//reply, _ = json.Marshal(testfiles)
+	log.Println(mid, string(reply))
+	rdata := make(map[string]interface{})
+	if err := json.Unmarshal(reply, &rdata); err != nil {
+		log.Println(mid, "rpc返回数据解析失败:", err)
+		return
+	}
+	if rdata["err"] == nil || rdata["err"] == "null" {
+		if qu.ObjToString(fileinfo["ftype"]) == "rar" || qu.ObjToString(fileinfo["ftype"]) == "zip" {
+			fileinfo["content"] = rdata["contextc"]
+		} else {
+			fileinfo["content"] = rdata["context"]
+		}
+		if !mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
+			"$set": bson.M{
+				MgoFileFiled: qmap[MgoFileFiled],
+			},
+		}) {
+			log.Println(mid, "mongo更新数据失败")
+		} else {
+			log.Println(mid, "mongo更新数据成功")
+		}
+	} else {
+		log.Println(mid, "调用rpc服务解析异常:", rdata["err"])
+	}
+	//if qu.ObjToString(fileinfo["ftype"]) == "zip" || qu.ObjToString(fileinfo["ftype"]) == "rar" {
+	//	fileDatas := make([]map[string]interface{}, 0)
+	//	if err := json.Unmarshal(reply, &fileDatas); err != nil {
+	//		log.Println("json转换错误", mid, err)
+	//		return
+	//	}
+	//	fileinfo["content"] = fileDatas
+	//} else {
+	//	fileinfo["content"] = string(reply)
+	//}
+	//if !mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
+	//	"$set": bson.M{
+	//		MgoFileFiled: qmap[MgoFileFiled],
+	//	},
+	//}) {
+	//	log.Println(mid, "更新数据失败")
+	//} else {
+	//	log.Println(mid, "更新数据成功")
+	//}
+
+}

+ 29 - 0
src/udpfileserver/main_test.go

@@ -0,0 +1,29 @@
+package main
+
+import (
+	"encoding/json"
+	"log"
+	"mfw/util"
+	"net"
+	"testing"
+)
+
+func TestName(t *testing.T) {
+	udpclient = util.UdpClient{Local: "127.0.0.1:8889", BufSize: 1024}
+	udpclient.Listen(processUdpMsg)
+	m := map[string]string{
+		"gtid":"5cac41a6dff41f3b20b3b99c",
+		"lteid":"5cac41a6dff41f3b20b3b99c",
+	}
+	b, _ := json.Marshal(m)
+	err := udpclient.WriteUdp(b, util.OP_TYPE_DATA, &net.UDPAddr{
+		IP:   net.ParseIP("127.0.0.1"),
+		Port: 8888,
+	})
+	if err != nil{
+		log.Println(err)
+		return
+	}
+	log.Println("发送成功")
+
+}