//------------------------- PRIVATE functions of the Sequencer --------------------------------------------

/**
 * This is the heart of the Sequencer, this is executed every minute (or every 5 mins) and starts the Actions.
 *
 * Loads every sequence from the control table and processes each one in turn.
 *
 * @param actionBuilder builder the started Data Load / CFS actions are registered on
 */
def processAllSequences(actionBuilder) {
    api.logInfo("function Library.processAllSequences()", "")
    def sequences = getAllSequences()
    for (sequence in sequences) {
        api.logInfo("Processing sequence", "${sequence.key}, ${sequence.value}")
        processSequence(sequence.value, actionBuilder)
    }
}

/**
 * Tells whether every action of the sequence is either not started yet (blank status) or DONE.
 *
 * @param sequence list of orders, each holding an "actions" list
 * @return false as soon as any action has a non-blank status other than "DONE"
 */
def canStartSequence(sequence) {
    return sequence.every { order ->
        order.actions.every { action ->
            def status = action.attribute1
            !status || status == "DONE"
        }
    }
}

/**
 * @return the lookup table named "SequenceControlTable" as returned by api.findLookupTable
 */
def getSequenceControlTable() {
    return api.findLookupTable("SequenceControlTable")
}

/**
 * Reads all rows of the control table (page by page) and groups them.
 *
 * @return map of sequence name (key1) to a list of orders; every order is a map
 *         ["order": key2, actions: [rows...]]. Empty map when the control table is missing.
 */
def getAllSequences() {
    def table = [:]
    def sequenceControlTable = getSequenceControlTable()
    if (sequenceControlTable == null) {
        // Without the control table there is nothing to process; avoid a NullPointerException on ".id".
        api.logWarn("function Library.getAllSequences()", "SequenceControlTable not found")
        return table
    }

    def startRow = 0
    def rows // declared locally so the page buffer does not leak into the script binding
    // api.find returns one page per call; an empty (falsy) page ends the loop.
    while ((rows = api.find("MLTV4", startRow, "key2", Filter.equal("lookupTable.id", sequenceControlTable.id)))) {
        startRow += rows.size()
        for (row in rows) {
            def rowSequenceName = row.key1
            def rowOrder = row.key2

            def sequence = table[rowSequenceName]
            if (!sequence) {
                sequence = []
                table[rowSequenceName] = sequence
            }

            def order = sequence.find { it.order == rowOrder }
            if (!order) {
                order = ["order": rowOrder, actions: []]
                sequence.add(order)
            }
            order.actions.add(row)
        }
    }
    return table
}

/**
 * Walks the orders of one sequence and either starts the first waiting order, refreshes a
 * running one, skips finished ones, or stops at a failed one.
 *
 * @param sequence list of orders as built by getAllSequences()
 * @param actionBuilder builder the started actions are registered on
 */
def processSequence(sequence, actionBuilder) {
    api.logInfo("function Library.processSequence()", "")
    for (order in sequence) {
        if (canStartOrder(order)) {
            startOrder(order, actionBuilder) //start first waiting order
            break //and do not care about the rest
        }
        if (isProcessingOrder(order)) {
            updateOrder(order)
            if (isProcessingOrder(order)) {
                break //if the Order is still processing after update, then wait
            }
        }
        if (isOrderDone(order)) {
            continue // skip this order, it's done already
        }
        if (isOrderError(order)) {
            break //if the order failed, do not continue any further processing
        }
        // NOTE(review): an order with a mix of blank and DONE actions matches none of the
        // branches above and falls through to the next order - confirm this is intended.
    }
}

/**
 * update the status of all PROCESSING Actions in the Order
 * @param order
 */
def updateOrder(order) {
    api.logInfo("function Library.updateOrder()", "${order.order}")
    order.actions.each { action -> updateAction(action) }
}

/**
 * Starts every action of the order.
 *
 * @param order order map with an "actions" list
 * @param actionBuilder builder the started actions are registered on
 */
def startOrder(order, actionBuilder) {
    api.logInfo("function Library.startOrder()", "${order.order}")
    order.actions.each { action -> startAction(action, actionBuilder) }
}

/**
 * @param order order map with an "actions" list
 * @return true when at least one action of the order has status "ERROR"
 */
def isOrderError(order) {
    def error = order.actions.any { it.attribute1 == "ERROR" }
    api.logInfo("function Library.isOrderError(${order.order})", "return ${error}")
    return error
}

/**
 * @param order order map with an "actions" list
 * @return true when every action of the order has status "DONE" (also true for an order without actions)
 */
def isOrderDone(order) {
    def done = order.actions.every { it.attribute1 == "DONE" }
    api.logInfo("function Library.isOrderDone(${order.order})", "return ${done}")
    return done
}

/**
 * An order can start only when none of its actions carries a status yet
 * (statuses PROCESSING, ERROR, DONE all block the start).
 *
 * @param order order map with an "actions" list
 * @return true when every action has a null or empty status
 */
def canStartOrder(order) {
    def canStart = order.actions.every { !it.attribute1 }
    api.logInfo("function Library.canStartOrder(${order.order})", "return ${canStart}")
    return canStart
}

/**
 * @param order order map with an "actions" list
 * @return true when at least one action of the order has status "PROCESSING"
 */
def isProcessingOrder(order) {
    def isProcessing = order.actions.any { it.attribute1 == "PROCESSING" }
    api.logInfo("function Library.isProcessingOrder(${order.order})", "return ${isProcessing}")
    return isProcessing
}

/**
 * Builds the comma separated key/attribute dump of an action used in the log messages.
 *
 * @param action control table row
 * @return "key1,key2,key3,key4,attribute1,attribute2,attribute3,attribute4"
 */
def describeAction(action) {
    return "${action.key1},${action.key2},${action.key3},${action.key4},${action.attribute1},${action.attribute2},${action.attribute3},${action.attribute4}"
}

/**
 * Re-reads the status of the action's target and stores it when it differs from the status
 * currently held in the row.
 *
 * @param action control table row
 */
def updateAction(action) {
    api.logInfo("function Library.updateAction()", describeAction(action))
    def previousStatus = action.attribute1
    def newStatus = getCurrentActionStatus(action)
    if (previousStatus != newStatus) {
        updateActionStatus(action, newStatus)
        api.logInfo("function Library.updateAction()", "status updated from ${previousStatus} to ${newStatus}")
    }
}

/**
 * Looks up the status of the Data Load ("DL") or Calculated Field Set ("CFS") behind the
 * action and maps it to a sequencer status.
 *
 * @param action control table row; key3 holds the action type
 * @return "PROCESSING", "ERROR", "DONE", or null for DRAFT / unknown / missing statuses
 */
def getCurrentActionStatus(action) {
    api.logInfo("function Library.getCurrentActionStatus()", describeAction(action))
    def status = null
    def actionType = action.key3
    switch (actionType) {
        case "DL":
            status = getDLStatus(action)
            break
        case "CFS":
            status = getCFSStatus(action)
            break
    }
    api.logInfo("getCurrentActionStatus() Status", status)

    def res = null
    // A list used as a case label matches when it contains the switch value.
    switch (status) {
        case ["DRAFT"]:
            res = null
            break
        case ["PENDING", "SCHEDULED", "PROCESSING", "SCHEDULED_DIRTY"]:
            res = "PROCESSING"
            break
        case ["ERROR", "CANCELLED"]:
            res = "ERROR"
            break
        case ["READY"]:
            res = "DONE"
            break
    }
    api.logInfo("getCurrentActionStatus() Result", res)
    return res
}

/**
 * @param action control table row; key4, attribute3 and attribute4 are passed to api.findDataLoad
 * @return status of the Data Load found, or null when none is found
 */
def getDLStatus(action) {
    api.logInfo("function Library.getDLStatus()", describeAction(action))
    def dl = api.findDataLoad(action.key4, action.attribute3, action.attribute4)
    def status = dl?.status
    api.logInfo("function Library.getDLStatus()", "return ${status}")
    return status
}

/**
 * @param action control table row; key4 is passed to api.findCalculatedFieldSets together with today's date
 * @return status of the first Calculated Field Set found, or null when none is found
 */
def getCFSStatus(action) {
    api.logInfo("function Library.getCFSStatus()", describeAction(action))
    def cfs = api.findCalculatedFieldSets(new Date(), action.key4)
    if (!cfs) {
        // null or empty result: first() on an empty list would throw NoSuchElementException
        return null
    }
    if (cfs.size() > 1) {
        api.logWarn("getCFSStatus() - found more CFSs valid for today with name ", action.key4)
    }
    return cfs.first()?.status
}

/**
 * Registers the action on the builder according to its type and, when the builder call
 * returned a truthy result, marks the action as PROCESSING.
 *
 * @param action control table row; key3 holds the action type ("DL" or "CFS")
 * @param actionBuilder builder the action is registered on
 */
def startAction(action, actionBuilder) {
    api.logInfo("function Library.startAction()", describeAction(action))
    def actionType = action.key3
    def res = null
    switch (actionType) {
        case "DL":
            res = startDL(action, actionBuilder)
            break
        case "CFS":
            res = startCFS(action, actionBuilder)
            break
        default:
            // Nothing is started and the status stays untouched, exactly as before; only made visible.
            api.logWarn("function Library.startAction() - unsupported action type", actionType)
    }
    if (res) {
        updateActionStatus(action, "PROCESSING")
    }
}

/**
 * @param action control table row; key4 is passed to addCalculatedFieldSetAction
 * @param actionBuilder builder the action is registered on
 * @return result of setCalculate(true) on the added action
 */
def startCFS(action, actionBuilder) {
    api.logInfo("function Library.startCFS()", action.key4)
    actionBuilder.addCalculatedFieldSetAction(action.key4).setCalculate(true)
}

/**
 * @param action control table row; key4, attribute3 and attribute4 are passed to addDataLoadAction
 * @param actionBuilder builder the action is registered on
 * @return result of setCalculate(true) on the added action
 */
def startDL(action, actionBuilder) {
    api.logInfo("function Library.startDL()", "${action.key4},${action.attribute3},${action.attribute4}")
    actionBuilder.addDataLoadAction(action.key4, action.attribute3, action.attribute4).setCalculate(true)
}

/**
 * Persists a new status (attribute1) for the action's control table row and mirrors it on
 * the in-memory row.
 *
 * @param action control table row identified by key1..key4
 * @param statusValue new status, may be null
 */
def updateActionStatus(action, statusValue) {
    api.logInfo("function Library.updateActionStatus()", "${action.key3},${statusValue}")
    def entry = [
            lookupTableId: getSequenceControlTable().id,
            key1         : action.key1,
            key2         : action.key2,
            key3         : action.key3,
            key4         : action.key4,
            attribute1   : statusValue
    ]
    api.update("MLTV4", entry)
    // All status checks in this library read attribute1, so the in-memory row must be refreshed
    // there; otherwise the re-check after updateOrder() still sees the stale status.
    action.attribute1 = statusValue
    // Kept from the previous implementation in case anything outside this library reads it.
    action.status = statusValue
}