Index: core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java =================================================================== --- core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java (revision 2229) +++ core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java (revision ) @@ -21,12 +21,16 @@ import org.infinispan.util.logging.LogFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -85,6 +89,8 @@ private int concurrencyLevel; @GuardedBy("mapLock") protected ConcurrentMap state; + @GuardedBy("mapLock") + protected ConcurrentLinkedQueue txState; private ReleaseAllLockContainer lockContainer; public AsyncStore(CacheStore delegate, AsyncStoreConfig asyncStoreConfig) { @@ -101,42 +107,43 @@ @Override public void store(InternalCacheEntry ed) { - enqueue(ed.getKey(), new Store(ed)); + enqueue(ed.getKey(), new Store(ed), false); } @Override public boolean remove(Object key) { - enqueue(key, new Remove(key)); + enqueue(key, new Remove(key), false); return true; } @Override public void clear() { Clear clear = new Clear(); - enqueue(clear, clear); + enqueue(clear, clear, false); } @Override public void purgeExpired() { PurgeExpired purge = new PurgeExpired(); - enqueue(purge, purge); + enqueue(purge, purge, false); } @Override public void prepare(List list, GlobalTransaction tx, boolean isOnePhase) { Prepare prepare = new Prepare(list, tx, isOnePhase); - enqueue(prepare, prepare); + enqueue(prepare, prepare, true); } @Override public void commit(GlobalTransaction tx) throws CacheLoaderException { Commit commit = new Commit(tx); - enqueue(commit, commit); + enqueue(commit, commit, true); } @Override public void start() throws CacheLoaderException { state = newStateMap(); + txState = new ConcurrentLinkedQueue(); log.info("Async cache loader starting {0}", this); stopped.set(false); super.start(); @@ -188,13 +195,24 @@ case PURGE_EXPIRED: super.purgeExpired(); break; - case PREPARE: + default: { + throw new IllegalStateException("Unknown modification type: " + mod); + } + } + } + } + + protected void applyTxModificationsSync(Queue mods) throws CacheLoaderException { + while (!mods.isEmpty()) { + Modification mod = mods.remove(); + if (trace) log.trace("Applying tx mod: " + mod); + if (mod.getType().equals(Modification.Type.PREPARE)) { - List coalesced = coalesceModificationList(((Prepare) mod).getList()); - super.prepare(coalesced, ((Prepare) mod).getTx(), ((Prepare) mod).isOnePhase()); + List coalesced = coalesceModificationList(((Prepare) mod).getList()); + super.prepare(coalesced, ((Prepare) mod).getTx(), ((Prepare) mod).isOnePhase()); - break; - case COMMIT: + } else if (mod.getType().equals(Modification.Type.COMMIT)) { - super.commit(((Commit) mod).getTx()); + super.commit(((Commit) mod).getTx()); - break; + } else { + throw new IllegalStateException("Unknown tx modification:" + mod); } } } @@ -230,7 +248,7 @@ return coalesced; } - private void enqueue(Object key, Modification mod) { + private void enqueue(Object key, Modification mod, boolean isTx) { try { if (stopped.get()) { throw new CacheException("AsyncStore stopped; no longer accepting more entries."); @@ -242,7 +260,11 @@ try { acquireLock(read); unlock = true; + if (isTx) { + txState.add(mod); + } else { - prev = state.put(key, mod); // put the key's latest state in updates + prev = state.put(key, mod); // put the key's latest state in updates + } } finally { if (unlock) read.unlock(); } @@ -309,27 +331,36 @@ */ class AsyncProcessor implements Runnable { private ConcurrentMap swap = newStateMap(); + private Queue txSwap = new LinkedList(); private final Set lockedKeys = new HashSet(); public void run() { + try { - while (!Thread.interrupted() && !stopped.get()) { - try { - run0(); - } - catch (InterruptedException e) { - break; - } - } + while (!Thread.interrupted() && !stopped.get()) { + try { + run0(); + } + catch (InterruptedException e) { + break; + } + } - try { - if (trace) log.trace("Process remaining batch {0}", swap.size()); - put(swap); + try { + if (trace) log.trace("Process remaining batch {0}", swap.size()); + put(swap); - if (trace) log.trace("Process remaining queued {0}", state.size()); - while (!state.isEmpty()) run0(); + if (trace) log.trace("Process remaining tx batch {0}", txSwap.size()); + putTx(txSwap); + if (trace) log.trace("Process remaining queued {0}, and remaining tx queued {1}", state.size(), txState.size()); + while (!state.isEmpty() || !txState.isEmpty()) { + run0(); + } - } catch (InterruptedException e) { - if (trace) log.trace("Remaining interrupted"); - } + } catch (InterruptedException e) { + if (trace) log.trace("Remaining interrupted"); + } + } catch (Exception e) { + log.warn("Unexpected exception:" , e); - } + } + } void run0() throws InterruptedException { if (trace) log.trace("Checking for modifications"); @@ -340,10 +371,13 @@ unlock = true; swap = state; state = newStateMap(); + txSwap = txState; + txState = new ConcurrentLinkedQueue(); // This needs doing within the WL section, because if a key is in use, we need to put it back in the state // map for later processing and we don't wanna do it in such way that we override a newer value that might // have been enqueued by a user thread. + //no need to lock entries from txSwap as no two prepare/commit/rollback on the same transaction are expected for (Object key : swap.keySet()) { boolean acquired = lockContainer.acquireLock(key, 0, TimeUnit.NANOSECONDS) != null; if (trace) log.trace("Lock for key {0} was acquired={1}", key, acquired); @@ -359,8 +393,8 @@ } try { - int size = swap.size(); - if (swap.isEmpty()) { + int size = swap.size() + txSwap.size(); + if (swap.isEmpty() && txSwap.isEmpty()) { awaitNotEmptyOrStopped(); } else { decrementAndGet(size); @@ -373,9 +407,9 @@ if (attemptNumber > 0 && log.isDebugEnabled()) log.debug("Retrying due to previous failure. {0} attempts left.", maxRetries - attemptNumber); successful = put(swap); + successful = putTx(txSwap) & successful; attemptNumber++; } while (!successful && attemptNumber <= maxRetries); - if (!successful) log.warn("Unable to process some async modifications after " + maxRetries + " retries!"); @@ -391,21 +425,34 @@ AsyncStore.this.applyModificationsSync(mods); return true; } catch (Exception e) { + return handleExWhilePut(e); + } + } + + boolean putTx(Queue mods) throws InterruptedException { + try { + AsyncStore.this.applyTxModificationsSync(mods); + return true; + } catch (Exception e) { + return handleExWhilePut(e); + } + } + + private boolean handleExWhilePut(Exception e) throws InterruptedException { - boolean isDebug = log.isDebugEnabled(); - if (isDebug) log.debug("Failed to process async modifications", e); - Throwable cause = e; - while (cause != null) { - if (cause instanceof InterruptedException) { - // 3rd party code may have cleared the thread interrupt status - if (isDebug) log.debug("Rethrowing InterruptedException"); - throw (InterruptedException) cause; - } - cause = cause.getCause(); - } - return false; - } - } + boolean isDebug = log.isDebugEnabled(); + if (isDebug) log.debug("Failed to process async modifications", e); + Throwable cause = e; + while (cause != null) { + if (cause instanceof InterruptedException) { + // 3rd party code may have cleared the thread interrupt status + if (isDebug) log.debug("Rethrowing InterruptedException"); + throw (InterruptedException) cause; + } + cause = cause.getCause(); + } + return false; + } + } - } private ConcurrentMap newStateMap() { return new ConcurrentHashMap(64, 0.75f, concurrencyLevel); @@ -415,7 +462,7 @@ private ReleaseAllLockContainer(int concurrencyLevel) { super(concurrencyLevel); } - + void releaseLocks(Set keys) { for (Object key : keys) { if (trace) log.trace("Release lock for key {0}", key); Index: core/src/main/java/org/infinispan/loaders/modifications/Prepare.java =================================================================== --- core/src/main/java/org/infinispan/loaders/modifications/Prepare.java (revision 1422) +++ core/src/main/java/org/infinispan/loaders/modifications/Prepare.java (revision ) @@ -80,5 +80,16 @@ result = 31 * result + (isOnePhase ? 1 : 0); return result; } - + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Prepare:"); + sb.append(tx); + sb.append(" isOnePhase:"); + sb.append(String.valueOf(isOnePhase)); + sb.append(";[").append(list).append("]"); + return sb.toString(); -} + } + +} Index: core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java =================================================================== --- core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java (revision 1521) +++ core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java (revision ) @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -64,134 +65,17 @@ if (store != null) store.stop(); } - public void testPutRemove() throws Exception { - final int number = 1000; - String key = "testPutRemove-k-"; - String value = "testPutRemove-v-"; - doTestPut(number, key, value); - doTestRemove(number, key); - } - - public void testPutClearPut() throws Exception { - final int number = 1000; - String key = "testPutClearPut-k-"; - String value = "testPutClearPut-v-"; - doTestPut(number, key, value); - doTestClear(number, key); - value = "testPutClearPut-v[2]-"; - doTestPut(number, key, value); - - doTestRemove(number, key); - } - - public void testMultiplePutsOnSameKey() throws Exception { - final int number = 1000; - String key = "testMultiplePutsOnSameKey-k"; - String value = "testMultiplePutsOnSameKey-v-"; - doTestSameKeyPut(number, key, value); - doTestSameKeyRemove(key); - } - - public void testRestrictionOnAddingToAsyncQueue() throws Exception { - store.remove("blah"); - - final int number = 10; - String key = "testRestrictionOnAddingToAsyncQueue-k"; - String value = "testRestrictionOnAddingToAsyncQueue-v-"; - doTestPut(number, key, value); - - // stop the cache store - store.stop(); + private void waitOnBarrier(CyclicBarrier barrier) throws CacheLoaderException { try { - store.remove("blah"); - assert false : "Should have restricted this entry from being made"; - } - catch (CacheException expected) { - } - - // clean up - store.start(); - doTestRemove(number, key); - } - - public void testThreadSafetyWritingDiffValuesForKey(Method m) throws Exception { - try { - final String key = "k1"; - final CountDownLatch v1Latch = new CountDownLatch(1); - final CountDownLatch v2Latch = new CountDownLatch(1); - final CountDownLatch endLatch = new CountDownLatch(1); - DummyInMemoryCacheStore underlying = new DummyInMemoryCacheStore(); - store = new MockAsyncStore(key, v1Latch, v2Latch, endLatch, underlying, asyncConfig); - dummyCfg = new DummyInMemoryCacheStore.Cfg(); - dummyCfg.setStore(m.getName()); - store.init(dummyCfg, null, null); - store.start(); - - store.store(InternalEntryFactory.create(key, "v1")); - v2Latch.await(); - store.store(InternalEntryFactory.create(key, "v2")); - endLatch.await(); - - assert store.load(key).getValue().equals("v2"); - } finally { - store.delegate.clear(); - store.stop(); - store = null; - } - } - - public void testTransactionalModificationsHappenInDiffThread(Method m) throws Exception { - try { - final GlobalTransactionFactory gtf = new GlobalTransactionFactory(); - final String k1 = k(m, "1"), k2 = k(m, "2"), v1 = v(m, "1"), v2 = v(m, "2"); - final ConcurrentMap localMods = new ConcurrentHashMap(); - final CyclicBarrier barrier = new CyclicBarrier(2); - DummyInMemoryCacheStore underlying = new DummyInMemoryCacheStore(); - store = new AsyncStore(underlying, asyncConfig) { - @Override - protected void applyModificationsSync(ConcurrentMap mods) throws CacheLoaderException { - for (Map.Entry entry : mods.entrySet()) { - localMods.put(entry.getKey(), entry.getValue()); - } - super.applyModificationsSync(mods); - try { + log.trace("I'am waiting on barrier"); - barrier.await(5, TimeUnit.SECONDS); - } catch (TimeoutException e) { - assert false : "Timed out applying for modifications"; - } catch (Exception e) { - throw new CacheLoaderException("Barrier failed", e); - } - } + barrier.await(5, TimeUnit.SECONDS); + } catch (TimeoutException e) { + assert false : "Timed out applying for modifications"; + } catch (Exception e) { + throw new CacheLoaderException("Barrier failed", e); + } + } - }; - dummyCfg = new DummyInMemoryCacheStore.Cfg(); - dummyCfg.setStore(m.getName()); - store.init(dummyCfg, null, null); - store.start(); - List mods = new ArrayList(); - mods.add(new Store(InternalEntryFactory.create(k1, v1))); - mods.add(new Store(InternalEntryFactory.create(k2, v2))); - mods.add(new Remove(k1)); - GlobalTransaction tx = gtf.newGlobalTransaction(null, false); - store.prepare(mods, tx, false); - barrier.await(5, TimeUnit.SECONDS); - - assert 1 == localMods.size(); - assert localMods.entrySet().iterator().next().getKey() instanceof Prepare; - assert !store.containsKey(k1); - assert !store.containsKey(k2); - - store.commit(tx); - barrier.await(5, TimeUnit.SECONDS); - assert store.load(k2).getValue().equals(v2); - assert !store.containsKey(k1); - } finally { - store.delegate.clear(); - store.stop(); - store = null; - } - } - public void testTransactionalModificationsAreCoalesced(Method m) throws Exception { try { final GlobalTransactionFactory gtf = new GlobalTransactionFactory(); @@ -224,13 +108,13 @@ @Override protected void applyModificationsSync(ConcurrentMap mods) throws CacheLoaderException { super.applyModificationsSync(mods); - try { - barrier.await(5, TimeUnit.SECONDS); - } catch (TimeoutException e) { - assert false : "Timed out applying for modifications"; - } catch (Exception e) { - throw new CacheLoaderException("Barrier failed", e); + waitOnBarrier(barrier); - } + } + + @Override + protected void applyTxModificationsSync(Queue mods) throws CacheLoaderException { + super.applyTxModificationsSync(mods); + waitOnBarrier(barrier); } }; dummyCfg = new DummyInMemoryCacheStore.Cfg(); @@ -246,6 +130,9 @@ mods.add(new Remove(k1)); GlobalTransaction tx = gtf.newGlobalTransaction(null, false); store.prepare(mods, tx, false); + + log.trace("So far so good"); + barrier.await(5, TimeUnit.SECONDS); assert 0 == storeCount.get(); assert 0 == removeCount.get(); Index: core/src/main/java/org/infinispan/loaders/modifications/Commit.java =================================================================== --- core/src/main/java/org/infinispan/loaders/modifications/Commit.java (revision 1422) +++ core/src/main/java/org/infinispan/loaders/modifications/Commit.java (revision ) @@ -62,5 +62,10 @@ Commit other = (Commit) obj; return tx.equals(other.tx); } - + + @Override + public String toString() { + return "Commit: " + tx; -} + } + +}