fengweiqiang 6 years ago
parent
commit
c627e5ee7c
4 changed files with 0 additions and 366 deletions
  1. 0 11
      src/udpfileserver/README.md
  2. 0 12
      src/udpfileserver/config.json
  3. 0 311
      src/udpfileserver/main.go
  4. 0 32
      src/udpfileserver/maintest.go

+ 0 - 11
src/udpfileserver/README.md

@@ -1,11 +0,0 @@
-# udp文件服务调用
-##### 根据mongo开始id、结束id和文件字段mongodb_one_filefiled检索,再调用 ocr服务,识别内容后更新mongo数据信息。
-
-    udp文件服务请求参数:
-    gtid:开始id
-    lteid:结束id
-    
-    udp文件服务配置文件说明:
-    mongodb_one_c:mongo中操作的表
-    mongodb_one_filefiled:mongo结构体中的文件字段名称
-    file2text:解析文件内容服务地址

+ 0 - 12
src/udpfileserver/config.json

@@ -1,12 +0,0 @@
-{
-  "udpip": "127.0.0.1",
-  "udpport": "8888",
-  "channelsize": "1",
-  "dbsize": "5",
-  "mongodb_one_ip": "127.0.0.1:27017",
-  "mongodb_one_db": "spider",
-  "mongodb_one_c": "bidding_file",
-  "mongodb_one_filefiled": "projectinfo",
-  "file2text": "192.168.3.207:1234",
-  "PageSize":5000
-}

+ 0 - 311
src/udpfileserver/main.go

@@ -1,311 +0,0 @@
-package main
-
-import (
-	"encoding/json"
-	"fmt"
-	"github.com/go-gomail/gomail"
-	"gopkg.in/mgo.v2/bson"
-	"jy/mongodbutil"
-	"log"
-	mu "mfw/util"
-	"net"
-	"net/rpc"
-	"path"
-	"qfw/common/src/qfw/util"
-	qu "qfw/util"
-	"strconv"
-	"strings"
-	"sync"
-	"time"
-)
-
-var udpclient mu.UdpClient //udp对象
-var Sysconfig map[string]interface{}
-var MgoIP, MgoDB, MgoC, MgoFileFiled string
-var ChanB chan bool
-var PageSize int
-
-func init() {
-	qu.ReadConfig(&Sysconfig)
-	MgoIP = qu.ObjToString(Sysconfig["mongodb_one_ip"])
-	MgoDB = qu.ObjToString(Sysconfig["mongodb_one_db"])
-	MgoC = qu.ObjToString(Sysconfig["mongodb_one_c"])
-	PageSize = qu.IntAllDef(Sysconfig["PageSize"],2000)
-	MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_one_filefiled"], "projectinfo")
-	if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" ||PageSize <=0{
-		log.Println("获取配置文件参数失败", Sysconfig)
-		return
-	}
-	mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB)
-	log.Println(mongodbutil.Mgo.Get().Ping())
-	ChanB = make(chan bool, qu.IntAllDef(Sysconfig["channelsize"], 5))
-}
-
-func main() {
-	log.Println(Sysconfig)
-	udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024}
-	udpclient.Listen(processUdpMsg)
-	log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
-	b := make(chan bool, 1)
-	<-b
-}
-//  "file2text": "192.168.3.207:1234",
-func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
-	defer qu.Catch()
-	switch act {
-	case mu.OP_TYPE_DATA:
-		var mapInfo map[string]interface{}
-		err := json.Unmarshal(data, &mapInfo)
-		if err != nil {
-			log.Println("json err :", err, string(data))
-			return
-		}
-		log.Println(mapInfo)
-		stime :=time.Now()
-		gid := strings.TrimSpace(mapInfo["gtid"].(string))
-		lid := strings.TrimSpace(mapInfo["lteid"].(string))
-		if bson.IsObjectIdHex(gid) && bson.IsObjectIdHex(lid) {
-			var jsq int64
-			query := bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(gid),"$lte": bson.ObjectIdHex(lid),}}
-			log.Println("query---:", query)
-			sum :=mongodbutil.Mgo.Count(MgoC,query)
-			log.Println("sum:", sum)
-			pageNum := (sum + PageSize - 1) / PageSize
-			limit := PageSize
-			if sum < PageSize {
-				limit = sum
-			}
-			for i := 0; i < pageNum; i++ {
-				query = bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(gid), "$lte": bson.ObjectIdHex(lid)}}
-				log.Println("page=", i+1,"query=", query,limit)
-				list, b := mongodbutil.Mgo.Find(MgoC,query,nil,bson.M{"_id": 1,MgoFileFiled:1},false,0, limit)
-				if !b{
-					log.Println("查询失败")
-					continue
-				}
-
-				for _,v:=range *list {
-					gid = qu.BsonIdToSId(v["_id"])
-					jsq++
-					updateNum :=0
-					qmap := qu.ObjToMap(v)
-					mid := (*qmap)["_id"]
-					if v, ok := (*qmap)[MgoFileFiled].(map[string]interface{}); !ok {
-						//log.Println(mid, "mgo 没有字段", MgoFileFiled)
-						continue
-					} else {
-						switch v["attachments"].(type) {
-						case map[string]interface{}:
-							att := v["attachments"].(map[string]interface{})
-							for attk, vaatt := range att {
-								if fileinfo, ok := vaatt.(map[string]interface{}); !ok {
-									//log.Println(mid, "mgo 结构体转换失败", vaatt)
-									continue
-								} else {
-									ChanB <- true
-									if qu.ObjToString(fileinfo["fid"]) ==""{
-										<-ChanB
-										log.Println(mid, "mgo ", MgoFileFiled,"没有fid ")
-										continue
-									}
-									//if (strings.Contains(qu.ObjToString(fileinfo["url"]),"fs.qmx.top")|| strings.Contains(qu.ObjToString(fileinfo["url"]),"fj1.jianyu360.com"))&& (strings.TrimSpace(qu.ObjToString(fileinfo["content"]))==""||strings.Contains(qu.ObjToString(fileinfo["content"]),"error") ){
-									//	save(mid,attk, qmap, &fileinfo,&updateNum)
-									//	<-ChanB
-									//}else {
-									//	<-ChanB
-									//}
-									//if qu.ObjToString(fileinfo["update"]) ==""{
-									//	<-ChanB
-									//	log.Println(mid, "mgo ", MgoFileFiled,"没有update ")
-									//	continue
-									//}
-									save(mid,attk, qmap, &fileinfo,&updateNum)
-									<-ChanB
-								}
-							}
-						}
-					}
-				}
-			}
-			//识别完以后再次查询数据库,进行下一轮识别
-			log.Println("处理查询数据结束...",jsq,time.Now().Sub(stime))
-			//SendMail("处理完成")
-			//进行下一轮识别
-			forfunc(lid)
-		} else {
-			log.Println("开始id或结束id参数错误:", string(data))
-		}
-
-	case mu.OP_NOOP: //下个节点回应
-		log.Println("接收成功", string(data))
-
-	}
-
-}
-func save(mid interface{},attk string, qmap, fileinfo *map[string]interface{},updatenum *int) {
-	defer qu.Catch()
-	type FileData struct {
-		ObjId   string //Id
-		OrgUrl  string //源下载地址
-		Fid     string
-		Name    string
-		Type    string //文件类型png、jpg、tif、swf(ocr识别);pdf,doc,docx,xls
-		Content string //识别内容
-	}
-	client, err := rpc.DialHTTP("tcp", qu.ObjToString(Sysconfig["file2text"]))
-	if err != nil {
-		log.Println(mid, "rpc err :", err)
-		return
-	}
-	defer client.Close()
-	var reply []byte
-	//bs, _ := ioutil.ReadFile("1.docx")
-	var fffpath string
-	fffpath = path.Ext(qu.ObjToString((*fileinfo)["filename"]))
-	if strings.TrimSpace(fffpath) == ""{
-		fffpath  = qu.ObjToString((*fileinfo)["ftype"])
-	}else {
-		fffpath = fffpath[1:]
-	}
-	fileData := &FileData{
-		ObjId:mid.(bson.ObjectId).String(),
-		OrgUrl: qu.ObjToString((*fileinfo)["url"]),
-		Name: qu.ObjToString((*fileinfo)["filename"]),
-		Fid:  qu.ObjToString((*fileinfo)["fid"]), //附件id
-		Type: fffpath,
-	}
-	//log.Println(mid, fileData)
-	err = client.Call("FileToText.FileToContext", fileData, &reply)
-	if err != nil {
-		log.Println(mid, "call ocr error:", err)
-		return
-	}
-	//fileinfo["ftype"] = "doc"
-	//reply = []byte("jdsfkldasjflkj")
-	//fileinfo["ftype"] = "zip"
-	//testfiles := []map[string]interface {
-	//}{
-	//	{"Name": "test4.doc", "Content": "test4context", "Type": "doc", "Size": "40M"},
-	//	{"Name": "test5.pdf", "Content": "test5context", "Type": "pdf", "Size": "50M"},
-	//	{"Name": "test6.xlsx", "Content": "test6context", "Type": "xlsx", "Size": "60M"},
-	//}
-	//reply, _ = json.Marshal(testfiles)
-	if len(reply) == 0{
-		log.Println(mid, "rpc返回数据为空:",qu.ObjToString((*fileinfo)["fid"]), string(reply))
-		return
-	}
-	//log.Println(mid, string(reply))
-	rdata := make(map[string]interface{})
-	if err := json.Unmarshal(reply, &rdata); err != nil {
-		log.Println(mid, "rpc返回数据解析失败:",qu.ObjToString((*fileinfo)["fid"]), err)
-		return
-	}
-	if rdata["err"] == nil || rdata["err"] == "null" || rdata["err"] == "" {
-		if qu.ObjToString((*fileinfo)["ftype"]) == "rar" || qu.ObjToString((*fileinfo)["ftype"]) == "zip" {
-			(*fileinfo)["content"] = rdata["contextc"]
-		} else {
-			(*fileinfo)["content"] = rdata["context"]
-		}
-		(*fileinfo)["expend"] = rdata["expend"]
-		delete(*fileinfo,"update")
-		//log.Println((*fileinfo))
-
-		(*qmap)[MgoFileFiled].(map[string]interface{})["attachments"].(map[string]interface{})[attk]=*fileinfo
-		//asdf := (*qmap)[MgoFileFiled].(map[string]interface{})
-		//qwer := asdf["attachments"].(map[string]interface{})
-		//qwer[attk] =*fileinfo
-		//log.Println((*qmap)[MgoFileFiled])
-
-		updateBool := mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
-			"$set": bson.M{
-				MgoFileFiled: (*qmap)[MgoFileFiled],
-			},
-		})
-		if updateBool{
-			*updatenum++
-			mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
-				"$set": bson.M{
-					"updatefileNum": &updatenum,
-				},})
-			log.Println(mid, "mongo更新数据成功")
-		}else {
-			log.Println(mid, "mongo更新数据失败",qu.ObjToString((*fileinfo)["fid"]))
-		}
-		nowHour := time.Now().Hour()
-		rdlock.Lock()
-		if nowHour != hourNum{
-			log.Println("send email:",SendMail(fmt.Sprint(updateBool,mid)))
-			hourNum = nowHour
-		}
-		rdlock.Unlock()
-	} else {
-		log.Println(mid, "调用rpc服务解析异常:",mid,qu.ObjToString((*fileinfo)["fid"]), rdata["err"])
-	}
-
-}
-var hourNum int
-var rdlock sync.RWMutex
-func SendMail( body string ) error {
-	//定义邮箱服务器连接信息,如果是阿里邮箱 pass填密码,qq邮箱填授权码
-	mailConn := map[string]string {
-		"user": "550838476@qq.com",
-		"pass": "",
-		"host": "smtp.qq.com",
-		"port": "465",
-	}
-
-	port, _ := strconv.Atoi(mailConn["port"]) //转换端口类型为int
-
-	m := gomail.NewMessage()
-	m.SetHeader("From","Get to" + "<" + mailConn["user"] + ">")  //这种方式可以添加别名,即“XD Game”, 也可以直接用<code>m.SetHeader("From",mailConn["user"])</code> 读者可以自行实验下效果
-	m.SetHeader("To", []string{"550838476@qq.com"}...)  //发送给多个用户
-	m.SetHeader("Subject", "MongoId")  //设置邮件主题
-	m.SetBody("text/html","服务器五:"+ body)     //设置邮件正文
-
-	d := gomail.NewDialer(mailConn["host"], port, mailConn["user"], mailConn["pass"])
-
-	err := d.DialAndSend(m)
-	return err
-
-}
-
-func forfunc(lid string) {
-	for {
-		//查询最后一个id
-		lastObjectId, _ := mongodbutil.Mgo.Find(MgoC,nil,"-_id",bson.M{"_id":1},true,-1,-1)
-		lastId,ok := (*lastObjectId)[0]["_id"].(bson.ObjectId)
-		log.Println("lastID:",lastId)
-		//查询最后一个id出错重新查询
-		if!ok{//转换失败
-			log.Println("查询异常",*lastObjectId)
-			time.Sleep(time.Minute)
-			continue
-		}
-		//查询最后一个id等于上一轮的id就重新查询
-		if lastId.Hex() == lid {
-			log.Println("没有新数据",lastId.Hex())
-			SendMail(time.Now().String()+"没有最新数据,当前最后一条数据id:"+lastId.Hex())
-			time.Sleep(time.Hour)
-			continue
-		}
-		//不相等说明有新数据,进行下次处理
-		m := map[string]string{
-			"gtid":lid,//上一轮结束的最后id
-			"lteid":lastId.Hex(),//新一轮查询出来的id
-		}
-		bytes, _ := json.Marshal(m)
-		//发送udp
-		err := udpclient.WriteUdp(bytes,mu.OP_TYPE_DATA,&net.UDPAddr{
-			IP:   net.ParseIP( util.ObjToString(Sysconfig["udpip"])),
-			Port:  util.IntAll(Sysconfig["udpport"]),
-		})
-		if err != nil{
-			log.Println("发送udp失败",err,string(bytes))
-			time.Sleep(time.Minute)
-			continue
-		}
-		SendMail(time.Now().String()+fmt.Sprint("发送udp成功,gtid:",lid,",lteid:",lastId.Hex()))
-		log.Println("发送udp成功,gtid:",lid,",lteid:",lastId.Hex())
-		break//发送完后终止循环
-	}
-}

+ 0 - 32
src/udpfileserver/maintest.go

@@ -1,32 +0,0 @@
-package main
-
-import (
-	"encoding/json"
-	"log"
-	"mfw/util"
-	"net"
-)
-
-func main() {
-	udpclient := util.UdpClient{Local: "127.0.0.1:8889", BufSize: 1024}
-	udpclient.Listen(processUdpMsg2)
-	m := map[string]string{
-		"gtid":"5cb682c9ed1d910046aca2eb",
-		"lteid":"5cb683f2ed1d9100570abbc3",
-	}
-	b, _ := json.Marshal(m)
-	//for  range time.Tick(time.Second){
-	err := udpclient.WriteUdp(b, util.OP_TYPE_DATA, &net.UDPAddr{
-		IP:   net.ParseIP("127.0.0.1"),
-		Port: 8888,
-	})
-	if err != nil{
-		log.Println(err)
-		return
-	}
-	log.Println("发送成功")
-	//}
-}
-func processUdpMsg2(act byte, data []byte, ra *net.UDPAddr) {
-
-}