package main import ( "encoding/json" "fmt" "github.com/go-gomail/gomail" "gopkg.in/mgo.v2/bson" "jy/mongodbutil" "log" mu "mfw/util" "net" "net/rpc" "path" "qfw/common/src/qfw/util" qu "qfw/util" "strconv" "strings" "sync" "time" ) var udpclient mu.UdpClient //udp对象 var Sysconfig map[string]interface{} var MgoIP, MgoDB, MgoC, MgoFileFiled string var ChanB chan bool var PageSize int func init() { qu.ReadConfig(&Sysconfig) MgoIP = qu.ObjToString(Sysconfig["mongodb_one_ip"]) MgoDB = qu.ObjToString(Sysconfig["mongodb_one_db"]) MgoC = qu.ObjToString(Sysconfig["mongodb_one_c"]) PageSize = qu.IntAllDef(Sysconfig["PageSize"],2000) MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_one_filefiled"], "projectinfo") if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" ||PageSize <=0{ log.Println("获取配置文件参数失败", Sysconfig) return } mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB) log.Println(mongodbutil.Mgo.Get().Ping()) ChanB = make(chan bool, qu.IntAllDef(Sysconfig["channelsize"], 5)) } func main() { log.Println(Sysconfig) udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024} udpclient.Listen(processUdpMsg) log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"]) b := make(chan bool, 1) <-b } // "file2text": "192.168.3.207:1234", func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { defer qu.Catch() switch act { case mu.OP_TYPE_DATA: var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) if err != nil { log.Println("json err :", err, string(data)) return } log.Println("updocr接收数据:",mapInfo) stime :=time.Now() gid := strings.TrimSpace(mapInfo["gtid"].(string)) rgid := gid lid := strings.TrimSpace(mapInfo["lteid"].(string)) //err = udpclient.WriteUdp([]byte("updocr接收数据成功"), mu.OP_TYPE_DATA, &net.UDPAddr{ // IP: net.ParseIP(Sysconfig["toudpip"].(string)), // Port: qu.IntAll(Sysconfig["toudpport"]), //}) ////forfunc(lid) //log.Println("接收数据成功,发送到:",Sysconfig["toudpip"].(string),Sysconfig["toudpport"],err) if bson.IsObjectIdHex(gid) && bson.IsObjectIdHex(lid) { var jsq int64 query := bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(gid),"$lte": bson.ObjectIdHex(lid),}} log.Println("query---:", query) sum :=mongodbutil.Mgo.Count(MgoC,query) log.Println("sum:", sum) pageNum := (sum + PageSize - 1) / PageSize limit := PageSize if sum < PageSize { limit = sum } for i := 0; i < pageNum; i++ { query = bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(gid), "$lte": bson.ObjectIdHex(lid)}} log.Println("page=", i+1,"query=", query,limit) list, b := mongodbutil.Mgo.Find(MgoC,query,nil,bson.M{"_id": 1,MgoFileFiled:1},false,0, limit) if !b{ log.Println("查询失败") continue } for _,v:=range *list { gid = qu.BsonIdToSId(v["_id"]) jsq++ updateNum :=0 qmap := qu.ObjToMap(v) mid := (*qmap)["_id"] if v, ok := (*qmap)[MgoFileFiled].(map[string]interface{}); !ok { //log.Println(mid, "mgo 没有字段", MgoFileFiled) continue } else { switch v["attachments"].(type) { case map[string]interface{}: att := v["attachments"].(map[string]interface{}) for attk, vaatt := range att { if fileinfo, ok := vaatt.(map[string]interface{}); !ok { //log.Println(mid, "mgo 结构体转换失败", vaatt) continue } else { ChanB <- true if qu.ObjToString(fileinfo["fid"]) ==""{ <-ChanB //log.Println(mid, "mgo ", MgoFileFiled,"没有fid ") continue } //if (strings.Contains(qu.ObjToString(fileinfo["url"]),"fs.qmx.top")|| strings.Contains(qu.ObjToString(fileinfo["url"]),"fj1.jianyu360.com"))&& (strings.TrimSpace(qu.ObjToString(fileinfo["content"]))==""||strings.Contains(qu.ObjToString(fileinfo["content"]),"error") ){ // save(mid,attk, qmap, &fileinfo,&updateNum) // <-ChanB //}else { // <-ChanB //} //if qu.ObjToString(fileinfo["update"]) ==""{ // <-ChanB // log.Println(mid, "mgo ", MgoFileFiled,"没有update ") // continue //} save(mid,attk, qmap, &fileinfo,&updateNum) <-ChanB } } } } } } //发送udp信号 by, _ := json.Marshal(map[string]interface{}{ "gtid": rgid, "lteid": lid, "stype": "fujian", }) err := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{ IP: net.ParseIP(Sysconfig["toudpip"].(string)), Port: qu.IntAll(Sysconfig["toudpport"]), }) //识别完以后再次查询数据库,进行下一轮识别 log.Println("处理查询数据结束...",jsq,time.Now().Sub(stime)) SendMail(rgid+"--->"+lid+"处理完成") //进行下一轮识别 forfunc(lid) log.Println("发送到:",Sysconfig["toudpip"].(string),Sysconfig["toudpport"],err) } else { log.Println("开始id或结束id参数错误:", string(data)) } case mu.OP_NOOP: //下个节点回应 log.Println("接收成功", string(data)) } } func save(mid interface{},attk string, qmap, fileinfo *map[string]interface{},updatenum *int) { defer qu.Catch() type FileData struct { ObjId string //Id OrgUrl string //源下载地址 Fid string Name string Type string //文件类型png、jpg、tif、swf(ocr识别);pdf,doc,docx,xls Content string //识别内容 } client, err := rpc.DialHTTP("tcp", qu.ObjToString(Sysconfig["file2text"])) if err != nil { log.Println(mid, "rpc err :", err) return } defer client.Close() var reply []byte //bs, _ := ioutil.ReadFile("1.docx") var fffpath string fffpath = path.Ext(qu.ObjToString((*fileinfo)["filename"])) if strings.TrimSpace(fffpath) == ""{ fffpath = qu.ObjToString((*fileinfo)["ftype"]) }else { fffpath = fffpath[1:] } fileData := &FileData{ ObjId:mid.(bson.ObjectId).String(), OrgUrl: qu.ObjToString((*fileinfo)["url"]), Name: qu.ObjToString((*fileinfo)["filename"]), Fid: qu.ObjToString((*fileinfo)["fid"]), //附件id Type: fffpath, } //log.Println(mid, fileData) err = client.Call("FileToText.FileToContext", fileData, &reply) if err != nil { log.Println(mid, "call ocr error:", err) return } //fileinfo["ftype"] = "doc" //reply = []byte("jdsfkldasjflkj") //fileinfo["ftype"] = "zip" //testfiles := []map[string]interface { //}{ // {"Name": "test4.doc", "Content": "test4context", "Type": "doc", "Size": "40M"}, // {"Name": "test5.pdf", "Content": "test5context", "Type": "pdf", "Size": "50M"}, // {"Name": "test6.xlsx", "Content": "test6context", "Type": "xlsx", "Size": "60M"}, //} //reply, _ = json.Marshal(testfiles) if len(reply) == 0{ log.Println(mid, "rpc返回数据为空:",qu.ObjToString((*fileinfo)["fid"]), string(reply)) return } //log.Println(mid, string(reply)) rdata := make(map[string]interface{}) if err := json.Unmarshal(reply, &rdata); err != nil { log.Println(mid, "rpc返回数据解析失败:",qu.ObjToString((*fileinfo)["fid"]), err) return } if rdata["err"] == nil || rdata["err"] == "null" || rdata["err"] == "" { if qu.ObjToString((*fileinfo)["ftype"]) == "rar" || qu.ObjToString((*fileinfo)["ftype"]) == "zip" { (*fileinfo)["content"] = rdata["contextc"] } else { (*fileinfo)["content"] = rdata["context"] } (*fileinfo)["expend"] = rdata["expend"] delete(*fileinfo,"update") //log.Println((*fileinfo)) (*qmap)[MgoFileFiled].(map[string]interface{})["attachments"].(map[string]interface{})[attk]=*fileinfo //asdf := (*qmap)[MgoFileFiled].(map[string]interface{}) //qwer := asdf["attachments"].(map[string]interface{}) //qwer[attk] =*fileinfo //log.Println((*qmap)[MgoFileFiled]) updateBool := mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{ "$set": bson.M{ MgoFileFiled: (*qmap)[MgoFileFiled], }, }) if updateBool{ *updatenum++ mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{ "$set": bson.M{ "updatefileNum": &updatenum, },}) log.Println(mid, "mongo更新数据成功") }else { log.Println(mid, "mongo更新数据失败",qu.ObjToString((*fileinfo)["fid"])) } nowHour := time.Now().Hour() rdlock.Lock() if nowHour != hourNum{ log.Println("send email:",SendMail(fmt.Sprint(updateBool,mid))) hourNum = nowHour } rdlock.Unlock() } else { log.Println(mid, "调用rpc服务解析异常:",mid,qu.ObjToString((*fileinfo)["fid"]), rdata["err"]) } } var hourNum int var rdlock sync.RWMutex func SendMail( body string ) error { //定义邮箱服务器连接信息,如果是阿里邮箱 pass填密码,qq邮箱填授权码 mailConn := map[string]string { "user": "550838476@qq.com", "pass": "", "host": "smtp.qq.com", "port": "465", } port, _ := strconv.Atoi(mailConn["port"]) //转换端口类型为int m := gomail.NewMessage() m.SetHeader("From","Get to" + "<" + mailConn["user"] + ">") //这种方式可以添加别名,即“XD Game”, 也可以直接用m.SetHeader("From",mailConn["user"]) 读者可以自行实验下效果 m.SetHeader("To", []string{"550838476@qq.com"}...) //发送给多个用户 m.SetHeader("Subject", "MongoId") //设置邮件主题 m.SetBody("text/html","服务器:"+ body) //设置邮件正文 d := gomail.NewDialer(mailConn["host"], port, mailConn["user"], mailConn["pass"]) err := d.DialAndSend(m) return err } func forfunc(lid string) { for { //查询最后一个id lastObjectId, _ := mongodbutil.Mgo.Find(MgoC,nil,"-_id",bson.M{"_id":1},true,-1,-1) lastId,ok := (*lastObjectId)[0]["_id"].(bson.ObjectId) log.Println("lastID:",lastId) //查询最后一个id出错重新查询 if!ok{//转换失败 log.Println("查询异常",*lastObjectId) time.Sleep(time.Minute) continue } //查询最后一个id等于上一轮的id就重新查询 if lastId.Hex() == lid { log.Println("没有新数据",lastId.Hex()) SendMail(time.Now().String()+"没有最新数据,当前最后一条数据id:"+lastId.Hex()) time.Sleep(time.Hour) continue } //不相等说明有新数据,进行下次处理 m := map[string]string{ "gtid":lid,//上一轮结束的最后id "lteid":lastId.Hex(),//新一轮查询出来的id } bytes, _ := json.Marshal(m) //发送udp err := udpclient.WriteUdp(bytes,mu.OP_TYPE_DATA,&net.UDPAddr{ IP: net.ParseIP( util.ObjToString(Sysconfig["udpip"])), Port: util.IntAll(Sysconfig["udpport"]), }) if err != nil{ log.Println("发送udp失败",err,string(bytes)) time.Sleep(time.Minute) continue } SendMail(time.Now().String()+fmt.Sprint("发送udp成功,gtid:",lid,",lteid:",lastId.Hex())) log.Println("发送udp成功,gtid:",lid,",lteid:",lastId.Hex()) break//发送完后终止循环 } }