张金坤 8 лет назад
Родитель
Сommit
593c20e86d

+ 10 - 0
src/jfw/modules/behaviorcollect/src/config.json

@@ -0,0 +1,10 @@
+{
+    "rpcaddr": ":8083",
+    "webaddr": ":8084",
+    "insertthreads": 5,
+    "cachesize": 50000,
+    "influxdb": "https://jianyu:Topnet@20150501@wxlmjy.qmx.top:443",
+    "log_model": {
+        "剑鱼实验室": "/jylab/.*"
+    }
+}

+ 87 - 0
src/jfw/modules/behaviorcollect/src/main.go

@@ -0,0 +1,87 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net"
+	"net/http"
+	"net/rpc"
+	"os"
+	"parse"
+	"writer"
+)
+
+//
+type config struct {
+	InfluxdbAddr  string                 `json:"influxdb"`
+	InsertThreads int                    `json:"insertthreads"`
+	RpcAddr       string                 `json:"rpcaddr"`
+	Cachesize     int                    `json:"cachesize"`
+	WebAddr       string                 `json:"webaddr"`
+	Log_model     map[string]interface{} `json:"log_model"`
+}
+
+//
+type RpcService struct {
+}
+
+//
+func (rs *RpcService) Log(args string, reply *string, req *http.Request) error {
+	go processrequest(args, req)
+	return nil
+}
+
+var c config
+var list map[string]parse.Face
+
+//
+func init() {
+	fi, _ := os.Open("./config.json")
+	bs, _ := ioutil.ReadAll(fi)
+	fi.Close()
+	json.Unmarshal(bs, &c)
+	//初始化写入工具类
+	writer.InitWriter(c.InfluxdbAddr, c.InsertThreads, c.Cachesize)
+	//
+	list = map[string]parse.Face{
+		"accesslog": parse.AccessLogParse{},
+	}
+	parse.AccessLogInit(c.Log_model)
+}
+
+//
+func processrequest(jsonstr string, req *http.Request) {
+	data := map[string]interface{}{}
+	json.Unmarshal([]byte(jsonstr), &data)
+	model := data["model"].(string)
+	if v, ok := list[model]; ok {
+		switch model {
+		case "accesslog":
+			writer.Write("jy_logs", "30d", v.Parse(data, req))
+
+		}
+	}
+}
+
+//
+func main() {
+	//TODO 开通RPC ajax服务
+	http.HandleFunc("/ajax", func(w http.ResponseWriter, req *http.Request) {
+		//
+		jsontxt := req.FormValue("json") //post方式传过来
+		processrequest(jsontxt, req)
+	})
+	http.ListenAndServe(c.WebAddr, nil)
+	//开通json rpc服务
+	newServer := rpc.NewServer()
+	newServer.Register(new(RpcService))
+	l, err := net.Listen("tcp", c.RpcAddr) // any available address
+	if err != nil {
+		fmt.Println(err.Error())
+	}
+	go newServer.Accept(l)
+	//
+	lock := make(chan bool)
+	<-lock
+}

+ 3 - 0
src/jfw/modules/behaviorcollect/src/parse/README.md

@@ -0,0 +1,3 @@
+依据请求的模块,
+根据传入的map对象,
+转化成point

+ 152 - 0
src/jfw/modules/behaviorcollect/src/parse/accesslog.go

@@ -0,0 +1,152 @@
+package parse
+
+import (
+	"fmt"
+	"net"
+	"net/http"
+	"regexp"
+	"strings"
+	"time"
+
+	"github.com/influxdata/influxdb-client"
+)
+
+//用户访问日志转换
+type AccessLogParse struct {
+}
+
+//设置tag标签
+var tagtitle = map[string]interface{}{
+	"s_os":     true,
+	"s_browse": true,
+	"s_model":  true,
+}
+var log_model = map[string]*regexp.Regexp{}
+
+//
+func (alp AccessLogParse) Parse(data map[string]interface{}, req *http.Request) *influxdb.Point {
+	//补充基本数据
+	data = AddBasicData(data, req)
+	fields := map[string]interface{}{}
+	tags := []influxdb.Tag{}
+	for k, v := range data {
+		if tagtitle[k] == nil {
+			fields[k] = v
+		} else {
+			tag := influxdb.Tag{Key: k, Value: fmt.Sprint(v)}
+			tags = append(tags, tag)
+		}
+	}
+
+	ponit := &influxdb.Point{
+		Name:   "accesslog",
+		Tags:   tags,
+		Fields: fields,
+		Time:   time.Now(),
+	}
+	return ponit
+
+}
+
+//
+func AccessLogInit(model map[string]interface{}) {
+	//TODO 加载IP转换,url模块对应
+	for k, v := range model {
+		reg, _ := regexp.Compile(fmt.Sprint(v))
+		log_model[k] = reg
+	}
+}
+
+//补充基本数据字段
+func AddBasicData(data map[string]interface{}, req *http.Request) map[string]interface{} {
+	agent := req.Header.Get("user-agent")
+	data["s_os"] = GetOS(agent)
+	data["s_ip"] = GetIp(req)
+	data["s_browse"] = GetBrowse(agent)
+	data["s_refer"] = req.Referer()
+	data["s_client"] = agent
+	data["s_model"] = GetModel(fmt.Sprint(data["s_url"]))
+	delete(data, "model")
+	return data
+}
+
+//根据url获取所属模块
+func GetModel(url string) string {
+	s_model := "其他"
+	for k, v := range log_model {
+		b := v.MatchString(url)
+		if b {
+			s_model = k
+			break
+		}
+	}
+	return s_model
+}
+
+//获取平台类型
+func GetOS(useros string) string {
+	osVersion := "其他"
+	if strings.Contains(useros, "NT 6.0") {
+		osVersion = "Windows Vista/Server 2008"
+	} else if strings.Contains(useros, "NT 5.2") {
+		osVersion = "Windows Server 2003"
+	} else if strings.Contains(useros, "NT 5.1") {
+		osVersion = "Windows XP"
+	} else if strings.Contains(useros, "NT 5") {
+		osVersion = "Windows 2000"
+	} else if strings.Contains(useros, "Mac") {
+		osVersion = "Mac"
+	} else if strings.Contains(useros, "Unix") {
+		osVersion = "UNIX"
+	} else if strings.Contains(useros, "Linux") {
+		osVersion = "Linux"
+	} else if strings.Contains(useros, "SunOS") {
+		osVersion = "SunOS"
+	} else if strings.Contains(useros, "NT 6.3") {
+		osVersion = "Window8"
+	} else if strings.Contains(useros, "NT 6.1") {
+		osVersion = "Window7"
+	} else if strings.Contains(useros, "NT 10.0") {
+		osVersion = "Window10"
+	}
+	return osVersion
+}
+
+//获取浏览器类型
+func GetBrowse(userbrowser string) string {
+	browserVersion := "其他"
+	if strings.Contains(userbrowser, "MSIE") {
+		browserVersion = "IE"
+	} else if strings.Contains(userbrowser, "Firefox") {
+		browserVersion = "Firefox"
+	} else if strings.Contains(userbrowser, "Chrome") {
+		browserVersion = "Chrome"
+	} else if strings.Contains(userbrowser, "Safari") {
+		browserVersion = "Safari"
+	} else if strings.Contains(userbrowser, "rv:11.0") {
+		browserVersion = "IE11"
+	}
+	return browserVersion
+}
+
+//获取ip
+func GetIp(req *http.Request) string {
+	if req == nil {
+		return ""
+	}
+	ip_for := req.Header.Get("x-forwarded-for")
+	ip_client := req.Header.Get("http_client_ip")
+	ip_addr := req.Header.Get("Remote_addr")
+	un := "unknown"
+	if (ip_for != un) && (len(strings.TrimSpace(ip_for)) > 0) {
+		return ip_for
+	}
+	if (ip_client != un) && (len(strings.TrimSpace(ip_client)) > 0) {
+		return ip_client
+	}
+	if (ip_addr != un) && (len(strings.TrimSpace(ip_addr)) > 0) {
+		return ip_addr
+	}
+	ip, _, _ := net.SplitHostPort(req.RemoteAddr)
+	return ip
+}

+ 13 - 0
src/jfw/modules/behaviorcollect/src/parse/parse.go

@@ -0,0 +1,13 @@
+package parse
+
+import (
+	"net/http"
+
+	"github.com/influxdata/influxdb-client"
+)
+
+//每个转换对象都要实现的接口
+type Face interface {
+	//
+	Parse(data map[string]interface{}, req *http.Request) *influxdb.Point
+}

+ 20 - 0
src/jfw/modules/behaviorcollect/src/util/catch.go

@@ -0,0 +1,20 @@
+package util
+
+import (
+	"log"
+	"runtime"
+)
+
+//出错拦截
+func Catch() {
+	if r := recover(); r != nil {
+		log.Println("err:", r)
+		for skip := 0; ; skip++ {
+			_, file, line, ok := runtime.Caller(skip)
+			if !ok {
+				break
+			}
+			go log.Printf("%v,%v\n", file, line)
+		}
+	}
+}

+ 60 - 0
src/jfw/modules/behaviorcollect/src/writer/writer.go

@@ -0,0 +1,60 @@
+package writer
+
+import "github.com/influxdata/influxdb-client"
+
+//带缓冲区的多并发写入
+//
+type Item struct {
+	Rp string
+	P  *influxdb.Point
+	Db string
+}
+
+var cache chan *Item
+var lock chan bool
+var influxdbaddr string
+//
+func InitWriter(dbaddr string, threads,cachesize int) {
+	influxdbaddr = dbaddr
+	lock = make(chan bool,threads)
+	cache = make(chan *Item, cachesize)
+	go consumecache()
+	return
+}
+
+//
+func Write(db string, rp string, p *influxdb.Point) {
+	cache <- &Item{
+		Db: db,
+		Rp: rp,
+		P:  p,
+	}
+}
+
+//插入数据守护
+func consumecache() {
+	for {
+		select {
+		case p := <-cache:
+			lock<-true
+			go doinsert(p,lock)
+		}
+	}
+}
+
+//
+func doinsert(p *Item,l <-chan bool) {
+	defer clearlock(l)
+	client, err := influxdb.NewClient(influxdbaddr)
+	if err != nil {
+		return
+	}
+	w := client.Writer()
+	w.Database = p.Db
+	w.RetentionPolicy = p.Rp
+	p.P.WriteTo(w)
+}
+//
+func clearlock( l <-chan bool ){
+	<-l
+}