main.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "gopkg.in/mgo.v2/bson"
  6. "html/template"
  7. "jy/mongodbutil"
  8. "log"
  9. mu "mfw/util"
  10. "net"
  11. "net/http"
  12. "qfw/common/src/qfw/util"
  13. qu "qfw/util"
  14. "strings"
  15. "sync"
  16. )
  17. var udpclient mu.UdpClient //udp对象
  18. var Sysconfig map[string]interface{}
  19. var MgoIP, MgoDB, MgoC, MgoFileFiled, SidField, EidField string
  20. var sys sync.RWMutex
  21. func init() {
  22. qu.ReadConfig(&Sysconfig)
  23. MgoIP = qu.ObjToString(Sysconfig["mongodb_ip"])
  24. MgoDB = qu.ObjToString(Sysconfig["mongodb_db"])
  25. MgoC = qu.ObjToString(Sysconfig["mongodb_c"])
  26. SidField = qu.ObjToString(Sysconfig["json_sidfiled"])
  27. EidField = qu.ObjToString(Sysconfig["json_eidfiled"])
  28. MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_filefiled"], "projectinfo")
  29. if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" {
  30. log.Println("获取配置文件参数失败", Sysconfig)
  31. return
  32. }
  33. mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB)
  34. log.Println(mongodbutil.Mgo.Get().Ping())
  35. }
  36. func main() {
  37. udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024}
  38. udpclient.Listen(processUdpMsg)
  39. log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
  40. if Sysconfig["broadcast"].(bool) { //重启的话通知分布式节点
  41. ips := qu.ObjToString(Sysconfig["broadcast_ips"])
  42. ipsArr := strings.Split(ips, ";")
  43. for _, v := range ipsArr {
  44. udpclient.WriteUdp([]byte{}, mu.OP_NOOP, &net.UDPAddr{
  45. IP: net.ParseIP(v),
  46. Port: qu.IntAll(Sysconfig["broadcast_port"]),
  47. })
  48. }
  49. }
  50. mux := http.NewServeMux()
  51. mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
  52. http.Redirect(writer, request, "/login", http.StatusFound)
  53. })
  54. mux.HandleFunc("/favicon.ico", func(writer http.ResponseWriter, request *http.Request) {
  55. writer.WriteHeader(http.StatusOK)
  56. })
  57. mux.HandleFunc("/login", func(writer http.ResponseWriter, request *http.Request) {
  58. tp, err := template.ParseFiles("web/login.html")
  59. if err != nil {
  60. log.Println(err)
  61. writer.Write([]byte( "页面找不到了~"))
  62. return
  63. }
  64. if request.Method == "GET"{
  65. tp.Execute(writer,"")
  66. return
  67. }
  68. if request.Method == "POST" {
  69. err := request.ParseForm()
  70. if err != nil {
  71. http.Error(writer, err.Error(), http.StatusBadRequest)
  72. return
  73. }
  74. email := request.Form.Get("username")
  75. pwd := request.Form.Get("pwd")
  76. if email == "ocr_task" && pwd == "ocr_task_pwd" {
  77. http.SetCookie(writer,&http.Cookie{Name: "username", Value: util.GetMd5String("email")})
  78. http.Redirect(writer, request, "/query", http.StatusFound)
  79. } else {
  80. writer.WriteHeader(http.StatusUnauthorized)
  81. tp.Execute(writer, "密码错误")
  82. }
  83. return
  84. }
  85. })
  86. res := http.FileServer(http.Dir("res"))
  87. mux.Handle("/res/", http.StripPrefix("/res/", res))
  88. mux.HandleFunc("/query", func(writer http.ResponseWriter, request *http.Request) {
  89. tplogin, err := template.ParseFiles("web/login.html")
  90. if err != nil {
  91. log.Println(err)
  92. writer.Write([]byte( "页面找不到了~"))
  93. return
  94. }
  95. cookie, e := request.Cookie("username")
  96. if cookie == nil{
  97. http.RedirectHandler("/login", http.StatusUnauthorized)
  98. tplogin.Execute(writer,"请先登录")
  99. return
  100. }
  101. if cookie.Value != "0c83f57c786a0b4a39efab23731c7ebc"{
  102. http.RedirectHandler("/login", http.StatusUnauthorized)
  103. tplogin.Execute(writer,"密钥错误")
  104. return
  105. }
  106. tp, err := template.ParseFiles("web/index.html", "public/header.html")
  107. if err != nil {
  108. log.Println(err)
  109. writer.Write([]byte( "页面找不到了~"))
  110. return
  111. }
  112. task := queryOcr_task()
  113. tp.Execute(writer, task)
  114. })
  115. log.Println("Http listening port: ", qu.ObjToString(Sysconfig["http_port"]))
  116. if err := http.ListenAndServe(":"+qu.ObjToString(Sysconfig["http_port"]), mux); err != nil {
  117. fmt.Println("start http server faild:", err)
  118. }
  119. }
  120. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  121. defer qu.Catch()
  122. switch act {
  123. case mu.OP_TYPE_DATA: //保存服务
  124. go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)), mu.OP_NOOP, ra)
  125. tmp := make(map[string]interface{})
  126. err := json.Unmarshal(data, &tmp)
  127. if err != nil {
  128. go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)+",data解析失败,"+err.Error()), mu.OP_NOOP, ra)
  129. return
  130. }
  131. tmp["start"] = tmp[qu.ObjToString(SidField)]
  132. bytes, _ := json.Marshal(tmp)
  133. b := mongodbutil.Mgo.Save("ocr_task", string(bytes))
  134. log.Println("保存id:", b)
  135. case mu.OP_NOOP: //其他节点回应消息打印
  136. log.Println("节点接收成功", string(data), ra.String())
  137. case mu.OP_GET_DOWNLOADERCODE: //分发任务
  138. if `{"permission":"get_ocr_task"}` != string(data) {
  139. log.Println("没有权限:", string(data), ra)
  140. go udpclient.WriteUdp([]byte("没有权限"), mu.OP_NOOP, ra)
  141. return
  142. }
  143. sys.Lock()
  144. datas, _ := mongodbutil.Mgo.Find("ocr_task", nil, `{"_id":1}`, nil, false, -1, -1)
  145. if len(*datas) == 0 {
  146. go udpclient.WriteUdp([]byte("没有新数据"), mu.OP_TYPE_DATA, ra)
  147. sys.Unlock()
  148. return
  149. }
  150. tmp := (*datas)[0]
  151. ObjectId := tmp["_id"]
  152. sid := qu.ObjToString(tmp[qu.ObjToString(SidField)])
  153. eid := qu.ObjToString(tmp[qu.ObjToString(EidField)])
  154. rdata, _ := mongodbutil.Mgo.FindOneByField(MgoC, bson.M{"_id": bson.M{
  155. "$gt": bson.ObjectIdHex(sid),
  156. }}, `{"_id":1,"`+MgoFileFiled+`":1}`)
  157. //log.Println(rdata)
  158. if len((*rdata)) == 0 {
  159. go udpclient.WriteUdp([]byte("没有获取到最新数据"), mu.OP_TYPE_DATA, ra)
  160. sys.Unlock()
  161. return
  162. }
  163. newId := (*rdata)["_id"]
  164. if newId.(bson.ObjectId).Hex() >= eid {
  165. go udpclient.WriteUdp([]byte(`{"id":"`+qu.ObjToString(tmp["start"])+`","permission":"ocr_task","is_start":"true"}`), mu.OP_TYPE_DATA, ra) //起始位置
  166. go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra) //分发任务
  167. totmp := make(map[string]string)
  168. totmp["sid"] = qu.ObjToString(tmp[qu.ObjToString("start")])
  169. totmp["eid"] = qu.ObjToString(tmp[qu.ObjToString(EidField)])
  170. tobyte, _ := json.Marshal(totmp)
  171. go udpclient.WriteUdp(tobyte, mu.OP_TYPE_DATA, &net.UDPAddr{
  172. IP: net.ParseIP(qu.ObjToString(Sysconfig["toudpip"])),
  173. Port: qu.IntAll(Sysconfig["toudpport"]),
  174. })
  175. log.Println("ocr_task处理完成,发送下个节点", string(tobyte))
  176. mongodbutil.Mgo.Del("ocr_task", bson.M{"_id": ObjectId.(bson.ObjectId)})
  177. sys.Unlock()
  178. return
  179. }
  180. go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra) //分发任务
  181. //log.Println(newId.(bson.ObjectId).Hex())
  182. tmp[SidField] = newId.(bson.ObjectId).Hex()
  183. mongodbutil.Mgo.Update("ocr_task",
  184. bson.M{"_id": ObjectId.(bson.ObjectId)}, tmp, false, false)
  185. sys.Unlock()
  186. }
  187. }
  188. func queryOcr_task() map[string]int {
  189. data := make(map[string]int)
  190. taskArr, _ := mongodbutil.Mgo.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
  191. taskNum := len(*taskArr)
  192. if taskNum == 0 {
  193. return data
  194. }
  195. data["taskNum"] = taskNum
  196. sumNum := 0
  197. nowSumNum := 0
  198. for i, v := range *taskArr {
  199. sid := bson.ObjectIdHex(qu.ObjToString(v["start"]))
  200. eid := bson.ObjectIdHex(qu.ObjToString(v[qu.ObjToString(EidField)]))
  201. sumNum += mongodbutil.Mgo.Count("bidding", bson.M{"_id": bson.M{
  202. "$gte": sid,
  203. "$lte": eid,
  204. }})
  205. if i == 0 {
  206. nowSumNum = sumNum
  207. }
  208. }
  209. data["sumNum"] = sumNum
  210. data["nowSumNum"] = nowSumNum
  211. tmpsid := bson.ObjectIdHex(qu.ObjToString((*taskArr)[0]["start"]))
  212. over := (*taskArr)[0][qu.ObjToString(SidField)]
  213. overNum := mongodbutil.Mgo.Count("bidding", bson.M{"_id": bson.M{
  214. "$gte": tmpsid,
  215. "$lt": bson.ObjectIdHex(qu.ObjToString(over)),
  216. }})
  217. data["overNum"] = overNum
  218. undoneNum := sumNum - overNum
  219. data["undoneNum"] = undoneNum
  220. nowUnDoneNum := nowSumNum - overNum
  221. data["nowUnDoneNum"] = nowUnDoneNum
  222. return data
  223. }