فهرست منبع

influxdb数据保存和检索

wangshan 8 سال پیش
والد
کامیت
303f935166
1فایلهای تغییر یافته به همراه62 افزوده شده و 0 حذف شده
  1. 62 0
      common/src/qfw/util/influxdb.go

+ 62 - 0
common/src/qfw/util/influxdb.go

@@ -0,0 +1,62 @@
+package util
+
+import (
+	"github.com/influxdata/influxdb-client"
+	"log"
+	"time"
+)
+
+var influxdburl string
+
+//
+func InitInfluxdb(url string) {
+	log.Println("influxdburl:", url)
+	influxdburl = url
+}
+
+//
+func InsertInto(dbname string, measurement string, tags []influxdb.Tag, fields map[string]interface{}, timestamp time.Time, rp /*数据保留策略,默认autogen*/ string) error {
+	client, err := influxdb.NewClient(influxdburl)
+	if err != nil {
+		return err
+	}
+	writer := client.Writer()
+	writer.Database = dbname
+	writer.RetentionPolicy = rp
+	pt := influxdb.Point{
+		Name:   measurement,
+		Tags:   tags,
+		Fields: fields,
+		Time:   timestamp,
+	}
+	if _, err := pt.WriteTo(writer); err != nil {
+		return err
+	}
+	return nil
+}
+
+//查询接口
+func Search(dbname string, fn func(row influxdb.Row) error, query string, queryoption ...influxdb.QueryOption) error {
+	client, err := influxdb.NewClient(influxdburl)
+	if err != nil {
+		return err
+	}
+
+	querier := client.Querier()
+	querier.Database = dbname
+	cur, err := querier.Select(query, queryoption...)
+	if err != nil {
+		return err
+	}
+	defer cur.Close()
+	result, err := cur.NextSet()
+	if err != nil {
+		return err
+	}
+	series, err := result.NextSeries()
+	if err != nil {
+		return err
+	}
+	influxdb.EachRow(series, fn)
+	return nil
+}