|
@@ -0,0 +1,62 @@
|
|
|
|
+package util
|
|
|
|
+
|
|
|
|
+import (
|
|
|
|
+ "github.com/influxdata/influxdb-client"
|
|
|
|
+ "log"
|
|
|
|
+ "time"
|
|
|
|
+)
|
|
|
|
+
|
|
|
|
+var influxdburl string
|
|
|
|
+
|
|
|
|
+//
|
|
|
|
+func InitInfluxdb(url string) {
|
|
|
|
+ log.Println("influxdburl:", url)
|
|
|
|
+ influxdburl = url
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+//
|
|
|
|
+func InsertInto(dbname string, measurement string, tags []influxdb.Tag, fields map[string]interface{}, timestamp time.Time, rp /*数据保留策略,默认autogen*/ string) error {
|
|
|
|
+ client, err := influxdb.NewClient(influxdburl)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ writer := client.Writer()
|
|
|
|
+ writer.Database = dbname
|
|
|
|
+ writer.RetentionPolicy = rp
|
|
|
|
+ pt := influxdb.Point{
|
|
|
|
+ Name: measurement,
|
|
|
|
+ Tags: tags,
|
|
|
|
+ Fields: fields,
|
|
|
|
+ Time: timestamp,
|
|
|
|
+ }
|
|
|
|
+ if _, err := pt.WriteTo(writer); err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+//查询接口
|
|
|
|
+func Search(dbname string, fn func(row influxdb.Row) error, query string, queryoption ...influxdb.QueryOption) error {
|
|
|
|
+ client, err := influxdb.NewClient(influxdburl)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ querier := client.Querier()
|
|
|
|
+ querier.Database = dbname
|
|
|
|
+ cur, err := querier.Select(query, queryoption...)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ defer cur.Close()
|
|
|
|
+ result, err := cur.NextSet()
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ series, err := result.NextSeries()
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ influxdb.EachRow(series, fn)
|
|
|
|
+ return nil
|
|
|
|
+}
|