Browse Source

ocrudp识别

fengweiqiang 6 years ago
parent
commit
859d7bbd39
4 changed files with 387 additions and 0 deletions
  1. 11 0
      udpfileocr/README.md
  2. 14 0
      udpfileocr/config.json
  3. 327 0
      udpfileocr/main.go
  4. 35 0
      udpfileocr/maintest.go

+ 11 - 0
udpfileocr/README.md

@@ -0,0 +1,11 @@
+# udp文件服务调用
+##### 根据 mongo 的开始 id(gtid)、结束 id(lteid)和配置项 mongodb_one_filefiled 指定的文件字段检索文档,调用 ocr 服务识别附件内容后,将识别结果更新回 mongo。
+
+    udp文件服务请求参数:
+    gtid:开始id(不包含,查询条件为 _id > gtid)
+    lteid:结束id(包含,查询条件为 _id <= lteid)
+    
+    udp文件服务配置文件说明:
+    mongodb_one_c:mongo中操作的表
+    mongodb_one_filefiled:mongo结构体中的文件字段名称
+    file2text:解析文件内容服务地址

+ 14 - 0
udpfileocr/config.json

@@ -0,0 +1,14 @@
+{
+  "udpip": "127.0.0.1",
+  "udpport": "1490",
+  "channelsize": "1",
+  "dbsize": "5",
+  "mongodb_one_ip": "192.168.3.207:27082",
+  "mongodb_one_db": "spider",
+  "mongodb_one_c": "bidding_file",
+  "mongodb_one_filefiled": "projectinfo",
+  "file2text": "192.168.3.207:1234",
+  "PageSize":5000,
+  "toudpip": "127.0.0.1",
+  "toudpport": "1481"
+}

+ 327 - 0
udpfileocr/main.go

@@ -0,0 +1,327 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/go-gomail/gomail"
+	"gopkg.in/mgo.v2/bson"
+	"jy/mongodbutil"
+	"log"
+	mu "mfw/util"
+	"net"
+	"net/rpc"
+	"path"
+	"qfw/common/src/qfw/util"
+	qu "qfw/util"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+)
+
+// Package-level state shared by init, main and the UDP handler.
+var (
+	// udpclient is the UDP endpoint used to listen for requests and to
+	// forward the completion message.
+	udpclient mu.UdpClient
+	// Sysconfig is the raw configuration filled in by qu.ReadConfig.
+	Sysconfig map[string]interface{}
+	// MgoIP, MgoDB and MgoC are the MongoDB address, database and collection;
+	// MgoFileFiled is the name of the document field that holds the files.
+	MgoIP, MgoDB, MgoC, MgoFileFiled string
+	// ChanB is a buffered channel used as a counting semaphore around save.
+	ChanB chan bool
+	// PageSize is the maximum number of documents requested per Find call.
+	PageSize int
+)
+
+// init reads the configuration into Sysconfig, copies the MongoDB settings
+// into the package variables, and, when they are usable, builds
+// mongodbutil.Mgo through MgoFactory and allocates the ChanB semaphore.
+//
+// NOTE(review): when validation fails this only logs and returns, leaving
+// mongodbutil.Mgo and ChanB unset; the UDP handler uses both unconditionally.
+func init() {
+	qu.ReadConfig(&Sysconfig)
+
+	MgoIP = qu.ObjToString(Sysconfig["mongodb_one_ip"])
+	MgoDB = qu.ObjToString(Sysconfig["mongodb_one_db"])
+	MgoC = qu.ObjToString(Sysconfig["mongodb_one_c"])
+	MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_one_filefiled"], "projectinfo")
+	PageSize = qu.IntAllDef(Sysconfig["PageSize"], 2000)
+
+	blank := func(s string) bool { return strings.TrimSpace(s) == "" }
+	if blank(MgoIP) || blank(MgoDB) || blank(MgoC) || PageSize <= 0 {
+		log.Println("获取配置文件参数失败", Sysconfig)
+		return
+	}
+
+	poolSize := qu.IntAllDef(Sysconfig["dbsize"], 5)
+	mongodbutil.Mgo = mongodbutil.MgoFactory(poolSize, 10, 120, MgoIP, MgoDB)
+	log.Println(mongodbutil.Mgo.Get().Ping())
+
+	slots := qu.IntAllDef(Sysconfig["channelsize"], 5)
+	ChanB = make(chan bool, slots)
+}
+
+// main logs the configuration, starts the UDP listener on udpip:udpport with
+// processUdpMsg as the packet handler, and then blocks forever.
+func main() {
+	log.Println(Sysconfig)
+
+	ip, port := Sysconfig["udpip"].(string), Sysconfig["udpport"].(string)
+	udpclient = mu.UdpClient{Local: ip + ":" + port, BufSize: 1024}
+	udpclient.Listen(processUdpMsg)
+	log.Printf("Udp listening port: %s:%s\n", ip, port)
+
+	// Nothing ever wakes this up; it only keeps the process alive.
+	select {}
+}
+//  "file2text": "192.168.3.207:1234",
+// processUdpMsg handles one packet delivered by udpclient.
+//
+// For mu.OP_TYPE_DATA the payload must be a JSON object whose "gtid" and
+// "lteid" are hex ObjectIds. Documents of collection MgoC with
+// gtid < _id <= lteid are read in pages of at most PageSize, every attachment
+// is passed to save, and finally a JSON message with the last processed id
+// ("gtid"), the upper bound ("lteid") and "stype":"fujian" is written to
+// toudpip:toudpport. For mu.OP_NOOP the payload is only logged.
+//
+// ra, the sender address, is not used.
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
+	defer qu.Catch()
+	switch act {
+	case mu.OP_TYPE_DATA:
+		var mapInfo map[string]interface{}
+		if err := json.Unmarshal(data, &mapInfo); err != nil {
+			log.Println("json err :", err, string(data))
+			return
+		}
+		log.Println("updocr接收数据:", mapInfo)
+		stime := time.Now()
+
+		// No bare .(string) assertion here: a missing or non-string field has
+		// to end up in the "bad parameter" branch instead of panicking.
+		// (presumably qu.ObjToString yields "" for such values — confirm)
+		gid := strings.TrimSpace(qu.ObjToString(mapInfo["gtid"]))
+		lid := strings.TrimSpace(qu.ObjToString(mapInfo["lteid"]))
+		if !bson.IsObjectIdHex(gid) || !bson.IsObjectIdHex(lid) {
+			log.Println("开始id或结束id参数错误:", string(data))
+			return
+		}
+
+		cursor, upper := bson.ObjectIdHex(gid), bson.ObjectIdHex(lid)
+		query := bson.M{"_id": bson.M{"$gt": cursor, "$lte": upper}}
+		log.Println("query---:", query)
+		sum := mongodbutil.Mgo.Count(MgoC, query)
+		log.Println("sum:", sum)
+		pageNum := (sum + PageSize - 1) / PageSize
+		limit := PageSize
+		if sum < PageSize {
+			limit = sum
+		}
+
+		var jsq int64 // number of documents visited
+		fields := bson.M{"_id": 1, MgoFileFiled: 1}
+		for i := 0; i < pageNum; i++ {
+			query = bson.M{"_id": bson.M{"$gt": cursor, "$lte": upper}}
+			log.Println("page=", i+1, "query=", query, limit)
+			// The next page starts after the last _id of this one, so the
+			// result must be ordered by _id; without a sort the order is not
+			// guaranteed and documents could be skipped or repeated.
+			list, ok := mongodbutil.Mgo.Find(MgoC, query, "_id", fields, false, 0, limit)
+			if !ok || list == nil {
+				log.Println("查询失败")
+				continue
+			}
+			if len(*list) == 0 {
+				// Nothing left above the cursor; later pages would be empty too.
+				break
+			}
+			for _, doc := range *list {
+				if oid, isID := doc["_id"].(bson.ObjectId); isID {
+					cursor = oid
+					gid = oid.Hex()
+				}
+				jsq++
+				ocrAttachments(doc)
+			}
+		}
+
+		log.Println("处理查询数据结束...", jsq, time.Since(stime))
+		by, err := json.Marshal(map[string]interface{}{
+			"gtid":  gid,
+			"lteid": lid,
+			"stype": "fujian",
+		})
+		if err != nil {
+			log.Println("json err :", err)
+			return
+		}
+		toIP := qu.ObjToString(Sysconfig["toudpip"])
+		err = udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+			IP:   net.ParseIP(toIP),
+			Port: qu.IntAll(Sysconfig["toudpport"]),
+		})
+		log.Println("发送到:", toIP, Sysconfig["toudpport"], err)
+
+	case mu.OP_NOOP: // acknowledgement from the next node
+		log.Println("接收成功", string(data))
+	}
+}
+
+// ocrAttachments calls save for every entry of doc[MgoFileFiled]["attachments"]
+// that is a map with a non-empty "fid". Documents without that structure are
+// skipped silently. Each save call holds one ChanB slot while it runs.
+func ocrAttachments(doc map[string]interface{}) {
+	qmap := qu.ObjToMap(doc)
+	mid := (*qmap)["_id"]
+	holder, ok := (*qmap)[MgoFileFiled].(map[string]interface{})
+	if !ok {
+		return
+	}
+	attachments, ok := holder["attachments"].(map[string]interface{})
+	if !ok {
+		return
+	}
+
+	updateNum := 0 // attachments of this document updated so far
+	for attk, raw := range attachments {
+		fileinfo, ok := raw.(map[string]interface{})
+		if !ok {
+			continue
+		}
+		// Validate before taking a slot so rejected entries never occupy one.
+		if qu.ObjToString(fileinfo["fid"]) == "" {
+			log.Println(mid, "mgo ", MgoFileFiled, "没有fid ")
+			continue
+		}
+		ChanB <- true
+		save(mid, attk, qmap, &fileinfo, &updateNum)
+		<-ChanB
+	}
+}
+// save sends one attachment to the file2text RPC service and writes the
+// returned text back into the document identified by mid.
+//
+// Parameters:
+//   - mid: the document _id; must hold a bson.ObjectId.
+//   - attk: key of the attachment inside <MgoFileFiled>.attachments.
+//   - qmap: the whole document; its MgoFileFiled value is what gets $set.
+//   - fileinfo: the attachment map; "content" and "expend" are filled in and
+//     "update" is removed on success.
+//   - updatenum: per-document counter, incremented after a successful update
+//     and stored as "updatefileNum".
+//
+// Every failure is logged and the function returns without touching MongoDB.
+// After a successful RPC call a status mail is sent at most once per distinct
+// hour of day.
+func save(mid interface{}, attk string, qmap, fileinfo *map[string]interface{}, updatenum *int) {
+	defer qu.Catch()
+	// FileData is the request sent to FileToText.FileToContext.
+	type FileData struct {
+		ObjId   string // document id
+		OrgUrl  string // original download URL
+		Fid     string
+		Name    string
+		Type    string // file type: png, jpg, tif, swf (OCR); pdf, doc, docx, xls
+		Content string // recognised content
+	}
+
+	oid, ok := mid.(bson.ObjectId)
+	if !ok {
+		log.Println(mid, "_id不是ObjectId类型")
+		return
+	}
+
+	client, err := rpc.DialHTTP("tcp", qu.ObjToString(Sysconfig["file2text"]))
+	if err != nil {
+		log.Println(mid, "rpc err :", err)
+		return
+	}
+	defer client.Close()
+
+	info := *fileinfo // same underlying map as *fileinfo
+	fid := qu.ObjToString(info["fid"])
+	name := qu.ObjToString(info["filename"])
+	ftype := qu.ObjToString(info["ftype"])
+
+	// Type is the filename extension without its dot. A name with no
+	// extension, or one that merely ends in ".", falls back to "ftype".
+	ext := strings.TrimPrefix(path.Ext(name), ".")
+	if strings.TrimSpace(ext) == "" {
+		ext = ftype
+	}
+
+	fileData := &FileData{
+		// NOTE(review): String() renders as ObjectIdHex("..."), not the bare
+		// hex; kept as-is because the RPC service's expectation is not
+		// visible here — confirm whether Hex() was intended.
+		ObjId:  oid.String(),
+		OrgUrl: qu.ObjToString(info["url"]),
+		Name:   name,
+		Fid:    fid, // attachment id
+		Type:   ext,
+	}
+	var reply []byte
+	if err = client.Call("FileToText.FileToContext", fileData, &reply); err != nil {
+		log.Println(mid, "call ocr error:", err)
+		return
+	}
+	if len(reply) == 0 {
+		log.Println(mid, "rpc返回数据为空:", fid, string(reply))
+		return
+	}
+	rdata := make(map[string]interface{})
+	if err := json.Unmarshal(reply, &rdata); err != nil {
+		log.Println(mid, "rpc返回数据解析失败:", fid, err)
+		return
+	}
+	if e := rdata["err"]; e != nil && e != "null" && e != "" {
+		log.Println(mid, "调用rpc服务解析异常:", mid, fid, e)
+		return
+	}
+
+	// Archives report their text under "contextc", everything else under "context".
+	if ftype == "rar" || ftype == "zip" {
+		info["content"] = rdata["contextc"]
+	} else {
+		info["content"] = rdata["context"]
+	}
+	info["expend"] = rdata["expend"]
+	delete(info, "update")
+	(*qmap)[MgoFileFiled].(map[string]interface{})["attachments"].(map[string]interface{})[attk] = info
+
+	updateBool := mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
+		"$set": bson.M{
+			MgoFileFiled: (*qmap)[MgoFileFiled],
+		},
+	})
+	if updateBool {
+		*updatenum++
+		// Store the counter value, not a pointer to the pointer.
+		stored := mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
+			"$set": bson.M{
+				"updatefileNum": *updatenum,
+			},
+		})
+		if !stored {
+			log.Println(mid, "mongo更新updatefileNum失败")
+		}
+		log.Println(mid, "mongo更新数据成功")
+	} else {
+		log.Println(mid, "mongo更新数据失败", fid)
+	}
+
+	// Decide under the lock, send outside it: SMTP I/O must not stall other
+	// callers waiting on rdlock.
+	nowHour := time.Now().Hour()
+	rdlock.Lock()
+	notify := nowHour != hourNum
+	if notify {
+		hourNum = nowHour
+	}
+	rdlock.Unlock()
+	if notify {
+		log.Println("send email:", SendMail(fmt.Sprint(updateBool, mid)))
+	}
+}
+
+var (
+	// hourNum is the hour of day for which save last sent a status mail.
+	hourNum int
+	// rdlock guards hourNum.
+	rdlock sync.RWMutex
+)
+func SendMail( body string ) error {
+	//定义邮箱服务器连接信息,如果是阿里邮箱 pass填密码,qq邮箱填授权码
+	mailConn := map[string]string {
+		"user": "550838476@qq.com",
+		"pass": "",
+		"host": "smtp.qq.com",
+		"port": "465",
+	}
+
+	port, _ := strconv.Atoi(mailConn["port"]) //转换端口类型为int
+
+	m := gomail.NewMessage()
+	m.SetHeader("From","Get to" + "<" + mailConn["user"] + ">")  //这种方式可以添加别名,即“XD Game”, 也可以直接用<code>m.SetHeader("From",mailConn["user"])</code> 读者可以自行实验下效果
+	m.SetHeader("To", []string{"550838476@qq.com"}...)  //发送给多个用户
+	m.SetHeader("Subject", "MongoId")  //设置邮件主题
+	m.SetBody("text/html","服务器五:"+ body)     //设置邮件正文
+
+	d := gomail.NewDialer(mailConn["host"], port, mailConn["user"], mailConn["pass"])
+
+	err := d.DialAndSend(m)
+	return err
+
+}
+
+// forfunc waits until collection MgoC contains a document whose _id differs
+// from lid, then sends {"gtid": lid, "lteid": <newest id>} to this service's
+// own udpip:udpport and returns.
+//
+// It retries after one minute when the lookup or the UDP write fails, and
+// after one hour (plus a notification mail) when the newest id still equals
+// lid.
+func forfunc(lid string) {
+	for {
+		// Newest document first; only _id is needed.
+		lastObjectId, found := mongodbutil.Mgo.Find(MgoC, nil, "-_id", bson.M{"_id": 1}, true, -1, -1)
+		// A failed query or an empty collection must lead to a retry; indexing
+		// [0] on it would panic.
+		if !found || lastObjectId == nil || len(*lastObjectId) == 0 {
+			log.Println("查询异常", lastObjectId)
+			time.Sleep(time.Minute)
+			continue
+		}
+		lastId, ok := (*lastObjectId)[0]["_id"].(bson.ObjectId)
+		log.Println("lastID:", lastId)
+		if !ok { // _id is not an ObjectId
+			log.Println("查询异常", *lastObjectId)
+			time.Sleep(time.Minute)
+			continue
+		}
+		// Same id as the previous round: nothing new yet.
+		if lastId.Hex() == lid {
+			log.Println("没有新数据", lastId.Hex())
+			if err := SendMail(time.Now().String() + "没有最新数据,当前最后一条数据id:" + lastId.Hex()); err != nil {
+				log.Println("send email:", err)
+			}
+			time.Sleep(time.Hour)
+			continue
+		}
+		// New data exists: request the next round.
+		payload, err := json.Marshal(map[string]string{
+			"gtid":  lid,          // last id of the previous round
+			"lteid": lastId.Hex(), // newest id found in this round
+		})
+		if err != nil {
+			log.Println("json err :", err)
+			time.Sleep(time.Minute)
+			continue
+		}
+		err = udpclient.WriteUdp(payload, mu.OP_TYPE_DATA, &net.UDPAddr{
+			IP:   net.ParseIP(util.ObjToString(Sysconfig["udpip"])),
+			Port: util.IntAll(Sysconfig["udpport"]),
+		})
+		if err != nil {
+			log.Println("发送udp失败", err, string(payload))
+			time.Sleep(time.Minute)
+			continue
+		}
+		if err := SendMail(time.Now().String() + fmt.Sprint("发送udp成功,gtid:", lid, ",lteid:", lastId.Hex())); err != nil {
+			log.Println("send email:", err)
+		}
+		log.Println("发送udp成功,gtid:", lid, ",lteid:", lastId.Hex())
+		break // stop once the request has been sent
+	}
+}

+ 35 - 0
udpfileocr/maintest.go

@@ -0,0 +1,35 @@
+package main
+
+import (
+	"encoding/json"
+	"log"
+	"mfw/util"
+	"net"
+)
+
+// main is a manual test client: it listens on 127.0.0.1:1482, sends one
+// gtid/lteid request to 172.17.145.163:1481 and then blocks so replies can be
+// printed by processUdpMsg2.
+//
+// NOTE(review): this file is package main and sits in the same directory as
+// main.go, which also declares func main; building the directory as one
+// package would report a redeclaration unless this file is excluded.
+func main() {
+	client := util.UdpClient{Local: "127.0.0.1:1482", BufSize: 1024}
+	client.Listen(processUdpMsg2)
+
+	// Marshalling a map[string]string cannot fail, so the error is dropped.
+	payload, _ := json.Marshal(map[string]string{
+		"gtid":  "5d19bf2fa5cb26b9b79b1994",
+		"lteid": "5d19bf2fa5cb26b9b79b1995",
+	})
+	target := &net.UDPAddr{
+		IP:   net.ParseIP("172.17.145.163"),
+		Port: 1481,
+	}
+	if err := client.WriteUdp(payload, util.OP_TYPE_DATA, target); err != nil {
+		log.Println(err)
+		return
+	}
+	log.Println("发送成功")
+
+	// Keep the process alive for incoming packets.
+	select {}
+}
+// processUdpMsg2 logs the payload of every received packet as text; the
+// operation byte and the sender address are ignored.
+func processUdpMsg2(act byte, data []byte, ra *net.UDPAddr) {
+	text := string(data)
+	log.Println(text)
+}