package org.infinispan.distribution.rehash; import java.io.IOException; import java.util.ArrayList; import org.infinispan.Cache; import org.infinispan.config.Configuration; import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.test.TestingUtil; import org.infinispan.test.fwk.TestCacheManagerFactory; import org.infinispan.util.logging.Log; import org.infinispan.util.logging.LogFactory; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Test(groups = "functional", testName = "distribution.rehash.DataCountTest", enabled = true) public class DataCountTest { private static final Log log = LogFactory.getLog(DataCountTest.class); private static int NODE_COUNT = 3; private static int NUM_OWNERS = 2; private static int DATA_COUNT = 30000; ArrayList cacheManagerList = new ArrayList(NODE_COUNT); ArrayList> cacheList = new ArrayList>(NODE_COUNT); Configuration c; @BeforeClass public static void init() { System.setProperty("java.net.preferIPv4Stack", "true"); } @BeforeMethod(alwaysRun = true) public void createCache() throws InterruptedException, IOException { c = new Configuration(); c.setCacheMode(Configuration.CacheMode.DIST_SYNC); c.setNumOwners(NUM_OWNERS); EmbeddedCacheManager cacheManager = null; for (int i = 0; i < NODE_COUNT; ++i) { cacheManager = TestCacheManagerFactory.createClusteredCacheManager(c); Cache cache = cacheManager.getCache(); cacheManagerList.add(cacheManager); cacheList.add(cache); cacheManagerList.get(i).start(); cacheList.get(i).start(); } log.info("ALL caches are initialized"); } @AfterMethod(alwaysRun = true) public void shutdown() { for (int i = 0; i < NODE_COUNT; ++i) { cacheManagerList.get(i).stop(); } } public void testRehash() throws Exception { // Put all the data on one node and let it hash to its owners. for (int i = 0; i < DATA_COUNT; i++) { cacheList.get(0).put(String.valueOf(i), i); } TestingUtil.blockUntilViewsReceived(10000, cacheList); int sum = checkCacheSize("Total number of entries: ", 0); Assert.assertEquals(sum, DATA_COUNT * NUM_OWNERS); log.info("Sum: " + sum); // Shut down the first cache. cacheList.get(0).stop(); cacheManagerList.get(0).stop(); TestingUtil.blockUntilViewsReceived(10000, cacheList); sum = checkCacheSize("After LEAVE rehash. Total number of entries: ", 1); // We expect all the data to be present. Assert.assertEquals(sum, DATA_COUNT * NUM_OWNERS); // Add a new cache manager back into the list and start it. cacheManagerList.set(0, TestCacheManagerFactory.createClusteredCacheManager(c)); Cache cache = cacheManagerList.get(0).getCache(); cacheList.set(0, cache); cacheManagerList.get(0).start(); cacheList.get(0).start(); TestingUtil.blockUntilViewsReceived(10000, cacheList); // We expect the data to migrate back. sum = checkCacheSize("After JOIN rehash. Total number of entries: ", 0); log.info("Sum: " + sum); // Now clear the cache. cacheList.get(0).clear(); cacheList.get(1).clear(); cacheList.get(2).clear(); TestingUtil.blockUntilViewsReceived(10000, cacheList); sum = checkCacheSize("After cache.clear(). Total number of entries: ", 0); log.info("Sum: " + sum); log.info("The total number of entries should be zero"); Assert.assertEquals(sum, 0); } private int checkCacheSize(String auxInfo, int startIndex) { int sum = 0; for (int i = startIndex; i < cacheList.size(); i++) { int size = cacheList.get(i).entrySet().size(); log.info(auxInfo + size); sum += size; } return sum; } }