123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- package main
- import (
- "log"
- mu "mfw/util"
- "net"
- "os"
- "qfw/common/src/qfw/util"
- qu "qfw/util"
- "time"
- )
- var (
- sysconfig map[string]interface{} //配置文件
- mgo *MongodbSim //mongodb操作对象
- qy_mgo *MongodbSim
- udpclient mu.UdpClient //udp对象
- nextNode []map[string]interface{} //下节点数组
- siteMap map[string]map[string]interface{} //站点map
- coll_name,qy_coll_name string
- core_element,other_element []map[string]interface{} //要素
- deduct_element []string
- total_score,core_max,core_each,other_max,other_each ,deduct_each int
- specialaddr string
- )
- //站点配置
- func initSite() {
- mconf := sysconfig["mongodb"].(map[string]interface{})
- site := mconf["site"].(map[string]interface{})
- siteMap = make(map[string]map[string]interface{}, 0)
- start := int(time.Now().Unix())
- sess_site := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess_site)
- res_site := sess_site.DB(site["site_dbname"].(string)).C(site["site_coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
- for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
- data_map := map[string]interface{}{
- "area": util.ObjToString(site_dict["area"]),
- "city": util.ObjToString(site_dict["city"]),
- }
- siteMap[util.ObjToString(site_dict["site"])] = data_map
- }
- log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(siteMap))
- }
- //mgo-配置等
- func initMgo() {
- mconf := sysconfig["mongodb"].(map[string]interface{})
- log.Println(mconf)
- mgo = &MongodbSim{
- MongodbAddr: mconf["addrName"].(string),
- DbName: mconf["dbName"].(string),
- Size: qu.IntAllDef(mconf["pool"], 10),
- }
- mgo.InitPool()
- qy_mconf := sysconfig["qy_mongodb"].(map[string]interface{})
- qy_mgo = &MongodbSim{
- MongodbAddr: qy_mconf["qy_addrName"].(string),
- DbName: qy_mconf["qy_dbName"].(string),
- Size: qu.IntAllDef(qy_mconf["pool"], 10),
- UserName: qy_mconf["qy_username"].(string),
- Password: qy_mconf["qy_password"].(string),
- }
- qy_mgo.InitPool()
- coll_name = mconf["collName"].(string)
- qy_coll_name = qy_mconf["qy_collName"].(string)
- core_element = qu.ObjArrToMapArr(sysconfig["core_element"].([]interface{}))
- other_element = qu.ObjArrToMapArr(sysconfig["other_element"].([]interface{}))
- deduct_element =qu.ObjArrToStringArr(sysconfig["deduct_element"].([]interface{}))
- score_standard := sysconfig["score_standard"].(map[string]interface{})
- total_score = qu.IntAll(score_standard["total_score"])
- core_max = qu.IntAll(score_standard["core_max"])
- core_each = qu.IntAll(score_standard["core_each"])
- other_max = qu.IntAll(score_standard["other_max"])
- other_each = qu.IntAll(score_standard["other_each"])
- deduct_each = qu.IntAll(score_standard["deduct_each"])
- specialaddr = sysconfig["specialaddr"].(string)
- }
- //初始化
- func init() {
- qu.ReadConfig(&sysconfig)//加载配置文件
- initMgo()
- initSite()//加载站点
- }
- //正式流程
- func mainT() {
- go checkMapJob()
- updport := sysconfig["udpport"].(string)
- udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
- udpclient.Listen(processUdpMsg)
- log.Println("Udp服务监听", updport)
- time.Sleep(99999 * time.Hour)
- }
- //快速测试使用
- func main() {
- sid := "1f0000000000000000000000"
- eid := "9f0000000000000000000000"
- log.Println(sid, "---", eid)
- mapinfo := map[string]interface{}{}
- if sid == "" || eid == "" {
- log.Println("sid,eid参数不能为空")
- os.Exit(0)
- }
- mapinfo["gtid"] = sid
- mapinfo["lteid"] = eid
- startScoreTask([]byte{}, mapinfo)
- time.Sleep(99999 * time.Hour)
- }
- //udp监听
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case mu.OP_TYPE_DATA: //上个节点的数据
- case mu.OP_NOOP: //下个节点回应
- }
- }
- //打分流程-方法
- func startScoreTask(data []byte, mapInfo map[string]interface{}) {
- log.Println("开始评分流程")
- defer qu.Catch()
- //区间id
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- log.Println("查询条件:",q)
- sess := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess)
- it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
- updateExtract := [][]map[string]interface{}{}//更新需要
- index:=0
- for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
- element_score,core_score,other_score,element_reason:=dealWithElementRate(tmp)
- error_score,abnormal_score,error_reason,abnormal_reason:=dealWithErrorRate(tmp)
- if index%1000 == 0 {
- log.Println("当前数量:", index, tmp["_id"],"元素分:",element_score,"错误分:",error_score,"异常分:",abnormal_score)
- }
- updateExtract = append(updateExtract, []map[string]interface{}{
- map[string]interface{}{
- "_id": tmp["_id"],
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "element_score": element_score,
- "core_score": core_score,
- "other_score": other_score,
- "error_score":error_score,
- "abnormal_score": abnormal_score,
- "quality_reason":map[string]interface{}{
- "element_reason":element_reason,
- "error_reason":error_reason,
- "abnormal_reason":abnormal_reason,
- },
- },
- },
- })
- if len(updateExtract) >= 200 {
- mgo.UpSertBulk(coll_name, updateExtract...)
- updateExtract = [][]map[string]interface{}{}
- }
- tmp = make(map[string]interface{})
- }
- if len(updateExtract) >0 {
- mgo.UpSertBulk(coll_name, updateExtract...)
- }
- log.Println("task quality over - 总计数量",index)
- }
|