Pricefx Classic UI is no longer supported. It has been replaced by Pricefx Unity UI.

 

Distributed Calculation

Introduction

The usual data flow in Analytics is as follows:

  1. Integration (e.g., IntegrationManager) loads ‘raw’ source data into a Data Feed table. This is quick as it is ‘append only’ (no costly merge involved) and there is no validation of the values (no parsing).

  2. Periodically, the Data Feed data is ‘flushed’ to a Data Source. This is costly, mainly due to the following reasons:

    • The Data Feed data is fetched from the DB and loaded in a file-backed map in the application layer.

    • Optionally, a validation and/or a transformation processes that data in Groovy.

    • The result is uploaded to the Data Source, where a merge is required to avoid duplicates.

    • It is a single threaded task.

  3. Also periodically, Datamart is ‘refreshed’ with the new Data Source data.
    Often, an additional PA Calculation then enriches the data using a calculation (Groovy) logic. Data Source-level calculations are also not uncommon.

When tens of millions of rows are loaded this way, the flush and calculation Data Loads can take many hours, sometimes even 1 or 2 days.

What Is Distributed Calculation

To solve this problem, you can use the Calculation+ Data Load type. This Data Load type enables you to split the data into batches and process them independently.

Configure Distributed Calculation

Logic

Start with creating a calculation logic which specifies the target user entry, defines the maximum batch size and the calculation process:

  1. Go to Administration > Calculation Logic > PA Calculation+.

  2. Click Add, fill in the name, label, validity and status.

  3. Select the new logic and click View/Edit Details.

  4. Create the logic elements that will do the calculation (see the example below).

The distributed calculation logic has three stages, coded in the Calculation Context attribute of the logic elements:

  • Calculation-Initialization – Elements executed just once, normally to create the calculation items consumed in the next stage.
    A calculation item can be seen as representing a batch of rows to be loaded/processed. Its definition is free, but is likely to take the form of filter criteria on the source (see example below).

  • Calculation – Executed in the evaluation of each individual calculation item. Multiple threads, in possibly multiple nodes/pods, process these items in parallel (the ‘distributed’ stage).

  • Calculation-Summary – Evaluated once all the calculation items have been processed, allowing for post-load processing.

Example Logic

This example mimics the current Data Feed → Data Source FLUSH Data Load behavior. The source Data Feed is captured as an input parameter ('user entry’). The Data Load is defined in the first element as:

{ targetName: "DMDS.Transactions", type: "DISTRIBUTED_CALCULATION", isIncremental: false, isDistributed: true, config: { formulaName: "distFlushLogic", inputParams: [ { name: "Source", value: "DMF.Transactions" }, { name: "FieldMapping," value: { "attribute1": "transactionID" ... } } ] } }

The second element (with Context set to 'Calculation-Initialization'):

def sourceName = api.stringUserEntry("Source") if (api.isSyntaxCheck()) return def BATCH_SIZE = 1000000 def ctx = api.datamartContext def source = ctx.getFieldCollection(sourceName) def batchFilters = ctx.generateBatchFilters(source, null, BATCH_SIZE) api.logInfo("batches", batchFilters) batchFilters.eachWithIndex{ f,i -> dist.addOrUpdateCalcItem("batch", i, f) }

The third element (with Context set to 'Calculation'):

def sourceName = api.stringUserEntry("Source") def mapping = api.input("FieldMapping") if (api.isSyntaxCheck()) return def calcItem = dist.calcItem def ctx = api.datamartContext def source = ctx.getFieldCollection(sourceName) def batchFilter = api.filterFromMap(calcItem.Value) api.logInfo("batch " + calcItem.Key2, batchFilter) def q = ctx.newQuery(source, false) q.selectAll(true) q.where(batchFilter) def loader = dist.dataLoader loader.mapping = mapping ctx.consumeData(q, { r -> loader.addRow(r) })

The final element (with Context set to 'Calculation-Summary'):

Data Load

Create a Distributed Calculation Data Load:

  1. Go to PriceAnalyzer > Data Manager > Data Loads.

  2. Create a new Data Load and as Type select Calculation+.

  3. In the Overview step, select the Load Mode:

    • Full – The job generates all the rows, values for all fields. The target is truncated before the result is loaded. Duplicate rows (i.e., with identical values for the key fields) are removed.

    • Incremental – The job generates additional rows, with values for all fields.The existing rows can be replaced by creating duplicates (with a more recent lastUpdateDate). Rows can be deleted by setting isDeleted to true.

    • Enrichment – The job generates additional field values, only for existing rows (the fields to populate are specified in DL.config.outputElements). No rows can be added or deleted.

    • Allocation – The job allocates values to numeric fields not sourced from anywhere else, only for existing rows. Typically, multiple rows for the same Id are added. And the totals, aggregated by Id, are assigned to the target allocation fields. No rows can be added or deleted.

  4. In the Target step, select the target Data Source.
    The Data Load can be run in incremental/non-incremental mode, with the only difference being that the target table is truncated before loading the new data in non-inc mode.

  5. In the Calculation step, select the previously created calculation logic.

  6. Fill in the input parameters defined by the logic.

  7. Save the Data Load. A price parameter table is automatically created that stores the so called CalculationItems that are created by the DistributedCalculation logic. This table with calculated results is then displayed on the Calculation Items tab in the Data Load.

  8. Click the Run Now button to execute the logic.

Found an issue in documentation? Write to us.