Distributed Calculations in Analytics

The usual data flow in Analytics is as follows:

  1. Integration (e.g., IntegrationManager) loads ‘raw’ source data into a Data Feed table. This step is quick because the load is append-only (no costly merge involved) and the values are neither validated nor parsed.

  2. Periodically, the Data Feed data is ‘flushed’ to a Data Source. This is costly, mainly for the following reasons:

    • The Data Feed data is fetched from the DB and loaded in a file-backed map in the application layer.

    • Optionally, validation and/or transformation logic written in Groovy processes that data.

    • The result is uploaded to the Data Source, where a merge is required to avoid duplicates.

    • The whole task runs in a single thread.

  3. Also periodically, the Datamart is ‘refreshed’ with the new Data Source data.
    Often, an additional Analytics calculation then enriches the data using Groovy calculation logic. Calculations at the Data Source level are also common.

When tens of millions of rows are loaded this way, the flush and calculation Data Loads can take many hours, sometimes even 1 or 2 days.

What Is Distributed Calculation

To solve this problem, you can use the Distributed Calculation Data Load type, which enables you to split the data into batches and process them independently, in parallel.

Configure Distributed Calculation

Logic

Start by creating a calculation logic which defines the required user entries (such as the source), the maximum batch size, and the calculation process:

  1. Go to Administration > Logics > Analytics Calculations.

  2. Click Add Logic, fill in the name, label, validity and status.

  3. Click the logic name to open it for editing.

  4. Create the logic elements that will do the calculation (see the example below).

The distributed calculation logic has three stages, coded in the Calculation Context attribute of the logic elements:

  • Calculation-Initialization – Elements executed just once, normally to create the calculation items consumed in the next stage.
    A calculation item can be seen as representing a batch of rows to be loaded/processed. Its definition is free-form, but it typically takes the shape of filter criteria on the source (see the example below). In contrast to a regular Calculation Data Load, where the logic runs once per row, a distributed calculation executes its calculation elements once per Calculation Item.

  • Calculation – Executed once for each individual calculation item. Multiple threads, possibly across multiple nodes/pods, process the items in parallel (this is the ‘distributed’ stage).

  • Calculation-Summary – Evaluated once all the calculation items have been processed, allowing for post-load processing.

Example Logic

This example mimics the behavior of the current Data Feed → Data Source FLUSH Data Load. The source Data Feed is captured as an input parameter (‘user entry’) named Source, which the elements below read via api.stringUserEntry("Source"). The Data Load is defined in the first element as:

{
    targetName: "DMDS.Transactions",
    type: "DISTRIBUTED_CALCULATION",
    isIncremental: false,
    isDistributed: true,
    config: {
        formulaName: "distFlushLogic",
        inputParams: [
            { name: "Source", value: "DMF.Transactions" },
            { name: "FieldMapping", value: { "attribute1": "transactionID", ... } }
        ]
    }
}

The second element (with Calculation Context set to ‘Calculation-Initialization’) splits the source into batches and registers each batch as a calculation item:

def sourceName = api.stringUserEntry("Source")
if (api.isInputGenerationExecution()) {
    return
}
def BATCH_SIZE = 1000000
def ctx = api.getDatamartContext()
def source = ctx.getFieldCollection(sourceName)
// Split the source into filters matching at most BATCH_SIZE rows each.
def batchFilters = ctx.batchFilters(source, null, BATCH_SIZE)
//api.logInfo("batches", batchFilters)
// Each batch filter becomes one calculation item for the next stage.
batchFilters.eachWithIndex { filter, i ->
    dist.addOrUpdateCalcItem("batch", i, filter)
}
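
Calculation items are not limited to row-count batches. As a sketch of the free-form item definition mentioned above (the Month field and the rollup query are illustrative assumptions, not part of the original example), the initialization element could instead create one item per month:

def sourceName = api.stringUserEntry("Source")
if (api.isInputGenerationExecution()) {
    return
}
def ctx = api.getDatamartContext()
def source = ctx.getFieldCollection(sourceName)
// Distinct months via a rollup query (second argument of newQuery = rollup).
def q = ctx.newQuery(source, true).select("Month")
ctx.streamQuery(q).withCloseable { res ->
    int i = 0
    while (res.next()) {
        // The item value is a filter, just like the row-count batches above.
        dist.addOrUpdateCalcItem("month", i++, Filter.equal("Month", res.get().Month))
    }
}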

The third element (with Calculation Context set to ‘Calculation’) loads the rows of one batch into the target:

def sourceName = api.stringUserEntry("Source")
def mapping = input.FieldMapping
if (api.isInputGenerationExecution()) {
    return
}
// The calculation item assigned to this execution (one batch).
def calcItem = dist.getCalcItem()
def ctx = api.getDatamartContext()
def source = ctx.getFieldCollection(sourceName)
def batchFilter = api.filterFromMap(calcItem.Value)
// api.logInfo("batch " + calcItem.Key2, batchFilter)
def loader = dist.getDataLoader()
loader.mapping = mapping
def q = ctx.newQuery(source, false)
        .selectAll(true)
        .where(batchFilter)
// Stream the batch rows and hand them over to the Data Load's loader.
ctx.streamQuery(q).withCloseable { res ->
    while (res.next()) {
        loader.addRow(res.get())
    }
}
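
Because every row passes through Groovy in this element, it is also a natural place for the validation/transformation step mentioned at the beginning. A minimal sketch of a variant of the streaming loop above (the validation rule, the region field, and in-place row mutation are illustrative assumptions):

ctx.streamQuery(q).withCloseable { res ->
    while (res.next()) {
        def row = res.get()
        // Validation (illustrative): skip rows without a transaction ID.
        if (row.transactionID == null) {
            continue
        }
        // Transformation (illustrative): normalize a text field.
        row.region = row.region?.toUpperCase()
        loader.addRow(row)
    }
}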

The final element (with Calculation Context set to ‘Calculation-Summary’) is left empty in this example, since this flush scenario needs no post-load processing.
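
If post-load processing is needed, the summary element runs once after all calculation items have finished. A minimal sketch that only writes a log entry (the log category and message are illustrative):

if (api.isInputGenerationExecution()) {
    return
}
// Runs exactly once, after all calculation items have been processed.
api.logInfo("distFlushLogic", "All calculation items have been processed")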

Data Load

Create a Distributed Calculation Data Load:

  1. Go to Analytics > Data Manager > Data Loads.

  2. Create a new Data Load and select Distributed Calculation as the Data Load Type.

  3. In the Data Load Settings pane, select the Target Datamart.


    The Data Load can be run in incremental or non-incremental mode; the only difference is that in non-incremental mode the target table is truncated before the new data is loaded.

  4. Select the Load Mode:

    • Full – The job generates all rows, with values for all fields. The target is truncated before the result is loaded. Duplicate rows (i.e., rows with identical key field values but an older lastUpdateDate) are removed.

    • Incremental – The job generates additional rows, with values for all fields. Existing rows can be replaced by creating duplicates (with a more recent lastUpdateDate). Rows can be deleted by setting isDeleted to true.

    • Enrichment – The job generates additional field values, only for existing rows (the fields to populate are specified in DL.config.outputElements). No rows can be added or deleted. (A sketch of an Enrichment-style element follows this procedure.)

    • Allocation – The job allocates values to numeric fields that are not sourced from anywhere else, only for existing rows. Typically, the logic emits multiple rows for the same Id, and their totals, aggregated by Id, are assigned to the target allocation fields. No rows can be added or deleted.

  5. On the Calculation tab, select the previously created calculation logic.

  6. Fill in the input parameters defined by the logic.

  7. In Target Fields, select the fields you want to update.

  8. Save the Data Load. A Price Parameter table is automatically created that stores the so-called Calculation Items created by the distributed calculation logic. This table with the calculated results is then displayed on the Calculation Items tab in the Job Details panel.

  9. Click the Run Data Load button to execute the logic.
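
To illustrate the Enrichment mode, the ‘Calculation’ element of such a logic might query only the key and input fields of the existing target rows and emit just the enriched values. A hedged sketch (the Datamart name and the transactionID/revenue/cost/margin fields and formula are illustrative assumptions):

if (api.isInputGenerationExecution()) {
    return
}
def calcItem = dist.getCalcItem()
def ctx = api.getDatamartContext()
def target = ctx.getDatamart("Transactions")
def batchFilter = api.filterFromMap(calcItem.Value)
def loader = dist.getDataLoader()
def q = ctx.newQuery(target, false)
        .select("transactionID", "revenue", "cost")
        .where(batchFilter)
ctx.streamQuery(q).withCloseable { res ->
    while (res.next()) {
        def row = res.get()
        // Emit the key plus the enriched field only; no rows are added or deleted.
        loader.addRow([transactionID: row.transactionID,
                       margin       : (row.revenue ?: 0) - (row.cost ?: 0)])
    }
}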

Note: The minimum (hardcoded) threshold for a calculation to run in distributed mode is 500 rows.

See also the ‘DDL Enrichment Demo’ component in Studio.

Pricefx version 13.1