|
@@ -1,18 +1,202 @@
|
|
|
package main
|
|
|
|
|
|
import (
|
|
|
-
|
|
|
+ "encoding/json"
|
|
|
+ "log"
|
|
|
+ "net"
|
|
|
+ "os"
|
|
|
+ "qfw/common/src/qfw/util"
|
|
|
+ qu "qfw/util"
|
|
|
"time"
|
|
|
+ mu "mfw/util"
|
|
|
+)
|
|
|
+
|
|
|
+
|
|
|
+var (
|
|
|
+ Sysconfig map[string]interface{} //配置文件
|
|
|
+ mgo *MongodbSim //mongodb操作对象
|
|
|
+ qy_mgo *MongodbSim
|
|
|
+ udpclient mu.UdpClient //udp对象
|
|
|
+ nextNode []map[string]interface{} //下节点数组
|
|
|
+
|
|
|
+ coll_name,qy_coll_name string
|
|
|
+ core_element,other_element []string //要素
|
|
|
+ total_score,core_max,core_each,other_max,other_each int
|
|
|
)
|
|
|
|
|
|
func init() {
|
|
|
+ //加载配置文件
|
|
|
+ qu.ReadConfig(&Sysconfig)
|
|
|
+
|
|
|
+ mconf := Sysconfig["mongodb"].(map[string]interface{})
|
|
|
+ mgo = &MongodbSim{
|
|
|
+ MongodbAddr: mconf["addrName"].(string),
|
|
|
+ DbName: mconf["dbName"].(string),
|
|
|
+ Size: qu.IntAllDef(mconf["pool"], 10),
|
|
|
+ }
|
|
|
+ mgo.InitPool()
|
|
|
+
|
|
|
+
|
|
|
+ qy_mconf := Sysconfig["qy_mongodb"].(map[string]interface{})
|
|
|
+ qy_mgo = &MongodbSim{
|
|
|
+ MongodbAddr: qy_mconf["qy_addrName"].(string),
|
|
|
+ DbName: qy_mconf["qy_dbName"].(string),
|
|
|
+ Size: qu.IntAllDef(qy_mconf["pool"], 10),
|
|
|
+ }
|
|
|
+ qy_mgo.InitPool()
|
|
|
|
|
|
+
|
|
|
+ coll_name = mconf["collName"].(string)
|
|
|
+ qy_coll_name = qy_mconf["qy_collName"].(string)
|
|
|
+
|
|
|
+ core_element = qu.ObjArrToStringArr(Sysconfig["core_element"].([]interface{}))
|
|
|
+ other_element = qu.ObjArrToStringArr(Sysconfig["other_element"].([]interface{}))
|
|
|
+ score_standard := Sysconfig["score_standard"].(map[string]interface{})
|
|
|
+ total_score = qu.IntAll(score_standard["total_score"])
|
|
|
+ core_max = qu.IntAll(score_standard["core_max"])
|
|
|
+ core_each = qu.IntAll(score_standard["core_each"])
|
|
|
+ other_max = qu.IntAll(score_standard["other_max"])
|
|
|
+ other_each = qu.IntAll(score_standard["other_each"])
|
|
|
+
|
|
|
+ log.Println("采用udp模式")
|
|
|
}
|
|
|
|
|
|
|
|
|
+func mainT() {
|
|
|
+ go checkMapJob()
|
|
|
+ updport := Sysconfig["udpport"].(string)
|
|
|
+ udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
|
|
|
+ udpclient.Listen(processUdpMsg)
|
|
|
+ log.Println("Udp服务监听", updport)
|
|
|
+ time.Sleep(99999 * time.Hour)
|
|
|
+}
|
|
|
+
|
|
|
+//快速测试使用
|
|
|
func main() {
|
|
|
|
|
|
+ sid := "4f16936d52c1d9fbf843c60e"
|
|
|
+ eid := "6f16936d52c1d9fbf843c60e"
|
|
|
+ log.Println(sid, "---", eid)
|
|
|
+ mapinfo := map[string]interface{}{}
|
|
|
+ if sid == "" || eid == "" {
|
|
|
+ log.Println("sid,eid参数不能为空")
|
|
|
+ os.Exit(0)
|
|
|
+ }
|
|
|
+ mapinfo["gtid"] = sid
|
|
|
+ mapinfo["lteid"] = eid
|
|
|
+ mapinfo["stop"] = "true"
|
|
|
+ startTask([]byte{}, mapinfo)
|
|
|
+ time.Sleep(99999 * time.Hour)
|
|
|
+
|
|
|
+}
|
|
|
|
|
|
|
|
|
- time.Sleep(99999 * time.Hour)
|
|
|
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
+ switch act {
|
|
|
+ case mu.OP_TYPE_DATA: //上个节点的数据
|
|
|
+ //从表中开始处理
|
|
|
+ var mapInfo map[string]interface{}
|
|
|
+ err := json.Unmarshal(data, &mapInfo)
|
|
|
+ log.Println("err:", err, "mapInfo:", mapInfo)
|
|
|
+ if err != nil {
|
|
|
+ udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
|
|
|
+ } else if mapInfo != nil {
|
|
|
+ taskType := qu.ObjToString(mapInfo["stype"])
|
|
|
+ if taskType == "pingfen" {
|
|
|
+ go startTask(data, mapInfo)
|
|
|
+ } else {
|
|
|
+ log.Println("类别异常... ...")
|
|
|
+ }
|
|
|
+ key, _ := mapInfo["key"].(string)
|
|
|
+ if key == "" {
|
|
|
+ key = "udpok"
|
|
|
+ }
|
|
|
+ udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
|
|
|
+ }
|
|
|
+ case mu.OP_NOOP: //下个节点回应
|
|
|
+ ok := string(data)
|
|
|
+ if ok != "" {
|
|
|
+ log.Println("ok:", ok)
|
|
|
+ udptaskmap.Delete(ok)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func startTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
+
|
|
|
+ //遍历数据
|
|
|
+ log.Println("开始评分流程")
|
|
|
+ defer qu.Catch()
|
|
|
+ //区间id
|
|
|
+ q := map[string]interface{}{
|
|
|
+ "_id": map[string]interface{}{
|
|
|
+ "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
|
|
|
+ "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
|
|
|
+ },
|
|
|
+ }
|
|
|
+ sess := mgo.GetMgoConn()
|
|
|
+ defer mgo.DestoryMongoConn(sess)
|
|
|
+ it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
|
|
|
+ updateExtract := [][]map[string]interface{}{}//更新需要
|
|
|
+ index:=0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
|
|
|
+ if index%10000 == 0 {
|
|
|
+ log.Println("current:", index, tmp["_id"])
|
|
|
+ }
|
|
|
+
|
|
|
+ element_score:=dealWithElementRate(tmp)
|
|
|
+ error_score,abnormal_score:=dealWithErrorRate(tmp)
|
|
|
+ log.Println("元素分:",element_score,"错误分:",error_score,"异常分:",abnormal_score)
|
|
|
+
|
|
|
+ updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
+ map[string]interface{}{
|
|
|
+ "_id": tmp["_id"],
|
|
|
+ },
|
|
|
+ map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "element_score": element_score,
|
|
|
+ "error_score":error_score,
|
|
|
+ "abnormal_score": abnormal_score,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ })
|
|
|
+ if len(updateExtract) >= 200 {
|
|
|
+ mgo.UpSertBulk(coll_name, updateExtract...)
|
|
|
+ updateExtract = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ if len(updateExtract) >0 {
|
|
|
+ mgo.UpSertBulk(coll_name, updateExtract...)
|
|
|
+ }
|
|
|
+
|
|
|
+ time.Sleep(60 * time.Second)
|
|
|
+
|
|
|
+ //任务完成,开始发送广播通知下面节点
|
|
|
+ if mapInfo["stop"] == nil {
|
|
|
+ log.Println("评分统计完成-发送udp")
|
|
|
+ for _, to := range nextNode {
|
|
|
+ sid, _ := mapInfo["gtid"].(string)
|
|
|
+ eid, _ := mapInfo["lteid"].(string)
|
|
|
+ key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
|
|
|
+ by, _ := json.Marshal(map[string]interface{}{
|
|
|
+ "gtid": sid,
|
|
|
+ "lteid": eid,
|
|
|
+ "stype": util.ObjToString(to["stype"]),
|
|
|
+ "key": key,
|
|
|
+ })
|
|
|
+ addr := &net.UDPAddr{
|
|
|
+ IP: net.ParseIP(to["addr"].(string)),
|
|
|
+ Port: util.IntAll(to["port"]),
|
|
|
+ }
|
|
|
+ node := &udpNode{by, addr, time.Now().Unix(), 0}
|
|
|
+ udptaskmap.Store(key, node)
|
|
|
+ udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
}
|