package main import ( "log" mu "mfw/util" "net" "os" "qfw/common/src/qfw/util" qu "qfw/util" "time" ) var ( sysconfig map[string]interface{} //配置文件 mgo *MongodbSim //mongodb操作对象 qy_mgo *MongodbSim udpclient mu.UdpClient //udp对象 nextNode []map[string]interface{} //下节点数组 siteMap map[string]map[string]interface{} //站点map coll_name,qy_coll_name string core_element,other_element []map[string]interface{} //要素 deduct_element []string total_score,core_max,core_each,other_max,other_each ,deduct_each int specialaddr string ) //站点配置 func initSite() { mconf := sysconfig["mongodb"].(map[string]interface{}) site := mconf["site"].(map[string]interface{}) siteMap = make(map[string]map[string]interface{}, 0) start := int(time.Now().Unix()) sess_site := mgo.GetMgoConn() defer mgo.DestoryMongoConn(sess_site) res_site := sess_site.DB(site["site_dbname"].(string)).C(site["site_coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter() for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); { data_map := map[string]interface{}{ "area": util.ObjToString(site_dict["area"]), "city": util.ObjToString(site_dict["city"]), } siteMap[util.ObjToString(site_dict["site"])] = data_map } log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(siteMap)) } //mgo-配置等 func initMgo() { mconf := sysconfig["mongodb"].(map[string]interface{}) log.Println(mconf) mgo = &MongodbSim{ MongodbAddr: mconf["addrName"].(string), DbName: mconf["dbName"].(string), Size: qu.IntAllDef(mconf["pool"], 10), } mgo.InitPool() qy_mconf := sysconfig["qy_mongodb"].(map[string]interface{}) qy_mgo = &MongodbSim{ MongodbAddr: qy_mconf["qy_addrName"].(string), DbName: qy_mconf["qy_dbName"].(string), Size: qu.IntAllDef(qy_mconf["pool"], 10), UserName: qy_mconf["qy_username"].(string), Password: qy_mconf["qy_password"].(string), } qy_mgo.InitPool() coll_name = mconf["collName"].(string) qy_coll_name = qy_mconf["qy_collName"].(string) core_element = qu.ObjArrToMapArr(sysconfig["core_element"].([]interface{})) other_element = qu.ObjArrToMapArr(sysconfig["other_element"].([]interface{})) deduct_element =qu.ObjArrToStringArr(sysconfig["deduct_element"].([]interface{})) score_standard := sysconfig["score_standard"].(map[string]interface{}) total_score = qu.IntAll(score_standard["total_score"]) core_max = qu.IntAll(score_standard["core_max"]) core_each = qu.IntAll(score_standard["core_each"]) other_max = qu.IntAll(score_standard["other_max"]) other_each = qu.IntAll(score_standard["other_each"]) deduct_each = qu.IntAll(score_standard["deduct_each"]) specialaddr = sysconfig["specialaddr"].(string) } //初始化 func init() { qu.ReadConfig(&sysconfig)//加载配置文件 initMgo() initSite()//加载站点 } //正式流程 func mainT() { go checkMapJob() updport := sysconfig["udpport"].(string) udpclient = mu.UdpClient{Local: updport, BufSize: 1024} udpclient.Listen(processUdpMsg) log.Println("Udp服务监听", updport) time.Sleep(99999 * time.Hour) } //快速测试使用 func main() { sid := "1f0000000000000000000000" eid := "9f0000000000000000000000" log.Println(sid, "---", eid) mapinfo := map[string]interface{}{} if sid == "" || eid == "" { log.Println("sid,eid参数不能为空") os.Exit(0) } mapinfo["gtid"] = sid mapinfo["lteid"] = eid startScoreTask([]byte{}, mapinfo) time.Sleep(99999 * time.Hour) } //udp监听 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case mu.OP_TYPE_DATA: //上个节点的数据 case mu.OP_NOOP: //下个节点回应 } } //打分流程-方法 func startScoreTask(data []byte, mapInfo map[string]interface{}) { log.Println("开始评分流程") defer qu.Catch() //区间id q := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": StringTOBsonId(mapInfo["gtid"].(string)), "$lte": StringTOBsonId(mapInfo["lteid"].(string)), }, } log.Println("查询条件:",q) sess := mgo.GetMgoConn() defer mgo.DestoryMongoConn(sess) it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter() updateExtract := [][]map[string]interface{}{}//更新需要 index:=0 for tmp := make(map[string]interface{}); it.Next(&tmp); index++ { element_score,core_score,other_score,element_reason:=dealWithElementRate(tmp) error_score,abnormal_score,error_reason,abnormal_reason:=dealWithErrorRate(tmp) if index%1000 == 0 { log.Println("当前数量:", index, tmp["_id"],"元素分:",element_score,"错误分:",error_score,"异常分:",abnormal_score) } updateExtract = append(updateExtract, []map[string]interface{}{ map[string]interface{}{ "_id": tmp["_id"], }, map[string]interface{}{ "$set": map[string]interface{}{ "element_score": element_score, "core_score": core_score, "other_score": other_score, "error_score":error_score, "abnormal_score": abnormal_score, "quality_reason":map[string]interface{}{ "element_reason":element_reason, "error_reason":error_reason, "abnormal_reason":abnormal_reason, }, }, }, }) if len(updateExtract) >= 200 { mgo.UpSertBulk(coll_name, updateExtract...) updateExtract = [][]map[string]interface{}{} } tmp = make(map[string]interface{}) } if len(updateExtract) >0 { mgo.UpSertBulk(coll_name, updateExtract...) } log.Println("task quality over - 总计数量",index) }