/**
 * This is the heart of the Sequencer, this is executed every minute (or every 5 mins)
 * and starts the Actions.
 *
 * Loads every sequence from the control table and processes each one in turn.
 *
 * @param actionBuilder builder used to schedule Data Load / Calculated Field Set actions
 */
def processAllSequences(actionBuilder) {
    api.logInfo("function Library.processAllSequences()", "")
    def sequences = getAllSequences()
    for (sequence in sequences) {
        api.logInfo("Processing sequence", "${sequence.key}, ${sequence.value}")
        processSequence(sequence.value, actionBuilder)
    }
}

/**
 * Use this function to start a sequence by its name.
 * The sequence is started only if none of its actions is in progress or failed.
 *
 * @param sequenceName The name of the sequence you want to start
 */
def startSequence(sequenceName) {
    startSequence(sequenceName, false)
}

/**
 * Use this function to start a sequence by its name.
 * "Starting" means clearing the status of every action of the sequence, so that
 * the next run of processAllSequences() picks up its first order.
 *
 * @param sequenceName The name of the sequence you want to start
 * @param forceStart Whether you want to start the sequence regardless of its current status.
 *                   This should be used only if the sequence is stuck with error.
 */
def startSequence(sequenceName, forceStart) {
    def sequence = getAllSequences()[sequenceName]
    if (sequence == null) {
        // Nothing to reset; make the no-op visible instead of silently ignoring a typo.
        api.logWarn("startSequence() - sequence not found", sequenceName)
        return
    }
    if (forceStart || canStartSequence(sequence)) {
        for (order in sequence) {
            for (action in order.actions) {
                updateActionStatus(action, null)
            }
        }
    }
}

/**
 * Tells whether a sequence may be (re)started.
 *
 * @param sequence list of orders as produced by getAllSequences()
 * @return false as soon as any action has a non-empty status other than "DONE", true otherwise
 */
def canStartSequence(sequence) {
    for (order in sequence) {
        for (action in order.actions) {
            def status = action.attribute1
            if (status != null && status.length() > 0 && status != "DONE") {
                return false
            }
        }
    }
    return true
}

/**
 * @return the lookup table named "SequenceControlTable" (whatever api.findLookupTable yields for it)
 */
def getSequenceControlTable() {
    return api.findLookupTable("SequenceControlTable")
}

/**
 * Reads all rows of the sequence control table and groups them.
 *
 * @return map keyed by sequence name (row.key1); each value is a list of orders,
 *         every order being a map ["order": row.key2, actions: [rows...]].
 *         Rows are fetched sorted by key2, so orders are appended in that sort order.
 */
def getAllSequences() {
    def table = [:]
    def sequenceControlTable = getSequenceControlTable()
    if (sequenceControlTable == null) {
        api.logWarn("getAllSequences() - lookup table not found", "SequenceControlTable")
        return table
    }

    def startRow = 0
    def rows
    // api.find is paged: keep fetching from the next offset until an empty page comes back.
    while ((rows = api.find("MLTV4", startRow, "key2",
            Filter.equal("lookupTable.id", sequenceControlTable.id)))) {
        startRow += rows.size()
        for (row in rows) {
            def rowSequenceName = row.key1
            def rowOrder = row.key2

            def sequence = table[rowSequenceName]
            if (!sequence) {
                sequence = []
                table[rowSequenceName] = sequence
            }

            def order = sequence.find { it.order == rowOrder }
            if (!order) {
                order = ["order": rowOrder, actions: []]
                sequence.add(order)
            }
            order.actions.add(row)
        }
    }
    return table
}

/**
 * Advances one sequence by at most one step: starts the first waiting order, or
 * refreshes the statuses of the order currently in progress.
 *
 * @param sequence list of orders as produced by getAllSequences()
 * @param actionBuilder builder used to schedule the actions of a started order
 */
def processSequence(sequence, actionBuilder) {
    api.logInfo("function Library.processSequence()", "")
    for (order in sequence) {
        if (canStartOrder(order)) {
            startOrder(order, actionBuilder) //start first waiting order
            break //and do not care about the rest
        }
        if (isProcessingOrder(order)) {
            updateOrder(order)
        }
        if (isOrderDone(order)) {
            continue // skip this order, it's done already
        }
        if (isOrderError(order)) {
            break //if the order failed, do not continue any further processing
        }
        if (isProcessingOrder(order)) {
            break // still running after the refresh: later orders must wait for it
        }
    }
}

/**
 * Update the status of all Actions in the Order from their current job status.
 *
 * @param order order map with "order" and "actions" entries
 */
def updateOrder(order) {
    api.logInfo("function Library.updateOrder()", "${order.order}")
    for (action in order.actions) {
        updateAction(action)
    }
}

/**
 * Starts every action of the given order.
 *
 * @param order order map with "order" and "actions" entries
 * @param actionBuilder builder used to schedule the actions
 */
def startOrder(order, actionBuilder) {
    api.logInfo("function Library.startOrder()", "${order.order}")
    for (action in order.actions) {
        startAction(action, actionBuilder)
    }
}

/**
 * @param order order map with "order" and "actions" entries
 * @return true if at least one action of the order has status "ERROR"
 */
def isOrderError(order) {
    api.logInfo("function Library.isOrderError()", "${order.order}")
    return order.actions.any { it.attribute1 == "ERROR" }
}

/**
 * @param order order map with "order" and "actions" entries
 * @return true if every action of the order has status "DONE" (also true for an order without actions)
 */
def isOrderDone(order) {
    api.logInfo("function Library.isOrderDone()", "${order.order}")
    return order.actions.every { it.attribute1 == "DONE" }
}

/**
 * An order can be started only when none of its actions carries a status yet
 * (statuses are PROCESSING, ERROR, DONE).
 *
 * @param order order map with "order" and "actions" entries
 * @return true if the status of every action is null or empty
 */
def canStartOrder(order) {
    api.logInfo("function Library.canStartOrder()", "${order.order}")
    return order.actions.every { it.attribute1 == null || it.attribute1.length() == 0 }
}

/**
 * @param order order map with "order" and "actions" entries
 * @return true if at least one action of the order has status "PROCESSING"
 */
def isProcessingOrder(order) {
    api.logInfo("function Library.isProcessingOrder()", "${order.order}")
    return order.actions.any { it.attribute1 == "PROCESSING" }
}

/**
 * Builds the comma-separated key/attribute dump of an action used in log messages.
 *
 * @param action control table row
 * @return "key1,key2,key3,key4,attribute1,attribute2,attribute3,attribute4"
 */
def describeAction(action) {
    return "${action.key1},${action.key2},${action.key3},${action.key4},${action.attribute1},${action.attribute2},${action.attribute3},${action.attribute4}"
}

/**
 * Re-reads the status of the job behind an action and persists it when it changed.
 *
 * @param action control table row
 */
def updateAction(action) {
    api.logInfo("function Library.updateAction()", describeAction(action))
    def previousStatus = action.attribute1
    def newStatus = getCurrentActionStatus(action)
    if (previousStatus != newStatus) {
        updateActionStatus(action, newStatus)
    }
}

/**
 * Maps the status of the underlying job to the Sequencer status.
 *
 * @param action control table row; key3 selects the job type ("DL" or "CFS")
 * @return "PROCESSING", "ERROR", "DONE", or null (job in DRAFT, unknown job type,
 *         job not found, or a job status not listed below)
 */
def getCurrentActionStatus(action) {
    api.logInfo("function Library.getCurrentActionStatus()", describeAction(action))

    def status = null
    switch (action.key3) {
        case "DL":
            status = getDLStatus(action)
            break
        case "CFS":
            status = getCFSStatus(action)
            break
    }
    api.logInfo("getCurrentActionStatus() Status", status)

    def res = null
    // A list used as a case label matches when it contains the switch value.
    switch (status) {
        case ["DRAFT"]:
            res = null
            break
        case ["PENDING", "SCHEDULED", "PROCESSING", "SCHEDULED_DIRTY"]:
            res = "PROCESSING"
            break
        case ["ERROR", "CANCELLED"]:
            res = "ERROR"
            break
        case ["READY"]:
            res = "DONE"
            break
    }
    api.logInfo("getCurrentActionStatus() Result", res)
    return res
}

/**
 * @param action control table row; key4, attribute3 and attribute4 are passed to api.findDataLoad
 *               (NOTE(review): presumably label, type and target of the Data Load - confirm)
 * @return status of the Data Load, or null when none was found
 */
def getDLStatus(action) {
    api.logInfo("function Library.getDLStatus()", describeAction(action))
    def dl = api.findDataLoad(action.key4, action.attribute3, action.attribute4)
    return dl?.status
}

/**
 * @param action control table row; key4 is passed to api.findCalculatedFieldSets together with today's date
 * @return status of the first Calculated Field Set found, or null when none was found
 */
def getCFSStatus(action) {
    api.logInfo("function Library.getCFSStatus()", describeAction(action))
    def cfs = api.findCalculatedFieldSets(new Date(), action.key4)
    if (!cfs) {
        // null or empty result: behave like getDLStatus() and report "no status"
        return null
    }
    if (cfs.size() > 1) {
        api.logWarn("getCFSStatus() - found more CFSs valid for today with name ", action.key4)
    }
    return cfs.first()?.status
}

/**
 * Schedules the job behind an action and marks the action as PROCESSING
 * when the scheduling call returned a truthy result.
 *
 * @param action control table row; key3 selects the job type ("DL" or "CFS")
 * @param actionBuilder builder used to schedule the job
 */
def startAction(action, actionBuilder) {
    api.logInfo("function Library.startAction()", describeAction(action))
    def res = null
    switch (action.key3) {
        case "DL":
            res = startDL(action, actionBuilder)
            break
        case "CFS":
            res = startCFS(action, actionBuilder)
            break
    }
    if (res) {
        updateActionStatus(action, "PROCESSING")
    }
}

/**
 * Schedules calculation of the Calculated Field Set named by action.key4.
 *
 * @return whatever setCalculate(true) returns on the created action
 */
def startCFS(action, actionBuilder) {
    api.logInfo("function Library.startCFS()", action.key4)
    actionBuilder.addCalculatedFieldSetAction(action.key4).setCalculate(true)
}

/**
 * Schedules calculation of the Data Load identified by action.key4, attribute3 and attribute4.
 *
 * @return whatever setCalculate(true) returns on the created action
 */
def startDL(action, actionBuilder) {
    api.logInfo("function Library.startDL()", "${action.key4},${action.attribute3},${action.attribute4}")
    actionBuilder.addDataLoadAction(action.key4, action.attribute3, action.attribute4).setCalculate(true)
}

/**
 * Persists a new status (attribute1) for the given action row and mirrors it
 * on the in-memory row.
 *
 * @param action control table row
 * @param statusValue new status: null, "PROCESSING", "ERROR" or "DONE"
 */
def updateActionStatus(action, statusValue) {
    api.logInfo("function Library.updateActionStatus()", "${action.key3},${statusValue}")
    def entry = [
            lookupTableId: getSequenceControlTable().id,
            key1         : action.key1,
            key2         : action.key2,
            key3         : action.key3,
            key4         : action.key4,
            attribute1   : statusValue
    ]
    api.update("MLTV4", entry)
    // All status checks read attribute1, so keep the in-memory row in sync with what was stored.
    action.attribute1 = statusValue
    action.status = statusValue // kept for anything that may still read the old field
}