main.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package main
  2. import (
  3. "encoding/json"
  4. "gopkg.in/mgo.v2/bson"
  5. "jy/mongodbutil"
  6. "log"
  7. mu "mfw/util"
  8. "net"
  9. qu "qfw/util"
  10. "strings"
  11. "sync"
  12. )
  13. var udpclient mu.UdpClient //udp对象
  14. var Sysconfig map[string]interface{}
  15. var MgoIP, MgoDB, MgoC, MgoFileFiled, SidField, EidField string
  16. var sys sync.RWMutex
  17. func init() {
  18. qu.ReadConfig(&Sysconfig)
  19. MgoIP = qu.ObjToString(Sysconfig["mongodb_ip"])
  20. MgoDB = qu.ObjToString(Sysconfig["mongodb_db"])
  21. MgoC = qu.ObjToString(Sysconfig["mongodb_c"])
  22. SidField = qu.ObjToString(Sysconfig["json_sidfiled"])
  23. EidField = qu.ObjToString(Sysconfig["json_eidfiled"])
  24. MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_filefiled"], "projectinfo")
  25. if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" {
  26. log.Println("获取配置文件参数失败", Sysconfig)
  27. return
  28. }
  29. mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB)
  30. log.Println(mongodbutil.Mgo.Get().Ping())
  31. }
  32. func main() {
  33. udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024}
  34. udpclient.Listen(processUdpMsg)
  35. log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
  36. if Sysconfig["broadcast"].(bool){//重启的话通知分布式节点
  37. ips := qu.ObjToString(Sysconfig["broadcast_ips"])
  38. ipsArr := strings.Split(ips, ";")
  39. for _,v := range ipsArr{
  40. udpclient.WriteUdp([]byte{},mu.OP_NOOP,&net.UDPAddr{
  41. IP: net.ParseIP(v),
  42. Port: qu.IntAll(Sysconfig["broadcast_port"]),
  43. })
  44. }
  45. }
  46. b := make(chan bool, 1)
  47. <-b
  48. }
  49. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  50. defer qu.Catch()
  51. switch act {
  52. case mu.OP_TYPE_DATA: //保存服务
  53. go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)), mu.OP_NOOP, ra)
  54. tmp := make(map[string]interface{})
  55. err := json.Unmarshal(data, &tmp)
  56. if err != nil {
  57. go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)+",data解析失败,"+err.Error()), mu.OP_NOOP, ra)
  58. return
  59. }
  60. tmp["start"] = tmp[qu.ObjToString(SidField)]
  61. bytes, _ := json.Marshal(tmp)
  62. b := mongodbutil.Mgo.Save("ocr_task", string(bytes))
  63. log.Println("保存id:",b)
  64. case mu.OP_NOOP: //其他节点回应消息打印
  65. log.Println("节点接收成功", string(data),ra.String())
  66. case mu.OP_GET_DOWNLOADERCODE: //分发任务
  67. if `{"permission":"get_ocr_task"}` != string(data){
  68. log.Println("没有权限:",string(data),ra)
  69. go udpclient.WriteUdp([]byte("没有权限"), mu.OP_NOOP, ra)
  70. return
  71. }
  72. sys.Lock()
  73. datas, _ := mongodbutil.Mgo.Find("ocr_task", nil, `{"_id":1}`, nil, false, -1, -1)
  74. if len(*datas) == 0 {
  75. go udpclient.WriteUdp([]byte("没有新数据"), mu.OP_TYPE_DATA, ra)
  76. sys.Unlock()
  77. return
  78. }
  79. tmp := (*datas)[0]
  80. ObjectId := tmp["_id"]
  81. sid := qu.ObjToString(tmp[qu.ObjToString(SidField)])
  82. eid := qu.ObjToString(tmp[qu.ObjToString(EidField)])
  83. rdata, _ := mongodbutil.Mgo.FindOneByField(MgoC, bson.M{"_id": bson.M{
  84. "$gt": bson.ObjectIdHex(sid),
  85. }}, `{"_id":1,"`+MgoFileFiled+`":1}`)
  86. //log.Println(rdata)
  87. if len((*rdata)) == 0 {
  88. go udpclient.WriteUdp([]byte("没有获取到最新数据"), mu.OP_TYPE_DATA, ra)
  89. sys.Unlock()
  90. return
  91. }
  92. newId := (*rdata)["_id"]
  93. if newId.(bson.ObjectId).Hex() >= eid {
  94. go udpclient.WriteUdp([]byte(`{"id":"`+qu.ObjToString(tmp["start"])+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra)//起始位置
  95. go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra)//分发任务
  96. totmp := make(map[string]string)
  97. totmp["sid"] = qu.ObjToString(tmp[qu.ObjToString("start")])
  98. totmp["eid"] = qu.ObjToString(tmp[qu.ObjToString(EidField)])
  99. tobyte, _ := json.Marshal(totmp)
  100. go udpclient.WriteUdp(tobyte, mu.OP_TYPE_DATA, &net.UDPAddr{
  101. IP: net.ParseIP(qu.ObjToString(Sysconfig["toudpip"])),
  102. Port: qu.IntAll(Sysconfig["toudpport"]),
  103. })
  104. log.Println("ocr_task处理完成,发送下个节点", string(tobyte))
  105. mongodbutil.Mgo.Del("ocr_task", bson.M{"_id":ObjectId.(bson.ObjectId)})
  106. sys.Unlock()
  107. return
  108. }
  109. go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra)//分发任务
  110. //log.Println(newId.(bson.ObjectId).Hex())
  111. tmp[SidField] = newId.(bson.ObjectId).Hex()
  112. mongodbutil.Mgo.Update("ocr_task",
  113. bson.M{"_id":ObjectId.(bson.ObjectId)},tmp,false,false)
  114. sys.Unlock()
  115. }
  116. }