msclient.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. // msclient
  2. package spider
  3. import (
  4. "math/rand"
  5. mu "mfw/util"
  6. "time"
  7. )
  8. type DynamicIPMap struct {
  9. Code string
  10. InvalidTime int64
  11. }
  12. var Msclient *mu.Client
  13. var MsclientFile *mu.Client
  14. var MsclientBid *mu.Client
  15. var MsclientTest *mu.Client
  16. var MsclientChromedp *mu.Client
  17. var MsclientChromedpTest *mu.Client
  18. var Alldownloader map[string]DynamicIPMap = make(map[string]DynamicIPMap)
  19. var AlldownloaderBid map[string]DynamicIPMap = make(map[string]DynamicIPMap)
  20. var AlldownloaderFile map[string]DynamicIPMap = make(map[string]DynamicIPMap)
  21. var AlldownloaderTest map[string]DynamicIPMap = make(map[string]DynamicIPMap)
  22. var AlldownloaderChromedp map[string]DynamicIPMap = make(map[string]DynamicIPMap)
  23. var AlldownloaderChromedpTest map[string]DynamicIPMap = make(map[string]DynamicIPMap)
  24. func processevent(p *mu.Packet) {
  25. defer mu.Catch()
  26. var data []byte
  27. switch p.Event {
  28. case mu.SERVICE_DOWNLOAD_APPEND_NODE:
  29. data = p.GetBusinessData()
  30. //log.Println("获取动态地址:", len(data), string(data))
  31. for i := 0; i < len(data)/8; i++ {
  32. code := string(data[i*8 : (i+1)*8])
  33. Alldownloader[code] = DynamicIPMap{
  34. Code: code,
  35. InvalidTime: time.Now().Unix() + 60*10,
  36. }
  37. }
  38. case mu.SERVICE_DOWNLOAD_DELETE_NODE:
  39. data = p.GetBusinessData()
  40. //log.Println("删除动态地址:", len(data), string(data))
  41. for i := 0; i < len(data)/8; i++ {
  42. code := string(data[i*8 : (i+1)*8])
  43. delete(Alldownloader, code)
  44. }
  45. }
  46. }
  47. func processeventbid(p *mu.Packet) {
  48. defer mu.Catch()
  49. var data []byte
  50. switch p.Event {
  51. case mu.SERVICE_DOWNLOAD_APPEND_NODE:
  52. data = p.GetBusinessData()
  53. //log.Println("获取动态地址:", len(data), string(data))
  54. for i := 0; i < len(data)/8; i++ {
  55. code := string(data[i*8 : (i+1)*8])
  56. AlldownloaderBid[code] = DynamicIPMap{
  57. Code: code,
  58. InvalidTime: time.Now().Unix() + 60*10,
  59. }
  60. }
  61. case mu.SERVICE_DOWNLOAD_DELETE_NODE:
  62. data = p.GetBusinessData()
  63. //log.Println("删除动态地址:", len(data), string(data))
  64. for i := 0; i < len(data)/8; i++ {
  65. code := string(data[i*8 : (i+1)*8])
  66. delete(AlldownloaderBid, code)
  67. }
  68. }
  69. }
  70. func processeventFile(p *mu.Packet) {
  71. defer mu.Catch()
  72. var data []byte
  73. switch p.Event {
  74. case mu.SERVICE_DOWNLOAD_APPEND_NODE:
  75. data = p.GetBusinessData()
  76. //log.Println("获取动态地址:", len(data), string(data))
  77. for i := 0; i < len(data)/8; i++ {
  78. code := string(data[i*8 : (i+1)*8])
  79. AlldownloaderFile[code] = DynamicIPMap{
  80. Code: code,
  81. InvalidTime: time.Now().Unix() + 60*10,
  82. }
  83. }
  84. case mu.SERVICE_DOWNLOAD_DELETE_NODE:
  85. data = p.GetBusinessData()
  86. //log.Println("删除动态地址:", len(data), string(data))
  87. for i := 0; i < len(data)/8; i++ {
  88. code := string(data[i*8 : (i+1)*8])
  89. delete(AlldownloaderFile, code)
  90. }
  91. }
  92. }
  93. func processeventTest(p *mu.Packet) {
  94. defer mu.Catch()
  95. var data []byte
  96. switch p.Event {
  97. case mu.SERVICE_DOWNLOAD_APPEND_NODE:
  98. data = p.GetBusinessData()
  99. //log.Println("获取动态地址:", len(data), string(data))
  100. for i := 0; i < len(data)/8; i++ {
  101. code := string(data[i*8 : (i+1)*8])
  102. AlldownloaderTest[code] = DynamicIPMap{
  103. Code: code,
  104. InvalidTime: time.Now().Unix() + 60*10,
  105. }
  106. }
  107. case mu.SERVICE_DOWNLOAD_DELETE_NODE:
  108. data = p.GetBusinessData()
  109. //log.Println("删除动态地址:", len(data), string(data))
  110. for i := 0; i < len(data)/8; i++ {
  111. code := string(data[i*8 : (i+1)*8])
  112. delete(AlldownloaderTest, code)
  113. }
  114. }
  115. }
  116. func processeventChromedp(p *mu.Packet) {
  117. defer mu.Catch()
  118. var data []byte
  119. switch p.Event {
  120. case mu.SERVICE_DOWNLOAD_APPEND_NODE:
  121. data = p.GetBusinessData()
  122. //log.Println("获取动态地址:", len(data), string(data))
  123. for i := 0; i < len(data)/8; i++ {
  124. code := string(data[i*8 : (i+1)*8])
  125. AlldownloaderChromedp[code] = DynamicIPMap{
  126. Code: code,
  127. InvalidTime: time.Now().Unix() + 60*10,
  128. }
  129. }
  130. case mu.SERVICE_DOWNLOAD_DELETE_NODE:
  131. data = p.GetBusinessData()
  132. //log.Println("删除动态地址:", len(data), string(data))
  133. for i := 0; i < len(data)/8; i++ {
  134. code := string(data[i*8 : (i+1)*8])
  135. delete(AlldownloaderChromedp, code)
  136. }
  137. }
  138. }
  139. func processeventChromedpTest(p *mu.Packet) {
  140. defer mu.Catch()
  141. var data []byte
  142. switch p.Event {
  143. case mu.SERVICE_DOWNLOAD_APPEND_NODE:
  144. data = p.GetBusinessData()
  145. //log.Println("获取动态地址:", len(data), string(data))
  146. for i := 0; i < len(data)/8; i++ {
  147. code := string(data[i*8 : (i+1)*8])
  148. AlldownloaderChromedp[code] = DynamicIPMap{
  149. Code: code,
  150. InvalidTime: time.Now().Unix() + 60*10,
  151. }
  152. }
  153. case mu.SERVICE_DOWNLOAD_DELETE_NODE:
  154. data = p.GetBusinessData()
  155. //log.Println("删除动态地址:", len(data), string(data))
  156. for i := 0; i < len(data)/8; i++ {
  157. code := string(data[i*8 : (i+1)*8])
  158. delete(AlldownloaderChromedp, code)
  159. }
  160. }
  161. }
  162. func gc4Alldownloader() {
  163. n := time.Now().Unix()
  164. for _, v := range Alldownloader {
  165. if v.InvalidTime < n {
  166. delete(Alldownloader, v.Code)
  167. }
  168. }
  169. time.AfterFunc(1*time.Minute, gc4Alldownloader)
  170. }
  171. func gc4AlldownloaderBid() {
  172. n := time.Now().Unix()
  173. for _, v := range AlldownloaderBid {
  174. if v.InvalidTime < n {
  175. delete(AlldownloaderBid, v.Code)
  176. }
  177. }
  178. time.AfterFunc(1*time.Minute, gc4AlldownloaderBid)
  179. }
  180. func gc4AlldownloaderFile() {
  181. n := time.Now().Unix()
  182. for _, v := range AlldownloaderFile {
  183. if v.InvalidTime < n {
  184. delete(AlldownloaderFile, v.Code)
  185. }
  186. }
  187. time.AfterFunc(1*time.Minute, gc4AlldownloaderFile)
  188. }
  189. func gc4AlldownloaderTest() {
  190. n := time.Now().Unix()
  191. for _, v := range AlldownloaderTest {
  192. if v.InvalidTime < n {
  193. delete(AlldownloaderTest, v.Code)
  194. }
  195. }
  196. time.AfterFunc(1*time.Minute, gc4AlldownloaderTest)
  197. }
  198. func gc4AlldownloaderChromedp() {
  199. n := time.Now().Unix()
  200. for _, v := range AlldownloaderChromedp {
  201. if v.InvalidTime < n {
  202. delete(AlldownloaderChromedp, v.Code)
  203. }
  204. }
  205. time.AfterFunc(1*time.Minute, gc4AlldownloaderChromedp)
  206. }
  207. func gc4AlldownloaderChromedpTest() {
  208. n := time.Now().Unix()
  209. for _, v := range AlldownloaderChromedp {
  210. if v.InvalidTime < n {
  211. delete(AlldownloaderChromedp, v.Code)
  212. }
  213. }
  214. time.AfterFunc(1*time.Minute, gc4AlldownloaderChromedpTest)
  215. }
  216. func GetOneDownloader() string {
  217. if len(AlldownloaderTest) < 1 {
  218. return ""
  219. }
  220. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  221. pos := r.Intn(len(AlldownloaderTest))
  222. index := 0
  223. retcode := ""
  224. for k, _ := range AlldownloaderTest {
  225. if index == pos {
  226. retcode = k
  227. break
  228. }
  229. index++
  230. }
  231. return retcode
  232. }
  233. // 初始化,启动消息客户端
  234. func InitMsgClient(serveraddr, serveraddrbid, serveraddrtest, name, namebid, nametest string) {
  235. Msclient, _ = mu.NewClient(&mu.ClientConfig{ClientName: name,
  236. MsgServerAddr: serveraddr,
  237. EventHandler: processevent,
  238. CanHandleEvents: []int{mu.SERVICE_DOWNLOAD_APPEND_NODE, mu.SERVICE_DOWNLOAD_DELETE_NODE},
  239. ReadBufferSize: 500,
  240. WriteBufferSize: 500,
  241. })
  242. go gc4Alldownloader()
  243. MsclientBid, _ = mu.NewClient(&mu.ClientConfig{ClientName: namebid,
  244. MsgServerAddr: serveraddrbid,
  245. EventHandler: processeventbid,
  246. CanHandleEvents: []int{mu.SERVICE_DOWNLOAD_APPEND_NODE, mu.SERVICE_DOWNLOAD_DELETE_NODE},
  247. ReadBufferSize: 500,
  248. WriteBufferSize: 500,
  249. })
  250. go gc4AlldownloaderBid()
  251. MsclientTest, _ = mu.NewClient(&mu.ClientConfig{ClientName: nametest,
  252. MsgServerAddr: serveraddrtest,
  253. EventHandler: processeventTest,
  254. CanHandleEvents: []int{mu.SERVICE_DOWNLOAD_APPEND_NODE, mu.SERVICE_DOWNLOAD_DELETE_NODE},
  255. ReadBufferSize: 500,
  256. WriteBufferSize: 500,
  257. })
  258. go gc4AlldownloaderTest()
  259. }
  260. // 初始chrome启动消息客户端
  261. func InitChromeMsgClient(chromeaddr, chrometestaddr, name, nametest string) {
  262. InitMsgClientChromedp(chromeaddr, name)
  263. InitMsgClientChromedpTest(chrometestaddr, nametest)
  264. }
  265. func InitMsgClientFile(serveraddr, name string) {
  266. MsclientFile, _ = mu.NewClient(&mu.ClientConfig{ClientName: name,
  267. MsgServerAddr: serveraddr,
  268. EventHandler: processeventFile,
  269. CanHandleEvents: []int{mu.SERVICE_DOWNLOAD_APPEND_NODE, mu.SERVICE_DOWNLOAD_DELETE_NODE},
  270. ReadBufferSize: 500,
  271. WriteBufferSize: 500,
  272. })
  273. go gc4AlldownloaderFile()
  274. }
  275. func InitMsgClientChromedp(serveraddr, name string) {
  276. MsclientChromedp, _ = mu.NewClient(&mu.ClientConfig{ClientName: name,
  277. MsgServerAddr: serveraddr,
  278. EventHandler: processeventChromedp,
  279. CanHandleEvents: []int{mu.SERVICE_DOWNLOAD_APPEND_NODE, mu.SERVICE_DOWNLOAD_DELETE_NODE},
  280. ReadBufferSize: 200,
  281. WriteBufferSize: 200,
  282. })
  283. go gc4AlldownloaderChromedp()
  284. }
  285. func InitMsgClientChromedpTest(serveraddr, name string) {
  286. MsclientChromedpTest, _ = mu.NewClient(&mu.ClientConfig{ClientName: name,
  287. MsgServerAddr: serveraddr,
  288. EventHandler: processeventChromedpTest,
  289. CanHandleEvents: []int{mu.SERVICE_DOWNLOAD_APPEND_NODE, mu.SERVICE_DOWNLOAD_DELETE_NODE},
  290. ReadBufferSize: 200,
  291. WriteBufferSize: 200,
  292. })
  293. go gc4AlldownloaderChromedpTest()
  294. }