|
- package main
- import (
- "encoding/json"
- "fmt"
- "gopkg.in/mgo.v2/bson"
- "html/template"
- "jy/mongodbutil"
- "log"
- mu "mfw/util"
- "net"
- "net/http"
- "qfw/common/src/qfw/util"
- qu "qfw/util"
- "strings"
- "sync"
- )
- var udpclient mu.UdpClient //udp对象
- var Sysconfig map[string]interface{}
- var MgoIP, MgoDB, MgoC, MgoFileFiled, SidField, EidField string
- var sys sync.RWMutex
- func init() {
- qu.ReadConfig(&Sysconfig)
- MgoIP = qu.ObjToString(Sysconfig["mongodb_ip"])
- MgoDB = qu.ObjToString(Sysconfig["mongodb_db"])
- MgoC = qu.ObjToString(Sysconfig["mongodb_c"])
- SidField = qu.ObjToString(Sysconfig["json_sidfiled"])
- EidField = qu.ObjToString(Sysconfig["json_eidfiled"])
- MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_filefiled"], "projectinfo")
- if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" {
- log.Println("获取配置文件参数失败", Sysconfig)
- return
- }
- mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB)
- log.Println(mongodbutil.Mgo.Get().Ping())
- }
- func main() {
- udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024}
- udpclient.Listen(processUdpMsg)
- log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
- if Sysconfig["broadcast"].(bool) { //重启的话通知分布式节点
- ips := qu.ObjToString(Sysconfig["broadcast_ips"])
- ipsArr := strings.Split(ips, ";")
- for _, v := range ipsArr {
- udpclient.WriteUdp([]byte{}, mu.OP_NOOP, &net.UDPAddr{
- IP: net.ParseIP(v),
- Port: qu.IntAll(Sysconfig["broadcast_port"]),
- })
- }
- }
- mux := http.NewServeMux()
- mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
- http.Redirect(writer, request, "/login", http.StatusFound)
- })
- mux.HandleFunc("/favicon.ico", func(writer http.ResponseWriter, request *http.Request) {
- writer.WriteHeader(http.StatusOK)
- })
- mux.HandleFunc("/login", func(writer http.ResponseWriter, request *http.Request) {
- tp, err := template.ParseFiles("web/login.html")
- if err != nil {
- log.Println(err)
- writer.Write([]byte( "页面找不到了~"))
- return
- }
- if request.Method == "GET"{
- tp.Execute(writer,"")
- return
- }
- if request.Method == "POST" {
- err := request.ParseForm()
- if err != nil {
- http.Error(writer, err.Error(), http.StatusBadRequest)
- return
- }
- email := request.Form.Get("username")
- pwd := request.Form.Get("pwd")
- if email == "ocr_task" && pwd == "ocr_task_pwd" {
- http.SetCookie(writer,&http.Cookie{Name: "username", Value: util.GetMd5String("email")})
- http.Redirect(writer, request, "/query", http.StatusFound)
- } else {
- writer.WriteHeader(http.StatusUnauthorized)
- tp.Execute(writer, "密码错误")
- }
- return
- }
- })
- res := http.FileServer(http.Dir("res"))
- mux.Handle("/res/", http.StripPrefix("/res/", res))
- mux.HandleFunc("/query", func(writer http.ResponseWriter, request *http.Request) {
- tplogin, err := template.ParseFiles("web/login.html")
- if err != nil {
- log.Println(err)
- writer.Write([]byte( "页面找不到了~"))
- return
- }
- cookie, e := request.Cookie("username")
- if cookie == nil{
- http.RedirectHandler("/login", http.StatusUnauthorized)
- tplogin.Execute(writer,"请先登录")
- return
- }
- if cookie.Value != "0c83f57c786a0b4a39efab23731c7ebc"{
- http.RedirectHandler("/login", http.StatusUnauthorized)
- tplogin.Execute(writer,"密钥错误")
- return
- }
- tp, err := template.ParseFiles("web/index.html", "public/header.html")
- if err != nil {
- log.Println(err)
- writer.Write([]byte( "页面找不到了~"))
- return
- }
- task := queryOcr_task()
- tp.Execute(writer, task)
- })
- log.Println("Http listening port: ", qu.ObjToString(Sysconfig["http_port"]))
- if err := http.ListenAndServe(":"+qu.ObjToString(Sysconfig["http_port"]), mux); err != nil {
- fmt.Println("start http server faild:", err)
- }
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- defer qu.Catch()
- switch act {
- case mu.OP_TYPE_DATA: //保存服务
- go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)), mu.OP_NOOP, ra)
- tmp := make(map[string]interface{})
- err := json.Unmarshal(data, &tmp)
- if err != nil {
- go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)+",data解析失败,"+err.Error()), mu.OP_NOOP, ra)
- return
- }
- tmp["start"] = tmp[qu.ObjToString(SidField)]
- bytes, _ := json.Marshal(tmp)
- b := mongodbutil.Mgo.Save("ocr_task", string(bytes))
- log.Println("保存id:", b)
- case mu.OP_NOOP: //其他节点回应消息打印
- log.Println("节点接收成功", string(data), ra.String())
- case mu.OP_GET_DOWNLOADERCODE: //分发任务
- if `{"permission":"get_ocr_task"}` != string(data) {
- log.Println("没有权限:", string(data), ra)
- go udpclient.WriteUdp([]byte("没有权限"), mu.OP_NOOP, ra)
- return
- }
- sys.Lock()
- datas, _ := mongodbutil.Mgo.Find("ocr_task", nil, `{"_id":1}`, nil, false, -1, -1)
- if len(*datas) == 0 {
- go udpclient.WriteUdp([]byte("没有新数据"), mu.OP_TYPE_DATA, ra)
- sys.Unlock()
- return
- }
- tmp := (*datas)[0]
- ObjectId := tmp["_id"]
- sid := qu.ObjToString(tmp[qu.ObjToString(SidField)])
- eid := qu.ObjToString(tmp[qu.ObjToString(EidField)])
- rdata, _ := mongodbutil.Mgo.FindOneByField(MgoC, bson.M{"_id": bson.M{
- "$gt": bson.ObjectIdHex(sid),
- }}, `{"_id":1,"`+MgoFileFiled+`":1}`)
- //log.Println(rdata)
- if len((*rdata)) == 0 {
- go udpclient.WriteUdp([]byte("没有获取到最新数据"), mu.OP_TYPE_DATA, ra)
- sys.Unlock()
- return
- }
- newId := (*rdata)["_id"]
- if newId.(bson.ObjectId).Hex() >= eid {
- go udpclient.WriteUdp([]byte(`{"id":"`+qu.ObjToString(tmp["start"])+`","permission":"ocr_task","is_start":"true"}`), mu.OP_TYPE_DATA, ra) //起始位置
- go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra) //分发任务
- totmp := make(map[string]string)
- totmp["sid"] = qu.ObjToString(tmp[qu.ObjToString("start")])
- totmp["eid"] = qu.ObjToString(tmp[qu.ObjToString(EidField)])
- tobyte, _ := json.Marshal(totmp)
- go udpclient.WriteUdp(tobyte, mu.OP_TYPE_DATA, &net.UDPAddr{
- IP: net.ParseIP(qu.ObjToString(Sysconfig["toudpip"])),
- Port: qu.IntAll(Sysconfig["toudpport"]),
- })
- log.Println("ocr_task处理完成,发送下个节点", string(tobyte))
- mongodbutil.Mgo.Del("ocr_task", bson.M{"_id": ObjectId.(bson.ObjectId)})
- sys.Unlock()
- return
- }
- go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra) //分发任务
- //log.Println(newId.(bson.ObjectId).Hex())
- tmp[SidField] = newId.(bson.ObjectId).Hex()
- mongodbutil.Mgo.Update("ocr_task",
- bson.M{"_id": ObjectId.(bson.ObjectId)}, tmp, false, false)
- sys.Unlock()
- }
- }
- func queryOcr_task() map[string]int {
- data := make(map[string]int)
- taskArr, _ := mongodbutil.Mgo.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
- taskNum := len(*taskArr)
- if taskNum == 0 {
- return data
- }
- data["taskNum"] = taskNum
- sumNum := 0
- nowSumNum := 0
- for i, v := range *taskArr {
- sid := bson.ObjectIdHex(qu.ObjToString(v["start"]))
- eid := bson.ObjectIdHex(qu.ObjToString(v[qu.ObjToString(EidField)]))
- sumNum += mongodbutil.Mgo.Count("bidding", bson.M{"_id": bson.M{
- "$gte": sid,
- "$lte": eid,
- }})
- if i == 0 {
- nowSumNum = sumNum
- }
- }
- data["sumNum"] = sumNum
- data["nowSumNum"] = nowSumNum
- tmpsid := bson.ObjectIdHex(qu.ObjToString((*taskArr)[0]["start"]))
- over := (*taskArr)[0][qu.ObjToString(SidField)]
- overNum := mongodbutil.Mgo.Count("bidding", bson.M{"_id": bson.M{
- "$gte": tmpsid,
- "$lt": bson.ObjectIdHex(qu.ObjToString(over)),
- }})
- data["overNum"] = overNum
- undoneNum := sumNum - overNum
- data["undoneNum"] = undoneNum
- nowUnDoneNum := nowSumNum - overNum
- data["nowUnDoneNum"] = nowUnDoneNum
- return data
- }
|