|
@@ -0,0 +1,182 @@
|
|
|
+package main
|
|
|
+
|
|
|
+import (
|
|
|
+ "log"
|
|
|
+ mu "mfw/util"
|
|
|
+ "net"
|
|
|
+ "os"
|
|
|
+ "qfw/common/src/qfw/util"
|
|
|
+ qu "qfw/util"
|
|
|
+ "time"
|
|
|
+)
|
|
|
+
|
|
|
+
|
|
|
+var (
|
|
|
+ sysconfig map[string]interface{} //配置文件
|
|
|
+ mgo *MongodbSim //mongodb操作对象
|
|
|
+ qy_mgo *MongodbSim
|
|
|
+ udpclient mu.UdpClient //udp对象
|
|
|
+ nextNode []map[string]interface{} //下节点数组
|
|
|
+ siteMap map[string]map[string]interface{} //站点map
|
|
|
+ coll_name,qy_coll_name string
|
|
|
+ core_element,other_element []map[string]interface{} //要素
|
|
|
+ deduct_element []string
|
|
|
+ total_score,core_max,core_each,other_max,other_each ,deduct_each int
|
|
|
+ specialaddr string
|
|
|
+)
|
|
|
+
|
|
|
+//站点配置
|
|
|
+func initSite() {
|
|
|
+ mconf := sysconfig["mongodb"].(map[string]interface{})
|
|
|
+ site := mconf["site"].(map[string]interface{})
|
|
|
+ siteMap = make(map[string]map[string]interface{}, 0)
|
|
|
+ start := int(time.Now().Unix())
|
|
|
+ sess_site := mgo.GetMgoConn()
|
|
|
+ defer mgo.DestoryMongoConn(sess_site)
|
|
|
+ res_site := sess_site.DB(site["site_dbname"].(string)).C(site["site_coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
|
|
|
+ for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
|
|
|
+ data_map := map[string]interface{}{
|
|
|
+ "area": util.ObjToString(site_dict["area"]),
|
|
|
+ "city": util.ObjToString(site_dict["city"]),
|
|
|
+ }
|
|
|
+ siteMap[util.ObjToString(site_dict["site"])] = data_map
|
|
|
+ }
|
|
|
+ log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(siteMap))
|
|
|
+}
|
|
|
+
|
|
|
+//mgo-配置等
|
|
|
+func initMgo() {
|
|
|
+ mconf := sysconfig["mongodb"].(map[string]interface{})
|
|
|
+ log.Println(mconf)
|
|
|
+ mgo = &MongodbSim{
|
|
|
+ MongodbAddr: mconf["addrName"].(string),
|
|
|
+ DbName: mconf["dbName"].(string),
|
|
|
+ Size: qu.IntAllDef(mconf["pool"], 10),
|
|
|
+ }
|
|
|
+ mgo.InitPool()
|
|
|
+
|
|
|
+ qy_mconf := sysconfig["qy_mongodb"].(map[string]interface{})
|
|
|
+ qy_mgo = &MongodbSim{
|
|
|
+ MongodbAddr: qy_mconf["qy_addrName"].(string),
|
|
|
+ DbName: qy_mconf["qy_dbName"].(string),
|
|
|
+ Size: qu.IntAllDef(qy_mconf["pool"], 10),
|
|
|
+ UserName: qy_mconf["qy_username"].(string),
|
|
|
+ Password: qy_mconf["qy_password"].(string),
|
|
|
+ }
|
|
|
+ qy_mgo.InitPool()
|
|
|
+
|
|
|
+ coll_name = mconf["collName"].(string)
|
|
|
+ qy_coll_name = qy_mconf["qy_collName"].(string)
|
|
|
+
|
|
|
+ core_element = qu.ObjArrToMapArr(sysconfig["core_element"].([]interface{}))
|
|
|
+ other_element = qu.ObjArrToMapArr(sysconfig["other_element"].([]interface{}))
|
|
|
+ deduct_element =qu.ObjArrToStringArr(sysconfig["deduct_element"].([]interface{}))
|
|
|
+
|
|
|
+ score_standard := sysconfig["score_standard"].(map[string]interface{})
|
|
|
+ total_score = qu.IntAll(score_standard["total_score"])
|
|
|
+ core_max = qu.IntAll(score_standard["core_max"])
|
|
|
+ core_each = qu.IntAll(score_standard["core_each"])
|
|
|
+ other_max = qu.IntAll(score_standard["other_max"])
|
|
|
+ other_each = qu.IntAll(score_standard["other_each"])
|
|
|
+ deduct_each = qu.IntAll(score_standard["deduct_each"])
|
|
|
+
|
|
|
+ specialaddr = sysconfig["specialaddr"].(string)
|
|
|
+}
|
|
|
+//初始化
|
|
|
+func init() {
|
|
|
+ qu.ReadConfig(&sysconfig)//加载配置文件
|
|
|
+ initMgo()
|
|
|
+ initSite()//加载站点
|
|
|
+}
|
|
|
+//正式流程
|
|
|
+func mainT() {
|
|
|
+ go checkMapJob()
|
|
|
+ updport := sysconfig["udpport"].(string)
|
|
|
+ udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
|
|
|
+ udpclient.Listen(processUdpMsg)
|
|
|
+ log.Println("Udp服务监听", updport)
|
|
|
+ time.Sleep(99999 * time.Hour)
|
|
|
+}
|
|
|
+//快速测试使用
|
|
|
+func main() {
|
|
|
+ sid := "1f0000000000000000000000"
|
|
|
+ eid := "9f0000000000000000000000"
|
|
|
+ log.Println(sid, "---", eid)
|
|
|
+ mapinfo := map[string]interface{}{}
|
|
|
+ if sid == "" || eid == "" {
|
|
|
+ log.Println("sid,eid参数不能为空")
|
|
|
+ os.Exit(0)
|
|
|
+ }
|
|
|
+ mapinfo["gtid"] = sid
|
|
|
+ mapinfo["lteid"] = eid
|
|
|
+ startScoreTask([]byte{}, mapinfo)
|
|
|
+ time.Sleep(99999 * time.Hour)
|
|
|
+}
|
|
|
+
|
|
|
+//udp监听
|
|
|
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
+ switch act {
|
|
|
+ case mu.OP_TYPE_DATA: //上个节点的数据
|
|
|
+
|
|
|
+ case mu.OP_NOOP: //下个节点回应
|
|
|
+
|
|
|
+ }
|
|
|
+}
|
|
|
+//打分流程-方法
|
|
|
+func startScoreTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
+ log.Println("开始评分流程")
|
|
|
+ defer qu.Catch()
|
|
|
+ //区间id
|
|
|
+ q := map[string]interface{}{
|
|
|
+ "_id": map[string]interface{}{
|
|
|
+ "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
|
|
|
+ "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
|
|
|
+ },
|
|
|
+ }
|
|
|
+ log.Println("查询条件:",q)
|
|
|
+ sess := mgo.GetMgoConn()
|
|
|
+ defer mgo.DestoryMongoConn(sess)
|
|
|
+ it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
|
|
|
+ updateExtract := [][]map[string]interface{}{}//更新需要
|
|
|
+ index:=0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
|
|
|
+ element_score,core_score,other_score,element_reason:=dealWithElementRate(tmp)
|
|
|
+ error_score,abnormal_score,error_reason,abnormal_reason:=dealWithErrorRate(tmp)
|
|
|
+
|
|
|
+ if index%1000 == 0 {
|
|
|
+ log.Println("当前数量:", index, tmp["_id"],"元素分:",element_score,"错误分:",error_score,"异常分:",abnormal_score)
|
|
|
+ }
|
|
|
+ updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
+ map[string]interface{}{
|
|
|
+ "_id": tmp["_id"],
|
|
|
+ },
|
|
|
+ map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "element_score": element_score,
|
|
|
+ "core_score": core_score,
|
|
|
+ "other_score": other_score,
|
|
|
+ "error_score":error_score,
|
|
|
+ "abnormal_score": abnormal_score,
|
|
|
+ "quality_reason":map[string]interface{}{
|
|
|
+ "element_reason":element_reason,
|
|
|
+ "error_reason":error_reason,
|
|
|
+ "abnormal_reason":abnormal_reason,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ },
|
|
|
+ })
|
|
|
+ if len(updateExtract) >= 200 {
|
|
|
+ mgo.UpSertBulk(coll_name, updateExtract...)
|
|
|
+ updateExtract = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(updateExtract) >0 {
|
|
|
+ mgo.UpSertBulk(coll_name, updateExtract...)
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Println("task quality over - 总计数量",index)
|
|
|
+
|
|
|
+}
|