Index: modules/pvm/src/main/java/org/jbpm/pvm/internal/cmd/ExecuteJobCmd.java =================================================================== --- modules/pvm/src/main/java/org/jbpm/pvm/internal/cmd/ExecuteJobCmd.java (revision 6412) +++ modules/pvm/src/main/java/org/jbpm/pvm/internal/cmd/ExecuteJobCmd.java (working copy) @@ -44,9 +44,11 @@ private static final long serialVersionUID = 1L; private static final Log log = Log.getLog(ExecuteJobCmd.class.getName()); - + protected Long jobDbid; + protected JobExceptionHandler jobExceptionHandler; + public ExecuteJobCmd(String jobId) { if (jobId==null) { throw new JbpmException("jobId is null"); @@ -60,59 +62,76 @@ public Job execute(Environment environmentInterface) throws Exception { EnvironmentImpl environment = (EnvironmentImpl) environmentInterface; - DbSession dbSession = environment.get(DbSession.class); - if (dbSession==null) { - throw new JbpmException("no db-session configured"); - } + DbSession dbSession = environment.get(DbSession.class); + if (dbSession==null) { + throw new JbpmException("no db-session configured"); + } JobImpl job = (JobImpl) dbSession.get(JobImpl.class, jobDbid); - // in case of decision jobs, the job might have been deleted - // before we execute it (they are in a list) - if (job != null) { - JobContext jobContext = new JobContext(job); + // in case of decision jobs, the job might have been deleted + // before we execute it (they are in a list) + if (job != null) { + this.registerJobExceptionHandler(environment, job); + + JobContext jobContext = new JobContext(job); environment.setContext(jobContext); - try { - log.debug("executing job "+job+"..."); - job.execute(environment); - log.debug("executed job "+job); + try { + log.debug("executing job "+job+"..."); + job.execute(environment); + log.debug("executed job "+job); - // if this job is locked too long, it could be unlocked by the lockmonitor and - // executed by another thread. - Date lockExpirationDate = job.getLockExpirationTime(); - // can be null if it was rescheduled - if (lockExpirationDate != null) { - long lockExpiration = lockExpirationDate.getTime(); - long currentTime = System.currentTimeMillis(); - if (currentTime>lockExpiration) { - throw new JbpmException("job took too long: lock expired "+(currentTime-lockExpiration)+"ms ago"); - } - } - } catch (Exception exception) { - log.error("exception while executing '"+job+"'", exception); - handleJobExecutionException(environment, job, exception); - } finally { - environment.removeContext(jobContext); - } + // if this job is locked too long, it could be unlocked by the lockmonitor and + // executed by another thread. + Date lockExpirationDate = job.getLockExpirationTime(); + // can be null if it was rescheduled + if (lockExpirationDate != null) { + long lockExpiration = lockExpirationDate.getTime(); + long currentTime = System.currentTimeMillis(); + if (currentTime>lockExpiration) { + throw new JbpmException("job took too long: lock expired "+(currentTime-lockExpiration)+"ms ago"); + } + } + } catch (Exception exception) { + log.error("exception while executing '" + job + "'", exception); + handleJobExecutionException(exception); + } finally { + environment.removeContext(jobContext); + } - } else { - log.debug("job " + jobDbid + " no longer exists"); - } + } else { + log.debug("job " + jobDbid + " no longer exists"); + } return job; } - /** This transaction will be marked for rollback. A command will be associated with the - * Transaction.EVENT_AFTERCOMPLETION (after the job locks of the current transaction are - * released). Then the command will update the job with the exception details in a separate + /** This transaction will be marked for rollback. A command will be associated with the + * Transaction.EVENT_AFTERCOMPLETION (after the job locks of the current transaction are + * released). Then the command will update the job with the exception details in a separate * transaction. */ - protected void handleJobExecutionException(Environment environment, JobImpl job, Exception exception) { - Transaction transaction = environment.get(Transaction.class); - CommandService commandService = (CommandService) environment.get(CommandService.NAME_NEW_TX_REQUIRED_COMMAND_SERVICE); - JobExceptionHandler jobExceptionHandler = new JobExceptionHandler(job.getDbid(), exception, commandService); - transaction.registerSynchronization(jobExceptionHandler); - + protected void handleJobExecutionException(Exception exception) { + if (jobExceptionHandler != null) { + jobExceptionHandler.setException(exception); + } else { + log.warn("jobExceptionHandler hasnot initialized for exception : " + exception.getMessage()); + } + if (exception instanceof RuntimeException) { throw (RuntimeException) exception; } - throw new JbpmException("job failed: "+exception.getMessage(), exception); + throw new JbpmException("job failed: " + exception.getMessage(), exception); } + + protected void registerJobExceptionHandler(Environment environment, JobImpl job) { + Transaction transaction = environment.get(Transaction.class); + CommandService commandService = (CommandService) environment + .get(CommandService.NAME_NEW_TX_REQUIRED_COMMAND_SERVICE); + + jobExceptionHandler = new JobExceptionHandler(job.getDbid(), commandService); + try { + transaction.registerSynchronization(jobExceptionHandler); + } catch(Exception ex) { + log.warn("cannot register synchronization on current transaction : " + ex.getMessage() + + " job : " + job.getDbid()); + } + } } Index: modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExceptionHandler.java =================================================================== --- modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExceptionHandler.java (revision 6412) +++ modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExceptionHandler.java (working copy) @@ -35,32 +35,37 @@ import org.jbpm.pvm.internal.session.DbSession; import org.jbpm.pvm.internal.tx.Transaction; -/** +/** * @author Tom Baeyens */ public class JobExceptionHandler implements Synchronization, Command { - + private static final Log log = Log.getLog(JobExceptionHandler.class.getName()); private static final long serialVersionUID = 1L; - + protected CommandService commandService; protected long jobDbid; protected Throwable exception; - - public JobExceptionHandler(long jobDbid, Throwable exception, CommandService commandService) { + + public JobExceptionHandler(long jobDbid, CommandService commandService) { this.commandService = commandService; this.jobDbid = jobDbid; - this.exception = exception; } - + public void setException(Throwable exception) { + this.exception = exception; + } + public void beforeCompletion() { } public void afterCompletion(int status) { - // after the transaction rolled back, - // execute this job exception handler object as a command with - // the command service so that this gets done in a separate + if (exception == null) { + return; + } + // after the transaction rolled back, + // execute this job exception handler object as a command with + // the command service so that this gets done in a separate // transaction log.debug("starting new transaction for handling job exception"); commandService.execute(this); @@ -69,7 +74,7 @@ public Object execute(Environment environment) throws Exception { log.debug("handling job "+jobDbid+" exception: "+exception.getMessage()); - + // load the job from the db DbSession dbSession = environment.get(DbSession.class); if (dbSession==null) { @@ -85,8 +90,8 @@ log.debug("decrementing retries to "+decrementedRetries+" for "+job); job.release(); job.setRetries(decrementedRetries); - job.setException(sw.toString()); - + job.setException(sw.toString()); + // notify the job executor after the transaction is completed Transaction transaction = environment.get(Transaction.class); JobExecutor jobExecutor = environment.get(JobExecutor.class);