Tao Zhang 4 ani în urmă
comite
eeeef4e752

+ 2 - 0
.gitignore

@@ -0,0 +1,2 @@
+*/.DS_Store
+bin

+ 8 - 0
.idea/.gitignore

@@ -0,0 +1,8 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
+# Editor-based HTTP Client requests
+/httpRequests/

+ 20 - 0
.idea/inspectionProfiles/Project_Default.xml

@@ -0,0 +1,20 @@
+<component name="InspectionProjectProfileManager">
+  <profile version="1.0">
+    <option name="myName" value="Project Default" />
+    <inspection_tool class="GoUnhandledErrorResult" enabled="true" level="WARNING" enabled_by_default="true">
+      <methods>
+        <method importPath="hash" receiver="Hash" name="Write" />
+        <method importPath="strings" receiver="*Builder" name="Write" />
+        <method importPath="strings" receiver="*Builder" name="WriteByte" />
+        <method importPath="bytes" receiver="*Buffer" name="WriteRune" />
+        <method importPath="bytes" receiver="*Buffer" name="Write" />
+        <method importPath="bytes" receiver="*Buffer" name="WriteString" />
+        <method importPath="strings" receiver="*Builder" name="WriteString" />
+        <method importPath="bytes" receiver="*Buffer" name="WriteByte" />
+        <method importPath="strings" receiver="*Builder" name="WriteRune" />
+        <method importPath="math/rand" receiver="*Rand" name="Read" />
+        <method importPath="google.golang.org/grpc" receiver="*ClientConn" name="Close" />
+      </methods>
+    </inspection_tool>
+  </profile>
+</component>

+ 5 - 0
.idea/inspectionProfiles/profiles_settings.xml

@@ -0,0 +1,5 @@
+<component name="InspectionProjectProfileManager">
+  <settings>
+    <option name="PROJECT_PROFILE" />
+  </settings>
+</component>

+ 6 - 0
.idea/misc.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="JavaScriptSettings">
+    <option name="languageLevel" value="ES6" />
+  </component>
+</project>

+ 8 - 0
.idea/modules.xml

@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="ProjectModuleManager">
+    <modules>
+      <module fileurl="file://$PROJECT_DIR$/.idea/queued.iml" filepath="$PROJECT_DIR$/.idea/queued.iml" />
+    </modules>
+  </component>
+</project>

+ 8 - 0
.idea/queued.iml

@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="WEB_MODULE" version="4">
+  <component name="NewModuleRootManager">
+    <content url="file://$MODULE_DIR$" />
+    <orderEntry type="inheritedJdk" />
+    <orderEntry type="sourceFolder" forTests="false" />
+  </component>
+</module>

+ 6 - 0
.idea/vcs.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="VcsDirectoryMappings">
+    <mapping directory="$PROJECT_DIR$" vcs="Git" />
+  </component>
+</project>

+ 21 - 0
README.md

@@ -0,0 +1,21 @@
+## 基于GRPC的任务队列管理
+双向流
+### 1 支持功能
+- 任务发布
+- 任务分发,支持多种分发模式 (广播、随机)
+- 负载均衡
+- 任务回执,支持多种回执模式,(原路返回、广播)
+- 有回执的任务,支持超时自动回收,重新发布
+
+
+
+```shell script
+
+指生成Bean
+protoc -I ./proto_src --go_out=./proto proto_src/ggclassefity.proto 
+```
+
+```shell script
+生成相应的服务接口
+protoc -I ./proto_src --go_out=plugins=grpc:./proto proto_src/queue.proto
+```

+ 52 - 0
demo/main.go

@@ -0,0 +1,52 @@
+package main
+
+import (
+	pb "app.yhyue.com/BP/queued/proto"
+	"app.yhyue.com/BP/queued/util"
+	"github.com/golang/protobuf/proto"
+	"log"
+	"math/rand"
+	"time"
+)
+
+const (
+	address = "127.0.0.1:8080"
+)
+
+var (
+	sendPool   = make(chan *pb.PubReq, 50)
+	recivePool = make(chan *pb.PubReq, 50)
+)
+
+func makeData() []float32 {
+	ret := make([]float32, 0, 0)
+	for i := 0; i < 10; i++ {
+		ret = append(ret, rand.Float32())
+	}
+	return ret
+}
+func main() {
+	pub := util.NewPublisher(address, "syslog", sendPool)
+	rec := util.NewReciver(address, "syslog", recivePool)
+	go pub.Run()
+	go rec.Run()
+	//发送消息
+	go func() {
+		for {
+			param := pb.ClfReq{Item: makeData()}
+			bs, _ := proto.Marshal(&param)
+			time.Sleep(2 * time.Second)
+			sendPool <- &pb.PubReq{
+				PublishType: 2,
+				Param:       bs,
+			}
+		}
+	}()
+	//接受消息
+	for {
+		select {
+		case msg := <-recivePool:
+			log.Println(msg)
+		}
+	}
+}

+ 9 - 0
go.mod

@@ -0,0 +1,9 @@
+module app.yhyue.com/BP/queued
+
+go 1.14
+
+require (
+	github.com/golang/protobuf v1.4.2
+	github.com/google/uuid v1.1.2
+	google.golang.org/grpc v1.32.0
+)

+ 72 - 0
go.sum

@@ -0,0 +1,72 @@
+cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
+github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
+github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
+github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
+github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
+github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
+github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
+github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
+github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
+github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
+github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
+github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
+github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
+github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
+github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
+github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
+github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
+github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
+github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
+golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
+golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
+golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
+golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
+google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
+google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
+google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE=
+google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
+google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
+google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
+google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
+google.golang.org/grpc v1.32.0 h1:zWTV+LMdc3kaiJMSTOFz2UgSBgx8RNQoTGiZu3fR9S0=
+google.golang.org/grpc v1.32.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
+google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
+google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
+google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
+google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
+google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
+google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
+google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

+ 121 - 0
proto/ggclassefity.pb.go

@@ -0,0 +1,121 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: ggclassefity.proto
+
+//声明 包名
+
+package proto
+
+import (
+	fmt "fmt"
+	proto "github.com/golang/protobuf/proto"
+	math "math"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
+
+//分类请求结构体
+type ClfReq struct {
+	Item                 []float32 `protobuf:"fixed32,1,rep,packed,name=item,proto3" json:"item,omitempty"`
+	XXX_NoUnkeyedLiteral struct{}  `json:"-"`
+	XXX_unrecognized     []byte    `json:"-"`
+	XXX_sizecache        int32     `json:"-"`
+}
+
+func (m *ClfReq) Reset()         { *m = ClfReq{} }
+func (m *ClfReq) String() string { return proto.CompactTextString(m) }
+func (*ClfReq) ProtoMessage()    {}
+func (*ClfReq) Descriptor() ([]byte, []int) {
+	return fileDescriptor_5b0d598141a86c23, []int{0}
+}
+
+func (m *ClfReq) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_ClfReq.Unmarshal(m, b)
+}
+func (m *ClfReq) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_ClfReq.Marshal(b, m, deterministic)
+}
+func (m *ClfReq) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_ClfReq.Merge(m, src)
+}
+func (m *ClfReq) XXX_Size() int {
+	return xxx_messageInfo_ClfReq.Size(m)
+}
+func (m *ClfReq) XXX_DiscardUnknown() {
+	xxx_messageInfo_ClfReq.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_ClfReq proto.InternalMessageInfo
+
+func (m *ClfReq) GetItem() []float32 {
+	if m != nil {
+		return m.Item
+	}
+	return nil
+}
+
+type ClfResp struct {
+	Item                 []int32  `protobuf:"varint,1,rep,packed,name=item,proto3" json:"item,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *ClfResp) Reset()         { *m = ClfResp{} }
+func (m *ClfResp) String() string { return proto.CompactTextString(m) }
+func (*ClfResp) ProtoMessage()    {}
+func (*ClfResp) Descriptor() ([]byte, []int) {
+	return fileDescriptor_5b0d598141a86c23, []int{1}
+}
+
+func (m *ClfResp) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_ClfResp.Unmarshal(m, b)
+}
+func (m *ClfResp) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_ClfResp.Marshal(b, m, deterministic)
+}
+func (m *ClfResp) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_ClfResp.Merge(m, src)
+}
+func (m *ClfResp) XXX_Size() int {
+	return xxx_messageInfo_ClfResp.Size(m)
+}
+func (m *ClfResp) XXX_DiscardUnknown() {
+	xxx_messageInfo_ClfResp.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_ClfResp proto.InternalMessageInfo
+
+func (m *ClfResp) GetItem() []int32 {
+	if m != nil {
+		return m.Item
+	}
+	return nil
+}
+
+func init() {
+	proto.RegisterType((*ClfReq)(nil), "proto.ClfReq")
+	proto.RegisterType((*ClfResp)(nil), "proto.ClfResp")
+}
+
+func init() {
+	proto.RegisterFile("ggclassefity.proto", fileDescriptor_5b0d598141a86c23)
+}
+
+var fileDescriptor_5b0d598141a86c23 = []byte{
+	// 92 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4a, 0x4f, 0x4f, 0xce,
+	0x49, 0x2c, 0x2e, 0x4e, 0x4d, 0xcb, 0x2c, 0xa9, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62,
+	0x05, 0x53, 0x4a, 0x32, 0x5c, 0x6c, 0xce, 0x39, 0x69, 0x41, 0xa9, 0x85, 0x42, 0x42, 0x5c, 0x2c,
+	0x99, 0x25, 0xa9, 0xb9, 0x12, 0x8c, 0x0a, 0xcc, 0x1a, 0x4c, 0x41, 0x60, 0xb6, 0x92, 0x2c, 0x17,
+	0x3b, 0x58, 0xb6, 0xb8, 0x00, 0x45, 0x9a, 0x15, 0x22, 0x9d, 0xc4, 0x06, 0x36, 0xc3, 0x18, 0x10,
+	0x00, 0x00, 0xff, 0xff, 0x0d, 0x91, 0xe5, 0xf9, 0x60, 0x00, 0x00, 0x00,
+}

+ 414 - 0
proto/queue.pb.go

@@ -0,0 +1,414 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: queue.proto
+
+//声明 包名
+
+package proto
+
+import (
+	context "context"
+	fmt "fmt"
+	proto "github.com/golang/protobuf/proto"
+	grpc "google.golang.org/grpc"
+	codes "google.golang.org/grpc/codes"
+	status "google.golang.org/grpc/status"
+	math "math"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
+
+//任务
+type PubReq struct {
+	Id                   string   `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
+	ChannelId            string   `protobuf:"bytes,2,opt,name=channelId,proto3" json:"channelId,omitempty"`
+	Sender               string   `protobuf:"bytes,3,opt,name=sender,proto3" json:"sender,omitempty"`
+	SerialType           int32    `protobuf:"varint,4,opt,name=serial_type,json=serialType,proto3" json:"serial_type,omitempty"`
+	PublishType          int32    `protobuf:"varint,5,opt,name=publish_type,json=publishType,proto3" json:"publish_type,omitempty"`
+	Param                []byte   `protobuf:"bytes,6,opt,name=param,proto3" json:"param,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *PubReq) Reset()         { *m = PubReq{} }
+func (m *PubReq) String() string { return proto.CompactTextString(m) }
+func (*PubReq) ProtoMessage()    {}
+func (*PubReq) Descriptor() ([]byte, []int) {
+	return fileDescriptor_96e4d7d76a734cd8, []int{0}
+}
+
+func (m *PubReq) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_PubReq.Unmarshal(m, b)
+}
+func (m *PubReq) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_PubReq.Marshal(b, m, deterministic)
+}
+func (m *PubReq) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_PubReq.Merge(m, src)
+}
+func (m *PubReq) XXX_Size() int {
+	return xxx_messageInfo_PubReq.Size(m)
+}
+func (m *PubReq) XXX_DiscardUnknown() {
+	xxx_messageInfo_PubReq.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_PubReq proto.InternalMessageInfo
+
+func (m *PubReq) GetId() string {
+	if m != nil {
+		return m.Id
+	}
+	return ""
+}
+
+func (m *PubReq) GetChannelId() string {
+	if m != nil {
+		return m.ChannelId
+	}
+	return ""
+}
+
+func (m *PubReq) GetSender() string {
+	if m != nil {
+		return m.Sender
+	}
+	return ""
+}
+
+func (m *PubReq) GetSerialType() int32 {
+	if m != nil {
+		return m.SerialType
+	}
+	return 0
+}
+
+func (m *PubReq) GetPublishType() int32 {
+	if m != nil {
+		return m.PublishType
+	}
+	return 0
+}
+
+func (m *PubReq) GetParam() []byte {
+	if m != nil {
+		return m.Param
+	}
+	return nil
+}
+
+//标准字符串返回结果
+type PubResp struct {
+	Code                 int32    `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
+	Msg                  string   `protobuf:"bytes,2,opt,name=msg,proto3" json:"msg,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *PubResp) Reset()         { *m = PubResp{} }
+func (m *PubResp) String() string { return proto.CompactTextString(m) }
+func (*PubResp) ProtoMessage()    {}
+func (*PubResp) Descriptor() ([]byte, []int) {
+	return fileDescriptor_96e4d7d76a734cd8, []int{1}
+}
+
+func (m *PubResp) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_PubResp.Unmarshal(m, b)
+}
+func (m *PubResp) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_PubResp.Marshal(b, m, deterministic)
+}
+func (m *PubResp) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_PubResp.Merge(m, src)
+}
+func (m *PubResp) XXX_Size() int {
+	return xxx_messageInfo_PubResp.Size(m)
+}
+func (m *PubResp) XXX_DiscardUnknown() {
+	xxx_messageInfo_PubResp.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_PubResp proto.InternalMessageInfo
+
+func (m *PubResp) GetCode() int32 {
+	if m != nil {
+		return m.Code
+	}
+	return 0
+}
+
+func (m *PubResp) GetMsg() string {
+	if m != nil {
+		return m.Msg
+	}
+	return ""
+}
+
+type RecvReq struct {
+	Sender               string   `protobuf:"bytes,1,opt,name=sender,proto3" json:"sender,omitempty"`
+	ChannelId            string   `protobuf:"bytes,2,opt,name=channelId,proto3" json:"channelId,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *RecvReq) Reset()         { *m = RecvReq{} }
+func (m *RecvReq) String() string { return proto.CompactTextString(m) }
+func (*RecvReq) ProtoMessage()    {}
+func (*RecvReq) Descriptor() ([]byte, []int) {
+	return fileDescriptor_96e4d7d76a734cd8, []int{2}
+}
+
+func (m *RecvReq) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_RecvReq.Unmarshal(m, b)
+}
+func (m *RecvReq) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_RecvReq.Marshal(b, m, deterministic)
+}
+func (m *RecvReq) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_RecvReq.Merge(m, src)
+}
+func (m *RecvReq) XXX_Size() int {
+	return xxx_messageInfo_RecvReq.Size(m)
+}
+func (m *RecvReq) XXX_DiscardUnknown() {
+	xxx_messageInfo_RecvReq.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_RecvReq proto.InternalMessageInfo
+
+func (m *RecvReq) GetSender() string {
+	if m != nil {
+		return m.Sender
+	}
+	return ""
+}
+
+func (m *RecvReq) GetChannelId() string {
+	if m != nil {
+		return m.ChannelId
+	}
+	return ""
+}
+
+func init() {
+	proto.RegisterType((*PubReq)(nil), "proto.PubReq")
+	proto.RegisterType((*PubResp)(nil), "proto.PubResp")
+	proto.RegisterType((*RecvReq)(nil), "proto.RecvReq")
+}
+
+func init() {
+	proto.RegisterFile("queue.proto", fileDescriptor_96e4d7d76a734cd8)
+}
+
+var fileDescriptor_96e4d7d76a734cd8 = []byte{
+	// 271 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x90, 0x4f, 0x4b, 0xc3, 0x40,
+	0x10, 0xc5, 0xbb, 0x69, 0x93, 0xd2, 0x49, 0x2c, 0x32, 0x88, 0x84, 0x22, 0x18, 0x73, 0xca, 0x41,
+	0xaa, 0xe8, 0x07, 0xf0, 0xec, 0x4d, 0x57, 0xef, 0x92, 0x3f, 0x83, 0x5d, 0x49, 0x93, 0x6d, 0xb6,
+	0x09, 0xf4, 0x1b, 0xf9, 0x31, 0x65, 0x67, 0x23, 0xb5, 0x17, 0x4f, 0x3b, 0xef, 0xcd, 0x5b, 0xf8,
+	0xcd, 0x83, 0x70, 0xd7, 0x53, 0x4f, 0x6b, 0xdd, 0xb5, 0xfb, 0x16, 0x7d, 0x7e, 0xd2, 0x6f, 0x01,
+	0xc1, 0x4b, 0x5f, 0x48, 0xda, 0xe1, 0x12, 0x3c, 0x55, 0xc5, 0x22, 0x11, 0xd9, 0x42, 0x7a, 0xaa,
+	0xc2, 0x2b, 0x58, 0x94, 0x9b, 0xbc, 0x69, 0xa8, 0x7e, 0xae, 0x62, 0x8f, 0xed, 0xa3, 0x81, 0x97,
+	0x10, 0x18, 0x6a, 0x2a, 0xea, 0xe2, 0x29, 0xaf, 0x46, 0x85, 0xd7, 0x10, 0x1a, 0xea, 0x54, 0x5e,
+	0x7f, 0xec, 0x0f, 0x9a, 0xe2, 0x59, 0x22, 0x32, 0x5f, 0x82, 0xb3, 0xde, 0x0f, 0x9a, 0xf0, 0x06,
+	0x22, 0xdd, 0x17, 0xb5, 0x32, 0x1b, 0x97, 0xf0, 0x39, 0x11, 0x8e, 0x1e, 0x47, 0x2e, 0xc0, 0xd7,
+	0x79, 0x97, 0x6f, 0xe3, 0x20, 0x11, 0x59, 0x24, 0x9d, 0x48, 0xef, 0x60, 0xce, 0xa4, 0x46, 0x23,
+	0xc2, 0xac, 0x6c, 0x2b, 0x62, 0x58, 0x5f, 0xf2, 0x8c, 0xe7, 0x30, 0xdd, 0x9a, 0xcf, 0x11, 0xd4,
+	0x8e, 0xe9, 0x13, 0xcc, 0x25, 0x95, 0x83, 0xbd, 0xed, 0x48, 0x2b, 0x4e, 0x68, 0xff, 0xbd, 0xf1,
+	0xe1, 0x0b, 0xa2, 0x57, 0x5b, 0xd9, 0x1b, 0x75, 0x83, 0x2a, 0x09, 0x6f, 0x99, 0xc0, 0x62, 0xe2,
+	0x99, 0xab, 0x71, 0xed, 0xba, 0x5b, 0x2d, 0xff, 0x4a, 0xa3, 0xd3, 0x49, 0x26, 0x6c, 0x5a, 0x52,
+	0x49, 0x6a, 0x20, 0xfc, 0x5d, 0x8f, 0x38, 0xab, 0xd3, 0xdf, 0xe9, 0xe4, 0x5e, 0x14, 0x01, 0x3b,
+	0x8f, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x21, 0x96, 0x01, 0xe7, 0xa5, 0x01, 0x00, 0x00,
+}
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ context.Context
+var _ grpc.ClientConnInterface
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+const _ = grpc.SupportPackageIsVersion6
+
+// QueueServiceClient is the client API for QueueService service.
+//
+// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
+type QueueServiceClient interface {
+	//任务发布
+	Publish(ctx context.Context, opts ...grpc.CallOption) (QueueService_PublishClient, error)
+	//任务接收
+	Receive(ctx context.Context, in *RecvReq, opts ...grpc.CallOption) (QueueService_ReceiveClient, error)
+}
+
+type queueServiceClient struct {
+	cc grpc.ClientConnInterface
+}
+
+func NewQueueServiceClient(cc grpc.ClientConnInterface) QueueServiceClient {
+	return &queueServiceClient{cc}
+}
+
+func (c *queueServiceClient) Publish(ctx context.Context, opts ...grpc.CallOption) (QueueService_PublishClient, error) {
+	stream, err := c.cc.NewStream(ctx, &_QueueService_serviceDesc.Streams[0], "/proto.QueueService/Publish", opts...)
+	if err != nil {
+		return nil, err
+	}
+	x := &queueServicePublishClient{stream}
+	return x, nil
+}
+
+type QueueService_PublishClient interface {
+	Send(*PubReq) error
+	CloseAndRecv() (*PubResp, error)
+	grpc.ClientStream
+}
+
+type queueServicePublishClient struct {
+	grpc.ClientStream
+}
+
+func (x *queueServicePublishClient) Send(m *PubReq) error {
+	return x.ClientStream.SendMsg(m)
+}
+
+func (x *queueServicePublishClient) CloseAndRecv() (*PubResp, error) {
+	if err := x.ClientStream.CloseSend(); err != nil {
+		return nil, err
+	}
+	m := new(PubResp)
+	if err := x.ClientStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+func (c *queueServiceClient) Receive(ctx context.Context, in *RecvReq, opts ...grpc.CallOption) (QueueService_ReceiveClient, error) {
+	stream, err := c.cc.NewStream(ctx, &_QueueService_serviceDesc.Streams[1], "/proto.QueueService/Receive", opts...)
+	if err != nil {
+		return nil, err
+	}
+	x := &queueServiceReceiveClient{stream}
+	if err := x.ClientStream.SendMsg(in); err != nil {
+		return nil, err
+	}
+	if err := x.ClientStream.CloseSend(); err != nil {
+		return nil, err
+	}
+	return x, nil
+}
+
+type QueueService_ReceiveClient interface {
+	Recv() (*PubReq, error)
+	grpc.ClientStream
+}
+
+type queueServiceReceiveClient struct {
+	grpc.ClientStream
+}
+
+func (x *queueServiceReceiveClient) Recv() (*PubReq, error) {
+	m := new(PubReq)
+	if err := x.ClientStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+// QueueServiceServer is the server API for QueueService service.
+type QueueServiceServer interface {
+	//任务发布
+	Publish(QueueService_PublishServer) error
+	//任务接收
+	Receive(*RecvReq, QueueService_ReceiveServer) error
+}
+
+// UnimplementedQueueServiceServer can be embedded to have forward compatible implementations.
+type UnimplementedQueueServiceServer struct {
+}
+
+func (*UnimplementedQueueServiceServer) Publish(srv QueueService_PublishServer) error {
+	return status.Errorf(codes.Unimplemented, "method Publish not implemented")
+}
+func (*UnimplementedQueueServiceServer) Receive(req *RecvReq, srv QueueService_ReceiveServer) error {
+	return status.Errorf(codes.Unimplemented, "method Receive not implemented")
+}
+
+func RegisterQueueServiceServer(s *grpc.Server, srv QueueServiceServer) {
+	s.RegisterService(&_QueueService_serviceDesc, srv)
+}
+
+func _QueueService_Publish_Handler(srv interface{}, stream grpc.ServerStream) error {
+	return srv.(QueueServiceServer).Publish(&queueServicePublishServer{stream})
+}
+
+type QueueService_PublishServer interface {
+	SendAndClose(*PubResp) error
+	Recv() (*PubReq, error)
+	grpc.ServerStream
+}
+
+type queueServicePublishServer struct {
+	grpc.ServerStream
+}
+
+func (x *queueServicePublishServer) SendAndClose(m *PubResp) error {
+	return x.ServerStream.SendMsg(m)
+}
+
+func (x *queueServicePublishServer) Recv() (*PubReq, error) {
+	m := new(PubReq)
+	if err := x.ServerStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+func _QueueService_Receive_Handler(srv interface{}, stream grpc.ServerStream) error {
+	m := new(RecvReq)
+	if err := stream.RecvMsg(m); err != nil {
+		return err
+	}
+	return srv.(QueueServiceServer).Receive(m, &queueServiceReceiveServer{stream})
+}
+
+type QueueService_ReceiveServer interface {
+	Send(*PubReq) error
+	grpc.ServerStream
+}
+
+type queueServiceReceiveServer struct {
+	grpc.ServerStream
+}
+
+func (x *queueServiceReceiveServer) Send(m *PubReq) error {
+	return x.ServerStream.SendMsg(m)
+}
+
+var _QueueService_serviceDesc = grpc.ServiceDesc{
+	ServiceName: "proto.QueueService",
+	HandlerType: (*QueueServiceServer)(nil),
+	Methods:     []grpc.MethodDesc{},
+	Streams: []grpc.StreamDesc{
+		{
+			StreamName:    "Publish",
+			Handler:       _QueueService_Publish_Handler,
+			ClientStreams: true,
+		},
+		{
+			StreamName:    "Receive",
+			Handler:       _QueueService_Receive_Handler,
+			ServerStreams: true,
+		},
+	},
+	Metadata: "queue.proto",
+}

+ 12 - 0
proto_src/ggclassefity.proto

@@ -0,0 +1,12 @@
+syntax = "proto3"; //声明proto的版本 只能 是3,才支持 grpc
+
+//声明 包名
+package proto;
+//分类请求结构体
+message ClfReq{
+  repeated float item = 1;
+}
+
+message ClfResp{
+  repeated int32 item = 1;
+}

+ 36 - 0
proto_src/queue.proto

@@ -0,0 +1,36 @@
+syntax = "proto3"; //声明proto的版本 只能 是3,才支持 grpc
+
+//声明 包名
+package proto;
+
+//服务管理
+service QueueService {
+  //任务发布
+  rpc Publish (stream PubReq) returns (PubResp) {
+  }
+  //任务接收
+  rpc Receive (RecvReq) returns (stream PubReq) {
+  }
+}
+//任务
+message PubReq {
+  string id = 1;//标示
+  string channelId = 2;//频道
+  string sender = 3;//发送人id
+  int32 serial_type = 4;//序列化方式 0 raw bytes,1 json,2 grpc
+  int32 publish_type = 5 ;//任务发布方式 0 随机 1 广播
+  bytes param = 6 ;//参数
+
+}
+
+//标准字符串返回结果
+message PubResp {
+  int32 code = 1;//执行状态代码
+  string msg = 2;//执行状态
+}
+
+message RecvReq{
+  string sender = 1;
+  string channelId = 2;
+}
+

+ 2 - 0
queued.go

@@ -0,0 +1,2 @@
+package queued
+

+ 36 - 0
server/main.go

@@ -0,0 +1,36 @@
+package main
+
+import (
+	"app.yhyue.com/BP/queued/proto"
+	"flag"
+	"google.golang.org/grpc"
+	"log"
+	"net"
+)
+
+//服务地址配置
+var (
+	addr = flag.String("addr", ":8080", "队列服务监听地址")
+)
+
+func init() {
+	flag.Parse()
+}
+func main() {
+	lis, err := net.Listen("tcp", *addr)
+	if err != nil {
+		log.Fatalln(err.Error())
+		return
+	}
+	//创建一个grpc 服务器
+	s := grpc.NewServer()
+	//注册事件
+	proto.RegisterQueueServiceServer(s, new(QueueImpl))
+	//
+	if err != nil {
+		log.Fatalln(err.Error())
+		return
+	}
+	//处理链接
+	_ = s.Serve(lis)
+}

+ 178 - 0
server/queueimpl.go

@@ -0,0 +1,178 @@
+package main
+
+import (
+	"app.yhyue.com/BP/queued/proto"
+	"log"
+	"math/rand"
+	"sync"
+	"time"
+)
+
+const (
+	CHAN_MAX_SIZE          = 500
+	PUBLISH_CHANNEL_ALL    = iota //发送给本频道下所有人
+	PUBLISH_CHANNEL_RANDOM        //发送给本频道下随机1个人
+	PUBLISH_CHANNEL_SEQ           //发送给本频道下顺序1个人
+)
+
+//数据存储结构
+var cache *sync.Map
+
+//消费者
+type Consumer struct {
+	Id           string
+	ReciveOutput chan *proto.PubReq
+	Cancel       chan bool
+	IsUse        bool
+}
+
+//频道
+type Channel struct {
+	Id         string
+	Consumeres *sync.Map //id:Consumer
+}
+
+//
+type QueueImpl struct {
+}
+
+//初始化
+func init() {
+	cache = new(sync.Map)
+}
+
+//
+func getChannel(key string) *Channel {
+	if v, ok := cache.Load(key); ok {
+		c := v.(*Channel)
+		return c
+	} else {
+		c := &Channel{
+			Id:         key,
+			Consumeres: new(sync.Map),
+		}
+		cache.Store(key, c)
+		return c
+	}
+}
+
+//发布任务
+func (q QueueImpl) Publish(server proto.QueueService_PublishServer) error {
+	//TODO 生成用户ID
+	//消息处理
+	for {
+		msg, err := server.Recv()
+		if err != nil {
+			break
+		}
+		err = deliverMsg(msg)
+		if err != nil {
+			break
+		}
+	}
+	_ = server.SendAndClose(&proto.PubResp{
+		Code: 500,
+		Msg:  "终止",
+	})
+	return nil
+}
+
+func (q QueueImpl) Receive(req *proto.RecvReq, server proto.QueueService_ReceiveServer) error {
+	channel := getChannel(req.ChannelId)
+	var consumer *Consumer
+	if v, ok := channel.Consumeres.Load(req.Sender); ok {
+		consumer, _ = v.(*Consumer)
+	} else {
+		consumer = &Consumer{
+			Id:           req.Sender,
+			ReciveOutput: make(chan *proto.PubReq, CHAN_MAX_SIZE),
+		}
+		channel.Consumeres.Store(req.Sender, consumer)
+	}
+	var msg *proto.PubReq
+	var err error
+Lab:
+	for {
+		select {
+		case msg = <-consumer.ReciveOutput:
+			err = server.Send(msg)
+			if err != nil {
+				break Lab
+			}
+		}
+	}
+	channel.Consumeres.Delete(consumer.Id)
+	//消息重回队列,直到发送成功为止
+	if msg != nil && err != nil {
+		log.Println(" 发送失败,尝试重新发送")
+		for {
+			if err = deliverMsg(msg); err == nil {
+				break
+			}
+			time.Sleep(1 * time.Second)
+		}
+	}
+	return nil
+}
+
+//消息投递,发送给消息消费者
+func deliverMsg(msg *proto.PubReq) error {
+	sendSuccess := false
+	channel := getChannel(msg.ChannelId)
+
+	if msg.PublishType == PUBLISH_CHANNEL_ALL {
+		channel.Consumeres.Range(func(k interface{}, v interface{}) bool {
+			sendSuccess = true
+			c := v.(*Consumer)
+			c.ReciveOutput <- msg
+			return true
+		})
+	} else if msg.PublishType == PUBLISH_CHANNEL_RANDOM {
+		//随机
+		keys := make([]string, 0, 0)
+		channel.Consumeres.Range(func(k interface{}, v interface{}) bool {
+			key, _ := k.(string)
+			keys = append(keys, key)
+			return true
+		})
+		//机选1个消费者
+		key := keys[rand.Intn(len(keys))]
+		if v, ok := channel.Consumeres.Load(key); ok {
+			c := v.(*Consumer)
+			c.ReciveOutput <- msg
+			sendSuccess = true
+		}
+	} else if msg.PublishType == PUBLISH_CHANNEL_SEQ {
+		var consumer *Consumer
+		channel.Consumeres.Range(func(k interface{}, v interface{}) bool {
+			c := v.(*Consumer)
+			if !c.IsUse {
+				consumer = c
+				return false
+			}
+			return true
+		})
+		first := true
+		if consumer == nil {
+			channel.Consumeres.Range(func(k interface{}, v interface{}) bool {
+				c := v.(*Consumer)
+				if first {
+					c.IsUse = true
+					consumer = c
+					first = false
+				} else {
+					c.IsUse = false
+				}
+				return true
+			})
+		}
+		if consumer != nil {
+			consumer.ReciveOutput <- msg
+			sendSuccess = true
+		}
+	}
+	if !sendSuccess {
+		log.Printf("msg id %s 发送失败,找不到合适的消费者\n", msg.Id)
+	}
+	return nil
+}

+ 60 - 0
util/publisher.go

@@ -0,0 +1,60 @@
+package util
+
+import (
+	"app.yhyue.com/BP/queued/proto"
+	"context"
+	"google.golang.org/grpc"
+	"log"
+	"time"
+)
+
+/*
+任务发布封装
+*/
+type Publisher struct {
+	Id          string
+	QueueServer string
+	Channel     string
+	C           <-chan *proto.PubReq
+}
+
+//
+func NewPublisher(server string, channel string, ch <-chan *proto.PubReq) *Publisher {
+	uuid, _ := makeUuid()
+	return &Publisher{
+		Id:          uuid,
+		C:           ch,
+		QueueServer: server,
+		Channel:     channel,
+	}
+}
+
+//
+func (pb *Publisher) Run() {
+	for {
+		conn, err := grpc.Dial(pb.QueueServer, grpc.WithInsecure())
+		if err != nil {
+			time.Sleep(1 * time.Second)
+			log.Fatalf("did not connect: %v", err)
+		}
+		c := proto.NewQueueServiceClient(conn)
+		stream, _ := c.Publish(context.Background())
+		//
+	Lab:
+		for {
+			select {
+			case msg := <-pb.C:
+				uuid, _ := makeUuid()
+				msg.Sender = pb.Id
+				msg.Id = uuid
+				msg.ChannelId = pb.Channel
+				err := stream.Send(msg)
+				if err != nil {
+					break Lab
+				}
+			}
+		}
+		_ = conn.Close()
+		time.Sleep(3 * time.Second)
+	}
+}

+ 55 - 0
util/reciver.go

@@ -0,0 +1,55 @@
+package util
+
+import (
+	"app.yhyue.com/BP/queued/proto"
+	"context"
+	"google.golang.org/grpc"
+	"log"
+	"time"
+)
+
+/**
+接收端
+*/
+type Reciver struct {
+	Id          string
+	Channel     string
+	QueueServer string
+	C           chan<- *proto.PubReq
+}
+
+func NewReciver(server string, channel string, ch chan<- *proto.PubReq) *Reciver {
+	uuid, _ := makeUuid()
+	return &Reciver{
+		Id:          uuid,
+		Channel:     channel,
+		C:           ch,
+		QueueServer: server,
+	}
+}
+func (r *Reciver) Run() {
+	for {
+		conn, err := grpc.Dial(r.QueueServer, grpc.WithInsecure())
+		if err != nil {
+			time.Sleep(1 * time.Second)
+			log.Fatalf("did not connect: %v", err)
+		}
+		c := proto.NewQueueServiceClient(conn)
+		stream, _ := c.Receive(context.Background(), &proto.RecvReq{
+			Sender:    r.Id,
+			ChannelId: r.Channel,
+		})
+		//
+	Lab:
+		for {
+			msg, err := stream.Recv()
+			if err != nil {
+				log.Print(err.Error())
+				break Lab
+			}
+			r.C <- msg
+		}
+		_ = conn.Close()
+		time.Sleep(3 * time.Second)
+	}
+}

+ 12 - 0
util/util.go

@@ -0,0 +1,12 @@
+package util
+
+import "github.com/google/uuid"
+
+//生成uuid
+func makeUuid() (string, error) {
+	id, err := uuid.NewRandom()
+	if err != nil {
+		return "", err
+	}
+	return id.String(), nil
+}