Performance Improvements (Batching, api.getBatchInfo)

Overview

This methodology is based on smart usage of api.global: together with api.getBatchInfo() it can be used to speed up fetching data from PA and other sources and to save processing time and hardware resources (in particular, it can reduce the number of PA queries or api.find/api.stream calls by up to a factor of 200).

api.global is thread safe, which means that the cache can be shared as long as the different parts of the calculation run on the same thread. For example, when you run a job with 'Allow Distributed Calculation' unchecked, there is only one node and one thread, so the global cache can be shared by all items in the calculation (not just one batch). When 'Allow Distributed Calculation' is checked, several nodes can take care of the calculation (each with several threads processing different batches), so the cache is shared within one thread only. With a good implementation of the code, however, you can speed up the process by combining 'Allow Distributed Calculation' with the api.global cache.

api.getBatchInfo() shows the items that were added to the batch, so you can see which items are going to be calculated next. It returns up to 200 items, or NULL when run in debug mode (so it is recommended to handle this case during development too). The result is always a list of two-element arrays: in PL/LPG context [ [sku, secondKey], ... ], in CFS [ [typeCode, secondaryKey], ... ]. When using it in CFS, you have to convert typeCode to Product SKU / Customer ID or another unique ID to get the correct data.
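For illustration, a minimal sketch of how the returned structure can be consumed in a PL/LPG logic; the fallback to the current SKU covers the debug-mode/NULL case (variable names are illustrative):

    api.retainGlobal = true
    def currentSku = api.product("sku")
    def batchInfo = api.getBatchInfo()       // list of [sku, secondKey] pairs in PL/LPG context
    Set skus = []
    if (batchInfo == null) {
        skus << currentSku                   // debug mode or a very small job: fall back to the current item
    } else {
        batchInfo.each { skus << it[0] }     // the first element of each pair is the SKU
    }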

Differences in distributed calculation (including a batching flow example) are described in Distributed Calculation.

Measures

Why is it so important? Let's check an example of processing times.

This is just to give you an idea of what it means to optimize a calculation with 3 simple queries, for 2,000 and 100,000 items.

Notes:

  • The real measured numbers are greyed out; they were taken on the 2,000-item level.

  • The 100,000-item numbers are a projection; the real results will be even better, because the distributed calculation will send the work to more than the 2 nodes used for the 2,000 items.

  • Optimized = querying with api.getBatchInfo() together with the api.global cache, following this guide.

  • Not optimized = querying on the item level (one query per item), as in most projects.

  • Distributed = calculation jobs distributed to several nodes. 'Allow Distributed Calculation' = checked.

  • Not distributed = single node and single thread. 'Allow Distributed Calculation' = unchecked.

Implementation (LIB Example)

Overview of the lib:

Common library: https://gitlab.pricefx.eu/accelerators/pricefx-logic

Particular example: https://gitlab.pricefx.eu/accelerators/pricefx-logic/-/blob/master/CalculationLogic/SharedLib/elements/BatchUtils.groovy

Prerequisites:

Methods of the presented library (as of today):

  • libs.CommonLib.BatchUtils.prepareBatch(sku) – Initializes the cache variables that are used later on (it should be called right after api.retainGlobal = true in the logic, with the current SKU passed in).

    • Creates a new local reference api.local.isNewBatch – a boolean value (true/false) showing whether you are working with a new batch.

    • Creates/overrides a global reference api.global.iterationNumber – shows whether this is the 2nd, 3rd, etc. pass of the calculation.

    • Creates/overrides a global reference api.global.currentBatch as a Set of the given keys.

  • libs.CommonLib.BatchUtils.isNewBatch() – Returns a boolean value showing whether data should be fetched from the sources and cached.

  • libs.CommonLib.BatchUtils.getCurrentBatchSku() – Returns the Set of elements that should be fetched and cached later on.

Implementation:

  1. You need a LIB, which is located at: https://gitlab.pricefx.eu/accelerators/pricefx-logic/-/blob/master/

  2. You need to apply it as required. Here is an example of fetching PX data and caching it for the batches using api.stream():

    api.retainGlobal = true
    api.local.currentSku = api.product("sku")
    libs.CommonLib.BatchUtils.prepareBatch(api.local.currentSku)

    if (libs.CommonLib.BatchUtils.isNewBatch()) {
        def skus = libs.CommonLib.BatchUtils.getCurrentBatchSku()
        def filter = Filter.and(
                Filter.equal("name", "Costs"),
                Filter.in("sku", skus)
        )
        // One stream over the PX table for the whole batch of SKUs
        def costsStream = api.stream("PX", "-lastUpdateDate", ["sku", "attribute1"], filter)
        def costs = costsStream.collectEntries { [(it.sku): (it.attribute1 ?: 0.0)] }
        costsStream.close()

        // Cache the result so the other items of the batch do not query the PX table again
        api.global.costs = [:]
        skus.each { api.global.costs[it] = costs[it] ?: 0.0 }
    }

Note: The presented LIB is under development; currently it does not support CFS and 2nd-key logics (MATRIX).
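In a later element of the same logic, the cached values from the example above can then be read back without querying the PX table again (a minimal sketch, reusing api.global.costs and api.local.currentSku from the example):

    // Reads the cost cached for the current item; no additional PX query is needed
    return api.global.costs[api.local.currentSku] ?: 0.0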

Implementation (NON-LIB example)

The implementation may vary but the idea stays the same. This example comes from one of the projects, for a PL/LPG calculation (in CFS, api.getBatchInfo() returns typeCode IDs, not SKUs).

In the beginning we need to enable retainGlobal:

api.retainGlobal = true

Then, as an example, make sure that api.global.batch is initialized if it does not yet exist for the current thread:

    if (!api.global.batch) {
        api.global.batch = [:]
    }

In the next step, check whether the SKU is already cached or not. More details are in the comments in the code.
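A minimal sketch of this step, assuming a PX table named Costs with the cost stored in attribute1 (the table and field names are illustrative):

    def sku = api.product("sku")
    if (!api.global.batch.containsKey(sku)) {
        // The SKU is not cached yet: resolve the whole batch and fetch data for all of its SKUs at once
        def batchInfo = api.getBatchInfo()
        Set skus = batchInfo ? batchInfo.collect { it[0] } as Set : [sku] as Set
        def filter = Filter.and(
                Filter.equal("name", "Costs"),
                Filter.in("sku", skus)
        )
        // One api.find() for up to 200 SKUs instead of one query per item
        api.find("PX", 0, api.getMaxFindResultsLimit(), "sku", ["sku", "attribute1"], filter).each {
            api.global.batch[it.sku] = it.attribute1 ?: 0.0
        }
        // Every SKU of the batch gets an entry, even if no PX row was found for it
        skus.each { api.global.batch[it] = api.global.batch[it] ?: 0.0 }
    }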

To use the cached cost in a different element, you can access it like this:
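A minimal sketch, reusing the api.global.batch map filled in the previous step (the fallback to 0.0 is an assumption):

    // Another element of the same logic: read the cached cost, no extra query needed
    def cost = api.global.batch[api.product("sku")] ?: 0.0
    return cost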

This was a simple example of usage with api.find. The cached values can then be shared with the other (up to 199) items of the batch without accessing the PX table again, and you can extend this functionality further. There is another example within a PA query:
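The project-specific query can look, for example, like the following minimal sketch, assuming a Datamart named Transactions with fields sku and invoicePrice (all names are illustrative; the exact result handling can differ slightly between versions):

    def sku = api.product("sku")
    if (api.global.revenue == null) {
        api.global.revenue = [:]
    }
    if (!api.global.revenue.containsKey(sku)) {
        def batchInfo = api.getBatchInfo()
        Set skus = batchInfo ? batchInfo.collect { it[0] } as Set : [sku] as Set
        def ctx = api.getDatamartContext()
        def dm = ctx.getDatamart("Transactions")
        def query = ctx.newQuery(dm)
        query.select("sku")
        query.select("SUM(invoicePrice)", "revenue")
        query.where(Filter.in("sku", skus))
        // One Datamart round trip for the whole batch instead of one query per SKU
        ctx.executeQuery(query)?.getData()?.getEntries()?.each {
            api.global.revenue[it.sku] = it.revenue
        }
        skus.each { api.global.revenue[it] = api.global.revenue[it] ?: 0.0 }
    }
    return api.global.revenue[sku]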

The given examples were for api.find and Datamart queries; however, you can use the same approach in any other strategy, including pre-calculation of data, fetching a logic for a given SKU, working with the 2nd key, etc.

Warning

  • When you work on a small amount of data, api.getBatchInfo() returns NULL. Using the shared library solves this issue, as it always takes at least one item. (In the pricefx-server settings, you can configure when a batch is created. I tested it on 2,000+ items and there were no issues, but fewer items could return NULL.)

  • api.find() has a limit on the number of rows that you can fetch (you can use api.getMaxFindResultsLimit() to get the limit for the given environment and partition). Make sure that the result is not cut off in the middle of the query, or handle it with startRow/maxRows paging (see the paging sketch after this list). As an alternative, you can use api.stream to cache the data and then work on it, especially when working with the 2nd key.

  • Datamart queries have a limit on the amount of data that can be loaded (it is not exposed in the UI; it is a cluster-wide pricefx-server setting). As an alternative, use streams.

  • 2nd and 3rd... passes will create new batches, so the data will be loaded and processed again.

  • Make sure that you set the element timeout for a sufficient period of time, as in the logic you will be fetching data for up to 200 SKUs and the query result time will be higher than for a single SKU.

  • Find and other functions in Pricefx have their own limits on the maximum number of rows returned to the user. Make sure your logic and data do not exceed those limits; if they do, use streams or ask Support to extend the limits. A good implementation of the code and a good configuration (limits, requirements, business keys of PX, filters for the different contexts) will almost always solve the issue.

  • Once there was the following issue: with 'Allow Distributed Calculation' enabled, NODE1 was fetching values from PA correctly but NODE3 was not. The issue was resolved later on, but it was hard to troubleshoot, so it is good to have a few log outputs for such cases.

  • If you need to store data for all nodes in the calculation flow (in distributed mode), do not use api.global, as it will be different for each node and thread; use a shared cache instead (see the shared-cache sketch after this list).
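Regarding the api.find() row limit above, a minimal sketch of startRow/maxRows paging, reusing the Costs PX and the skus set from the earlier sketches (both are illustrative):

    def filter = Filter.and(Filter.equal("name", "Costs"), Filter.in("sku", skus))
    int startRow = 0
    int maxRows = api.getMaxFindResultsLimit()
    List rows = []
    while (true) {
        def page = api.find("PX", startRow, maxRows, "sku", ["sku", "attribute1"], filter)
        rows.addAll(page)
        if (page.size() < maxRows) break     // last (possibly partial) page reached
        startRow += page.size()
    }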
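And for sharing values across nodes, a minimal sketch assuming the shared-cache helpers api.setSharedCache / api.getSharedCache are available on the partition (they store plain string values):

    // Store a value visible to all nodes/threads of the distributed calculation
    api.setSharedCache("costsCalculated", "true")
    // Read it back from any other node/thread
    def flag = api.getSharedCache("costsCalculated")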
