123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- package main
- import (
- log "github.com/donnie4w/go-logger/logger"
- qu "qfw/util"
- "time"
- )
- var (
- sysconfig map[string]interface{} //配置文件
- data_mgo, source_mgo *MongodbSim
- data_c_name, source_c_name string
- using_machine int
- lastNodeResponse int64
- taskList []map[string]interface{}
- )
- func initMgo() {
- sourceconf := sysconfig["source_mgodb"].(map[string]interface{})
- source_c_name = qu.ObjToString(sourceconf["coll"]) //数据源bidding
- source_mgo = &MongodbSim{
- MongodbAddr: sourceconf["addr"].(string),
- DbName: sourceconf["db"].(string),
- Size: 3,
- UserName: "zhengkun",
- Password: "zk@123123",
- }
- source_mgo.InitPool()
- dataconf := sysconfig["data_mgodb"].(map[string]interface{})
- data_c_name = qu.ObjToString(dataconf["coll"]) //机器源center
- data_mgo = &MongodbSim{
- MongodbAddr: dataconf["addr"].(string),
- DbName: dataconf["db"].(string),
- Size: 3,
- UserName: "zhengkun",
- Password: "zk@123123",
- }
- data_mgo.InitPool()
- taskList = []map[string]interface{}{}
- }
- func initVarData() {
- qu.ReadConfig(&sysconfig)
- initMgo()
- using_machine = qu.IntAll(sysconfig["using_machine"])
- nextNode = qu.ObjArrToMapArr(sysconfig["nextNode"].([]interface{}))
- lastNodeResponse = time.Now().Unix()
- isGetask = false
- }
- //加载抽取
- func initExtractNode() {
- resetExtNodeArr() //重置抽取节点数组
- sess := data_mgo.GetMgoConn()
- defer data_mgo.DestoryMongoConn(sess)
- q := map[string]interface{}{}
- it := sess.DB(data_mgo.DbName).C(data_c_name).Find(&q).Iter()
- for tmp := make(map[string]interface{}); it.Next(&tmp); {
- isuse := qu.IntAll(tmp["isuse"])
- if isuse == 0 {
- invalid_ext_node = append(invalid_ext_node, tmp)
- } else if isuse == 1 {
- using_ext_node = append(using_ext_node, tmp)
- } else if isuse == 2 {
- standby_ext_node = append(standby_ext_node, tmp)
- } else {
- }
- tmp = make(map[string]interface{})
- }
- //根据实际情况~把备用节点~与正常节点综合一下
- for { //可用数量-可变
- if len(using_ext_node) < using_machine {
- if len(standby_ext_node) == 0 {
- break
- }
- tmp_node := standby_ext_node[0]
- using_ext_node = append(using_ext_node, tmp_node)
- standby_ext_node = standby_ext_node[1:]
- } else {
- break
- }
- }
- if len(using_ext_node) <= 0 {
- sendErrMailApi("抽取控制中心~严重错误", "当前无可用机器")
- } else if len(using_ext_node) < using_machine { //不足预设-通知
- sendErrMailApi("抽取控制中心~警告", "当前可用机器不足预设~请检查")
- } else {
- }
- log.Debug("综合后节点~有效~备用~无效", len(using_ext_node), len(standby_ext_node), len(invalid_ext_node))
- }
- //重置抽取
- func resetExtNodeArr() {
- isAction = false
- using_ext_node = []map[string]interface{}{}
- standby_ext_node = []map[string]interface{}{}
- invalid_ext_node = []map[string]interface{}{}
- extractAction = map[string]map[string]interface{}{}
- heartAction = map[string]interface{}{}
- }
|