Details
-
Bug
-
Resolution: Obsolete
-
Blocker
-
7.0.3.Final
Description
The method invokeMapReduce() doesn't really invoke the M/R task, it only creates it, and the execution only starts when the test method calls task.execute() explicitly. It shouldn't try to check the contents of the shared intermediary cache, because the intermediary cache may not exist yet - and it may accidentally create it with the wrong configuration. I get this error when I run only the testInvokeMapReduceOnAllKeys method:
09:55:37,632 TRACE (testng-DistributedSharedCacheTwoNodesMapReduceTest:) [DefaultCacheManager] About to wire and start cache __tmpMapReduce 09:55:37,646 DEBUG (testng-DistributedSharedCacheTwoNodesMapReduceTest:) [MapReduceTask] Invoking CreateCacheCommand{cacheManager=null, cacheNameToCreate='__tmpMapReduce', cacheConfigurationName='__tmpMapReduce', start=true', size=2} across members [DistributedSharedCacheTwoNodesMapReduceTest-NodeA-19271, DistributedSharedCacheTwoNodesMapReduceTest-NodeB-10341] 10:32:56,324 ERROR (testng-DistributedSharedCacheTwoNodesMapReduceTest:) [UnitTestTestNGListener] Test testInvokeMapReduceOnAllKeys(org.infinispan.distexec.mapreduce.DistributedSharedCacheTwoNodesMapReduceTest) failed. org.infinispan.distexec.mapreduce.MapReduceException: Map phase failed at org.infinispan.distexec.mapreduce.MapReduceTask.executeMapPhase(MapReduceTask.java:607) at org.infinispan.distexec.mapreduce.MapReduceTask.executeHelper(MapReduceTask.java:473) at org.infinispan.distexec.mapreduce.MapReduceTask.execute(MapReduceTask.java:414) at org.infinispan.distexec.mapreduce.BaseWordCountMapReduceTest.testInvokeMapReduceOnAllKeys(BaseWordCountMapReduceTest.java:162) Caused by: org.infinispan.commons.CacheException: java.lang.NullPointerException at org.infinispan.distexec.mapreduce.MapReduceManagerImpl.mapAndCombineForDistributedReduction(MapReduceManagerImpl.java:105) at org.infinispan.distexec.mapreduce.MapReduceTask$MapTaskPart.invokeMapCombineLocally(MapReduceTask.java:1174) at org.infinispan.distexec.mapreduce.MapReduceTask$MapTaskPart.access$300(MapReduceTask.java:1101) at org.infinispan.distexec.mapreduce.MapReduceTask$MapTaskPart$1.call(MapReduceTask.java:1123) at org.infinispan.distexec.mapreduce.MapReduceTask$MapTaskPart$1.call(MapReduceTask.java:1119) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NullPointerException at org.infinispan.distexec.mapreduce.MapReduceManagerImpl.mapKeysToNodes(MapReduceManagerImpl.java:363) at org.infinispan.distexec.mapreduce.MapReduceManagerImpl.migrateIntermediateKeysAndValues(MapReduceManagerImpl.java:327) at org.infinispan.distexec.mapreduce.MapReduceManagerImpl.mapAndCombine(MapReduceManagerImpl.java:260) at org.infinispan.distexec.mapreduce.MapReduceManagerImpl.mapAndCombineForDistributedReduction(MapReduceManagerImpl.java:103) ... 10 more
Even if the check is moved after the M/R task is finished, it still wouldn't be correct, because the task only cleans up the shared intermediary cache asynchronously. So it needs to use eventually() to avoid errors like this:
04:06:32,260 ERROR (testng-DistributedSharedCacheTwoNodesMapReduceTest:) [UnitTestTestNGListener] Test testInvokeMapReduceOnAllKeys(org.infinispan.distexec.mapreduce.DistributedSharedCacheTwoNodesMapReduceTest) failed. java.lang.AssertionError: Shared cache __tmpMapReduce is not empty. It has 5 keys/values: [ImmortalCacheEntry{key=IntermediateCompositeKey [taskId=88948a8b-2a8a-4c13-bc45-4dc3a9f6b0fb, key=is], value=org.infinispan.distexec.mapreduce.MapReduceManagerImpl$DeltaAwareList@21ae10d3}, ImmortalCacheEntry{key=IntermediateCompositeKey [taskId=88948a8b-2a8a-4c13-bc45-4dc3a9f6b0fb, key=JUDCon], value=org.infinispan.distexec.mapreduce.MapReduceManagerImpl$DeltaAwareList@108d6b51}, ImmortalCacheEntry{key=IntermediateCompositeKey [taskId=88948a8b-2a8a-4c13-bc45-4dc3a9f6b0fb, key=cool], value=org.infinispan.distexec.mapreduce.MapReduceManagerImpl$DeltaAwareList@77949e8f}, ImmortalCacheEntry{key=IntermediateCompositeKey [taskId=88948a8b-2a8a-4c13-bc45-4dc3a9f6b0fb, key=Infinispan], value=org.infinispan.distexec.mapreduce.MapReduceManagerImpl$DeltaAwareList@712a6071}, ImmortalCacheEntry{key=IntermediateCompositeKey [taskId=88948a8b-2a8a-4c13-bc45-4dc3a9f6b0fb, key=community], value=org.infinispan.distexec.mapreduce.MapReduceManagerImpl$DeltaAwareList@291bdf76}] expected:<0> but was:<5> at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.failNotEquals(Assert.java:743) at org.junit.Assert.assertEquals(Assert.java:118) at org.junit.Assert.assertEquals(Assert.java:555) at org.infinispan.distexec.mapreduce.DistributedSharedCacheTwoNodesMapReduceTest.invokeMapReduce(DistributedSharedCacheTwoNodesMapReduceTest.java:44) at org.infinispan.distexec.mapreduce.BaseWordCountMapReduceTest.testInvokeMapReduceOnAllKeys(BaseWordCountMapReduceTest.java:161) 04:06:32,579 TRACE (transport-thread-NodeA-p29577-t6:) [InvocationContextInterceptor] Invoked with command RemoveCommand{key=IntermediateCompositeKey [taskId=eb7da48a-5922-4671-9037-4077e209744c, key=RedHat], value=null, flags=null, valueMatcher=MATCH_ALWAYS} and InvocationContext [org.infinispan.context.SingleKeyNonTxInvocationContext@c0bbc61]