Description
When I fork 2 transitions( A and B) to submit , when A transition failed , B transition still Running , because can't execute KillXCommand.
SignalXCommand.startForkedActions, when one transition submit fail will create a new ActionStartXCommand and invoke failJob, failJob will add WorkflowNotificationXCommand and KillXCommand to commandQueue , and callback at XCommand.call method , but we add WorkflowNotificationXCommand and KillXCommand to ActionStartXCommand‘s commandQueue , but not SignalXCommand , so can't execute KillXCommand.
The code is as follows :
public void startForkedActions(List<WorkflowActionBean> workflowActionBeanListForForked) throws CommandException { ...... for (Future<ActionExecutorContext> result : futures) { ...... if (context.getJobStatus() != null && context.getJobStatus().equals(Job.Status.FAILED)) { new ActionStartXCommand(context.getAction().getId(), null).failJob(context); ...... } ...... }
public void failJob(ActionExecutor.Context context, WorkflowActionBean action) throws CommandException { WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow(); if (!handleUserRetry(context, action)) { incrActionErrorCounter(action.getType(), "failed", 1); LOG.warn("Failing Job due to failed action [{0}]", action.getName()); try { workflow.getWorkflowInstance().fail(action.getName()); WorkflowInstance wfInstance = workflow.getWorkflowInstance(); ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED); workflow.setWorkflowInstance(wfInstance); workflow.setStatus(WorkflowJob.Status.FAILED); action.setStatus(WorkflowAction.Status.FAILED); action.resetPending(); queue(new WorkflowNotificationXCommand(workflow, action)); queue(new KillXCommand(workflow.getId())); InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER_NAME, 1, getInstrumentation()); } catch (WorkflowException ex) { throw new CommandException(ex); } } }
public final T call() throws CommandException { if (commandQueue != null) { for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) { LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey()); if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) { LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full", entry.getValue() .size(), entry.getKey()); } } } }