123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 |
- package main
- import (
- "encoding/json"
- "fmt"
- "github.com/go-gomail/gomail"
- "gopkg.in/mgo.v2/bson"
- "jy/mongodbutil"
- "log"
- mu "mfw/util"
- "net"
- "net/rpc"
- "path"
- "qfw/common/src/qfw/util"
- qu "qfw/util"
- "strconv"
- "strings"
- "sync"
- "time"
- )
- var udpclient mu.UdpClient //udp对象
- var Sysconfig map[string]interface{}
- var MgoIP, MgoDB, MgoC, MgoFileFiled string
- var ChanB chan bool
- var PageSize int
- func init() {
- qu.ReadConfig(&Sysconfig)
- MgoIP = qu.ObjToString(Sysconfig["mongodb_one_ip"])
- MgoDB = qu.ObjToString(Sysconfig["mongodb_one_db"])
- MgoC = qu.ObjToString(Sysconfig["mongodb_one_c"])
- PageSize = qu.IntAllDef(Sysconfig["PageSize"],2000)
- MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_one_filefiled"], "projectinfo")
- if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" ||PageSize <=0{
- log.Println("获取配置文件参数失败", Sysconfig)
- return
- }
- mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB)
- log.Println(mongodbutil.Mgo.Get().Ping())
- ChanB = make(chan bool, qu.IntAllDef(Sysconfig["channelsize"], 5))
- }
- func main() {
- log.Println(Sysconfig)
- udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024}
- udpclient.Listen(processUdpMsg)
- log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
- b := make(chan bool, 1)
- <-b
- }
// Example configuration entry for the RPC address dialed by save: "file2text": "192.168.3.207:1234",
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- defer qu.Catch()
- switch act {
- case mu.OP_TYPE_DATA:
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- if err != nil {
- log.Println("json err :", err, string(data))
- return
- }
- log.Println("updocr接收数据:",mapInfo)
- stime :=time.Now()
- gid := strings.TrimSpace(mapInfo["gtid"].(string))
- rgid := gid
- lid := strings.TrimSpace(mapInfo["lteid"].(string))
- //err = udpclient.WriteUdp([]byte("updocr接收数据成功"), mu.OP_TYPE_DATA, &net.UDPAddr{
- // IP: net.ParseIP(Sysconfig["toudpip"].(string)),
- // Port: qu.IntAll(Sysconfig["toudpport"]),
- //})
- ////forfunc(lid)
- //log.Println("接收数据成功,发送到:",Sysconfig["toudpip"].(string),Sysconfig["toudpport"],err)
- if bson.IsObjectIdHex(gid) && bson.IsObjectIdHex(lid) {
- var jsq int64
- query := bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(gid),"$lte": bson.ObjectIdHex(lid),}}
- log.Println("query---:", query)
- sum :=mongodbutil.Mgo.Count(MgoC,query)
- log.Println("sum:", sum)
- pageNum := (sum + PageSize - 1) / PageSize
- limit := PageSize
- if sum < PageSize {
- limit = sum
- }
- for i := 0; i < pageNum; i++ {
- query = bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(gid), "$lte": bson.ObjectIdHex(lid)}}
- log.Println("page=", i+1,"query=", query,limit)
- list, b := mongodbutil.Mgo.Find(MgoC,query,nil,bson.M{"_id": 1,MgoFileFiled:1},false,0, limit)
- if !b{
- log.Println("查询失败")
- continue
- }
- for _,v:=range *list {
- gid = qu.BsonIdToSId(v["_id"])
- jsq++
- updateNum :=0
- qmap := qu.ObjToMap(v)
- mid := (*qmap)["_id"]
- if v, ok := (*qmap)[MgoFileFiled].(map[string]interface{}); !ok {
- //log.Println(mid, "mgo 没有字段", MgoFileFiled)
- continue
- } else {
- switch v["attachments"].(type) {
- case map[string]interface{}:
- att := v["attachments"].(map[string]interface{})
- for attk, vaatt := range att {
- if fileinfo, ok := vaatt.(map[string]interface{}); !ok {
- //log.Println(mid, "mgo 结构体转换失败", vaatt)
- continue
- } else {
- ChanB <- true
- if qu.ObjToString(fileinfo["fid"]) ==""{
- <-ChanB
- //log.Println(mid, "mgo ", MgoFileFiled,"没有fid ")
- continue
- }
- //if (strings.Contains(qu.ObjToString(fileinfo["url"]),"fs.qmx.top")|| strings.Contains(qu.ObjToString(fileinfo["url"]),"fj1.jianyu360.com"))&& (strings.TrimSpace(qu.ObjToString(fileinfo["content"]))==""||strings.Contains(qu.ObjToString(fileinfo["content"]),"error") ){
- // save(mid,attk, qmap, &fileinfo,&updateNum)
- // <-ChanB
- //}else {
- // <-ChanB
- //}
- //if qu.ObjToString(fileinfo["update"]) ==""{
- // <-ChanB
- // log.Println(mid, "mgo ", MgoFileFiled,"没有update ")
- // continue
- //}
- save(mid,attk, qmap, &fileinfo,&updateNum)
- <-ChanB
- }
- }
- }
- }
- }
- }
- //发送udp信号
- by, _ := json.Marshal(map[string]interface{}{
- "gtid": rgid,
- "lteid": lid,
- "stype": "fujian",
- })
- err := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
- IP: net.ParseIP(Sysconfig["toudpip"].(string)),
- Port: qu.IntAll(Sysconfig["toudpport"]),
- })
- //识别完以后再次查询数据库,进行下一轮识别
- log.Println("处理查询数据结束...",jsq,time.Now().Sub(stime))
- SendMail(rgid+"--->"+lid+"处理完成")
- //进行下一轮识别
- forfunc(lid)
- log.Println("发送到:",Sysconfig["toudpip"].(string),Sysconfig["toudpport"],err)
- } else {
- log.Println("开始id或结束id参数错误:", string(data))
- }
- case mu.OP_NOOP: //下个节点回应
- log.Println("接收成功", string(data))
- }
- }
- func save(mid interface{},attk string, qmap, fileinfo *map[string]interface{},updatenum *int) {
- defer qu.Catch()
- type FileData struct {
- ObjId string //Id
- OrgUrl string //源下载地址
- Fid string
- Name string
- Type string //文件类型png、jpg、tif、swf(ocr识别);pdf,doc,docx,xls
- Content string //识别内容
- }
- client, err := rpc.DialHTTP("tcp", qu.ObjToString(Sysconfig["file2text"]))
- if err != nil {
- log.Println(mid, "rpc err :", err)
- return
- }
- defer client.Close()
- var reply []byte
- //bs, _ := ioutil.ReadFile("1.docx")
- var fffpath string
- fffpath = path.Ext(qu.ObjToString((*fileinfo)["filename"]))
- if strings.TrimSpace(fffpath) == ""{
- fffpath = qu.ObjToString((*fileinfo)["ftype"])
- }else {
- fffpath = fffpath[1:]
- }
- fileData := &FileData{
- ObjId:mid.(bson.ObjectId).String(),
- OrgUrl: qu.ObjToString((*fileinfo)["url"]),
- Name: qu.ObjToString((*fileinfo)["filename"]),
- Fid: qu.ObjToString((*fileinfo)["fid"]), //附件id
- Type: fffpath,
- }
- //log.Println(mid, fileData)
- err = client.Call("FileToText.FileToContext", fileData, &reply)
- if err != nil {
- log.Println(mid, "call ocr error:", err)
- return
- }
- //fileinfo["ftype"] = "doc"
- //reply = []byte("jdsfkldasjflkj")
- //fileinfo["ftype"] = "zip"
- //testfiles := []map[string]interface {
- //}{
- // {"Name": "test4.doc", "Content": "test4context", "Type": "doc", "Size": "40M"},
- // {"Name": "test5.pdf", "Content": "test5context", "Type": "pdf", "Size": "50M"},
- // {"Name": "test6.xlsx", "Content": "test6context", "Type": "xlsx", "Size": "60M"},
- //}
- //reply, _ = json.Marshal(testfiles)
- if len(reply) == 0{
- log.Println(mid, "rpc返回数据为空:",qu.ObjToString((*fileinfo)["fid"]), string(reply))
- return
- }
- //log.Println(mid, string(reply))
- rdata := make(map[string]interface{})
- if err := json.Unmarshal(reply, &rdata); err != nil {
- log.Println(mid, "rpc返回数据解析失败:",qu.ObjToString((*fileinfo)["fid"]), err)
- return
- }
- if rdata["err"] == nil || rdata["err"] == "null" || rdata["err"] == "" {
- if qu.ObjToString((*fileinfo)["ftype"]) == "rar" || qu.ObjToString((*fileinfo)["ftype"]) == "zip" {
- (*fileinfo)["content"] = rdata["contextc"]
- } else {
- (*fileinfo)["content"] = rdata["context"]
- }
- (*fileinfo)["expend"] = rdata["expend"]
- delete(*fileinfo,"update")
- //log.Println((*fileinfo))
- (*qmap)[MgoFileFiled].(map[string]interface{})["attachments"].(map[string]interface{})[attk]=*fileinfo
- //asdf := (*qmap)[MgoFileFiled].(map[string]interface{})
- //qwer := asdf["attachments"].(map[string]interface{})
- //qwer[attk] =*fileinfo
- //log.Println((*qmap)[MgoFileFiled])
- updateBool := mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
- "$set": bson.M{
- MgoFileFiled: (*qmap)[MgoFileFiled],
- },
- })
- if updateBool{
- *updatenum++
- mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
- "$set": bson.M{
- "updatefileNum": &updatenum,
- },})
- log.Println(mid, "mongo更新数据成功")
- }else {
- log.Println(mid, "mongo更新数据失败",qu.ObjToString((*fileinfo)["fid"]))
- }
- nowHour := time.Now().Hour()
- rdlock.Lock()
- if nowHour != hourNum{
- log.Println("send email:",SendMail(fmt.Sprint(updateBool,mid)))
- hourNum = nowHour
- }
- rdlock.Unlock()
- } else {
- log.Println(mid, "调用rpc服务解析异常:",mid,qu.ObjToString((*fileinfo)["fid"]), rdata["err"])
- }
- }
var (
	// hourNum is the clock hour (time.Now().Hour()) for which save last
	// triggered a status mail.
	hourNum int

	// rdlock guards hourNum.
	rdlock sync.RWMutex
)
- func SendMail( body string ) error {
- //定义邮箱服务器连接信息,如果是阿里邮箱 pass填密码,qq邮箱填授权码
- mailConn := map[string]string {
- "user": "550838476@qq.com",
- "pass": "",
- "host": "smtp.qq.com",
- "port": "465",
- }
- port, _ := strconv.Atoi(mailConn["port"]) //转换端口类型为int
- m := gomail.NewMessage()
- m.SetHeader("From","Get to" + "<" + mailConn["user"] + ">") //这种方式可以添加别名,即“XD Game”, 也可以直接用<code>m.SetHeader("From",mailConn["user"])</code> 读者可以自行实验下效果
- m.SetHeader("To", []string{"550838476@qq.com"}...) //发送给多个用户
- m.SetHeader("Subject", "MongoId") //设置邮件主题
- m.SetBody("text/html","服务器:"+ body) //设置邮件正文
- d := gomail.NewDialer(mailConn["host"], port, mailConn["user"], mailConn["pass"])
- err := d.DialAndSend(m)
- return err
- }
- func forfunc(lid string) {
- for {
- //查询最后一个id
- lastObjectId, _ := mongodbutil.Mgo.Find(MgoC,nil,"-_id",bson.M{"_id":1},true,-1,-1)
- lastId,ok := (*lastObjectId)[0]["_id"].(bson.ObjectId)
- log.Println("lastID:",lastId)
- //查询最后一个id出错重新查询
- if!ok{//转换失败
- log.Println("查询异常",*lastObjectId)
- time.Sleep(time.Minute)
- continue
- }
- //查询最后一个id等于上一轮的id就重新查询
- if lastId.Hex() == lid {
- log.Println("没有新数据",lastId.Hex())
- SendMail(time.Now().String()+"没有最新数据,当前最后一条数据id:"+lastId.Hex())
- time.Sleep(time.Hour)
- continue
- }
- //不相等说明有新数据,进行下次处理
- m := map[string]string{
- "gtid":lid,//上一轮结束的最后id
- "lteid":lastId.Hex(),//新一轮查询出来的id
- }
- bytes, _ := json.Marshal(m)
- //发送udp
- err := udpclient.WriteUdp(bytes,mu.OP_TYPE_DATA,&net.UDPAddr{
- IP: net.ParseIP( util.ObjToString(Sysconfig["udpip"])),
- Port: util.IntAll(Sysconfig["udpport"]),
- })
- if err != nil{
- log.Println("发送udp失败",err,string(bytes))
- time.Sleep(time.Minute)
- continue
- }
- SendMail(time.Now().String()+fmt.Sprint("发送udp成功,gtid:",lid,",lteid:",lastId.Hex()))
- log.Println("发送udp成功,gtid:",lid,",lteid:",lastId.Hex())
- break//发送完后终止循环
- }
- }
|