main.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. package main
  2. import (
  3. "encoding/json"
  4. "gopkg.in/mgo.v2/bson"
  5. "jy/mongodbutil"
  6. "log"
  7. mu "mfw/util"
  8. "net"
  9. "net/rpc"
  10. "path"
  11. qu "qfw/util"
  12. "strings"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. )
  17. var udpclient mu.UdpClient //udp对象
  18. var Sysconfig map[string]interface{}
  19. var MgoIP, MgoDB, MgoC, MgoFileFiled, GetDataIp, GetDataPort string
  20. var sys sync.RWMutex
  21. var ChanA, ChanB = make(chan bool), make(chan bool, 1)
  22. var tmpNUM int32
  23. func init() {
  24. qu.ReadConfig(&Sysconfig)
  25. MgoIP = qu.ObjToString(Sysconfig["mongodb_ip"])
  26. MgoDB = qu.ObjToString(Sysconfig["mongodb_db"])
  27. MgoC = qu.ObjToString(Sysconfig["mongodb_c"])
  28. GetDataIp = qu.ObjToString(Sysconfig["get_data_ip"])
  29. GetDataPort = qu.ObjToString(Sysconfig["get_data_port"])
  30. MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_one_filefiled"], "projectinfo")
  31. if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" {
  32. log.Println("获取配置文件参数失败", Sysconfig)
  33. return
  34. }
  35. mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB)
  36. log.Println(mongodbutil.Mgo.Get().Ping())
  37. }
  38. func main() {
  39. log.Println(Sysconfig)
  40. udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024}
  41. udpclient.Listen(processUdpMsg)
  42. log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
  43. ticker := time.NewTicker(time.Second * 30)
  44. var num int
  45. task:
  46. for {
  47. select {
  48. case <-ticker.C:
  49. num++
  50. go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, &net.UDPAddr{
  51. IP: net.ParseIP(GetDataIp),
  52. Port: qu.IntAll(GetDataPort),
  53. })
  54. log.Println("程序启动,开启循环请求数据信息", num)
  55. case abc, ok := <-ChanA:
  56. log.Println("abc,ok:", abc, ok)
  57. if !ok {
  58. log.Println("主动循环请求数据已关闭")
  59. break task
  60. }
  61. }
  62. }
  63. b := make(chan bool, 1)
  64. <-b
  65. }
  66. // "file2text": "192.168.3.207:1234",
  67. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  68. defer qu.Catch()
  69. switch act {
  70. case mu.OP_TYPE_DATA:
  71. atomic.AddInt32(&tmpNUM, 1)
  72. v := atomic.LoadInt32(&tmpNUM)
  73. if v == 1 {
  74. ChanA <- true
  75. close(ChanA)
  76. }
  77. log.Println("data:", string(data), ra.String())
  78. sys.Lock()
  79. var mapInfo map[string]interface{}
  80. err := json.Unmarshal(data, &mapInfo)
  81. if err != nil {
  82. if string(data) == "没有新数据" {
  83. time.Sleep(time.Minute * 5)
  84. } else {
  85. time.Sleep(time.Second * 30)
  86. }
  87. go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
  88. sys.Unlock()
  89. return
  90. }
  91. if qu.ObjToString(mapInfo["permission"]) != "ocr_task" {
  92. log.Println("数据异常 :", string(data), ra.String())
  93. time.Sleep(time.Second * 30)
  94. go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
  95. sys.Unlock()
  96. return
  97. }
  98. ObjectId := qu.ObjToString(mapInfo["id"])
  99. if ObjectId == "" || !bson.IsObjectIdHex(ObjectId) {
  100. log.Println("获取数据id错误", mapInfo, ra.String())
  101. time.Sleep(time.Second * 30)
  102. go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
  103. sys.Unlock()
  104. return
  105. }
  106. sys.Unlock()
  107. log.Println("获取数据成功:", mapInfo, ra.String())
  108. data, _ := mongodbutil.Mgo.FindById(MgoC, ObjectId, bson.M{"_id": 1, MgoFileFiled: 1})
  109. if len(*data) == 0 {
  110. if qu.ObjToString(mapInfo["is_start"]) == "true"{
  111. return
  112. }
  113. go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
  114. return
  115. }
  116. if v, ok := (*data)[MgoFileFiled].(map[string]interface{}); !ok {
  117. if qu.ObjToString(mapInfo["is_start"]) == "true" {
  118. return
  119. }
  120. go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
  121. return
  122. } else {
  123. switch v["attachments"].(type) {
  124. case map[string]interface{}:
  125. att := v["attachments"].(map[string]interface{})
  126. updateNum := 0
  127. for attk, vaatt := range att {
  128. if fileinfo, ok := vaatt.(map[string]interface{}); !ok {
  129. //log.Println(mid, "mgo 结构体转换失败", vaatt)
  130. continue
  131. } else {
  132. ChanB <- true
  133. if qu.ObjToString(fileinfo["fid"]) == "" {
  134. //log.Println(mid, "mgo ", MgoFileFiled,"没有fid ")
  135. <-ChanB
  136. continue
  137. }
  138. save(bson.ObjectIdHex(ObjectId), attk, data, &fileinfo, &updateNum)
  139. <-ChanB
  140. }
  141. }
  142. }
  143. if qu.ObjToString(mapInfo["is_start"]) == "true" {
  144. return
  145. }
  146. go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
  147. }
  148. case mu.OP_NOOP: //下个节点回应
  149. log.Println("接收成功", string(data))
  150. }
  151. }
  152. func save(mid interface{}, attk string, qmap, fileinfo *map[string]interface{}, updatenum *int) {
  153. defer qu.Catch()
  154. type FileData struct {
  155. ObjId string //Id
  156. OrgUrl string //源下载地址
  157. Fid string
  158. Name string
  159. Type string //文件类型png、jpg、tif、swf(ocr识别);pdf,doc,docx,xls
  160. Content string //识别内容
  161. }
  162. client, err := rpc.DialHTTP("tcp", qu.ObjToString(Sysconfig["file2text"]))
  163. if err != nil {
  164. log.Println(mid, "rpc err :", err)
  165. return
  166. }
  167. defer client.Close()
  168. var reply []byte
  169. //bs, _ := ioutil.ReadFile("1.docx")
  170. var fffpath string
  171. fffpath = path.Ext(qu.ObjToString((*fileinfo)["filename"]))
  172. if strings.TrimSpace(fffpath) == "" {
  173. fffpath = qu.ObjToString((*fileinfo)["ftype"])
  174. } else {
  175. fffpath = fffpath[1:]
  176. }
  177. fileData := &FileData{
  178. ObjId: mid.(bson.ObjectId).String(),
  179. OrgUrl: qu.ObjToString((*fileinfo)["url"]),
  180. Name: qu.ObjToString((*fileinfo)["filename"]),
  181. Fid: qu.ObjToString((*fileinfo)["fid"]), //附件id
  182. Type: fffpath,
  183. }
  184. //log.Println(mid, fileData)
  185. err = client.Call("FileToText.FileToContext", fileData, &reply)
  186. if err != nil {
  187. log.Println(mid, "call ocr error:", err)
  188. return
  189. }
  190. //fileinfo["ftype"] = "doc"
  191. //reply = []byte("jdsfkldasjflkj")
  192. //fileinfo["ftype"] = "zip"
  193. //testfiles := []map[string]interface {
  194. //}{
  195. // {"Name": "test4.doc", "Content": "test4context", "Type": "doc", "Size": "40M"},
  196. // {"Name": "test5.pdf", "Content": "test5context", "Type": "pdf", "Size": "50M"},
  197. // {"Name": "test6.xlsx", "Content": "test6context", "Type": "xlsx", "Size": "60M"},
  198. //}
  199. //reply, _ = json.Marshal(testfiles)
  200. if len(reply) == 0 {
  201. log.Println(mid, "rpc返回数据为空:", qu.ObjToString((*fileinfo)["fid"]), string(reply))
  202. return
  203. }
  204. //log.Println(mid, string(reply))
  205. rdata := make(map[string]interface{})
  206. if err := json.Unmarshal(reply, &rdata); err != nil {
  207. log.Println(mid, "rpc返回数据解析失败:", qu.ObjToString((*fileinfo)["fid"]), err)
  208. return
  209. }
  210. if rdata["err"] == nil || rdata["err"] == "null" || rdata["err"] == "" {
  211. if qu.ObjToString((*fileinfo)["ftype"]) == "rar" || qu.ObjToString((*fileinfo)["ftype"]) == "zip" {
  212. (*fileinfo)["content"] = rdata["contextc"]
  213. } else {
  214. (*fileinfo)["content"] = rdata["context"]
  215. }
  216. (*fileinfo)["expend"] = rdata["expend"]
  217. delete(*fileinfo, "update")
  218. //log.Println((*fileinfo))
  219. (*qmap)[MgoFileFiled].(map[string]interface{})["attachments"].(map[string]interface{})[attk] = *fileinfo
  220. //asdf := (*qmap)[MgoFileFiled].(map[string]interface{})
  221. //qwer := asdf["attachments"].(map[string]interface{})
  222. //qwer[attk] =*fileinfo
  223. //log.Println((*qmap)[MgoFileFiled])
  224. updateBool := mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
  225. "$set": bson.M{
  226. MgoFileFiled: (*qmap)[MgoFileFiled],
  227. },
  228. })
  229. if updateBool {
  230. *updatenum++
  231. mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
  232. "$set": bson.M{
  233. "updatefileNum": &updatenum,
  234. },})
  235. log.Println(mid, "mongo更新数据成功")
  236. } else {
  237. log.Println(mid, "mongo更新数据失败", qu.ObjToString((*fileinfo)["fid"]))
  238. }
  239. //nowHour := time.Now().Hour()
  240. //rdlock.Lock()
  241. //if nowHour != hourNum {
  242. // log.Println("send email:", SendMail(fmt.Sprint(updateBool, mid)))
  243. // hourNum = nowHour
  244. //}
  245. //rdlock.Unlock()
  246. } else {
  247. log.Println(mid, "调用rpc服务解析异常:", mid, qu.ObjToString((*fileinfo)["fid"]), rdata["err"])
  248. }
  249. }
  250. //
  251. //var hourNum int
  252. //var rdlock sync.RWMutex