Index: modeshape-repository/src/main/java/org/modeshape/repository/sequencer/SequencerContext.java =================================================================== --- modeshape-repository/src/main/java/org/modeshape/repository/sequencer/SequencerContext.java (revision 2525) +++ modeshape-repository/src/main/java/org/modeshape/repository/sequencer/SequencerContext.java (working copy) @@ -7,35 +7,37 @@ import org.modeshape.graph.io.Destination; import org.modeshape.graph.io.GraphBatchDestination; /** - * The sequencer context represents the complete context of a sequencer invocation, including the execution context - * (which contains JAAS credentials, namespace mappings, and value factories) and the I/O environment for writing - * output. - * - *

- * This class is not thread safe due to its use of {@link Destination a destination}. - *

+ * The sequencer context represents the complete context of a sequencer invocation, including the execution context (which + * contains JAAS credentials, namespace mappings, and value factories) and the I/O environment for writing output. + *

+ * This class is not thread safe due to its use of {@link Destination a destination}. + *

*/ @NotThreadSafe public class SequencerContext { - + private final ExecutionContext executionContext; - private final Graph graph; + private final Graph sourceGraph; + private final Graph destinationGraph; private final Destination destination; - + public SequencerContext( ExecutionContext executionContext, - Graph graph ) { + Graph sourceGraph, + Graph outputGraph ) { super(); - + assert executionContext != null; - assert graph != null; - + assert sourceGraph != null; + this.executionContext = executionContext; - this.graph = graph; - this.destination = new GraphBatchDestination(graph.batch()); + this.sourceGraph = sourceGraph; + this.destinationGraph = outputGraph != null ? outputGraph : sourceGraph; + this.destination = new GraphBatchDestination(destinationGraph.batch()); } /** * Returns the execution context under which this sequencer context operates + * * @return the execution context under which this sequencer context operates */ public ExecutionContext getExecutionContext() { @@ -44,14 +46,19 @@ public class SequencerContext { /** * Returns the I/O environment in which this sequencer context operates + * * @return the I/O environment in which this sequencer context operates */ public Destination getDestination() { return destination; } - + Graph graph() { - return this.graph; + return this.sourceGraph; + } + + Graph destinationGraph() { + return destinationGraph; } } Index: modeshape-repository/src/main/java/org/modeshape/repository/sequencer/SequencingService.java =================================================================== --- modeshape-repository/src/main/java/org/modeshape/repository/sequencer/SequencingService.java (revision 2525) +++ modeshape-repository/src/main/java/org/modeshape/repository/sequencer/SequencingService.java (working copy) @@ -26,6 +26,7 @@ package org.modeshape.repository.sequencer; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -424,7 +425,7 @@ public class SequencingService implements AdministeredService { // and track which output nodes should be passed to each sequencer... final Path nodePath = change.getPath(); final String nodePathStr = context.getValueFactories().getStringFactory().create(nodePath); - Map> sequencerCalls = new HashMap>(); + SequencerCalls sequencerCalls = new SequencerCalls(); if (allSequencers == null) { allSequencers = this.sequencerLibrary.getInstances(); } @@ -445,12 +446,7 @@ public class SequencingService implements AdministeredService { matcher.getOutputWorkspaceName()); SequencerCall call = new SequencerCall(sequencer, propertyNameStr); // Record the output path ... - Set outputPaths = sequencerCalls.get(call); - if (outputPaths == null) { - outputPaths = new HashSet(); - sequencerCalls.put(call, outputPaths); - } - outputPaths.add(outputPath); + sequencerCalls.record(call, outputPath); sequencers.add(sequencer); break; } @@ -459,12 +455,12 @@ public class SequencingService implements AdministeredService { } RepositorySource source = repositoryLibrary.getSource(repositorySourceName); - Graph graph = Graph.create(source, context); + Graph sourceGraph = Graph.create(source, context); Node node = null; if (!sequencers.isEmpty()) { // Find the changed node ... - node = graph.getNodeAt(nodePath); + node = sourceGraph.getNodeAt(nodePath); // Figure out which sequencers should run ... sequencers = this.sequencerSelector.selectSequencers(sequencers, node, change); @@ -476,25 +472,30 @@ public class SequencingService implements AdministeredService { } } else { // Run each of those sequencers ... - for (Map.Entry> entry : sequencerCalls.entrySet()) { - - final SequencerCall sequencerCall = entry.getKey(); - final Set outputPaths = entry.getValue(); + for (SequencerCall sequencerCall : sequencerCalls) { final Sequencer sequencer = sequencerCall.getSequencer(); final String sequencerName = sequencer.getConfiguration().getName(); final String propertyName = sequencerCall.getSequencedPropertyName(); - // Get the paths to the nodes where the sequencer should write it's output ... - assert outputPaths != null && outputPaths.size() != 0; - - // Create a new execution context for each sequencer - final SimpleProblems problems = new SimpleProblems(); - SequencerContext sequencerContext = new SequencerContext(context, graph); - try { - sequencer.execute(node, propertyName, change, outputPaths, sequencerContext, problems); - sequencerContext.getDestination().submit(); - } catch (SequencerException e) { - logger.error(e, RepositoryI18n.errorWhileSequencingNode, sequencerName, change); + // Figure out the different output paths for each output source ... + Map> outputPathsBySourceName = sequencerCalls.getOutputPathsFor(sequencerCall); + assert !outputPathsBySourceName.isEmpty(); + + // Create a new execution context for the output paths in each output source ... + for (Map.Entry> outputEntry : outputPathsBySourceName.entrySet()) { + String sourceName = outputEntry.getKey(); + Set outputPathsInSource = outputEntry.getValue(); + RepositorySource outputSource = repositoryLibrary.getSource(sourceName); + Graph outputGraph = Graph.create(outputSource, context); + + final SimpleProblems problems = new SimpleProblems(); + SequencerContext sequencerContext = new SequencerContext(context, sourceGraph, outputGraph); + try { + sequencer.execute(node, propertyName, change, outputPathsInSource, sequencerContext, problems); + sequencerContext.getDestination().submit(); + } catch (SequencerException e) { + logger.error(e, RepositoryI18n.errorWhileSequencingNode, sequencerName, change); + } } } this.statistics.recordNodeSequenced(); @@ -625,6 +626,16 @@ public class SequencingService implements AdministeredService { } return false; } + + /** + * {@inheritDoc} + * + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + return sequencerName + " (" + sequencedPropertyName; + } } protected class RepositoryObserver extends NetChangeObserver { @@ -650,4 +661,56 @@ public class SequencingService implements AdministeredService { } } } + + protected class SequencerCalls implements Iterable { + private final Map>> sequencerCalls = new HashMap>>(); + + protected void record( SequencerCall call, + RepositoryNodePath outputPath ) { + assert outputPath != null; + String sourceName = outputPath.getRepositorySourceName(); + assert sourceName != null; + + // Record the output path ... + Map> outputPathsBySourceName = sequencerCalls.get(call); + if (outputPathsBySourceName == null) { + outputPathsBySourceName = new HashMap>(); + sequencerCalls.put(call, outputPathsBySourceName); + } + Set outputPaths = outputPathsBySourceName.get(sourceName); + if (outputPaths == null) { + outputPaths = new HashSet(); + outputPathsBySourceName.put(sourceName, outputPaths); + } + outputPaths.add(outputPath); + } + + protected Iterable getCalls() { + return sequencerCalls.keySet(); + } + + /** + * {@inheritDoc} + * + * @see java.lang.Iterable#iterator() + */ + @Override + public Iterator iterator() { + return sequencerCalls.keySet().iterator(); + } + + protected Map> getOutputPathsFor( SequencerCall call ) { + return sequencerCalls.get(call); + } + + /** + * {@inheritDoc} + * + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + return sequencerCalls.toString(); + } + } } Index: modeshape-repository/src/main/java/org/modeshape/repository/sequencer/StreamSequencerAdapter.java =================================================================== --- modeshape-repository/src/main/java/org/modeshape/repository/sequencer/StreamSequencerAdapter.java (revision 2525) +++ modeshape-repository/src/main/java/org/modeshape/repository/sequencer/StreamSequencerAdapter.java (working copy) @@ -150,11 +150,11 @@ public class StreamSequencerAdapter implements Sequencer { try { // Parallel the JCR lemma for converting objects into streams stream = binary.getStream(); - StreamSequencerContext StreamSequencerContext = createStreamSequencerContext(input, + StreamSequencerContext streamSequencerContext = createStreamSequencerContext(input, sequencedProperty, context, problems); - this.streamSequencer.sequence(stream, output, StreamSequencerContext); + this.streamSequencer.sequence(stream, output, streamSequencerContext); } catch (Throwable t) { // Record the error ... firstError = t; @@ -184,12 +184,12 @@ public class StreamSequencerAdapter implements Sequencer { // Find each output node and save the image metadata there ... for (RepositoryNodePath outputPath : outputPaths) { - // Get the name of the repository workspace and the path to the output node + // Get the name of the repository source, workspace and the path to the output node final String repositoryWorkspaceName = outputPath.getWorkspaceName(); final String nodePath = outputPath.getNodePath(); // Find or create the output node in this session ... - context.graph().useWorkspace(repositoryWorkspaceName); + context.destinationGraph().useWorkspace(repositoryWorkspaceName); buildPathTo(nodePath, context, builtPaths); // Node outputNode = context.graph().getNodeAt(nodePath); @@ -240,7 +240,7 @@ public class StreamSequencerAdapter implements Sequencer { if (!builtPaths.contains(workingPath)) { try { - context.graph().getNodeAt(workingPath); + context.destinationGraph().getNodeAt(workingPath); } catch (PathNotFoundException pnfe) { context.getDestination().create(workingPath, primaryType); builtPaths.add(workingPath); Index: modeshape-repository/src/test/java/org/modeshape/repository/sequencer/StreamSequencerAdapterTest.java =================================================================== --- modeshape-repository/src/test/java/org/modeshape/repository/sequencer/StreamSequencerAdapterTest.java (revision 2525) +++ modeshape-repository/src/test/java/org/modeshape/repository/sequencer/StreamSequencerAdapterTest.java (working copy) @@ -41,6 +41,7 @@ import java.util.Map; import java.util.Set; import org.junit.Before; import org.junit.Test; +import org.modeshape.common.FixFor; import org.modeshape.common.collection.Problems; import org.modeshape.common.collection.SimpleProblems; import org.modeshape.graph.ExecutionContext; @@ -88,7 +89,7 @@ public class StreamSequencerAdapterTest { final SequencerOutputMap finalOutput = sequencerOutput; InMemoryRepositorySource source = new InMemoryRepositorySource(); - source.setName("repository"); + source.setName(repositorySourceName); graph = Graph.create(source.getConnection(), context); this.streamSequencer = new StreamSequencer() { @@ -108,7 +109,7 @@ public class StreamSequencerAdapterTest { } }; sequencer = new StreamSequencerAdapter(streamSequencer); - seqContext = new SequencerContext(context, graph); + seqContext = new SequencerContext(context, graph, graph); } protected Path path( String path ) { @@ -119,6 +120,23 @@ public class StreamSequencerAdapterTest { return context.getValueFactories().getNameFactory().create(name); } + protected Node assertNodeDoesExist( Graph graph, + String path ) { + Node node = graph.getNodeAt(path); + assertThat(node, is(notNullValue())); + return node; + } + + protected void assertNodeDoesNotExist( Graph graph, + String path ) { + try { + graph.getNodeAt(path); + fail(); + } catch (PathNotFoundException pnfe) { + // Expected + } + } + protected void testSequencer( final StreamSequencer sequencer ) throws Throwable { StreamSequencer streamSequencer = new StreamSequencer() { @@ -294,13 +312,8 @@ public class StreamSequencerAdapterTest { graph.set("sequencedProperty").on("/a/b/c").to(new ByteArrayInputStream(sampleData.getBytes())); Node nodeC = graph.getNodeAt("/a/b/c"); - try { - graph.getNodeAt("/d"); - fail(); - } catch (PathNotFoundException pnfe) { - // Expected - } assertThat(nodeC, is(notNullValue())); + assertNodeDoesNotExist(graph, "/d"); // Set up the node changes ... Location location = Location.create(context.getValueFactories().getPathFactory().create("/a/b/c")); @@ -331,20 +344,9 @@ public class StreamSequencerAdapterTest { // Set the property that will be sequenced ... graph.set("sequencedProperty").on("/a/b/c").to(new ByteArrayInputStream(sampleData.getBytes())); - Node nodeC = graph.getNodeAt("/a/b/c"); - try { - graph.getNodeAt("/d"); - fail(); - } catch (PathNotFoundException pnfe) { - // Expected - } - try { - graph.getNodeAt("/x"); - fail(); - } catch (PathNotFoundException pnfe) { - // Expected - } - assertThat(nodeC, is(notNullValue())); + Node nodeC = assertNodeDoesExist(graph, "/a/b/c"); + assertNodeDoesNotExist(graph, "/d"); + assertNodeDoesNotExist(graph, "/x"); // Set up the node changes ... Location location = Location.create(context.getValueFactories().getPathFactory().create("/a/b/c")); @@ -594,6 +596,54 @@ public class StreamSequencerAdapterTest { } + @FixFor( "MODE-1012" ) + @Test + public void shouldSequenceInputFromOneGraphAndSaveOutputToAnotherGraph() throws Exception { + // Set up the second source ... + String repositorySourceName2 = "repository2"; + InMemoryRepositorySource source2 = new InMemoryRepositorySource(); + source2.setName(repositorySourceName2); + Graph graph2 = Graph.create(source2.getConnection(), context); + seqContext = new SequencerContext(context, graph, graph2); + + // Set up the node that will be sequenced ... + graph.create("/a").and().create("/a/b").and().create("/a/b/c").and(); + graph.set("sequencedProperty").on("/a/b/c").to(new ByteArrayInputStream(sampleData.getBytes())); + + Node nodeC = assertNodeDoesExist(graph, "/a/b/c"); + assertNodeDoesNotExist(graph, "/d"); + assertNodeDoesNotExist(graph, "/x"); + + // Set up the node changes ... + Location location = Location.create(context.getValueFactories().getPathFactory().create("/a/b/c")); + Property sequencedProperty = nodeC.getProperty("sequencedProperty"); + NetChange nodeChange = new NetChange(repositoryWorkspaceName, location, EnumSet.of(ChangeType.PROPERTY_CHANGED), null, + Collections.singleton(sequencedProperty), null, null, null, false); + + // Set up the output directory ... + Set outputPaths = new HashSet(); + outputPaths.add(new RepositoryNodePath(repositorySourceName2, repositoryWorkspaceName, "/d/e")); + outputPaths.add(new RepositoryNodePath(repositorySourceName2, repositoryWorkspaceName, "/x/y/z")); + outputPaths.add(new RepositoryNodePath(repositorySourceName2, repositoryWorkspaceName, "/x/z")); + + // Generate the output data that the sequencer subclass will produce and that should be saved to the repository ... + sequencerOutput.setProperty(path("alpha/beta"), name("isSomething"), true); + + // Call the sequencer ... + sequencer.execute(nodeC, "sequencedProperty", nodeChange, outputPaths, seqContext, problems); + + // Check to see that the output nodes have been created ... + assertThat(graph2.getNodeAt("/d/e"), is(notNullValue())); + assertThat(graph2.getNodeAt("/x/y/z"), is(notNullValue())); + assertThat(graph2.getNodeAt("/x/z"), is(notNullValue())); + + // Check to see that the sequencer-generated nodes have been created ... + assertThat(graph2.getNodeAt("/d/e/alpha/beta").getProperty("isSomething").getFirstValue().toString(), is("true")); + assertThat(graph2.getNodeAt("/x/y/z/alpha/beta").getProperty("isSomething").getFirstValue().toString(), is("true")); + assertThat(graph2.getNodeAt("/x/z/alpha/beta").getProperty("isSomething").getFirstValue().toString(), is("true")); + + } + private void verifyProperty( StreamSequencerContext context, String name, Object... values ) {