initdata.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package main
  2. import (
  3. log "github.com/donnie4w/go-logger/logger"
  4. qu "qfw/util"
  5. "time"
  6. )
  7. var (
  8. sysconfig map[string]interface{} //配置文件
  9. data_mgo, source_mgo *MongodbSim
  10. data_c_name, source_c_name string
  11. using_machine int
  12. lastNodeResponse int64
  13. taskList []map[string]interface{}
  14. )
  15. func initMgo() {
  16. sourceconf := sysconfig["source_mgodb"].(map[string]interface{})
  17. source_c_name = qu.ObjToString(sourceconf["coll"]) //数据源bidding
  18. source_mgo = &MongodbSim{
  19. MongodbAddr: sourceconf["addr"].(string),
  20. DbName: sourceconf["db"].(string),
  21. Size: 3,
  22. UserName: "zhengkun",
  23. Password: "zk@123123",
  24. }
  25. source_mgo.InitPool()
  26. dataconf := sysconfig["data_mgodb"].(map[string]interface{})
  27. data_c_name = qu.ObjToString(dataconf["coll"]) //机器源center
  28. data_mgo = &MongodbSim{
  29. MongodbAddr: dataconf["addr"].(string),
  30. DbName: dataconf["db"].(string),
  31. Size: 3,
  32. UserName: "zhengkun",
  33. Password: "zk@123123",
  34. }
  35. data_mgo.InitPool()
  36. taskList = []map[string]interface{}{}
  37. }
  38. func initVarData() {
  39. qu.ReadConfig(&sysconfig)
  40. initMgo()
  41. using_machine = qu.IntAll(sysconfig["using_machine"])
  42. nextNode = qu.ObjArrToMapArr(sysconfig["nextNode"].([]interface{}))
  43. lastNodeResponse = time.Now().Unix()
  44. isGetask = false
  45. }
  46. //加载抽取
  47. func initExtractNode() {
  48. resetExtNodeArr() //重置抽取节点数组
  49. sess := data_mgo.GetMgoConn()
  50. defer data_mgo.DestoryMongoConn(sess)
  51. q := map[string]interface{}{}
  52. it := sess.DB(data_mgo.DbName).C(data_c_name).Find(&q).Iter()
  53. for tmp := make(map[string]interface{}); it.Next(&tmp); {
  54. isuse := qu.IntAll(tmp["isuse"])
  55. if isuse == 0 {
  56. invalid_ext_node = append(invalid_ext_node, tmp)
  57. } else if isuse == 1 {
  58. using_ext_node = append(using_ext_node, tmp)
  59. } else if isuse == 2 {
  60. standby_ext_node = append(standby_ext_node, tmp)
  61. } else {
  62. }
  63. tmp = make(map[string]interface{})
  64. }
  65. //根据实际情况~把备用节点~与正常节点综合一下
  66. for { //可用数量-可变
  67. if len(using_ext_node) < using_machine {
  68. if len(standby_ext_node) == 0 {
  69. break
  70. }
  71. tmp_node := standby_ext_node[0]
  72. using_ext_node = append(using_ext_node, tmp_node)
  73. standby_ext_node = standby_ext_node[1:]
  74. } else {
  75. break
  76. }
  77. }
  78. if len(using_ext_node) <= 0 {
  79. sendErrMailApi("抽取控制中心~严重错误", "当前无可用机器")
  80. } else if len(using_ext_node) < using_machine { //不足预设-通知
  81. sendErrMailApi("抽取控制中心~警告", "当前可用机器不足预设~请检查")
  82. } else {
  83. }
  84. log.Debug("综合后节点~有效~备用~无效", len(using_ext_node), len(standby_ext_node), len(invalid_ext_node))
  85. }
  86. //重置抽取
  87. func resetExtNodeArr() {
  88. isAction = false
  89. using_ext_node = []map[string]interface{}{}
  90. standby_ext_node = []map[string]interface{}{}
  91. invalid_ext_node = []map[string]interface{}{}
  92. extractAction = map[string]map[string]interface{}{}
  93. heartAction = map[string]interface{}{}
  94. }