main.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "gopkg.in/mgo.v2/bson"
  6. "html/template"
  7. "jy/mongodbutil"
  8. "log"
  9. mu "mfw/util"
  10. "net"
  11. "net/http"
  12. qu "qfw/util"
  13. "strings"
  14. "sync"
  15. )
  16. var udpclient mu.UdpClient //udp对象
  17. var Sysconfig map[string]interface{}
  18. var MgoIP, MgoDB, MgoC, MgoFileFiled, SidField, EidField string
  19. var sys sync.RWMutex
  20. func init() {
  21. qu.ReadConfig(&Sysconfig)
  22. MgoIP = qu.ObjToString(Sysconfig["mongodb_ip"])
  23. MgoDB = qu.ObjToString(Sysconfig["mongodb_db"])
  24. MgoC = qu.ObjToString(Sysconfig["mongodb_c"])
  25. SidField = qu.ObjToString(Sysconfig["json_sidfiled"])
  26. EidField = qu.ObjToString(Sysconfig["json_eidfiled"])
  27. MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_filefiled"], "projectinfo")
  28. if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" {
  29. log.Println("获取配置文件参数失败", Sysconfig)
  30. return
  31. }
  32. mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB)
  33. log.Println(mongodbutil.Mgo.Get().Ping())
  34. }
  35. func main() {
  36. udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024}
  37. udpclient.Listen(processUdpMsg)
  38. log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
  39. if Sysconfig["broadcast"].(bool) { //重启的话通知分布式节点
  40. ips := qu.ObjToString(Sysconfig["broadcast_ips"])
  41. ipsArr := strings.Split(ips, ";")
  42. for _, v := range ipsArr {
  43. udpclient.WriteUdp([]byte{}, mu.OP_NOOP, &net.UDPAddr{
  44. IP: net.ParseIP(v),
  45. Port: qu.IntAll(Sysconfig["broadcast_port"]),
  46. })
  47. }
  48. }
  49. mux := http.NewServeMux()
  50. mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
  51. http.Redirect(writer,request,"/query",http.StatusFound)
  52. })
  53. res := http.FileServer(http.Dir("res"))
  54. mux.Handle("/res/", http.StripPrefix("/res/", res))
  55. mux.HandleFunc("/query", func(writer http.ResponseWriter, request *http.Request) {
  56. tp, err := template.ParseFiles("web/index.html","public/header.html")
  57. task := queryOcr_task()
  58. if err != nil {
  59. log.Println(err)
  60. fmt.Fprint(writer, "页面找不到了~")
  61. return
  62. }
  63. tp.Execute(writer, task)
  64. })
  65. log.Println("Http listening port: ",qu.ObjToString(Sysconfig["http_port"]))
  66. if err := http.ListenAndServe(":"+qu.ObjToString(Sysconfig["http_port"]), mux); err != nil {
  67. fmt.Println("start http server faild:", err)
  68. }
  69. }
  70. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  71. defer qu.Catch()
  72. switch act {
  73. case mu.OP_TYPE_DATA: //保存服务
  74. go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)), mu.OP_NOOP, ra)
  75. tmp := make(map[string]interface{})
  76. err := json.Unmarshal(data, &tmp)
  77. if err != nil {
  78. go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)+",data解析失败,"+err.Error()), mu.OP_NOOP, ra)
  79. return
  80. }
  81. tmp["start"] = tmp[qu.ObjToString(SidField)]
  82. bytes, _ := json.Marshal(tmp)
  83. b := mongodbutil.Mgo.Save("ocr_task", string(bytes))
  84. log.Println("保存id:", b)
  85. case mu.OP_NOOP: //其他节点回应消息打印
  86. log.Println("节点接收成功", string(data), ra.String())
  87. case mu.OP_GET_DOWNLOADERCODE: //分发任务
  88. if `{"permission":"get_ocr_task"}` != string(data) {
  89. log.Println("没有权限:", string(data), ra)
  90. go udpclient.WriteUdp([]byte("没有权限"), mu.OP_NOOP, ra)
  91. return
  92. }
  93. sys.Lock()
  94. datas, _ := mongodbutil.Mgo.Find("ocr_task", nil, `{"_id":1}`, nil, false, -1, -1)
  95. if len(*datas) == 0 {
  96. go udpclient.WriteUdp([]byte("没有新数据"), mu.OP_TYPE_DATA, ra)
  97. sys.Unlock()
  98. return
  99. }
  100. tmp := (*datas)[0]
  101. ObjectId := tmp["_id"]
  102. sid := qu.ObjToString(tmp[qu.ObjToString(SidField)])
  103. eid := qu.ObjToString(tmp[qu.ObjToString(EidField)])
  104. rdata, _ := mongodbutil.Mgo.FindOneByField(MgoC, bson.M{"_id": bson.M{
  105. "$gt": bson.ObjectIdHex(sid),
  106. }}, `{"_id":1,"`+MgoFileFiled+`":1}`)
  107. //log.Println(rdata)
  108. if len((*rdata)) == 0 {
  109. go udpclient.WriteUdp([]byte("没有获取到最新数据"), mu.OP_TYPE_DATA, ra)
  110. sys.Unlock()
  111. return
  112. }
  113. newId := (*rdata)["_id"]
  114. if newId.(bson.ObjectId).Hex() >= eid {
  115. go udpclient.WriteUdp([]byte(`{"id":"`+qu.ObjToString(tmp["start"])+`","permission":"ocr_task","is_start":"true"}`), mu.OP_TYPE_DATA, ra) //起始位置
  116. go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra) //分发任务
  117. totmp := make(map[string]string)
  118. totmp["sid"] = qu.ObjToString(tmp[qu.ObjToString("start")])
  119. totmp["eid"] = qu.ObjToString(tmp[qu.ObjToString(EidField)])
  120. tobyte, _ := json.Marshal(totmp)
  121. go udpclient.WriteUdp(tobyte, mu.OP_TYPE_DATA, &net.UDPAddr{
  122. IP: net.ParseIP(qu.ObjToString(Sysconfig["toudpip"])),
  123. Port: qu.IntAll(Sysconfig["toudpport"]),
  124. })
  125. log.Println("ocr_task处理完成,发送下个节点", string(tobyte))
  126. mongodbutil.Mgo.Del("ocr_task", bson.M{"_id": ObjectId.(bson.ObjectId)})
  127. sys.Unlock()
  128. return
  129. }
  130. go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra) //分发任务
  131. //log.Println(newId.(bson.ObjectId).Hex())
  132. tmp[SidField] = newId.(bson.ObjectId).Hex()
  133. mongodbutil.Mgo.Update("ocr_task",
  134. bson.M{"_id": ObjectId.(bson.ObjectId)}, tmp, false, false)
  135. sys.Unlock()
  136. }
  137. }
  138. func queryOcr_task() map[string]int {
  139. data := make(map[string]int)
  140. taskArr, _ := mongodbutil.Mgo.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
  141. taskNum := len(*taskArr)
  142. if taskNum == 0 {
  143. return data
  144. }
  145. data["taskNum"] = taskNum
  146. sumNum := 0
  147. nowSumNum :=0
  148. for i, v := range *taskArr {
  149. sid := bson.ObjectIdHex(qu.ObjToString(v["start"]))
  150. eid := bson.ObjectIdHex(qu.ObjToString(v[qu.ObjToString(EidField)]))
  151. sumNum += mongodbutil.Mgo.Count("bidding", bson.M{"_id": bson.M{
  152. "$gte": sid,
  153. "$lte": eid,
  154. }})
  155. if i ==0{
  156. nowSumNum = sumNum
  157. }
  158. }
  159. data["sumNum"] = sumNum
  160. data["nowSumNum"] = nowSumNum
  161. tmpsid := bson.ObjectIdHex(qu.ObjToString((*taskArr)[0]["start"]))
  162. over := (*taskArr)[0][qu.ObjToString(SidField)]
  163. overNum := mongodbutil.Mgo.Count("bidding", bson.M{"_id": bson.M{
  164. "$gte": tmpsid,
  165. "$lt": bson.ObjectIdHex(qu.ObjToString(over)),
  166. }})
  167. data["overNum"] = overNum
  168. undoneNum := sumNum - overNum
  169. data["undoneNum"] = undoneNum
  170. nowUnDoneNum := nowSumNum - overNum
  171. data["nowUnDoneNum"] = nowUnDoneNum
  172. return data
  173. }