package jconcurrency /** 线程处理工具类 **/ import ( "sync" "time" "github.com/panjf2000/ants/v2" ) type Thead struct { size int pool chan bool wait *sync.WaitGroup work *ants.PoolWithFunc } //创建线程工具类,如果不使用函数可以传nil,仅使用池锁控制 func NewThreads(size int, fn func(a any)) *Thead { th := &Thead{size, make(chan bool, size), &sync.WaitGroup{}, nil} if fn != nil { th.work, _ = ants.NewPoolWithFunc(size, fn, ants.WithNonblocking(false)) } return th } //在不使用并发函数的时候,仅实用锁池 func (th *Thead) Open() { th.pool <- true th.wait.Add(1) } //在不使用并发函数的时候,仅释放锁池 func (th *Thead) Close() { <-th.pool th.wait.Add(-1) } //调用并发函数池,初始化需要传递函数,函数只接收一个参数 func (th *Thead) Run(args any) { th.Open() defer th.Close() _ = th.work.Invoke(args) } //一般不用此方法 func (th *Thead) RunCommon(fn func(arg any), args any) { th.Open() go func() { defer th.Close() fn(args) }() } //等待所有线程结束 func (th *Thead) Wait() { th.wait.Wait() if th.work != nil { //连续3次没有任务进行,即判断任务完成 i := 0 for { time.Sleep(10 * time.Millisecond) if th.work.Free() == th.size { i++ } else { i = 0 } if i > 3 { break } } } }