main.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. package main
  2. import (
  3. "encoding/json"
  4. "gopkg.in/mgo.v2/bson"
  5. "jy/mongodbutil"
  6. "log"
  7. mu "mfw/util"
  8. "net"
  9. "net/rpc"
  10. "path"
  11. qu "qfw/util"
  12. "strings"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. )
  17. var udpclient mu.UdpClient //udp对象
  18. var Sysconfig map[string]interface{}
  19. var MgoIP, MgoDB, MgoC, MgoFileFiled, GetDataIp, GetDataPort string
  20. var sys sync.RWMutex
  21. var ChanA,ChanB = make(chan bool),make(chan bool,1)
  22. var tmpNUM int32
  23. func init() {
  24. qu.ReadConfig(&Sysconfig)
  25. MgoIP = qu.ObjToString(Sysconfig["mongodb_ip"])
  26. MgoDB = qu.ObjToString(Sysconfig["mongodb_db"])
  27. MgoC = qu.ObjToString(Sysconfig["mongodb_c"])
  28. GetDataIp = qu.ObjToString(Sysconfig["get_data_ip"])
  29. GetDataPort = qu.ObjToString(Sysconfig["get_data_port"])
  30. MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_one_filefiled"], "projectinfo")
  31. if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" {
  32. log.Println("获取配置文件参数失败", Sysconfig)
  33. return
  34. }
  35. mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB)
  36. log.Println(mongodbutil.Mgo.Get().Ping())
  37. }
  38. func main() {
  39. log.Println(Sysconfig)
  40. udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024}
  41. udpclient.Listen(processUdpMsg)
  42. log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
  43. ticker := time.NewTicker(time.Second * 30)
  44. var num int
  45. task: for {
  46. select {
  47. case <-ticker.C:
  48. num++
  49. go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, &net.UDPAddr{
  50. IP: net.ParseIP(GetDataIp),
  51. Port: qu.IntAll(GetDataPort),
  52. })
  53. log.Println("程序启动,开启循环请求数据信息", num)
  54. case abc, ok := <-ChanA:
  55. log.Println("abc,ok:", abc, ok)
  56. if !ok {
  57. log.Println("主动循环请求数据已关闭")
  58. break task
  59. }
  60. }
  61. }
  62. b := make(chan bool, 1)
  63. <-b
  64. }
  65. // "file2text": "192.168.3.207:1234",
  66. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  67. defer qu.Catch()
  68. switch act {
  69. case mu.OP_TYPE_DATA:
  70. atomic.AddInt32(&tmpNUM, 1)
  71. v := atomic.LoadInt32(&tmpNUM)
  72. if v == 1 {
  73. ChanA <- true
  74. close(ChanA)
  75. }
  76. log.Println("data:",string(data))
  77. sys.Lock()
  78. var mapInfo map[string]interface{}
  79. err := json.Unmarshal(data, &mapInfo)
  80. if err != nil {
  81. log.Println("json err :", err, string(data),ra.String())
  82. time.Sleep(time.Second * 30)
  83. go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
  84. sys.Unlock()
  85. return
  86. }
  87. if qu.ObjToString(mapInfo["permission"])!="ocr_task"{
  88. log.Println("数据异常 :", string(data),ra.String())
  89. time.Sleep(time.Second * 3)
  90. go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
  91. sys.Unlock()
  92. return
  93. }
  94. ObjectId := qu.ObjToString(mapInfo["id"])
  95. if ObjectId == "" || !bson.IsObjectIdHex(ObjectId) {
  96. log.Println("获取数据id错误", mapInfo,ra.String())
  97. time.Sleep(time.Second * 3)
  98. go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
  99. sys.Unlock()
  100. return
  101. }
  102. sys.Unlock()
  103. log.Println("获取数据成功:", mapInfo,ra.String())
  104. data, _ := mongodbutil.Mgo.FindById(MgoC, ObjectId, bson.M{"_id": 1, MgoFileFiled: 1})
  105. if len(*data) == 0 {
  106. go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
  107. return
  108. }
  109. if v, ok := (*data)[MgoFileFiled].(map[string]interface{}); !ok {
  110. go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
  111. return
  112. } else {
  113. switch v["attachments"].(type) {
  114. case map[string]interface{}:
  115. att := v["attachments"].(map[string]interface{})
  116. updateNum := 0
  117. for attk, vaatt := range att {
  118. if fileinfo, ok := vaatt.(map[string]interface{}); !ok {
  119. //log.Println(mid, "mgo 结构体转换失败", vaatt)
  120. continue
  121. } else {
  122. ChanB <- true
  123. if qu.ObjToString(fileinfo["fid"]) == "" {
  124. //log.Println(mid, "mgo ", MgoFileFiled,"没有fid ")
  125. <-ChanB
  126. continue
  127. }
  128. save(bson.ObjectIdHex(ObjectId), attk, &v, &fileinfo, &updateNum)
  129. <-ChanB
  130. }
  131. }
  132. }
  133. go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
  134. }
  135. case mu.OP_NOOP: //下个节点回应
  136. log.Println("接收成功", string(data))
  137. }
  138. }
  139. func save(mid interface{}, attk string, qmap, fileinfo *map[string]interface{}, updatenum *int) {
  140. defer qu.Catch()
  141. type FileData struct {
  142. ObjId string //Id
  143. OrgUrl string //源下载地址
  144. Fid string
  145. Name string
  146. Type string //文件类型png、jpg、tif、swf(ocr识别);pdf,doc,docx,xls
  147. Content string //识别内容
  148. }
  149. client, err := rpc.DialHTTP("tcp", qu.ObjToString(Sysconfig["file2text"]))
  150. if err != nil {
  151. log.Println(mid, "rpc err :", err)
  152. return
  153. }
  154. defer client.Close()
  155. var reply []byte
  156. //bs, _ := ioutil.ReadFile("1.docx")
  157. var fffpath string
  158. fffpath = path.Ext(qu.ObjToString((*fileinfo)["filename"]))
  159. if strings.TrimSpace(fffpath) == "" {
  160. fffpath = qu.ObjToString((*fileinfo)["ftype"])
  161. } else {
  162. fffpath = fffpath[1:]
  163. }
  164. fileData := &FileData{
  165. ObjId: mid.(bson.ObjectId).String(),
  166. OrgUrl: qu.ObjToString((*fileinfo)["url"]),
  167. Name: qu.ObjToString((*fileinfo)["filename"]),
  168. Fid: qu.ObjToString((*fileinfo)["fid"]), //附件id
  169. Type: fffpath,
  170. }
  171. //log.Println(mid, fileData)
  172. err = client.Call("FileToText.FileToContext", fileData, &reply)
  173. if err != nil {
  174. log.Println(mid, "call ocr error:", err)
  175. return
  176. }
  177. //fileinfo["ftype"] = "doc"
  178. //reply = []byte("jdsfkldasjflkj")
  179. //fileinfo["ftype"] = "zip"
  180. //testfiles := []map[string]interface {
  181. //}{
  182. // {"Name": "test4.doc", "Content": "test4context", "Type": "doc", "Size": "40M"},
  183. // {"Name": "test5.pdf", "Content": "test5context", "Type": "pdf", "Size": "50M"},
  184. // {"Name": "test6.xlsx", "Content": "test6context", "Type": "xlsx", "Size": "60M"},
  185. //}
  186. //reply, _ = json.Marshal(testfiles)
  187. if len(reply) == 0 {
  188. log.Println(mid, "rpc返回数据为空:", qu.ObjToString((*fileinfo)["fid"]), string(reply))
  189. return
  190. }
  191. //log.Println(mid, string(reply))
  192. rdata := make(map[string]interface{})
  193. if err := json.Unmarshal(reply, &rdata); err != nil {
  194. log.Println(mid, "rpc返回数据解析失败:", qu.ObjToString((*fileinfo)["fid"]), err)
  195. return
  196. }
  197. if rdata["err"] == nil || rdata["err"] == "null" || rdata["err"] == "" {
  198. if qu.ObjToString((*fileinfo)["ftype"]) == "rar" || qu.ObjToString((*fileinfo)["ftype"]) == "zip" {
  199. (*fileinfo)["content"] = rdata["contextc"]
  200. } else {
  201. (*fileinfo)["content"] = rdata["context"]
  202. }
  203. (*fileinfo)["expend"] = rdata["expend"]
  204. delete(*fileinfo, "update")
  205. //log.Println((*fileinfo))
  206. (*qmap)[MgoFileFiled].(map[string]interface{})["attachments"].(map[string]interface{})[attk] = *fileinfo
  207. //asdf := (*qmap)[MgoFileFiled].(map[string]interface{})
  208. //qwer := asdf["attachments"].(map[string]interface{})
  209. //qwer[attk] =*fileinfo
  210. //log.Println((*qmap)[MgoFileFiled])
  211. updateBool := mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
  212. "$set": bson.M{
  213. MgoFileFiled: (*qmap)[MgoFileFiled],
  214. },
  215. })
  216. if updateBool {
  217. *updatenum++
  218. mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
  219. "$set": bson.M{
  220. "updatefileNum": &updatenum,
  221. },})
  222. log.Println(mid, "mongo更新数据成功")
  223. } else {
  224. log.Println(mid, "mongo更新数据失败", qu.ObjToString((*fileinfo)["fid"]))
  225. }
  226. //nowHour := time.Now().Hour()
  227. //rdlock.Lock()
  228. //if nowHour != hourNum {
  229. // log.Println("send email:", SendMail(fmt.Sprint(updateBool, mid)))
  230. // hourNum = nowHour
  231. //}
  232. //rdlock.Unlock()
  233. } else {
  234. log.Println(mid, "调用rpc服务解析异常:", mid, qu.ObjToString((*fileinfo)["fid"]), rdata["err"])
  235. }
  236. }
  237. //
  238. //var hourNum int
  239. //var rdlock sync.RWMutex