package org.infinispan.statetransfer; import org.infinispan.Cache; import org.infinispan.config.Configuration; import org.infinispan.config.GlobalConfiguration; import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.test.MultipleCacheManagersTest; import org.infinispan.test.TestingUtil; import org.infinispan.statetransfer.StateTransferFunctionalTest.*; import org.testng.annotations.Test; import javax.transaction.TransactionManager; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.lang.reflect.Method; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** * // TODO: Document this * * @author Galder ZamarreƱo * @since // TODO */ @Test(groups = "functional", testName = "statetransfer.StateTransferReplicationQueueTest", enabled = true) public class StateTransferReplicationQueueTest extends MultipleCacheManagersTest { public static final String A_B_NAME = "a_b_name"; public static final String A_C_NAME = "a_c_name"; public static final String A_B_AGE = "a_b_age"; public static final String A_C_AGE = "a_c_age"; public static final String JOE = "JOE"; public static final String BOB = "BOB"; public static final Integer TWENTY = 20; public static final Integer FORTY = 40; private final String cacheName = "nbst-replqueue"; Configuration config; protected void createCacheManagers() throws Throwable { // This impl only really sets up a configuration for use later. config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_ASYNC, true); config.setUseReplQueue(true); config.setReplQueueInterval(100, TimeUnit.MILLISECONDS); config.setReplQueueMaxElements(100); config.setUseAsyncMarshalling(false); config.setFetchInMemoryState(true); config.setUseLockStriping(false); // reduces the odd chance of a key collision and deadlock } protected EmbeddedCacheManager createCacheManager() { EmbeddedCacheManager cm = addClusterEnabledCacheManager(); GlobalConfiguration gc = cm.getGlobalConfiguration(); Properties p = new Properties(); p.setProperty("maxThreads", "25"); gc.setAsyncTransportExecutorProperties(p); cm.defineConfiguration(cacheName, config.clone()); return cm; } protected void writeInitialData(final Cache c) { c.put(A_B_NAME, JOE); c.put(A_B_AGE, TWENTY); c.put(A_C_NAME, BOB); c.put(A_C_AGE, FORTY); } protected void verifyInitialData(Cache c) { assert JOE.equals(c.get(A_B_NAME)) : "Incorrect value for key " + A_B_NAME; assert TWENTY.equals(c.get(A_B_AGE)) : "Incorrect value for key " + A_B_AGE; assert BOB.equals(c.get(A_C_NAME)) : "Incorrect value for key " + A_C_NAME; assert FORTY.equals(c.get(A_C_AGE)) : "Incorrect value for key " + A_C_AGE; } public void testStateTransferWithNodeRestartedAndBusy(Method m) throws Exception { log.info(m.getName() + " start"); thirdWritingCacheTest(false); log.info(m.getName() + "end"); } private void thirdWritingCacheTest(boolean tx) throws InterruptedException { Cache cache1, cache3; cache1 = createCacheManager().getCache(cacheName); EmbeddedCacheManager manager3 = createCacheManager(); cache3 = manager3.getCache(cacheName); writeInitialData(cache1); // // Delay the transient copy, so that we get a more thorough log test // cache1.put("delay", new DelayTransfer()); WritingThread writerThread = new WritingThread(cache1, tx); writerThread.start(); manager3.stop(); // Pause for view to update TestingUtil.blockUntilViewsReceived(60000, cache1); cache3 = createCacheManager().getCache(cacheName); // Pause to give caches time to see each other TestingUtil.blockUntilViewsReceived(60000, cache1, cache3); writerThread.stopThread(); writerThread.join(); verifyInitialData(cache3); int count = writerThread.result(); // Since this is async, sleep a bit to allow any ongoing repls to go through TestingUtil.sleepThread(5000); for (int c = 0; c < count; c++) { Object o = cache3.get("test" + c); assert o == null; // assert new Integer(c).equals(o) : "Entry under key [test" + c + "] was [" + cache3.get("test" + c) + "] but expected [" + c + "]"; } } private static class WritingThread extends Thread { private final Cache cache; private final boolean tx; private volatile boolean stop; private volatile int result; private TransactionManager tm; WritingThread(Cache cache, boolean tx) { super("WriterThread"); this.cache = cache; this.tx = tx; if (tx) tm = TestingUtil.getTransactionManager(cache); setDaemon(true); } public int result() { return result; } public void run() { int c = 0; while (!stop) { try { if (tx) tm.begin(); cache.put("test" + c, new PojoValue(c)); cache.remove("test" + c); // cache.put("test" + c, c); c++; if (tx) tm.commit(); // TestingUtil.sleepThread(1); // Slow it down a bit } catch (Exception e) { stopThread(); } } result = c; } public void stopThread() { stop = true; } } private static class PojoValue implements Externalizable { private static AtomicBoolean holdUp = new AtomicBoolean(); private volatile int value; public PojoValue(int value) { this.value = value; } @Override public void writeExternal(ObjectOutput out) throws IOException { String threadName = Thread.currentThread().getName(); if (!holdUp.get() && threadName.contains("STREAMING_STATE_TRANSFER-sender")) { System.out.println("In streaming..."); holdUp.compareAndSet(false, true); System.out.println("Holding up..."); TestingUtil.sleepThread(2000); // Sleep for 2 seconds to hold up state transfer } out.writeInt(value); } @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { value = in.readInt(); } @Override public int hashCode() { return value + 31; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; PojoValue pojo = (PojoValue) o; if (value != pojo.value) return false; return true; } } }