InfluxDB & Flux

I recently started using the InfluxDB time-series database to store some data I’ve been collecting. It’s replacing RRDTool, which I’ve used in the past for performance metrics, and it’s working nicely.

Their Flux query language took a while to wrap my head around. It isn’t like a traditional scripting language. Yes, it has variables, functions and such, but the overall flow of the logic isn’t what a typical programmer expects, with branching and looping. SQL and its derivative InfluxQL don’t work that way either, so this isn’t surprising or bad. It’s just different.

The differences between Flux and typical scripting languages come from the specialized nature of the TSDB data model and the typical problems Flux is trying to solve. As every intro to InfluxDB explains, the database stores points that have a timestamp, a measurement name, any number of tags and any number of fields. Points are analogous to records in SQL: SELECT a set of points and you get results that fit nicely into a spreadsheet, with columns for timestamp, measurement, tags and fields and a row for each point. Nothing unexpected there.
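To make the SQL analogy concrete, here’s a minimal Python sketch (a model of the data, not InfluxDB code) that represents a point as a small dataclass and flattens it into one spreadsheet-style row. Point and as_row() are hypothetical names chosen for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class Point:
    """One InfluxDB point: a timestamp, a measurement, tags and fields."""
    time: int                      # nanoseconds since the Unix epoch
    measurement: str
    tags: dict[str, str] = field(default_factory=dict)
    fields: dict[str, float] = field(default_factory=dict)


def as_row(p: Point) -> dict:
    """Flatten a point into one spreadsheet-style row (the SQL mental model)."""
    return {"time": p.time, "measurement": p.measurement, **p.tags, **p.fields}


p = Point(1605224940000000000, "Environment",
          tags={"sensor": "123456"},
          fields={"temperature": 69.2, "humidity": 86.3})
print(as_row(p))
```

In this view both fields sit side by side in the same row, which is exactly the intuition Flux breaks.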

Flux adds a twist that took me a while to grasp, but once I did, the rest made sense. Most Flux scripts look something like this.

from(bucket: "mydb")
  |> range(start: -1h)
  |> filter(fn: (r) => ...)

The from() function says where to get the data. range() filters on timestamp, and filter() does any additional filtering needed. The trick is to understand what is being passed to filter(). The r passed in is a “record” or “row”, not a point! If, as in my case, your points have multiple fields, each point maps to a separate record for each field!
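Here’s a rough Python model of that split: one point with two fields becomes two records, each carrying the field’s name in _field and its reading in _value, following Flux’s column naming. to_records() is a hypothetical helper for illustration, not part of any InfluxDB library.

```python
def to_records(time, measurement, tags, fields):
    """Split one point into one Flux-style record per field.

    Each record keeps the point's time, measurement and tags, plus the
    field's name in "_field" and its reading in "_value".
    """
    return [
        {"_time": time, "_measurement": measurement, **tags,
         "_field": name, "_value": value}
        for name, value in fields.items()
    ]


records = to_records(1605224940000000000, "Environment",
                     {"sensor": "123456"},
                     {"temperature": 69.2, "humidity": 86.3})
for r in records:
    print(r)
```

Note that neither record has a temperature or humidity key; the field name is data in the _field column.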

Consider points like the ones below, with data from an environmental sensor that reports temperature and humidity readings. In this case they’re stored in a single measurement (Environment) with the sensor number as a tag and separate temperature and humidity fields.

Environment,sensor=123456 temperature=69.2,humidity=86.3 1605224940000000000
Environment,sensor=123456 temperature=69.8,humidity=88.4 1605225040000000000
Environment,sensor=123456 temperature=68.3,humidity=88.1 1605225140000000000
Environment,sensor=123456 temperature=68.1,humidity=87.2 1605225240000000000
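These points are written in InfluxDB line protocol. As a sketch, a deliberately simplified Python parser shows how each line splits into measurement, tags, fields and timestamp. parse_line() is hypothetical and ignores escaping, quoted string fields, integer suffixes and optional timestamps, so it’s no substitute for a real client library.

```python
SAMPLE = """\
Environment,sensor=123456 temperature=69.2,humidity=86.3 1605224940000000000
Environment,sensor=123456 temperature=69.8,humidity=88.4 1605225040000000000
Environment,sensor=123456 temperature=68.3,humidity=88.1 1605225140000000000
Environment,sensor=123456 temperature=68.1,humidity=87.2 1605225240000000000
"""


def parse_line(line):
    """Parse one *simplified* line-protocol line into a point dict.

    Assumes no escaped spaces or commas, no quoted string fields,
    float-only field values and an explicit timestamp.
    """
    series, field_set, timestamp = line.split(" ")
    measurement, *tag_pairs = series.split(",")
    tags = dict(pair.split("=", 1) for pair in tag_pairs)
    fields = {name: float(value)
              for name, value in (pair.split("=", 1) for pair in field_set.split(","))}
    return {"time": int(timestamp), "measurement": measurement,
            "tags": tags, "fields": fields}


points = [parse_line(line) for line in SAMPLE.splitlines() if line]
print(points[0])
```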

The output of from(...) |> range(...) will have two tables, because Flux returns one table per series (each unique combination of measurement, tag set and field). Both tables have columns for _time, _measurement, sensor, _field and _value, plus the _start and _stop columns that range() adds. One table has temperature in the _field column and the other has humidity. That’s the key point here: filter() gets called for each record in each table. It’s like a 3D matrix with axes for measurement, time and field, with filter() run on each cell in that matrix.
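A small Python model of this table stream, assuming the four sample points above: records are grouped into one table per series key (_measurement, sensor, _field), and a filter() stand-in calls its predicate on every record of every table. table_stream() and flux_filter() are hypothetical names, and _start and _stop are left out to keep it short.

```python
from collections import defaultdict

# (_time, temperature, humidity) taken from the sample points above
SAMPLE = [
    (1605224940000000000, 69.2, 86.3),
    (1605225040000000000, 69.8, 88.4),
    (1605225140000000000, 68.3, 88.1),
    (1605225240000000000, 68.1, 87.2),
]


def table_stream(points):
    """Group per-field records into one table per series.

    The group key is (_measurement, sensor, _field), mimicking how
    from() |> range() splits the data.
    """
    tables = defaultdict(list)
    for time, temperature, humidity in points:
        for name, value in (("temperature", temperature), ("humidity", humidity)):
            tables[("Environment", "123456", name)].append(
                {"_time": time, "_measurement": "Environment",
                 "sensor": "123456", "_field": name, "_value": value})
    return dict(tables)


def flux_filter(tables, fn):
    """Call fn on every record of every table and keep the records it accepts.

    Tables left empty are dropped, like filter()'s default behavior.
    """
    kept = {key: [r for r in rows if fn(r)] for key, rows in tables.items()}
    return {key: rows for key, rows in kept.items() if rows}


tables = table_stream(SAMPLE)
print(list(tables))  # one group key per field
warm = flux_filter(tables, lambda r: r["_field"] == "temperature" and r["_value"] > 69)
print(warm)
```

The predicate only ever sees one record, so it can test one field’s value at a time.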

I kept trying to build filters like the one below before I figured this out. The r passed into the filter’s function doesn’t have humidity or temperature properties, so this would never work.

  |> filter(fn: (r) =>
      r._measurement == "Environment"
      and r.humidity > 50
      and r.temperature < 32
  )
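Modeling the per-field records in Python shows why: neither record has a humidity or temperature key, so no single record can satisfy both conditions. naive() is a hypothetical stand-in for the broken predicate, and treating a missing key as a non-match is a modeling choice, not a statement about Flux’s exact null handling.

```python
# Per-field records for the first sample point, shaped as filter() sees them.
records = [
    {"_time": 1605224940000000000, "_measurement": "Environment",
     "sensor": "123456", "_field": "temperature", "_value": 69.2},
    {"_time": 1605224940000000000, "_measurement": "Environment",
     "sensor": "123456", "_field": "humidity", "_value": 86.3},
]


def naive(r):
    """Python stand-in for the broken predicate.

    Neither key exists on a per-field record, so .get() returns None and
    the missing column is treated as "no match".
    """
    humidity, temperature = r.get("humidity"), r.get("temperature")
    return (humidity is not None and temperature is not None
            and humidity > 50 and temperature < 32)


print([naive(r) for r in records])  # → [False, False]
```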

So, how would we accomplish this? That was the next aspect of Flux I figured out, and it’s really quite elegant. The functions we pipe the data through don’t have to keep the same table structure from step to step. We can do things like pivot the data.

from(bucket: "mydb")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "Environment")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> filter(fn: (r) => r.humidity > 50 and r.temperature < 32)

The output of the first filter(), which is the input to pivot(), contains two tables: one with _field == "temperature" and one with _field == "humidity". pivot() reorganizes them into a single table with one row per _time and separate temperature and humidity columns. Now a second filter() can select the records we’re looking for.
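A rough Python sketch of what pivot() does here, assuming the four sample points above: records that share a _time are merged into one row, and each _field value becomes its own column. The pivot() function below is a hypothetical simplification of the Flux function of the same name, not its implementation.

```python
from collections import defaultdict

# (_time, temperature, humidity) taken from the sample points above
SAMPLE = [
    (1605224940000000000, 69.2, 86.3),
    (1605225040000000000, 69.8, 88.4),
    (1605225140000000000, 68.3, 88.1),
    (1605225240000000000, 68.1, 87.2),
]

# Per-field records, as they arrive at pivot() after the first filter().
records = [
    {"_time": t, "_measurement": "Environment", "sensor": "123456",
     "_field": name, "_value": value}
    for t, temperature, humidity in SAMPLE
    for name, value in (("temperature", temperature), ("humidity", humidity))
]


def pivot(records, row_key="_time", column_key="_field", value_column="_value"):
    """Merge records sharing a row_key into one row; each column_key value
    becomes its own column holding value_column.

    Simplified: every other column is copied through, whereas Flux's
    pivot() drops columns that are in neither the row key nor the group key.
    """
    rows = defaultdict(dict)
    for r in records:
        row = rows[r[row_key]]
        for k, v in r.items():
            if k not in (column_key, value_column):
                row[k] = v
        row[r[column_key]] = r[value_column]
    return [rows[k] for k in sorted(rows)]


pivoted = pivot(records)
# Thresholds chosen so the sample data has a match; the query above uses
# humidity > 50 and temperature < 32, which no sample row satisfies.
matches = [r for r in pivoted if r["humidity"] > 88 and r["temperature"] < 69]
print(pivoted[0])
print(matches)
```

Once the fields sit side by side in one row, a single predicate can compare them together, which is what the second filter() relies on.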

Hopefully this helps someone get up to speed on Flux faster than I did.
