influxdb.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789
  1. package client
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net"
  11. "net/http"
  12. "net/url"
  13. "strconv"
  14. "strings"
  15. "time"
  16. "github.com/influxdata/influxdb/models"
  17. )
  18. const (
  19. // DefaultHost is the default host used to connect to an InfluxDB instance
  20. DefaultHost = "localhost"
  21. // DefaultPort is the default port used to connect to an InfluxDB instance
  22. DefaultPort = 8086
  23. // DefaultTimeout is the default connection timeout used to connect to an InfluxDB instance
  24. DefaultTimeout = 0
  25. )
  26. // Query is used to send a command to the server. Both Command and Database are required.
  27. type Query struct {
  28. Command string
  29. Database string
  30. // Chunked tells the server to send back chunked responses. This places
  31. // less load on the server by sending back chunks of the response rather
  32. // than waiting for the entire response all at once.
  33. Chunked bool
  34. // ChunkSize sets the maximum number of rows that will be returned per
  35. // chunk. Chunks are either divided based on their series or if they hit
  36. // the chunk size limit.
  37. //
  38. // Chunked must be set to true for this option to be used.
  39. ChunkSize int
  40. }
  41. // ParseConnectionString will parse a string to create a valid connection URL
  42. func ParseConnectionString(path string, ssl bool) (url.URL, error) {
  43. var host string
  44. var port int
  45. h, p, err := net.SplitHostPort(path)
  46. if err != nil {
  47. if path == "" {
  48. host = DefaultHost
  49. } else {
  50. host = path
  51. }
  52. // If they didn't specify a port, always use the default port
  53. port = DefaultPort
  54. } else {
  55. host = h
  56. port, err = strconv.Atoi(p)
  57. if err != nil {
  58. return url.URL{}, fmt.Errorf("invalid port number %q: %s\n", path, err)
  59. }
  60. }
  61. u := url.URL{
  62. Scheme: "http",
  63. }
  64. if ssl {
  65. u.Scheme = "https"
  66. }
  67. u.Host = net.JoinHostPort(host, strconv.Itoa(port))
  68. return u, nil
  69. }
  70. // Config is used to specify what server to connect to.
  71. // URL: The URL of the server connecting to.
  72. // Username/Password are optional. They will be passed via basic auth if provided.
  73. // UserAgent: If not provided, will default "InfluxDBClient",
  74. // Timeout: If not provided, will default to 0 (no timeout)
  75. type Config struct {
  76. URL url.URL
  77. Username string
  78. Password string
  79. UserAgent string
  80. Timeout time.Duration
  81. Precision string
  82. UnsafeSsl bool
  83. }
  84. // NewConfig will create a config to be used in connecting to the client
  85. func NewConfig() Config {
  86. return Config{
  87. Timeout: DefaultTimeout,
  88. }
  89. }
  90. // Client is used to make calls to the server.
  91. type Client struct {
  92. url url.URL
  93. username string
  94. password string
  95. httpClient *http.Client
  96. userAgent string
  97. precision string
  98. }
  99. const (
  100. // ConsistencyOne requires at least one data node acknowledged a write.
  101. ConsistencyOne = "one"
  102. // ConsistencyAll requires all data nodes to acknowledge a write.
  103. ConsistencyAll = "all"
  104. // ConsistencyQuorum requires a quorum of data nodes to acknowledge a write.
  105. ConsistencyQuorum = "quorum"
  106. // ConsistencyAny allows for hinted hand off, potentially no write happened yet.
  107. ConsistencyAny = "any"
  108. )
  109. // NewClient will instantiate and return a connected client to issue commands to the server.
  110. func NewClient(c Config) (*Client, error) {
  111. tlsConfig := &tls.Config{
  112. InsecureSkipVerify: c.UnsafeSsl,
  113. }
  114. tr := &http.Transport{
  115. TLSClientConfig: tlsConfig,
  116. }
  117. client := Client{
  118. url: c.URL,
  119. username: c.Username,
  120. password: c.Password,
  121. httpClient: &http.Client{Timeout: c.Timeout, Transport: tr},
  122. userAgent: c.UserAgent,
  123. precision: c.Precision,
  124. }
  125. if client.userAgent == "" {
  126. client.userAgent = "InfluxDBClient"
  127. }
  128. return &client, nil
  129. }
  130. // SetAuth will update the username and passwords
  131. func (c *Client) SetAuth(u, p string) {
  132. c.username = u
  133. c.password = p
  134. }
  135. // SetPrecision will update the precision
  136. func (c *Client) SetPrecision(precision string) {
  137. c.precision = precision
  138. }
  139. // Query sends a command to the server and returns the Response
  140. func (c *Client) Query(q Query) (*Response, error) {
  141. u := c.url
  142. u.Path = "query"
  143. values := u.Query()
  144. values.Set("q", q.Command)
  145. values.Set("db", q.Database)
  146. if q.Chunked {
  147. values.Set("chunked", "true")
  148. if q.ChunkSize > 0 {
  149. values.Set("chunk_size", strconv.Itoa(q.ChunkSize))
  150. }
  151. }
  152. if c.precision != "" {
  153. values.Set("epoch", c.precision)
  154. }
  155. u.RawQuery = values.Encode()
  156. req, err := http.NewRequest("POST", u.String(), nil)
  157. if err != nil {
  158. return nil, err
  159. }
  160. req.Header.Set("User-Agent", c.userAgent)
  161. if c.username != "" {
  162. req.SetBasicAuth(c.username, c.password)
  163. }
  164. resp, err := c.httpClient.Do(req)
  165. if err != nil {
  166. return nil, err
  167. }
  168. defer resp.Body.Close()
  169. var response Response
  170. if q.Chunked {
  171. cr := NewChunkedResponse(resp.Body)
  172. for {
  173. r, err := cr.NextResponse()
  174. if err != nil {
  175. // If we got an error while decoding the response, send that back.
  176. return nil, err
  177. }
  178. if r == nil {
  179. break
  180. }
  181. response.Results = append(response.Results, r.Results...)
  182. if r.Err != nil {
  183. response.Err = r.Err
  184. break
  185. }
  186. }
  187. } else {
  188. dec := json.NewDecoder(resp.Body)
  189. dec.UseNumber()
  190. if err := dec.Decode(&response); err != nil {
  191. // Ignore EOF errors if we got an invalid status code.
  192. if !(err == io.EOF && resp.StatusCode != http.StatusOK) {
  193. return nil, err
  194. }
  195. }
  196. }
  197. // If we don't have an error in our json response, and didn't get StatusOK,
  198. // then send back an error.
  199. if resp.StatusCode != http.StatusOK && response.Error() == nil {
  200. return &response, fmt.Errorf("received status code %d from server", resp.StatusCode)
  201. }
  202. return &response, nil
  203. }
  204. // Write takes BatchPoints and allows for writing of multiple points with defaults
  205. // If successful, error is nil and Response is nil
  206. // If an error occurs, Response may contain additional information if populated.
  207. func (c *Client) Write(bp BatchPoints) (*Response, error) {
  208. u := c.url
  209. u.Path = "write"
  210. var b bytes.Buffer
  211. for _, p := range bp.Points {
  212. err := checkPointTypes(p)
  213. if err != nil {
  214. return nil, err
  215. }
  216. if p.Raw != "" {
  217. if _, err := b.WriteString(p.Raw); err != nil {
  218. return nil, err
  219. }
  220. } else {
  221. for k, v := range bp.Tags {
  222. if p.Tags == nil {
  223. p.Tags = make(map[string]string, len(bp.Tags))
  224. }
  225. p.Tags[k] = v
  226. }
  227. if _, err := b.WriteString(p.MarshalString()); err != nil {
  228. return nil, err
  229. }
  230. }
  231. if err := b.WriteByte('\n'); err != nil {
  232. return nil, err
  233. }
  234. }
  235. req, err := http.NewRequest("POST", u.String(), &b)
  236. if err != nil {
  237. return nil, err
  238. }
  239. req.Header.Set("Content-Type", "")
  240. req.Header.Set("User-Agent", c.userAgent)
  241. if c.username != "" {
  242. req.SetBasicAuth(c.username, c.password)
  243. }
  244. precision := bp.Precision
  245. if precision == "" {
  246. precision = c.precision
  247. }
  248. params := req.URL.Query()
  249. params.Set("db", bp.Database)
  250. params.Set("rp", bp.RetentionPolicy)
  251. params.Set("precision", precision)
  252. params.Set("consistency", bp.WriteConsistency)
  253. req.URL.RawQuery = params.Encode()
  254. resp, err := c.httpClient.Do(req)
  255. if err != nil {
  256. return nil, err
  257. }
  258. defer resp.Body.Close()
  259. var response Response
  260. body, err := ioutil.ReadAll(resp.Body)
  261. if err != nil {
  262. return nil, err
  263. }
  264. if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
  265. var err = fmt.Errorf(string(body))
  266. response.Err = err
  267. return &response, err
  268. }
  269. return nil, nil
  270. }
  271. // WriteLineProtocol takes a string with line returns to delimit each write
  272. // If successful, error is nil and Response is nil
  273. // If an error occurs, Response may contain additional information if populated.
  274. func (c *Client) WriteLineProtocol(data, database, retentionPolicy, precision, writeConsistency string) (*Response, error) {
  275. u := c.url
  276. u.Path = "write"
  277. r := strings.NewReader(data)
  278. req, err := http.NewRequest("POST", u.String(), r)
  279. if err != nil {
  280. return nil, err
  281. }
  282. req.Header.Set("Content-Type", "")
  283. req.Header.Set("User-Agent", c.userAgent)
  284. if c.username != "" {
  285. req.SetBasicAuth(c.username, c.password)
  286. }
  287. params := req.URL.Query()
  288. params.Set("db", database)
  289. params.Set("rp", retentionPolicy)
  290. params.Set("precision", precision)
  291. params.Set("consistency", writeConsistency)
  292. req.URL.RawQuery = params.Encode()
  293. resp, err := c.httpClient.Do(req)
  294. if err != nil {
  295. return nil, err
  296. }
  297. defer resp.Body.Close()
  298. var response Response
  299. body, err := ioutil.ReadAll(resp.Body)
  300. if err != nil {
  301. return nil, err
  302. }
  303. if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
  304. err := fmt.Errorf(string(body))
  305. response.Err = err
  306. return &response, err
  307. }
  308. return nil, nil
  309. }
  310. // Ping will check to see if the server is up
  311. // Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
  312. func (c *Client) Ping() (time.Duration, string, error) {
  313. now := time.Now()
  314. u := c.url
  315. u.Path = "ping"
  316. req, err := http.NewRequest("GET", u.String(), nil)
  317. if err != nil {
  318. return 0, "", err
  319. }
  320. req.Header.Set("User-Agent", c.userAgent)
  321. if c.username != "" {
  322. req.SetBasicAuth(c.username, c.password)
  323. }
  324. resp, err := c.httpClient.Do(req)
  325. if err != nil {
  326. return 0, "", err
  327. }
  328. defer resp.Body.Close()
  329. version := resp.Header.Get("X-Influxdb-Version")
  330. return time.Since(now), version, nil
  331. }
  332. // Structs
  333. // Message represents a user message.
  334. type Message struct {
  335. Level string `json:"level,omitempty"`
  336. Text string `json:"text,omitempty"`
  337. }
  338. // Result represents a resultset returned from a single statement.
  339. type Result struct {
  340. Series []models.Row
  341. Messages []*Message
  342. Err error
  343. }
  344. // MarshalJSON encodes the result into JSON.
  345. func (r *Result) MarshalJSON() ([]byte, error) {
  346. // Define a struct that outputs "error" as a string.
  347. var o struct {
  348. Series []models.Row `json:"series,omitempty"`
  349. Messages []*Message `json:"messages,omitempty"`
  350. Err string `json:"error,omitempty"`
  351. }
  352. // Copy fields to output struct.
  353. o.Series = r.Series
  354. o.Messages = r.Messages
  355. if r.Err != nil {
  356. o.Err = r.Err.Error()
  357. }
  358. return json.Marshal(&o)
  359. }
  360. // UnmarshalJSON decodes the data into the Result struct
  361. func (r *Result) UnmarshalJSON(b []byte) error {
  362. var o struct {
  363. Series []models.Row `json:"series,omitempty"`
  364. Messages []*Message `json:"messages,omitempty"`
  365. Err string `json:"error,omitempty"`
  366. }
  367. dec := json.NewDecoder(bytes.NewBuffer(b))
  368. dec.UseNumber()
  369. err := dec.Decode(&o)
  370. if err != nil {
  371. return err
  372. }
  373. r.Series = o.Series
  374. r.Messages = o.Messages
  375. if o.Err != "" {
  376. r.Err = errors.New(o.Err)
  377. }
  378. return nil
  379. }
  380. // Response represents a list of statement results.
  381. type Response struct {
  382. Results []Result
  383. Err error
  384. }
  385. // MarshalJSON encodes the response into JSON.
  386. func (r *Response) MarshalJSON() ([]byte, error) {
  387. // Define a struct that outputs "error" as a string.
  388. var o struct {
  389. Results []Result `json:"results,omitempty"`
  390. Err string `json:"error,omitempty"`
  391. }
  392. // Copy fields to output struct.
  393. o.Results = r.Results
  394. if r.Err != nil {
  395. o.Err = r.Err.Error()
  396. }
  397. return json.Marshal(&o)
  398. }
  399. // UnmarshalJSON decodes the data into the Response struct
  400. func (r *Response) UnmarshalJSON(b []byte) error {
  401. var o struct {
  402. Results []Result `json:"results,omitempty"`
  403. Err string `json:"error,omitempty"`
  404. }
  405. dec := json.NewDecoder(bytes.NewBuffer(b))
  406. dec.UseNumber()
  407. err := dec.Decode(&o)
  408. if err != nil {
  409. return err
  410. }
  411. r.Results = o.Results
  412. if o.Err != "" {
  413. r.Err = errors.New(o.Err)
  414. }
  415. return nil
  416. }
  417. // Error returns the first error from any statement.
  418. // Returns nil if no errors occurred on any statements.
  419. func (r *Response) Error() error {
  420. if r.Err != nil {
  421. return r.Err
  422. }
  423. for _, result := range r.Results {
  424. if result.Err != nil {
  425. return result.Err
  426. }
  427. }
  428. return nil
  429. }
  430. // ChunkedResponse represents a response from the server that
  431. // uses chunking to stream the output.
  432. type ChunkedResponse struct {
  433. dec *json.Decoder
  434. }
  435. // NewChunkedResponse reads a stream and produces responses from the stream.
  436. func NewChunkedResponse(r io.Reader) *ChunkedResponse {
  437. dec := json.NewDecoder(r)
  438. dec.UseNumber()
  439. return &ChunkedResponse{dec: dec}
  440. }
  441. // NextResponse reads the next line of the stream and returns a response.
  442. func (r *ChunkedResponse) NextResponse() (*Response, error) {
  443. var response Response
  444. if err := r.dec.Decode(&response); err != nil {
  445. if err == io.EOF {
  446. return nil, nil
  447. }
  448. return nil, err
  449. }
  450. return &response, nil
  451. }
  452. // Point defines the fields that will be written to the database
  453. // Measurement, Time, and Fields are required
  454. // Precision can be specified if the time is in epoch format (integer).
  455. // Valid values for Precision are n, u, ms, s, m, and h
  456. type Point struct {
  457. Measurement string
  458. Tags map[string]string
  459. Time time.Time
  460. Fields map[string]interface{}
  461. Precision string
  462. Raw string
  463. }
  464. // MarshalJSON will format the time in RFC3339Nano
  465. // Precision is also ignored as it is only used for writing, not reading
  466. // Or another way to say it is we always send back in nanosecond precision
  467. func (p *Point) MarshalJSON() ([]byte, error) {
  468. point := struct {
  469. Measurement string `json:"measurement,omitempty"`
  470. Tags map[string]string `json:"tags,omitempty"`
  471. Time string `json:"time,omitempty"`
  472. Fields map[string]interface{} `json:"fields,omitempty"`
  473. Precision string `json:"precision,omitempty"`
  474. }{
  475. Measurement: p.Measurement,
  476. Tags: p.Tags,
  477. Fields: p.Fields,
  478. Precision: p.Precision,
  479. }
  480. // Let it omit empty if it's really zero
  481. if !p.Time.IsZero() {
  482. point.Time = p.Time.UTC().Format(time.RFC3339Nano)
  483. }
  484. return json.Marshal(&point)
  485. }
  486. // MarshalString renders string representation of a Point with specified
  487. // precision. The default precision is nanoseconds.
  488. func (p *Point) MarshalString() string {
  489. pt, err := models.NewPoint(p.Measurement, p.Tags, p.Fields, p.Time)
  490. if err != nil {
  491. return "# ERROR: " + err.Error() + " " + p.Measurement
  492. }
  493. if p.Precision == "" || p.Precision == "ns" || p.Precision == "n" {
  494. return pt.String()
  495. }
  496. return pt.PrecisionString(p.Precision)
  497. }
  498. // UnmarshalJSON decodes the data into the Point struct
  499. func (p *Point) UnmarshalJSON(b []byte) error {
  500. var normal struct {
  501. Measurement string `json:"measurement"`
  502. Tags map[string]string `json:"tags"`
  503. Time time.Time `json:"time"`
  504. Precision string `json:"precision"`
  505. Fields map[string]interface{} `json:"fields"`
  506. }
  507. var epoch struct {
  508. Measurement string `json:"measurement"`
  509. Tags map[string]string `json:"tags"`
  510. Time *int64 `json:"time"`
  511. Precision string `json:"precision"`
  512. Fields map[string]interface{} `json:"fields"`
  513. }
  514. if err := func() error {
  515. var err error
  516. dec := json.NewDecoder(bytes.NewBuffer(b))
  517. dec.UseNumber()
  518. if err = dec.Decode(&epoch); err != nil {
  519. return err
  520. }
  521. // Convert from epoch to time.Time, but only if Time
  522. // was actually set.
  523. var ts time.Time
  524. if epoch.Time != nil {
  525. ts, err = EpochToTime(*epoch.Time, epoch.Precision)
  526. if err != nil {
  527. return err
  528. }
  529. }
  530. p.Measurement = epoch.Measurement
  531. p.Tags = epoch.Tags
  532. p.Time = ts
  533. p.Precision = epoch.Precision
  534. p.Fields = normalizeFields(epoch.Fields)
  535. return nil
  536. }(); err == nil {
  537. return nil
  538. }
  539. dec := json.NewDecoder(bytes.NewBuffer(b))
  540. dec.UseNumber()
  541. if err := dec.Decode(&normal); err != nil {
  542. return err
  543. }
  544. normal.Time = SetPrecision(normal.Time, normal.Precision)
  545. p.Measurement = normal.Measurement
  546. p.Tags = normal.Tags
  547. p.Time = normal.Time
  548. p.Precision = normal.Precision
  549. p.Fields = normalizeFields(normal.Fields)
  550. return nil
  551. }
  552. // Remove any notion of json.Number
  553. func normalizeFields(fields map[string]interface{}) map[string]interface{} {
  554. newFields := map[string]interface{}{}
  555. for k, v := range fields {
  556. switch v := v.(type) {
  557. case json.Number:
  558. jv, e := v.Float64()
  559. if e != nil {
  560. panic(fmt.Sprintf("unable to convert json.Number to float64: %s", e))
  561. }
  562. newFields[k] = jv
  563. default:
  564. newFields[k] = v
  565. }
  566. }
  567. return newFields
  568. }
  569. // BatchPoints is used to send batched data in a single write.
  570. // Database and Points are required
  571. // If no retention policy is specified, it will use the databases default retention policy.
  572. // If tags are specified, they will be "merged" with all points. If a point already has that tag, it will be ignored.
  573. // If time is specified, it will be applied to any point with an empty time.
  574. // Precision can be specified if the time is in epoch format (integer).
  575. // Valid values for Precision are n, u, ms, s, m, and h
  576. type BatchPoints struct {
  577. Points []Point `json:"points,omitempty"`
  578. Database string `json:"database,omitempty"`
  579. RetentionPolicy string `json:"retentionPolicy,omitempty"`
  580. Tags map[string]string `json:"tags,omitempty"`
  581. Time time.Time `json:"time,omitempty"`
  582. Precision string `json:"precision,omitempty"`
  583. WriteConsistency string `json:"-"`
  584. }
  585. // UnmarshalJSON decodes the data into the BatchPoints struct
  586. func (bp *BatchPoints) UnmarshalJSON(b []byte) error {
  587. var normal struct {
  588. Points []Point `json:"points"`
  589. Database string `json:"database"`
  590. RetentionPolicy string `json:"retentionPolicy"`
  591. Tags map[string]string `json:"tags"`
  592. Time time.Time `json:"time"`
  593. Precision string `json:"precision"`
  594. }
  595. var epoch struct {
  596. Points []Point `json:"points"`
  597. Database string `json:"database"`
  598. RetentionPolicy string `json:"retentionPolicy"`
  599. Tags map[string]string `json:"tags"`
  600. Time *int64 `json:"time"`
  601. Precision string `json:"precision"`
  602. }
  603. if err := func() error {
  604. var err error
  605. if err = json.Unmarshal(b, &epoch); err != nil {
  606. return err
  607. }
  608. // Convert from epoch to time.Time
  609. var ts time.Time
  610. if epoch.Time != nil {
  611. ts, err = EpochToTime(*epoch.Time, epoch.Precision)
  612. if err != nil {
  613. return err
  614. }
  615. }
  616. bp.Points = epoch.Points
  617. bp.Database = epoch.Database
  618. bp.RetentionPolicy = epoch.RetentionPolicy
  619. bp.Tags = epoch.Tags
  620. bp.Time = ts
  621. bp.Precision = epoch.Precision
  622. return nil
  623. }(); err == nil {
  624. return nil
  625. }
  626. if err := json.Unmarshal(b, &normal); err != nil {
  627. return err
  628. }
  629. normal.Time = SetPrecision(normal.Time, normal.Precision)
  630. bp.Points = normal.Points
  631. bp.Database = normal.Database
  632. bp.RetentionPolicy = normal.RetentionPolicy
  633. bp.Tags = normal.Tags
  634. bp.Time = normal.Time
  635. bp.Precision = normal.Precision
  636. return nil
  637. }
  638. // utility functions
  639. // Addr provides the current url as a string of the server the client is connected to.
  640. func (c *Client) Addr() string {
  641. return c.url.String()
  642. }
  643. // checkPointTypes ensures no unsupported types are submitted to influxdb, returning error if they are found.
  644. func checkPointTypes(p Point) error {
  645. for _, v := range p.Fields {
  646. switch v.(type) {
  647. case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, float32, float64, bool, string, nil:
  648. return nil
  649. default:
  650. return fmt.Errorf("unsupported point type: %T", v)
  651. }
  652. }
  653. return nil
  654. }
  655. // helper functions
  656. // EpochToTime takes a unix epoch time and uses precision to return back a time.Time
  657. func EpochToTime(epoch int64, precision string) (time.Time, error) {
  658. if precision == "" {
  659. precision = "s"
  660. }
  661. var t time.Time
  662. switch precision {
  663. case "h":
  664. t = time.Unix(0, epoch*int64(time.Hour))
  665. case "m":
  666. t = time.Unix(0, epoch*int64(time.Minute))
  667. case "s":
  668. t = time.Unix(0, epoch*int64(time.Second))
  669. case "ms":
  670. t = time.Unix(0, epoch*int64(time.Millisecond))
  671. case "u":
  672. t = time.Unix(0, epoch*int64(time.Microsecond))
  673. case "n":
  674. t = time.Unix(0, epoch)
  675. default:
  676. return time.Time{}, fmt.Errorf("Unknown precision %q", precision)
  677. }
  678. return t, nil
  679. }
  680. // SetPrecision will round a time to the specified precision
  681. func SetPrecision(t time.Time, precision string) time.Time {
  682. switch precision {
  683. case "n":
  684. case "u":
  685. return t.Round(time.Microsecond)
  686. case "ms":
  687. return t.Round(time.Millisecond)
  688. case "s":
  689. return t.Round(time.Second)
  690. case "m":
  691. return t.Round(time.Minute)
  692. case "h":
  693. return t.Round(time.Hour)
  694. }
  695. return t
  696. }