  1. jBPM
  2. JBPM-3723

Allow StatefulKnowledgeSession to be disposed from within a JTA Transaction.

    Details

    • Type: Enhancement
    • Status: Resolved
    • Priority: Major
    • Resolution: Done
    • Affects Version/s: jBPM 5.2, jBPM 5.3
    • Fix Version/s: jBPM 6.0.0.CR4
    • Component/s: Runtime Engine
    • Labels:
    • Environment:

      JBoss BRMS 5.3.0.GA, Fedora 16 x86_64, Oracle HotSpot JVM 1.6.0_33

      Description

      I have the requirement to start a process instance from a MessageDrivenBean. I.e. a message arrives and starts a process. A single message should start one and only one process instance. Message loss is not acceptable, neither are duplicate process instances.

      To implement this, I configured jBPM5 to use JPA and JTA. The idea is to have a single JTA transaction span both the delivery of the JMS message and the start of the process instance (up to the first save-point). I'm running a one-StatefulKnowledgeSession (SKS)-per-process-instance architecture, i.e. every process instance runs in its own SKS. According to the SKS API, one should call 'sks.dispose()' when one is done with the SKS. This way, the SKS' resources are freed, the SKS is persisted to the DB and can be reloaded when it is needed again: http://docs.jboss.org/jbpm/v5.2/javadocs/org/drools/runtime/StatefulKnowledgeSession.html

      When doing this, I run into a problem when calling 'sks.dispose()' inside a transaction. As mentioned here: https://community.jboss.org/thread/201901 , you cannot call 'sks.dispose()' inside a transaction. The problem seems to be that the SKS registers a transaction synchronization callback, which is called after transaction completion. But because of the fix for https://issues.jboss.org/browse/JBRULES-1880 , calling any method on an SKS after it has been disposed throws an IllegalStateException. As explained in the same thread, 'sks.dispose()' should be called outside of the transaction boundaries. This is also the solution implemented in the ProcessFlowProvision framework (i.e. it resorts to Bean Managed Transactions (BMT)): http://people.redhat.com/jbride/
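
To make the suspected failure mode concrete, here is a toy model of it. It is only a sketch of the mechanism as I understand it: ToyTransaction, ToySession and AfterCompletion are hypothetical stand-ins written for this illustration, not jBPM, Drools or JTA classes, and the real SKS internals may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: these classes are stand-ins, not jBPM or JTA types.
final class DisposeInsideTxDemo {

    /** Stand-in for a JTA Synchronization: only the after-completion hook. */
    interface AfterCompletion {
        void afterCompletion();
    }

    /** Stand-in transaction that runs the registered callbacks when it commits. */
    static final class ToyTransaction {
        private final List<AfterCompletion> callbacks = new ArrayList<>();

        void register(AfterCompletion callback) {
            callbacks.add(callback);
        }

        void commit() {
            for (AfterCompletion callback : callbacks) {
                callback.afterCompletion();
            }
        }
    }

    /** Stand-in session: any call after dispose() fails, as described for JBRULES-1880. */
    static final class ToySession {
        private boolean disposed;

        ToySession(ToyTransaction tx) {
            // The session hooks into the transaction, as the SKS appears to do.
            tx.register(this::onTransactionCompleted);
        }

        void dispose() {
            disposed = true;
        }

        void onTransactionCompleted() {
            if (disposed) {
                throw new IllegalStateException("session was already disposed");
            }
        }
    }

    /** Returns the exception message seen at commit time, or null if the commit succeeded. */
    static String commitAfter(boolean disposeInsideTransaction) {
        ToyTransaction tx = new ToyTransaction();
        ToySession session = new ToySession(tx);
        if (disposeInsideTransaction) {
            session.dispose();
        }
        try {
            tx.commit();
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("dispose inside tx: " + commitAfter(true));
        System.out.println("dispose after tx:  " + commitAfter(false));
    }
}
```

In this model the commit only fails when the session was disposed before the transaction completed, which matches the behaviour described in the forum thread.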

      The problem is that I can't call 'sks.dispose()' outside of the transaction boundary. In an MDB, the transactional boundaries are controlled by the JCA Inflow logic. I can't resort to BMT, as that would put the delivery (and acknowledgement) of the JMS message in a different transaction than the start of the process instance. This clearly does not meet the requirement outlined earlier, because it can lead to duplicate process instances. For example, if the system crashes after the process instance has been created but before the JMS message has been ack'ed, the message is re-delivered and a second process instance is created. The solution I've currently implemented is to register my own transaction synchronization callback, which, after transaction completion, suspends the transaction, calls 'sks.dispose()' and resumes the transaction. This is custom logic that developers need to implement themselves, and frankly, it is not really obvious, every-day, Java 101 code.

      It would be nice if one were able to dispose the SKS from within a JTA transaction, or if jBPM5 provided an easy-to-use API that does this for you, without having to resort to writing custom transaction synchronization callback logic.

      This is a piece of the code I'm using now to get this to work (the 'tm' variable is the injected/looked up TransactionManager, 'utx' is the UserTransaction):

      public Map<String, Object> startProcess(String processId, Map<String, Object> parameters) {
      		// This thing should always be run in a Transactional Context, so check whether a transaction is running.
      		// TODO: This is a bit dirty, if a transaction is not running, we could actually start one ourselves. Just being lazy for now.
      		try {
      			if (utx.getStatus() != Status.STATUS_ACTIVE) {
      				throw new IllegalStateException("This method requires an active JTA transaction.");
      			}
      		} catch (SystemException e) {
      			throw new RuntimeException("Error getting UserTransaction status.", e);
      		}
      
      		final StatefulKnowledgeSession ksession = SimpleKSessionService.getInstance().getKSession();
      		int ksessionId = ksession.getId();
      
      		try {
      			tm.getTransaction().registerSynchronization(new Synchronization() {
      
      				@Override
      				public void beforeCompletion() {
      					// Nothing to do before completion.
      				}
      
      				@Override
      				public void afterCompletion(int arg0) {
      					LOGGER.info("After transaction completion, disposing knowledge session.");
      					try {
      						/*
      						 * We can't even call the dispose method here as it seems to register a transaction synchronization object somewhere
      						 * when it finds a live transaction. However, we have just completed our transaction, so it is not allowed to
      						 * register new transaction synchronization objects. So we first need to suspend the current transaction before we
      						 * can dispose the knowledge session.
      						 */
      						Transaction suspendedTransaction = tm.suspend();
      						ksession.dispose();
      						tm.resume(suspendedTransaction);
      					} catch (InvalidTransactionException ite) {
      						// Thrown by tm.resume() when the suspended transaction is no longer valid.
      						throw new RuntimeException("Unable to resume transaction.", ite);
      					} catch (IllegalStateException ise) {
      						// Thrown by ksession.dispose() or by tm.resume().
      						throw new RuntimeException("Unable to dispose knowledge session or resume transaction.", ise);
      					} catch (SystemException se) {
      						// Thrown by tm.suspend() or tm.resume().
      						throw new RuntimeException("Unable to suspend or resume transaction.", se);
      					}
      				}
      			});
      		} catch (IllegalStateException ise) {
      			ksession.dispose();
      			throw new RuntimeException("Error while registering transaction synchronization object.", ise);
      		} catch (RollbackException re) {
      			ksession.dispose();
      			throw new RuntimeException("Error while registering transaction synchronization object.", re);
      		} catch (SystemException se) {
      			ksession.dispose();
      			throw new RuntimeException("Error while registering transaction synchronization object.", se);
      		}
      
      		ProcessInstance pInstance = null;
      		if (parameters != null) {
      			pInstance = ksession.startProcess(processId, parameters);
      		} else {
      			pInstance = ksession.startProcess(processId);
      		}
      
      		Map<String, Object> returnMap = new HashMap<String, Object>();
      		returnMap.put(ProcessService.PROCESS_INSTANCE_ID, pInstance.getId());
      		returnMap.put(KSessionService.KSESSION_ID, ksessionId);
      		return returnMap;
      	}
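
The suspend/dispose/resume part of the workaround above could be factored into a small reusable helper, which is roughly the kind of API I am asking for. The following is a minimal sketch under stated assumptions: Tx, TxManager and Disposable are hypothetical stand-ins that mirror the shape of javax.transaction.Transaction, javax.transaction.TransactionManager and the SKS, so that the example runs without a JTA implementation. In real code the callback would be a javax.transaction.Synchronization registered through Transaction.registerSynchronization(), as in the method above. Unlike that method, the sketch resumes the transaction in a finally block, so the transaction context is restored even if dispose() throws.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: Tx, TxManager and Disposable are hypothetical stand-ins, not JTA or jBPM types.
final class DisposeAfterCompletion {

    /** Stand-in for the knowledge session: only dispose() matters here. */
    interface Disposable {
        void dispose();
    }

    /** Stand-in for a transaction that accepts after-completion callbacks. */
    interface Tx {
        void registerAfterCompletion(Runnable callback);
    }

    /** Stand-in for the transaction manager operations used by the workaround. */
    interface TxManager {
        Tx getTransaction();
        Tx suspend();
        void resume(Tx suspended);
    }

    /** Defers session.dispose() until the current transaction has completed. */
    static void register(TxManager tm, Disposable session) {
        tm.getTransaction().registerAfterCompletion(() -> {
            Tx suspended = tm.suspend();
            try {
                session.dispose();
            } finally {
                // Resume even if dispose() throws, so the transaction context is restored.
                tm.resume(suspended);
            }
        });
    }

    /** Drives a fake transaction and returns the order in which things happened. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        List<Runnable> callbacks = new ArrayList<>();
        Tx tx = callback -> callbacks.add(callback);
        TxManager tm = new TxManager() {
            public Tx getTransaction() { return tx; }
            public Tx suspend() { events.add("suspend"); return tx; }
            public void resume(Tx suspended) { events.add("resume"); }
        };

        register(tm, () -> events.add("dispose"));
        events.add("work");   // e.g. starting the process instance inside the transaction
        events.add("commit");
        for (Runnable callback : callbacks) {
            callback.run();   // the fake transaction fires its after-completion callbacks
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With such a helper, the caller only registers the session once at the start of the method and never calls dispose() inside the transaction itself.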
      

              People

              • Assignee: Maciej Swiderski (swiderski.maciej)
              • Reporter: Duncan Doyle (McCloud)
              • Votes: 1
              • Watchers: 5
