QueryAPI & Pipeline Queries

Writing pipeline queries

QueryAPI provides a pipeline-based approach to querying data. Unlike SQL, a declarative language that describes a data query in a single (but sometimes complex) statement, pipeline-based queries let users describe their query as a sequence of data transformation stages.

Each stage can be seen as a data transformation node that takes a table as input and produces a table as output.

Like a regular database table, a table here is a concrete or virtual (table view) data set, structured by named columns and composed of data rows.

[Figure: table structure]

There are two kinds of stages:

  • Source stages: the starting stage of every pipeline query. Their source is a concrete database table, and their output is a subset of that table's rows and columns, provided as a table view.

  • Transformation stages: they transform the output of the previous stage, optionally take a concrete database table as an additional input, and provide a table view as output.
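To make the "table in, table out" idea concrete, here is a minimal sketch in plain Groovy that does not call QueryAPI at all. It assumes a table can be modelled as a list of maps (one map per row, keyed by column name); sourceStage, filterStage and runPipeline are hypothetical helpers invented for this illustration, and the sample rows are made up.

```groovy
// Plain-Groovy model of pipeline stages (NOT QueryAPI code):
// a "table" is a List of Maps, one Map per row, keyed by column name.

// Hypothetical source stage: reads a concrete table and keeps only the
// requested columns, returning a new table (the stage's "view").
def sourceStage = { List<Map> concreteTable, List<String> columns ->
    concreteTable.collect { Map row -> row.subMap(columns) }
}

// Hypothetical transformation stage factory: returns a stage that takes the
// previous stage's output table and returns a new table of matching rows.
def filterStage = { Closure predicate ->
    return { List<Map> input -> input.findAll(predicate) }
}

// Feeds each transformation stage the output of the previous one.
def runPipeline = { List<Map> start, List<Closure> stages ->
    stages.inject(start) { List<Map> table, Closure stage -> stage(table) }
}

// Invented sample rows of a "concrete" table.
def products = [
    [sku: 'A-1', label: 'Bolt',  weight: 2],
    [sku: 'B-2', label: 'Nut',   weight: 1],
    [sku: 'C-3', label: 'Screw', weight: 3],
]

def view = sourceStage(products, ['sku', 'label'])
def result = runPipeline(view, [filterStage({ row -> row.label != 'Nut' })])

println result   // [[sku:A-1, label:Bolt], [sku:C-3, label:Screw]]
```

Because every stage in this model has the same shape (a table in, a table out), stages compose simply by feeding one stage's output into the next.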

Here is a representation of a pipeline query, with example data:

[Figure: pipeline query illustration with example data]

As the illustration shows, each stage can only refer to:

  • the columns of its input table, i.e. the output of the previous stage. For instance, the InnerJoin stage refers to the sku column of the Products table via input.sku;

  • and, if present, the columns of a concrete table. For instance, Products for the Source stage and Costs for the InnerJoin stage.

The above example is implemented with QueryAPI as follows:

```groovy
def qapi = api.queryApi()
def p = qapi.tables().products()
def c = qapi.tables().productExtensions("Costs")

def query = qapi.source(p, [p.sku(), p.label()])  // outputs a table with columns "sku" and "label"
        .innerJoin(c, { input -> [c.Cost] }, { input -> input.sku.equal(c.sku()) })  // outputs a table with columns "sku", "label", "Cost"
        .filter { input -> input.Cost.greaterThan(10) }  // does not change columns but filters out rows
        .sortBy { input -> [qapi.orders().descendingNullsLast(input.Cost)] }  // does not alter columns but changes the row order
```
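The effect of each stage in the example above can be mimicked with ordinary Groovy collections. The sketch below is not QueryAPI code and makes no claim about how QueryAPI executes queries; it only replays the four stages on hypothetical in-memory Products and Costs rows, so the comments about columns and rows can be checked by hand.

```groovy
// Plain-Groovy replay of the four stages (NOT QueryAPI code).
// Tables are Lists of Maps; the sample rows are invented for illustration.
def products = [
    [sku: 'A-1', label: 'Bolt',   weight: 2],
    [sku: 'B-2', label: 'Nut',    weight: 1],
    [sku: 'C-3', label: 'Screw',  weight: 3],
    [sku: 'D-4', label: 'Washer', weight: 1],
]
def costs = [
    [sku: 'A-1', Cost: 12],
    [sku: 'B-2', Cost: 5],
    [sku: 'C-3', Cost: 40],
    [sku: 'D-4', Cost: null],
]

// Source stage: keep only "sku" and "label" from the concrete Products table.
def source = products.collect { it.subMap(['sku', 'label']) }

// Inner join: append "Cost" from every Costs row with a matching sku;
// input rows without a match would be dropped.
def joined = source.collectMany { input ->
    costs.findAll { c -> input.sku == c.sku }.collect { c -> input + [Cost: c.Cost] }
}

// Filter: same columns, fewer rows (in this sketch a null Cost never matches).
def filtered = joined.findAll { input -> input.Cost != null && input.Cost > 10 }

// Sort: same rows and columns, descending by Cost with nulls last.
def sorted = filtered.sort(false) { a, b ->
    if (a.Cost == null && b.Cost == null) return 0
    if (a.Cost == null) return 1
    if (b.Cost == null) return -1
    return b.Cost <=> a.Cost
}

sorted.each { println it }
// [sku:C-3, label:Screw, Cost:40]
// [sku:A-1, label:Bolt, Cost:12]
```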

As shown in this code snippet, each stage can refer to columns

  • of a concrete table, when that table is one of its arguments. This is the case for:

    • the source stage, which uses p to refer to the columns of the concrete Products table: source(p, [p.sku(), p.label()])

    • the join stage, which uses c to refer to the columns of the concrete Costs table: innerJoin(c, { input -> [c.Cost] }, { input -> input.sku.equal(c.sku()) })

  • of the table view it gets as input. To refer to these columns, the client code provides a function that receives a Tables.Columns object giving access to the input columns. In the above example, this is the case for the last three stages of the pipeline.

A Tables.Columns object is an immutable map associating column names with their references. Since Groovy supports the . operator for accessing map entries, columns can be accessed directly with it, as in {input -> input.MyColumnName}.
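The map-style access described above is plain Groovy behaviour, so it can be tried on an ordinary immutable map. In this sketch the string values are hypothetical stand-ins for the column references a real Tables.Columns object would hold.

```groovy
// Plain-Groovy illustration (NOT QueryAPI code): the string values are
// hypothetical stand-ins for column references.
def input = [sku: 'ref-to-sku', Cost: 'ref-to-Cost'].asImmutable()

// The . operator and the [] operator read the same map entry.
assert input.sku == input['sku']
assert input.Cost == 'ref-to-Cost'

// The map is immutable: attempting to add an entry throws.
try {
    input.put('extra', 'x')
    assert false : 'expected the map to be read-only'
} catch (UnsupportedOperationException expected) {
    println 'input is read-only'
}
```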

By default, columns defined from a concrete source table have the same name as the accessor used to obtain them. For example, qapi.tables().products().sku() leads to a column named "sku".

See the PipelineStage javadoc for comprehensive documentation of each available stage.

Executing the query and fetching data

Once a data pipeline has been defined, the user can run the corresponding query and fetch its results using .stream(Function). This method takes as its argument a function in charge of consuming the result stream. This PipelineStage.ResultStream is an Iterable of ResultRow objects, which give access to the row data in the same way columns are accessed in pipeline stages, i.e. through an immutable map view indexed by column names.

```groovy
def qapi = api.queryApi()
def p = qapi.tables().products()

return qapi.source(p, [p.sku()])  // output table is a single column named `sku`
        .stream { it.sum { row -> row.sku.size() } }  // could also be row["sku"]
```

This example shows how result row columns can be accessed either

  • using the Groovy . operator

  • or using the standard map key accessor operator []
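The consumer-function pattern of .stream(Function) can also be sketched without QueryAPI. Here fakeStream is a hypothetical stand-in that passes an Iterable of read-only rows to the consumer and, as the example above suggests, returns the consumer's result; the rows are invented.

```groovy
// Plain-Groovy sketch (NOT QueryAPI code) of the consumer-function pattern.
// `fakeStream` is hypothetical: it wraps each row as a read-only map, passes
// the resulting Iterable to the consumer, and returns the consumer's result.
def fakeStream = { List<Map> rows, Closure consumer ->
    Iterable<Map> resultStream = rows.collect { it.asImmutable() }
    consumer(resultStream)
}

// Invented single-column result rows, shaped like the `sku` output above.
def resultRows = [[sku: 'A-1'], [sku: 'BB-22'], [sku: 'CCC-333']]

// Dot access and [] access read the same column.
def byDot     = fakeStream(resultRows, { it.sum { row -> row.sku.size() } })
def byBracket = fakeStream(resultRows, { it.sum { row -> row['sku'].size() } })

println byDot   // 15
assert byDot == byBracket
```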

 
