Uploaded image for project: 'Infinispan'
  1. Infinispan
  2. ISPN-887

XAResource implementation improvements (TransactionalXAResource)

    XMLWordPrintable

Details

    • Enhancement
    • Resolution: Done
    • Major
    • 5.0.0.CR8, 5.0.0.FINAL
    • 4.2.0.Final
    • Transactions
    • None

    Description

      Reviewing the code with Jonathan Halliday has brought the following aspects (see TODOs below):

      package org.infinispan.transaction.xa;

      import org.infinispan.commands.CommandsFactory;
      import org.infinispan.commands.tx.CommitCommand;
      import org.infinispan.commands.tx.PrepareCommand;
      import org.infinispan.commands.tx.RollbackCommand;
      import org.infinispan.config.Configuration;
      import org.infinispan.context.InvocationContextContainer;
      import org.infinispan.context.impl.LocalTxInvocationContext;
      import org.infinispan.interceptors.InterceptorChain;
      import org.infinispan.util.logging.Log;
      import org.infinispan.util.logging.LogFactory;

      import javax.transaction.xa.XAException;
      import javax.transaction.xa.XAResource;
      import javax.transaction.xa.Xid;

      /**

      • This acts both as an local {@link org.infinispan.transaction.xa.CacheTransaction}

        and implementor of an

        {@link * javax.transaction.xa.XAResource}

        that will be called by tx manager on various tx stages.
        *

      • @author Mircea.Markus@jboss.com
      • @since 4.0
        */
        public class TransactionXaAdapter implements XAResource {

      private static final Log log = LogFactory.getLog(TransactionXaAdapter.class);
      private static boolean trace = log.isTraceEnabled();

      //todo - comment why timeout is not used
      // - it is useful only if TM and client are in separate processes and TM might fail. this is because a client might tm.begin and then the TM (running separate process) crashes
      // - in this scenario the TM won't ever call XAResource.rollback, so these resources would be held there forever
      // - not affecting us as in all scenarios TM&XAResource are collocated

      private int txTimeout;

      private final InvocationContextContainer icc;
      private final InterceptorChain invoker;

      private final CommandsFactory commandsFactory;
      private final Configuration configuration;

      private final TransactionTable txTable;

      /**

      • XAResource is associated with a transaction between enlistment (XAResource.start()) XAResource.end(). It's only the
      • boundary methods (prepare, commit, rollback) that need to be "stateless".
      • Reefer to section 3.4.4 from JTA spec v.1.1
        */
        private final LocalTransaction localTransaction;

      public TransactionXaAdapter(LocalTransaction localTransaction, TransactionTable txTable, CommandsFactory commandsFactory,
      Configuration configuration, InterceptorChain invoker, InvocationContextContainer icc)

      { this.localTransaction = localTransaction; this.txTable = txTable; this.commandsFactory = commandsFactory; this.configuration = configuration; this.invoker = invoker; this.icc = icc; }

      /**

      • This can be call for any transaction object. See Section 3.4.6 (Resource Sharing) from JTA spec v1.1.
        */
        public int prepare(Xid xid) throws XAException {
        //todo if I throw an exception here then I should also cleanup resources as .rollback might never be called!!
        LocalTransaction localTransaction = getLocalTransactionAndValidate(xid);

      //todo - same as last comment
      validateNotMarkedForRollback(localTransaction);

      if (configuration.isOnePhaseCommit()) {
      if (trace) log.trace("Received prepare for tx:

      {0}. Skipping call as 1PC will be used.", xid);
      return XA_OK;
      }

      PrepareCommand prepareCommand = commandsFactory.buildPrepareCommand(localTransaction.getGlobalTransaction(), localTransaction.getModifications(), configuration.isOnePhaseCommit());
      if (trace) log.trace("Sending prepare command through the chain: " + prepareCommand);

      LocalTxInvocationContext ctx = icc.createTxInvocationContext();
      ctx.setLocalTransaction(localTransaction);
      try {
      invoker.invoke(ctx, prepareCommand);
      if (localTransaction.isReadOnly()) { if (trace) log.trace("Readonly transaction: " + localTransaction.getGlobalTransaction()); // force a cleanup to release any objects held. Some TMs don't call commit if it is a READ ONLY tx. See ISPN-845 commit(xid, false); return XA_RDONLY; } else { return XA_OK; }
      } catch (Throwable e) { // todo if I throw this exception make sure that all locks are 100% cleaned up, as TM won't do any rollback call on it. // todo - handle this! -> if only a node fails to ack tx prepare, and that node is still part of the cluster, it needs to be sync with tx state. // one way of doing this is by pushing the tx state to that node until one of two happens: a) node ack or b) node is shunned from the cluster log.error("Error while processing PrepareCommand", e); throw new XAException(XAException.XAER_RMERR); }
      }

      /**
      * Same comment as for {@link #prepare(javax.transaction.xa.Xid)} applies for commit.
      */
      public void commit(Xid xid, boolean isOnePhase) throws XAException {
      LocalTransaction localTransaction = getLocalTransactionAndValidate(xid);

      if (trace) log.trace("committing transaction {0}

      ", localTransaction.getGlobalTransaction());
      try {
      LocalTxInvocationContext ctx = icc.createTxInvocationContext();
      ctx.setLocalTransaction(localTransaction);
      // todo this needs to be split in two:
      // - configuration.isOnePhaseCommit() this is not "as important", as the user ack that it doesn't "really" need consistency
      // - on the other case ("isOnePhase"==true) make sure that this method either commits successfully or it fails and cleans up logs eventually
      if (configuration.isOnePhaseCommit() || isOnePhase) {
      validateNotMarkedForRollback(localTransaction);

      if (trace) log.trace("Doing an 1PC prepare call on the interceptor chain");
      PrepareCommand command = commandsFactory.buildPrepareCommand(localTransaction.getGlobalTransaction(), localTransaction.getModifications(), true);
      try

      { invoker.invoke(ctx, command); }

      catch (Throwable e)

      { log.error("Error while processing 1PC PrepareCommand", e); throw new XAException(XAException.XAER_RMERR); }
      } else {
      CommitCommand commitCommand = commandsFactory.buildCommitCommand(localTransaction.getGlobalTransaction());
      try { invoker.invoke(ctx, commitCommand); } catch (Throwable e) { log.error("Error while processing 1PC PrepareCommand", e); throw new XAException(XAException.XAER_RMERR); }

      }
      } finally

      { cleanup(localTransaction); }

      }

      /**

      • Same comment as for {@link #prepare(javax.transaction.xa.Xid)}

        applies for commit.
        */
        public void rollback(Xid xid) throws XAException

        { rollbackImpl(xid, commandsFactory, icc, invoker, txTable); }

      public static void rollbackImpl(Xid xid, CommandsFactory commandsFactory, InvocationContextContainer icc, InterceptorChain invoker, TransactionTable txTable) throws XAException {
      LocalTransaction localTransaction = txTable.getLocalTransaction(xid);
      if (localTransaction == null) {
      if (trace) log.trace("no tx found for

      {0}", xid);
      throw new XAException(XAException.XAER_NOTA);
      }
      if (trace) log.trace("rollback transaction {0}

      ", localTransaction.getGlobalTransaction());
      RollbackCommand rollbackCommand = commandsFactory.buildRollbackCommand(localTransaction.getGlobalTransaction());
      LocalTxInvocationContext ctx = icc.createTxInvocationContext();
      ctx.setLocalTransaction(localTransaction);
      try

      { invoker.invoke(ctx, rollbackCommand); }

      catch (Throwable e)

      { log.error("Exception while rollback", e); throw new XAException(XAException.XA_HEURHAZ); }

      finally

      { cleanupImpl(localTransaction, txTable, icc); }

      }

      private LocalTransaction getLocalTransactionAndValidate(Xid xid) throws XAException {
      LocalTransaction localTransaction1 = txTable.getLocalTransaction(xid);
      if (localTransaction1 == null) {
      log.error("This should not happen when XAResource and TM are in the same process! No tx found for

      {0}", xid);
      throw new XAException(XAException.XAER_NOTA);
      }
      return localTransaction1;
      }

      public void start(Xid xid, int i) throws XAException { localTransaction.setXid(xid); txTable.addLocalTransactionMapping(localTransaction); if (trace) log.trace("start called on tx " + this.localTransaction.getGlobalTransaction()); }

      public void end(Xid xid, int i) throws XAException { if (trace) log.trace("end called on tx " + this.localTransaction.getGlobalTransaction()); }

      public void forget(Xid xid) throws XAException { if (trace) log.trace("forget called"); }

      public int getTransactionTimeout() throws XAException { if (trace) log.trace("start called"); return txTimeout; }

      public boolean isSameRM(XAResource xaResource) throws XAException {
      if (!(xaResource instanceof TransactionXaAdapter)) { return false; }
      TransactionXaAdapter other = (TransactionXaAdapter) xaResource;
      return other.equals(this);
      }

      public Xid[] recover(int i) throws XAException { if (trace) log.trace("recover called: " + i); return null; }

      public boolean setTransactionTimeout(int i) throws XAException { this.txTimeout = i; return true; }

      @Override
      public boolean equals(Object o) { if (this == o) return true; if (!(o instanceof TransactionXaAdapter)) return false; TransactionXaAdapter that = (TransactionXaAdapter) o; return this.localTransaction.equals(that.localTransaction); }

      @Override
      public int hashCode() { return localTransaction.getGlobalTransaction().hashCode(); }

      @Override
      public String toString() {
      return "TransactionXaAdapter{" + "localTransaction=" + localTransaction + '}';
      }

      private void validateNotMarkedForRollback(LocalTransaction localTransaction) throws XAException {
      if (localTransaction.isMarkedForRollback()) {
      if (trace) log.trace("Transaction already marked for rollback: {0}

      ", localTransaction);
      throw new XAException(XAException.XA_RBROLLBACK);
      }
      }

      private void cleanup(LocalTransaction localTransaction)

      { TransactionXaAdapter.cleanupImpl(localTransaction, txTable, icc); }

      private static void cleanupImpl(LocalTransaction localTransaction, TransactionTable txTable, InvocationContextContainer icc)

      { txTable.removeLocalTransaction(localTransaction); icc.suspend(); }


      }

      Attachments

        Activity

          People

            mircea.markus Mircea Markus (Inactive)
            mircea.markus Mircea Markus (Inactive)
            Votes:
            0 Vote for this issue
            Watchers:
            0 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: