123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 |
- package jconcurrency
- /**
- 线程处理工具类
- **/
- import (
- "sync"
- "time"
- "github.com/panjf2000/ants/v2"
- )
- type Thead struct {
- size int
- pool chan bool
- wait *sync.WaitGroup
- work *ants.PoolWithFunc
- }
- //创建线程工具类,如果不使用函数可以传nil,仅使用池锁控制
- func NewThreads(size int, fn func(a any)) *Thead {
- th := &Thead{size, make(chan bool, size), &sync.WaitGroup{}, nil}
- if fn != nil {
- th.work, _ = ants.NewPoolWithFunc(size, fn, ants.WithNonblocking(false))
- }
- return th
- }
- //在不使用并发函数的时候,仅实用锁池
- func (th *Thead) Open() {
- th.pool <- true
- th.wait.Add(1)
- }
- //在不使用并发函数的时候,仅释放锁池
- func (th *Thead) Close() {
- <-th.pool
- th.wait.Add(-1)
- }
- //调用并发函数池,初始化需要传递函数,函数只接收一个参数
- func (th *Thead) Run(args any) {
- th.Open()
- defer th.Close()
- _ = th.work.Invoke(args)
- }
- //一般不用此方法
- func (th *Thead) RunCommon(fn func(arg any), args any) {
- th.Open()
- go func() {
- defer th.Close()
- fn(args)
- }()
- }
- //等待所有线程结束
- func (th *Thead) Wait() {
- th.wait.Wait()
- if th.work != nil {
- //连续3次没有任务进行,即判断任务完成
- i := 0
- for {
- time.Sleep(10 * time.Millisecond)
- if th.work.Free() == th.size {
- i++
- } else {
- i = 0
- }
- if i > 3 {
- break
- }
- }
- }
- }
|