main.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. package main
  2. import (
  3. "encoding/json"
  4. "gopkg.in/mgo.v2/bson"
  5. "jy/mongodbutil"
  6. "log"
  7. mu "mfw/util"
  8. "net"
  9. "net/rpc"
  10. "path"
  11. qu "qfw/util"
  12. "strings"
  13. "sync/atomic"
  14. "time"
  15. )
  16. var udpclient mu.UdpClient //udp对象
  17. var Sysconfig map[string]interface{}
  18. var MgoIP, MgoDB, MgoC, MgoFileFiled, GetDataIp, GetDataPort string
  19. var ChanA, ChanB = make(chan bool), make(chan bool, 1)
  20. var tmpNUM int32
  21. func init() {
  22. qu.ReadConfig(&Sysconfig)
  23. MgoIP = qu.ObjToString(Sysconfig["mongodb_ip"])
  24. MgoDB = qu.ObjToString(Sysconfig["mongodb_db"])
  25. MgoC = qu.ObjToString(Sysconfig["mongodb_c"])
  26. GetDataIp = qu.ObjToString(Sysconfig["get_data_ip"])
  27. GetDataPort = qu.ObjToString(Sysconfig["get_data_port"])
  28. MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_one_filefiled"], "projectinfo")
  29. if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" {
  30. log.Println("获取配置文件参数失败", Sysconfig)
  31. return
  32. }
  33. mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB)
  34. log.Println(mongodbutil.Mgo.Get().Ping())
  35. }
  36. func main() {
  37. log.Println(Sysconfig)
  38. udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024}
  39. udpclient.Listen(processUdpMsg)
  40. log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
  41. ticker := time.NewTicker(time.Second * 30)
  42. var num int
  43. task:
  44. for {
  45. select {
  46. case <-ticker.C:
  47. num++
  48. go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, &net.UDPAddr{
  49. IP: net.ParseIP(GetDataIp),
  50. Port: qu.IntAll(GetDataPort),
  51. })
  52. log.Println("程序启动,开启循环请求数据信息", num)
  53. case abc, ok := <-ChanA:
  54. log.Println("abc,ok:", abc, ok)
  55. if !ok {
  56. log.Println("主动循环请求数据已关闭")
  57. break task
  58. }
  59. }
  60. }
  61. b := make(chan bool, 1)
  62. <-b
  63. }
  64. // "file2text": "192.168.3.207:1234",
  65. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  66. switch act {
  67. case mu.OP_TYPE_DATA:
  68. atomic.AddInt32(&tmpNUM, 1)
  69. v := atomic.LoadInt32(&tmpNUM)
  70. if v == 1 {
  71. ChanA <- true
  72. close(ChanA)
  73. }
  74. log.Println("data:", string(data), ra.String())
  75. var mapInfo map[string]interface{}
  76. err := json.Unmarshal(data, &mapInfo)
  77. if err != nil {
  78. if string(data) == "没有新数据" {
  79. time.Sleep(time.Second * 30)
  80. } else {
  81. time.Sleep(time.Second * 30)
  82. }
  83. go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
  84. return
  85. }
  86. if qu.ObjToString(mapInfo["permission"]) != "ocr_task" {
  87. log.Println("数据异常 :", string(data), ra.String())
  88. if qu.ObjToString(mapInfo["permission"]) == "stop" {
  89. log.Println(mapInfo)
  90. panic("释放实例")
  91. }
  92. time.Sleep(time.Second * 30)
  93. go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
  94. return
  95. }
  96. ObjectId := qu.ObjToString(mapInfo["id"])
  97. if ObjectId == "" || !bson.IsObjectIdHex(ObjectId) {
  98. log.Println("获取数据id错误", mapInfo, ra.String())
  99. time.Sleep(time.Second * 30)
  100. go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
  101. return
  102. }
  103. log.Println("获取数据成功:", mapInfo, ra.String())
  104. data, _ := mongodbutil.Mgo.FindById(MgoC, ObjectId, bson.M{"_id": 1, MgoFileFiled: 1})
  105. if len(*data) == 0 {
  106. if qu.ObjToString(mapInfo["is_start"]) == "true" {
  107. return
  108. }
  109. go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
  110. return
  111. }
  112. if v, ok := (*data)[MgoFileFiled].(map[string]interface{}); !ok {
  113. if qu.ObjToString(mapInfo["is_start"]) == "true" {
  114. return
  115. }
  116. go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
  117. return
  118. } else {
  119. switch v["attachments"].(type) {
  120. case map[string]interface{}:
  121. att := v["attachments"].(map[string]interface{})
  122. updateNum := 0
  123. for attk, vaatt := range att {
  124. if fileinfo, ok := vaatt.(map[string]interface{}); !ok {
  125. //log.Println(mid, "mgo 结构体转换失败", vaatt)
  126. continue
  127. } else {
  128. ChanB <- true
  129. if qu.ObjToString(fileinfo["fid"]) == "" {
  130. //log.Println(mid, "mgo ", MgoFileFiled,"没有fid ")
  131. <-ChanB
  132. continue
  133. }
  134. save(bson.ObjectIdHex(ObjectId), attk, data, &fileinfo, &updateNum)
  135. <-ChanB
  136. }
  137. }
  138. }
  139. if qu.ObjToString(mapInfo["is_start"]) == "true" {
  140. return
  141. }
  142. go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
  143. }
  144. case mu.OP_NOOP: //下个节点回应
  145. log.Println("接收成功", string(data))
  146. case mu.OP_DELETE_DOWNLOADERCODES:
  147. log.Println(string(data))
  148. udpclient.WriteUdp([]byte(`{"permission":"stop"}`), mu.OP_TYPE_DATA, &net.UDPAddr{
  149. IP: net.ParseIP(Sysconfig["udpip"].(string)),
  150. Port: qu.IntAll(Sysconfig["udpport"].(string)),
  151. })
  152. }
  153. }
  154. func save(mid interface{}, attk string, qmap, fileinfo *map[string]interface{}, updatenum *int) {
  155. defer qu.Catch()
  156. type FileData struct {
  157. ObjId string //Id
  158. OrgUrl string //源下载地址
  159. Fid string
  160. Name string
  161. Type string //文件类型png、jpg、tif、swf(ocr识别);pdf,doc,docx,xls
  162. Content string //识别内容
  163. }
  164. client, err := rpc.DialHTTP("tcp", qu.ObjToString(Sysconfig["file2text"]))
  165. if err != nil {
  166. log.Println(mid, "rpc err :", err)
  167. return
  168. }
  169. defer client.Close()
  170. var reply []byte
  171. //bs, _ := ioutil.ReadFile("1.docx")
  172. var fffpath string
  173. fffpath = path.Ext(qu.ObjToString((*fileinfo)["filename"]))
  174. if strings.TrimSpace(fffpath) == "" {
  175. fffpath = qu.ObjToString((*fileinfo)["ftype"])
  176. } else {
  177. fffpath = fffpath[1:]
  178. }
  179. fileData := &FileData{
  180. ObjId: mid.(bson.ObjectId).String(),
  181. OrgUrl: qu.ObjToString((*fileinfo)["url"]),
  182. Name: qu.ObjToString((*fileinfo)["filename"]),
  183. Fid: qu.ObjToString((*fileinfo)["fid"]), //附件id
  184. Type: fffpath,
  185. }
  186. //log.Println(mid, fileData)
  187. err = client.Call("FileToText.FileToContext", fileData, &reply)
  188. if err != nil {
  189. log.Println(mid, "call ocr error:", err)
  190. return
  191. }
  192. //fileinfo["ftype"] = "doc"
  193. //reply = []byte("jdsfkldasjflkj")
  194. //fileinfo["ftype"] = "zip"
  195. //testfiles := []map[string]interface {
  196. //}{
  197. // {"Name": "test4.doc", "Content": "test4context", "Type": "doc", "Size": "40M"},
  198. // {"Name": "test5.pdf", "Content": "test5context", "Type": "pdf", "Size": "50M"},
  199. // {"Name": "test6.xlsx", "Content": "test6context", "Type": "xlsx", "Size": "60M"},
  200. //}
  201. //reply, _ = json.Marshal(testfiles)
  202. if len(reply) == 0 {
  203. log.Println(mid, "rpc返回数据为空:", qu.ObjToString((*fileinfo)["fid"]), string(reply))
  204. return
  205. }
  206. //log.Println(mid, string(reply))
  207. rdata := make(map[string]interface{})
  208. if err := json.Unmarshal(reply, &rdata); err != nil {
  209. log.Println(mid, "rpc返回数据解析失败:", qu.ObjToString((*fileinfo)["fid"]), err)
  210. return
  211. }
  212. if rdata["err"] == nil || rdata["err"] == "null" || rdata["err"] == "" {
  213. if qu.ObjToString((*fileinfo)["ftype"]) == "rar" || qu.ObjToString((*fileinfo)["ftype"]) == "zip" {
  214. (*fileinfo)["content"] = rdata["contextc"]
  215. } else {
  216. (*fileinfo)["content"] = rdata["context"]
  217. }
  218. (*fileinfo)["expend"] = rdata["expend"]
  219. delete(*fileinfo, "update")
  220. //log.Println((*fileinfo))
  221. (*qmap)[MgoFileFiled].(map[string]interface{})["attachments"].(map[string]interface{})[attk] = *fileinfo
  222. //asdf := (*qmap)[MgoFileFiled].(map[string]interface{})
  223. //qwer := asdf["attachments"].(map[string]interface{})
  224. //qwer[attk] =*fileinfo
  225. //log.Println((*qmap)[MgoFileFiled])
  226. updateBool := mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
  227. "$set": bson.M{
  228. MgoFileFiled: (*qmap)[MgoFileFiled],
  229. },
  230. })
  231. if updateBool {
  232. *updatenum++
  233. mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
  234. "$set": bson.M{
  235. "updatefileNum": &updatenum,
  236. },})
  237. log.Println(mid, "mongo更新数据成功")
  238. } else {
  239. log.Println(mid, "mongo更新数据失败", qu.ObjToString((*fileinfo)["fid"]))
  240. }
  241. //nowHour := time.Now().Hour()
  242. //rdlock.Lock()
  243. //if nowHour != hourNum {
  244. // log.Println("send email:", SendMail(fmt.Sprint(updateBool, mid)))
  245. // hourNum = nowHour
  246. //}
  247. //rdlock.Unlock()
  248. } else {
  249. log.Println(mid, "调用rpc服务解析异常:", mid, qu.ObjToString((*fileinfo)["fid"]), rdata["err"])
  250. }
  251. }
  252. //
  253. //var hourNum int
  254. //var rdlock sync.RWMutex