rz 2 năm trước cách đây
mục cha
commit
c7965e8fea
8 tập tin đã thay đổi với 229 bổ sung1 xóa
  1. 8 1
      README.md
  2. 5 0
      go.mod
  3. 6 0
      go.sum
  4. 1 0
      jconcurrency.go
  5. 62 0
      map.go
  6. 19 0
      map_test.go
  7. 75 0
      threads.go
  8. 53 0
      threads_test.go

+ 8 - 1
README.md

@@ -1,3 +1,10 @@
 # jconcurrency
 
-并发相关工具,包含并发map、goroutine池
+并发相关工具,包含并发map、goroutine池
+## 目前实现
+- 简单map,提供常用读、写方法
+- 简单的线程并发
+
+## 示例
+
+详见test

+ 5 - 0
go.mod

@@ -0,0 +1,5 @@
+module jconcurrency
+
+go 1.19
+
+require github.com/panjf2000/ants/v2 v2.6.0

+ 6 - 0
go.sum

@@ -0,0 +1,6 @@
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/panjf2000/ants/v2 v2.6.0 h1:xOSpw42m+BMiJ2I33we7h6fYzG4DAlpE1xyI7VS2gxU=
+github.com/panjf2000/ants/v2 v2.6.0/go.mod h1:cU93usDlihJZ5CfRGNDYsiBYvoilLvBF5Qp/BT2GNRE=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
+gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=

+ 1 - 0
jconcurrency.go

@@ -0,0 +1 @@
+package jconcurrency

+ 62 - 0
map.go

@@ -0,0 +1,62 @@
+package jconcurrency
+
+import (
+	"sync"
+)
+
+//HashMap对象,安全操作
+type JyMap struct {
+	Lock sync.RWMutex
+	Data map[any]any
+}
+
+func NewJM() *JyMap {
+	return &JyMap{Data: map[any]any{}}
+}
+
+//设置
+func (jm *JyMap) Set(k, v any) {
+	jm.Lock.Lock()
+	jm.Data[k] = v
+	jm.Lock.Unlock()
+}
+
+//获取
+func (jm *JyMap) Get(k any) (v any) {
+	jm.Lock.RLock()
+	defer jm.Lock.RUnlock()
+	return jm.Data[k]
+}
+
+//获取并删除
+func (jm *JyMap) GetAndDel(k any) (v any) {
+	jm.Lock.Lock()
+	defer jm.Lock.Unlock()
+	v = jm.Data[k]
+	delete(jm.Data, k)
+	return
+}
+
+//仅删除
+func (jm *JyMap) Del(k any) {
+	jm.Lock.Lock()
+	defer jm.Lock.Unlock()
+	delete(jm.Data, k)
+	return
+}
+
+//如果不存在则设置
+func (jm *JyMap) SetIfNoExists(k, v any) {
+	jm.Lock.Lock()
+	defer jm.Lock.Unlock()
+	if jm.Data[k] == nil {
+		jm.Data[k] = v
+	}
+}
+
+//map中是否存在
+func (jm *JyMap) IsExists(k any) bool {
+	jm.Lock.RLock()
+	defer jm.Lock.RUnlock()
+	return jm.Data[k] != nil
+}

+ 19 - 0
map_test.go

@@ -0,0 +1,19 @@
+package jconcurrency
+
+import (
+	"log"
+	"testing"
+)
+
+func Test_map(t *testing.T) {
+	jm := NewJM()
+	jm.Set("a", 1)
+	jm.Set(1, 3)
+	log.Println("key 1 :", jm.IsExists(1))
+	jm.Del(1)
+	log.Println("key 1 :", jm.IsExists(1))
+	jm.SetIfNoExists("a", 5)
+	jm.SetIfNoExists("b", 6)
+	log.Println(jm.GetAndDel("b"))
+	log.Println(jm)
+}

+ 75 - 0
threads.go

@@ -0,0 +1,75 @@
+package jconcurrency
+
+/**
+线程处理工具类
+**/
+import (
+	"sync"
+	"time"
+
+	"github.com/panjf2000/ants/v2"
+)
+
+type Thead struct {
+	size int
+	pool chan bool
+	wait *sync.WaitGroup
+	work *ants.PoolWithFunc
+}
+
+//创建线程工具类,如果不使用函数可以传nil,仅使用池锁控制
+func NewThreads(size int, fn func(a any)) *Thead {
+	th := &Thead{size, make(chan bool, size), &sync.WaitGroup{}, nil}
+	if fn != nil {
+		th.work, _ = ants.NewPoolWithFunc(size, fn, ants.WithNonblocking(false))
+	}
+	return th
+}
+
+//在不使用并发函数的时候,仅实用锁池
+func (th *Thead) Open() {
+	th.pool <- true
+	th.wait.Add(1)
+}
+
+//在不使用并发函数的时候,仅释放锁池
+func (th *Thead) Close() {
+	<-th.pool
+	th.wait.Add(-1)
+}
+
+//调用并发函数池,初始化需要传递函数,函数只接收一个参数
+func (th *Thead) Run(args any) {
+	th.Open()
+	defer th.Close()
+	_ = th.work.Invoke(args)
+}
+
+//一般不用此方法
+func (th *Thead) RunCommon(fn func(arg any), args any) {
+	th.Open()
+	go func() {
+		defer th.Close()
+		fn(args)
+	}()
+}
+
+//等待所有线程结束
+func (th *Thead) Wait() {
+	th.wait.Wait()
+	if th.work != nil {
+		//连续3次没有任务进行,即判断任务完成
+		i := 0
+		for {
+			time.Sleep(10 * time.Millisecond)
+			if th.work.Free() == th.size {
+				i++
+			} else {
+				i = 0
+			}
+			if i > 3 {
+				break
+			}
+		}
+	}
+}

+ 53 - 0
threads_test.go

@@ -0,0 +1,53 @@
+package jconcurrency
+
+import (
+	"fmt"
+	"log"
+	"testing"
+)
+
+//调用并发包实现,只接收一个参数,构造一个结构体,使用的时候强转一下,如下。推荐
+func Test_threadRunStruct(t *testing.T) {
+	th := NewThreads(10, TFuncWithStruct)
+	jm := NewJM()
+	for i := 0; i < 1000; i++ {
+		//调用方法时,传递结构体
+		th.Run(Args{i, jm})
+	}
+	th.Wait()
+	log.Println("====", len(jm.Data))
+
+}
+
+type Args struct {
+	num int
+	jm  *JyMap
+}
+
+//使用方法时,强转结构体
+func TFuncWithStruct(s any) {
+	as := s.(Args)
+	as.jm.Set(as.num, as.num)
+}
+
+//调用并发包实现,只接收一个参数
+func Test_threadRun(t *testing.T) {
+	th := NewThreads(10, testFuncWithAny)
+	for i := 0; i < 1000; i++ {
+		th.Run(i)
+	}
+
+}
+
+//一般实现,只接入一个any参数
+func Test_threadCommon(t *testing.T) {
+	th := NewThreads(10, nil)
+	for i := 0; i < 1000; i++ {
+		th.RunCommon(testFuncWithAny, fmt.Sprintf("AA%dAA", i))
+	}
+
+}
+
+func testFuncWithAny(a any) {
+	log.Println(a)
+}