|
@@ -2,8 +2,6 @@ package main
|
|
|
|
|
|
import (
|
|
|
"encoding/json"
|
|
|
- "fmt"
|
|
|
- "github.com/go-gomail/gomail"
|
|
|
"gopkg.in/mgo.v2/bson"
|
|
|
"jy/mongodbutil"
|
|
|
"log"
|
|
@@ -11,9 +9,7 @@ import (
|
|
|
"net"
|
|
|
"net/rpc"
|
|
|
"path"
|
|
|
- "qfw/common/src/qfw/util"
|
|
|
qu "qfw/util"
|
|
|
- "strconv"
|
|
|
"strings"
|
|
|
"sync"
|
|
|
"time"
|
|
@@ -21,24 +17,24 @@ import (
|
|
|
|
|
|
var udpclient mu.UdpClient //udp对象
|
|
|
var Sysconfig map[string]interface{}
|
|
|
-var MgoIP, MgoDB, MgoC, MgoFileFiled string
|
|
|
+var MgoIP, MgoDB, MgoC, MgoFileFiled, GetDataIp, GetDataPort string
|
|
|
+var sys sync.RWMutex
|
|
|
var ChanB chan bool
|
|
|
-var PageSize int
|
|
|
|
|
|
func init() {
|
|
|
qu.ReadConfig(&Sysconfig)
|
|
|
- MgoIP = qu.ObjToString(Sysconfig["mongodb_one_ip"])
|
|
|
- MgoDB = qu.ObjToString(Sysconfig["mongodb_one_db"])
|
|
|
- MgoC = qu.ObjToString(Sysconfig["mongodb_one_c"])
|
|
|
- PageSize = qu.IntAllDef(Sysconfig["PageSize"],2000)
|
|
|
+ MgoIP = qu.ObjToString(Sysconfig["mongodb_ip"])
|
|
|
+ MgoDB = qu.ObjToString(Sysconfig["mongodb_db"])
|
|
|
+ MgoC = qu.ObjToString(Sysconfig["mongodb_c"])
|
|
|
+ GetDataIp = qu.ObjToString(Sysconfig["get_data_ip"])
|
|
|
+ GetDataPort = qu.ObjToString(Sysconfig["get_data_port"])
|
|
|
MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_one_filefiled"], "projectinfo")
|
|
|
- if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" ||PageSize <=0{
|
|
|
+ if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" {
|
|
|
log.Println("获取配置文件参数失败", Sysconfig)
|
|
|
return
|
|
|
}
|
|
|
mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB)
|
|
|
log.Println(mongodbutil.Mgo.Get().Ping())
|
|
|
- ChanB = make(chan bool, qu.IntAllDef(Sysconfig["channelsize"], 5))
|
|
|
}
|
|
|
|
|
|
func main() {
|
|
@@ -46,121 +42,75 @@ func main() {
|
|
|
udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024}
|
|
|
udpclient.Listen(processUdpMsg)
|
|
|
log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
|
|
|
+ go udpclient.WriteUdp([]byte{}, mu.OP_GET_DOWNLOADERCODE, &net.UDPAddr{
|
|
|
+ IP: net.ParseIP(GetDataIp),
|
|
|
+ Port: qu.IntAll(GetDataPort),
|
|
|
+ })
|
|
|
b := make(chan bool, 1)
|
|
|
<-b
|
|
|
}
|
|
|
+
|
|
|
// "file2text": "192.168.3.207:1234",
|
|
|
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
defer qu.Catch()
|
|
|
switch act {
|
|
|
case mu.OP_TYPE_DATA:
|
|
|
+ sys.Lock()
|
|
|
var mapInfo map[string]interface{}
|
|
|
err := json.Unmarshal(data, &mapInfo)
|
|
|
if err != nil {
|
|
|
log.Println("json err :", err, string(data))
|
|
|
+ time.Sleep(time.Second * 30)
|
|
|
+ go udpclient.WriteUdp([]byte{}, mu.OP_GET_DOWNLOADERCODE, ra)
|
|
|
return
|
|
|
}
|
|
|
- log.Println("updocr接收数据:",mapInfo)
|
|
|
- stime :=time.Now()
|
|
|
- gid := strings.TrimSpace(mapInfo["gtid"].(string))
|
|
|
- rgid := gid
|
|
|
- lid := strings.TrimSpace(mapInfo["lteid"].(string))
|
|
|
- //err = udpclient.WriteUdp([]byte("updocr接收数据成功"), mu.OP_TYPE_DATA, &net.UDPAddr{
|
|
|
- // IP: net.ParseIP(Sysconfig["toudpip"].(string)),
|
|
|
- // Port: qu.IntAll(Sysconfig["toudpport"]),
|
|
|
- //})
|
|
|
- ////forfunc(lid)
|
|
|
- //log.Println("接收数据成功,发送到:",Sysconfig["toudpip"].(string),Sysconfig["toudpport"],err)
|
|
|
- if bson.IsObjectIdHex(gid) && bson.IsObjectIdHex(lid) {
|
|
|
- var jsq int64
|
|
|
- query := bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(gid),"$lte": bson.ObjectIdHex(lid),}}
|
|
|
- log.Println("query---:", query)
|
|
|
- sum :=mongodbutil.Mgo.Count(MgoC,query)
|
|
|
- log.Println("sum:", sum)
|
|
|
- pageNum := (sum + PageSize - 1) / PageSize
|
|
|
- limit := PageSize
|
|
|
- if sum < PageSize {
|
|
|
- limit = sum
|
|
|
- }
|
|
|
- for i := 0; i < pageNum; i++ {
|
|
|
- query = bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(gid), "$lte": bson.ObjectIdHex(lid)}}
|
|
|
- log.Println("page=", i+1,"query=", query,limit)
|
|
|
- list, b := mongodbutil.Mgo.Find(MgoC,query,nil,bson.M{"_id": 1,MgoFileFiled:1},false,0, limit)
|
|
|
- if !b{
|
|
|
- log.Println("查询失败")
|
|
|
- continue
|
|
|
- }
|
|
|
-
|
|
|
- for _,v:=range *list {
|
|
|
- gid = qu.BsonIdToSId(v["_id"])
|
|
|
- jsq++
|
|
|
- updateNum :=0
|
|
|
- qmap := qu.ObjToMap(v)
|
|
|
- mid := (*qmap)["_id"]
|
|
|
- if v, ok := (*qmap)[MgoFileFiled].(map[string]interface{}); !ok {
|
|
|
- //log.Println(mid, "mgo 没有字段", MgoFileFiled)
|
|
|
+ ObjectId := qu.ObjToString(mapInfo["id"])
|
|
|
+ if ObjectId == "" || !bson.IsObjectIdHex(ObjectId) {
|
|
|
+ log.Println("获取数据id错误", mapInfo)
|
|
|
+ time.Sleep(time.Second * 10)
|
|
|
+ go udpclient.WriteUdp([]byte{}, mu.OP_GET_DOWNLOADERCODE, ra)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ sys.Unlock()
|
|
|
+ log.Println("获取数据成功:", mapInfo)
|
|
|
+ data, _ := mongodbutil.Mgo.FindById(MgoC, ObjectId, bson.M{"_id": 1, MgoFileFiled: 1})
|
|
|
+ if len(*data) == 0 {
|
|
|
+ go udpclient.WriteUdp([]byte{}, mu.OP_GET_DOWNLOADERCODE, ra)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if v, ok := (*data)[MgoFileFiled].(map[string]interface{}); !ok {
|
|
|
+ go udpclient.WriteUdp([]byte{}, mu.OP_GET_DOWNLOADERCODE, ra)
|
|
|
+ return
|
|
|
+ } else {
|
|
|
+ switch v["attachments"].(type) {
|
|
|
+ case map[string]interface{}:
|
|
|
+ att := v["attachments"].(map[string]interface{})
|
|
|
+ updateNum := 0
|
|
|
+ for attk, vaatt := range att {
|
|
|
+ if fileinfo, ok := vaatt.(map[string]interface{}); !ok {
|
|
|
+ //log.Println(mid, "mgo 结构体转换失败", vaatt)
|
|
|
continue
|
|
|
} else {
|
|
|
- switch v["attachments"].(type) {
|
|
|
- case map[string]interface{}:
|
|
|
- att := v["attachments"].(map[string]interface{})
|
|
|
- for attk, vaatt := range att {
|
|
|
- if fileinfo, ok := vaatt.(map[string]interface{}); !ok {
|
|
|
- //log.Println(mid, "mgo 结构体转换失败", vaatt)
|
|
|
- continue
|
|
|
- } else {
|
|
|
- ChanB <- true
|
|
|
- if qu.ObjToString(fileinfo["fid"]) ==""{
|
|
|
- <-ChanB
|
|
|
- //log.Println(mid, "mgo ", MgoFileFiled,"没有fid ")
|
|
|
- continue
|
|
|
- }
|
|
|
- //if (strings.Contains(qu.ObjToString(fileinfo["url"]),"fs.qmx.top")|| strings.Contains(qu.ObjToString(fileinfo["url"]),"fj1.jianyu360.com"))&& (strings.TrimSpace(qu.ObjToString(fileinfo["content"]))==""||strings.Contains(qu.ObjToString(fileinfo["content"]),"error") ){
|
|
|
- // save(mid,attk, qmap, &fileinfo,&updateNum)
|
|
|
- // <-ChanB
|
|
|
- //}else {
|
|
|
- // <-ChanB
|
|
|
- //}
|
|
|
- //if qu.ObjToString(fileinfo["update"]) ==""{
|
|
|
- // <-ChanB
|
|
|
- // log.Println(mid, "mgo ", MgoFileFiled,"没有update ")
|
|
|
- // continue
|
|
|
- //}
|
|
|
- save(mid,attk, qmap, &fileinfo,&updateNum)
|
|
|
- <-ChanB
|
|
|
- }
|
|
|
- }
|
|
|
+ ChanB <- true
|
|
|
+ if qu.ObjToString(fileinfo["fid"]) == "" {
|
|
|
+ <-ChanB
|
|
|
+ //log.Println(mid, "mgo ", MgoFileFiled,"没有fid ")
|
|
|
+ continue
|
|
|
}
|
|
|
+ save(bson.ObjectIdHex(ObjectId), attk, &v, &fileinfo, &updateNum)
|
|
|
+ <-ChanB
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- //发送udp信号
|
|
|
- by, _ := json.Marshal(map[string]interface{}{
|
|
|
- "gtid": rgid,
|
|
|
- "lteid": lid,
|
|
|
- "stype": "fujian",
|
|
|
- })
|
|
|
- err := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
|
|
|
- IP: net.ParseIP(Sysconfig["toudpip"].(string)),
|
|
|
- Port: qu.IntAll(Sysconfig["toudpport"]),
|
|
|
- })
|
|
|
- //识别完以后再次查询数据库,进行下一轮识别
|
|
|
- log.Println("处理查询数据结束...",jsq,time.Now().Sub(stime))
|
|
|
- SendMail(rgid+"--->"+lid+"处理完成")
|
|
|
- //进行下一轮识别
|
|
|
- forfunc(lid)
|
|
|
- log.Println("发送到:",Sysconfig["toudpip"].(string),Sysconfig["toudpport"],err)
|
|
|
- } else {
|
|
|
- log.Println("开始id或结束id参数错误:", string(data))
|
|
|
+ go udpclient.WriteUdp([]byte{}, mu.OP_GET_DOWNLOADERCODE, ra)
|
|
|
}
|
|
|
-
|
|
|
case mu.OP_NOOP: //下个节点回应
|
|
|
log.Println("接收成功", string(data))
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
-func save(mid interface{},attk string, qmap, fileinfo *map[string]interface{},updatenum *int) {
|
|
|
+func save(mid interface{}, attk string, qmap, fileinfo *map[string]interface{}, updatenum *int) {
|
|
|
defer qu.Catch()
|
|
|
type FileData struct {
|
|
|
ObjId string //Id
|
|
@@ -180,17 +130,17 @@ func save(mid interface{},attk string, qmap, fileinfo *map[string]interface{},up
|
|
|
//bs, _ := ioutil.ReadFile("1.docx")
|
|
|
var fffpath string
|
|
|
fffpath = path.Ext(qu.ObjToString((*fileinfo)["filename"]))
|
|
|
- if strings.TrimSpace(fffpath) == ""{
|
|
|
- fffpath = qu.ObjToString((*fileinfo)["ftype"])
|
|
|
- }else {
|
|
|
+ if strings.TrimSpace(fffpath) == "" {
|
|
|
+ fffpath = qu.ObjToString((*fileinfo)["ftype"])
|
|
|
+ } else {
|
|
|
fffpath = fffpath[1:]
|
|
|
}
|
|
|
fileData := &FileData{
|
|
|
- ObjId:mid.(bson.ObjectId).String(),
|
|
|
+ ObjId: mid.(bson.ObjectId).String(),
|
|
|
OrgUrl: qu.ObjToString((*fileinfo)["url"]),
|
|
|
- Name: qu.ObjToString((*fileinfo)["filename"]),
|
|
|
- Fid: qu.ObjToString((*fileinfo)["fid"]), //附件id
|
|
|
- Type: fffpath,
|
|
|
+ Name: qu.ObjToString((*fileinfo)["filename"]),
|
|
|
+ Fid: qu.ObjToString((*fileinfo)["fid"]), //附件id
|
|
|
+ Type: fffpath,
|
|
|
}
|
|
|
//log.Println(mid, fileData)
|
|
|
err = client.Call("FileToText.FileToContext", fileData, &reply)
|
|
@@ -208,14 +158,14 @@ func save(mid interface{},attk string, qmap, fileinfo *map[string]interface{},up
|
|
|
// {"Name": "test6.xlsx", "Content": "test6context", "Type": "xlsx", "Size": "60M"},
|
|
|
//}
|
|
|
//reply, _ = json.Marshal(testfiles)
|
|
|
- if len(reply) == 0{
|
|
|
- log.Println(mid, "rpc返回数据为空:",qu.ObjToString((*fileinfo)["fid"]), string(reply))
|
|
|
+ if len(reply) == 0 {
|
|
|
+ log.Println(mid, "rpc返回数据为空:", qu.ObjToString((*fileinfo)["fid"]), string(reply))
|
|
|
return
|
|
|
}
|
|
|
//log.Println(mid, string(reply))
|
|
|
rdata := make(map[string]interface{})
|
|
|
if err := json.Unmarshal(reply, &rdata); err != nil {
|
|
|
- log.Println(mid, "rpc返回数据解析失败:",qu.ObjToString((*fileinfo)["fid"]), err)
|
|
|
+ log.Println(mid, "rpc返回数据解析失败:", qu.ObjToString((*fileinfo)["fid"]), err)
|
|
|
return
|
|
|
}
|
|
|
if rdata["err"] == nil || rdata["err"] == "null" || rdata["err"] == "" {
|
|
@@ -225,10 +175,10 @@ func save(mid interface{},attk string, qmap, fileinfo *map[string]interface{},up
|
|
|
(*fileinfo)["content"] = rdata["context"]
|
|
|
}
|
|
|
(*fileinfo)["expend"] = rdata["expend"]
|
|
|
- delete(*fileinfo,"update")
|
|
|
+ delete(*fileinfo, "update")
|
|
|
//log.Println((*fileinfo))
|
|
|
|
|
|
- (*qmap)[MgoFileFiled].(map[string]interface{})["attachments"].(map[string]interface{})[attk]=*fileinfo
|
|
|
+ (*qmap)[MgoFileFiled].(map[string]interface{})["attachments"].(map[string]interface{})[attk] = *fileinfo
|
|
|
//asdf := (*qmap)[MgoFileFiled].(map[string]interface{})
|
|
|
//qwer := asdf["attachments"].(map[string]interface{})
|
|
|
//qwer[attk] =*fileinfo
|
|
@@ -239,91 +189,29 @@ func save(mid interface{},attk string, qmap, fileinfo *map[string]interface{},up
|
|
|
MgoFileFiled: (*qmap)[MgoFileFiled],
|
|
|
},
|
|
|
})
|
|
|
- if updateBool{
|
|
|
+ if updateBool {
|
|
|
*updatenum++
|
|
|
mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
|
|
|
"$set": bson.M{
|
|
|
"updatefileNum": &updatenum,
|
|
|
},})
|
|
|
log.Println(mid, "mongo更新数据成功")
|
|
|
- }else {
|
|
|
- log.Println(mid, "mongo更新数据失败",qu.ObjToString((*fileinfo)["fid"]))
|
|
|
- }
|
|
|
- nowHour := time.Now().Hour()
|
|
|
- rdlock.Lock()
|
|
|
- if nowHour != hourNum{
|
|
|
- log.Println("send email:",SendMail(fmt.Sprint(updateBool,mid)))
|
|
|
- hourNum = nowHour
|
|
|
+ } else {
|
|
|
+ log.Println(mid, "mongo更新数据失败", qu.ObjToString((*fileinfo)["fid"]))
|
|
|
}
|
|
|
- rdlock.Unlock()
|
|
|
+ //nowHour := time.Now().Hour()
|
|
|
+ //rdlock.Lock()
|
|
|
+ //if nowHour != hourNum {
|
|
|
+ // log.Println("send email:", SendMail(fmt.Sprint(updateBool, mid)))
|
|
|
+ // hourNum = nowHour
|
|
|
+ //}
|
|
|
+ //rdlock.Unlock()
|
|
|
} else {
|
|
|
- log.Println(mid, "调用rpc服务解析异常:",mid,qu.ObjToString((*fileinfo)["fid"]), rdata["err"])
|
|
|
- }
|
|
|
-
|
|
|
-}
|
|
|
-var hourNum int
|
|
|
-var rdlock sync.RWMutex
|
|
|
-func SendMail( body string ) error {
|
|
|
- //定义邮箱服务器连接信息,如果是阿里邮箱 pass填密码,qq邮箱填授权码
|
|
|
- mailConn := map[string]string {
|
|
|
- "user": "550838476@qq.com",
|
|
|
- "pass": "",
|
|
|
- "host": "smtp.qq.com",
|
|
|
- "port": "465",
|
|
|
+ log.Println(mid, "调用rpc服务解析异常:", mid, qu.ObjToString((*fileinfo)["fid"]), rdata["err"])
|
|
|
}
|
|
|
|
|
|
- port, _ := strconv.Atoi(mailConn["port"]) //转换端口类型为int
|
|
|
-
|
|
|
- m := gomail.NewMessage()
|
|
|
- m.SetHeader("From","Get to" + "<" + mailConn["user"] + ">") //这种方式可以添加别名,即“XD Game”, 也可以直接用<code>m.SetHeader("From",mailConn["user"])</code> 读者可以自行实验下效果
|
|
|
- m.SetHeader("To", []string{"550838476@qq.com"}...) //发送给多个用户
|
|
|
- m.SetHeader("Subject", "MongoId") //设置邮件主题
|
|
|
- m.SetBody("text/html","服务器:"+ body) //设置邮件正文
|
|
|
-
|
|
|
- d := gomail.NewDialer(mailConn["host"], port, mailConn["user"], mailConn["pass"])
|
|
|
-
|
|
|
- err := d.DialAndSend(m)
|
|
|
- return err
|
|
|
-
|
|
|
}
|
|
|
+//
|
|
|
+//var hourNum int
|
|
|
+//var rdlock sync.RWMutex
|
|
|
|
|
|
-func forfunc(lid string) {
|
|
|
- for {
|
|
|
- //查询最后一个id
|
|
|
- lastObjectId, _ := mongodbutil.Mgo.Find(MgoC,nil,"-_id",bson.M{"_id":1},true,-1,-1)
|
|
|
- lastId,ok := (*lastObjectId)[0]["_id"].(bson.ObjectId)
|
|
|
- log.Println("lastID:",lastId)
|
|
|
- //查询最后一个id出错重新查询
|
|
|
- if!ok{//转换失败
|
|
|
- log.Println("查询异常",*lastObjectId)
|
|
|
- time.Sleep(time.Minute)
|
|
|
- continue
|
|
|
- }
|
|
|
- //查询最后一个id等于上一轮的id就重新查询
|
|
|
- if lastId.Hex() == lid {
|
|
|
- log.Println("没有新数据",lastId.Hex())
|
|
|
- SendMail(time.Now().String()+"没有最新数据,当前最后一条数据id:"+lastId.Hex())
|
|
|
- time.Sleep(time.Hour)
|
|
|
- continue
|
|
|
- }
|
|
|
- //不相等说明有新数据,进行下次处理
|
|
|
- m := map[string]string{
|
|
|
- "gtid":lid,//上一轮结束的最后id
|
|
|
- "lteid":lastId.Hex(),//新一轮查询出来的id
|
|
|
- }
|
|
|
- bytes, _ := json.Marshal(m)
|
|
|
- //发送udp
|
|
|
- err := udpclient.WriteUdp(bytes,mu.OP_TYPE_DATA,&net.UDPAddr{
|
|
|
- IP: net.ParseIP( util.ObjToString(Sysconfig["udpip"])),
|
|
|
- Port: util.IntAll(Sysconfig["udpport"]),
|
|
|
- })
|
|
|
- if err != nil{
|
|
|
- log.Println("发送udp失败",err,string(bytes))
|
|
|
- time.Sleep(time.Minute)
|
|
|
- continue
|
|
|
- }
|
|
|
- SendMail(time.Now().String()+fmt.Sprint("发送udp成功,gtid:",lid,",lteid:",lastId.Hex()))
|
|
|
- log.Println("发送udp成功,gtid:",lid,",lteid:",lastId.Hex())
|
|
|
- break//发送完后终止循环
|
|
|
- }
|
|
|
-}
|