main.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package main
  2. import (
  3. "encoding/json"
  4. "gopkg.in/mgo.v2/bson"
  5. "jy/mongodbutil"
  6. "log"
  7. mu "mfw/util"
  8. "net"
  9. qu "qfw/util"
  10. "strings"
  11. "sync"
  12. )
  13. var udpclient mu.UdpClient //udp对象
  14. var Sysconfig map[string]interface{}
  15. var MgoIP, MgoDB, MgoC, MgoFileFiled, SidField, EidField string
  16. var sys sync.RWMutex
  17. func init() {
  18. qu.ReadConfig(&Sysconfig)
  19. MgoIP = qu.ObjToString(Sysconfig["mongodb_ip"])
  20. MgoDB = qu.ObjToString(Sysconfig["mongodb_db"])
  21. MgoC = qu.ObjToString(Sysconfig["mongodb_c"])
  22. SidField = qu.ObjToString(Sysconfig["json_sidfiled"])
  23. EidField = qu.ObjToString(Sysconfig["json_eidfiled"])
  24. MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_filefiled"], "projectinfo")
  25. if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" {
  26. log.Println("获取配置文件参数失败", Sysconfig)
  27. return
  28. }
  29. mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB)
  30. log.Println(mongodbutil.Mgo.Get().Ping())
  31. }
  32. func main() {
  33. udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024}
  34. udpclient.Listen(processUdpMsg)
  35. log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
  36. b := make(chan bool, 1)
  37. <-b
  38. }
  39. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  40. defer qu.Catch()
  41. switch act {
  42. case mu.OP_TYPE_DATA: //保存服务
  43. go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)), mu.OP_NOOP, ra)
  44. tmp := make(map[string]interface{})
  45. err := json.Unmarshal(data, &tmp)
  46. if err != nil {
  47. go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)+",data解析失败,"+err.Error()), mu.OP_NOOP, ra)
  48. return
  49. }
  50. tmp["start"] = tmp[qu.ObjToString(SidField)]
  51. bytes, _ := json.Marshal(tmp)
  52. b := mongodbutil.Mgo.Save("ocr_task", string(bytes))
  53. log.Println("保存id:",b)
  54. case mu.OP_NOOP: //其他节点回应消息打印
  55. log.Println("节点接收成功", string(data),ra.String())
  56. case mu.OP_GET_DOWNLOADERCODE: //分发任务
  57. if `{"permission":"get_ocr_task"}` != string(data){
  58. log.Println("没有权限:",string(data),ra)
  59. go udpclient.WriteUdp([]byte("没有权限"), mu.OP_NOOP, ra)
  60. return
  61. }
  62. sys.Lock()
  63. datas, _ := mongodbutil.Mgo.Find("ocr_task", nil, `{"_id":1}`, nil, false, -1, -1)
  64. if len(*datas) == 0 {
  65. go udpclient.WriteUdp([]byte("没有新数据"), mu.OP_TYPE_DATA, ra)
  66. sys.Unlock()
  67. return
  68. }
  69. tmp := (*datas)[0]
  70. ObjectId := tmp["_id"]
  71. sid := qu.ObjToString(tmp[qu.ObjToString(SidField)])
  72. eid := qu.ObjToString(tmp[qu.ObjToString(EidField)])
  73. rdata, _ := mongodbutil.Mgo.FindOneByField(MgoC, bson.M{"_id": bson.M{
  74. "$gt": bson.ObjectIdHex(sid),
  75. }}, `{"_id":1,"`+MgoFileFiled+`":1}`)
  76. //log.Println(rdata)
  77. if len((*rdata)) == 0 {
  78. go udpclient.WriteUdp([]byte("没有获取到最新数据"), mu.OP_TYPE_DATA, ra)
  79. sys.Unlock()
  80. return
  81. }
  82. newId := (*rdata)["_id"]
  83. if newId.(bson.ObjectId).Hex() >= eid {
  84. go udpclient.WriteUdp([]byte(`{"id":"`+qu.ObjToString(tmp["start"])+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra)//起始位置
  85. go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra)//分发任务
  86. totmp := make(map[string]string)
  87. totmp["sid"] = qu.ObjToString(tmp[qu.ObjToString("start")])
  88. totmp["eid"] = qu.ObjToString(tmp[qu.ObjToString(EidField)])
  89. tobyte, _ := json.Marshal(totmp)
  90. go udpclient.WriteUdp(tobyte, mu.OP_TYPE_DATA, &net.UDPAddr{
  91. IP: net.ParseIP(qu.ObjToString(Sysconfig["toudpip"])),
  92. Port: qu.IntAll(Sysconfig["toudpport"]),
  93. })
  94. log.Println("ocr_task处理完成,发送下个节点", string(tobyte))
  95. mongodbutil.Mgo.Del("ocr_task", bson.M{"_id":ObjectId.(bson.ObjectId)})
  96. sys.Unlock()
  97. return
  98. }
  99. go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra)//分发任务
  100. //log.Println(newId.(bson.ObjectId).Hex())
  101. tmp[SidField] = newId.(bson.ObjectId).Hex()
  102. mongodbutil.Mgo.Update("ocr_task",
  103. bson.M{"_id":ObjectId.(bson.ObjectId)},tmp,false,false)
  104. sys.Unlock()
  105. }
  106. }