main.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. package main
  2. import (
  3. "encoding/json"
  4. "gopkg.in/mgo.v2/bson"
  5. "jy/mongodbutil"
  6. "log"
  7. mu "mfw/util"
  8. "net"
  9. "net/rpc"
  10. "path"
  11. qu "qfw/util"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  16. var udpclient mu.UdpClient //udp对象
  17. var Sysconfig map[string]interface{}
  18. var MgoIP, MgoDB, MgoC, MgoFileFiled, GetDataIp, GetDataPort string
  19. var sys sync.RWMutex
  20. var ChanB chan bool
  21. func init() {
  22. qu.ReadConfig(&Sysconfig)
  23. MgoIP = qu.ObjToString(Sysconfig["mongodb_ip"])
  24. MgoDB = qu.ObjToString(Sysconfig["mongodb_db"])
  25. MgoC = qu.ObjToString(Sysconfig["mongodb_c"])
  26. GetDataIp = qu.ObjToString(Sysconfig["get_data_ip"])
  27. GetDataPort = qu.ObjToString(Sysconfig["get_data_port"])
  28. MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_one_filefiled"], "projectinfo")
  29. if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" {
  30. log.Println("获取配置文件参数失败", Sysconfig)
  31. return
  32. }
  33. mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB)
  34. log.Println(mongodbutil.Mgo.Get().Ping())
  35. }
  36. func main() {
  37. log.Println(Sysconfig)
  38. udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024}
  39. udpclient.Listen(processUdpMsg)
  40. log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
  41. go udpclient.WriteUdp([]byte{}, mu.OP_GET_DOWNLOADERCODE, &net.UDPAddr{
  42. IP: net.ParseIP(GetDataIp),
  43. Port: qu.IntAll(GetDataPort),
  44. })
  45. b := make(chan bool, 1)
  46. <-b
  47. }
  // Example Sysconfig entry for the RPC endpoint dialed by save: "file2text": "192.168.3.207:1234"
  49. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  50. defer qu.Catch()
  51. switch act {
  52. case mu.OP_TYPE_DATA:
  53. sys.Lock()
  54. var mapInfo map[string]interface{}
  55. err := json.Unmarshal(data, &mapInfo)
  56. if err != nil {
  57. log.Println("json err :", err, string(data))
  58. time.Sleep(time.Second * 30)
  59. go udpclient.WriteUdp([]byte{}, mu.OP_GET_DOWNLOADERCODE, ra)
  60. return
  61. }
  62. ObjectId := qu.ObjToString(mapInfo["id"])
  63. if ObjectId == "" || !bson.IsObjectIdHex(ObjectId) {
  64. log.Println("获取数据id错误", mapInfo)
  65. time.Sleep(time.Second * 10)
  66. go udpclient.WriteUdp([]byte{}, mu.OP_GET_DOWNLOADERCODE, ra)
  67. return
  68. }
  69. sys.Unlock()
  70. log.Println("获取数据成功:", mapInfo)
  71. data, _ := mongodbutil.Mgo.FindById(MgoC, ObjectId, bson.M{"_id": 1, MgoFileFiled: 1})
  72. if len(*data) == 0 {
  73. go udpclient.WriteUdp([]byte{}, mu.OP_GET_DOWNLOADERCODE, ra)
  74. return
  75. }
  76. if v, ok := (*data)[MgoFileFiled].(map[string]interface{}); !ok {
  77. go udpclient.WriteUdp([]byte{}, mu.OP_GET_DOWNLOADERCODE, ra)
  78. return
  79. } else {
  80. switch v["attachments"].(type) {
  81. case map[string]interface{}:
  82. att := v["attachments"].(map[string]interface{})
  83. updateNum := 0
  84. for attk, vaatt := range att {
  85. if fileinfo, ok := vaatt.(map[string]interface{}); !ok {
  86. //log.Println(mid, "mgo 结构体转换失败", vaatt)
  87. continue
  88. } else {
  89. ChanB <- true
  90. if qu.ObjToString(fileinfo["fid"]) == "" {
  91. <-ChanB
  92. //log.Println(mid, "mgo ", MgoFileFiled,"没有fid ")
  93. continue
  94. }
  95. save(bson.ObjectIdHex(ObjectId), attk, &v, &fileinfo, &updateNum)
  96. <-ChanB
  97. }
  98. }
  99. }
  100. go udpclient.WriteUdp([]byte{}, mu.OP_GET_DOWNLOADERCODE, ra)
  101. }
  102. case mu.OP_NOOP: //下个节点回应
  103. log.Println("接收成功", string(data))
  104. }
  105. }
  106. func save(mid interface{}, attk string, qmap, fileinfo *map[string]interface{}, updatenum *int) {
  107. defer qu.Catch()
  108. type FileData struct {
  109. ObjId string //Id
  110. OrgUrl string //源下载地址
  111. Fid string
  112. Name string
  113. Type string //文件类型png、jpg、tif、swf(ocr识别);pdf,doc,docx,xls
  114. Content string //识别内容
  115. }
  116. client, err := rpc.DialHTTP("tcp", qu.ObjToString(Sysconfig["file2text"]))
  117. if err != nil {
  118. log.Println(mid, "rpc err :", err)
  119. return
  120. }
  121. defer client.Close()
  122. var reply []byte
  123. //bs, _ := ioutil.ReadFile("1.docx")
  124. var fffpath string
  125. fffpath = path.Ext(qu.ObjToString((*fileinfo)["filename"]))
  126. if strings.TrimSpace(fffpath) == "" {
  127. fffpath = qu.ObjToString((*fileinfo)["ftype"])
  128. } else {
  129. fffpath = fffpath[1:]
  130. }
  131. fileData := &FileData{
  132. ObjId: mid.(bson.ObjectId).String(),
  133. OrgUrl: qu.ObjToString((*fileinfo)["url"]),
  134. Name: qu.ObjToString((*fileinfo)["filename"]),
  135. Fid: qu.ObjToString((*fileinfo)["fid"]), //附件id
  136. Type: fffpath,
  137. }
  138. //log.Println(mid, fileData)
  139. err = client.Call("FileToText.FileToContext", fileData, &reply)
  140. if err != nil {
  141. log.Println(mid, "call ocr error:", err)
  142. return
  143. }
  144. //fileinfo["ftype"] = "doc"
  145. //reply = []byte("jdsfkldasjflkj")
  146. //fileinfo["ftype"] = "zip"
  147. //testfiles := []map[string]interface {
  148. //}{
  149. // {"Name": "test4.doc", "Content": "test4context", "Type": "doc", "Size": "40M"},
  150. // {"Name": "test5.pdf", "Content": "test5context", "Type": "pdf", "Size": "50M"},
  151. // {"Name": "test6.xlsx", "Content": "test6context", "Type": "xlsx", "Size": "60M"},
  152. //}
  153. //reply, _ = json.Marshal(testfiles)
  154. if len(reply) == 0 {
  155. log.Println(mid, "rpc返回数据为空:", qu.ObjToString((*fileinfo)["fid"]), string(reply))
  156. return
  157. }
  158. //log.Println(mid, string(reply))
  159. rdata := make(map[string]interface{})
  160. if err := json.Unmarshal(reply, &rdata); err != nil {
  161. log.Println(mid, "rpc返回数据解析失败:", qu.ObjToString((*fileinfo)["fid"]), err)
  162. return
  163. }
  164. if rdata["err"] == nil || rdata["err"] == "null" || rdata["err"] == "" {
  165. if qu.ObjToString((*fileinfo)["ftype"]) == "rar" || qu.ObjToString((*fileinfo)["ftype"]) == "zip" {
  166. (*fileinfo)["content"] = rdata["contextc"]
  167. } else {
  168. (*fileinfo)["content"] = rdata["context"]
  169. }
  170. (*fileinfo)["expend"] = rdata["expend"]
  171. delete(*fileinfo, "update")
  172. //log.Println((*fileinfo))
  173. (*qmap)[MgoFileFiled].(map[string]interface{})["attachments"].(map[string]interface{})[attk] = *fileinfo
  174. //asdf := (*qmap)[MgoFileFiled].(map[string]interface{})
  175. //qwer := asdf["attachments"].(map[string]interface{})
  176. //qwer[attk] =*fileinfo
  177. //log.Println((*qmap)[MgoFileFiled])
  178. updateBool := mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
  179. "$set": bson.M{
  180. MgoFileFiled: (*qmap)[MgoFileFiled],
  181. },
  182. })
  183. if updateBool {
  184. *updatenum++
  185. mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
  186. "$set": bson.M{
  187. "updatefileNum": &updatenum,
  188. },})
  189. log.Println(mid, "mongo更新数据成功")
  190. } else {
  191. log.Println(mid, "mongo更新数据失败", qu.ObjToString((*fileinfo)["fid"]))
  192. }
  193. //nowHour := time.Now().Hour()
  194. //rdlock.Lock()
  195. //if nowHour != hourNum {
  196. // log.Println("send email:", SendMail(fmt.Sprint(updateBool, mid)))
  197. // hourNum = nowHour
  198. //}
  199. //rdlock.Unlock()
  200. } else {
  201. log.Println(mid, "调用rpc服务解析异常:", mid, qu.ObjToString((*fileinfo)["fid"]), rdata["err"])
  202. }
  203. }
  204. //
  205. //var hourNum int
  206. //var rdlock sync.RWMutex