How to Run Distributed PA Calculation
As described in Distributed Calculations in Analytics, there are 4 types of distributed calculation. One of them is Enrichment.
Enrichment
The logic for this type of distributed calculation has to be of the PA Calculation type.
In this example, the pricing date in the transactional data is shifted for demo purposes.
As a first step, create filters that will be used to split the data into batches that are run in separate threads. It is up to you how you slice and dice your data into batches. This step has to have the Context Type set to Calculation - Initialization.
if (api.isInputGenerationExecution()) {
  // nothing to prepare during the input-generation pass
  return
}
// querying DM to get a list of PricingDateMonths
// to group data into batches
def dmCtx = api.getDatamartContext()
def salesDM = dmCtx.getDatamart("Standard_Sales_Data")
def datamartQuery = dmCtx.newQuery(salesDM, true)
.select("PricingDateMonth")
.select("COUNT(PricingDateMonth)", "cnt")
.orderBy("PricingDateMonth ASC",)
def result = dmCtx.executeQuery(datamartQuery)
List<Filter> filters = result?.getData()?.collect { row ->
Filter.equal("PricingDateMonth", row.PricingDateMonth)
}
filters.eachWithIndex { f, i ->
// THIS ADDS THE FILTER TO THE LIST OF BATCHES
// IT WILL BE USED LATER IN CALCULATION STEP
dist.addOrUpdateCalcItem("batchNo-$i", i, f)
}
Once run in a Data Load, this creates records in the Company Parameter table DistributedCalculation [XXXX], where XXXX is the ID of the Data Load. The Value of each record is the filter created by the code above, Key 1 is batchNo-X (batchNo-$i), and Key 2 is a simple index (the variable i in the code above).
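For illustration, the records created for this example could look roughly like this (indicative values only; the stored Value is the serialized form of the Filter object):

Key 1        Key 2   Value
batchNo-0    0       serialized Filter.equal("PricingDateMonth", first month in the data)
batchNo-1    1       serialized Filter.equal("PricingDateMonth", second month in the data)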
The calculation itself happens in the Calculation context. In this example, the transaction currency is enriched from the customer master data and all pricing dates are shifted. The whole code is included below. It is important to understand that the calculation logic is not run for every line; it processes whole batches instead. This is stated in the documentation, but it is easy to miss.
What the code does is that it first gets the line from the Company Parameter table mentioned above, extracts the batch filter, and creates a loader from the newly introduced context variable dist.
def calcItem = dist.calcItem // the batch record created in the Initialization step
def batchFilter = api.filterFromMap(calcItem.Value) // turn the stored value back into a Filter object
def loader = dist.dataLoader // loader that will write the enriched rows back to the Datamart
Next, we query the Datamart directly, using the filter passed via the calcItem variable. Notice that you need to add the DM. prefix to the source Datamart name. For Enrichment it is important to add selectId to the query.
def batch = ctx.newQuery(source, false)
batch.selectId("id") // required for Enrichment: the row id is passed back to the loader
batch.selectAll(true)
batch.where(batchFilter) // restrict the query to the rows of this batch only
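The snippet above assumes that the variables ctx and source are already defined earlier in the logic. A minimal sketch of how they might be obtained, assuming the same Datamart context API as in the initialization step and the DM. prefix mentioned above (verify the exact call against your environment):

def ctx = api.getDatamartContext()
// Assumption for this sketch: the source is the target Datamart,
// referenced with the "DM." prefix required in the calculation step
def source = ctx.getDatamart("DM.Standard_Sales_Data")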
Now, the enrichment itself happens in a loop. It takes the results of the Datamart query, iterates over each line of the batch, and adds the needed data. In our case, we get the currency and shift the pricing date. Once all the data is modified, we add the modified row to the loader that will save the data. The important thing for Enrichment is to add the id back to the array that is passed to the loader.
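The loop itself is not reproduced in this article; the sketch below only illustrates its shape. The field names (CustomerId, PricingDate), the customer currency attribute, the date handling, and the loader method addRow with its field order are assumptions to verify against the full code and the DataLoader interface in your environment.

def rows = ctx.executeQuery(batch)?.getData()

rows?.each { row ->
  // transaction currency taken from the customer master data
  // (the "currency" attribute and the CustomerId field are assumptions for this sketch)
  def currency = api.customer("currency", row.CustomerId)

  // shift the pricing date for demo purposes
  // (assumes an ISO yyyy-MM-dd value; 365 days stands in for "one year")
  def pricingDate = row.PricingDate ? api.parseDate("yyyy-MM-dd", row.PricingDate.toString()) : null
  def shiftedDate = pricingDate ? (pricingDate + 365).format("yyyy-MM-dd") : null

  // pass the id back together with the enriched values so the loader updates
  // the existing Datamart row; the method name and the field order must match
  // the DataLoader interface and the fields configured in the Data Load
  loader.addRow([row.id, currency, shiftedDate])
}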
The full code is here:
Once you have the logic created, you need to create a Data Load.
As a target, choose your Datamart.
In Calculation, choose your logic. At the bottom, choose the fields you want to update.
Save it and run it.
See also ‘DDL Enrichment Demo’ component in Studio.