Calculation Flow Management

The Pricefx server has a so-called heartbeat service that performs common tasks at given intervals. The default interval is 1 minute; it is configured in pricefx-config.xml (backgroundWorkers/pollingPeriod). The calculation flow processor is hooked to this heartbeat service: every minute the processor checks the definitions of calculation flows and, when the scheduled time comes, it loads the definition of the given calculation flow and processes it.
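The polling period lives in pricefx-config.xml under backgroundWorkers/pollingPeriod; a minimal illustrative fragment could look as follows (the element path comes from the text above, but the surrounding structure and the exact value semantics are assumptions — consult your server's actual configuration):

```xml
<!-- Illustrative fragment only: the heartbeat polling period.
     The default interval corresponds to 1 minute. -->
<backgroundWorkers>
    <pollingPeriod>1</pollingPeriod>
</backgroundWorkers>
```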

Deploying and Deleting Flows

A flow can be in one of these two states: draft or deployed.

Draft flows are not processed by the CF processor. They serve as a means to locally configure the flow before it is submitted to the CF processor. You can have as many draft flows as you want and edit them freely; they are saved automatically. Anyone can delete them. The CF processor never touches the draft flows.

Deployed flows are picked up by the processor. No user except the CF processor can delete these flows. This is important as it changes the way you delete deployed flows: you only “request” the deletion of a flow and it is up to the processor to actually delete it. This way we avoid concurrency problems when manipulating the deployed flows in the database: there is only one processor but many users. For the same reason, users cannot edit or save deployed flows.

Deploying a Flow

  1. Select a flow in draft mode.

  2. Click the Deploy button.

  3. As a result, there are now two flows in the list. The deployed one has the Draft option disabled and that is the flow the CF processor will pick and process.

Modifying a Deployed Flow

Since it is not possible to edit a deployed flow, you need to first make the change in the draft flow, save it and then deploy it.

The consequence is that now we have two deployed flows with the same name! That is correct. As mentioned above, users can only request the deploy/delete actions. We deployed a new flow with the same name but it differs by the creation time. When the heartbeat service triggers the CF processor, the processor picks all the deployed flows. But if the processor finds more flows with the same name, it picks the latest one and deletes all the older ones. So after the flows are processed, we end up with two again – one draft flow and one deployed flow.

Deleting a Deployed Flow

This is a similar concept: users only request the deletion of a flow. In the background, a new flow with no flow items is created; when the CF processor sees such a flow, it deletes it.

The sequence of actions is:

  1. Select a deployed flow to be deleted.

  2. Trigger the deletion by clicking the Delete button.

  3. The deletion is requested.

  4. The deletion is performed.

Running CF from Dashboard Wizards

You can also run CF manually. The action is implemented as a REST API call to the server, adding an entry to the CFT (Calculation Flow Trait) table.

def flowId = ....
def flowItemId = ....
callPayload = """
{
  "data": {
    "flowId": "${flowId}",
    "flowItemId": "${flowItemId}",
    "traitType": "START_IMMEDIATELY",
    "configuration": "{}"
  },
  "oldValues": null
}
"""
controller.addBackendCall("Request start", "/add/CFT", callPayload, "Request issued", "Request failed")

Flow ID and flow item ID can be obtained either by inspecting the requests/responses from PFX UI, or programmatically.

def cf = api.find("CF", api.filter("uniqueName", "RebateRecordCalculation"), api.filter("draft", false))
def flowId = cf.get(0).flowId
def entries = api.jsonDecode(cf.get(0).configuration).entries
def flowItemId = entries.get(0).id
api.trace("CF", "", "${flowId} ${flowItemId}")

Preservation of State Between Flow Runs

A CF can have several items. Items do not share the api.global space with other items, but each item shares api.global across its own multiple runs.

A typical use case is sequencing, where you need to record which tasks have been started and finished.
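A minimal sketch of that idea, assuming api.retainGlobal is enabled; the task label and the startedTasks/finishedTasks keys are illustrative, not a prescribed convention:

```groovy
// Sketch: preserving state between runs of one CF item via api.global.
// "enrichProducts" and the map keys are illustrative assumptions.
api.retainGlobal = true
def state = api.global
state.startedTasks = state.startedTasks ?: []
state.finishedTasks = state.finishedTasks ?: []

if (!state.startedTasks.contains("enrichProducts")) {
    // first run: kick off the task and remember that it was started
    actionBuilder.addCalculatedFieldSetAction("enrichProducts").setCalculate(true)
    state.startedTasks << "enrichProducts"
}
// subsequent runs see state.startedTasks already populated and can instead
// check the task's completion and record it in state.finishedTasks
```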

As a consequence, you have to handle caching yourself. When you use the cache utils to store values from a Company Parameter table while executing a CF logic, the cache will not get refreshed until you clear it yourself.

 Note that the content of the api.global variable is not persisted if its size exceeds 16 MB.

Update of Domain Objects

Usually, the data-altering API is enabled via the api.add(...), api.addOrUpdate(...) and api.delete(...) methods.

Examples

Action

Sample


Start calculation of CFS

actionBuilder.addCalculatedFieldSetAction('cfsLabel').setCalculate(true)

Start calculation of Price List

actionBuilder.addPricelistAction('pricelistName').setCalculate(true)

Start calculation of Manual Price List

actionBuilder.addManualPricelistAction('manualPricelistLabel').setCalculate(true)

Start calculation of Live Price Grid

Full recalculation:
actionBuilder.addLivePriceGridAction('livePriceGridLabel').setCalculate(true) 

Recalculation of some SKUs (MaiTai+):
actionBuilder.addLivePriceGridAction('livePriceGridLabel').restrictToSkus(['sku1', 'sku2', 'sku3']).setCalculate(true)

Start calculation of Data Load

actionBuilder.addDataLoadAction('dataLoadLabel', 'dataLoadType', 'target').setCalculate(true)

Start calculation of Simulation

actionBuilder.addSimulationAction('simulationLabel').setCalculate(true)

Start calculation of rebate record set

actionBuilder.addRebateRecordAction('rrscLabel').setCalculate(true)

Start calculation of a Model Object

actionBuilder.addModelCalculationAction('myModelName').fromStep('stepX').toLastStep().setCalculate(true)

Start Quote mass update

actionBuilder.addQuoteMassUpdateAction('label').setCalculate(true)

Start Price API job

actionBuilder.addPriceAPIAction('jobName').setCalculate(true)

Start Export or Archive job (Administration > Configuration > Import / Export / Archiving)

actionBuilder.addExportAction('jobName').doExport('templateName')

actionBuilder.addExportAction('jobName').doArchive('templateName')

Start a job which recalculates object references

actionBuilder.addObjRefAction().setFilter(filter).setPartition(partition).setTypeCode('typeCode')

See also an example of use in the Knowledge Base.
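For instance, a single CF logic element can request several of these actions in one run (the labels below are placeholders; note that the actions are only started, not sequenced — for ordered execution see Advanced Sequencing Techniques below):

```groovy
// Request a CFS calculation and a price list calculation from one CF logic element.
// "EnrichProducts" and "StandardPrices" are placeholder labels.
actionBuilder.addCalculatedFieldSetAction("EnrichProducts").setCalculate(true)
actionBuilder.addPricelistAction("StandardPrices").setCalculate(true)
```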

Self-altering of Schedule by Formula

actionBuilder.getScheduleSettings() returns an object which is used to obtain the schedule as seen in the UI. If needed, you can alter it.

Methods to get the current schedule settings:

  • CalculationFlowTimeUnit getTimeUnit()

  • BigDecimal getNumberOfTimeUnits()

  • String getStartingDate()

  • Boolean isPeriodic()

Methods to set new schedule settings:

  • void setNewNumberOfTimeUnits(BigDecimal newNumberOfTimeUnits)

  • void setNewStartingDate(String newStartingDate)

  • void setNewPeriodic(Boolean newPeriodic)

  • void setNewTimeUnit(CalculationFlowTimeUnit newTimeUnit)

Helper methods:

  • boolean isStartImmediatelyTraitActive()
    Returns true if the CF was run on the user request (not by the scheduler itself) – either manually in UI or via the REST API action.

  • String getDateTimeFormat() // yyyy-MM-dd'T'HH:mm:ss
    Preferred format for storing Date values (java.util, Joda) in the api.global map.

Example

This sample shows how to change the original schedule interval of 10 minutes (as defined in the UI) to a 2-minute interval and back. Running this element in the CF logic makes the CF run at intervals of START, 2, 8, 2, 8, 2, … minutes. The original 10-minute interval stays intact; we only insert an additional 2-minute run inside each interval.

api.retainGlobal = true

// Schedules are: original interval, 2-minute interval, original interval, 2-minute interval, ...
// Say the original interval is 10 minutes and the start is at 9:34; the sequence of runs
// will then be: 9:44, 9:46, 9:54, 9:56, 10:04, 10:06, 10:14
// Note that the 10-minute interval is kept, with one extra run inside each interval.
def sharedState = api.global

if (sharedState.conditionMet) {
  // switch from the original schedule to the 2-minute schedule
  sharedState.conditionMet = false
  def settings = actionBuilder.getScheduleSettings()
  // make a note of the original schedule
  sharedState.startingDate = settings.getStartingDate()
  sharedState.numberOfTimeUnits = settings.getNumberOfTimeUnits()
  settings.setNewNumberOfTimeUnits(2)
} else {
  // switch from the 2-minute schedule back to the original schedule
  sharedState.conditionMet = true
  def settings = actionBuilder.getScheduleSettings()
  def startingDate = sharedState.remove("startingDate")
  def numberOfTimeUnits = sharedState.remove("numberOfTimeUnits")
  settings.setNewStartingDate(startingDate)
  settings.setNewNumberOfTimeUnits(numberOfTimeUnits)
  // The logic also takes this branch on the very first run, when the global state is not set yet.
  // The properties are then null, and calling settings.setNew...() with null values means
  // "keep the original values" -> the first run therefore always uses the original schedule interval.
}

Advanced Sequencing Techniques

Sequencing combines the above mentioned techniques for maximum flexibility and power, namely preservation of state between flow runs, self-altering of the schedule, and starting of actions.

Let's suppose that a customer needs to automatically calculate a Price List every 10 days, but before that a CFS enriching product master has to be started and finished.

The set of actions is:

  1. Define a CF with a 10-day interval.

  2. Once the CF is woken up by the scheduler, the formula starts a CFS, lowers the sampling interval from 10 days to 5 minutes (the CF will then be woken up every 5 minutes) and records the original schedule in the api.global map.

  3. Every 5 minutes the CF checks the CFS status; once it is ready, it starts the price list calculation and puts the schedule back to the 10-day interval (the exact setting is stored in the api.global map).
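The steps above can be sketched in one CF logic element as follows. The labels are placeholders, the CalculationFlowTimeUnit constant names are assumptions, and cfsIsFinished() stands for whatever completion check you implement (e.g., by querying the job status) — Pricefx does not mandate one particular way to detect that a CFS has finished:

```groovy
// Sketch of the 10-day / 5-minute sequencing pattern described above.
// "EnrichProductMaster" and "MainPriceList" are hypothetical labels;
// cfsIsFinished() is a hypothetical helper you implement yourself.
api.retainGlobal = true
def state = api.global
def settings = actionBuilder.getScheduleSettings()

if (!state.waitingForCfs) {
    // woken up by the regular 10-day schedule: start the CFS,
    // remember the original schedule and switch to a 5-minute poll
    actionBuilder.addCalculatedFieldSetAction("EnrichProductMaster").setCalculate(true)
    state.origTimeUnit = settings.getTimeUnit()
    state.origTimeUnits = settings.getNumberOfTimeUnits()
    settings.setNewTimeUnit(CalculationFlowTimeUnit.MINUTES)  // assumed enum constant
    settings.setNewNumberOfTimeUnits(5)
    state.waitingForCfs = true
} else if (cfsIsFinished("EnrichProductMaster")) {
    // CFS done: start the price list and restore the original 10-day schedule
    actionBuilder.addPricelistAction("MainPriceList").setCalculate(true)
    settings.setNewTimeUnit(state.remove("origTimeUnit"))
    settings.setNewNumberOfTimeUnits(state.remove("origTimeUnits"))
    state.waitingForCfs = false
}
```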

Scheduling & Prioritization of Calculation Flows

Calculation flows are used to schedule repeating background jobs at a somewhat deterministic rate/frequency. Due to the nature of background jobs, there is no “exact” timing in the sense of “every x seconds”.

Background jobs and also calculation flows are not designed to execute high frequency requests/jobs at a precise interval. High frequency in this context means: more often than once every 5 minutes.

The following rules explain how calculation flows are scheduled:

  • Calculation flows are prioritized over other (non-CF) jobs.

  • However, calculation flows (just like other jobs) are not scheduled “by the second”.

  • Jobs are ordered by priority and then by creation date (oldest first). There is a maximum limit of concurrently running jobs, so the number of open slots varies. This is necessary to ensure fair resource usage in a shared environment.

In conclusion, calculation flows are the right vehicle for “fixed rate” jobs; just make sure the scheduled frequency is reasonably lower than the job check interval.

 Lowering the frequency of job checking to seconds (to achieve very precise job order) is absolutely not recommended, as it has a huge negative impact on the whole system.

If you need to run a calculation flow at a high frequency (e.g., more often than every 5 minutes) and exact timing, it is recommended to use an external scheduler and work with JSON API commands / formula executions.
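Such an external trigger can be sketched as a plain REST call to the /add/CFT endpoint shown earlier. The base URL scheme, partition name, credentials, and IDs below are placeholders/assumptions — adapt them to your environment and authentication method:

```shell
# Sketch: triggering a CF run from an external scheduler via the /add/CFT endpoint.
# Server, partition, credentials, flowId and flowItemId are placeholders.
curl -u 'user:password' \
  -H 'Content-Type: application/json' \
  -X POST 'https://<server>/pricefx/<partition>/add/CFT' \
  -d '{
    "data": {
      "flowId": "<flowId>",
      "flowItemId": "<flowItemId>",
      "traitType": "START_IMMEDIATELY",
      "configuration": "{}"
    },
    "oldValues": null
  }'
```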

Failover Mechanism

You can turn on a failover mechanism for the calculation flow processor. It is useful when you want to make sure that if a node stops working, some other node will pick up the work.

To enable it, use the ServerRole: "Calculation Flow Failover Processor" for the node(s) which should pick up processing if the master node (i.e., a node with "Calculation Flow Processor") fails.

CF Calculation Running on Cloud Provider Backend Node

To save resources, Calculation Flow jobs run by default on the pricefx-cluster-app-backend-* pod (also known as the service role – the role taking care of janitor tasks, emails, logs, etc.) in Pricefx hosting infrastructure. If you, for some reason, want to have a dedicated pod created for CF jobs, you can enable it:

  • For individual CFs by enabling the option Disable on service role in the CF configuration. This is the preferred option if you want to keep your costs down, because you run only the required jobs inside pods. The next two options are more costly, because running a CF in a dedicated pod is more expensive than running it on the backend node.
    Note: If the disableCFOnServiceRole property is enabled on the cluster or partition, all CFs will run on a dedicated pod regardless of individual CF settings.

  • For the current partition by adding the Advanced Configuration Option disableCFOnServiceRole and setting its value to true.

  • Cluster-wide by setting the property disableCFOnServiceRole to true in the pricefx-config.xml file.

You can find out in the UI if a Calculation Flow job is running on a service role or a dedicated pod. In Jobs & Tasks or Calculation Tracking, make the Lightweight column visible and check the value for your Calculation Flow. If the value is true, it runs on the backend node. If the value is false, it runs on its dedicated pod.

Note: In bare-metal environments, Calculation Flows always run on the backend node regardless of these settings.


 
Pricefx version 13.1