-
Bug
-
Resolution: Can't Do
-
Major
-
None
-
jBPM 3.1.0, jBPM 3.1.1, jBPM 3.1.2, jBPM 3.1.3, jBPM 3.1.4, jBPM 3.2.0, jBPM 3.2.1
-
None
There appear to be a couple of thread-safety problems with the behaviour of the CommandExecutorThread:
(1) Consider the following code:
JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
try {
    // ...
    token.signal();
    // ...
} finally {
    // Tear down the pojo persistence context.
    jbpmContext.close();
}
If this code is executed while the token is in "node 1", the token.signal() call fires a node-enter event on the next asynchronous node, "node 2", and stores a message in the JBPM_MESSAGE table. This message identifies the token as being in "node 2". However, the node associated with the token in the JBPM_TOKEN table is still "node 1" until the jbpmContext.close() method flushes the process instance to the database.
So the JBPM_MESSAGE.NODE for the token is "node 2" but the JBPM_TOKEN.NODE for the same token is "node 1".
This means that on line 53 of org.jbpm.command.ExecuteNodeCommand execute(), an exception is thrown:
public void execute() {
if (! node.equals(token.getNode())) { throw new JbpmException("couldn't continue execution for '"+token+"', token moved"); }
and an exception is added to the message so that it is not re-tried.
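The ordering problem can be illustrated without jBPM. The following is only a minimal sketch: the two maps are hypothetical in-memory stand-ins for the JBPM_TOKEN and JBPM_MESSAGE tables, and canExecute is an assumed simplification of the node comparison quoted above, not the actual jBPM code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-ins for JBPM_TOKEN and JBPM_MESSAGE; not jBPM code.
final class StaleTokenRace {
    // tokenId -> node name as currently persisted for the token
    static final Map<Long, String> tokenTable = new HashMap<>();
    // tokenId -> node name recorded in the queued message
    static final Map<Long, String> messageTable = new HashMap<>();

    // Simplified version of the comparison described for ExecuteNodeCommand.execute().
    static boolean canExecute(long tokenId) {
        String messageNode = messageTable.get(tokenId);
        return messageNode != null && messageNode.equals(tokenTable.get(tokenId));
    }

    public static void main(String[] args) {
        long tokenId = 6L;
        tokenTable.put(tokenId, "node 1");

        // token.signal(): the message for the async node becomes visible first...
        messageTable.put(tokenId, "node 2");
        // ...so an executor polling at this point sees a mismatch ("token moved").
        System.out.println("before close: " + canExecute(tokenId));

        // jbpmContext.close(): only now is the token row updated.
        tokenTable.put(tokenId, "node 2");
        System.out.println("after close: " + canExecute(tokenId));
    }
}
```

Run as written, the first check fails and the second succeeds, which is the window in which the CommandExecutorThread can pick up the message too early.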
As a work-around, I modified the handleProcessingException method of org.jbpm.msg.command.CommandExecutorThread in a pretty ugly way: I just leave the message in the table to be re-tried if the exception message contains the string "token moved":
void handleProcessingException(MessageProcessingException e) {
JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext(jbpmContextName);
try {
// get the message session from the context
DbMessageService dbMessageSessionImpl = (DbMessageService) jbpmContext.getServices().getMessageService();
MessagingSession messageSession = dbMessageSessionImpl.getMessagingSession();
// get the problematic command message from the exception
Message message = e.message;
// MWS 01/04/06 Workaround for async continuations
//
// Will get this exception if the JBPM_MESSAGE.NODE is not the same as
// the node associated with the JBPM_MESSAGE.TOKEN (i.e. its JBPM_TOKEN.NODE)
// in the JBPM_MESSAGE table (see org.jbpm.command.ExecuteNodeCommand execute());
// however this can be caused by a token.signal()
// persisting the JBPM_MESSAGE (e.g. from a node-enter on an async node)
// before the token state is persisted by a subsequent call to
// jbpmContext.close(); e.g.
//
// try {
//   ...
//   token.signal();      // Writes to JBPM_MESSAGE if async node entered
//   jbpmContext.save();
// } finally {
//   jbpmContext.close(); // Updates JBPM_TOKEN in DB
// }
//
// i.e. the above is not thread-safe
//
// As a workaround, leave the message to be processed again
// on the following invocation of the CommandExecutorThread
//
if (e.getCause().getMessage() != null) {
if (e.getCause().getMessage().indexOf("token moved")!=-1) {
try { Thread.sleep(2000); } catch(InterruptedException ie) { /* Do nothing */ }
return;
}
}
// remove the problematic message from the queue
dbMessageSessionImpl.receiveByIdNoWait(message.getId());
message = Message.createCopy(message);
// update the message with the stack trace
StringWriter sw = new StringWriter();
e.printStackTrace(new PrintWriter(sw));
message.setException(sw.toString());
// update the message with the jbpm-error-queue destination
message.setDestination(errorDestination);
// resend
messageSession.save(message);
} finally { jbpmContext.close(); }
}
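A slightly more defensive form of the "token moved" test could walk the whole cause chain and tolerate a null cause or a null message. This is only a sketch: TokenMovedCheck and isTokenMoved are hypothetical names, not part of jBPM, and matching on the message text remains as fragile as in the work-around above.

```java
// Hypothetical helper; not part of jBPM.
final class TokenMovedCheck {
    static final String MARKER = "token moved";

    // Walks the cause chain, tolerating null causes and null messages.
    static boolean isTokenMoved(Throwable t) {
        int depth = 0;
        // The depth cap guards against a cyclic cause chain.
        for (Throwable c = t; c != null && depth < 32; c = c.getCause(), depth++) {
            String msg = c.getMessage();
            if (msg != null && msg.contains(MARKER)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException("processing failed",
                new IllegalStateException("couldn't continue execution for 'Token(6)', token moved"));
        System.out.println(isTokenMoved(wrapped));                // true
        System.out.println(isTokenMoved(new RuntimeException())); // false
    }
}
```

In handleProcessingException, the nested if on e.getCause().getMessage() could then be replaced by a single call such as isTokenMoved(e), ideally combined with a retry limit so that a message that never becomes valid is eventually moved to the error queue.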
(2) There also appears to be a problem with the interaction of the SchedulerThread and the CommandExecutorThread. For a node "node 1" that is marked asynchronous and has a timer that fires on node-enter with an associated action that executes immediately, the ActionHandler on the timer action may progress the token from asynchronous node "node 1" to node "node 2" and update the token in the JBPM_TOKEN table. However, the update of the JBPM_TOKEN row may occur before the CommandExecutorThread has consumed the message in the JBPM_MESSAGE table that was stored on entry into "node 1" and that records the token as currently being in "node 1".
So the JBPM_MESSAGE.NODE for the token is "node 1" but the JBPM_TOKEN.NODE for the same token is "node 2". This means that on line 53 of org.jbpm.command.ExecuteNodeCommand execute(), an exception is thrown (as described in (1)):
public void execute() {
if (! node.equals(token.getNode())) { throw new JbpmException("couldn't continue execution for '"+token+"', token moved"); }
If "node 2" is also asynchronous, the situation can arise where the same token ID appears in the JBPM_MESSAGE table twice, with each row identifying a different node: "node 1" and "node 2".
As a work-around, in the ActionHandler for the timer, I wait for any messages related to the current node to be consumed by the CommandExecutorThread before progressing the token (I make the assumption that the JBPM_MESSAGE table can only contain one message per token):
JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
try {
graphSession = jbpmContext.getGraphSession();
//Load the processInstance from the database
processInstance = graphSession.loadProcessInstance(getProcessInstanceId());
// ...
MessagingSession messagingSession = jbpmContext.getMessagingSession();
for (;;) {
    java.util.Iterator iterator = messagingSession.getMessageIterator("CMD_EXECUTOR");
    boolean foundToken = false;
    while (iterator.hasNext()) {
        Message message = (Message) iterator.next();
        if (message.getToken() == token) { foundToken = true; }
    }
    if (foundToken) {
        try {
            // ... (wait before checking again)
        } catch (InterruptedException e) {
            // Do nothing
        }
    } else {
        break;
    }
}
// ...
//Progress over the 'credit_approved' transition
taskInstance.end("credit_approved");
jbpmContext.save(processInstance);
} finally {
jbpmContext.close();
}
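The for (;;) loop above waits indefinitely if the message is never consumed. A bounded variant might look like the sketch below. BoundedWait and waitUntil are hypothetical names, not jBPM API, and the condition passed in is assumed to stand for "no message for this token remains in the CMD_EXECUTOR queue".

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper; not part of jBPM.
final class BoundedWait {
    // Polls until the condition holds or the timeout elapses.
    // Returns true if the condition became true, false on timeout or interrupt.
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Stand-in condition that becomes true after about 50 ms.
        BooleanSupplier queueDrained = () -> System.currentTimeMillis() - start >= 50;
        System.out.println(waitUntil(queueDrained, 1000, 10));
        System.out.println(waitUntil(() -> false, 50, 10));
    }
}
```

With a helper of this kind the ActionHandler can give up, or log and retry later, rather than blocking its thread forever. It does not by itself remove the underlying race between the two threads.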
Note that I open a new jbpmContext rather than getting this from the ExecutionContext because the code above executes in a new thread spawned by the ActionHandler. However, this work-around occasionally produces the following exception, so I'm obviously doing something wrong (I haven't investigated this in detail as yet):
Found message for token: 6
Waiting for CommandExecutorThread to consume message
org.jbpm.persistence.JbpmPersistenceException: couldn't commit hibernate session
at org.jbpm.persistence.db.DbPersistenceService.close(DbPersistenceService.java:171)
at org.jbpm.svc.Services.close(Services.java:211)
at org.jbpm.JbpmContext.close(JbpmContext.java:138)
at com.mdsuk.bpm.webshop.actions.ReceiveCreditReferenceAsyncActionHandler.asyncExecute(ReceiveCreditReferenceAsyncActionHandler.java:103)
at com.mdsuk.bpm.webshop.actions.AsyncActionHandler.run(AsyncActionHandler.java:24)
at java.lang.Thread.run(Unknown Source)
Caused by: org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect): org.jbpm.graph.exe.Token#6
at org.hibernate.persister.entity.AbstractEntityPersister.check(AbstractEntityPersister.java:1635)
at org.hibernate.persister.entity.AbstractEntityPersister.update(AbstractEntityPersister.java:2208)
at org.hibernate.persister.entity.AbstractEntityPersister.updateOrInsert(AbstractEntityPersister.java:2118)
at org.hibernate.persister.entity.AbstractEntityPersister.update(AbstractEntityPersister.java:2374)
at org.hibernate.action.EntityUpdateAction.execute(EntityUpdateAction.java:84)
at org.hibernate.engine.ActionQueue.execute(ActionQueue.java:243)
at org.hibernate.engine.ActionQueue.executeActions(ActionQueue.java:227)
at org.hibernate.engine.ActionQueue.executeActions(ActionQueue.java:141)
at org.hibernate.event.def.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:296)
at org.hibernate.event.def.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:27)
at org.hibernate.impl.SessionImpl.flush(SessionImpl.java:980)
at org.hibernate.impl.SessionImpl.managedFlush(SessionImpl.java:353)