main.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package main
  2. import (
  3. "encoding/json"
  4. "gopkg.in/mgo.v2/bson"
  5. "jy/mongodbutil"
  6. "log"
  7. mu "mfw/util"
  8. "net"
  9. qu "qfw/util"
  10. "strings"
  11. "sync"
  12. )
  13. var udpclient mu.UdpClient //udp对象
  14. var Sysconfig map[string]interface{}
  15. var MgoIP, MgoDB, MgoC, MgoFileFiled, SidField, EidField string
  16. var sys sync.RWMutex
  17. func init() {
  18. qu.ReadConfig(&Sysconfig)
  19. MgoIP = qu.ObjToString(Sysconfig["mongodb_ip"])
  20. MgoDB = qu.ObjToString(Sysconfig["mongodb_db"])
  21. MgoC = qu.ObjToString(Sysconfig["mongodb_c"])
  22. SidField = qu.ObjToString(Sysconfig["json_sidfiled"])
  23. EidField = qu.ObjToString(Sysconfig["json_eidfiled"])
  24. MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_filefiled"], "projectinfo")
  25. if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" {
  26. log.Println("获取配置文件参数失败", Sysconfig)
  27. return
  28. }
  29. mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB)
  30. log.Println(mongodbutil.Mgo.Get().Ping())
  31. }
  32. func main() {
  33. udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024}
  34. udpclient.Listen(processUdpMsg)
  35. log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
  36. b := make(chan bool, 1)
  37. <-b
  38. }
  39. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  40. defer qu.Catch()
  41. switch act {
  42. case mu.OP_TYPE_DATA: //保存服务
  43. go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)), mu.OP_NOOP, ra)
  44. tmp := make(map[string]interface{})
  45. err := json.Unmarshal(data, &tmp)
  46. if err != nil {
  47. go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)+",data解析失败,"+err.Error()), mu.OP_NOOP, ra)
  48. return
  49. }
  50. tmp["start"] = tmp[qu.ObjToString(SidField)]
  51. bytes, _ := json.Marshal(tmp)
  52. b := mongodbutil.Mgo.Save("ocr_task", string(bytes))
  53. log.Println("保存id:",b)
  54. case mu.OP_NOOP: //其他节点回应消息打印
  55. log.Println("接收成功", string(data))
  56. case mu.OP_GET_DOWNLOADERCODE: //分发任务
  57. sys.Lock()
  58. datas, _ := mongodbutil.Mgo.Find("ocr_task", nil, `{"_id":1}`, nil, false, -1, -1)
  59. if len(*datas) == 0 {
  60. go udpclient.WriteUdp([]byte("没有新数据"), mu.OP_TYPE_DATA, ra)
  61. sys.Unlock()
  62. return
  63. }
  64. tmp := (*datas)[0]
  65. ObjectId := tmp["_id"]
  66. sid := qu.ObjToString(tmp[qu.ObjToString(SidField)])
  67. eid := qu.ObjToString(tmp[qu.ObjToString(EidField)])
  68. rdata, _ := mongodbutil.Mgo.FindOneByField(MgoC, bson.M{"_id": bson.M{
  69. "$gt": bson.ObjectIdHex(sid),
  70. }}, `{"_id":1,"`+MgoFileFiled+`":1}`)
  71. //log.Println(rdata)
  72. if len((*rdata)) == 0 {
  73. go udpclient.WriteUdp([]byte("没有获取到最新数据"), mu.OP_TYPE_DATA, ra)
  74. sys.Unlock()
  75. return
  76. }
  77. newId := (*rdata)["_id"]
  78. if newId.(bson.ObjectId).Hex() >= eid {
  79. go udpclient.WriteUdp([]byte(`{"id":"`+tmp["start"].(bson.ObjectId).Hex()+`"}`), mu.OP_TYPE_DATA, ra)//起始位置
  80. go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`"}`), mu.OP_TYPE_DATA, ra)//分发任务
  81. totmp := make(map[string]string)
  82. totmp["sid"] = qu.ObjToString(tmp[qu.ObjToString("start")])
  83. totmp["eid"] = qu.ObjToString(tmp[qu.ObjToString(EidField)])
  84. tobyte, _ := json.Marshal(totmp)
  85. go udpclient.WriteUdp(tobyte, mu.OP_TYPE_DATA, &net.UDPAddr{
  86. IP: net.ParseIP(qu.ObjToString(Sysconfig["toudpip"])),
  87. Port: qu.IntAll(Sysconfig["toudpport"]),
  88. })
  89. log.Println("ocr_task处理完成,发送下个节点", string(tobyte))
  90. mongodbutil.Mgo.Del("ocr_task", `{"_id":`+ObjectId.(bson.ObjectId)+`}`)
  91. sys.Unlock()
  92. return
  93. }
  94. go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`"}`), mu.OP_TYPE_DATA, ra)//分发任务
  95. //log.Println(newId.(bson.ObjectId).Hex())
  96. tmp[SidField] = newId.(bson.ObjectId).Hex()
  97. mongodbutil.Mgo.Update("ocr_task",
  98. bson.M{"_id":ObjectId.(bson.ObjectId)},tmp,false,false)
  99. sys.Unlock()
  100. }
  101. }