|
@@ -2,11 +2,14 @@ package main
|
|
|
|
|
|
import (
|
|
|
"encoding/json"
|
|
|
+ "fmt"
|
|
|
"gopkg.in/mgo.v2/bson"
|
|
|
+ "html/template"
|
|
|
"jy/mongodbutil"
|
|
|
"log"
|
|
|
mu "mfw/util"
|
|
|
"net"
|
|
|
+ "net/http"
|
|
|
qu "qfw/util"
|
|
|
"strings"
|
|
|
"sync"
|
|
@@ -38,19 +41,38 @@ func main() {
|
|
|
udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024}
|
|
|
udpclient.Listen(processUdpMsg)
|
|
|
log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
|
|
|
- if Sysconfig["broadcast"].(bool){//重启的话通知分布式节点
|
|
|
+ if Sysconfig["broadcast"].(bool) { //重启的话通知分布式节点
|
|
|
ips := qu.ObjToString(Sysconfig["broadcast_ips"])
|
|
|
ipsArr := strings.Split(ips, ";")
|
|
|
- for _,v := range ipsArr{
|
|
|
- udpclient.WriteUdp([]byte{},mu.OP_NOOP,&net.UDPAddr{
|
|
|
+ for _, v := range ipsArr {
|
|
|
+ udpclient.WriteUdp([]byte{}, mu.OP_NOOP, &net.UDPAddr{
|
|
|
IP: net.ParseIP(v),
|
|
|
Port: qu.IntAll(Sysconfig["broadcast_port"]),
|
|
|
})
|
|
|
}
|
|
|
}
|
|
|
- b := make(chan bool, 1)
|
|
|
- <-b
|
|
|
+ mux := http.NewServeMux()
|
|
|
+ mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
|
|
|
+ http.Redirect(writer,request,"/query",http.StatusFound)
|
|
|
+ })
|
|
|
+ res := http.FileServer(http.Dir("res"))
|
|
|
+ mux.Handle("/res/", http.StripPrefix("/res/", res))
|
|
|
+ mux.HandleFunc("/query", func(writer http.ResponseWriter, request *http.Request) {
|
|
|
+ tp, err := template.ParseFiles("web/index.html","public/header.html")
|
|
|
+ task := queryOcr_task()
|
|
|
+ if err != nil {
|
|
|
+ log.Println(err)
|
|
|
+ fmt.Fprint(writer, "页面找不到了~")
|
|
|
+ return
|
|
|
+ }
|
|
|
+ tp.Execute(writer, task)
|
|
|
+ })
|
|
|
+ log.Println("Http listening port: ",qu.ObjToString(Sysconfig["http_port"]))
|
|
|
+ if err := http.ListenAndServe(":"+qu.ObjToString(Sysconfig["http_port"]), mux); err != nil {
|
|
|
+ fmt.Println("start http server faild:", err)
|
|
|
+ }
|
|
|
}
|
|
|
+
|
|
|
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
defer qu.Catch()
|
|
|
switch act {
|
|
@@ -65,12 +87,12 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
tmp["start"] = tmp[qu.ObjToString(SidField)]
|
|
|
bytes, _ := json.Marshal(tmp)
|
|
|
b := mongodbutil.Mgo.Save("ocr_task", string(bytes))
|
|
|
- log.Println("保存id:",b)
|
|
|
+ log.Println("保存id:", b)
|
|
|
case mu.OP_NOOP: //其他节点回应消息打印
|
|
|
- log.Println("节点接收成功", string(data),ra.String())
|
|
|
+ log.Println("节点接收成功", string(data), ra.String())
|
|
|
case mu.OP_GET_DOWNLOADERCODE: //分发任务
|
|
|
- if `{"permission":"get_ocr_task"}` != string(data){
|
|
|
- log.Println("没有权限:",string(data),ra)
|
|
|
+ if `{"permission":"get_ocr_task"}` != string(data) {
|
|
|
+ log.Println("没有权限:", string(data), ra)
|
|
|
go udpclient.WriteUdp([]byte("没有权限"), mu.OP_NOOP, ra)
|
|
|
return
|
|
|
}
|
|
@@ -96,8 +118,8 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
}
|
|
|
newId := (*rdata)["_id"]
|
|
|
if newId.(bson.ObjectId).Hex() >= eid {
|
|
|
- go udpclient.WriteUdp([]byte(`{"id":"`+qu.ObjToString(tmp["start"])+`","permission":"ocr_task","is_start":"true"}`), mu.OP_TYPE_DATA, ra)//起始位置
|
|
|
- go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra)//分发任务
|
|
|
+ go udpclient.WriteUdp([]byte(`{"id":"`+qu.ObjToString(tmp["start"])+`","permission":"ocr_task","is_start":"true"}`), mu.OP_TYPE_DATA, ra) //起始位置
|
|
|
+ go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra) //分发任务
|
|
|
totmp := make(map[string]string)
|
|
|
totmp["sid"] = qu.ObjToString(tmp[qu.ObjToString("start")])
|
|
|
totmp["eid"] = qu.ObjToString(tmp[qu.ObjToString(EidField)])
|
|
@@ -107,15 +129,51 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
Port: qu.IntAll(Sysconfig["toudpport"]),
|
|
|
})
|
|
|
log.Println("ocr_task处理完成,发送下个节点", string(tobyte))
|
|
|
- mongodbutil.Mgo.Del("ocr_task", bson.M{"_id":ObjectId.(bson.ObjectId)})
|
|
|
+ mongodbutil.Mgo.Del("ocr_task", bson.M{"_id": ObjectId.(bson.ObjectId)})
|
|
|
sys.Unlock()
|
|
|
return
|
|
|
}
|
|
|
- go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra)//分发任务
|
|
|
+ go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra) //分发任务
|
|
|
//log.Println(newId.(bson.ObjectId).Hex())
|
|
|
tmp[SidField] = newId.(bson.ObjectId).Hex()
|
|
|
mongodbutil.Mgo.Update("ocr_task",
|
|
|
- bson.M{"_id":ObjectId.(bson.ObjectId)},tmp,false,false)
|
|
|
+ bson.M{"_id": ObjectId.(bson.ObjectId)}, tmp, false, false)
|
|
|
sys.Unlock()
|
|
|
}
|
|
|
}
|
|
|
+func queryOcr_task() map[string]int {
|
|
|
+ data := make(map[string]int)
|
|
|
+ taskArr, _ := mongodbutil.Mgo.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
|
|
|
+ taskNum := len(*taskArr)
|
|
|
+ if taskNum == 0 {
|
|
|
+ return data
|
|
|
+ }
|
|
|
+ data["taskNum"] = taskNum
|
|
|
+ sumNum := 0
|
|
|
+ nowSumNum :=0
|
|
|
+ for i, v := range *taskArr {
|
|
|
+ sid := bson.ObjectIdHex(qu.ObjToString(v["start"]))
|
|
|
+ eid := bson.ObjectIdHex(qu.ObjToString(v[qu.ObjToString(EidField)]))
|
|
|
+ sumNum += mongodbutil.Mgo.Count("bidding", bson.M{"_id": bson.M{
|
|
|
+ "$gte": sid,
|
|
|
+ "$lte": eid,
|
|
|
+ }})
|
|
|
+ if i ==0{
|
|
|
+ nowSumNum = sumNum
|
|
|
+ }
|
|
|
+ }
|
|
|
+ data["sumNum"] = sumNum
|
|
|
+ data["nowSumNum"] = nowSumNum
|
|
|
+ tmpsid := bson.ObjectIdHex(qu.ObjToString((*taskArr)[0]["start"]))
|
|
|
+ over := (*taskArr)[0][qu.ObjToString(SidField)]
|
|
|
+ overNum := mongodbutil.Mgo.Count("bidding", bson.M{"_id": bson.M{
|
|
|
+ "$gte": tmpsid,
|
|
|
+ "$lt": bson.ObjectIdHex(qu.ObjToString(over)),
|
|
|
+ }})
|
|
|
+ data["overNum"] = overNum
|
|
|
+ undoneNum := sumNum - overNum
|
|
|
+ data["undoneNum"] = undoneNum
|
|
|
+ nowUnDoneNum := nowSumNum - overNum
|
|
|
+ data["nowUnDoneNum"] = nowUnDoneNum
|
|
|
+ return data
|
|
|
+}
|