main.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "jy/mongodbutil"
  6. "log"
  7. mu "mfw/util"
  8. "net"
  9. "net/rpc"
  10. qu "qfw/util"
  11. "strings"
  12. "gopkg.in/mgo.v2/bson"
  13. )
  14. var udpclient mu.UdpClient //udp对象
  15. var Sysconfig map[string]interface{}
  16. var MgoIP, MgoDB, MgoC, MgoFileFiled string
  17. var ChanB chan bool
  18. func init() {
  19. qu.ReadConfig(&Sysconfig)
  20. MgoIP = qu.ObjToString(Sysconfig["mongodb_one_ip"])
  21. MgoDB = qu.ObjToString(Sysconfig["mongodb_one_db"])
  22. MgoC = qu.ObjToString(Sysconfig["mongodb_one_c"])
  23. MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_one_filefiled"], "projectinfo")
  24. if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" {
  25. log.Println("获取配置文件参数失败", Sysconfig)
  26. return
  27. }
  28. mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB)
  29. ChanB = make(chan bool, qu.IntAllDef(Sysconfig["channelsize"], 5))
  30. }
  31. func main() {
  32. log.Println(Sysconfig)
  33. udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024}
  34. udpclient.Listen(processUdpMsg)
  35. log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
  36. b := make(chan bool, 1)
  37. <-b
  38. }
  39. // "file2text": "192.168.3.207:1234",
  40. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  41. defer qu.Catch()
  42. switch act {
  43. case mu.OP_TYPE_DATA:
  44. var mapInfo map[string]interface{}
  45. err := json.Unmarshal(data, &mapInfo)
  46. if err != nil {
  47. log.Println("json err :", err, string(data))
  48. return
  49. }
  50. gid := strings.TrimSpace(mapInfo["gtid"].(string))
  51. lid := strings.TrimSpace(mapInfo["lteid"].(string))
  52. if bson.IsObjectIdHex(gid) && bson.IsObjectIdHex(lid) {
  53. if findAll, b := mongodbutil.Mgo.Find(MgoC,
  54. bson.M{
  55. "_id": bson.M{
  56. "$gte": bson.ObjectIdHex(gid),
  57. "$lte": bson.ObjectIdHex(lid),
  58. },
  59. MgoFileFiled: bson.M{
  60. "$ne": nil,
  61. },
  62. },
  63. //if findAll, b := mongodbutil.Mgo.Find(MgoC, bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(gid), "$lte": bson.ObjectIdHex(lid)}},
  64. nil, `{"_id":"1",`+MgoFileFiled+`:"1"}`, false, -1, -1); !b {
  65. log.Println("查询数据失败 :", string(data))
  66. } else {
  67. fmt.Println(len(*findAll))
  68. if len(*findAll) <= 0 {
  69. log.Println("查询数据为空 :", string(data))
  70. return
  71. }
  72. for _, v := range *findAll {
  73. qmap := *qu.ObjToMap(v)
  74. mid := qmap["_id"]
  75. if v, ok := qmap[MgoFileFiled].(map[string]interface{}); !ok {
  76. log.Println(mid, "mgo 转换异常", MgoFileFiled)
  77. continue
  78. } else {
  79. switch v["attachments"].(type) {
  80. case map[string]interface{}:
  81. att := v["attachments"].(map[string]interface{})
  82. for _, vaatt := range att {
  83. if fileinfo, ok := vaatt.(map[string]interface{}); !ok {
  84. log.Println(mid, "mgo 结构体转换失败", vaatt)
  85. continue
  86. } else {
  87. ChanB <- true
  88. go save(mid, qmap, fileinfo)
  89. }
  90. }
  91. }
  92. }
  93. //fileMap := *qu.ObjToMap(qmap["projectinfo"])
  94. //fmt.Println(fileMap["attachments"])
  95. }
  96. }
  97. } else {
  98. log.Println("开始id或结束id参数错误:", string(data))
  99. }
  100. case mu.OP_NOOP: //下个节点回应
  101. log.Println("接收成功", string(data))
  102. }
  103. }
  104. func save(mid interface{}, qmap, fileinfo map[string]interface{}) {
  105. defer qu.Catch()
  106. defer func() {
  107. <-ChanB
  108. }()
  109. type FileData struct {
  110. Fid string
  111. Name string
  112. Type string //文件类型png、jpg、tif、swf(ocr识别);pdf,doc,docx,xls
  113. Content string //识别内容
  114. }
  115. client, err := rpc.DialHTTP("tcp", qu.ObjToString(Sysconfig["file2text"]))
  116. if err != nil {
  117. log.Println(mid, "rpc err :", err)
  118. return
  119. }
  120. defer client.Close()
  121. var reply []byte
  122. //bs, _ := ioutil.ReadFile("1.docx")
  123. fileData := &FileData{
  124. Name: qu.ObjToString(fileinfo["filename"]),
  125. Fid: qu.ObjToString(fileinfo["fid"]), //附件id
  126. Type: qu.ObjToString(fileinfo["ftype"]),
  127. }
  128. log.Println(mid, fileData)
  129. err = client.Call("FileToText.FileToContext", fileData, &reply)
  130. if err != nil {
  131. log.Println(mid, "call ocr error:", err)
  132. return
  133. }
  134. //fileinfo["ftype"] = "doc"
  135. //reply = []byte("jdsfkldasjflkj")
  136. //fileinfo["ftype"] = "zip"
  137. //testfiles := []map[string]interface {
  138. //}{
  139. // {"Name": "test4.doc", "Content": "test4context", "Type": "doc", "Size": "40M"},
  140. // {"Name": "test5.pdf", "Content": "test5context", "Type": "pdf", "Size": "50M"},
  141. // {"Name": "test6.xlsx", "Content": "test6context", "Type": "xlsx", "Size": "60M"},
  142. //}
  143. //reply, _ = json.Marshal(testfiles)
  144. if len(reply) == 0{
  145. log.Println(mid, "rpc返回数据为空:", string(reply))
  146. return
  147. }
  148. log.Println(mid, string(reply))
  149. rdata := make(map[string]interface{})
  150. if err := json.Unmarshal(reply, &rdata); err != nil {
  151. log.Println(mid, "rpc返回数据解析失败:", err)
  152. return
  153. }
  154. if rdata["err"] == nil || rdata["err"] == "null" || rdata["err"] == "" {
  155. if qu.ObjToString(fileinfo["ftype"]) == "rar" || qu.ObjToString(fileinfo["ftype"]) == "zip" {
  156. fileinfo["content"] = rdata["contextc"]
  157. } else {
  158. fileinfo["content"] = rdata["context"]
  159. }
  160. if !mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
  161. "$set": bson.M{
  162. MgoFileFiled: qmap[MgoFileFiled],
  163. },
  164. }) {
  165. log.Println(mid, "mongo更新数据失败")
  166. } else {
  167. log.Println(mid, "mongo更新数据成功")
  168. }
  169. } else {
  170. log.Println(mid, "调用rpc服务解析异常:", rdata["err"])
  171. }
  172. //if qu.ObjToString(fileinfo["ftype"]) == "zip" || qu.ObjToString(fileinfo["ftype"]) == "rar" {
  173. // fileDatas := make([]map[string]interface{}, 0)
  174. // if err := json.Unmarshal(reply, &fileDatas); err != nil {
  175. // log.Println("json转换错误", mid, err)
  176. // return
  177. // }
  178. // fileinfo["content"] = fileDatas
  179. //} else {
  180. // fileinfo["content"] = string(reply)
  181. //}
  182. //if !mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
  183. // "$set": bson.M{
  184. // MgoFileFiled: qmap[MgoFileFiled],
  185. // },
  186. //}) {
  187. // log.Println(mid, "更新数据失败")
  188. //} else {
  189. // log.Println(mid, "更新数据成功")
  190. //}
  191. }