|
@@ -2,6 +2,8 @@ package main
|
|
|
|
|
|
import (
|
|
|
"encoding/json"
|
|
|
+ "fmt"
|
|
|
+ "github.com/go-gomail/gomail"
|
|
|
"gopkg.in/mgo.v2/bson"
|
|
|
"jy/mongodbutil"
|
|
|
"log"
|
|
@@ -9,8 +11,11 @@ import (
|
|
|
"net"
|
|
|
"net/rpc"
|
|
|
"path"
|
|
|
+ "qfw/common/src/qfw/util"
|
|
|
qu "qfw/util"
|
|
|
+ "strconv"
|
|
|
"strings"
|
|
|
+ "sync"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
@@ -61,7 +66,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
lid := strings.TrimSpace(mapInfo["lteid"].(string))
|
|
|
if bson.IsObjectIdHex(gid) && bson.IsObjectIdHex(lid) {
|
|
|
var jsq int64
|
|
|
- query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(gid),"$lte": bson.ObjectIdHex(lid),}}
|
|
|
+ query := bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(gid),"$lte": bson.ObjectIdHex(lid),}}
|
|
|
log.Println("query---:", query)
|
|
|
sum :=mongodbutil.Mgo.Count(MgoC,query)
|
|
|
log.Println("sum:", sum)
|
|
@@ -71,7 +76,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
limit = sum
|
|
|
}
|
|
|
for i := 0; i < pageNum; i++ {
|
|
|
- query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(gid), "$lte": bson.ObjectIdHex(lid)}}
|
|
|
+ query = bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(gid), "$lte": bson.ObjectIdHex(lid)}}
|
|
|
log.Println("page=", i+1,"query=", query,limit)
|
|
|
list, b := mongodbutil.Mgo.Find(MgoC,query,nil,bson.M{"_id": 1,MgoFileFiled:1},false,0, limit)
|
|
|
if !b{
|
|
@@ -103,6 +108,11 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
log.Println(mid, "mgo ", MgoFileFiled,"没有fid ")
|
|
|
continue
|
|
|
}
|
|
|
+ //if qu.ObjToString(fileinfo["update"]) ==""{
|
|
|
+ // <-ChanB
|
|
|
+ // log.Println(mid, "mgo ", MgoFileFiled,"没有update ")
|
|
|
+ // continue
|
|
|
+ //}
|
|
|
save(mid,attk, qmap, &fileinfo,&updateNum)
|
|
|
<-ChanB
|
|
|
}
|
|
@@ -111,7 +121,10 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ //识别完以后再次查询数据库,进行下一轮识别
|
|
|
log.Println("处理查询数据结束...",jsq,time.Now().Sub(stime))
|
|
|
+ //进行下一轮识别
|
|
|
+ forfunc(lid)
|
|
|
} else {
|
|
|
log.Println("开始id或结束id参数错误:", string(data))
|
|
|
}
|
|
@@ -185,6 +198,7 @@ func save(mid interface{},attk string, qmap, fileinfo *map[string]interface{},up
|
|
|
(*fileinfo)["content"] = rdata["context"]
|
|
|
}
|
|
|
(*fileinfo)["expend"] = rdata["expend"]
|
|
|
+ delete(*fileinfo,"update")
|
|
|
//log.Println((*fileinfo))
|
|
|
|
|
|
(*qmap)[MgoFileFiled].(map[string]interface{})["attachments"].(map[string]interface{})[attk]=*fileinfo
|
|
@@ -208,8 +222,81 @@ func save(mid interface{},attk string, qmap, fileinfo *map[string]interface{},up
|
|
|
}else {
|
|
|
log.Println(mid, "mongo更新数据失败",qu.ObjToString((*fileinfo)["fid"]))
|
|
|
}
|
|
|
+ nowHour := time.Now().Hour()
|
|
|
+ rdlock.Lock()
|
|
|
+ if nowHour != hourNum{
|
|
|
+ log.Println("send email:",SendMail(fmt.Sprint(updateBool,mid)))
|
|
|
+ hourNum = nowHour
|
|
|
+ }
|
|
|
+ rdlock.Unlock()
|
|
|
} else {
|
|
|
log.Println(mid, "调用rpc服务解析异常:",qu.ObjToString((*fileinfo)["fid"]), rdata["err"])
|
|
|
}
|
|
|
|
|
|
}
|
|
|
+var hourNum int
|
|
|
+var rdlock sync.RWMutex
|
|
|
+func SendMail( body string ) error {
|
|
|
+ //定义邮箱服务器连接信息,如果是阿里邮箱 pass填密码,qq邮箱填授权码
|
|
|
+ mailConn := map[string]string {
|
|
|
+ "user": "550838476@qq.com",
|
|
|
+ "pass": "",
|
|
|
+ "host": "smtp.qq.com",
|
|
|
+ "port": "465",
|
|
|
+ }
|
|
|
+
|
|
|
+ port, _ := strconv.Atoi(mailConn["port"]) //转换端口类型为int
|
|
|
+
|
|
|
+ m := gomail.NewMessage()
|
|
|
+ m.SetHeader("From","Get to" + "<" + mailConn["user"] + ">") //这种方式可以添加别名,即“XD Game”, 也可以直接用<code>m.SetHeader("From",mailConn["user"])</code> 读者可以自行实验下效果
|
|
|
+ m.SetHeader("To", []string{"550838476@qq.com"}...) //发送给多个用户
|
|
|
+ m.SetHeader("Subject", "MongoId") //设置邮件主题
|
|
|
+ m.SetBody("text/html", body) //设置邮件正文
|
|
|
+
|
|
|
+ d := gomail.NewDialer(mailConn["host"], port, mailConn["user"], mailConn["pass"])
|
|
|
+
|
|
|
+ err := d.DialAndSend(m)
|
|
|
+ return err
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+func forfunc(lid string) {
|
|
|
+ for {
|
|
|
+ //查询最后一个id
|
|
|
+ lastObjectId, _ := mongodbutil.Mgo.Find(MgoC,nil,"-_id",bson.M{"_id":1},true,-1,-1)
|
|
|
+ lastId,ok := (*lastObjectId)[0]["_id"].(bson.ObjectId)
|
|
|
+ log.Println("lastID:",lastId)
|
|
|
+ //查询最后一个id出错重新查询
|
|
|
+ if!ok{//转换失败
|
|
|
+ log.Println("查询异常",*lastObjectId)
|
|
|
+ time.Sleep(time.Minute)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ //查询最后一个id等于上一轮的id就重新查询
|
|
|
+ if lastId.Hex() == lid {
|
|
|
+ log.Println("没有新数据",lastId.Hex())
|
|
|
+ SendMail(time.Now().String()+"没有最新数据,当前最后一条数据id:"+lastId.Hex())
|
|
|
+ time.Sleep(time.Hour)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ //不相等说明有新数据,进行下次处理
|
|
|
+ m := map[string]string{
|
|
|
+ "gtid":lid,//上一轮结束的最后id
|
|
|
+ "lteid":lastId.Hex(),//新一轮查询出来的id
|
|
|
+ }
|
|
|
+ bytes, _ := json.Marshal(m)
|
|
|
+ //发送udp
|
|
|
+ err := udpclient.WriteUdp(bytes,mu.OP_TYPE_DATA,&net.UDPAddr{
|
|
|
+ IP: net.ParseIP( util.ObjToString(Sysconfig["udpip"])),
|
|
|
+ Port: util.IntAll(Sysconfig["udpport"]),
|
|
|
+ })
|
|
|
+ if err != nil{
|
|
|
+ log.Println("发送udp失败",err,string(bytes))
|
|
|
+ time.Sleep(time.Minute)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ SendMail(time.Now().String()+fmt.Sprint("发送udp成功,gtid:",lid,",lteid:",lastId.Hex()))
|
|
|
+ log.Println("发送udp成功,gtid:",lid,",lteid:",lastId.Hex())
|
|
|
+ break//发送完后终止循环
|
|
|
+ }
|
|
|
+}
|