threads.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. package jconcurrency
  2. /**
  3. 线程处理工具类
  4. **/
  5. import (
  6. "sync"
  7. "time"
  8. "github.com/panjf2000/ants/v2"
  9. )
  10. type Thead struct {
  11. size int
  12. pool chan bool
  13. wait *sync.WaitGroup
  14. work *ants.PoolWithFunc
  15. }
  16. //创建线程工具类,如果不使用函数可以传nil,仅使用池锁控制
  17. func NewThreads(size int, fn func(a any)) *Thead {
  18. th := &Thead{size, make(chan bool, size), &sync.WaitGroup{}, nil}
  19. if fn != nil {
  20. th.work, _ = ants.NewPoolWithFunc(size, fn, ants.WithNonblocking(false))
  21. }
  22. return th
  23. }
  24. //在不使用并发函数的时候,仅实用锁池
  25. func (th *Thead) Open() {
  26. th.pool <- true
  27. th.wait.Add(1)
  28. }
  29. //在不使用并发函数的时候,仅释放锁池
  30. func (th *Thead) Close() {
  31. <-th.pool
  32. th.wait.Add(-1)
  33. }
  34. //调用并发函数池,初始化需要传递函数,函数只接收一个参数
  35. func (th *Thead) Run(args any) {
  36. th.Open()
  37. defer th.Close()
  38. _ = th.work.Invoke(args)
  39. }
  40. //一般不用此方法
  41. func (th *Thead) RunCommon(fn func(arg any), args any) {
  42. th.Open()
  43. go func() {
  44. defer th.Close()
  45. fn(args)
  46. }()
  47. }
  48. //等待所有线程结束
  49. func (th *Thead) Wait() {
  50. th.wait.Wait()
  51. if th.work != nil {
  52. //连续3次没有任务进行,即判断任务完成
  53. i := 0
  54. for {
  55. time.Sleep(10 * time.Millisecond)
  56. if th.work.Free() == th.size {
  57. i++
  58. } else {
  59. i = 0
  60. }
  61. if i > 3 {
  62. break
  63. }
  64. }
  65. }
  66. }