Overview

This methodology is based on smart usage of api.global: combined with api.getBatchInfo(), it lets us speed up fetching data from PA and other sources and save processing time and hardware usage (in particular, it can reduce the number of PA queries or api.find/api.stream calls by up to a factor of 200).

api.global is thread-safe, which means the cache can be shared as long as the different parts of the calculation run on the same thread. For example, when you run a job with 'Allow Distributed Calculation' unchecked, there is only one node and one thread, so the global cache is shared by all items in the calculation (not just one batch). When 'Allow Distributed Calculation' is checked, several nodes can take part in the calculation (each with several threads processing different batches), so the cache is shared within a single thread only. With a good implementation, however, we can speed up the process by using 'Allow Distributed Calculation' together with the api.global cache.
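The per-thread sharing described above boils down to one small guard pattern. A minimal sketch (the cache name is illustrative, not part of any library):

```groovy
// Minimal per-thread cache guard (illustrative sketch).
// api.retainGlobal must be set so api.global survives between items on the same thread.
api.retainGlobal = true

if (api.global.myCache == null) {
    // executed only once per thread; every subsequent item on this thread reuses the map
    api.global.myCache = [:]
}
```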

api.getBatchInfo() returns the items that were added to the current batch, so you can see which items are going to be calculated next. It returns up to 200 items, or null in debug mode (so it is recommended to handle the null case during development too). The result is always a list of two-element arrays: in the PL/LPG context [ [sku, secondKey], ... ], in CFS [ [typeCode, secondaryKey], ... ]. When using it in CFS, you have to convert typeCode to a Product SKU / Customer ID or another unique ID to get the correct data.
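A hedged sketch of handling the null case in the PL/LPG context (the fallback to the single current item is the usual pattern; in CFS you would additionally have to translate the typeCode before querying):

```groovy
// PL/LPG context: each entry of the batch info is [sku, secondKey]
def batchInfo = api.getBatchInfo()

// in debug mode batchInfo is null, so fall back to the single item being tested
def skus = batchInfo?.collect { it[0] } ?: [api.product("sku")]
```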

Differences in distributed calculation (including a batching flow example) are described in Distributed Calculation.

Measures

Why is it so important? Let's check an example of processing times.

This is just to give you an idea of what it means to optimize results for three simple queries over calculations of 2,000 and 100,000 items.
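As a rough illustration of where the savings come from (the numbers are assumptions for the sake of the example, not measurements): with a batch size of 200, batching turns one query per item into one query per batch:

```groovy
// Illustrative only: query counts per data source, with and without batch caching
int batchSize = 200

[2_000, 100_000].each { items ->
    int withoutCache = items                                      // one query per item
    int withCache = Math.ceil(items / (double) batchSize) as int  // one query per batch
    println "$items items: $withoutCache -> $withCache queries"
}
```

For 100,000 items that is 500 queries instead of 100,000, i.e. the up-to-200x reduction mentioned in the Overview.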

Notes:

Implementation (LIB Example)

Overview of the lib:

Common library: https://gitlab.pricefx.eu/accelerators/pricefx-logic

Particular example: https://gitlab.pricefx.eu/accelerators/pricefx-logic/-/blob/master/CalculationLogic/SharedLib/elements/BatchUtils.groovy

Prerequisites:

Methods of the presented library (as of today):

Implementation:

  1. You need a LIB, which is located at: https://gitlab.pricefx.eu/accelerators/pricefx-logic/-/blob/master/

  2. You need to apply it as required. Here is an example of fetching PX data and caching it for the batches using api.stream():

api.retainGlobal = true
api.local.currentSku = api.product("sku")
libs.CommonLib.BatchUtils.prepareBatch(api.local.currentSku)

if (libs.CommonLib.BatchUtils.isNewBatch()) {
    def skus = libs.CommonLib.BatchUtils.getCurrentBatchSku()
    def filter = Filter.and(
            Filter.equal("name", "Costs"),
            Filter.in("sku", skus)
        )
    def costsStream = api.stream("PX", "-lastUpdateDate", ["sku","attribute1"], filter)
    def costs = costsStream.collectEntries {
        [(it.sku): it.attribute1 ?: 0.0]
    }
    costsStream.close()
    api.global.costs = [:]
    
    skus.each {
        api.global.costs[it] = costs[it] ?: 0.0
    }
}
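Once the cache is populated, a later element (or any later item on the same thread) can read the cost without touching the PX table again. A minimal sketch, assuming the names used in the example above:

```groovy
// reads the value cached by the batch-aware element above; no PX query is issued here
return api.global.costs[api.local.currentSku] ?: 0.0
```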

Note: The presented LIB is under development; it currently does not support CFS or second-key (MATRIX) logics.

Implementation (NON-LIB example)

The implementation may vary but the idea stays the same. This example comes from one of the projects, for a PL/LPG calculation (in CFS, api.getBatchInfo() returns typeCode IDs, not SKUs).

In the beginning we need to override retainGlobal using:

api.retainGlobal = true

Then, as an example, make sure that api.global.batch is initialized if it does not yet exist for the current thread.

if (!api.global.batch) {
    api.global.batch = [:]
}

In the next step, check whether the SKU is already cached or not. More details are in the comments in the code.

api.local.pid = api.product("sku") 
if (!api.global.batch[api.local.pid]) {
    // make sure to clear() the current batch; there is no need to keep the previous batch cache in memory, it only makes the map oversized
    // it is also more efficient to call clear() than to re-initialize the map with [:] again and again
    api.global.batch.clear()

    // get all SKUs from the batch; we need to handle the null result, which happens during debugging
    // from the batch info, collect all SKUs (the first element); the second element will be null or hold a value
    // if the 2nd key is not null, you can even cache values per SKU + 2nd key, but we want to keep this example simple
    def batch = (api.getBatchInfo()?.collect { it[0] } ?: [api.local.pid]) as Set


    // in this example api.find() is used; it gets the costs for the whole batch (it also works when the batch is a single item!)
    // here the PX "Costs" attribute2 is already set up with the "Real" format and the "Required" flag, so we don't need to validate
    // the output (but it is good practice to do so); the business key is also set up on the SKU level, so there won't be any duplicates
    def cost = api.find("PX", 0, "lastUpdateDate",
            Filter.and(
                    Filter.equal("name", "Costs"),
                    Filter.in("sku", batch) // make sure you use Filter.in not Filter.equal, then you will check against Set of the SKUs and fetch data for the given batch
            )
    ).inject([:]) { result, row ->
        result[row.sku] = row.attribute2 ?: 0.0
        result
    }
    // then you need to save the result in the cache for all of the items; make sure that every cached item has some value, so even
    // if we have cost data for 180 out of 200 items, all 200 items will have some data and we won't fetch the cost for the same batch again
    batch.each {
        api.global.batch[it] = [
                "cost": cost[it] ?: 0.0,
                "tx"  : false // will be assigned in the 2nd example
        ]
    }
}

To use the cached cost in a different element, you can access it like this:

return api.global.batch[api.local.pid].cost

This was a simple example of api.find usage, where the cache built for one item is shared with the other 199 items without accessing the PX table again. You can extend this functionality further; here is another example with a PA query:

/**
 * check if tx data was already fetched for this SKU
 */
if (api.global.batch[api.local.pid].tx == false) {

    // all the filters and advanced filters are left out of the logic just to give you an example
    // this is why *filters is visible in the where statement
    // let's query with Filter.in again, this time against the keySet() of the given batch
    def filters = [
            Filter.in("ProductID", api.global.batch.keySet())
    ]

    def ctx = api.getDatamartContext()
    def txDM = ctx.getTable("TransactionsDM")

    // simple call to DB to get SUM of QTY, AVG Unit Price and SUM of total SALES
    def datamartQuery = ctx.newQuery(txDM, true)
    datamartQuery.select("ProductID", "Sku")
    datamartQuery.select("SUM(QTY)", "QTY")
    datamartQuery.select("AVG(Unit_Price)", "Unit_Price")
    datamartQuery.select("SUM(SALES)", "SALES")
    datamartQuery.where(*filters)
    def result = ctx.executeQuery(datamartQuery)
    result = result?.getData()

    // assign the found results to the cache for the given SKU
    for (int row = 0; row < result?.getRowCount(); ++row) {
        def sku = result?.getValue(row, 0)
        api.global.batch[sku].tx_qty = result?.getValue(row, 1)
        api.global.batch[sku].tx_unitPrice = result?.getValue(row, 2)
        api.global.batch[sku].tx_sales = result?.getValue(row, 3)
    }

    // make sure not to perform the two calls again for the SKUs in the batch: if we found data for only 180 of the 200 items,
    // the query already ran for the missing 20 as well and simply found nothing, so there is no need to repeat it

    api.global.batch.keySet().each {
        api.global.batch[it].tx = true
    }
}

The given examples used api.find and Datamart queries, but you can apply the same approach in any other strategy, including pre-calculation of data, fetching a logic for a given SKU, working with the 2nd key, etc.
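The guard-fetch-cache steps can also be factored out so that each strategy only supplies the fetch itself. A hypothetical helper (not part of the presented LIB; the name and shape are assumptions) might look like this:

```groovy
// Hypothetical helper: runs 'fetch' once per batch and caches the result per SKU.
// 'fetch' receives the Set of SKUs in the batch and must return a Map [sku: value].
def cachePerBatch(String cacheKey, Closure fetch) {
    api.retainGlobal = true
    def sku = api.product("sku")
    if (api.global[cacheKey] == null || !api.global[cacheKey].containsKey(sku)) {
        def skus = (api.getBatchInfo()?.collect { it[0] } ?: [sku]) as Set
        def fetched = fetch(skus)
        api.global[cacheKey] = [:]
        // every SKU of the batch gets an entry, even when no data was found for it,
        // so the batch is never fetched twice
        skus.each { api.global[cacheKey][it] = fetched[it] }
    }
    return api.global[cacheKey][sku]
}
```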

Warning