Uploaded image for project: 'Red Hat Fuse'
  1. Red Hat Fuse
  2. ENTESB-2460

ParallelAggregate option for camel multicast with parallel processing is not working

XMLWordPrintable

    • Icon: Bug Bug
    • Resolution: Done
    • Icon: Major Major
    • jboss-fuse-6.2
    • jboss-fuse-6.2
    • Camel
    • None
    • % %
    • Hide

      You can reproduce the issue with this configuration:

      <!-- Entry route: multicasts each message from direct:multicastParallelAggregate to
           direct:a and direct:b with parallelProcessing and parallelAggregate both enabled;
           the replies are combined by the bean referenced as "parallelAggregate". -->
      <route>
      	<from uri="direct:multicastParallelAggregate"/>
      	<multicast parallelProcessing="true" parallelAggregate="true" strategyRef="parallelAggregate">
      		<to uri="direct:a"/>
      		<to uri="direct:b"/>
      	</multicast>
      </route>
      
      <!-- Branch A: replaces the message body with the constant "A". -->
      <route>
      	<from uri="direct:a"/>
      	<setBody>
      		<constant>A</constant>
      	</setBody>
      </route>
      <!-- Branch B: replaces the message body with the constant "B". -->
      <route>
      	<from uri="direct:b"/>
      	<setBody>
      		<constant>B</constant>
      	</setBody>
      </route>
      
      <!-- Aggregation strategy bean referenced by strategyRef in the multicast above. -->
      <bean id="parallelAggregate" class="org.jboss.camel.ParallelAggregate"/>
      
      /**
       * Reproducer aggregation strategy that records whether {@link #aggregate} was entered by one
       * thread while another call to it was still in progress.
       *
       * <p>Each call stays "in progress" for five seconds (see the {@code Thread.sleep} below), so
       * two multicast branches aggregated concurrently will overlap and set the
       * {@link #isCalledInParallel()} flag. If the caller serialises the aggregate calls, the flag
       * stays {@code false}.
       *
       * <p>All bookkeeping is static and guarded by a single class-wide lock, so the result is
       * reliable regardless of which instance is invoked and can be read safely from the test thread.
       */
      public class ParallelAggregate implements AggregationStrategy {
      
      	/** Guards {@link #activeCalls} and {@link #calledInParallel}. */
      	private static final Object LOCK = new Object();
      
      	/** Number of aggregate() calls currently between entry and exit. */
      	private static int activeCalls;
      
      	/** Set once two aggregate() calls have been observed overlapping; cleared by reset(). */
      	private static boolean calledInParallel;
      
      	/**
      	 * @return {@code true} if, since the last {@link #reset()}, a call to {@code aggregate}
      	 *         started while another call was still running
      	 */
      	public static boolean isCalledInParallel() {
      		synchronized (LOCK) {
      			return calledInParallel;
      		}
      	}
      
      	/** Clears the recorded state so the strategy can be reused by another test run. */
      	public static void reset() {
      		synchronized (LOCK) {
      			activeCalls = 0;
      			calledInParallel = false;
      		}
      	}
      
      	/**
      	 * Records entry, simulates five seconds of work, records exit, then merges the exchanges.
      	 *
      	 * @param oldExchange the exchange aggregated so far, or {@code null} on the first call
      	 * @param newExchange the newly arrived exchange
      	 * @return {@code newExchange} when {@code oldExchange} is {@code null}; otherwise
      	 *         {@code oldExchange} with its In body set to this class's name
      	 * @throws RuntimeException wrapping the {@link InterruptedException} if the simulated work
      	 *         is interrupted; the thread's interrupt flag is restored first
      	 */
      	@Override
      	public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
      		System.out.println("A:" + new Date().toString());
      		synchronized (LOCK) {
      			// Another call is still between entry and exit: the calls overlap.
      			if (activeCalls > 0) {
      				calledInParallel = true;
      			}
      			activeCalls++;
      		}
      		System.out.println("B:" + new Date().toString());
      
      		try {
      			// Simulate processing, another thread should access aggregator
      			Thread.sleep(5000);
      			// or like this
      			// long t = new Date().getTime() + 10000;
      			// while (t > new Date().getTime()) {
      			// }
      		} catch (InterruptedException e) {
      			// Restore the interrupt flag so callers further up can still observe it.
      			Thread.currentThread().interrupt();
      			throw new RuntimeException(e);
      		} finally {
      			synchronized (LOCK) {
      				// reset() may have zeroed the counter while this call was sleeping.
      				if (activeCalls > 0) {
      					activeCalls--;
      				}
      			}
      		}
      		System.out.println("C:" + new Date().toString());
      
      		if (oldExchange == null) {
      			return newExchange;
      		}
      		oldExchange.getIn().setBody(this.getClass().getName());
      		return oldExchange;
      	}
      }
      
      Show
      You can reproduce the issue with this configuration: <route> <from uri= "direct:multicastParallelAggregate" /> <multicast parallelProcessing= "true" parallelAggregate= "true" strategyRef= "parallelAggregate" > <to uri= "direct:a" /> <to uri= "direct:b" /> </multicast> </route> <route> <from uri= "direct:a" /> <setBody> <constant> A </constant> </setBody> </route> <route> <from uri= "direct:b" /> <setBody> <constant> B </constant> </setBody> </route> <bean id= "parallelAggregate" class= "org.jboss.camel.ParallelAggregate" /> public class ParallelAggregate implements AggregationStrategy { private static boolean invoked; private static boolean calledInParallel; public static boolean isCalledInParallel() { return calledInParallel; } public static void reset() { invoked = calledInParallel = false ; } @Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { System .out.println( "A:" + new Date().toString()); synchronized ( this ) { if (!invoked) { invoked = true ; } else { calledInParallel = true ; } } System .out.println( "B:" + new Date().toString()); // Simulate processing, another thread should access aggregator try { Thread .sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); } // or like this // long t = new Date().getTime() + 10000; // while (t > new Date().getTime()) { // } System .out.println( "C:" + new Date().toString()); invoked = false ; if (oldExchange == null ) { return newExchange; } oldExchange.getIn().setBody( this .getClass().getName()); return oldExchange; } }

      The parallelAggregate option is implemented only for serial processing in MulticastProcessor.java. I do not know whether parallel aggregation needs to be supported for serial processing, but it is definitely missing for parallel processing: the aggregation strategy is still called from a synchronized method, so its calls never overlap and the aggregator does not have to be thread-safe, even though parallelAggregate="true" is set.

            cibsen@redhat.com Claus Ibsen
            vchalupa_jira Václav Chalupa (Inactive)
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

              Created:
              Updated:
              Resolved: