  1. jBPM
  2. JBPM-626

CommandExecutorThread thread safety issues

    • Type: Bug
    • Resolution: Can't Do
    • Priority: Major
    • None
    • jBPM 3.1.0, jBPM 3.1.1, jBPM 3.1.2, jBPM 3.1.3, jBPM 3.1.4, jBPM 3.2.0, jBPM 3.2.1
    • Runtime Engine
    • None

      There appear to be a couple of thread-safety problems in the behaviour of the CommandExecutorThread:

      (1) Consider the following code:

      JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
      try {
          GraphSession graphSession = jbpmContext.getGraphSession();
          // Find the process definition in the database
          ProcessDefinition processDefinition = graphSession.findLatestProcessDefinition(processDefinitionName);
          // Create an execution of the process definition.
          ProcessInstance processInstance = new ProcessInstance(processDefinition);
          // Check that the process instance is in the start state
          Token token = processInstance.getRootToken();
          // Start the process execution
          token.signal();
          ...
          jbpmContext.save(processInstance);
      } finally {
          // Tear down the pojo persistence context.
          jbpmContext.close();
      }

      If this code is executed while the token is in node 'node 1', the token.signal() method causes a node-enter event to fire on the next asynchronous node 'node 2' and a message to be stored in the JBPM_MESSAGE table. This message identifies the token as being in 'node 2'. However, the node associated with the token in the JBPM_TOKEN table is still 'node 1' until the jbpmContext.close() method flushes the process instance to the database.
      So the JBPM_MESSAGE.NODE for the token is 'node 2' but the JBPM_TOKEN.NODE for the same token is 'node 1'.
      This means that on line 53 of org.jbpm.command.ExecuteNodeCommand execute(), an exception is thrown:

      public void execute() {
          if (! node.equals(token.getNode())) {
              throw new JbpmException("couldn't continue execution for '"+token+"', token moved");
          }

      and an exception is added to the message so that it is not re-tried.
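
      The ordering problem in (1) can be reproduced without jBPM. The following is a minimal single-threaded sketch, not jBPM code: two in-memory maps stand in for the JBPM_TOKEN and JBPM_MESSAGE tables, and the class and method names (TokenMovedRace, signal, close, executeNodeCommand) are hypothetical. It assumes, as described above, that the message becomes visible to the executor before the token row is flushed.

```java
import java.util.HashMap;
import java.util.Map;

class TokenMovedRace {
    // Stand-ins for the JBPM_TOKEN and JBPM_MESSAGE tables (token id -> node name).
    static final Map<Long, String> tokenTable = new HashMap<>();
    static final Map<Long, String> messageTable = new HashMap<>();

    // Models token.signal(): the message for the async node is stored at once.
    static void signal(long tokenId, String nextNode) {
        messageTable.put(tokenId, nextNode);
    }

    // Models jbpmContext.close(): only now is the token row updated.
    static void close(long tokenId, String nextNode) {
        tokenTable.put(tokenId, nextNode);
    }

    // Models the node comparison in ExecuteNodeCommand.execute().
    static boolean executeNodeCommand(long tokenId) {
        String messageNode = messageTable.get(tokenId);
        return messageNode != null && messageNode.equals(tokenTable.get(tokenId));
    }

    public static void main(String[] args) {
        tokenTable.put(6L, "node 1");
        signal(6L, "node 2");
        // The executor polls between signal() and close(): the nodes disagree.
        System.out.println("before close: " + executeNodeCommand(6L));
        close(6L, "node 2");
        // After the flush both tables agree and the command could run.
        System.out.println("after close: " + executeNodeCommand(6L));
    }
}
```

      In this model the check fails only in the window between signal() and close(), which is the window in which the real CommandExecutorThread can pick up the message.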

      As a work-around, I modified the org.jbpm.msg.command.CommandExecutorThread handleProcessingException method in a pretty ugly way; I just leave the message in the table to be re-tried if the exception message contains the String 'token moved':

      void handleProcessingException(MessageProcessingException e) {
      JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext(jbpmContextName);
      try {
      // get the message session from the context
      DbMessageService dbMessageSessionImpl = (DbMessageService) jbpmContext.getServices().getMessageService();
      MessagingSession messageSession = dbMessageSessionImpl.getMessagingSession();

      // get the problematic command message from the exception
      Message message = e.message;

      // MWS 01/04/06 Workaround for async continuations
      //
      // Will get this exception if the JBPM_MESSAGE.NODE is not the same as
      // the node associated with the JBPM_MESSAGE.TOKEN (i.e. its JBPM_TOKEN.NODE)
      // in the JBPM_MESSAGE table (see org.jbpm.command.ExecuteNodeCommand execute());
      // however this can be caused by a token.signal()
      // persisting the JBPM_MESSAGE (e.g. from a node-enter on an async node)
      // before the token state is persisted by a subsequent call to
      // jbpmContext.close(); e.g.
      //
      // try {
      //   // ...
      //   token.signal(); // Writes to JBPM_MESSAGE if async node entered
      //   jbpmContext.save(processInstance);
      // } finally {
      //   jbpmContext.close(); // Updates JBPM_TOKEN in DB
      // }
      //
      // i.e. the above is not thread-safe
      //
      // As a workaround, leave the message to be processed again
      // on the following invocation of the CommandExecutorThread
      //
      if (e.getCause().getMessage() != null) {
      if (e.getCause().getMessage().indexOf("token moved")!=-1) {
      try {
      Thread.sleep(2000);
      } catch (InterruptedException ie) {
      // Do nothing
      }
      return;
      }
      }

      // remove the problematic message from the queue
      dbMessageSessionImpl.receiveByIdNoWait(message.getId());

      message = Message.createCopy(message);

      // update the message with the stack trace
      StringWriter sw = new StringWriter();
      e.printStackTrace(new PrintWriter(sw));
      message.setException(sw.toString());

      // update the message with the jbpm-error-queue destination
      message.setDestination(errorDestination);

      // resend
      messageSession.save(message);

      } finally { jbpmContext.close(); }
      }
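
      Two weak points of the work-around above are that e.getCause() may be null, and that a message whose token has genuinely moved is retried forever. The following is a minimal sketch of a more defensive check, under stated assumptions: TokenMovedCheck, isTokenMoved and shouldRetry are hypothetical names, not part of jBPM, and where the attempt count would be stored is left open.

```java
final class TokenMovedCheck {
    static final String MARKER = "token moved";

    // Walks the cause chain; tolerates missing causes and null messages.
    static boolean isTokenMoved(Throwable t) {
        int depth = 0;
        while (t != null && depth++ < 32) { // bound guards against cyclic cause chains
            String msg = t.getMessage();
            if (msg != null && msg.contains(MARKER)) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }

    // Retry only a bounded number of times, and only for the 'token moved' case.
    static boolean shouldRetry(Throwable t, int attemptsSoFar, int maxAttempts) {
        return attemptsSoFar < maxAttempts && isTokenMoved(t);
    }

    public static void main(String[] args) {
        Throwable moved = new RuntimeException("wrapper",
                new IllegalStateException("couldn't continue execution for 'Token(6)', token moved"));
        System.out.println(isTokenMoved(moved));                  // true
        System.out.println(isTokenMoved(new RuntimeException())); // false
        System.out.println(shouldRetry(moved, 3, 3));             // false
    }
}
```

      Matching on the exception text remains fragile either way; the sketch only removes the NullPointerException risk and caps the number of retries.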

      (2) There also appears to be a problem with the interaction of the SchedulerThread and the CommandExecutorThread. For a node 'node 1' that is marked asynchronous and has a timer that fires on node-enter with an associated action that executes immediately, the ActionHandler on the timer action may progress the token from asynchronous node 'node 1' to node 'node 2' and update the token in the JBPM_TOKEN table. However, the update of the JBPM_TOKEN row may occur before the CommandExecutorThread has consumed the message in the JBPM_MESSAGE table that was stored on entry into 'node 1' and records that the token is currently in 'node 1'.
      So the JBPM_MESSAGE.NODE for the token is 'node 1' but the JBPM_TOKEN.NODE for the same token is 'node 2'. This means that on line 53 of org.jbpm.command.ExecuteNodeCommand execute(), an exception is thrown (as described in (1)):

      public void execute() {
          if (! node.equals(token.getNode())) {
              throw new JbpmException("couldn't continue execution for '"+token+"', token moved");
          }

      If node 2 is also asynchronous, the situation can arise where the same token ID appears in the JBPM_MESSAGE table twice, with each message identifying a different node: 'node 1' and 'node 2'.

      As a work-around, in the ActionHandler for the timer, I wait for any messages related to the current node to be consumed by the CommandExecutorThread before progressing the token (I make the assumption that the JBPM_MESSAGE table can only contain one message per token):

      JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
      try {
      graphSession = jbpmContext.getGraphSession();

      //Load the processInstance from the database
      processInstance = graphSession.loadProcessInstance(getProcessInstanceId());

      ...

      MessagingSession messagingSession = jbpmContext.getMessagingSession();
      for (;;) {
          java.util.Iterator iterator = messagingSession.getMessageIterator("CMD_EXECUTOR");
          boolean foundToken = false;
          while (iterator.hasNext()) {
              Message message = (Message) iterator.next();
              if (message.getToken() == token) {
                  foundToken = true;
              }
          }
          if (foundToken) {
              try {
                  System.out.println("Found message for token: " + token.getId());
                  System.out.println("Waiting for CommandExecutorThread to consume message");
                  Thread.sleep(500);
              } catch (InterruptedException e) {
                  // Do nothing
              }
          } else {
              break;
          }
      }

      ...

      //Progress over the 'credit_approved' transition
      taskInstance.end("credit_approved");

      jbpmContext.save(processInstance);
      } finally {
      jbpmContext.close();
      }

      Note that I open a new jbpmContext rather than getting this from the ExecutionContext because the code above executes in a new thread spawned by the ActionHandler. However, this work-around occasionally produces the following exception, so I'm obviously doing something wrong (I haven't investigated this in detail as yet):

      Found message for token: 6
      Waiting for CommandExecutorThread to consume message
      org.jbpm.persistence.JbpmPersistenceException: couldn't commit hibernate session
      at org.jbpm.persistence.db.DbPersistenceService.close(DbPersistenceService.java:171)
      at org.jbpm.svc.Services.close(Services.java:211)
      at org.jbpm.JbpmContext.close(JbpmContext.java:138)
      at com.mdsuk.bpm.webshop.actions.ReceiveCreditReferenceAsyncActionHandler.asyncExecute(ReceiveCreditReferenceAsyncActionHandler.java:103)
      at com.mdsuk.bpm.webshop.actions.AsyncActionHandler.run(AsyncActionHandler.java:24)
      at java.lang.Thread.run(Unknown Source)
      Caused by: org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect): org.jbpm.graph.exe.Token#6
      at org.hibernate.persister.entity.AbstractEntityPersister.check(AbstractEntityPersister.java:1635)
      at org.hibernate.persister.entity.AbstractEntityPersister.update(AbstractEntityPersister.java:2208)
      at org.hibernate.persister.entity.AbstractEntityPersister.updateOrInsert(AbstractEntityPersister.java:2118)
      at org.hibernate.persister.entity.AbstractEntityPersister.update(AbstractEntityPersister.java:2374)
      at org.hibernate.action.EntityUpdateAction.execute(EntityUpdateAction.java:84)
      at org.hibernate.engine.ActionQueue.execute(ActionQueue.java:243)
      at org.hibernate.engine.ActionQueue.executeActions(ActionQueue.java:227)
      at org.hibernate.engine.ActionQueue.executeActions(ActionQueue.java:141)
      at org.hibernate.event.def.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:296)
      at org.hibernate.event.def.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:27)
      at org.hibernate.impl.SessionImpl.flush(SessionImpl.java:980)
      at org.hibernate.impl.SessionImpl.managedFlush(SessionImpl.java:353)
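
      The StaleObjectStateException in the trace above is Hibernate's optimistic-locking failure: the flush found that the Token#6 row had changed since this session loaded it. Presumably another thread (for example the CommandExecutorThread) updated the token in between, although that has not been confirmed here. The following is a minimal in-memory sketch of that kind of version check, not Hibernate or jBPM code; VersionedRowSketch and its methods are hypothetical names.

```java
final class VersionedRowSketch {
    private String node = "node 1";
    private int version = 0;

    // A session remembers the version it saw when it loaded the row.
    synchronized int loadVersion() {
        return version;
    }

    synchronized String node() {
        return node;
    }

    // Succeeds only if nobody has bumped the version since it was loaded.
    synchronized boolean update(int loadedVersion, String newNode) {
        if (loadedVersion != version) {
            return false; // the point at which Hibernate reports a stale object
        }
        node = newNode;
        version++;
        return true;
    }

    public static void main(String[] args) {
        VersionedRowSketch token = new VersionedRowSketch();
        int seenByActionThread = token.loadVersion();
        int seenByExecutor = token.loadVersion();
        System.out.println(token.update(seenByExecutor, "node 2"));     // true: first writer wins
        System.out.println(token.update(seenByActionThread, "node 3")); // false: stale version
    }
}
```

      In this model the second writer has to reload the row and reapply its change; sleeping before the update does not help if the row was loaded before the other writer committed.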

            Assignee: Tom Baeyens (Inactive)
            Reporter: Mark Shotton (Inactive)
            Votes: 5
            Watchers: 4