Quellcode durchsuchen

新增附件下载程序

maxiaoshan vor 2 Jahren
Ursprung
Commit
cff0087c47

+ 8 - 3
README.md

@@ -7,6 +7,11 @@
 
 
 ### bidding_listen
-+  1.查询bidding昨天全量数据,数据入库 ods_datamonitoring_bidding
-+  2.重复数据 is_repeat = 1
-+  3.定时任务,每天晚上八点执行一次
++ 1.查询bidding昨天全量数据,数据入库 ods_datamonitoring_bidding
++ 2.重复数据 is_repeat = 1
++ 3.定时任务,每天晚上八点执行一次
+
+### download_file
++ 1.查询增量bidding非竞品,且无附件数据
++ 2.筛选contenthtml中的a标签href链接
++ 3.对href链接是附件的进行下载、补充,同时生成爬虫维护任务(附件异常)

+ 33 - 0
download_file/src/config.json

@@ -0,0 +1,33 @@
+{
+  "mongodb": "192.168.3.166:27082",
+  "db": "test",
+  "coll": "bidding_processing_ids",
+  "username": "",
+  "password": "",
+  "size": 5,
+  "udpport": ":1782",
+  "nextudp": {
+    "addr": "127.0.0.1",
+    "port": 1781,
+    "stype": "extract"
+  },
+  "mail": {
+    "to": "maxiaoshan@topnet.net.cn",
+    "api": "http://172.17.145.179:19281/_send/_mail"
+  },
+  "ocrserveraddr": "192.168.3.12:10021",
+  "bidding": {
+    "mongodb": "192.168.3.166:27082",
+    "db": "qfw",
+    "coll": "bidding",
+    "username": "",
+    "password": "",
+    "size": 5
+  },
+  "oss":{
+    "ossEndpoint":"oss-cn-beijing.aliyuncs.com",
+    "ossAccessKeyId":"LTAI4G5x9aoZx8dDamQ7vfZi",
+    "ossAccessKeySecret":"Bk98FsbPYXcJe72n1bG3Ssf73acuNh",
+    "ossBucketName":"jy-editor"
+  }
+}

+ 69 - 0
download_file/src/filetext_grpc.go

@@ -0,0 +1,69 @@
+package main
+
+import (
+	"context"
+	"google.golang.org/grpc"
+	"proto"
+	qu "qfw/util"
+)
+
+var FileTextClient proto.FileExtractClient
+
+//直接调用接口地址
+//func InitFileTextGrpcClient() {
+//	defer qu.Catch()
+//	qu.Debug("address:", qu.ObjToString(Config["ocrserveraddr"])) //192.168.3.13:22101
+//	conn, err := grpc.Dial(qu.ObjToString(Config["ocrserveraddr"]), grpc.WithTransportCredentials(insecure.NewCredentials()))
+//	if err != nil {
+//		qu.Debug("Init Connect Error: ", err)
+//	}
+//	FileTextClient = proto.NewFileExtractClient(conn)
+//}
+
+// GetFileText_back submits a single file to the FileExtract service through
+// the package-level FileTextClient and returns the response together with any
+// error reported by the call.
+func GetFileText_back(fileName, fileUrl, fileType string, fileBytes []byte) (*proto.FileResponse, error) {
+	defer qu.Catch()
+	payload := &proto.FileRequest{
+		Message: []*proto.Request{{
+			FileName:    fileName,
+			FileUrl:     fileUrl,
+			FileType:    fileType,
+			FileBytes:   fileBytes,
+			ExtractType: 1,
+			ReturnType:  0, // set to 0 in production
+		}},
+	}
+	result, callErr := FileTextClient.FileExtract(context.Background(), payload)
+	if callErr != nil {
+		return nil, callErr
+	}
+	qu.Debug(result.Result)
+	return result, nil
+}
+
+// GetFileText sends one file to the FileExtract gRPC service over conn and
+// returns the service response, or nil when conn is nil or the call fails.
+//
+// The function takes ownership of conn: the connection is closed before
+// GetFileText returns, so callers must pass a fresh connection on every call.
+func GetFileText(conn *grpc.ClientConn, fileName, fileUrl, fileType string, fileBytes []byte) *proto.FileResponse {
+	defer qu.Catch()
+	if conn == nil {
+		qu.Debug("GetFileText: nil grpc connection, file:", fileName)
+		return nil
+	}
+	defer conn.Close()
+	// Use a call-local client. Writing the package-level FileTextClient here
+	// would race between concurrent callers and would leave the global bound
+	// to a connection that is closed as soon as this function returns.
+	client := proto.NewFileExtractClient(conn)
+	req := &proto.Request{
+		FileName:    fileName,
+		FileUrl:     fileUrl,
+		FileType:    fileType,
+		FileBytes:   fileBytes,
+		ExtractType: 1,
+		ReturnType:  0, // set to 0 in production
+	}
+	fileReq := &proto.FileRequest{
+		Message: []*proto.Request{
+			req,
+		},
+	}
+	resp, err := client.FileExtract(context.Background(), fileReq)
+	if err != nil {
+		// Report the failure instead of dropping it silently; callers only see nil.
+		qu.Debug("GetFileText: FileExtract failed, file:", fileName, "error:", err)
+		return nil
+	}
+	return resp
+}

+ 78 - 0
download_file/src/main.go

@@ -0,0 +1,78 @@
+package main
+
+import (
+	"flag"
+	"mongodb"
+	"net/http"
+	"os"
+	qu "qfw/util"
+	sp "spiderutil"
+	"time"
+	. "util"
+)
+
+// init reads config.json into Config and sets up the package state derived
+// from it: the bidding MongoDB pool, mail settings, UDP settings, the OSS
+// client and the OCR server address. It finishes by starting checkMapJob in
+// the background.
+//
+// A missing or malformed config section aborts start-up with a panic that
+// names the offending section.
+func init() {
+	//config
+	qu.ReadConfig("config.json", &Config)
+	//mgo (disabled)
+	//Mgo = &mongodb.MongodbSim{
+	//	MongodbAddr: qu.ObjToString(config["mongodb"]),
+	//	DbName:      qu.ObjToString(config["db"]),
+	//	Size:        qu.IntAll(config["size"]),
+	//}
+	//Mgo.InitPool()
+	bidding := mustConfigSection("bidding")
+	MgoB = &mongodb.MongodbSim{
+		MongodbAddr: qu.ObjToString(bidding["mongodb"]),
+		DbName:      qu.ObjToString(bidding["db"]),
+		Size:        qu.IntAll(bidding["size"]),
+		UserName:    qu.ObjToString(bidding["username"]),
+		Password:    qu.ObjToString(bidding["password"]),
+	}
+	MgoB.InitPool()
+	// other parameters
+	Coll = qu.ObjToString(Config["coll"])
+	mail := mustConfigSection("mail")
+	Tomail = qu.ObjToString(mail["to"])
+	Api = qu.ObjToString(mail["api"])
+	//udp
+	UdpPort = qu.ObjToString(Config["udpport"])
+	nextUdp := mustConfigSection("nextudp")
+	NextAddr = qu.ObjToString(nextUdp["addr"])
+	NextPort = qu.IntAll(nextUdp["port"])
+	NextStype = qu.ObjToString(nextUdp["stype"])
+	InitUdp() // initialise udp; runs after the udp settings above are assigned
+	// initialise oss
+	oss := mustConfigSection("oss")
+	sp.OssInit(
+		qu.ObjToString(oss["ossEndpoint"]),
+		qu.ObjToString(oss["ossAccessKeyId"]),
+		qu.ObjToString(oss["ossAccessKeySecret"]),
+		qu.ObjToString(oss["ossBucketName"]),
+	)
+	// grpc: only the server address is recorded here; no client is created
+	OcrServerAddr = qu.ObjToString(Config["ocrserveraddr"])
+	//InitFileTextGrpcClient()
+	go checkMapJob()
+}
+
+// mustConfigSection returns the object-valued section of Config stored under
+// name. It panics with a message naming the section when the entry is absent
+// or is not a JSON object, instead of failing with a bare interface-conversion
+// panic.
+func mustConfigSection(name string) map[string]interface{} {
+	section, ok := Config[name].(map[string]interface{})
+	if !ok {
+		panic("config.json: section \"" + name + "\" is missing or is not an object")
+	}
+	return section
+}
+
+// main parses the -sid flag (the starting id), exits with status -1 when it
+// is empty, then runs DownloadFile in an endless background loop and serves
+// HTTP on the UdpPort address. It never returns.
+func main() {
+	flag.StringVar(&StartID, "sid", "", "起始id")
+	flag.Parse()
+	if StartID == "" {
+		qu.Debug("起始ID为空")
+		os.Exit(-1)
+	}
+	qu.Debug("起始ID:", StartID)
+	go func() {
+		for {
+			// Back off for a minute whenever DownloadFile reports false.
+			if ok := DownloadFile(); !ok {
+				time.Sleep(1 * time.Minute)
+			}
+		}
+	}()
+	go func() {
+		// ListenAndServe only returns on failure (e.g. the address is already
+		// in use); log it so a dead listener does not go unnoticed.
+		if err := http.ListenAndServe(UdpPort, nil); err != nil {
+			qu.Debug("http.ListenAndServe error:", err)
+		}
+	}()
+	// Block forever; the goroutines above do all the work.
+	select {}
+}

+ 39 - 0
download_file/src/proto/filetext.proto

@@ -0,0 +1,39 @@
+syntax = "proto3";
+
+option go_package = "/proto";
+
+//附件传输
+package proto;
+
+message FileRequest {
+  repeated Request message = 1; // files to extract, one Request per file
+  string other = 2;     // information id
+  string topic = 3;
+}
+
+message Request {
+  string fileName = 1; // file name
+  string fileUrl = 2;     // download URL
+  bytes fileBytes = 3;     // raw file bytes
+  string fileType = 4;    // file type: pdf, xls, doc
+  int32 returnType = 5;    // return type {0: url, 1: content, 2: url + content}, default 0
+  int32 extractType = 6;  // source to parse from {0: oss, 1: fileBytes, 2: url}, default 0
+}
+
+message FileResponse {
+  repeated Result result = 1;  // extraction results
+  string other = 2;     // information id
+}
+
+message Result{
+  string fileName = 1;  // file name
+  string textContent = 2;   // text content
+  string textUrl = 3;      // text URL
+  string filePath = 4;  // file path
+  string errorState = 5;   // error state {0: file download failed, 1: file parsing failed}
+}
+
+
+service FileExtract {
+  rpc FileExtract(FileRequest) returns (FileResponse); // unary call: one FileRequest in, one FileResponse out
+}

+ 261 - 0
download_file/src/serviced/proto/heartbeat.pb.go

@@ -0,0 +1,261 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: heartbeat.proto
+
+//声明 包名
+
+package proto
+
+import (
+	context "context"
+	fmt "fmt"
+	proto "github.com/golang/protobuf/proto"
+	grpc "google.golang.org/grpc"
+	codes "google.golang.org/grpc/codes"
+	status "google.golang.org/grpc/status"
+	math "math"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
+
+//stream请求结构
+type StreamReqData struct {
+	ServiceName          string   `protobuf:"bytes,1,opt,name=serviceName,proto3" json:"serviceName,omitempty"`
+	ServiceIp            string   `protobuf:"bytes,2,opt,name=serviceIp,proto3" json:"serviceIp,omitempty"`
+	ServicePort          int32    `protobuf:"varint,3,opt,name=servicePort,proto3" json:"servicePort,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *StreamReqData) Reset()         { *m = StreamReqData{} }
+func (m *StreamReqData) String() string { return proto.CompactTextString(m) }
+func (*StreamReqData) ProtoMessage()    {}
+func (*StreamReqData) Descriptor() ([]byte, []int) {
+	return fileDescriptor_3c667767fb9826a9, []int{0}
+}
+
+func (m *StreamReqData) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_StreamReqData.Unmarshal(m, b)
+}
+func (m *StreamReqData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_StreamReqData.Marshal(b, m, deterministic)
+}
+func (m *StreamReqData) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_StreamReqData.Merge(m, src)
+}
+func (m *StreamReqData) XXX_Size() int {
+	return xxx_messageInfo_StreamReqData.Size(m)
+}
+func (m *StreamReqData) XXX_DiscardUnknown() {
+	xxx_messageInfo_StreamReqData.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_StreamReqData proto.InternalMessageInfo
+
+func (m *StreamReqData) GetServiceName() string {
+	if m != nil {
+		return m.ServiceName
+	}
+	return ""
+}
+
+func (m *StreamReqData) GetServiceIp() string {
+	if m != nil {
+		return m.ServiceIp
+	}
+	return ""
+}
+
+func (m *StreamReqData) GetServicePort() int32 {
+	if m != nil {
+		return m.ServicePort
+	}
+	return 0
+}
+
+//stream返回结构
+type StreamResData struct {
+	Data                 string   `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *StreamResData) Reset()         { *m = StreamResData{} }
+func (m *StreamResData) String() string { return proto.CompactTextString(m) }
+func (*StreamResData) ProtoMessage()    {}
+func (*StreamResData) Descriptor() ([]byte, []int) {
+	return fileDescriptor_3c667767fb9826a9, []int{1}
+}
+
+func (m *StreamResData) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_StreamResData.Unmarshal(m, b)
+}
+func (m *StreamResData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_StreamResData.Marshal(b, m, deterministic)
+}
+func (m *StreamResData) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_StreamResData.Merge(m, src)
+}
+func (m *StreamResData) XXX_Size() int {
+	return xxx_messageInfo_StreamResData.Size(m)
+}
+func (m *StreamResData) XXX_DiscardUnknown() {
+	xxx_messageInfo_StreamResData.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_StreamResData proto.InternalMessageInfo
+
+func (m *StreamResData) GetData() string {
+	if m != nil {
+		return m.Data
+	}
+	return ""
+}
+
+func init() {
+	proto.RegisterType((*StreamReqData)(nil), "proto.StreamReqData")
+	proto.RegisterType((*StreamResData)(nil), "proto.StreamResData")
+}
+
+func init() {
+	proto.RegisterFile("heartbeat.proto", fileDescriptor_3c667767fb9826a9)
+}
+
+var fileDescriptor_3c667767fb9826a9 = []byte{
+	// 174 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcf, 0x48, 0x4d, 0x2c,
+	0x2a, 0x49, 0x4a, 0x4d, 0x2c, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a,
+	0x85, 0x5c, 0xbc, 0xc1, 0x25, 0x45, 0xa9, 0x89, 0xb9, 0x41, 0xa9, 0x85, 0x2e, 0x89, 0x25, 0x89,
+	0x42, 0x0a, 0x5c, 0xdc, 0xc5, 0xa9, 0x45, 0x65, 0x99, 0xc9, 0xa9, 0x7e, 0x89, 0xb9, 0xa9, 0x12,
+	0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0xc8, 0x42, 0x42, 0x32, 0x5c, 0x9c, 0x50, 0xae, 0x67, 0x81,
+	0x04, 0x13, 0x58, 0x1e, 0x21, 0x80, 0xa4, 0x3f, 0x20, 0xbf, 0xa8, 0x44, 0x82, 0x59, 0x81, 0x51,
+	0x83, 0x35, 0x08, 0x59, 0x48, 0x49, 0x19, 0x61, 0x65, 0x31, 0xd8, 0x4a, 0x21, 0x2e, 0x96, 0x94,
+	0xc4, 0x92, 0x44, 0xa8, 0x5d, 0x60, 0xb6, 0x91, 0x07, 0x17, 0xa7, 0x07, 0xc8, 0xc5, 0x4e, 0xa9,
+	0x89, 0x25, 0x42, 0xd6, 0x5c, 0x9c, 0x01, 0xa5, 0x25, 0x10, 0x4d, 0x42, 0x22, 0x10, 0x0f, 0xe8,
+	0xa1, 0x38, 0x5b, 0x0a, 0x5d, 0x14, 0x6c, 0xb2, 0x12, 0x83, 0x06, 0x63, 0x12, 0x1b, 0x58, 0xc2,
+	0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0xce, 0x97, 0xa7, 0x35, 0x02, 0x01, 0x00, 0x00,
+}
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ context.Context
+var _ grpc.ClientConnInterface
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+const _ = grpc.SupportPackageIsVersion6
+
+// HeartBeatClient is the client API for HeartBeat service.
+//
+// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
+type HeartBeatClient interface {
+	PutStream(ctx context.Context, opts ...grpc.CallOption) (HeartBeat_PutStreamClient, error)
+}
+
+type heartBeatClient struct {
+	cc grpc.ClientConnInterface
+}
+
+func NewHeartBeatClient(cc grpc.ClientConnInterface) HeartBeatClient {
+	return &heartBeatClient{cc}
+}
+
+func (c *heartBeatClient) PutStream(ctx context.Context, opts ...grpc.CallOption) (HeartBeat_PutStreamClient, error) {
+	stream, err := c.cc.NewStream(ctx, &_HeartBeat_serviceDesc.Streams[0], "/proto.HeartBeat/PutStream", opts...)
+	if err != nil {
+		return nil, err
+	}
+	x := &heartBeatPutStreamClient{stream}
+	return x, nil
+}
+
+type HeartBeat_PutStreamClient interface {
+	Send(*StreamReqData) error
+	CloseAndRecv() (*StreamResData, error)
+	grpc.ClientStream
+}
+
+type heartBeatPutStreamClient struct {
+	grpc.ClientStream
+}
+
+func (x *heartBeatPutStreamClient) Send(m *StreamReqData) error {
+	return x.ClientStream.SendMsg(m)
+}
+
+func (x *heartBeatPutStreamClient) CloseAndRecv() (*StreamResData, error) {
+	if err := x.ClientStream.CloseSend(); err != nil {
+		return nil, err
+	}
+	m := new(StreamResData)
+	if err := x.ClientStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+// HeartBeatServer is the server API for HeartBeat service.
+type HeartBeatServer interface {
+	PutStream(HeartBeat_PutStreamServer) error
+}
+
+// UnimplementedHeartBeatServer can be embedded to have forward compatible implementations.
+type UnimplementedHeartBeatServer struct {
+}
+
+func (*UnimplementedHeartBeatServer) PutStream(srv HeartBeat_PutStreamServer) error {
+	return status.Errorf(codes.Unimplemented, "method PutStream not implemented")
+}
+
+func RegisterHeartBeatServer(s *grpc.Server, srv HeartBeatServer) {
+	s.RegisterService(&_HeartBeat_serviceDesc, srv)
+}
+
+func _HeartBeat_PutStream_Handler(srv interface{}, stream grpc.ServerStream) error {
+	return srv.(HeartBeatServer).PutStream(&heartBeatPutStreamServer{stream})
+}
+
+type HeartBeat_PutStreamServer interface {
+	SendAndClose(*StreamResData) error
+	Recv() (*StreamReqData, error)
+	grpc.ServerStream
+}
+
+type heartBeatPutStreamServer struct {
+	grpc.ServerStream
+}
+
+func (x *heartBeatPutStreamServer) SendAndClose(m *StreamResData) error {
+	return x.ServerStream.SendMsg(m)
+}
+
+func (x *heartBeatPutStreamServer) Recv() (*StreamReqData, error) {
+	m := new(StreamReqData)
+	if err := x.ServerStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+var _HeartBeat_serviceDesc = grpc.ServiceDesc{
+	ServiceName: "proto.HeartBeat",
+	HandlerType: (*HeartBeatServer)(nil),
+	Methods:     []grpc.MethodDesc{},
+	Streams: []grpc.StreamDesc{
+		{
+			StreamName:    "PutStream",
+			Handler:       _HeartBeat_PutStream_Handler,
+			ClientStreams: true,
+		},
+	},
+	Metadata: "heartbeat.proto",
+}

+ 206 - 0
download_file/src/serviced/proto/ocr.pb.go

@@ -0,0 +1,206 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: ocr.proto
+
+//OCR图像转文字服务
+
+package proto
+
+import (
+	context "context"
+	fmt "fmt"
+	proto "github.com/golang/protobuf/proto"
+	grpc "google.golang.org/grpc"
+	codes "google.golang.org/grpc/codes"
+	status "google.golang.org/grpc/status"
+	math "math"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
+
+type OcrRequest struct {
+	Image                []byte   `protobuf:"bytes,1,opt,name=image,proto3" json:"image,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *OcrRequest) Reset()         { *m = OcrRequest{} }
+func (m *OcrRequest) String() string { return proto.CompactTextString(m) }
+func (*OcrRequest) ProtoMessage()    {}
+func (*OcrRequest) Descriptor() ([]byte, []int) {
+	return fileDescriptor_52282bc2e1d222c6, []int{0}
+}
+
+func (m *OcrRequest) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_OcrRequest.Unmarshal(m, b)
+}
+func (m *OcrRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_OcrRequest.Marshal(b, m, deterministic)
+}
+func (m *OcrRequest) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_OcrRequest.Merge(m, src)
+}
+func (m *OcrRequest) XXX_Size() int {
+	return xxx_messageInfo_OcrRequest.Size(m)
+}
+func (m *OcrRequest) XXX_DiscardUnknown() {
+	xxx_messageInfo_OcrRequest.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_OcrRequest proto.InternalMessageInfo
+
+func (m *OcrRequest) GetImage() []byte {
+	if m != nil {
+		return m.Image
+	}
+	return nil
+}
+
+type OcrResponse struct {
+	Message              string   `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *OcrResponse) Reset()         { *m = OcrResponse{} }
+func (m *OcrResponse) String() string { return proto.CompactTextString(m) }
+func (*OcrResponse) ProtoMessage()    {}
+func (*OcrResponse) Descriptor() ([]byte, []int) {
+	return fileDescriptor_52282bc2e1d222c6, []int{1}
+}
+
+func (m *OcrResponse) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_OcrResponse.Unmarshal(m, b)
+}
+func (m *OcrResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_OcrResponse.Marshal(b, m, deterministic)
+}
+func (m *OcrResponse) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_OcrResponse.Merge(m, src)
+}
+func (m *OcrResponse) XXX_Size() int {
+	return xxx_messageInfo_OcrResponse.Size(m)
+}
+func (m *OcrResponse) XXX_DiscardUnknown() {
+	xxx_messageInfo_OcrResponse.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_OcrResponse proto.InternalMessageInfo
+
+func (m *OcrResponse) GetMessage() string {
+	if m != nil {
+		return m.Message
+	}
+	return ""
+}
+
+func init() {
+	proto.RegisterType((*OcrRequest)(nil), "proto.OcrRequest")
+	proto.RegisterType((*OcrResponse)(nil), "proto.OcrResponse")
+}
+
+func init() {
+	proto.RegisterFile("ocr.proto", fileDescriptor_52282bc2e1d222c6)
+}
+
+var fileDescriptor_52282bc2e1d222c6 = []byte{
+	// 124 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcc, 0x4f, 0x2e, 0xd2,
+	0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a, 0x4a, 0x5c, 0x5c, 0xfe, 0xc9, 0x45,
+	0x41, 0xa9, 0x85, 0xa5, 0xa9, 0xc5, 0x25, 0x42, 0x22, 0x5c, 0xac, 0x99, 0xb9, 0x89, 0xe9, 0xa9,
+	0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x3c, 0x41, 0x10, 0x8e, 0x92, 0x3a, 0x17, 0x37, 0x58, 0x4d, 0x71,
+	0x41, 0x7e, 0x5e, 0x71, 0xaa, 0x90, 0x04, 0x17, 0x7b, 0x6e, 0x6a, 0x71, 0x31, 0x4c, 0x19, 0x67,
+	0x10, 0x8c, 0x6b, 0x64, 0xcc, 0xc5, 0xec, 0x9f, 0x5c, 0x24, 0xa4, 0x03, 0xa1, 0x04, 0x21, 0x36,
+	0xe9, 0x21, 0xcc, 0x97, 0x12, 0x42, 0x16, 0x82, 0x18, 0x97, 0xc4, 0x06, 0x16, 0x32, 0x06, 0x04,
+	0x00, 0x00, 0xff, 0xff, 0xd9, 0x2d, 0xf8, 0xb2, 0x9c, 0x00, 0x00, 0x00,
+}
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ context.Context
+var _ grpc.ClientConnInterface
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+const _ = grpc.SupportPackageIsVersion6
+
+// OcrClient is the client API for Ocr service.
+//
+// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
+type OcrClient interface {
+	Ocr(ctx context.Context, in *OcrRequest, opts ...grpc.CallOption) (*OcrResponse, error)
+}
+
+type ocrClient struct {
+	cc grpc.ClientConnInterface
+}
+
+func NewOcrClient(cc grpc.ClientConnInterface) OcrClient {
+	return &ocrClient{cc}
+}
+
+func (c *ocrClient) Ocr(ctx context.Context, in *OcrRequest, opts ...grpc.CallOption) (*OcrResponse, error) {
+	out := new(OcrResponse)
+	err := c.cc.Invoke(ctx, "/proto.Ocr/Ocr", in, out, opts...)
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
+// OcrServer is the server API for Ocr service.
+type OcrServer interface {
+	Ocr(context.Context, *OcrRequest) (*OcrResponse, error)
+}
+
+// UnimplementedOcrServer can be embedded to have forward compatible implementations.
+type UnimplementedOcrServer struct {
+}
+
+func (*UnimplementedOcrServer) Ocr(ctx context.Context, req *OcrRequest) (*OcrResponse, error) {
+	return nil, status.Errorf(codes.Unimplemented, "method Ocr not implemented")
+}
+
+func RegisterOcrServer(s *grpc.Server, srv OcrServer) {
+	s.RegisterService(&_Ocr_serviceDesc, srv)
+}
+
+func _Ocr_Ocr_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	in := new(OcrRequest)
+	if err := dec(in); err != nil {
+		return nil, err
+	}
+	if interceptor == nil {
+		return srv.(OcrServer).Ocr(ctx, in)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: "/proto.Ocr/Ocr",
+	}
+	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+		return srv.(OcrServer).Ocr(ctx, req.(*OcrRequest))
+	}
+	return interceptor(ctx, in, info, handler)
+}
+
+var _Ocr_serviceDesc = grpc.ServiceDesc{
+	ServiceName: "proto.Ocr",
+	HandlerType: (*OcrServer)(nil),
+	Methods: []grpc.MethodDesc{
+		{
+			MethodName: "Ocr",
+			Handler:    _Ocr_Ocr_Handler,
+		},
+	},
+	Streams:  []grpc.StreamDesc{},
+	Metadata: "ocr.proto",
+}

+ 252 - 0
download_file/src/serviced/proto/serverload.pb.go

@@ -0,0 +1,252 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: serverload.proto
+
+//声明 包名
+
+package proto
+
+import (
+	context "context"
+	fmt "fmt"
+	proto "github.com/golang/protobuf/proto"
+	grpc "google.golang.org/grpc"
+	codes "google.golang.org/grpc/codes"
+	status "google.golang.org/grpc/status"
+	math "math"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
+
+//stream请求结构
+type ServerLoadStreamReqData struct {
+	Ip                   string   `protobuf:"bytes,1,opt,name=ip,proto3" json:"ip,omitempty"`
+	Load                 float64  `protobuf:"fixed64,2,opt,name=load,proto3" json:"load,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *ServerLoadStreamReqData) Reset()         { *m = ServerLoadStreamReqData{} }
+func (m *ServerLoadStreamReqData) String() string { return proto.CompactTextString(m) }
+func (*ServerLoadStreamReqData) ProtoMessage()    {}
+func (*ServerLoadStreamReqData) Descriptor() ([]byte, []int) {
+	return fileDescriptor_eb77c27475e52076, []int{0}
+}
+
+func (m *ServerLoadStreamReqData) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_ServerLoadStreamReqData.Unmarshal(m, b)
+}
+func (m *ServerLoadStreamReqData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_ServerLoadStreamReqData.Marshal(b, m, deterministic)
+}
+func (m *ServerLoadStreamReqData) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_ServerLoadStreamReqData.Merge(m, src)
+}
+func (m *ServerLoadStreamReqData) XXX_Size() int {
+	return xxx_messageInfo_ServerLoadStreamReqData.Size(m)
+}
+func (m *ServerLoadStreamReqData) XXX_DiscardUnknown() {
+	xxx_messageInfo_ServerLoadStreamReqData.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_ServerLoadStreamReqData proto.InternalMessageInfo
+
+func (m *ServerLoadStreamReqData) GetIp() string {
+	if m != nil {
+		return m.Ip
+	}
+	return ""
+}
+
+func (m *ServerLoadStreamReqData) GetLoad() float64 {
+	if m != nil {
+		return m.Load
+	}
+	return 0
+}
+
+//stream返回结构
+type ServerLoadStreamResData struct {
+	Code                 string   `protobuf:"bytes,1,opt,name=code,proto3" json:"code,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *ServerLoadStreamResData) Reset()         { *m = ServerLoadStreamResData{} }
+func (m *ServerLoadStreamResData) String() string { return proto.CompactTextString(m) }
+func (*ServerLoadStreamResData) ProtoMessage()    {}
+func (*ServerLoadStreamResData) Descriptor() ([]byte, []int) {
+	return fileDescriptor_eb77c27475e52076, []int{1}
+}
+
+func (m *ServerLoadStreamResData) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_ServerLoadStreamResData.Unmarshal(m, b)
+}
+func (m *ServerLoadStreamResData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_ServerLoadStreamResData.Marshal(b, m, deterministic)
+}
+func (m *ServerLoadStreamResData) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_ServerLoadStreamResData.Merge(m, src)
+}
+func (m *ServerLoadStreamResData) XXX_Size() int {
+	return xxx_messageInfo_ServerLoadStreamResData.Size(m)
+}
+func (m *ServerLoadStreamResData) XXX_DiscardUnknown() {
+	xxx_messageInfo_ServerLoadStreamResData.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_ServerLoadStreamResData proto.InternalMessageInfo
+
+func (m *ServerLoadStreamResData) GetCode() string {
+	if m != nil {
+		return m.Code
+	}
+	return ""
+}
+
+func init() {
+	proto.RegisterType((*ServerLoadStreamReqData)(nil), "proto.ServerLoadStreamReqData")
+	proto.RegisterType((*ServerLoadStreamResData)(nil), "proto.ServerLoadStreamResData")
+}
+
+func init() {
+	proto.RegisterFile("serverload.proto", fileDescriptor_eb77c27475e52076)
+}
+
+var fileDescriptor_eb77c27475e52076 = []byte{
+	// 155 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x28, 0x4e, 0x2d, 0x2a,
+	0x4b, 0x2d, 0xca, 0xc9, 0x4f, 0x4c, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53,
+	0x4a, 0xb6, 0x5c, 0xe2, 0xc1, 0x60, 0x29, 0x9f, 0xfc, 0xc4, 0x94, 0xe0, 0x92, 0xa2, 0xd4, 0xc4,
+	0xdc, 0xa0, 0xd4, 0x42, 0x97, 0xc4, 0x92, 0x44, 0x21, 0x3e, 0x2e, 0xa6, 0xcc, 0x02, 0x09, 0x46,
+	0x05, 0x46, 0x0d, 0xce, 0x20, 0xa6, 0xcc, 0x02, 0x21, 0x21, 0x2e, 0x16, 0x90, 0x7e, 0x09, 0x26,
+	0x05, 0x46, 0x0d, 0xc6, 0x20, 0x30, 0x5b, 0x49, 0x17, 0x9b, 0xf6, 0x62, 0xb0, 0x76, 0x21, 0x2e,
+	0x96, 0xe4, 0xfc, 0x94, 0x54, 0xa8, 0x01, 0x60, 0xb6, 0x51, 0x2c, 0x17, 0x17, 0x42, 0xb9, 0x90,
+	0x3f, 0x17, 0x67, 0x40, 0x69, 0x09, 0x44, 0x97, 0x90, 0x1c, 0xc4, 0x5d, 0x7a, 0x38, 0x5c, 0x23,
+	0x85, 0x5b, 0x1e, 0x6c, 0x9d, 0x12, 0x83, 0x06, 0x63, 0x12, 0x1b, 0x58, 0x89, 0x31, 0x20, 0x00,
+	0x00, 0xff, 0xff, 0x6a, 0xae, 0x41, 0xc4, 0xee, 0x00, 0x00, 0x00,
+}
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ context.Context
+var _ grpc.ClientConnInterface
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+const _ = grpc.SupportPackageIsVersion6
+
+// ServerLoadClient is the client API for ServerLoad service.
+//
+// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
+type ServerLoadClient interface {
+	PutStream(ctx context.Context, opts ...grpc.CallOption) (ServerLoad_PutStreamClient, error)
+}
+
+type serverLoadClient struct {
+	cc grpc.ClientConnInterface
+}
+
+func NewServerLoadClient(cc grpc.ClientConnInterface) ServerLoadClient {
+	return &serverLoadClient{cc}
+}
+
+func (c *serverLoadClient) PutStream(ctx context.Context, opts ...grpc.CallOption) (ServerLoad_PutStreamClient, error) {
+	stream, err := c.cc.NewStream(ctx, &_ServerLoad_serviceDesc.Streams[0], "/proto.ServerLoad/PutStream", opts...)
+	if err != nil {
+		return nil, err
+	}
+	x := &serverLoadPutStreamClient{stream}
+	return x, nil
+}
+
+type ServerLoad_PutStreamClient interface {
+	Send(*ServerLoadStreamReqData) error
+	CloseAndRecv() (*ServerLoadStreamResData, error)
+	grpc.ClientStream
+}
+
+type serverLoadPutStreamClient struct {
+	grpc.ClientStream
+}
+
+func (x *serverLoadPutStreamClient) Send(m *ServerLoadStreamReqData) error {
+	return x.ClientStream.SendMsg(m)
+}
+
+func (x *serverLoadPutStreamClient) CloseAndRecv() (*ServerLoadStreamResData, error) {
+	if err := x.ClientStream.CloseSend(); err != nil {
+		return nil, err
+	}
+	m := new(ServerLoadStreamResData)
+	if err := x.ClientStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+// ServerLoadServer is the server API for ServerLoad service.
+type ServerLoadServer interface {
+	PutStream(ServerLoad_PutStreamServer) error
+}
+
+// UnimplementedServerLoadServer can be embedded to have forward compatible implementations.
+type UnimplementedServerLoadServer struct {
+}
+
+func (*UnimplementedServerLoadServer) PutStream(srv ServerLoad_PutStreamServer) error {
+	return status.Errorf(codes.Unimplemented, "method PutStream not implemented")
+}
+
+func RegisterServerLoadServer(s *grpc.Server, srv ServerLoadServer) {
+	s.RegisterService(&_ServerLoad_serviceDesc, srv)
+}
+
+func _ServerLoad_PutStream_Handler(srv interface{}, stream grpc.ServerStream) error {
+	return srv.(ServerLoadServer).PutStream(&serverLoadPutStreamServer{stream})
+}
+
+type ServerLoad_PutStreamServer interface {
+	SendAndClose(*ServerLoadStreamResData) error
+	Recv() (*ServerLoadStreamReqData, error)
+	grpc.ServerStream
+}
+
+type serverLoadPutStreamServer struct {
+	grpc.ServerStream
+}
+
+func (x *serverLoadPutStreamServer) SendAndClose(m *ServerLoadStreamResData) error {
+	return x.ServerStream.SendMsg(m)
+}
+
+func (x *serverLoadPutStreamServer) Recv() (*ServerLoadStreamReqData, error) {
+	m := new(ServerLoadStreamReqData)
+	if err := x.ServerStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+var _ServerLoad_serviceDesc = grpc.ServiceDesc{
+	ServiceName: "proto.ServerLoad",
+	HandlerType: (*ServerLoadServer)(nil),
+	Methods:     []grpc.MethodDesc{},
+	Streams: []grpc.StreamDesc{
+		{
+			StreamName:    "PutStream",
+			Handler:       _ServerLoad_PutStream_Handler,
+			ClientStreams: true,
+		},
+	},
+	Metadata: "serverload.proto",
+}

+ 512 - 0
download_file/src/serviced/proto/service.pb.go

@@ -0,0 +1,512 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: service.proto
+
+//声明 包名
+
+package proto
+
+import (
+	context "context"
+	fmt "fmt"
+	proto "github.com/golang/protobuf/proto"
+	grpc "google.golang.org/grpc"
+	codes "google.golang.org/grpc/codes"
+	status "google.golang.org/grpc/status"
+	math "math"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
+
+// ServiceMeta holds the parameters of a service. It is the request message of
+// the Registe and Destory RPCs.
+type ServiceMeta struct {
+	Name                 string   `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+	Ip                   string   `protobuf:"bytes,2,opt,name=ip,proto3" json:"ip,omitempty"`
+	Port                 int32    `protobuf:"varint,3,opt,name=port,proto3" json:"port,omitempty"`
+	Workers              int32    `protobuf:"varint,4,opt,name=workers,proto3" json:"workers,omitempty"`
+	Balance              int32    `protobuf:"varint,5,opt,name=balance,proto3" json:"balance,omitempty"`
+	Meta                 string   `protobuf:"bytes,6,opt,name=meta,proto3" json:"meta,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *ServiceMeta) Reset()         { *m = ServiceMeta{} }
+func (m *ServiceMeta) String() string { return proto.CompactTextString(m) }
+func (*ServiceMeta) ProtoMessage()    {}
+func (*ServiceMeta) Descriptor() ([]byte, []int) {
+	return fileDescriptor_a0b84a42fa06f626, []int{0}
+}
+
+func (m *ServiceMeta) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_ServiceMeta.Unmarshal(m, b)
+}
+func (m *ServiceMeta) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_ServiceMeta.Marshal(b, m, deterministic)
+}
+func (m *ServiceMeta) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_ServiceMeta.Merge(m, src)
+}
+func (m *ServiceMeta) XXX_Size() int {
+	return xxx_messageInfo_ServiceMeta.Size(m)
+}
+func (m *ServiceMeta) XXX_DiscardUnknown() {
+	xxx_messageInfo_ServiceMeta.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_ServiceMeta proto.InternalMessageInfo
+
+func (m *ServiceMeta) GetName() string {
+	if m != nil {
+		return m.Name
+	}
+	return ""
+}
+
+func (m *ServiceMeta) GetIp() string {
+	if m != nil {
+		return m.Ip
+	}
+	return ""
+}
+
+func (m *ServiceMeta) GetPort() int32 {
+	if m != nil {
+		return m.Port
+	}
+	return 0
+}
+
+func (m *ServiceMeta) GetWorkers() int32 {
+	if m != nil {
+		return m.Workers
+	}
+	return 0
+}
+
+func (m *ServiceMeta) GetBalance() int32 {
+	if m != nil {
+		return m.Balance
+	}
+	return 0
+}
+
+func (m *ServiceMeta) GetMeta() string {
+	if m != nil {
+		return m.Meta
+	}
+	return ""
+}
+
+// StringRepData is the standard string response message; it is returned by the
+// Registe, Destory and Release RPCs.
+type StringRepData struct {
+	Data                 string   `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *StringRepData) Reset()         { *m = StringRepData{} }
+func (m *StringRepData) String() string { return proto.CompactTextString(m) }
+func (*StringRepData) ProtoMessage()    {}
+func (*StringRepData) Descriptor() ([]byte, []int) {
+	return fileDescriptor_a0b84a42fa06f626, []int{1}
+}
+
+func (m *StringRepData) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_StringRepData.Unmarshal(m, b)
+}
+func (m *StringRepData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_StringRepData.Marshal(b, m, deterministic)
+}
+func (m *StringRepData) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_StringRepData.Merge(m, src)
+}
+func (m *StringRepData) XXX_Size() int {
+	return xxx_messageInfo_StringRepData.Size(m)
+}
+func (m *StringRepData) XXX_DiscardUnknown() {
+	xxx_messageInfo_StringRepData.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_StringRepData proto.InternalMessageInfo
+
+func (m *StringRepData) GetData() string {
+	if m != nil {
+		return m.Data
+	}
+	return ""
+}
+
+// StringReqData is the standard string request message.
+type StringReqData struct {
+	Data                 string   `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *StringReqData) Reset()         { *m = StringReqData{} }
+func (m *StringReqData) String() string { return proto.CompactTextString(m) }
+func (*StringReqData) ProtoMessage()    {}
+func (*StringReqData) Descriptor() ([]byte, []int) {
+	return fileDescriptor_a0b84a42fa06f626, []int{2}
+}
+
+func (m *StringReqData) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_StringReqData.Unmarshal(m, b)
+}
+func (m *StringReqData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_StringReqData.Marshal(b, m, deterministic)
+}
+func (m *StringReqData) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_StringReqData.Merge(m, src)
+}
+func (m *StringReqData) XXX_Size() int {
+	return xxx_messageInfo_StringReqData.Size(m)
+}
+func (m *StringReqData) XXX_DiscardUnknown() {
+	xxx_messageInfo_StringReqData.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_StringReqData proto.InternalMessageInfo
+
+func (m *StringReqData) GetData() string {
+	if m != nil {
+		return m.Data
+	}
+	return ""
+}
+
+// ApplyReqData is the request message of the Apply RPC.
+type ApplyReqData struct {
+	Name                 string   `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+	Balance              int32    `protobuf:"varint,2,opt,name=balance,proto3" json:"balance,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *ApplyReqData) Reset()         { *m = ApplyReqData{} }
+func (m *ApplyReqData) String() string { return proto.CompactTextString(m) }
+func (*ApplyReqData) ProtoMessage()    {}
+func (*ApplyReqData) Descriptor() ([]byte, []int) {
+	return fileDescriptor_a0b84a42fa06f626, []int{3}
+}
+
+func (m *ApplyReqData) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_ApplyReqData.Unmarshal(m, b)
+}
+func (m *ApplyReqData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_ApplyReqData.Marshal(b, m, deterministic)
+}
+func (m *ApplyReqData) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_ApplyReqData.Merge(m, src)
+}
+func (m *ApplyReqData) XXX_Size() int {
+	return xxx_messageInfo_ApplyReqData.Size(m)
+}
+func (m *ApplyReqData) XXX_DiscardUnknown() {
+	xxx_messageInfo_ApplyReqData.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_ApplyReqData proto.InternalMessageInfo
+
+func (m *ApplyReqData) GetName() string {
+	if m != nil {
+		return m.Name
+	}
+	return ""
+}
+
+func (m *ApplyReqData) GetBalance() int32 {
+	if m != nil {
+		return m.Balance
+	}
+	return 0
+}
+
+// ApplyRepData carries an ip and a port. It is the response message of the
+// Apply RPC and the request message of the Release RPC.
+type ApplyRepData struct {
+	Ip                   string   `protobuf:"bytes,1,opt,name=ip,proto3" json:"ip,omitempty"`
+	Port                 int32    `protobuf:"varint,2,opt,name=port,proto3" json:"port,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *ApplyRepData) Reset()         { *m = ApplyRepData{} }
+func (m *ApplyRepData) String() string { return proto.CompactTextString(m) }
+func (*ApplyRepData) ProtoMessage()    {}
+func (*ApplyRepData) Descriptor() ([]byte, []int) {
+	return fileDescriptor_a0b84a42fa06f626, []int{4}
+}
+
+func (m *ApplyRepData) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_ApplyRepData.Unmarshal(m, b)
+}
+func (m *ApplyRepData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_ApplyRepData.Marshal(b, m, deterministic)
+}
+func (m *ApplyRepData) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_ApplyRepData.Merge(m, src)
+}
+func (m *ApplyRepData) XXX_Size() int {
+	return xxx_messageInfo_ApplyRepData.Size(m)
+}
+func (m *ApplyRepData) XXX_DiscardUnknown() {
+	xxx_messageInfo_ApplyRepData.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_ApplyRepData proto.InternalMessageInfo
+
+func (m *ApplyRepData) GetIp() string {
+	if m != nil {
+		return m.Ip
+	}
+	return ""
+}
+
+func (m *ApplyRepData) GetPort() int32 {
+	if m != nil {
+		return m.Port
+	}
+	return 0
+}
+
+func init() {
+	proto.RegisterType((*ServiceMeta)(nil), "proto.ServiceMeta")
+	proto.RegisterType((*StringRepData)(nil), "proto.StringRepData")
+	proto.RegisterType((*StringReqData)(nil), "proto.StringReqData")
+	proto.RegisterType((*ApplyReqData)(nil), "proto.ApplyReqData")
+	proto.RegisterType((*ApplyRepData)(nil), "proto.ApplyRepData")
+}
+
+func init() {
+	proto.RegisterFile("service.proto", fileDescriptor_a0b84a42fa06f626)
+}
+
+var fileDescriptor_a0b84a42fa06f626 = []byte{
+	// 279 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x50, 0xb1, 0x4e, 0xc3, 0x30,
+	0x10, 0xc5, 0xa6, 0x69, 0xc4, 0x41, 0x19, 0x0e, 0x06, 0xab, 0x53, 0x15, 0x96, 0x4e, 0x1d, 0x5a,
+	0xc1, 0xc4, 0x82, 0xd4, 0x95, 0x25, 0xfd, 0x02, 0xb7, 0x9c, 0x2a, 0x8b, 0x34, 0x36, 0xb6, 0x05,
+	0xea, 0x47, 0xf0, 0x97, 0x7c, 0x08, 0xb2, 0x9d, 0x80, 0xa1, 0x74, 0x60, 0xca, 0xf3, 0xbb, 0xf7,
+	0x72, 0xef, 0x1e, 0x8c, 0x1c, 0xd9, 0x57, 0xb5, 0xa1, 0x99, 0xb1, 0xda, 0x6b, 0x2c, 0xe2, 0xa7,
+	0x7a, 0x67, 0x70, 0xbe, 0x4a, 0x83, 0x47, 0xf2, 0x12, 0x11, 0x06, 0xad, 0xdc, 0x91, 0x60, 0x13,
+	0x36, 0x3d, 0xab, 0x23, 0xc6, 0x4b, 0xe0, 0xca, 0x08, 0x1e, 0x19, 0xae, 0x4c, 0xd0, 0x18, 0x6d,
+	0xbd, 0x38, 0x9d, 0xb0, 0x69, 0x51, 0x47, 0x8c, 0x02, 0xca, 0x37, 0x6d, 0x9f, 0xc9, 0x3a, 0x31,
+	0x88, 0x74, 0xff, 0x0c, 0x93, 0xb5, 0x6c, 0x64, 0xbb, 0x21, 0x51, 0xa4, 0x49, 0xf7, 0x0c, 0xff,
+	0xd9, 0x91, 0x97, 0x62, 0x98, 0x76, 0x05, 0x5c, 0xdd, 0xc0, 0x68, 0xe5, 0xad, 0x6a, 0xb7, 0x35,
+	0x99, 0xa5, 0x4c, 0x81, 0x9e, 0xa4, 0x97, 0x7d, 0xa0, 0x80, 0x73, 0xd1, 0xcb, 0x51, 0xd1, 0x3d,
+	0x5c, 0x3c, 0x18, 0xd3, 0xec, 0x33, 0xcd, 0xc1, 0x65, 0x59, 0x36, 0xfe, 0x23, 0x5b, 0x35, 0xff,
+	0x72, 0xa7, 0x18, 0xa9, 0x03, 0x76, 0xd0, 0x01, 0xff, 0xee, 0x60, 0xfe, 0xc1, 0xa0, 0xec, 0xba,
+	0xc4, 0x5b, 0x28, 0x6b, 0xda, 0x2a, 0xe7, 0x09, 0x31, 0x35, 0x3e, 0xcb, 0x6a, 0x1e, 0x5f, 0xf7,
+	0x5c, 0x7e, 0x6b, 0x75, 0x12, 0x6c, 0x4b, 0x72, 0x5e, 0xdb, 0xfd, 0xbf, 0x6c, 0x0b, 0x28, 0x62,
+	0x5a, 0xbc, 0xea, 0x04, 0xf9, 0xe5, 0xe3, 0x5f, 0x64, 0x6f, 0xba, 0x0b, 0x11, 0x1b, 0x92, 0x8e,
+	0xf0, 0x2f, 0xc5, 0xb1, 0x65, 0xeb, 0x61, 0xa4, 0x17, 0x9f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x98,
+	0xb6, 0x0b, 0x2e, 0x51, 0x02, 0x00, 0x00,
+}
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ context.Context
+var _ grpc.ClientConnInterface
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+const _ = grpc.SupportPackageIsVersion6
+
+// ServiceClient is the client API for Service service.
+//
+// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
+type ServiceClient interface {
+	// Registe registers a service.
+	Registe(ctx context.Context, in *ServiceMeta, opts ...grpc.CallOption) (*StringRepData, error)
+	// Destory deregisters a service.
+	Destory(ctx context.Context, in *ServiceMeta, opts ...grpc.CallOption) (*StringRepData, error)
+	// Apply requests a service.
+	Apply(ctx context.Context, in *ApplyReqData, opts ...grpc.CallOption) (*ApplyRepData, error)
+	// Release releases a service.
+	Release(ctx context.Context, in *ApplyRepData, opts ...grpc.CallOption) (*StringRepData, error)
+}
+
+type serviceClient struct {
+	cc grpc.ClientConnInterface
+}
+
+func NewServiceClient(cc grpc.ClientConnInterface) ServiceClient {
+	return &serviceClient{cc}
+}
+
+func (c *serviceClient) Registe(ctx context.Context, in *ServiceMeta, opts ...grpc.CallOption) (*StringRepData, error) {
+	out := new(StringRepData)
+	err := c.cc.Invoke(ctx, "/proto.Service/Registe", in, out, opts...)
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
+func (c *serviceClient) Destory(ctx context.Context, in *ServiceMeta, opts ...grpc.CallOption) (*StringRepData, error) {
+	out := new(StringRepData)
+	err := c.cc.Invoke(ctx, "/proto.Service/Destory", in, out, opts...)
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
+// Apply invokes the unary RPC "/proto.Service/Apply" with in and returns the
+// decoded ApplyRepData. When Invoke fails it returns nil and that error.
+func (c *serviceClient) Apply(ctx context.Context, in *ApplyReqData, opts ...grpc.CallOption) (*ApplyRepData, error) {
+	out := new(ApplyRepData)
+	err := c.cc.Invoke(ctx, "/proto.Service/Apply", in, out, opts...)
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
+func (c *serviceClient) Release(ctx context.Context, in *ApplyRepData, opts ...grpc.CallOption) (*StringRepData, error) {
+	out := new(StringRepData)
+	err := c.cc.Invoke(ctx, "/proto.Service/Release", in, out, opts...)
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
+// ServiceServer is the server API for Service service.
+type ServiceServer interface {
+	// Registe registers a service.
+	Registe(context.Context, *ServiceMeta) (*StringRepData, error)
+	// Destory deregisters a service.
+	Destory(context.Context, *ServiceMeta) (*StringRepData, error)
+	// Apply requests a service.
+	Apply(context.Context, *ApplyReqData) (*ApplyRepData, error)
+	// Release releases a service.
+	Release(context.Context, *ApplyRepData) (*StringRepData, error)
+}
+
+// UnimplementedServiceServer can be embedded to have forward compatible implementations.
+type UnimplementedServiceServer struct {
+}
+
+func (*UnimplementedServiceServer) Registe(ctx context.Context, req *ServiceMeta) (*StringRepData, error) {
+	return nil, status.Errorf(codes.Unimplemented, "method Registe not implemented")
+}
+func (*UnimplementedServiceServer) Destory(ctx context.Context, req *ServiceMeta) (*StringRepData, error) {
+	return nil, status.Errorf(codes.Unimplemented, "method Destory not implemented")
+}
+func (*UnimplementedServiceServer) Apply(ctx context.Context, req *ApplyReqData) (*ApplyRepData, error) {
+	return nil, status.Errorf(codes.Unimplemented, "method Apply not implemented")
+}
+func (*UnimplementedServiceServer) Release(ctx context.Context, req *ApplyRepData) (*StringRepData, error) {
+	return nil, status.Errorf(codes.Unimplemented, "method Release not implemented")
+}
+
+func RegisterServiceServer(s *grpc.Server, srv ServiceServer) {
+	s.RegisterService(&_Service_serviceDesc, srv)
+}
+
+func _Service_Registe_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	in := new(ServiceMeta)
+	if err := dec(in); err != nil {
+		return nil, err
+	}
+	if interceptor == nil {
+		return srv.(ServiceServer).Registe(ctx, in)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: "/proto.Service/Registe",
+	}
+	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+		return srv.(ServiceServer).Registe(ctx, req.(*ServiceMeta))
+	}
+	return interceptor(ctx, in, info, handler)
+}
+
+func _Service_Destory_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	in := new(ServiceMeta)
+	if err := dec(in); err != nil {
+		return nil, err
+	}
+	if interceptor == nil {
+		return srv.(ServiceServer).Destory(ctx, in)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: "/proto.Service/Destory",
+	}
+	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+		return srv.(ServiceServer).Destory(ctx, req.(*ServiceMeta))
+	}
+	return interceptor(ctx, in, info, handler)
+}
+
+func _Service_Apply_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	in := new(ApplyReqData)
+	if err := dec(in); err != nil {
+		return nil, err
+	}
+	if interceptor == nil {
+		return srv.(ServiceServer).Apply(ctx, in)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: "/proto.Service/Apply",
+	}
+	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+		return srv.(ServiceServer).Apply(ctx, req.(*ApplyReqData))
+	}
+	return interceptor(ctx, in, info, handler)
+}
+
+func _Service_Release_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	in := new(ApplyRepData)
+	if err := dec(in); err != nil {
+		return nil, err
+	}
+	if interceptor == nil {
+		return srv.(ServiceServer).Release(ctx, in)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: "/proto.Service/Release",
+	}
+	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+		return srv.(ServiceServer).Release(ctx, req.(*ApplyRepData))
+	}
+	return interceptor(ctx, in, info, handler)
+}
+
+var _Service_serviceDesc = grpc.ServiceDesc{
+	ServiceName: "proto.Service",
+	HandlerType: (*ServiceServer)(nil),
+	Methods: []grpc.MethodDesc{
+		{
+			MethodName: "Registe",
+			Handler:    _Service_Registe_Handler,
+		},
+		{
+			MethodName: "Destory",
+			Handler:    _Service_Destory_Handler,
+		},
+		{
+			MethodName: "Apply",
+			Handler:    _Service_Apply_Handler,
+		},
+		{
+			MethodName: "Release",
+			Handler:    _Service_Release_Handler,
+		},
+	},
+	Streams:  []grpc.StreamDesc{},
+	Metadata: "service.proto",
+}

+ 41 - 0
download_file/src/serviced/serviced.go

@@ -0,0 +1,41 @@
+package serviced
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	service "serviced/proto"
+	. "util"
+)
+
+// GetOcrServerConn asks the OCR service-governance center at OcrServerAddr for
+// the address of a "file_service" instance (Apply is attempted up to three
+// times) and returns a gRPC connection dialed to that instance.
+//
+// The connection to the governance center is only used for the Apply call and
+// is closed when this function returns. The returned connection belongs to the
+// caller.
+//
+// On failure the connection is nil. The error keeps the original message text
+// as its prefix and additionally wraps the underlying cause when one exists, so
+// callers that only print the error see more detail.
+func GetOcrServerConn() (*grpc.ClientConn, error) {
+	centerConn, err := grpc.Dial(OcrServerAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		return nil, fmt.Errorf("Conn Ocr Server Error: %w", err)
+	}
+	defer centerConn.Close()
+	ipClient := service.NewServiceClient(centerConn)
+	var (
+		serverIp   string
+		serverPort int
+		applyErr   error // error of the most recent failed Apply, kept for diagnostics
+	)
+	for i := 1; i <= 3; i++ { // retry
+		repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "file_service", Balance: 2})
+		if err != nil {
+			applyErr = err
+			continue
+		}
+		applyErr = nil
+		serverIp = repl.Ip
+		serverPort = int(repl.Port)
+		break
+	}
+	if serverIp == "" || serverPort == 0 { // no usable ip/port after the retries
+		if applyErr != nil {
+			return nil, fmt.Errorf("Get Ip And Port Error: %w", applyErr)
+		}
+		return nil, errors.New("Get Ip And Port Error")
+	}
+	addr := serverIp + ":" + fmt.Sprint(serverPort)
+	// A separate variable is used for the second connection so that it cannot be
+	// confused with the governance-center connection closed by the defer above.
+	fileConn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		return nil, fmt.Errorf("FileText Grpc Dial Error,addr:%s: %w", addr, err)
+	}
+	return fileConn, nil
+}

+ 356 - 0
download_file/src/task.go

@@ -0,0 +1,356 @@
+package main
+
+import (
+	"bytes"
+	"crypto/tls"
+	"fmt"
+	"github.com/PuerkitoBio/goquery"
+	"io"
+	"io/ioutil"
+	"mongodb"
+	"net/http"
+	qu "qfw/util"
+	"regexp"
+	"serviced"
+	sp "spiderutil"
+	"strings"
+	"sync"
+	"time"
+	. "util"
+)
+
+// Regular expressions used to filter, repair and classify the links extracted
+// from contenthtml.
+var (
+	// NOTE(review): the range `A-z` also matches the characters between 'Z' and
+	// 'a' ([ \ ] ^ _ and backtick) — confirm whether that is intended before
+	// narrowing it to `A-Z`.
+	htmlModelReg    = regexp.MustCompile(`{{[a-zA-z.()\d,:]{5,}}}|^(\$)`) // filters template-language placeholders
+	reg_filter_url  = regexp.MustCompile(`((\.\./)+|null|[。))]+$)`)
+	reg_invalid_url = regexp.MustCompile(`(^(tel)|^#[\p{Han}]+$|^[\p{Han}]+$|javascript|login|mailto|\.(jsp|jspx|aspx|home|com|cn|shtml|jhtml|chtml|html|htm)[))##/、]{0,}$)+`)
+
+	// NOTE(review): the unescaped `[]` right after `《》` appears to close the
+	// character class early — verify that this pattern matches what is intended.
+	reg_fileter_text = regexp.MustCompile("([<>《》[]()()【】\\[\\]『』。;、;,\\s\u3000\u2003\u00a0]+|(\\\\n)+(\\\\t)+)")
+	reg_filetype     = regexp.MustCompile(`\.(docx|gif|jpg|doc|pdf|rar|png|zip|gz|swf|xlsx|xls|wps|jpeg)$`)
+	reg_err_filetype = regexp.MustCompile(`(\.(jtbz|jxzf|tytbz|hbz|tbyj|et|tbz|rtf|dwg|bmp|htbz|qttbz|application|zbid|pptx|gef)$|^(#_|file:))`)
+	// link text that is invalid when the whole text matches
+	reg_invalid_text = regexp.MustCompile(`^(\d{4}年\d{1,2}月\d{1,2}日|潜在供应商|递交|查看评论|flash插件|打印文章|收藏|请点击|更多|无|采购详细内容|申请履约保函|关于我们|返回|百度一下|登录(系统)?|查看|网站首页|(免费)?注册|其他|立即报名|我要(报价|投诉|投标|留言)|[\d.])$`)
+	// link text that is invalid when it contains one of these keywords
+	// NOTE(review): this pattern also uses the `A-z` range, see htmlModelReg.
+	reg_filter_text1 = regexp.MustCompile(`(\.(jsp|jspx|aspx|home|com|cn|shtml|jhtml|chtml|html|htm)(/)?$|网站|政府|财产|得分|合同|业绩|负责人|页面|注意事项|注册|投诉|导航|登录|办理|请到原网|我要纠错|([\p{Han}]|\d)+[a-zA-z\d-]{5,}$|[上下首尾](一)?[页篇条]|跳转|详情请见原网站|详见(项目|公告)详情|原(文|公告)链接(地址)?|点击(报名|查看|查阅)(原公告(内容|详情))?|(点[击我])?(查看|查阅)(资质等级树|标的物详情|内容|公告|详情))`)
+	// link text that is invalid when it ends with one of these keywords
+	reg_filter_text2 = regexp.MustCompile(`((公司|代理|单位|中心|采购办|机构|设计室|(事务|鉴定|研究|管理)所|(卫生|研究|法|设计|医)院|(工程|办事)[部处]|博物馆|工作站|幼儿园|学校|委员会|平台|局|队|[小中大]学)$|(\.{3}|…|管委会|指引|视频|主页|活动|指南|总结|核查|评审|投诉|磋商|调查|列表|处理|须知|审查|名单|需求书|确认书|规则|通知|评价|征询|咨询|采购意向|审计|招标|监理|监测|测量|钻探|测绘|服务|评估|公示|信息|采购|公告|勘察|施工|标段|工程|项目|编制|谈判|意见|设计|邀请函|审核|检测|(意见|建议)书?)$)`)
+
+	// link repair
+	reg_repair_href1 = regexp.MustCompile(`^(\.\./|\./|/)+`)
+	reg_domain       = regexp.MustCompile(`((http|https)[::]//(www\.)?|www\.|WWW\.)[^/]+/`)
+	reg_domain_param = regexp.MustCompile(`((http|https)[::]//(www\.)?|www\.|WWW\.).*/`)
+
+	// attachment types, matched against the Content-Type header in GetType
+	reg_jpg  = regexp.MustCompile(`(jpg|png|jpeg|image)`)
+	reg_docx = regexp.MustCompile(`(docx|word)`)
+	reg_doc  = regexp.MustCompile(`doc`)
+	reg_xlsx = regexp.MustCompile(`(xlsx|xls|sheet)`)
+	reg_pdf  = regexp.MustCompile(`pdf`)
+	reg_zip  = regexp.MustCompile(`zip`)
+	reg_rar  = regexp.MustCompile(`rar`)
+)
+
+// Data describes one candidate attachment link taken from an <a href> element.
+type Data struct {
+	Url      string // link to download; rewritten by DealAndDownload for relative links
+	Text     string // text of the <a> element
+	Ok       bool   // result flag of the OSS upload, set in DealAndDownload
+	By       string // how the link was accepted in FilterAndDownload: "url", "text" or "filter"
+	FileType string // file extension without the dot; empty until it can be determined
+}
+
+// DownloadFile runs one round of attachment back-filling: it fetches the next
+// id interval starting from StartID, processes the documents in it and then
+// notifies the next node over UDP. It reports false when no interval is
+// available and true after a round has been processed.
+func DownloadFile() bool {
+	gtid, lteid := GetIdInterval(StartID) // fetch the id interval
+	if gtid != "" && lteid != "" {
+		GetDataAndDownload(gtid, lteid) // download
+		SendUdp(gtid, lteid, NextStype, NextAddr, NextPort)
+		return true
+	}
+	return false
+}
+
+// GetDataAndDownload iterates over the "bidding" documents whose _id lies in
+// (gtid, lteid] and tries to back-fill missing attachments for each of them.
+//
+// Documents are skipped when competehref is set to anything other than "#"
+// (competitor data) or when projectinfo.attachments is already non-empty. For
+// the remaining documents the <a href> links of contenthtml are collected,
+// filtered (FilterAndDownload) and downloaded (DealAndDownload). Documents that
+// gained at least one attachment are saved to "bidding_downloadfile_log" in
+// batches.
+//
+// At most 10 documents are processed concurrently.
+func GetDataAndDownload(gtid, lteid string) {
+	defer qu.Catch()
+	// query the data
+	sess := MgoB.GetMgoConn()
+	defer MgoB.DestoryMongoConn(sess)
+	ch := make(chan bool, 10) // bounds the number of concurrent workers
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{} // guards arr
+	// The id range is the only filter. A hard-coded single-document _id that
+	// used to overwrite this query (debug leftover) has been removed, otherwise
+	// gtid/lteid would be ignored.
+	query := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gt":  mongodb.StringTOBsonId(gtid),
+			"$lte": mongodb.StringTOBsonId(lteid),
+		},
+	}
+	//field := map[string]interface{}{
+	//	"contenthtml": 1,
+	//	"spidercode":  1,
+	//	"href":        1,
+	//	"site":        1,
+	//	"channel":     1,
+	//	"title":       1,
+	//	"competehref": 1,
+	//	"projectinfo": 1,
+	//}
+	it := sess.DB(MgoB.DbName).C("bidding").Find(&query).Iter()
+	n := 0
+	arr := []map[string]interface{}{}
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			// A panic while handling one document must not take the process down;
+			// the outer qu.Catch does not cover this goroutine.
+			defer qu.Catch()
+			href := qu.ObjToString(tmp["href"])
+			competehref := qu.ObjToString(tmp["competehref"])
+			if competehref != "" && competehref != "#" { // competitor link: skip competitor data
+				return
+			}
+			if projectinfo, ok := tmp["projectinfo"].(map[string]interface{}); ok && len(projectinfo) > 0 {
+				if attachments, ok := projectinfo["attachments"].(map[string]interface{}); ok && len(attachments) > 0 {
+					return
+				}
+			}
+			// 1. collect the <a> tags
+			hrefMap := map[string]string{} // links found in contenthtml; key: url, val: text
+			contenthtml := qu.ObjToString(tmp["contenthtml"])
+			doc, err := goquery.NewDocumentFromReader(strings.NewReader(contenthtml))
+			if err != nil { // doc would be nil here, so doc.Find must not be called
+				qu.Debug("contenthtml解析失败:", href, err)
+				return
+			}
+			doc.Find("a[href]").Each(func(index int, element *goquery.Selection) {
+				attachmentURL, _ := element.Attr("href") // link
+				if attachmentURL != "" && !htmlModelReg.MatchString(attachmentURL) {
+					hrefMap[attachmentURL] = element.Text()
+				}
+			})
+			tmpResult := FilterAndDownload(hrefMap)                            // keep plausible attachment links
+			result, attachments, attchText := DealAndDownload(tmpResult, href) // repair links/text and download
+			if len(attachments) == 0 {
+				return
+			}
+			tmp["file_add_log"] = result
+			if projectinfo, ok := tmp["projectinfo"].(map[string]interface{}); ok {
+				projectinfo["attachments"] = attachments
+			} else {
+				tmp["projectinfo"] = map[string]interface{}{"attachments": attachments}
+			}
+			if len(attchText) > 0 {
+				tmp["attach_text"] = attchText
+			}
+			lock.Lock()
+			arr = append(arr, tmp)
+			if len(arr) > 100 { // flush a full batch
+				MgoB.SaveBulk("bidding_downloadfile_log", arr...)
+				arr = []map[string]interface{}{}
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%100 == 0 {
+			qu.Debug("current:", n)
+		}
+		tmp = map[string]interface{}{} // fresh map: the previous one is owned by the goroutine
+	}
+	wg.Wait()
+	// Close the cursor and surface an iteration error instead of silently
+	// treating a broken cursor as "no more data".
+	if err := it.Close(); err != nil {
+		qu.Debug("查询迭代异常:", err)
+	}
+	if len(arr) > 0 { // flush the remainder
+		MgoB.SaveBulk("bidding_downloadfile_log", arr...)
+	}
+	qu.Debug("当前轮执行完毕:", gtid, lteid)
+}
+
+// FilterAndDownload selects the links of hrefMap (key: url, value: link text)
+// that look like attachments and returns one Data entry per accepted link.
+// Nothing is downloaded here. An entry records in By why it was accepted:
+// "url" (the url ends with a known file extension), "text" (the link text does)
+// or "filter" (neither, but the text passed the invalid-text filters). For an
+// empty hrefMap the result is nil.
+func FilterAndDownload(hrefMap map[string]string) (result []*Data) {
+	defer qu.Catch()
+	if len(hrefMap) == 0 {
+		return
+	}
+	result = []*Data{} // accepted attachment links
+	for rawURL, rawText := range hrefMap {
+		// length filter on the url
+		lowURL := strings.ToLower(rawURL)
+		if len([]rune(lowURL)) <= 10 {
+			continue
+		}
+		// strip noise from the url, then drop invalid urls
+		lowURL = reg_filter_url.ReplaceAllString(lowURL, "")
+		if lowURL == "" || reg_invalid_url.MatchString(lowURL) {
+			continue
+		}
+		lowText := strings.ToLower(rawText)
+		// unsupported attachment types, judged by url or by text
+		if reg_err_filetype.MatchString(lowURL) || reg_err_filetype.MatchString(lowText) {
+			continue
+		}
+		lowText = reg_fileter_text.ReplaceAllString(lowText, "") // strip noise characters
+
+		// url ends with a common attachment extension
+		if ext := reg_filetype.FindString(lowURL); ext != "" {
+			result = append(result, &Data{
+				Url:      rawURL,
+				Text:     rawText,
+				By:       "url",
+				FileType: strings.ReplaceAll(ext, ".", ""),
+			})
+			continue
+		}
+		// link text ends with a common attachment extension
+		if ext := reg_filetype.FindString(lowText); ext != "" {
+			result = append(result, &Data{
+				Url:      rawURL,
+				Text:     rawText,
+				By:       "text",
+				FileType: strings.ReplaceAll(ext, ".", ""),
+			})
+			continue
+		}
+		// no extension anywhere: decide by the link text alone
+		if reg_invalid_text.ReplaceAllString(lowText, "") == "" { // whole text is invalid
+			continue
+		}
+		if reg_filter_text1.MatchString(lowText) || reg_filter_text2.MatchString(lowText) { // contains/ends with an invalid keyword
+			continue
+		}
+		result = append(result, &Data{
+			Url:  rawURL,
+			Text: lowText,
+			By:   "filter",
+		})
+	}
+	return
+}
+
+// DealAndDownload repairs the candidate links in tmp, downloads them, uploads
+// the downloaded files to OSS and requests text extraction for each upload.
+//
+// href is the url of the page the links came from; it supplies the prefix for
+// relative links. A relative link yields up to two candidates: one joined to
+// the domain part of href and one joined to href's directory part.
+//
+// Returns:
+//   - result: every candidate that was tried (Ok and FileType filled in)
+//   - attachments: uploaded files keyed "1", "2", ...
+//   - attachText: text-extraction results of the uploaded files
+//
+// All three are nil when tmp is empty.
+func DealAndDownload(tmp []*Data, href string) (result []*Data, attachments, attachText map[string]interface{}) {
+	defer qu.Catch()
+	if len(tmp) == 0 {
+		return
+	}
+	attachments = map[string]interface{}{}
+	attachText = map[string]interface{}{}
+	for _, data := range tmp {
+		url := strings.ReplaceAll(data.Url, "\\", "/")
+		if strings.HasPrefix(url, "http") { // absolute link (http or https): use as is
+			result = append(result, data)
+			continue
+		}
+		// repair of non-absolute links
+		if strings.HasPrefix(url, "data:image/") { // base64 image
+			// TODO: not handled yet
+			continue
+		}
+		url = reg_repair_href1.ReplaceAllString(url, "") // strip leading ../ ./ /
+		domain := reg_domain.FindString(href)            // domain part of href
+		param_domain := reg_domain_param.FindString(href)
+		// Each candidate gets its own copy of data. Appending the same pointer
+		// twice would make both entries carry the url assigned last, so the
+		// domain-based url would never be tried.
+		if domain != "" { // first choice: domain + link
+			candidate := *data
+			candidate.Url = domain + url
+			result = append(result, &candidate)
+		}
+		if param_domain != "" && param_domain != domain { // second choice: directory of href + link
+			candidate := *data
+			candidate.Url = param_domain + url
+			result = append(result, &candidate)
+		}
+	}
+	index := 0 // number of attachments uploaded so far
+	for _, data := range result {
+		contentType, ret := Download(data.Url) // download
+		if len(ret) == 0 {
+			// Failed or empty download: never upload an empty object, even when
+			// the extension is already known from the url or the link text.
+			continue
+		}
+		fileType := data.FileType // extension taken from the url or the link text
+		if fileType == "" {
+			fileType = GetType(contentType, ret) // derive the extension from the response
+			data.FileType = fileType
+		}
+		if fileType == "" {
+			continue
+		}
+		fileName := "附件" + fmt.Sprint(index+1) + "." + fileType
+		fid := sp.GetHashKey(ret) + sp.TypeByExt(fileName)
+		bs := bytes.NewReader(ret)
+		size := qu.ConvertFileSize(bs.Len())
+		b, _ := sp.OssPutObject(fid, io.MultiReader(bs)) // upload the attachment
+		//qu.Debug("oss", fileName, size, fileType, fid)
+		data.Ok = b
+		if !b {
+			continue
+		}
+		attachments[fmt.Sprint(index+1)] = map[string]interface{}{
+			"fid":      fid,
+			"filename": fileName,
+			"ftype":    fileType,
+			"org_url":  data.Url,
+			"size":     size,
+			"url":      "oss",
+		}
+		// text extraction of the attachment
+		// NOTE(review): conn is not closed here; presumably GetFileText closes
+		// it — verify, otherwise one connection leaks per attachment.
+		conn, err := serviced.GetOcrServerConn() // connect through the ocr service-governance center
+		if err == nil {
+			resp := GetFileText(conn, fileName, fid, fileType, ret)
+			if resp != nil {
+				tmap := map[string]interface{}{}
+				for i, r := range resp.Result {
+					rmap := map[string]interface{}{
+						"file_name":  r.FileName,
+						"attach_url": r.TextUrl,
+						"state":      r.ErrorState,
+					}
+					tmap[fmt.Sprint(i)] = rmap
+				}
+				if len(tmap) > 0 {
+					// NOTE(review): attachText is keyed from "0" while attachments
+					// is keyed from "1" — confirm that consumers expect this.
+					attachText[fmt.Sprint(index)] = tmap
+				}
+			}
+		} else {
+			qu.Debug("附件解析服务连接失败:", err)
+		}
+		index++
+	}
+	return
+}
+
+//下载
+func Download(url string) (string, []byte) {
+	defer qu.Catch()
+	client := &http.Client{
+		Timeout: 3 * time.Minute,
+		Transport: &http.Transport{
+			TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+		},
+	}
+	req, err := http.NewRequest("GET", url, nil)
+	if err != nil {
+		//fmt.Println("Error creating request:", err)
+		return "", []byte{}
+	}
+	resp, err := client.Do(req)
+	if err != nil {
+		//fmt.Println("Error sending request:", err)
+		return "", []byte{}
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode == 200 {
+		bodyBytes, _ := ioutil.ReadAll(resp.Body)
+		return resp.Header.Get("Content-Type"), bodyBytes
+	}
+	return "", []byte{}
+}
+
+// GetType derives a file extension (without the dot) for a downloaded file.
+//
+// When contentType is non-empty it is matched against the attachment-type
+// patterns and the first match wins; "" is returned when none matches. When
+// contentType is empty and ret is non-empty the result comes from
+// qu.GetFileType(ret). In every other case "" is returned.
+//
+// The spreadsheet pattern is tested before the plain "doc" pattern: the OOXML
+// spreadsheet MIME type contains "officedocument", so testing "doc" first
+// reported xlsx files as "doc".
+func GetType(contentType string, ret []byte) string {
+	if contentType == "" {
+		if len(ret) > 0 {
+			return qu.GetFileType(ret)
+		}
+		return ""
+	}
+	switch {
+	case reg_jpg.MatchString(contentType):
+		return "jpg"
+	case reg_docx.MatchString(contentType):
+		return "docx"
+	case reg_xlsx.MatchString(contentType): // must precede reg_doc, see above
+		return "xlsx"
+	case reg_doc.MatchString(contentType):
+		return "doc"
+	case reg_pdf.MatchString(contentType):
+		return "pdf"
+	case reg_zip.MatchString(contentType):
+		return "zip"
+	case reg_rar.MatchString(contentType):
+		return "rar"
+	}
+	return ""
+}

+ 131 - 0
download_file/src/udp.go

@@ -0,0 +1,131 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"log"
+	mu "mfw/util"
+	"net"
+	"net/http"
+	qu "qfw/util"
+	"time"
+	. "util"
+)
+
+// Queue for received udp requests so that several of them do not run at the
+// same time (currently commented out).
+//var DataChannel = make(chan map[string]string, 50)
+
+// UdpNode is a sent udp message that is kept in UdptaskMap until it is
+// acknowledged or given up on.
+type UdpNode struct {
+	Data      []byte       // payload that was sent and is sent again on retry
+	Addr      *net.UDPAddr // destination address
+	Timestamp int64        // unix time in seconds at which the node was created
+	Retry     int          // retry counter, incremented by checkMapJob
+}
+
+// InitUdp initializes the package-level Udpclient with UdpPort as its local
+// address and a BufSize of 1024, and registers processUdpMsg through Listen.
+func InitUdp() {
+	defer qu.Catch()
+	Udpclient = mu.UdpClient{Local: UdpPort, BufSize: 1024}
+	Udpclient.Listen(processUdpMsg)
+	qu.Debug("Udp服务监听", UdpPort)
+}
+
+// processUdpMsg handles one incoming udp message.
+//
+//   - mu.OP_TYPE_DATA: data from the previous node. The JSON payload is decoded
+//     and acknowledged with "gtid-lteid-stype"; a payload that cannot be
+//     decoded or has no "stype" is answered with an "err:..." message.
+//   - mu.OP_NOOP: acknowledgement from the next node. The acknowledged key is
+//     removed from UdptaskMap.
+//
+// ra is the address the reply is written to.
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
+	defer qu.Catch()
+	switch act {
+	case mu.OP_TYPE_DATA: // data from the previous node
+		var mapInfo map[string]interface{}
+		err := json.Unmarshal(data, &mapInfo)
+		qu.Debug("err:", err, "mapInfo:", mapInfo)
+		stype := qu.ObjToString(mapInfo["stype"])
+		if err != nil || stype == "" {
+			// err is nil when only stype is missing, so err.Error() must not be
+			// called unconditionally.
+			reason := "stype is empty"
+			if err != nil {
+				reason = err.Error()
+			}
+			Udpclient.WriteUdp([]byte("err:"+reason), mu.OP_NOOP, ra) // reply
+			return
+		}
+		// udp received: acknowledge it
+		gtid := qu.ObjToString(mapInfo["gtid"])
+		lteid := qu.ObjToString(mapInfo["lteid"])
+		Udpclient.WriteUdp([]byte(gtid+"-"+lteid+"-"+stype), mu.OP_NOOP, ra)
+		//if stype == "repeat" { // udp sent by the historical de-duplication step
+		//	DataChannel <- map[string]string{
+		//		"gtid":  gtid,
+		//		"lteid": lteid,
+		//	}
+		//	// prepare the data
+		//	//GetBiddingDada(gtid, lteid)
+		//} else if stype == "merge" { // merge the extract_redownload data into bidding_redownload
+		//	UpdateBiddingData(gtid, lteid, TmpBid, TmpExt, stype)
+		//} else if stype == "redownload" { // download the bidding_redownload collection again
+		//	Redownload(gtid, lteid, TmpBid, TmpBid, "redownload")
+		//}
+	case mu.OP_NOOP: // acknowledgement from the next node
+		ok := string(data)
+		if ok != "" {
+			qu.Debug("ok:", ok)
+			UdptaskMap.Delete(ok)
+		}
+	}
+}
+
+// SendUdp sends the id interval (gtid, lteid) and stype as JSON to
+// udpaddr:udpport. The message carries a "key" of the form "gtid-lteid-stype";
+// under the same key the message is stored in UdptaskMap before it is written
+// to the socket.
+func SendUdp(gtid, lteid, stype, udpaddr string, udpport int) {
+	defer qu.Catch()
+	key := fmt.Sprintf("%s-%s-%s", gtid, lteid, stype)
+	payload := map[string]interface{}{
+		"gtid":  gtid,
+		"lteid": lteid,
+		"stype": stype,
+		"key":   key,
+	}
+	by, _ := json.Marshal(payload)
+	target := &net.UDPAddr{IP: net.ParseIP(udpaddr), Port: udpport}
+	// remember the message so that checkMapJob can resend it
+	UdptaskMap.Store(key, &UdpNode{
+		Data:      by,
+		Addr:      target,
+		Timestamp: time.Now().Unix(),
+		Retry:     0,
+	})
+	qu.Debug("send:", stype, key)
+	Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, target)
+}
+
+// checkMapJob loops forever and inspects UdptaskMap once a minute.
+//
+// A message older than 120 seconds has its retry counter incremented and is
+// sent again. Once the counter exceeds 10 the message is dropped from the map
+// and a notification mail is requested through the mail Api. A message older
+// than 10 seconds is only logged as pending.
+func checkMapJob() {
+	// Original author's remark: mail cannot be sent from inside the Aliyun intranet.
+	log.Println("start checkMapJob")
+	// A bounded client: a mail api that does not answer must not block this
+	// loop, and with it every udp retry, forever.
+	mailClient := &http.Client{Timeout: 30 * time.Second}
+	for {
+		UdptaskMap.Range(func(k, v interface{}) bool {
+			node, ok := v.(*UdpNode)
+			if !ok { // not an entry stored by SendUdp: skip it instead of panicking
+				return true
+			}
+			age := time.Now().Unix() - node.Timestamp
+			switch {
+			case age > 120:
+				node.Retry++
+				if node.Retry <= 10 {
+					log.Println("udp重发", k)
+					Udpclient.WriteUdp(node.Data, mu.OP_TYPE_DATA, node.Addr)
+					break
+				}
+				log.Println("udp重试失败", k)
+				UdptaskMap.Delete(k)
+				res, err := mailClient.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", Api, Tomail, "downloadfile-send-fail", fmt.Sprint(k)))
+				if err != nil {
+					log.Println("邮件发发送:", err)
+					break
+				}
+				read, err := ioutil.ReadAll(res.Body)
+				res.Body.Close()
+				log.Println("邮件发发送:", string(read), err)
+			case age > 10:
+				log.Println("udp任务超时中..", k)
+			}
+			return true
+		})
+		time.Sleep(60 * time.Second)
+	}
+}
+
+//func sendMail(key string) {
+//	for i := 1; i <= 3; i++ {
+//		res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, "redownload-receive-oss-fail", "发送附件识别失败:"+key))
+//		if err == nil {
+//			res.Body.Close()
+//			read, err := ioutil.ReadAll(res.Body)
+//			log.Println("邮件发送:", string(read), err)
+//			break
+//		}
+//	}
+//}

+ 54 - 0
download_file/src/util/util.go

@@ -0,0 +1,54 @@
+package util
+
+import (
+	mu "mfw/util"
+	"mongodb"
+	qu "qfw/util"
+	"sync"
+)
+
+// Package-level state shared by the download_file program.
+var (
+	Config  map[string]interface{}
+	Mgo     *mongodb.MongodbSim
+	MgoB    *mongodb.MongodbSim
+	Coll    string
+	StartID string // id in the bidding_processing_ids collection
+	// udp
+	Udpclient mu.UdpClient // udp client object
+	UdpPort   string
+	NextAddr  string
+	NextPort  int
+	NextStype string
+	// mail
+	UdptaskMap = &sync.Map{}
+	Tomail     string
+	Api        string
+	// ocr
+	OcrServerAddr string // address of the ocr service-governance center
+)
+
+// GetIdInterval looks up the records of collection Coll whose _id is greater
+// than id and whose dataprocess equals 8, and merges them into one id interval:
+// gtid comes from the first record and lteid from the last one.
+//
+// When gtid < lteid the interval is returned and StartID is advanced to the _id
+// of the last record. When no record is found, or the interval is not
+// ascending, both results are empty strings.
+func GetIdInterval(id string) (gtid, lteid string) {
+	defer qu.Catch()
+	qu.Debug("获取id段...")
+	idCond := map[string]interface{}{
+		"$gt": mongodb.StringTOBsonId(id),
+	}
+	query := map[string]interface{}{
+		"_id":         idCond,
+		"dataprocess": 8,
+	}
+	// The third argument {"_id": 1} is presumably the sort order — verify
+	// against MgoB.Find.
+	list, _ := MgoB.Find(Coll, query, map[string]interface{}{"_id": 1}, nil, false, -1, -1)
+	rows := *list
+	if len(rows) == 0 { // no new id interval
+		return
+	}
+	last := rows[len(rows)-1]
+
+	gtid = qu.ObjToString(rows[0]["gtid"]) // start id
+	lteid = qu.ObjToString(last["lteid"])  // end id
+	if gtid >= lteid {
+		return "", ""
+	}
+	// a valid id interval was found
+	StartID = mongodb.BsonIdToSId(last["_id"])
+	qu.Debug("当前轮ID区间:", gtid, lteid, "表ID:", StartID)
+	return
+}