Index: modeshape-repository/src/main/java/org/modeshape/repository/sequencer/SequencerContext.java
===================================================================
--- modeshape-repository/src/main/java/org/modeshape/repository/sequencer/SequencerContext.java (revision 2525)
+++ modeshape-repository/src/main/java/org/modeshape/repository/sequencer/SequencerContext.java (working copy)
@@ -7,35 +7,37 @@ import org.modeshape.graph.io.Destination;
import org.modeshape.graph.io.GraphBatchDestination;
/**
- * The sequencer context represents the complete context of a sequencer invocation, including the execution context
- * (which contains JAAS credentials, namespace mappings, and value factories) and the I/O environment for writing
- * output.
- *
- *
- * This class is not thread safe due to its use of {@link Destination a destination}.
- *
+ * The sequencer context represents the complete context of a sequencer invocation, including the execution context (which
+ * contains JAAS credentials, namespace mappings, and value factories) and the I/O environment for writing output.
+ *
+ * This class is not thread safe due to its use of {@link Destination a destination}.
+ *
*/
@NotThreadSafe
public class SequencerContext {
-
+
private final ExecutionContext executionContext;
- private final Graph graph;
+ private final Graph sourceGraph;
+ private final Graph destinationGraph;
private final Destination destination;
-
+
public SequencerContext( ExecutionContext executionContext,
- Graph graph ) {
+ Graph sourceGraph,
+ Graph outputGraph ) {
super();
-
+
assert executionContext != null;
- assert graph != null;
-
+ assert sourceGraph != null;
+
this.executionContext = executionContext;
- this.graph = graph;
- this.destination = new GraphBatchDestination(graph.batch());
+ this.sourceGraph = sourceGraph;
+ this.destinationGraph = outputGraph != null ? outputGraph : sourceGraph;
+ this.destination = new GraphBatchDestination(destinationGraph.batch());
}
/**
* Returns the execution context under which this sequencer context operates
+ *
* @return the execution context under which this sequencer context operates
*/
public ExecutionContext getExecutionContext() {
@@ -44,14 +46,19 @@ public class SequencerContext {
/**
* Returns the I/O environment in which this sequencer context operates
+ *
* @return the I/O environment in which this sequencer context operates
*/
public Destination getDestination() {
return destination;
}
-
+
Graph graph() {
- return this.graph;
+ return this.sourceGraph;
+ }
+
+ Graph destinationGraph() {
+ return destinationGraph;
}
}
Index: modeshape-repository/src/main/java/org/modeshape/repository/sequencer/SequencingService.java
===================================================================
--- modeshape-repository/src/main/java/org/modeshape/repository/sequencer/SequencingService.java (revision 2525)
+++ modeshape-repository/src/main/java/org/modeshape/repository/sequencer/SequencingService.java (working copy)
@@ -26,6 +26,7 @@ package org.modeshape.repository.sequencer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -424,7 +425,7 @@ public class SequencingService implements AdministeredService {
// and track which output nodes should be passed to each sequencer...
final Path nodePath = change.getPath();
final String nodePathStr = context.getValueFactories().getStringFactory().create(nodePath);
- Map> sequencerCalls = new HashMap>();
+ SequencerCalls sequencerCalls = new SequencerCalls();
if (allSequencers == null) {
allSequencers = this.sequencerLibrary.getInstances();
}
@@ -445,12 +446,7 @@ public class SequencingService implements AdministeredService {
matcher.getOutputWorkspaceName());
SequencerCall call = new SequencerCall(sequencer, propertyNameStr);
// Record the output path ...
- Set outputPaths = sequencerCalls.get(call);
- if (outputPaths == null) {
- outputPaths = new HashSet();
- sequencerCalls.put(call, outputPaths);
- }
- outputPaths.add(outputPath);
+ sequencerCalls.record(call, outputPath);
sequencers.add(sequencer);
break;
}
@@ -459,12 +455,12 @@ public class SequencingService implements AdministeredService {
}
RepositorySource source = repositoryLibrary.getSource(repositorySourceName);
- Graph graph = Graph.create(source, context);
+ Graph sourceGraph = Graph.create(source, context);
Node node = null;
if (!sequencers.isEmpty()) {
// Find the changed node ...
- node = graph.getNodeAt(nodePath);
+ node = sourceGraph.getNodeAt(nodePath);
// Figure out which sequencers should run ...
sequencers = this.sequencerSelector.selectSequencers(sequencers, node, change);
@@ -476,25 +472,30 @@ public class SequencingService implements AdministeredService {
}
} else {
// Run each of those sequencers ...
- for (Map.Entry> entry : sequencerCalls.entrySet()) {
-
- final SequencerCall sequencerCall = entry.getKey();
- final Set outputPaths = entry.getValue();
+ for (SequencerCall sequencerCall : sequencerCalls) {
final Sequencer sequencer = sequencerCall.getSequencer();
final String sequencerName = sequencer.getConfiguration().getName();
final String propertyName = sequencerCall.getSequencedPropertyName();
- // Get the paths to the nodes where the sequencer should write it's output ...
- assert outputPaths != null && outputPaths.size() != 0;
-
- // Create a new execution context for each sequencer
- final SimpleProblems problems = new SimpleProblems();
- SequencerContext sequencerContext = new SequencerContext(context, graph);
- try {
- sequencer.execute(node, propertyName, change, outputPaths, sequencerContext, problems);
- sequencerContext.getDestination().submit();
- } catch (SequencerException e) {
- logger.error(e, RepositoryI18n.errorWhileSequencingNode, sequencerName, change);
+ // Figure out the different output paths for each output source ...
+ Map> outputPathsBySourceName = sequencerCalls.getOutputPathsFor(sequencerCall);
+ assert !outputPathsBySourceName.isEmpty();
+
+ // Create a new execution context for the output paths in each output source ...
+ for (Map.Entry> outputEntry : outputPathsBySourceName.entrySet()) {
+ String sourceName = outputEntry.getKey();
+ Set outputPathsInSource = outputEntry.getValue();
+ RepositorySource outputSource = repositoryLibrary.getSource(sourceName);
+ Graph outputGraph = Graph.create(outputSource, context);
+
+ final SimpleProblems problems = new SimpleProblems();
+ SequencerContext sequencerContext = new SequencerContext(context, sourceGraph, outputGraph);
+ try {
+ sequencer.execute(node, propertyName, change, outputPathsInSource, sequencerContext, problems);
+ sequencerContext.getDestination().submit();
+ } catch (SequencerException e) {
+ logger.error(e, RepositoryI18n.errorWhileSequencingNode, sequencerName, change);
+ }
}
}
this.statistics.recordNodeSequenced();
@@ -625,6 +626,16 @@ public class SequencingService implements AdministeredService {
}
return false;
}
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ return sequencerName + " (" + sequencedPropertyName;
+ }
}
protected class RepositoryObserver extends NetChangeObserver {
@@ -650,4 +661,56 @@ public class SequencingService implements AdministeredService {
}
}
}
+
+ protected class SequencerCalls implements Iterable {
+ private final Map>> sequencerCalls = new HashMap>>();
+
+ protected void record( SequencerCall call,
+ RepositoryNodePath outputPath ) {
+ assert outputPath != null;
+ String sourceName = outputPath.getRepositorySourceName();
+ assert sourceName != null;
+
+ // Record the output path ...
+ Map> outputPathsBySourceName = sequencerCalls.get(call);
+ if (outputPathsBySourceName == null) {
+ outputPathsBySourceName = new HashMap>();
+ sequencerCalls.put(call, outputPathsBySourceName);
+ }
+ Set outputPaths = outputPathsBySourceName.get(sourceName);
+ if (outputPaths == null) {
+ outputPaths = new HashSet();
+ outputPathsBySourceName.put(sourceName, outputPaths);
+ }
+ outputPaths.add(outputPath);
+ }
+
+ protected Iterable getCalls() {
+ return sequencerCalls.keySet();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see java.lang.Iterable#iterator()
+ */
+ @Override
+ public Iterator iterator() {
+ return sequencerCalls.keySet().iterator();
+ }
+
+ protected Map> getOutputPathsFor( SequencerCall call ) {
+ return sequencerCalls.get(call);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ return sequencerCalls.toString();
+ }
+ }
}
Index: modeshape-repository/src/main/java/org/modeshape/repository/sequencer/StreamSequencerAdapter.java
===================================================================
--- modeshape-repository/src/main/java/org/modeshape/repository/sequencer/StreamSequencerAdapter.java (revision 2525)
+++ modeshape-repository/src/main/java/org/modeshape/repository/sequencer/StreamSequencerAdapter.java (working copy)
@@ -150,11 +150,11 @@ public class StreamSequencerAdapter implements Sequencer {
try {
// Parallel the JCR lemma for converting objects into streams
stream = binary.getStream();
- StreamSequencerContext StreamSequencerContext = createStreamSequencerContext(input,
+ StreamSequencerContext streamSequencerContext = createStreamSequencerContext(input,
sequencedProperty,
context,
problems);
- this.streamSequencer.sequence(stream, output, StreamSequencerContext);
+ this.streamSequencer.sequence(stream, output, streamSequencerContext);
} catch (Throwable t) {
// Record the error ...
firstError = t;
@@ -184,12 +184,12 @@ public class StreamSequencerAdapter implements Sequencer {
// Find each output node and save the image metadata there ...
for (RepositoryNodePath outputPath : outputPaths) {
- // Get the name of the repository workspace and the path to the output node
+ // Get the name of the repository source, workspace and the path to the output node
final String repositoryWorkspaceName = outputPath.getWorkspaceName();
final String nodePath = outputPath.getNodePath();
// Find or create the output node in this session ...
- context.graph().useWorkspace(repositoryWorkspaceName);
+ context.destinationGraph().useWorkspace(repositoryWorkspaceName);
buildPathTo(nodePath, context, builtPaths);
// Node outputNode = context.graph().getNodeAt(nodePath);
@@ -240,7 +240,7 @@ public class StreamSequencerAdapter implements Sequencer {
if (!builtPaths.contains(workingPath)) {
try {
- context.graph().getNodeAt(workingPath);
+ context.destinationGraph().getNodeAt(workingPath);
} catch (PathNotFoundException pnfe) {
context.getDestination().create(workingPath, primaryType);
builtPaths.add(workingPath);
Index: modeshape-repository/src/test/java/org/modeshape/repository/sequencer/StreamSequencerAdapterTest.java
===================================================================
--- modeshape-repository/src/test/java/org/modeshape/repository/sequencer/StreamSequencerAdapterTest.java (revision 2525)
+++ modeshape-repository/src/test/java/org/modeshape/repository/sequencer/StreamSequencerAdapterTest.java (working copy)
@@ -41,6 +41,7 @@ import java.util.Map;
import java.util.Set;
import org.junit.Before;
import org.junit.Test;
+import org.modeshape.common.FixFor;
import org.modeshape.common.collection.Problems;
import org.modeshape.common.collection.SimpleProblems;
import org.modeshape.graph.ExecutionContext;
@@ -88,7 +89,7 @@ public class StreamSequencerAdapterTest {
final SequencerOutputMap finalOutput = sequencerOutput;
InMemoryRepositorySource source = new InMemoryRepositorySource();
- source.setName("repository");
+ source.setName(repositorySourceName);
graph = Graph.create(source.getConnection(), context);
this.streamSequencer = new StreamSequencer() {
@@ -108,7 +109,7 @@ public class StreamSequencerAdapterTest {
}
};
sequencer = new StreamSequencerAdapter(streamSequencer);
- seqContext = new SequencerContext(context, graph);
+ seqContext = new SequencerContext(context, graph, graph);
}
protected Path path( String path ) {
@@ -119,6 +120,23 @@ public class StreamSequencerAdapterTest {
return context.getValueFactories().getNameFactory().create(name);
}
+ protected Node assertNodeDoesExist( Graph graph,
+ String path ) {
+ Node node = graph.getNodeAt(path);
+ assertThat(node, is(notNullValue()));
+ return node;
+ }
+
+ protected void assertNodeDoesNotExist( Graph graph,
+ String path ) {
+ try {
+ graph.getNodeAt(path);
+ fail();
+ } catch (PathNotFoundException pnfe) {
+ // Expected
+ }
+ }
+
protected void testSequencer( final StreamSequencer sequencer ) throws Throwable {
StreamSequencer streamSequencer = new StreamSequencer() {
@@ -294,13 +312,8 @@ public class StreamSequencerAdapterTest {
graph.set("sequencedProperty").on("/a/b/c").to(new ByteArrayInputStream(sampleData.getBytes()));
Node nodeC = graph.getNodeAt("/a/b/c");
- try {
- graph.getNodeAt("/d");
- fail();
- } catch (PathNotFoundException pnfe) {
- // Expected
- }
assertThat(nodeC, is(notNullValue()));
+ assertNodeDoesNotExist(graph, "/d");
// Set up the node changes ...
Location location = Location.create(context.getValueFactories().getPathFactory().create("/a/b/c"));
@@ -331,20 +344,9 @@ public class StreamSequencerAdapterTest {
// Set the property that will be sequenced ...
graph.set("sequencedProperty").on("/a/b/c").to(new ByteArrayInputStream(sampleData.getBytes()));
- Node nodeC = graph.getNodeAt("/a/b/c");
- try {
- graph.getNodeAt("/d");
- fail();
- } catch (PathNotFoundException pnfe) {
- // Expected
- }
- try {
- graph.getNodeAt("/x");
- fail();
- } catch (PathNotFoundException pnfe) {
- // Expected
- }
- assertThat(nodeC, is(notNullValue()));
+ Node nodeC = assertNodeDoesExist(graph, "/a/b/c");
+ assertNodeDoesNotExist(graph, "/d");
+ assertNodeDoesNotExist(graph, "/x");
// Set up the node changes ...
Location location = Location.create(context.getValueFactories().getPathFactory().create("/a/b/c"));
@@ -594,6 +596,54 @@ public class StreamSequencerAdapterTest {
}
+ @FixFor( "MODE-1012" )
+ @Test
+ public void shouldSequenceInputFromOneGraphAndSaveOutputToAnotherGraph() throws Exception {
+ // Set up the second source ...
+ String repositorySourceName2 = "repository2";
+ InMemoryRepositorySource source2 = new InMemoryRepositorySource();
+ source2.setName(repositorySourceName2);
+ Graph graph2 = Graph.create(source2.getConnection(), context);
+ seqContext = new SequencerContext(context, graph, graph2);
+
+ // Set up the node that will be sequenced ...
+ graph.create("/a").and().create("/a/b").and().create("/a/b/c").and();
+ graph.set("sequencedProperty").on("/a/b/c").to(new ByteArrayInputStream(sampleData.getBytes()));
+
+ Node nodeC = assertNodeDoesExist(graph, "/a/b/c");
+ assertNodeDoesNotExist(graph, "/d");
+ assertNodeDoesNotExist(graph, "/x");
+
+ // Set up the node changes ...
+ Location location = Location.create(context.getValueFactories().getPathFactory().create("/a/b/c"));
+ Property sequencedProperty = nodeC.getProperty("sequencedProperty");
+ NetChange nodeChange = new NetChange(repositoryWorkspaceName, location, EnumSet.of(ChangeType.PROPERTY_CHANGED), null,
+ Collections.singleton(sequencedProperty), null, null, null, false);
+
+ // Set up the output directory ...
+ Set outputPaths = new HashSet();
+ outputPaths.add(new RepositoryNodePath(repositorySourceName2, repositoryWorkspaceName, "/d/e"));
+ outputPaths.add(new RepositoryNodePath(repositorySourceName2, repositoryWorkspaceName, "/x/y/z"));
+ outputPaths.add(new RepositoryNodePath(repositorySourceName2, repositoryWorkspaceName, "/x/z"));
+
+ // Generate the output data that the sequencer subclass will produce and that should be saved to the repository ...
+ sequencerOutput.setProperty(path("alpha/beta"), name("isSomething"), true);
+
+ // Call the sequencer ...
+ sequencer.execute(nodeC, "sequencedProperty", nodeChange, outputPaths, seqContext, problems);
+
+ // Check to see that the output nodes have been created ...
+ assertThat(graph2.getNodeAt("/d/e"), is(notNullValue()));
+ assertThat(graph2.getNodeAt("/x/y/z"), is(notNullValue()));
+ assertThat(graph2.getNodeAt("/x/z"), is(notNullValue()));
+
+ // Check to see that the sequencer-generated nodes have been created ...
+ assertThat(graph2.getNodeAt("/d/e/alpha/beta").getProperty("isSomething").getFirstValue().toString(), is("true"));
+ assertThat(graph2.getNodeAt("/x/y/z/alpha/beta").getProperty("isSomething").getFirstValue().toString(), is("true"));
+ assertThat(graph2.getNodeAt("/x/z/alpha/beta").getProperty("isSomething").getFirstValue().toString(), is("true"));
+
+ }
+
private void verifyProperty( StreamSequencerContext context,
String name,
Object... values ) {