Browse Source

Merge branch 'develop' of 192.168.3.17:zhanghongbo/qfw into develop

wangshan 9 years ago
parent
commit
2bed3d1199

+ 32 - 0
messageframe/src/util/client.go

@@ -0,0 +1,32 @@
+//客户端封装
+package util
+
+import (
+	"fmt"
+	"net"
+	"os"
+)
+
+//出错检测
+func checkError(err error) {
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "出错了: %s", err.Error())
+		os.Exit(1)
+	}
+}
+
+//
+func StartClient(parseEvent func([]byte), serveraddr string) (*Writer, net.Conn) {
+	tcpAddr, err := net.ResolveTCPAddr("tcp4", serveraddr)
+	checkError(err)
+
+	conn, err := net.DialTCP("tcp", nil, tcpAddr)
+	checkError(err)
+	w := NewWriter(conn)
+	//接受消息
+	messageQueue := make(chan RawData, 2000) //并发1000
+	go processMsg(messageQueue, parseEvent)
+	//从流中提取消息
+	go forwardMessage(conn, messageQueue)
+	return w, conn
+}

+ 15 - 0
messageframe/src/util/event.go

@@ -0,0 +1,15 @@
+package util
+
+const (
+	EVENT_RETURN_MACHINE_ID              = iota
+	EVENT_REQUEST_HEARTBEAT              //心跳
+	EVENT_RETURN_HEARTBEAT               //
+	EVENT_PUBLISH_MYSERVICES             //发布我的服务
+	EVENT_BROADCAST_REQUEST_SPIDER_STATE //获取爬虫状态
+
+	//------------------------
+	SERVICE_RECIVE_SPIDER_STATE = 1000 //接受爬虫状态
+	SERVICE_ECPS_SPIDER_HN      = iota //河南爬虫
+	SERVICE_ECPS_SPIDER_HB             //河北爬虫
+
+)

+ 19 - 0
messageframe/src/util/key.go

@@ -0,0 +1,19 @@
+package util
+
+import (
+	"math/rand"
+)
+
+const (
+	KEY = "abcdefghhijklmnopqrstuvwxyz0123456789ABCDEFJHIJKLMNOPQRSTUVWXYZ"
+)
+
+//
+func UUID(length int) string {
+	var ret string
+	for i := 0; i < length; i++ {
+		pos := rand.Intn(61)
+		ret += KEY[pos : pos+1]
+	}
+	return ret
+}

+ 120 - 0
messageframe/src/util/protocol.go

@@ -0,0 +1,120 @@
+package util
+
+//底层网络通信协议
+import (
+	"bufio"
+	"encoding/binary"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net"
+)
+
+//自定义数据格式,Header(4字节)+Frame
+//
+type Reader struct {
+	conn   net.Conn
+	reader *bufio.Reader
+	buffer [4]byte
+}
+
+//
+type Writer struct {
+	conn net.Conn
+}
+
+//
+type RawData []byte //流数据
+
+//
+func NewReader(c net.Conn) *Reader {
+	return &Reader{
+		conn:   c,
+		reader: bufio.NewReader(c),
+	}
+}
+
+//
+func NewWriter(c net.Conn) *Writer {
+	return &Writer{
+		conn: c,
+	}
+}
+
+//读取头部
+func (p *Reader) readHeader() (int32, error) {
+	buf := p.buffer[:4]
+	if _, err := io.ReadFull(p.reader, buf); err != nil {
+		return 0, err
+	}
+	size := int32(binary.BigEndian.Uint32(buf))
+	if size < 0 || size > 16384000 {
+		return 0, fmt.Errorf("Incorrect frame size (%d)", size)
+	}
+	return size, nil
+}
+
+//读取完整一帧数据,防止粘包
+func (p *Reader) readFrame(size int) ([]byte, error) {
+	var buf []byte
+	if size <= len(p.buffer) {
+		buf = p.buffer[0:size]
+	} else {
+		buf = make([]byte, size)
+	}
+	_, err := io.ReadFull(p.reader, buf)
+	return buf, err
+}
+
+//读取数据写入队列
+func forwardMessage(c net.Conn, queue chan<- RawData) {
+	defer c.Close()
+	logReader := NewReader(c)
+	for {
+		size, err := logReader.readHeader()
+		if err != nil {
+			break
+		}
+		data, err := logReader.readFrame(int(size))
+		if err != nil {
+			break
+		}
+		queue <- RawData(data)
+	}
+}
+
+//从队列中读取数据,定期处理
+func processMsg(q <-chan RawData, parseEvent func([]byte)) {
+	for {
+		select {
+		case msg := <-q:
+			parseEvent(msg)
+		}
+	}
+}
+
+//写数据
+func (w *Writer) Write(bs []byte) (int, error) {
+	data := Enpacket(bs)
+	return w.conn.Write(data)
+}
+
+//写入的是对象,一般用map[string]interface{}
+func (w *Writer) WriteObj(obj interface{}) (int, error) {
+	return w.conn.Write(EnpacketObj(obj))
+}
+
+//封包
+func Enpacket(bs []byte) []byte {
+	head := make([]byte, 4)
+	binary.BigEndian.PutUint32(head, uint32(len(bs)))
+	data := append(head)
+	data = append(data, bs...)
+	return data
+}
+
+//封包
+func EnpacketObj(obj interface{}) []byte {
+	bs, _ := json.Marshal(obj)
+	return Enpacket(bs)
+}

+ 32 - 0
messageframe/src/util/server.go

@@ -0,0 +1,32 @@
+//服务端封装
+package util
+
+import (
+	"fmt"
+	"net"
+)
+
+//启动服务端
+func StartServer(parseEvent func([]byte),
+	processconnection func(conn net.Conn),
+	addr string) {
+	netListen, err := net.Listen("tcp", ":6060")
+	if err != nil {
+		fmt.Println(err.Error())
+	}
+	defer netListen.Close()
+	fmt.Println("等待连接")
+	//接受消息
+	messageQueue := make(chan RawData, 2000) //并发1000
+	go processMsg(messageQueue, parseEvent)
+	for {
+		conn, err := netListen.Accept()
+		if err != nil {
+			continue
+		} else {
+			//告诉客户端ID,每次发消息都要携带自己ID
+			processconnection(conn)
+		}
+		go forwardMessage(conn, messageQueue)
+	}
+}

+ 21 - 0
messageframe/src/客户端/main.go

@@ -0,0 +1,21 @@
+package main
+
+import (
+	"net"
+	"time"
+	"util"
+)
+
+//
+var w *util.Writer //通常只使用writer,不要使用conn
+var conn net.Conn
+var my_id string //所有通信都要使用的
+var my_services []int = []int{util.SERVICE_ECPS_SPIDER_HN}
+
+func main() {
+	w, conn = util.StartClient(func(data []byte) {
+		ProcessMsg(data)
+	}, "127.0.0.1:6060")
+	defer conn.Close()
+	time.Sleep(10 * time.Minute)
+}

+ 45 - 0
messageframe/src/客户端/processmsg.go

@@ -0,0 +1,45 @@
+package main
+
+import (
+	"encoding/json"
+	"log"
+	"time"
+	"util"
+)
+
+//通用时间处理
+func ProcessMsg(msg []byte) {
+	tmp := map[string]interface{}{}
+	json.Unmarshal(msg, &tmp)
+
+	if v, ok := tmp["event"]; ok {
+		event := int(v.(float64))
+		switch event {
+		case util.EVENT_RETURN_MACHINE_ID: //服务端分配id,发布我能处理的时间
+			my_id = tmp["data"].(string)
+			w.WriteObj(map[string]interface{}{
+				"myid":  my_id,
+				"event": util.EVENT_PUBLISH_MYSERVICES,
+				"data":  my_services,
+			})
+			log.Println("服务器回应了机器ID")
+		case util.EVENT_REQUEST_HEARTBEAT: //请求的心跳,回应心跳请求
+			w.WriteObj(map[string]interface{}{
+				"myid":  my_id,
+				"event": util.EVENT_RETURN_HEARTBEAT,
+				"data":  time.Now().Unix(),
+			})
+			log.Println("对服务器回应了本机心跳")
+		case util.EVENT_BROADCAST_REQUEST_SPIDER_STATE: //回应,,请求爬虫状态
+			w.WriteObj(map[string]interface{}{
+				"myid":  my_id,
+				"event": util.SERVICE_RECIVE_SPIDER_STATE,
+				"data":  "状态良好",
+			})
+		case util.SERVICE_ECPS_SPIDER_HN: //爬取河南工商公示数据
+			log.Println("爬河南公示数据")
+		default:
+			log.Println("服务器发过来的请求,我无法处理", string(msg))
+		}
+	}
+}

+ 134 - 0
messageframe/src/服务端/main.go

@@ -0,0 +1,134 @@
+package main
+
+import (
+	"encoding/json"
+	"log"
+	"math/rand"
+	"net"
+	"sync"
+	"time"
+	"util"
+)
+
+type Client struct {
+	conn      net.Conn
+	timestamp int64
+}
+
+var lock sync.Mutex
+
+//所有的请求
+var allclient map[string]*Client = make(map[string]*Client)
+
+//服务与提供者对应表
+var allservice map[int][]string = make(map[int][]string)
+
+//
+
+//心跳检测,每隔20秒检测一次
+func GC() {
+	now := time.Now().Unix()
+	for k, v := range allclient {
+		if now-v.timestamp > 12 {
+			//超过40秒未回应心跳
+			v.conn.Close()
+			removeClient(k)
+			continue
+		}
+		_, err := v.conn.Write(util.EnpacketObj(map[string]interface{}{
+			"event": util.EVENT_REQUEST_HEARTBEAT,
+		}))
+		if err != nil { //发心跳包出错
+			v.conn.Close()
+			removeClient(k)
+		}
+	}
+	time.AfterFunc(4*time.Second, GC)
+}
+
+//删除服务节点
+func removeClient(myid string) {
+	lock.Lock()
+	delete(allclient, myid)
+	for k, v := range allservice {
+		for j, smid := range v {
+			if smid == myid {
+				allservice[k] = append(v[:j], v[j+1:]...)
+				break
+			}
+		}
+	}
+	lock.Unlock()
+	log.Println("删除节点", myid, allservice)
+}
+
+//处理客户端发过来的消息
+func processmsg(msg []byte) {
+	tmp := map[string]interface{}{}
+	json.Unmarshal(msg, &tmp)
+	my_id := tmp["myid"].(string)
+	if v, ok := tmp["event"]; ok {
+		event := int(v.(float64))
+		switch event {
+		//TODO 只写需要特殊处理的时间,其他都走default
+		case util.EVENT_RETURN_HEARTBEAT: //心跳回应包处理
+			allclient[my_id].timestamp = int64(tmp["data"].(float64))
+			log.Println("更新", my_id, "的心跳时间")
+		case util.EVENT_PUBLISH_MYSERVICES: //客户端发布了自己的服务
+			services := tmp["data"].([]interface{}) //一个客户端提供多个可处理的服务
+			for _, v := range services {
+				service := int(v.(float64))
+				allservice[service] = append(allservice[service], my_id)
+			}
+			log.Println("所有服务", allservice)
+		case util.EVENT_BROADCAST_REQUEST_SPIDER_STATE: //要个爬虫状态
+			data := util.Enpacket(msg)
+			for service_machine_id, v := range allclient { //发所有,不支持的不处理
+				if service_machine_id == my_id { //广播不用发给自己
+					continue
+				}
+				v.conn.Write(data)
+			}
+		case util.SERVICE_RECIVE_SPIDER_STATE: //爬虫的回应,发给所有监控端
+			bs := util.Enpacket(msg)
+			if v, ok := allservice[event]; ok {
+				for _, service_machine_id := range v {
+					if service_machine_id == my_id { //广播不用发给自己
+						continue
+					}
+					allclient[service_machine_id].conn.Write(bs)
+				}
+			}
+		default: //处理业务事件
+			if v, ok := allservice[event]; ok {
+				//随机选择一个节点提供服务,允许出错尝试3次
+				bs := util.Enpacket(msg)
+				for i := 0; i < 3; i++ {
+					service_machine_id := v[rand.Intn(len(v))]
+					_, err := allclient[service_machine_id].conn.Write(bs)
+					if err == nil {
+						break
+					} else {
+						removeClient(service_machine_id)
+					}
+				}
+			}
+		}
+	}
+}
+
+//
+func main() {
+	//心跳检测
+	go GC()
+	//启动服务
+	util.StartServer(func(data []byte) {
+		//接受消息处理
+		processmsg(data)
+	}, func(c net.Conn) { //连接后返回UUID
+		uuid := util.UUID(32)
+		c.Write(util.EnpacketObj(map[string]interface{}{"event": util.EVENT_RETURN_MACHINE_ID, "data": uuid}))
+		allclient[uuid] = &Client{conn: c, timestamp: time.Now().Unix()}
+	}, ":6060")
+
+}

+ 47 - 0
messageframe/src/爬虫监控/main.go

@@ -0,0 +1,47 @@
+package main
+
+import (
+	"flag"
+	"net"
+	"time"
+	"util"
+)
+
+//
+var w *util.Writer //通常只使用writer,不要使用conn
+var conn net.Conn
+var my_id string                                                //所有通信都要使用的
+var my_services []int = []int{util.SERVICE_RECIVE_SPIDER_STATE} //接受爬虫状态
+
+func GC() {
+	w.WriteObj(map[string]interface{}{
+		"myid":  my_id,
+		"event": util.EVENT_BROADCAST_REQUEST_SPIDER_STATE,
+	})
+	time.AfterFunc(20*time.Second, GC) //20秒查一次爬虫状态
+}
+
+func main() {
+	var serveraddr string
+	var dogc bool
+	flag.StringVar(&serveraddr, "s", "127.0.0.1:6060", "服务端地址")
+	flag.BoolVar(&dogc, "g", false, "是否运行GC,主监控控制gc就可以了,其他点,可以制作查看")
+	flag.Parse()
+	w, conn = util.StartClient(func(data []byte) {
+		ProcessMsg(data)
+	}, serveraddr)
+	defer conn.Close()
+	time.Sleep(10 * time.Second) //等,回应machineid
+	if dogc {
+		go GC()
+	}
+	//模拟发100个河南爬虫任务
+	for i := 0; i < 100; i++ { //
+		w.WriteObj(map[string]interface{}{
+			"myid":  my_id,
+			"event": util.SERVICE_ECPS_SPIDER_HN,
+			"data":  "下载数据要携带的参数",
+		})
+	}
+	time.Sleep(10 * time.Minute)
+}

+ 38 - 0
messageframe/src/爬虫监控/processmsg.go

@@ -0,0 +1,38 @@
+package main
+
+import (
+	"encoding/json"
+	"log"
+	"time"
+	"util"
+)
+
+//通用时间处理
+func ProcessMsg(msg []byte) {
+	tmp := map[string]interface{}{}
+	json.Unmarshal(msg, &tmp)
+	if v, ok := tmp["event"]; ok {
+		event := int(v.(float64))
+		switch event {
+		case util.EVENT_RETURN_MACHINE_ID: //服务端分配id,发布我能处理的时间
+			my_id = tmp["data"].(string)
+			w.WriteObj(map[string]interface{}{
+				"myid":  my_id,
+				"event": util.EVENT_PUBLISH_MYSERVICES,
+				"data":  my_services,
+			})
+			log.Println("服务器回应了机器ID")
+		case util.EVENT_REQUEST_HEARTBEAT: //请求的心跳,回应心跳请求
+			w.WriteObj(map[string]interface{}{
+				"myid":  my_id,
+				"event": util.EVENT_RETURN_HEARTBEAT,
+				"data":  time.Now().Unix(),
+			})
+			log.Println("对服务器回应了本机心跳")
+		case util.SERVICE_RECIVE_SPIDER_STATE: //获得各个爬虫状态
+			log.Println("各个爬虫状态,", string(msg))
+		default:
+			log.Println("服务器发过来的请求,我无法处理", string(msg))
+		}
+	}
+}