main.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/go-gomail/gomail"
  6. "gopkg.in/mgo.v2/bson"
  7. "jy/mongodbutil"
  8. "log"
  9. mu "mfw/util"
  10. "net"
  11. "net/rpc"
  12. "path"
  13. "qfw/common/src/qfw/util"
  14. qu "qfw/util"
  15. "strconv"
  16. "strings"
  17. "sync"
  18. "time"
  19. )
  20. var udpclient mu.UdpClient //udp对象
  21. var Sysconfig map[string]interface{}
  22. var MgoIP, MgoDB, MgoC, MgoFileFiled string
  23. var ChanB chan bool
  24. var PageSize int
  25. func init() {
  26. qu.ReadConfig(&Sysconfig)
  27. MgoIP = qu.ObjToString(Sysconfig["mongodb_one_ip"])
  28. MgoDB = qu.ObjToString(Sysconfig["mongodb_one_db"])
  29. MgoC = qu.ObjToString(Sysconfig["mongodb_one_c"])
  30. PageSize = qu.IntAllDef(Sysconfig["PageSize"],2000)
  31. MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_one_filefiled"], "projectinfo")
  32. if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" ||PageSize <=0{
  33. log.Println("获取配置文件参数失败", Sysconfig)
  34. return
  35. }
  36. mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB)
  37. log.Println(mongodbutil.Mgo.Get().Ping())
  38. ChanB = make(chan bool, qu.IntAllDef(Sysconfig["channelsize"], 5))
  39. }
  40. func main() {
  41. log.Println(Sysconfig)
  42. udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024}
  43. udpclient.Listen(processUdpMsg)
  44. log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
  45. b := make(chan bool, 1)
  46. <-b
  47. }
  48. // "file2text": "192.168.3.207:1234",
  49. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  50. defer qu.Catch()
  51. switch act {
  52. case mu.OP_TYPE_DATA:
  53. var mapInfo map[string]interface{}
  54. err := json.Unmarshal(data, &mapInfo)
  55. if err != nil {
  56. log.Println("json err :", err, string(data))
  57. return
  58. }
  59. log.Println("updocr接收数据:",mapInfo)
  60. stime :=time.Now()
  61. gid := strings.TrimSpace(mapInfo["gtid"].(string))
  62. lid := strings.TrimSpace(mapInfo["lteid"].(string))
  63. //err = udpclient.WriteUdp([]byte("updocr接收数据成功"), mu.OP_TYPE_DATA, &net.UDPAddr{
  64. // IP: net.ParseIP(Sysconfig["toudpip"].(string)),
  65. // Port: qu.IntAll(Sysconfig["toudpport"]),
  66. //})
  67. ////forfunc(lid)
  68. //log.Println("接收数据成功,发送到:",Sysconfig["toudpip"].(string),Sysconfig["toudpport"],err)
  69. if bson.IsObjectIdHex(gid) && bson.IsObjectIdHex(lid) {
  70. var jsq int64
  71. query := bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(gid),"$lte": bson.ObjectIdHex(lid),}}
  72. log.Println("query---:", query)
  73. sum :=mongodbutil.Mgo.Count(MgoC,query)
  74. log.Println("sum:", sum)
  75. pageNum := (sum + PageSize - 1) / PageSize
  76. limit := PageSize
  77. if sum < PageSize {
  78. limit = sum
  79. }
  80. for i := 0; i < pageNum; i++ {
  81. query = bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(gid), "$lte": bson.ObjectIdHex(lid)}}
  82. log.Println("page=", i+1,"query=", query,limit)
  83. list, b := mongodbutil.Mgo.Find(MgoC,query,nil,bson.M{"_id": 1,MgoFileFiled:1},false,0, limit)
  84. if !b{
  85. log.Println("查询失败")
  86. continue
  87. }
  88. for _,v:=range *list {
  89. gid = qu.BsonIdToSId(v["_id"])
  90. jsq++
  91. updateNum :=0
  92. qmap := qu.ObjToMap(v)
  93. mid := (*qmap)["_id"]
  94. if v, ok := (*qmap)[MgoFileFiled].(map[string]interface{}); !ok {
  95. //log.Println(mid, "mgo 没有字段", MgoFileFiled)
  96. continue
  97. } else {
  98. switch v["attachments"].(type) {
  99. case map[string]interface{}:
  100. att := v["attachments"].(map[string]interface{})
  101. for attk, vaatt := range att {
  102. if fileinfo, ok := vaatt.(map[string]interface{}); !ok {
  103. //log.Println(mid, "mgo 结构体转换失败", vaatt)
  104. continue
  105. } else {
  106. ChanB <- true
  107. if qu.ObjToString(fileinfo["fid"]) ==""{
  108. <-ChanB
  109. log.Println(mid, "mgo ", MgoFileFiled,"没有fid ")
  110. continue
  111. }
  112. //if (strings.Contains(qu.ObjToString(fileinfo["url"]),"fs.qmx.top")|| strings.Contains(qu.ObjToString(fileinfo["url"]),"fj1.jianyu360.com"))&& (strings.TrimSpace(qu.ObjToString(fileinfo["content"]))==""||strings.Contains(qu.ObjToString(fileinfo["content"]),"error") ){
  113. // save(mid,attk, qmap, &fileinfo,&updateNum)
  114. // <-ChanB
  115. //}else {
  116. // <-ChanB
  117. //}
  118. //if qu.ObjToString(fileinfo["update"]) ==""{
  119. // <-ChanB
  120. // log.Println(mid, "mgo ", MgoFileFiled,"没有update ")
  121. // continue
  122. //}
  123. save(mid,attk, qmap, &fileinfo,&updateNum)
  124. <-ChanB
  125. }
  126. }
  127. }
  128. }
  129. }
  130. }
  131. //识别完以后再次查询数据库,进行下一轮识别
  132. log.Println("处理查询数据结束...",jsq,time.Now().Sub(stime))
  133. by, _ := json.Marshal(map[string]interface{}{
  134. "gtid": gid,
  135. "lteid": lid,
  136. "stype": "fujian",
  137. })
  138. err := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
  139. IP: net.ParseIP(Sysconfig["toudpip"].(string)),
  140. Port: qu.IntAll(Sysconfig["toudpport"]),
  141. })
  142. //SendMail("处理完成")
  143. //进行下一轮识别
  144. //forfunc(lid)
  145. log.Println("发送到:",Sysconfig["toudpip"].(string),Sysconfig["toudpport"],err)
  146. } else {
  147. log.Println("开始id或结束id参数错误:", string(data))
  148. }
  149. case mu.OP_NOOP: //下个节点回应
  150. log.Println("接收成功", string(data))
  151. }
  152. }
  153. func save(mid interface{},attk string, qmap, fileinfo *map[string]interface{},updatenum *int) {
  154. defer qu.Catch()
  155. type FileData struct {
  156. ObjId string //Id
  157. OrgUrl string //源下载地址
  158. Fid string
  159. Name string
  160. Type string //文件类型png、jpg、tif、swf(ocr识别);pdf,doc,docx,xls
  161. Content string //识别内容
  162. }
  163. client, err := rpc.DialHTTP("tcp", qu.ObjToString(Sysconfig["file2text"]))
  164. if err != nil {
  165. log.Println(mid, "rpc err :", err)
  166. return
  167. }
  168. defer client.Close()
  169. var reply []byte
  170. //bs, _ := ioutil.ReadFile("1.docx")
  171. var fffpath string
  172. fffpath = path.Ext(qu.ObjToString((*fileinfo)["filename"]))
  173. if strings.TrimSpace(fffpath) == ""{
  174. fffpath = qu.ObjToString((*fileinfo)["ftype"])
  175. }else {
  176. fffpath = fffpath[1:]
  177. }
  178. fileData := &FileData{
  179. ObjId:mid.(bson.ObjectId).String(),
  180. OrgUrl: qu.ObjToString((*fileinfo)["url"]),
  181. Name: qu.ObjToString((*fileinfo)["filename"]),
  182. Fid: qu.ObjToString((*fileinfo)["fid"]), //附件id
  183. Type: fffpath,
  184. }
  185. //log.Println(mid, fileData)
  186. err = client.Call("FileToText.FileToContext", fileData, &reply)
  187. if err != nil {
  188. log.Println(mid, "call ocr error:", err)
  189. return
  190. }
  191. //fileinfo["ftype"] = "doc"
  192. //reply = []byte("jdsfkldasjflkj")
  193. //fileinfo["ftype"] = "zip"
  194. //testfiles := []map[string]interface {
  195. //}{
  196. // {"Name": "test4.doc", "Content": "test4context", "Type": "doc", "Size": "40M"},
  197. // {"Name": "test5.pdf", "Content": "test5context", "Type": "pdf", "Size": "50M"},
  198. // {"Name": "test6.xlsx", "Content": "test6context", "Type": "xlsx", "Size": "60M"},
  199. //}
  200. //reply, _ = json.Marshal(testfiles)
  201. if len(reply) == 0{
  202. log.Println(mid, "rpc返回数据为空:",qu.ObjToString((*fileinfo)["fid"]), string(reply))
  203. return
  204. }
  205. //log.Println(mid, string(reply))
  206. rdata := make(map[string]interface{})
  207. if err := json.Unmarshal(reply, &rdata); err != nil {
  208. log.Println(mid, "rpc返回数据解析失败:",qu.ObjToString((*fileinfo)["fid"]), err)
  209. return
  210. }
  211. if rdata["err"] == nil || rdata["err"] == "null" || rdata["err"] == "" {
  212. if qu.ObjToString((*fileinfo)["ftype"]) == "rar" || qu.ObjToString((*fileinfo)["ftype"]) == "zip" {
  213. (*fileinfo)["content"] = rdata["contextc"]
  214. } else {
  215. (*fileinfo)["content"] = rdata["context"]
  216. }
  217. (*fileinfo)["expend"] = rdata["expend"]
  218. delete(*fileinfo,"update")
  219. //log.Println((*fileinfo))
  220. (*qmap)[MgoFileFiled].(map[string]interface{})["attachments"].(map[string]interface{})[attk]=*fileinfo
  221. //asdf := (*qmap)[MgoFileFiled].(map[string]interface{})
  222. //qwer := asdf["attachments"].(map[string]interface{})
  223. //qwer[attk] =*fileinfo
  224. //log.Println((*qmap)[MgoFileFiled])
  225. updateBool := mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
  226. "$set": bson.M{
  227. MgoFileFiled: (*qmap)[MgoFileFiled],
  228. },
  229. })
  230. if updateBool{
  231. *updatenum++
  232. mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
  233. "$set": bson.M{
  234. "updatefileNum": &updatenum,
  235. },})
  236. log.Println(mid, "mongo更新数据成功")
  237. }else {
  238. log.Println(mid, "mongo更新数据失败",qu.ObjToString((*fileinfo)["fid"]))
  239. }
  240. nowHour := time.Now().Hour()
  241. rdlock.Lock()
  242. if nowHour != hourNum{
  243. log.Println("send email:",SendMail(fmt.Sprint(updateBool,mid)))
  244. hourNum = nowHour
  245. }
  246. rdlock.Unlock()
  247. } else {
  248. log.Println(mid, "调用rpc服务解析异常:",mid,qu.ObjToString((*fileinfo)["fid"]), rdata["err"])
  249. }
  250. }
  251. var hourNum int
  252. var rdlock sync.RWMutex
  253. func SendMail( body string ) error {
  254. //定义邮箱服务器连接信息,如果是阿里邮箱 pass填密码,qq邮箱填授权码
  255. mailConn := map[string]string {
  256. "user": "550838476@qq.com",
  257. "pass": "",
  258. "host": "smtp.qq.com",
  259. "port": "465",
  260. }
  261. port, _ := strconv.Atoi(mailConn["port"]) //转换端口类型为int
  262. m := gomail.NewMessage()
  263. m.SetHeader("From","Get to" + "<" + mailConn["user"] + ">") //这种方式可以添加别名,即“XD Game”, 也可以直接用<code>m.SetHeader("From",mailConn["user"])</code> 读者可以自行实验下效果
  264. m.SetHeader("To", []string{"550838476@qq.com"}...) //发送给多个用户
  265. m.SetHeader("Subject", "MongoId") //设置邮件主题
  266. m.SetBody("text/html","服务器五:"+ body) //设置邮件正文
  267. d := gomail.NewDialer(mailConn["host"], port, mailConn["user"], mailConn["pass"])
  268. err := d.DialAndSend(m)
  269. return err
  270. }
  271. func forfunc(lid string) {
  272. for {
  273. //查询最后一个id
  274. lastObjectId, _ := mongodbutil.Mgo.Find(MgoC,nil,"-_id",bson.M{"_id":1},true,-1,-1)
  275. lastId,ok := (*lastObjectId)[0]["_id"].(bson.ObjectId)
  276. log.Println("lastID:",lastId)
  277. //查询最后一个id出错重新查询
  278. if!ok{//转换失败
  279. log.Println("查询异常",*lastObjectId)
  280. time.Sleep(time.Minute)
  281. continue
  282. }
  283. //查询最后一个id等于上一轮的id就重新查询
  284. if lastId.Hex() == lid {
  285. log.Println("没有新数据",lastId.Hex())
  286. SendMail(time.Now().String()+"没有最新数据,当前最后一条数据id:"+lastId.Hex())
  287. time.Sleep(time.Hour)
  288. continue
  289. }
  290. //不相等说明有新数据,进行下次处理
  291. m := map[string]string{
  292. "gtid":lid,//上一轮结束的最后id
  293. "lteid":lastId.Hex(),//新一轮查询出来的id
  294. }
  295. bytes, _ := json.Marshal(m)
  296. //发送udp
  297. err := udpclient.WriteUdp(bytes,mu.OP_TYPE_DATA,&net.UDPAddr{
  298. IP: net.ParseIP( util.ObjToString(Sysconfig["udpip"])),
  299. Port: util.IntAll(Sysconfig["udpport"]),
  300. })
  301. if err != nil{
  302. log.Println("发送udp失败",err,string(bytes))
  303. time.Sleep(time.Minute)
  304. continue
  305. }
  306. SendMail(time.Now().String()+fmt.Sprint("发送udp成功,gtid:",lid,",lteid:",lastId.Hex()))
  307. log.Println("发送udp成功,gtid:",lid,",lteid:",lastId.Hex())
  308. break//发送完后终止循环
  309. }
  310. }