Details
-
Bug
-
Resolution: Done
-
Major
-
9.2.0.Beta1
-
None
Description
I have a cache defined as:
<distributed-cache name="analytics"> <compatibility enabled="true" marshaller="org.infinispan.query.remote.CompatibilityProtoStreamMarshaller"/> </distributed-cache>
Then, I have a task like this:
package delays.java.stream.task; import java.util.Arrays; import java.util.Calendar; import java.util.Collections; import java.util.Date; import java.util.Locale; import java.util.Map; import java.util.TimeZone; import java.util.TreeMap; import java.util.stream.Collector; import java.util.stream.Collectors; import org.infinispan.Cache; import org.infinispan.stream.CacheCollectors; import org.infinispan.tasks.ServerTask; import org.infinispan.tasks.TaskContext; import org.infinispan.tasks.TaskExecutionMode; import org.infinispan.util.function.SerializableSupplier; import delays.java.stream.pojos.Stop; public class DelayRatioTask implements ServerTask { private TaskContext ctx; @Override public void setTaskContext(TaskContext ctx) { this.ctx = ctx; } @Override public String getName() { return "delay-ratio"; } @Override public Object call() throws Exception { System.out.println("Execute delay-ratio task"); Cache<String, Stop> cache = getCache(); Map<Integer, Long> totalPerHour = cache.values().stream() .collect( serialize(() -> Collectors.groupingBy( e -> getHourOfDay(e.departureTs), Collectors.counting() ))); Map<Integer, Long> delayedPerHour = cache.values().stream() .filter(e -> e.delayMin > 0) .collect( serialize(() -> Collectors.groupingBy( e -> getHourOfDay(e.departureTs), Collectors.counting() ))); return Arrays.asList(delayedPerHour, totalPerHour); // return Arrays.asList(Collections.emptyMap(), Collections.emptyMap()); } @Override public TaskExecutionMode getExecutionMode() { return TaskExecutionMode.ONE_NODE; } @SuppressWarnings("unchecked") private <K, V> Cache<K, V> getCache() { return (Cache<K, V>) ctx.getCache().get(); } private static <T, R> Collector<T, ?, R> serialize(SerializableSupplier<Collector<T, ?, R>> s) { return CacheCollectors.serializableCollector(s); } private static int getHourOfDay(Date date) { Calendar c = Calendar.getInstance(TimeZone.getTimeZone("GMT+1"), Locale.ENGLISH); c.setTime(date); return c.get(Calendar.HOUR_OF_DAY); } }
When the groupBy executes, it fails with:
java.lang.AssertionError: org.infinispan.client.hotrod.exceptions.HotRodClientException:Request for messageId=333 returned server error (status=0x85): java.util.concurrent.ExecutionException: java.lang.ClassCastException: [B cannot be cast to delays.java.stream.pojos.Stop java.lang.ClassCastException: [B cannot be cast to delays.java.stream.pojos.Stop at delays.java.stream.AnalyticsUtil.timed(AnalyticsUtil.java:16) at delays.java.stream.AnalyticsVerticle.getDelaysRatio(AnalyticsVerticle.java:72) at io.vertx.ext.web.impl.BlockingHandlerDecorator.lambda$handle$0(BlockingHandlerDecorator.java:48) at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$1(ContextImpl.java:271) at io.vertx.core.impl.TaskQueue.lambda$new$0(TaskQueue.java:60) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.infinispan.client.hotrod.exceptions.HotRodClientException:Request for messageId=333 returned server error (status=0x85): java.util.concurrent.ExecutionException: java.lang.ClassCastException: [B cannot be cast to delays.java.stream.pojos.Stop java.lang.ClassCastException: [B cannot be cast to delays.java.stream.pojos.Stop at org.infinispan.client.hotrod.impl.protocol.Codec20.checkForErrorsInResponseStatus(Codec20.java:363) at org.infinispan.client.hotrod.impl.protocol.Codec20.readPartialHeader(Codec20.java:152) at org.infinispan.client.hotrod.impl.protocol.Codec20.readHeader(Codec20.java:138) at org.infinispan.client.hotrod.impl.operations.HotRodOperation.readHeaderAndValidate(HotRodOperation.java:60) at org.infinispan.client.hotrod.impl.operations.ExecuteOperation.executeOperation(ExecuteOperation.java:50) at org.infinispan.client.hotrod.impl.operations.RetryOnFailureOperation.execute(RetryOnFailureOperation.java:56) at org.infinispan.client.hotrod.impl.RemoteCacheImpl.execute(RemoteCacheImpl.java:542) at delays.java.stream.AnalyticsVerticle.lambda$getDelaysRatio$1(AnalyticsVerticle.java:73) at delays.java.stream.AnalyticsUtil.timed(AnalyticsUtil.java:14) ... 7 more
This is coming from:
10:36:18,765 WARN [org.infinispan.remoting.inboundhandler.NonTotalOrderPerCacheInboundInvocationHandler] (remote-thread--p2-t22) ISPN000071: Caught exception when handling command StreamRequestCommand{type=TERMINAL_REHASH, includeLoader=true, terminalOperation=org.infinispan.stream.impl.termop.SegmentRetryingOperation@1b024f9, topologyId=9, id=datagrid-1-bmspw0, segments=[128, 130, 6, 135, 137, 138, 11, 12, 140, 13, 143, 16, 144, 17, 146, 22, 152, 155, 28, 29, 31, 36, 37, 41, 42, 44, 172, 173, 177, 178, 179, 181, 183, 57, 185, 60, 61, 189, 64, 192, 65, 193, 66, 197, 201, 75, 204, 207, 80, 208, 82, 83, 84, 212, 85, 86, 89, 92, 96, 225, 98, 226, 99, 100, 101, 102, 231, 105, 233, 234, 107, 108, 237, 112, 242, 115, 246, 247, 120, 251, 124, 125, 253, 255], keys=[], excludedKeys=[]}: java.lang.ClassCastException: [B cannot be cast to delays.java.stream.pojos.Stop at java.util.stream.Collectors.lambda$groupingBy$45(Collectors.java:907) at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1812) at org.infinispan.commons.util.Closeables$SpliteratorAsCloseableSpliterator.tryAdvance(Closeables.java:144) at java.util.Spliterator.forEachRemaining(Spliterator.java:326) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at org.infinispan.stream.impl.local.LocalCacheStream.collect(LocalCacheStream.java:259) at org.infinispan.stream.impl.TerminalFunctions$CollectorFunction.apply(TerminalFunctions.java:1093) at org.infinispan.stream.impl.TerminalFunctions$CollectorFunction.apply(TerminalFunctions.java:1083) at org.infinispan.stream.impl.termop.SegmentRetryingOperation.innerPerformOperation(SegmentRetryingOperation.java:68) at org.infinispan.stream.impl.termop.SegmentRetryingOperation.performOperation(SegmentRetryingOperation.java:79) at org.infinispan.stream.impl.LocalStreamManagerImpl.streamOperationRehashAware(LocalStreamManagerImpl.java:302) at org.infinispan.stream.impl.StreamRequestCommand.invokeAsync(StreamRequestCommand.java:96) at org.infinispan.remoting.inboundhandler.BasePerCacheInboundInvocationHandler.invokeCommand(BasePerCacheInboundInvocationHandler.java:102) at org.infinispan.remoting.inboundhandler.BaseBlockingRunnable.invoke(BaseBlockingRunnable.java:99) at org.infinispan.remoting.inboundhandler.BaseBlockingRunnable.runAsync(BaseBlockingRunnable.java:71) at org.infinispan.remoting.inboundhandler.BaseBlockingRunnable.run(BaseBlockingRunnable.java:40) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)