// Command main hands out OCR work over UDP and serves a small HTTP status UI.
//
// Tasks are stored in the "ocr_task" MongoDB collection. Peers request work
// with an OP_GET_DOWNLOADERCODE message; each request advances the oldest
// task's cursor by one document of the configured source collection. When a
// task's end id is reached the task is deleted and its range is forwarded to
// the next node.
package main

import (
	"encoding/json"
	"fmt"
	"html/template"
	"log"
	"net"
	"net/http"
	"strings"
	"sync"

	"gopkg.in/mgo.v2/bson"
	"jy/mongodbutil"
	mu "mfw/util"
	"qfw/common/src/qfw/util"
	qu "qfw/util"
)

const (
	// taskCollection is the MongoDB collection that holds pending OCR tasks.
	taskCollection = "ocr_task"

	// notFoundPage is written when an HTML template cannot be loaded.
	notFoundPage = "页面找不到了~"

	// sessionCookieValue is the only "username" cookie value accepted by /query.
	// NOTE(review): this appears to be the MD5 of the literal string "email",
	// which is what the /login handler stores — confirm. It is a fixed,
	// guessable token and should be replaced by a real session mechanism.
	sessionCookieValue = "0c83f57c786a0b4a39efab23731c7ebc"
)

// udpclient is the UDP endpoint used both to receive and to send messages.
var udpclient mu.UdpClient

// Sysconfig holds the raw configuration loaded by qu.ReadConfig.
var Sysconfig map[string]interface{}

// MongoDB address, database, source collection, projected file field, and the
// names of the task fields that hold the current and the end object id.
var MgoIP, MgoDB, MgoC, MgoFileFiled, SidField, EidField string

// sys serializes task dispatch so two peers never receive the same id.
var sys sync.RWMutex

// init loads the configuration and, when the MongoDB settings are present,
// creates the shared connection pool.
func init() {
	qu.ReadConfig(&Sysconfig)
	MgoIP = qu.ObjToString(Sysconfig["mongodb_ip"])
	MgoDB = qu.ObjToString(Sysconfig["mongodb_db"])
	MgoC = qu.ObjToString(Sysconfig["mongodb_c"])
	SidField = qu.ObjToString(Sysconfig["json_sidfiled"])
	EidField = qu.ObjToString(Sysconfig["json_eidfiled"])
	MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_filefiled"], "projectinfo")
	if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" {
		log.Println("获取配置文件参数失败", Sysconfig)
		return
	}
	mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB)
	log.Println(mongodbutil.Mgo.Get().Ping())
}

// main starts the UDP listener, optionally announces the restart to the
// configured peers, and then serves the HTTP UI until the server fails.
func main() {
	udpIP, ipOK := Sysconfig["udpip"].(string)
	udpPort, portOK := Sysconfig["udpport"].(string)
	if !ipOK || !portOK {
		// Fail with a readable message instead of a bare type-assertion panic.
		log.Fatalf("config: udpip and udpport must be strings, got %v and %v", Sysconfig["udpip"], Sysconfig["udpport"])
	}
	udpclient = mu.UdpClient{Local: udpIP + ":" + udpPort, BufSize: 1024}
	udpclient.Listen(processUdpMsg)
	log.Printf("Udp listening port: %s:%s\n", udpIP, udpPort)

	// A missing or non-boolean "broadcast" setting is treated as disabled.
	if enabled, _ := Sysconfig["broadcast"].(bool); enabled {
		notifyPeers()
	}

	httpPort := qu.ObjToString(Sysconfig["http_port"])
	log.Println("Http listening port: ", httpPort)
	if err := http.ListenAndServe(":"+httpPort, newMux()); err != nil {
		fmt.Println("start http server faild:", err)
	}
}

// notifyPeers sends an empty OP_NOOP packet to every address listed in the
// semicolon-separated "broadcast_ips" setting, telling the distributed nodes
// that this process has restarted. Empty or unparsable entries are skipped.
func notifyPeers() {
	port := qu.IntAll(Sysconfig["broadcast_port"])
	for _, raw := range strings.Split(qu.ObjToString(Sysconfig["broadcast_ips"]), ";") {
		addr := strings.TrimSpace(raw)
		if addr == "" {
			continue
		}
		ip := net.ParseIP(addr)
		if ip == nil {
			log.Printf("config: skipping invalid broadcast ip %q", addr)
			continue
		}
		udpclient.WriteUdp([]byte{}, mu.OP_NOOP, &net.UDPAddr{IP: ip, Port: port})
	}
}

// newMux builds the HTTP routes: "/" redirects to the login page, "/login"
// authenticates, "/query" shows task progress and "/res/" serves static files.
func newMux() *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
		http.Redirect(writer, request, "/login", http.StatusFound)
	})
	mux.HandleFunc("/favicon.ico", func(writer http.ResponseWriter, request *http.Request) {
		writer.WriteHeader(http.StatusOK)
	})
	mux.HandleFunc("/login", handleLogin)
	mux.Handle("/res/", http.StripPrefix("/res/", http.FileServer(http.Dir("res"))))
	mux.HandleFunc("/query", handleQuery)
	return mux
}

// render executes tp into w and logs a failure; by then part of the response
// may already have been written, so no other recovery is attempted.
func render(w http.ResponseWriter, tp *template.Template, data interface{}) {
	if err := tp.Execute(w, data); err != nil {
		log.Println(err)
	}
}

// handleLogin shows the login form on GET and checks the submitted
// credentials on POST. Other methods produce an empty response.
func handleLogin(writer http.ResponseWriter, request *http.Request) {
	tp, err := template.ParseFiles("web/login.html")
	if err != nil {
		log.Println(err)
		writer.Write([]byte(notFoundPage))
		return
	}
	switch request.Method {
	case http.MethodGet:
		render(writer, tp, "")
	case http.MethodPost:
		if err := request.ParseForm(); err != nil {
			http.Error(writer, err.Error(), http.StatusBadRequest)
			return
		}
		email := request.Form.Get("username")
		pwd := request.Form.Get("pwd")
		// NOTE(review): credentials are hard-coded; move them to configuration.
		if email != "ocr_task" || pwd != "ocr_task_pwd" {
			writer.WriteHeader(http.StatusUnauthorized)
			render(writer, tp, "密码错误")
			return
		}
		// The cookie is derived from the literal "email", not from the submitted
		// user name; /query compares against the matching constant, so the value
		// is kept as-is here.
		http.SetCookie(writer, &http.Cookie{Name: "username", Value: util.GetMd5String("email")})
		http.Redirect(writer, request, "/query", http.StatusFound)
	}
}

// handleQuery renders the task statistics page for requests that carry the
// expected "username" cookie; everyone else gets the login page with a 401.
func handleQuery(writer http.ResponseWriter, request *http.Request) {
	cookie, err := request.Cookie("username")
	if err != nil || cookie == nil {
		rejectQuery(writer, "请先登录")
		return
	}
	if cookie.Value != sessionCookieValue {
		rejectQuery(writer, "密钥错误")
		return
	}
	tp, err := template.ParseFiles("web/index.html", "public/header.html")
	if err != nil {
		log.Println(err)
		writer.Write([]byte(notFoundPage))
		return
	}
	render(writer, tp, queryOcr_task())
}

// rejectQuery answers an unauthenticated /query request with status 401 and
// the login page showing msg.
func rejectQuery(writer http.ResponseWriter, msg string) {
	tp, err := template.ParseFiles("web/login.html")
	if err != nil {
		log.Println(err)
		writer.Write([]byte(notFoundPage))
		return
	}
	writer.WriteHeader(http.StatusUnauthorized)
	render(writer, tp, msg)
}

// processUdpMsg is the UDP callback. act selects the operation, data is the
// payload and ra the sender. Panics are recovered by qu.Catch.
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
	defer qu.Catch()
	switch act {
	case mu.OP_TYPE_DATA: // store a new task
		saveTask(data, ra)
	case mu.OP_NOOP: // print acknowledgements sent by other nodes
		log.Println("节点接收成功", string(data), ra.String())
	case mu.OP_GET_DOWNLOADERCODE: // hand out the next id
		if `{"permission":"get_ocr_task"}` != string(data) {
			log.Println("没有权限:", string(data), ra)
			go udpclient.WriteUdp([]byte("没有权限"), mu.OP_NOOP, ra)
			return
		}
		dispatchTask(ra)
	}
}

// saveTask acknowledges the payload, decodes it as a JSON object, records the
// task's initial cursor under "start" and stores it in the task collection.
func saveTask(data []byte, ra *net.UDPAddr) {
	go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)), mu.OP_NOOP, ra)
	tmp := make(map[string]interface{})
	if err := json.Unmarshal(data, &tmp); err != nil {
		go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)+",data解析失败,"+err.Error()), mu.OP_NOOP, ra)
		return
	}
	if tmp == nil {
		// A JSON "null" payload decodes without error but leaves the map nil,
		// and writing to a nil map panics.
		log.Println("ocr_task: ignoring null payload from", ra)
		return
	}
	tmp["start"] = tmp[SidField]
	encoded, err := json.Marshal(tmp)
	if err != nil {
		log.Println("ocr_task: encoding task failed:", err)
		return
	}
	b := mongodbutil.Mgo.Save(taskCollection, string(encoded))
	log.Println("保存id:", b)
}

// dispatchTask sends ra the id of the next source document after the oldest
// task's cursor. If that id is at or past the task's end id, the task is
// finished: its range is forwarded to the next node and the task is deleted.
// Otherwise the task's cursor is moved to the dispatched id.
//
// The whole operation runs under sys. The lock is released with defer so that
// a panic recovered further up the stack cannot leave it held and block every
// later request.
func dispatchTask(ra *net.UDPAddr) {
	sys.Lock()
	defer sys.Unlock()

	// The second result of Find is not inspected; a nil pointer is handled below.
	datas, _ := mongodbutil.Mgo.Find(taskCollection, nil, `{"_id":1}`, nil, false, -1, -1)
	if datas == nil || len(*datas) == 0 {
		go udpclient.WriteUdp([]byte("没有新数据"), mu.OP_TYPE_DATA, ra)
		return
	}
	tmp := (*datas)[0]
	taskID, ok := tmp["_id"].(bson.ObjectId)
	if !ok {
		log.Printf("ocr_task: unexpected _id type %T", tmp["_id"])
		return
	}
	sid := qu.ObjToString(tmp[SidField])
	eid := qu.ObjToString(tmp[EidField])
	if !bson.IsObjectIdHex(sid) {
		// bson.ObjectIdHex panics on malformed input, so check first.
		log.Printf("ocr_task: task %s has invalid %s %q", taskID.Hex(), SidField, sid)
		return
	}

	rdata, _ := mongodbutil.Mgo.FindOneByField(MgoC, bson.M{"_id": bson.M{
		"$gt": bson.ObjectIdHex(sid),
	}}, `{"_id":1,"`+MgoFileFiled+`":1}`)
	if rdata == nil || len(*rdata) == 0 {
		go udpclient.WriteUdp([]byte("没有获取到最新数据"), mu.OP_TYPE_DATA, ra)
		return
	}
	newID, ok := (*rdata)["_id"].(bson.ObjectId)
	if !ok {
		log.Printf("ocr_task: unexpected _id type %T in %s", (*rdata)["_id"], MgoC)
		return
	}
	newHex := newID.Hex()
	taskMsg := []byte(`{"id":"` + newHex + `","permission":"ocr_task"}`)

	if newHex >= eid {
		// NOTE(review): when newHex is strictly greater than eid the dispatched
		// id lies outside the task's range — confirm this is intended.
		startHex := qu.ObjToString(tmp["start"])
		go udpclient.WriteUdp([]byte(`{"id":"`+startHex+`","permission":"ocr_task","is_start":"true"}`), mu.OP_TYPE_DATA, ra) // start position
		go udpclient.WriteUdp(taskMsg, mu.OP_TYPE_DATA, ra)                                                                  // dispatch the id
		tobyte, err := json.Marshal(map[string]string{"sid": startHex, "eid": eid})
		if err != nil {
			log.Println("ocr_task: encoding hand-off failed:", err)
		}
		go udpclient.WriteUdp(tobyte, mu.OP_TYPE_DATA, &net.UDPAddr{
			IP:   net.ParseIP(qu.ObjToString(Sysconfig["toudpip"])),
			Port: qu.IntAll(Sysconfig["toudpport"]),
		})
		log.Println("ocr_task处理完成,发送下个节点", string(tobyte))
		mongodbutil.Mgo.Del(taskCollection, bson.M{"_id": taskID})
		return
	}

	go udpclient.WriteUdp(taskMsg, mu.OP_TYPE_DATA, ra) // dispatch the id
	tmp[SidField] = newHex
	mongodbutil.Mgo.Update(taskCollection, bson.M{"_id": taskID}, tmp, false, false)
}

// queryOcr_task collects the counters shown on the status page. The returned
// map is empty when there are no tasks; otherwise it contains:
//
//	taskNum      number of pending tasks
//	sumNum       documents covered by all tasks (start..end, inclusive)
//	nowSumNum    documents covered by the first task
//	overNum      documents of the first task before its current cursor
//	undoneNum    sumNum - overNum
//	nowUnDoneNum nowSumNum - overNum
//
// Tasks whose ids are not valid ObjectId hex strings are logged and counted
// as zero documents instead of panicking.
//
// NOTE(review): documents are counted in the hard-coded "bidding" collection
// while dispatch reads ids from the configured MgoC — confirm they are meant
// to be the same collection.
func queryOcr_task() map[string]int {
	data := make(map[string]int)
	// Same sort specification as the dispatcher, so index 0 is the same task.
	taskArr, _ := mongodbutil.Mgo.Find(taskCollection, bson.M{}, `{"_id":1}`, nil, false, -1, -1)
	if taskArr == nil || len(*taskArr) == 0 {
		return data
	}
	tasks := *taskArr
	data["taskNum"] = len(tasks)

	sumNum, nowSumNum := 0, 0
	for i, v := range tasks {
		startHex := qu.ObjToString(v["start"])
		endHex := qu.ObjToString(v[EidField])
		if !bson.IsObjectIdHex(startHex) || !bson.IsObjectIdHex(endHex) {
			log.Printf("ocr_task: skipping task with invalid range %q..%q", startHex, endHex)
			continue
		}
		n := mongodbutil.Mgo.Count("bidding", bson.M{"_id": bson.M{
			"$gte": bson.ObjectIdHex(startHex),
			"$lte": bson.ObjectIdHex(endHex),
		}})
		sumNum += n
		if i == 0 {
			nowSumNum = n
		}
	}
	data["sumNum"] = sumNum
	data["nowSumNum"] = nowSumNum

	overNum := 0
	firstStart := qu.ObjToString(tasks[0]["start"])
	firstCursor := qu.ObjToString(tasks[0][SidField])
	if bson.IsObjectIdHex(firstStart) && bson.IsObjectIdHex(firstCursor) {
		overNum = mongodbutil.Mgo.Count("bidding", bson.M{"_id": bson.M{
			"$gte": bson.ObjectIdHex(firstStart),
			"$lt":  bson.ObjectIdHex(firstCursor),
		}})
	}
	data["overNum"] = overNum
	data["undoneNum"] = sumNum - overNum
	data["nowUnDoneNum"] = nowSumNum - overNum
	return data
}