msclient.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. // msclient
  2. package spider
  3. import (
  4. "math/rand"
  5. mu "mfw/util"
  6. "time"
  7. )
  8. //
  9. type DynamicIPMap struct {
  10. Code string
  11. InvalidTime int64
  12. }
  13. var Msclient *mu.Client
  14. var MsclientFile *mu.Client
  15. var MsclientBid *mu.Client
  16. var MsclientTest *mu.Client
  17. var Alldownloader map[string]DynamicIPMap = make(map[string]DynamicIPMap)
  18. var AlldownloaderBid map[string]DynamicIPMap = make(map[string]DynamicIPMap)
  19. var AlldownloaderFile map[string]DynamicIPMap = make(map[string]DynamicIPMap)
  20. var AlldownloaderTest map[string]DynamicIPMap = make(map[string]DynamicIPMap)
  21. //
  22. func processevent(p *mu.Packet) {
  23. defer mu.Catch()
  24. var data []byte
  25. switch p.Event {
  26. case mu.SERVICE_DOWNLOAD_APPEND_NODE:
  27. data = p.GetBusinessData()
  28. //log.Println("获取动态地址:", len(data), string(data))
  29. for i := 0; i < len(data)/8; i++ {
  30. code := string(data[i*8 : (i+1)*8])
  31. Alldownloader[code] = DynamicIPMap{
  32. Code: code,
  33. InvalidTime: time.Now().Unix() + 60*10,
  34. }
  35. }
  36. case mu.SERVICE_DOWNLOAD_DELETE_NODE:
  37. data = p.GetBusinessData()
  38. //log.Println("删除动态地址:", len(data), string(data))
  39. for i := 0; i < len(data)/8; i++ {
  40. code := string(data[i*8 : (i+1)*8])
  41. delete(Alldownloader, code)
  42. }
  43. }
  44. }
  45. func processeventbid(p *mu.Packet) {
  46. defer mu.Catch()
  47. var data []byte
  48. switch p.Event {
  49. case mu.SERVICE_DOWNLOAD_APPEND_NODE:
  50. data = p.GetBusinessData()
  51. //log.Println("获取动态地址:", len(data), string(data))
  52. for i := 0; i < len(data)/8; i++ {
  53. code := string(data[i*8 : (i+1)*8])
  54. AlldownloaderBid[code] = DynamicIPMap{
  55. Code: code,
  56. InvalidTime: time.Now().Unix() + 60*10,
  57. }
  58. }
  59. case mu.SERVICE_DOWNLOAD_DELETE_NODE:
  60. data = p.GetBusinessData()
  61. //log.Println("删除动态地址:", len(data), string(data))
  62. for i := 0; i < len(data)/8; i++ {
  63. code := string(data[i*8 : (i+1)*8])
  64. delete(AlldownloaderBid, code)
  65. }
  66. }
  67. }
  68. func processeventFile(p *mu.Packet) {
  69. defer mu.Catch()
  70. var data []byte
  71. switch p.Event {
  72. case mu.SERVICE_DOWNLOAD_APPEND_NODE:
  73. data = p.GetBusinessData()
  74. //log.Println("获取动态地址:", len(data), string(data))
  75. for i := 0; i < len(data)/8; i++ {
  76. code := string(data[i*8 : (i+1)*8])
  77. AlldownloaderFile[code] = DynamicIPMap{
  78. Code: code,
  79. InvalidTime: time.Now().Unix() + 60*10,
  80. }
  81. }
  82. case mu.SERVICE_DOWNLOAD_DELETE_NODE:
  83. data = p.GetBusinessData()
  84. //log.Println("删除动态地址:", len(data), string(data))
  85. for i := 0; i < len(data)/8; i++ {
  86. code := string(data[i*8 : (i+1)*8])
  87. delete(AlldownloaderFile, code)
  88. }
  89. }
  90. }
  91. func processeventTest(p *mu.Packet) {
  92. defer mu.Catch()
  93. var data []byte
  94. switch p.Event {
  95. case mu.SERVICE_DOWNLOAD_APPEND_NODE:
  96. data = p.GetBusinessData()
  97. //log.Println("获取动态地址:", len(data), string(data))
  98. for i := 0; i < len(data)/8; i++ {
  99. code := string(data[i*8 : (i+1)*8])
  100. AlldownloaderTest[code] = DynamicIPMap{
  101. Code: code,
  102. InvalidTime: time.Now().Unix() + 60*10,
  103. }
  104. }
  105. case mu.SERVICE_DOWNLOAD_DELETE_NODE:
  106. data = p.GetBusinessData()
  107. //log.Println("删除动态地址:", len(data), string(data))
  108. for i := 0; i < len(data)/8; i++ {
  109. code := string(data[i*8 : (i+1)*8])
  110. delete(AlldownloaderTest, code)
  111. }
  112. }
  113. }
  114. //
  115. func gc4Alldownloader() {
  116. n := time.Now().Unix()
  117. for _, v := range Alldownloader {
  118. if v.InvalidTime < n {
  119. delete(Alldownloader, v.Code)
  120. }
  121. }
  122. time.AfterFunc(1*time.Minute, gc4Alldownloader)
  123. }
  124. func gc4AlldownloaderBid() {
  125. n := time.Now().Unix()
  126. for _, v := range AlldownloaderBid {
  127. if v.InvalidTime < n {
  128. delete(AlldownloaderBid, v.Code)
  129. }
  130. }
  131. time.AfterFunc(1*time.Minute, gc4AlldownloaderBid)
  132. }
  133. func gc4AlldownloaderFile() {
  134. n := time.Now().Unix()
  135. for _, v := range AlldownloaderFile {
  136. if v.InvalidTime < n {
  137. delete(AlldownloaderFile, v.Code)
  138. }
  139. }
  140. time.AfterFunc(1*time.Minute, gc4AlldownloaderFile)
  141. }
  142. func gc4AlldownloaderTest() {
  143. n := time.Now().Unix()
  144. for _, v := range AlldownloaderTest {
  145. if v.InvalidTime < n {
  146. delete(AlldownloaderTest, v.Code)
  147. }
  148. }
  149. time.AfterFunc(1*time.Minute, gc4AlldownloaderTest)
  150. }
  151. //
  152. func GetOneDownloader() string {
  153. if len(AlldownloaderTest) < 1 {
  154. return ""
  155. }
  156. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  157. pos := r.Intn(len(AlldownloaderTest))
  158. index := 0
  159. retcode := ""
  160. for k, _ := range AlldownloaderTest {
  161. if index == pos {
  162. retcode = k
  163. break
  164. }
  165. index++
  166. }
  167. return retcode
  168. }
  169. //初始化,启动消息客户端
  170. func InitMsgClient(serveraddr, serveraddrbid, serveraddrtest, name, namebid, nametest string) {
  171. Msclient, _ = mu.NewClient(&mu.ClientConfig{ClientName: name,
  172. MsgServerAddr: serveraddr,
  173. EventHandler: processevent,
  174. CanHandleEvents: []int{mu.SERVICE_DOWNLOAD_APPEND_NODE, mu.SERVICE_DOWNLOAD_DELETE_NODE},
  175. ReadBufferSize: 10,
  176. WriteBufferSize: 10,
  177. })
  178. go gc4Alldownloader()
  179. MsclientBid, _ = mu.NewClient(&mu.ClientConfig{ClientName: namebid,
  180. MsgServerAddr: serveraddrbid,
  181. EventHandler: processeventbid,
  182. CanHandleEvents: []int{mu.SERVICE_DOWNLOAD_APPEND_NODE, mu.SERVICE_DOWNLOAD_DELETE_NODE},
  183. ReadBufferSize: 10,
  184. WriteBufferSize: 10,
  185. })
  186. go gc4AlldownloaderBid()
  187. MsclientTest, _ = mu.NewClient(&mu.ClientConfig{ClientName: nametest,
  188. MsgServerAddr: serveraddrtest,
  189. EventHandler: processeventTest,
  190. CanHandleEvents: []int{mu.SERVICE_DOWNLOAD_APPEND_NODE, mu.SERVICE_DOWNLOAD_DELETE_NODE},
  191. ReadBufferSize: 10,
  192. WriteBufferSize: 10,
  193. })
  194. go gc4AlldownloaderTest()
  195. }
  196. func InitMsgClientFile(serveraddr, name string) {
  197. MsclientFile, _ = mu.NewClient(&mu.ClientConfig{ClientName: name,
  198. MsgServerAddr: serveraddr,
  199. EventHandler: processeventFile,
  200. CanHandleEvents: []int{mu.SERVICE_DOWNLOAD_APPEND_NODE, mu.SERVICE_DOWNLOAD_DELETE_NODE},
  201. ReadBufferSize: 200,
  202. WriteBufferSize: 200,
  203. })
  204. go gc4AlldownloaderFile()
  205. }