QueryAPI & Pipeline Queries
Writing pipeline queries
QueryAPI provides a pipeline-based approach to querying data. Unlike SQL, which is a declarative language that describes a query in a single (but sometimes complex) statement, a pipeline-based query lets the user describe the query as a sequence of data transformation stages.
A stage can be seen as a data transformation node that takes a table as input and produces a table as output.
As in a regular database, a table is a concrete or virtual (table view) data set structured by named columns and made up of data rows.
There are two kinds of stages:
Source stages: the starting stage of every pipeline query. Their source is a concrete database table, and their output is a subset of its rows and columns, provided as a table view.
Transformation stages: they transform the output of a previous stage, optionally take a concrete database table as an additional input, and provide a table view as output.
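The two kinds of stages can be pictured with a small plain-Groovy model. This is only a conceptual sketch and does not use QueryAPI: tables are modelled as lists of maps, and the names `source`, `filter`, and the sample data are hypothetical.

```groovy
// Conceptual model only, not QueryAPI: a table is a list of rows, a row is a map.
def products = [
    [sku: 'A', label: 'Apple',  unit: 'kg'],
    [sku: 'B', label: 'Banana', unit: 'kg'],
]

// Source stage: starts from a concrete table and exposes a subset of its columns.
def source = { List<Map> table, List<String> columns ->
    table.collect { row -> row.subMap(columns) }
}

// Transformation stage: takes the previous stage's output and returns a new table.
def filter = { List<Map> input, Closure predicate ->
    input.findAll(predicate)
}

def view = filter(source(products, ['sku', 'label']), { row -> row.sku != 'B' })
assert view == [[sku: 'A', label: 'Apple']]
```

Each stage only sees what the previous one produced: here the filter can use `sku` and `label`, but not `unit`, because the source stage did not select it.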
Here is a representation of a pipeline query with example data:
As we can see, each stage can only refer to:
its input table columns (that is, the output of the previous stage). For instance, the InnerJoin stage refers to the Products table sku via input.sku;
and, if one is present, columns from a concrete table. For instance, Products for the Source stage and Costs for the InnerJoin stage.
The above example is implemented with QueryApi as follows:
def qapi = api.queryApi()
def p = qapi.tables().products()
def c = qapi.tables().productExtensions("Costs")
def query = qapi.source(p, [p.sku(), p.label()]) // outputs a table with columns "sku" and "label"
    .innerJoin(c, { input -> [c.Cost] }, { input -> input.sku.equal(c.sku()) }) // outputs a table with columns "sku", "label", "Cost"
    .filter { input -> input.Cost.greaterThan(10) } // does not change columns but filters out rows
    .sortBy { input -> [qapi.orders().descendingNullsLast(input.Cost)] } // does not alter columns but changes row order
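To make the effect of each stage concrete, the same sequence of steps can be mimicked on in-memory collections. This is a plain-Groovy analogue under stated assumptions, not what QueryAPI executes: the data is invented for illustration, and no null costs are present, so the "nulls last" part of the ordering is not exercised.

```groovy
// Illustrative in-memory data; not taken from a real Products or Costs table.
def products = [[sku: 'P1', label: 'Pen'], [sku: 'P2', label: 'Desk'], [sku: 'P3', label: 'Lamp']]
def costs    = [[sku: 'P1', Cost: 2.5], [sku: 'P2', Cost: 120.0], [sku: 'P3', Cost: 35.0]]

// source: keep only the "sku" and "label" columns
def sourced = products.collect { it.subMap(['sku', 'label']) }

// innerJoin: match rows on sku and add the "Cost" column
def joined = sourced.collectMany { input ->
    costs.findAll { c -> c.sku == input.sku }.collect { c -> input + [Cost: c.Cost] }
}

// filter: columns unchanged, rows with Cost <= 10 are dropped
def filtered = joined.findAll { input -> input.Cost > 10 }

// sortBy: columns unchanged, rows ordered by Cost descending
def result = filtered.sort(false) { a, b -> b.Cost <=> a.Cost }

assert result*.sku == ['P2', 'P3']
assert result[0].keySet() as List == ['sku', 'label', 'Cost']
```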
As shown in this code snippet, each stage can refer to columns:
of a concrete table, when that table is one of its arguments. This is the case for:
the source stage, which uses p to refer to the columns of the concrete Products table:
source(p, [p.sku(), p.label()])
the join stage, which uses c to refer to the columns of the concrete Costs table:
innerJoin(c, { input -> [c.Cost] }, { input -> input.sku.equal(c.sku()) })
of the table view it gets as input. To refer to these columns, the client code provides a function that receives a Tables.Columns object, which gives access to the input columns. In the above example, this is the case for the last three stages of the query pipeline.
A Tables.Columns object is an immutable map associating column names with their references. As Groovy supports the . operator to access a map entry, columns can be accessed directly with it, as in { input -> input.MyColumnName }.
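The map behaviour described here is standard Groovy and can be tried without QueryAPI. The sketch below uses a plain immutable map as a stand-in for a Tables.Columns object; the string values are placeholders for column references, and `selector` is a hypothetical name.

```groovy
// Stand-in for a Tables.Columns object: a plain immutable map with placeholder values.
def input = [sku: 'ref-to-sku', Cost: 'ref-to-Cost'].asImmutable()

// Dot access and subscript access read the same entry.
assert input.sku == input['sku']
assert input.Cost == input.get('Cost')

// Same shape as the { input -> [...] } functions passed to pipeline stages.
def selector = { cols -> [cols.sku, cols.Cost] }
assert selector(input) == ['ref-to-sku', 'ref-to-Cost']

// An immutable map rejects writes.
def rejected = false
try {
    input.extra = 'x'
} catch (UnsupportedOperationException ignored) {
    rejected = true
}
assert rejected
```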
By default, a column defined from a concrete source table has the same name as the accessor used to get it. For example, qapi.tables().products().sku() leads to a column named "sku".
See the PipelineStage Javadoc for comprehensive documentation of each available stage.
Executing the query and fetching data
Once a data pipeline has been defined, the user can run the corresponding query and fetch its results using .stream(Function). This method takes a function as its argument, which is in charge of consuming the result stream. This PipelineStage.ResultStream is an Iterable of ResultRow objects, which give access to the result row data in the same way as columns in pipeline stages, i.e. through an immutable map view indexed by column name.
def qapi = api.queryApi()
def p = qapi.tables().products()
return qapi.source(p, [p.sku()]) // output table is a single column named `sku`
    .stream { it.sum { row -> row.sku.size() } } // could also be row["sku"]
This example shows how result row columns can be accessed either using the Groovy . operator or using the standard map key accessor, the [] operator.
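The consumer-function pattern can be tried outside the platform with a plain-Groovy stand-in. In this sketch, `withRows` is a hypothetical helper that only mimics the shape of .stream(Function) (hand the rows to a consumer, return whatever it returns); the rows are made-up maps, not real ResultRow objects.

```groovy
// Hypothetical stand-in for .stream(Function): passes the rows to the consumer
// and returns the consumer's result.
def withRows(Iterable<Map> rows, Closure consumer) {
    consumer(rows)
}

// Made-up result rows with a single "sku" column.
def rows = [[sku: 'P1'], [sku: 'P20'], [sku: 'P300']]

// Dot access: total length of all sku values.
def totalLength = withRows(rows) { it.sum { row -> row.sku.size() } }

// Subscript access: collect the sku values into a list.
def skus = withRows(rows) { it.collect { row -> row['sku'] } }

assert totalLength == 9
assert skus == ['P1', 'P20', 'P300']
```

Whatever the consumer closure returns becomes the result of the call, which is why the QueryAPI example above can return the value of .stream directly.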