main.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/cron"
  5. "log"
  6. "net/http"
  7. qu "qfw/util"
  8. "strings"
  9. "sync"
  10. "time"
  11. )
  12. var (
  13. sysconfig map[string]interface{} //配置文件
  14. port string
  15. save_mgo *MongodbSim
  16. idsArr,extArr []string
  17. vpsTmp map[string]map[string]interface{}
  18. extTmp map[string]interface{}
  19. during,isErr int64
  20. test map[string]interface{}
  21. updatelock sync.Mutex
  22. save_coll_name string
  23. )
  24. func init() {
  25. //加载配置文件
  26. log.Println("加载...")
  27. qu.ReadConfig(&sysconfig)
  28. port = sysconfig["port"].(string)
  29. arr1 := sysconfig["vpsIDs"].([]interface{})
  30. idsArr = qu.ObjArrToStringArr(arr1)
  31. arr2 := sysconfig["extIDs"].([]interface{})
  32. extArr = qu.ObjArrToStringArr(arr2)
  33. resetRecordData()
  34. during = qu.Int64All(sysconfig["during"])
  35. isErr = qu.Int64All(sysconfig["isErr"])
  36. saveconf := sysconfig["save_mgodb"].(map[string]interface{})
  37. save_coll_name = qu.ObjToString(saveconf["coll"])
  38. save_mgo = &MongodbSim{
  39. MongodbAddr: saveconf["addr"].(string),
  40. DbName: saveconf["db"].(string),
  41. Size: qu.IntAllDef(saveconf["pool"], 5),
  42. }
  43. save_mgo.InitPool()
  44. log.Println("准备完毕...")
  45. }
  46. func main() {
  47. //http://monitor.spdata.jianyu360.com/,程序端口7811
  48. addr := ":"+port
  49. http.HandleFunc("/", handler)
  50. go http.ListenAndServe(addr, nil)
  51. //每隔1分钟执行一次:0 */1 * * * ? 每隔5秒执行一次:*/5 * * * * ?
  52. spec :=fmt.Sprintf("30 */%d * * * ?",during)
  53. spec_reset := "0 0 0 * * ?"
  54. spec_extract :=fmt.Sprintf("20 */%d * * * ?",during)
  55. c := cron.New()
  56. c.AddFunc(spec, func() { taskFinishing()})
  57. c.AddFunc(spec_reset, func() { resetRecordData()})
  58. c.AddFunc(spec_extract, func() { extractRunning()})
  59. c.Start()
  60. time.Sleep(99999 * time.Hour)
  61. }
  62. func handler(w http.ResponseWriter, r *http.Request) {
  63. updatelock.Lock()
  64. r.ParseForm() //解析参数,默认是不会解析的
  65. if r.Method == "GET" {
  66. vpsid ,process,isProMail:= "",int64(0),int64(0)
  67. extid :=""
  68. for k, v := range r.Form {
  69. if k=="id" {
  70. vpsid = strings.Join(v, "")
  71. isProMail = qu.Int64All(vpsTmp[vpsid]["isProMail"])
  72. }else if k=="process" {
  73. process = qu.Int64All(strings.Join(v, ""))
  74. if process==0 {
  75. isProMail = 0
  76. }
  77. }else if k=="extract" {
  78. extid = strings.Join(v, "")
  79. }else {
  80. }
  81. }
  82. if vpsid!="" {
  83. vpsTmp[vpsid] = map[string]interface{}{
  84. "isHeart":1,
  85. "isErrNum":0,
  86. "isVpsMail":0, //收到心跳-vps邮件重置为0,可以发
  87. "isProcess":process,
  88. "isProMail":isProMail,
  89. }
  90. }
  91. if extid!="" {
  92. extTmp[extid] = time.Now().Unix()
  93. }
  94. } else if r.Method == "POST" {
  95. } else {
  96. }
  97. updatelock.Unlock()
  98. }
  99. //重置数据
  100. func resetRecordData() {
  101. updatelock.Lock()
  102. vpsTmp = make(map[string]map[string]interface{},0)
  103. for _,v := range idsArr{
  104. id := qu.ObjToString(v)
  105. vpsTmp[id] = map[string]interface{}{
  106. "isHeart":0,
  107. "isErrNum":0,
  108. "isProcess" : 0,
  109. "isVpsMail":0,
  110. "isProMail":0,
  111. }
  112. }
  113. log.Println("重置数据vps...",len(vpsTmp))
  114. extTmp = make(map[string]interface{},0)
  115. for _,v := range extArr{
  116. extid := qu.ObjToString(v)
  117. extTmp[extid] = time.Now().Unix()
  118. }
  119. log.Println("重置ext数据...")
  120. updatelock.Unlock()
  121. }
  122. //不断监听处理
  123. func taskFinishing() {
  124. //加锁
  125. updatelock.Lock()
  126. log.Println("...处理一次...")
  127. isVpsMailContent,isProMailContent:= "",""
  128. for _ , vpsid := range idsArr {
  129. //此标识-是否正常
  130. //log.Println("原:",dataTmp[vpsid])
  131. isHeart,isProcess:= qu.Int64All(vpsTmp[vpsid]["isHeart"]),qu.Int64All(vpsTmp[vpsid]["isProcess"])
  132. isErrNum := int64(0)
  133. isVpsMail,isProMail := qu.Int64All(vpsTmp[vpsid]["isVpsMail"]),qu.Int64All(vpsTmp[vpsid]["isProMail"])
  134. if isVpsMail == 1 { //送过邮件了
  135. //log.Println("发过vps邮件","心跳:",isHeart,"次数:",isErrNum,"下载器:",isProcess,"vps邮件:",isVpsMail,"pro邮件:",isProMail)
  136. continue
  137. }
  138. if isHeart == 0 { //未接收心跳反应,错误+1
  139. isErrNum = qu.Int64All(vpsTmp[vpsid]["isErrNum"])
  140. isErrNum ++
  141. }
  142. if isErrNum > isErr { //错误超过一定次数,发邮件vps异常
  143. isErrNum = 0
  144. if isVpsMailContent == ""{
  145. isVpsMailContent = vpsid
  146. }else {
  147. isVpsMailContent = isVpsMailContent+","+vpsid
  148. }
  149. isVpsMail = 1
  150. }
  151. if isProcess == 1 && isProMail==0 {//下载器异常-未发送过下载器异常情况
  152. if isProMailContent == ""{
  153. isProMailContent = vpsid
  154. }else {
  155. isProMailContent = isProMailContent+","+vpsid
  156. }
  157. isProMail = 1
  158. isProcess = 0
  159. }
  160. //log.Println("处理后:","心跳:",0,"次数:",isErrNum,"下载器:",isProcess,"vps邮件:",isVpsMail,"pro邮件:",isProMail)
  161. vpsTmp[vpsid] = map[string]interface{}{
  162. "isHeart":0,
  163. "isErrNum":isErrNum,
  164. "isProcess":isProcess,
  165. "isVpsMail":isVpsMail,
  166. "isProMail" : isProMail,
  167. }
  168. }
  169. //log.Println("处理后",isProMailContent)
  170. if isVpsMailContent!=""{
  171. log.Println("发邮件:vps异常...",isVpsMailContent)
  172. comeintime :=qu.Int64All(time.Now().Unix())//日志记录
  173. save_mgo.Save(save_coll_name, map[string]interface{}{
  174. "name":"vps",
  175. "comeintime":comeintime,
  176. "date":qu.FormatDateByInt64(&comeintime, qu.DATEFORMAT),
  177. "detail" : isVpsMailContent,
  178. })
  179. sendErrVpsMailApi("vps",isVpsMailContent)
  180. }else {
  181. if isProMailContent !="" {
  182. log.Println("发邮件:下载器异常...",isProMailContent)
  183. comeintime :=qu.Int64All(time.Now().Unix())//日志记录
  184. save_mgo.Save(save_coll_name, map[string]interface{}{
  185. "name":"下载器",
  186. "comeintime":comeintime,
  187. "date":qu.FormatDateByInt64(&comeintime, qu.DATEFORMAT),
  188. "detail" : isProMailContent,
  189. })
  190. sendErrVpsMailApi("下载器异常",isProMailContent)
  191. }
  192. }
  193. updatelock.Unlock()
  194. }
  195. //抽取活跃监测
  196. func extractRunning() {
  197. log.Println("...抽取活跃处理一次...")
  198. isSend := false
  199. for _,v := range extTmp{
  200. t := qu.Int64All(v)
  201. now_time := time.Now().Unix()
  202. if now_time-t > int64(1800) {
  203. isSend = true//有异常发邮件
  204. break
  205. }
  206. }
  207. if isSend {
  208. updatelock.Lock()
  209. extTmp = make(map[string]interface{},0)
  210. for _,v := range extArr{
  211. extid := qu.ObjToString(v)
  212. extTmp[extid] = time.Now().Unix()
  213. }
  214. //发送邮件
  215. sendErrExtMailApi("抽取程序异常","抽取程序超半小时未响应...清检查")
  216. updatelock.Unlock()
  217. }
  218. }