jena-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sal...@apache.org
Subject svn commit: r1415420 [3/3] - in /jena/branches/streaming-update: jena-arq/Grammar/ jena-arq/src/main/java/com/hp/hpl/jena/sparql/lang/ jena-arq/src/main/java/com/hp/hpl/jena/sparql/lang/arq/ jena-arq/src/main/java/com/hp/hpl/jena/sparql/lang/sparql_10/...
Date Thu, 29 Nov 2012 22:43:46 GMT
Added: jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/UpdateRequestSink.java
URL: http://svn.apache.org/viewvc/jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/UpdateRequestSink.java?rev=1415420&view=auto
==============================================================================
--- jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/UpdateRequestSink.java (added)
+++ jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/UpdateRequestSink.java Thu Nov 29 22:43:42 2012
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hp.hpl.jena.sparql.modify;
+
+import com.hp.hpl.jena.sparql.core.Prologue;
+import com.hp.hpl.jena.sparql.modify.request.QuadDataAcc;
+import com.hp.hpl.jena.sparql.modify.request.QuadDataAccSink;
+import com.hp.hpl.jena.sparql.modify.request.UpdateDataDelete;
+import com.hp.hpl.jena.sparql.modify.request.UpdateDataInsert;
+import com.hp.hpl.jena.update.Update ;
+import com.hp.hpl.jena.update.UpdateRequest ;
+
+public class UpdateRequestSink extends AbstractUpdateSink implements UpdateSink
+{
+    final UpdateRequest updateRequest;
+    
+    public UpdateRequestSink(UpdateRequest updateRequest, UsingList usingList)
+    {
+        super(usingList);
+        this.updateRequest = updateRequest;
+    }
+    
+    @Override
+    public void doSend(Update item)
+    {
+        updateRequest.add(item);
+    }
+    
+    @Override
+    public void flush()
+    { }
+    
+    @Override
+    public void close()
+    { }
+    
+    @Override
+    public Prologue getPrologue()
+    {
+        return updateRequest;
+    }
+    
+    @Override
+    public QuadDataAccSink getInsertDataSink()
+    {
+        QuadDataAcc quads = new QuadDataAcc();
+        send(new UpdateDataInsert(quads));
+        
+        return quads;
+    }
+    
+    @Override
+    public QuadDataAccSink getDeleteDataSink()
+    {
+        QuadDataAcc quads = new QuadDataAcc();
+        send(new UpdateDataDelete(quads));
+        
+        return quads;
+    }
+}

Added: jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/UpdateSink.java
URL: http://svn.apache.org/viewvc/jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/UpdateSink.java?rev=1415420&view=auto
==============================================================================
--- jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/UpdateSink.java (added)
+++ jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/UpdateSink.java Thu Nov 29 22:43:42 2012
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hp.hpl.jena.sparql.modify;
+
+import org.apache.jena.atlas.lib.Sink ;
+
+import com.hp.hpl.jena.sparql.core.Prologue ;
+import com.hp.hpl.jena.sparql.modify.request.QuadDataAccSink ;
+import com.hp.hpl.jena.update.Update ;
+
+/**
+ * An update sink is used by the SPARQL Update parser, which sends it update operations and sends quads to the insert/delete data sinks it provides.
+ */
+public interface UpdateSink extends Sink<Update>
+{
+    public Prologue getPrologue();
+
+    public QuadDataAccSink getInsertDataSink();
+
+    public QuadDataAccSink getDeleteDataSink();
+
+}
\ No newline at end of file

Added: jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/UpdateVisitorSink.java
URL: http://svn.apache.org/viewvc/jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/UpdateVisitorSink.java?rev=1415420&view=auto
==============================================================================
--- jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/UpdateVisitorSink.java (added)
+++ jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/UpdateVisitorSink.java Thu Nov 29 22:43:42 2012
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hp.hpl.jena.sparql.modify;
+
+import com.hp.hpl.jena.sparql.core.Prologue ;
+import com.hp.hpl.jena.sparql.modify.request.QuadDataAccSink ;
+import com.hp.hpl.jena.sparql.modify.request.UpdateVisitor ;
+import com.hp.hpl.jena.update.Update ;
+
+public class UpdateVisitorSink extends AbstractUpdateSink implements UpdateSink
+{
+    private final Prologue prologue;
+    private final UpdateVisitor worker;
+    
+    public UpdateVisitorSink(UpdateVisitor worker, UsingList usingList)
+    {
+        super(usingList);
+        this.prologue = new Prologue();
+        this.worker = worker;
+    }
+    
+    @Override
+    public Prologue getPrologue()
+    {
+        return prologue;
+    }
+    
+    @Override
+    public void doSend(Update update)
+    {
+        update.visit(worker);
+    }
+    
+    @Override
+    public QuadDataAccSink getInsertDataSink()
+    {
+        return new QuadDataAccSink(worker.getInsertDataSink());
+    }
+    
+    @Override
+    public QuadDataAccSink getDeleteDataSink()
+    {
+        return new QuadDataAccSink(worker.getDeleteDataSink());
+    }
+
+    @Override
+    public void flush()
+    { }
+
+    @Override
+    public void close()
+    { }
+}

Added: jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/UsingList.java
URL: http://svn.apache.org/viewvc/jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/UsingList.java?rev=1415420&view=auto
==============================================================================
--- jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/UsingList.java (added)
+++ jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/UsingList.java Thu Nov 29 22:43:42 2012
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hp.hpl.jena.sparql.modify;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import com.hp.hpl.jena.graph.Node;
+
+public class UsingList
+{
+    private List<Node> using = new ArrayList<Node>() ;
+    private List<Node> usingNamed = new ArrayList<Node>() ;
+    
+    public void addUsing(Node node)                      { using.add(node) ; }
+    public void addAllUsing(Collection<Node> nodes)      { using.addAll(nodes); }
+    public void addUsingNamed(Node node)                 { usingNamed.add(node) ; }
+    public void addAllUsingNamed(Collection<Node> nodes) { usingNamed.addAll(nodes); }
+    
+    public List<Node> getUsing()                         { return Collections.unmodifiableList(using) ; }
+    public List<Node> getUsingNamed()                    { return Collections.unmodifiableList(usingNamed) ; }
+    
+    public boolean usingIsPresent()                      { return using.size() > 0 || usingNamed.size() > 0 ; }
+}

Modified: jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/QuadAcc.java
URL: http://svn.apache.org/viewvc/jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/QuadAcc.java?rev=1415420&r1=1415419&r2=1415420&view=diff
==============================================================================
--- jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/QuadAcc.java (original)
+++ jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/QuadAcc.java Thu Nov 29 22:43:42 2012
@@ -22,70 +22,33 @@ import java.util.ArrayList ;
 import java.util.Collections ;
 import java.util.List ;
 
-import com.hp.hpl.jena.graph.Node ;
-import com.hp.hpl.jena.graph.Triple ;
+import org.apache.jena.atlas.lib.SinkToCollection ;
+
 import com.hp.hpl.jena.sparql.core.Quad ;
-import com.hp.hpl.jena.sparql.core.TriplePath ;
-import com.hp.hpl.jena.sparql.syntax.TripleCollector ;
 
 /** Accumulate quads (including allowing variables) during parsing. */
-public class QuadAcc implements TripleCollector
+public class QuadAcc extends QuadAccSink
 {
     // A lists of Pairs: Node and Triple connector
     
-    private Node graphNode = Quad.defaultGraphNodeGenerated ;
-    private List<Quad> quads = new ArrayList<Quad>() ;
-    private List<Quad> quadsView = Collections.unmodifiableList(quads) ;
-    
-    public QuadAcc()     {}
+    private final List<Quad> quads ;
+    private final List<Quad> quadsView ;
     
-    protected void check(Triple triple) {} 
-    protected void check(Quad quad) {} 
-    
-    public void setGraph(Node n) 
-    { 
-        graphNode = n ;
-    }
-    
-    public Node getGraph()    { return graphNode ; }
-    
-    public List<Quad> getQuads()
+    public QuadAcc()
     {
-        return quadsView ;
+        this(new ArrayList<Quad>());
     }
     
-    public void addQuad(Quad quad)
-    {
-        check(quad) ;
-        quads.add(quad) ;
-    }
-
-    @Override
-    public void addTriple(Triple triple)
+    public QuadAcc(List<Quad> quads)
     {
-        check(triple) ;
-        quads.add(new Quad(graphNode, triple)) ;
+        super(new SinkToCollection<Quad>(quads)) ;
+        this.quads = quads ;
+        this.quadsView = Collections.unmodifiableList(quads) ;
     }
-
-    @Override
-    public void addTriple(int index, Triple triple)
-    {
-        check(triple) ;
-        quads.add(index, new Quad(graphNode, triple)) ;
-    }
-
-    @Override
-    public void addTriplePath(TriplePath tPath)
-    { throw new UnsupportedOperationException("Can't add paths to quads") ; }
-
-    @Override
-    public void addTriplePath(int index, TriplePath tPath)
-    { throw new UnsupportedOperationException("Can't add paths to quads") ; }
-
-    @Override
-    public int mark()
+    
+    public List<Quad> getQuads()
     {
-        return quads.size() ;
+        return quadsView ;
     }
     
     @Override

Added: jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/QuadAccSink.java
URL: http://svn.apache.org/viewvc/jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/QuadAccSink.java?rev=1415420&view=auto
==============================================================================
--- jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/QuadAccSink.java (added)
+++ jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/QuadAccSink.java Thu Nov 29 22:43:42 2012
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hp.hpl.jena.sparql.modify.request;
+
+import org.apache.jena.atlas.lib.Closeable ;
+import org.apache.jena.atlas.lib.Sink ;
+
+import com.hp.hpl.jena.graph.Node ;
+import com.hp.hpl.jena.graph.Triple ;
+import com.hp.hpl.jena.sparql.core.Quad ;
+import com.hp.hpl.jena.sparql.core.TriplePath ;
+import com.hp.hpl.jena.sparql.syntax.TripleCollector ;
+
+/** Accumulate quads into a Sink (including allowing variables) during parsing. */
+public class QuadAccSink implements TripleCollector, Closeable
+{
+    protected Node graphNode = Quad.defaultGraphNodeGenerated ;
+    private final Sink<Quad> sink;
+    
+    public QuadAccSink(Sink<Quad> sink)
+    {
+        this.sink = sink;
+    }
+    
+    protected void check(Triple triple) {} 
+    protected void check(Quad quad) {} 
+    
+    public void setGraph(Node n) 
+    { 
+        graphNode = n ;
+    }
+    
+    public Node getGraph()    { return graphNode ; }
+    
+    public void addQuad(Quad quad)
+    {
+        check(quad) ;
+        sink.send(quad) ;
+    }
+
+    @Override
+    public void addTriple(Triple triple)
+    {
+        check(triple) ;
+        sink.send(new Quad(graphNode, triple)) ;
+    }
+
+    @Override
+    public void addTriplePath(TriplePath tPath)
+    { throw new UnsupportedOperationException("Can't add paths to quads") ; }
+
+    @Override
+    public void close()
+    {
+        sink.close();
+    }
+}

Modified: jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/QuadDataAcc.java
URL: http://svn.apache.org/viewvc/jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/QuadDataAcc.java?rev=1415420&r1=1415419&r2=1415420&view=diff
==============================================================================
--- jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/QuadDataAcc.java (original)
+++ jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/QuadDataAcc.java Thu Nov 29 22:43:42 2012
@@ -18,34 +18,45 @@
 
 package com.hp.hpl.jena.sparql.modify.request;
 
-import com.hp.hpl.jena.graph.Triple ;
-import com.hp.hpl.jena.query.QueryParseException ;
+import java.util.ArrayList ;
+import java.util.Collections ;
+import java.util.List ;
+
+import org.apache.jena.atlas.lib.SinkToCollection ;
+
 import com.hp.hpl.jena.sparql.core.Quad ;
-import com.hp.hpl.jena.sparql.core.Var ;
 
 /** Accumulate quads (excluding allowing variables) during parsing. */
-public class QuadDataAcc extends QuadAcc
+public class QuadDataAcc extends QuadDataAccSink
 {
-    @Override
-    protected void check(Triple t)
+    private final List<Quad> quads ;
+    private final List<Quad> quadsView ;
+    
+    public QuadDataAcc()
+    {
+        this(new ArrayList<Quad>());
+    }
+    
+    public QuadDataAcc(List<Quad> quads)
+    {
+        super(new SinkToCollection<Quad>(quads));
+        this.quads = quads;
+        this.quadsView = Collections.unmodifiableList(quads) ;
+    }
+    
+    public List<Quad> getQuads()
     {
-        if ( Var.isVar(getGraph()) )
-            throw new QueryParseException("Variables not permitted in data quad", -1, -1) ;   
-        if ( Var.isVar(t.getSubject()) || Var.isVar(t.getPredicate()) || Var.isVar(t.getObject())) 
-            throw new QueryParseException("Variables not permitted in data quad", -1, -1) ;  
-        if ( t.getSubject().isLiteral() )
-            throw new QueryParseException("Literals not allowed as subjects in data", -1, -1) ;
+        return quadsView ;
     }
     
     @Override
-    protected void check(Quad quad)
+    public int hashCode() { return quads.hashCode() ; }
+
+    @Override
+    public boolean equals(Object other)
     {
-        if ( Var.isVar(quad.getGraph()) || 
-             Var.isVar(quad.getSubject()) || 
-             Var.isVar(quad.getPredicate()) || 
-             Var.isVar(quad.getObject())) 
-            throw new QueryParseException("Variables not permitted in data quad", -1, -1) ;   
-        if ( quad.getSubject().isLiteral() )
-            throw new QueryParseException("Literals not allowed as subjects in quad data", -1, -1) ;
+        if ( ! ( other instanceof QuadDataAcc ) ) return false ;
+        QuadDataAcc acc = (QuadDataAcc)other ;
+        return quads.equals(acc.quads) ; 
     }
 }

Added: jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/QuadDataAccSink.java
URL: http://svn.apache.org/viewvc/jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/QuadDataAccSink.java?rev=1415420&view=auto
==============================================================================
--- jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/QuadDataAccSink.java (added)
+++ jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/QuadDataAccSink.java Thu Nov 29 22:43:42 2012
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hp.hpl.jena.sparql.modify.request;
+
+import org.apache.jena.atlas.lib.Sink ;
+
+import com.hp.hpl.jena.graph.Triple ;
+import com.hp.hpl.jena.query.QueryParseException ;
+import com.hp.hpl.jena.sparql.core.Quad ;
+import com.hp.hpl.jena.sparql.core.Var ;
+
+/** Send quads to a Sink during parsing, rejecting variables and literal subjects (data quads only). */
+public class QuadDataAccSink extends QuadAccSink
+{
+    public QuadDataAccSink(Sink<Quad> sink)
+    {
+        super(sink);
+    }
+
+    @Override
+    protected void check(Triple t)
+    {
+        if ( Var.isVar(getGraph()) )
+            throw new QueryParseException("Variables not permitted in data quad", -1, -1) ;   
+        if ( Var.isVar(t.getSubject()) || Var.isVar(t.getPredicate()) || Var.isVar(t.getObject())) 
+            throw new QueryParseException("Variables not permitted in data quad", -1, -1) ;  
+        if ( t.getSubject().isLiteral() )
+            throw new QueryParseException("Literals not allowed as subjects in data", -1, -1) ;
+    }
+    
+    @Override
+    protected void check(Quad quad)
+    {
+        if ( Var.isVar(quad.getGraph()) || 
+             Var.isVar(quad.getSubject()) || 
+             Var.isVar(quad.getPredicate()) || 
+             Var.isVar(quad.getObject())) 
+            throw new QueryParseException("Variables not permitted in data quad", -1, -1) ;   
+        if ( quad.getSubject().isLiteral() )
+            throw new QueryParseException("Literals not allowed as subjects in quad data", -1, -1) ;
+    }
+}

Modified: jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/UpdateVisitor.java
URL: http://svn.apache.org/viewvc/jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/UpdateVisitor.java?rev=1415420&r1=1415419&r2=1415420&view=diff
==============================================================================
--- jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/UpdateVisitor.java (original)
+++ jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/UpdateVisitor.java Thu Nov 29 22:43:42 2012
@@ -18,6 +18,10 @@
 
 package com.hp.hpl.jena.sparql.modify.request;
 
+import org.apache.jena.atlas.lib.Sink ;
+
+import com.hp.hpl.jena.sparql.core.Quad ;
+
 public interface UpdateVisitor
 {
     public void visit(UpdateDrop update) ;
@@ -34,4 +38,7 @@ public interface UpdateVisitor
     public void visit(UpdateDataDelete update) ;
     public void visit(UpdateDeleteWhere update) ;
     public void visit(UpdateModify update) ;
+    
+    public Sink<Quad> getInsertDataSink();
+    public Sink<Quad> getDeleteDataSink();
 }

Modified: jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/UpdateWriter.java
URL: http://svn.apache.org/viewvc/jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/UpdateWriter.java?rev=1415420&r1=1415419&r2=1415420&view=diff
==============================================================================
--- jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/UpdateWriter.java (original)
+++ jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/UpdateWriter.java Thu Nov 29 22:43:42 2012
@@ -18,26 +18,27 @@
 
 package com.hp.hpl.jena.sparql.modify.request;
 
-import java.util.Iterator;
-import java.util.List;
+import java.util.Iterator ;
+import java.util.List ;
 
 import org.apache.jena.atlas.io.IndentedWriter ;
 import org.apache.jena.atlas.iterator.Iter ;
 import org.apache.jena.atlas.lib.Closeable ;
-import org.openjena.riot.out.SinkQuadBracedOutput;
+import org.apache.jena.atlas.lib.Sink ;
+import org.openjena.riot.out.SinkQuadBracedOutput ;
 
-import com.hp.hpl.jena.graph.Node;
-import com.hp.hpl.jena.graph.Triple;
-import com.hp.hpl.jena.sparql.ARQException;
-import com.hp.hpl.jena.sparql.core.Quad;
-import com.hp.hpl.jena.sparql.modify.request.UpdateDataWriter.UpdateMode;
-import com.hp.hpl.jena.sparql.serializer.FormatterElement;
-import com.hp.hpl.jena.sparql.serializer.PrologueSerializer;
-import com.hp.hpl.jena.sparql.serializer.SerializationContext;
-import com.hp.hpl.jena.sparql.syntax.Element;
-import com.hp.hpl.jena.sparql.util.FmtUtils;
-import com.hp.hpl.jena.update.Update;
-import com.hp.hpl.jena.update.UpdateRequest;
+import com.hp.hpl.jena.graph.Node ;
+import com.hp.hpl.jena.graph.Triple ;
+import com.hp.hpl.jena.sparql.ARQException ;
+import com.hp.hpl.jena.sparql.core.Quad ;
+import com.hp.hpl.jena.sparql.modify.request.UpdateDataWriter.UpdateMode ;
+import com.hp.hpl.jena.sparql.serializer.FormatterElement ;
+import com.hp.hpl.jena.sparql.serializer.PrologueSerializer ;
+import com.hp.hpl.jena.sparql.serializer.SerializationContext ;
+import com.hp.hpl.jena.sparql.syntax.Element ;
+import com.hp.hpl.jena.sparql.util.FmtUtils ;
+import com.hp.hpl.jena.update.Update ;
+import com.hp.hpl.jena.update.UpdateRequest ;
 
 public class UpdateWriter implements Closeable
 {
@@ -391,19 +392,30 @@ public class UpdateWriter implements Clo
         { printUpdate2(update, "MOVE") ; }
 
         @Override
-        public void visit(UpdateDataInsert update)
+        public Sink<Quad> getInsertDataSink()
         {
             UpdateDataWriter udw = new UpdateDataWriter(UpdateMode.INSERT, out, sCxt);
             udw.open();
-            Iter.sendToSink(update.getQuads(), udw);  // udw.close() is called by Iter.sendToSink()
+            return udw;
         }
 
         @Override
-        public void visit(UpdateDataDelete update)
+        public void visit(UpdateDataInsert update)
+        {
+            Iter.sendToSink(update.getQuads(), getInsertDataSink());  // Iter.sendToSink() will call close() on the sink
+        }
+        
+        public Sink<Quad> getDeleteDataSink()
         {
             UpdateDataWriter udw = new UpdateDataWriter(UpdateMode.DELETE, out, sCxt);
             udw.open();
-            Iter.sendToSink(update.getQuads(), udw);
+            return udw;
+        }
+
+        @Override
+        public void visit(UpdateDataDelete update)
+        {
+            Iter.sendToSink(update.getQuads(), getDeleteDataSink()); // Iter.sendToSink() will call close() on the sink
         }
 
         // Prettier later.

Modified: jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/ElementPathBlock.java
URL: http://svn.apache.org/viewvc/jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/ElementPathBlock.java?rev=1415420&r1=1415419&r2=1415420&view=diff
==============================================================================
--- jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/ElementPathBlock.java (original)
+++ jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/ElementPathBlock.java Thu Nov 29 22:43:42 2012
@@ -47,23 +47,12 @@ public class ElementPathBlock extends El
     { pattern.add(tp) ; }
     
     @Override
-    public int mark() { return pattern.size() ; }
-    
-    @Override
     public void addTriple(Triple t)
     { addTriplePath(new TriplePath(t)) ; }
 
     @Override
-    public void addTriple(int index, Triple t)
-    { addTriplePath(index, new TriplePath(t)) ; }
-
-    @Override
     public void addTriplePath(TriplePath tPath)
     { pattern.add(tPath) ; }
-
-    @Override
-    public void addTriplePath(int index, TriplePath tPath)
-    { pattern.add(index, tPath) ; }
     
     public PathBlock getPattern() { return pattern ; }
     public Iterator<TriplePath> patternElts() { return pattern.iterator(); }

Modified: jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/ElementTriplesBlock.java
URL: http://svn.apache.org/viewvc/jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/ElementTriplesBlock.java?rev=1415420&r1=1415419&r2=1415420&view=diff
==============================================================================
--- jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/ElementTriplesBlock.java (original)
+++ jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/ElementTriplesBlock.java Thu Nov 29 22:43:42 2012
@@ -49,20 +49,9 @@ public class ElementTriplesBlock extends
     { pattern.add(t) ; }
     
     @Override
-    public int mark() { return pattern.size() ; }
-    
-    @Override
-    public void addTriple(int index, Triple t)
-    { pattern.add(index, t) ; }
-    
-    @Override
     public void addTriplePath(TriplePath path)
     { throw new ARQException("Triples-only collector") ; }
 
-    @Override
-    public void addTriplePath(int index, TriplePath path)
-    { throw new ARQException("Triples-only collector") ; }
-    
     public BasicPattern getPattern() { return pattern ; }
     public Iterator<Triple> patternElts() { return pattern.iterator(); }
     

Modified: jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/TripleCollector.java
URL: http://svn.apache.org/viewvc/jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/TripleCollector.java?rev=1415420&r1=1415419&r2=1415420&view=diff
==============================================================================
--- jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/TripleCollector.java (original)
+++ jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/TripleCollector.java Thu Nov 29 22:43:42 2012
@@ -25,12 +25,6 @@ import com.hp.hpl.jena.sparql.core.Tripl
 public interface TripleCollector
 {
     public void addTriple(Triple t) ;
-    // The contract with the mark is that there should be no disturbing
-    // triples 0..(mark-1) before using a mark. That is, use marks in
-    // LIFO (stack) order.
-    public int mark() ;
-    public void addTriple(int index, Triple t) ;
     
     public void addTriplePath(TriplePath tPath) ;
-    public void addTriplePath(int index, TriplePath tPath) ;
 }

Modified: jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/TripleCollectorBGP.java
URL: http://svn.apache.org/viewvc/jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/TripleCollectorBGP.java?rev=1415420&r1=1415419&r2=1415420&view=diff
==============================================================================
--- jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/TripleCollectorBGP.java (original)
+++ jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/TripleCollectorBGP.java Thu Nov 29 22:43:42 2012
@@ -37,16 +37,6 @@ public class TripleCollectorBGP implemen
     public void addTriple(Triple t) { bgp.add(t) ; }
     
     @Override
-    public int mark() { return bgp.size() ; }
-    
-    @Override
-    public void addTriple(int index, Triple t) { bgp.add(index, t) ; }
-    
-    @Override
     public void addTriplePath(TriplePath path)
     { throw new ARQException("Triples-only collector") ; }
-
-    @Override
-    public void addTriplePath(int index, TriplePath path)
-    { throw new ARQException("Triples-only collector") ; }
 }

Modified: jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateAction.java
URL: http://svn.apache.org/viewvc/jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateAction.java?rev=1415420&r1=1415419&r2=1415420&view=diff
==============================================================================
--- jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateAction.java (original)
+++ jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateAction.java Thu Nov 29 22:43:42 2012
@@ -18,13 +18,21 @@
 
 package com.hp.hpl.jena.update;
 
+import java.io.InputStream ;
+
+import org.apache.jena.atlas.io.IO ;
+
 import com.hp.hpl.jena.graph.Graph ;
 import com.hp.hpl.jena.query.Dataset ;
 import com.hp.hpl.jena.query.QuerySolution ;
+import com.hp.hpl.jena.query.Syntax ;
 import com.hp.hpl.jena.rdf.model.Model ;
 import com.hp.hpl.jena.sparql.core.DatasetGraph ;
 import com.hp.hpl.jena.sparql.engine.binding.Binding ;
 import com.hp.hpl.jena.sparql.engine.binding.BindingUtils ;
+import com.hp.hpl.jena.sparql.lang.UpdateParser ;
+import com.hp.hpl.jena.sparql.modify.UpdateSink ;
+import com.hp.hpl.jena.sparql.modify.UsingList ;
 
 /** A class of forms for executing SPARQL Update operations. 
  * parse* means the update request is in a string;
@@ -327,7 +335,7 @@ public class UpdateAction
         execute$(request, graphStore, binding) ;
     }
     
-    // Everything comes through here.
+    // All non-streaming updates come through here.
     private static void execute$(UpdateRequest request, GraphStore graphStore, Binding binding)
     {
         UpdateProcessor uProc = UpdateExecutionFactory.create(request, graphStore, binding) ;
@@ -436,4 +444,95 @@ public class UpdateAction
         execute$(request, graphStore, binding) ;
     }  
 
+    
+    
+    // Streaming Updates:
+    
+    /** Parse update operations read from a file and execute them against the dataset */
+    public static void parseExecute(UsingList usingList, DatasetGraph dataset, String fileName)
+    { 
+        parseExecute(usingList, dataset, fileName, null, Syntax.defaultUpdateSyntax) ;
+    }
+    
+    /** Parse update operations read from a file and execute them against the dataset */
+    public static void parseExecute(UsingList usingList, DatasetGraph dataset, String fileName, Syntax syntax)
+    {
+        parseExecute(usingList, dataset, fileName, null, syntax) ;
+    }
+
+    /** Parse update operations read from a file ("-" means standard input) and execute them against the dataset */
+    public static void parseExecute(UsingList usingList, DatasetGraph dataset, String fileName, String baseURI, Syntax syntax)
+    { 
+        InputStream in = null ;
+        if ( fileName.equals("-") )
+            in = System.in ;
+        else
+        {
+            in = IO.openFile(fileName) ;
+            if ( in == null )
+                throw new UpdateException("File could not be opened: "+fileName) ;
+        }
+        parseExecute(usingList, dataset, in, baseURI, syntax) ;
+    }
+    
+    /** 
+     * Parse update operations into a GraphStore by parsing from an InputStream.
+     * @param input     The source of the update request (must be UTF-8). 
+     */
+    public static void parseExecute(UsingList usingList, DatasetGraph dataset, InputStream input)
+    {
+        parseExecute(usingList, dataset, input, Syntax.defaultUpdateSyntax) ;
+    }
+
+    /** 
+     * Parse update operations into a GraphStore by parsing from an InputStream.
+     * @param input     The source of the update request (must be UTF-8). 
+     * @param syntax    The update language syntax 
+     */
+    public static void parseExecute(UsingList usingList, DatasetGraph dataset, InputStream input, Syntax syntax)
+    {
+        parseExecute(usingList, dataset, input, null, syntax) ;
+    }
+    
+    /**
+     * Parse update operations into a GraphStore by parsing from an InputStream.
+     * @param input     The source of the update request (must be UTF-8). 
+     * @param baseURI   The base URI for resolving relative URIs. 
+     */
+    public static void parseExecute(UsingList usingList, DatasetGraph dataset, InputStream input, String baseURI)
+    { 
+        parseExecute(usingList, dataset, input, baseURI, Syntax.defaultUpdateSyntax) ;
+    }
+    
+    /**
+     * Parse update operations into a GraphStore by parsing from an InputStream.
+     * @param input     The source of the update request (must be UTF-8). 
+     * @param baseURI   The base URI for resolving relative URIs. 
+     * @param syntax    The update language syntax 
+     */
+    public static void parseExecute(UsingList usingList, DatasetGraph dataset, InputStream input, String baseURI, Syntax syntax)
+    {
+        GraphStore graphStore = GraphStoreFactory.create(dataset);
+        
+        UpdateProcessorStreaming uProc = UpdateExecutionFactory.createStreaming(usingList, graphStore) ;
+        
+        uProc.startRequest();
+        try
+        {
+            UpdateSink sink = uProc.getUpdateSink();
+            try
+            {
+                UpdateParser parser = UpdateFactory.setupParser(sink.getPrologue(), baseURI, syntax) ;
+                parser.parse(sink, input) ;
+            }
+            finally
+            {
+                sink.close() ;
+            }
+        }
+        finally
+        {
+            uProc.finishRequest();
+        }
+    }
 }

Modified: jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateExecutionFactory.java
URL: http://svn.apache.org/viewvc/jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateExecutionFactory.java?rev=1415420&r1=1415419&r2=1415420&view=diff
==============================================================================
--- jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateExecutionFactory.java (original)
+++ jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateExecutionFactory.java Thu Nov 29 22:43:42 2012
@@ -92,6 +92,38 @@ public class UpdateExecutionFactory
     {        
         return make(updateRequest, graphStore, initialBinding, null) ;
     }
+    
+    /** Create an UpdateProcessorStreaming appropriate to the GraphStore, or null if no factory is available to make one
+     * @param graphStore
+     * @return UpdateProcessorStreaming or null
+     */
+    public static UpdateProcessorStreaming createStreaming(GraphStore graphStore)
+    {        
+        return createStreaming(new UsingList(), graphStore) ;
+    }
+    
+    /** Create an UpdateProcessorStreaming appropriate to the GraphStore, or null if no factory is available to make one
+     * @param usingList
+     * @param graphStore
+     * @return UpdateProcessorStreaming or null
+     */
+    public static UpdateProcessorStreaming createStreaming(UsingList usingList, GraphStore graphStore)
+    {        
+        return createStreaming(usingList, graphStore, null, null) ;
+    }
+    
+    /** Create an UpdateProcessorStreaming appropriate to the GraphStore, or null if no factory is available to make one
+     * @param usingList
+     * @param graphStore
+     * @param initialBinding (may be null for none)
+     * @param context  (null means use merge of global and graph store context)
+     * @return UpdateProcessorStreaming or null
+     */
+    public static UpdateProcessorStreaming createStreaming(UsingList usingList, GraphStore graphStore, Binding initialBinding, Context context)
+    {        
+        return makeStreaming(usingList, graphStore, initialBinding, context) ;
+    }
+    
 
     /** Create an UpdateProcessor appropriate to the GraphStore, or null if no available factory to make an UpdateProcessor 
      * @param updateRequest
@@ -105,7 +137,7 @@ public class UpdateExecutionFactory
         return make(updateRequest, graphStore, initialBinding, context) ;
     }
 
-    // Everything comes through here
+    // Everything comes through one of these two make methods
     private static UpdateProcessor make(UpdateRequest updateRequest, GraphStore graphStore, Binding initialBinding, Context context)
     {
         if ( context == null )
@@ -124,6 +156,27 @@ public class UpdateExecutionFactory
         return uProc ;
     }
     
+    // Everything comes through one of these two make methods
+    private static UpdateProcessorStreaming makeStreaming(UsingList usingList, GraphStore graphStore, Binding initialBinding, Context context)
+    {
+        if ( context == null )
+        {
+            context = ARQ.getContext().copy();
+            context.putAll(graphStore.getContext()) ;
+        }
+        
+        UpdateEngineFactory f = UpdateEngineRegistry.get().findStreaming(graphStore, context) ;
+        if ( f == null )
+            return null ;
+        
+        UpdateProcessorStreamingBase uProc = new UpdateProcessorStreamingBase(usingList, graphStore, context, f) ;
+        if ( initialBinding != null )
+            uProc.setInitialBinding(initialBinding) ;
+        return uProc;
+    }
+    
+    
+    
     /** Create an UpdateProcessor that send the update to a remote SPARQL Update service.
      * @param update
      * @param remoteEndpoint

Modified: jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateFactory.java
URL: http://svn.apache.org/viewvc/jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateFactory.java?rev=1415420&r1=1415419&r2=1415420&view=diff
==============================================================================
--- jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateFactory.java (original)
+++ jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateFactory.java Thu Nov 29 22:43:42 2012
@@ -28,7 +28,11 @@ import org.apache.jena.atlas.io.IO ;
 
 import com.hp.hpl.jena.n3.IRIResolver ;
 import com.hp.hpl.jena.query.Syntax ;
+import com.hp.hpl.jena.sparql.core.Prologue ;
 import com.hp.hpl.jena.sparql.lang.UpdateParser ;
+import com.hp.hpl.jena.sparql.modify.UpdateRequestSink ;
+import com.hp.hpl.jena.sparql.modify.UpdateSink ;
+import com.hp.hpl.jena.sparql.modify.UsingList ;
 
 public class UpdateFactory
 {
@@ -82,7 +86,7 @@ public class UpdateFactory
     private static void make(UpdateRequest request, String input,  String baseURI, Syntax syntax)
     {
         UpdateParser parser = setupParser(request, baseURI, syntax) ;
-        parser.parse(request, input) ;
+        parser.parse(new UpdateRequestSink(request, null), input) ;
     }
     
     /* Parse operations and add to an UpdateRequest */ 
@@ -110,7 +114,7 @@ public class UpdateFactory
     }
     
     /** Append update operations to a request */
-    private static UpdateParser setupParser(UpdateRequest request, String baseURI, Syntax syntax)
+    protected static UpdateParser setupParser(Prologue prologue, String baseURI, Syntax syntax)
     {
         if ( syntax != syntaxSPARQL_11 && syntax != syntaxARQ ) 
             throw new UnsupportedOperationException("Unrecognized syntax for parsing update: "+syntax) ;
@@ -120,19 +124,25 @@ public class UpdateFactory
         if ( parser == null )
             throw new UnsupportedOperationException("Unrecognized syntax for parsing update: "+syntax) ;
         
-        if ( request.getResolver() == null )
+        if ( prologue.getResolver() == null )
         {
             // Sort out the baseURI - if that fails, dump in a dummy one and continue.
             try { baseURI = IRIResolver.chooseBaseURI(baseURI) ; }
             catch (Exception ex)
             { baseURI = "http://localhost/defaultBase#" ; }
-            request.setResolver(new IRIResolver(baseURI)) ;
+            prologue.setResolver(new IRIResolver(baseURI)) ;
         }
         
         return parser ;
     }
     
     /** Create an UpdateRequest by reading it from a file */
+    public static UpdateRequest read(UsingList usingList, String fileName)
+    { 
+        return read(usingList, fileName, null, defaultUpdateSyntax) ;
+    }
+    
+    /** Create an UpdateRequest by reading it from a file */
     public static UpdateRequest read(String fileName)
     { 
         return read(fileName, null, defaultUpdateSyntax) ;
@@ -143,10 +153,22 @@ public class UpdateFactory
     {
         return read(fileName, null, syntax) ;
     }
+    
+    /** Create an UpdateRequest by reading it from a file */
+    public static UpdateRequest read(UsingList usingList, String fileName, Syntax syntax)
+    {
+        return read(usingList, fileName, null, syntax) ;
+    }
 
     /** Create an UpdateRequest by reading it from a file */
     public static UpdateRequest read(String fileName, String baseURI, Syntax syntax)
     { 
+        return read(null, fileName, baseURI, syntax);
+    }
+    
+    /** Create an UpdateRequest by reading it from a file */
+    public static UpdateRequest read(UsingList usingList, String fileName, String baseURI, Syntax syntax)
+    { 
         InputStream in = null ;
         if ( fileName.equals("-") )
             in = System.in ;
@@ -156,10 +178,10 @@ public class UpdateFactory
             if ( in == null )
                 throw new UpdateException("File could not be opened: "+fileName) ;
         }
-        return read(in, baseURI, syntax) ;
+        return read(usingList, in, baseURI, syntax) ;
     }
     
-    /**  Create an UpdateRequest by parsing from a string.
+    /**  Create an UpdateRequest by parsing from an InputStream.
      * See also <tt>read</tt> operations for parsing contents of a file.
      * @param input     The source of the update request (must be UTF-8). 
      */
@@ -167,8 +189,18 @@ public class UpdateFactory
     {
         return read(input, defaultUpdateSyntax) ;
     }
+    
+    /**  Create an UpdateRequest by parsing from an InputStream.
+     * See also <tt>read</tt> operations for parsing contents of a file.
+     * @param usingList The list of externally defined USING statements
+     * @param input     The source of the update request (must be UTF-8). 
+     */
+    public static UpdateRequest read(UsingList usingList, InputStream input)
+    {
+        return read(usingList, input, defaultUpdateSyntax) ;
+    }
 
-    /**  Create an UpdateRequest by parsing from a string.
+    /**  Create an UpdateRequest by parsing from an InputStream.
      * See also <tt>read</tt> operations for parsing contents of a file.
      * @param input     The source of the update request (must be UTF-8). 
      * @param syntax    The update language syntax 
@@ -178,7 +210,18 @@ public class UpdateFactory
         return read(input, null, syntax) ;
     }
     
-    /**  Create an UpdateRequest by parsing from a string.
+    /**  Create an UpdateRequest by parsing from an InputStream.
+     * See also <tt>read</tt> operations for parsing contents of a file.
+     * @param usingList The list of externally defined USING statements
+     * @param input     The source of the update request (must be UTF-8). 
+     * @param syntax    The update language syntax 
+     */
+    public static UpdateRequest read(UsingList usingList, InputStream input, Syntax syntax)
+    {
+        return read(usingList, input, null, syntax) ;
+    }
+    
+    /**  Create an UpdateRequest by parsing from an InputStream.
      * See also <tt>read</tt> operations for parsing contents of a file.
      * @param input     The source of the update request (must be UTF-8). 
      * @param baseURI   The base URI for resolving relative URIs. 
@@ -188,7 +231,18 @@ public class UpdateFactory
         return read(input, baseURI, defaultUpdateSyntax) ;
     }
     
-    /**  Create an UpdateRequest by parsing from a string.
+    /**  Create an UpdateRequest by parsing from an InputStream.
+     * See also <tt>read</tt> operations for parsing contents of a file.
+     * @param usingList The list of externally defined USING statements
+     * @param input     The source of the update request (must be UTF-8). 
+     * @param baseURI   The base URI for resolving relative URIs. 
+     */
+    public static UpdateRequest read(UsingList usingList, InputStream input, String baseURI)
+    { 
+        return read(usingList, input, baseURI, defaultUpdateSyntax) ;
+    }
+    
+    /**  Create an UpdateRequest by parsing from an InputStream.
      * See also <tt>read</tt> operations for parsing contents of a file.
      * @param input     The source of the update request (must be UTF-8). 
      * @param baseURI   The base URI for resolving relative URIs. 
@@ -196,15 +250,35 @@ public class UpdateFactory
      */
     public static UpdateRequest read(InputStream input, String baseURI, Syntax syntax)
     {
+        return read(null, input, baseURI, syntax);
+    }
+    
+    /**  Create an UpdateRequest by parsing from an InputStream.
+     * See also <tt>read</tt> operations for parsing contents of a file.
+     * @param usingList The list of externally defined USING statements
+     * @param input     The source of the update request (must be UTF-8). 
+     * @param baseURI   The base URI for resolving relative URIs. 
+     * @param syntax    The update language syntax 
+     */
+    public static UpdateRequest read(UsingList usingList, InputStream input, String baseURI, Syntax syntax)
+    {
         UpdateRequest request = new UpdateRequest() ;
-        make(request, input, baseURI, syntax) ;
+        make(request, usingList, input, baseURI, syntax) ;
         return request ;
     }
     
     /** Append update operations to a request */
-    private static void make(UpdateRequest request, InputStream input,  String baseURI, Syntax syntax)
+    private static void make(UpdateRequest request, UsingList usingList, InputStream input,  String baseURI, Syntax syntax)
     {
         UpdateParser parser = setupParser(request, baseURI, syntax) ;
-        parser.parse(request, input) ;
+        UpdateSink sink = new UpdateRequestSink(request, usingList) ;
+        try
+        {
+            parser.parse(sink, input) ;
+        }
+        finally
+        {
+            sink.close() ;
+        }
     }
 }

Added: jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateProcessorStreaming.java
URL: http://svn.apache.org/viewvc/jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateProcessorStreaming.java?rev=1415420&view=auto
==============================================================================
--- jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateProcessorStreaming.java (added)
+++ jena/branches/streaming-update/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateProcessorStreaming.java Thu Nov 29 22:43:42 2012
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hp.hpl.jena.update;
+
+import com.hp.hpl.jena.query.QuerySolution ;
+import com.hp.hpl.jena.sparql.modify.UpdateSink ;
+import com.hp.hpl.jena.sparql.util.Context ;
+
+/** An instance of an execution of an UpdateRequest */
+public interface UpdateProcessorStreaming
+{
+    /** The properties associated with an update execution -
+     *  implementation specific parameters.  This includes
+     *  Java objects (so it is not an RDF graph).
+     *  Keys should be URIs as strings.  
+     *  May be null (this implementation does not provide any configuration).
+     */ 
+    public Context getContext() ;
+    
+    /** Set the initial association of variables and values.
+     * May not be supported by all UpdateProcessor implementations.
+     * @param binding
+     */
+    public void setInitialBinding(QuerySolution binding) ;
+    
+    /**
+     * The dataset against which the update will execute.
+     * May be null, implying that there isn't a local GraphStore target for this UpdateProcessor.
+     */
+    public GraphStore getGraphStore() ;
+    
+    /** Start the request, call before putting updates into the Sink */
+    public void startRequest() ;
+    
+    /** Finish the request, call after putting updates into the Sink */
+    public void finishRequest() ;
+    
+    /** The UpdateSink into which Updates are added and executed */
+    public UpdateSink getUpdateSink() ;
+}

Modified: jena/branches/streaming-update/jena-fuseki/src/main/java/org/apache/jena/fuseki/servlets/HttpAction.java
URL: http://svn.apache.org/viewvc/jena/branches/streaming-update/jena-fuseki/src/main/java/org/apache/jena/fuseki/servlets/HttpAction.java?rev=1415420&r1=1415419&r2=1415420&view=diff
==============================================================================
--- jena/branches/streaming-update/jena-fuseki/src/main/java/org/apache/jena/fuseki/servlets/HttpAction.java (original)
+++ jena/branches/streaming-update/jena-fuseki/src/main/java/org/apache/jena/fuseki/servlets/HttpAction.java Thu Nov 29 22:43:42 2012
@@ -39,6 +39,7 @@ public class HttpAction
     public final long id ;
     private DatasetGraph dsg ;                  // The data
     private final Transactional transactional ;
+    private final boolean isTransactional;
     private DatasetRef desc ;
     private DatasetGraph  activeDSG ;           // Set when inside begin/end.
     
@@ -69,13 +70,17 @@ public class HttpAction
         this.dsg = desc.dataset ;
 
         if ( dsg instanceof Transactional )
+        {
             transactional = (Transactional)dsg ;
+            isTransactional = true ;
+        }
         else
         {
-            // Non-trsanctional - wrap in something that does locking to give the same 
+            // Non-transactional - wrap in something that does locking to give the same 
             // functionality in the absense of errors, with less concurrency.
             DatasetGraphWithLock dsglock = new DatasetGraphWithLock(dsg) ; 
             transactional = dsglock ;
+            isTransactional = false ;
             dsg = dsglock ;
         }
         this.request = request ;
@@ -83,6 +88,14 @@ public class HttpAction
         this.verbose = verbose ;
     }
     
+    /**
+     * Returns whether or not the underlying DatasetGraph is fully transactional (supports rollback)
+     */
+    public boolean isTransactional()
+    {
+        return isTransactional;
+    }
+    
     public void beginRead()
     {
         transactional.begin(ReadWrite.READ) ;

Modified: jena/branches/streaming-update/jena-fuseki/src/main/java/org/apache/jena/fuseki/servlets/SPARQL_Update.java
URL: http://svn.apache.org/viewvc/jena/branches/streaming-update/jena-fuseki/src/main/java/org/apache/jena/fuseki/servlets/SPARQL_Update.java?rev=1415420&r1=1415419&r2=1415420&view=diff
==============================================================================
--- jena/branches/streaming-update/jena-fuseki/src/main/java/org/apache/jena/fuseki/servlets/SPARQL_Update.java (original)
+++ jena/branches/streaming-update/jena-fuseki/src/main/java/org/apache/jena/fuseki/servlets/SPARQL_Update.java Thu Nov 29 22:43:42 2012
@@ -25,6 +25,7 @@ import static org.apache.jena.fuseki.Htt
 import static org.apache.jena.fuseki.HttpNames.paramUsingGraphURI ;
 import static org.apache.jena.fuseki.HttpNames.paramUsingNamedGraphURI ;
 
+import java.io.ByteArrayInputStream ;
 import java.io.IOException ;
 import java.io.InputStream ;
 import java.util.Arrays ;
@@ -49,8 +50,14 @@ import org.openjena.riot.system.IRIResol
 import com.hp.hpl.jena.graph.Node ;
 import com.hp.hpl.jena.query.QueryParseException ;
 import com.hp.hpl.jena.query.Syntax ;
-import com.hp.hpl.jena.sparql.modify.request.UpdateWithUsing ;
-import com.hp.hpl.jena.update.* ;
+import com.hp.hpl.jena.sparql.modify.UpdateVisitorSink ;
+import com.hp.hpl.jena.sparql.modify.UpdateRequestSink ;
+import com.hp.hpl.jena.sparql.modify.UpdateSink ;
+import com.hp.hpl.jena.sparql.modify.UsingList ;
+import com.hp.hpl.jena.update.UpdateAction ;
+import com.hp.hpl.jena.update.UpdateException ;
+import com.hp.hpl.jena.update.UpdateFactory ;
+import com.hp.hpl.jena.update.UpdateRequest ;
 
 public class SPARQL_Update extends SPARQL_Protocol
 {
@@ -191,25 +198,19 @@ public class SPARQL_Update extends SPARQ
         try { input = action.request.getInputStream() ; }
         catch (IOException ex) { errorOccurred(ex) ; }
 
-        UpdateRequest req ;
-        try {
-            if ( super.verbose_debug || action.verbose )
-            {
-                // Verbose mode only .... capture request for logging (does not scale). 
-                String requestStr = null ;
-                try { requestStr = IO.readWholeFileAsUTF8(action.request.getInputStream()) ; }
-                catch (IOException ex) { IO.exception(ex) ; }
-
-                String requestStrLog = formatForLog(requestStr) ;
-                requestLog.info(format("[%d] Update = %s", action.id, requestStrLog)) ;
-                req = UpdateFactory.create(requestStr, Syntax.syntaxARQ) ;
-            }    
-            else
-                req = UpdateFactory.read(input, Syntax.syntaxARQ) ;
-        } 
-        catch (UpdateException ex) { errorBadRequest(ex.getMessage()) ; req = null ; }
-        catch (QueryParseException ex)  { errorBadRequest(messageForQPE(ex)) ; req = null ; } 
-        execute(action, req) ;
+        if ( super.verbose_debug || action.verbose )
+        {
+            // Verbose mode only .... capture request for logging (does not scale). 
+            String requestStr = null ;
+            try { requestStr = IO.readWholeFileAsUTF8(input) ; }
+            catch (IOException ex) { IO.exception(ex) ; }
+            requestLog.info(format("[%d] Update = %s", action.id, formatForLog(requestStr))) ;
+            
+            input = new ByteArrayInputStream(requestStr.getBytes());
+            requestStr = null;
+        }
+        
+        execute(action, input) ;
         successNoContent(action) ;
     }
 
@@ -222,41 +223,71 @@ public class SPARQL_Update extends SPARQ
         if ( super.verbose_debug || action.verbose )
             //requestLog.info(format("[%d] Form update = %s", action.id, formatForLog(requestStr))) ;
             requestLog.info(format("[%d] Form update = \n%s", action.id, requestStr)) ;
-        
-        UpdateRequest req ; 
-        try {
-            req = UpdateFactory.create(requestStr, updateParseBase) ;
-        }
-        catch (UpdateException ex) { errorBadRequest(ex.getMessage()) ; req = null ; }
-        catch (QueryParseException ex) { errorBadRequest(messageForQPE(ex)) ; req = null ; }
-        execute(action, req) ;
+
+        // A little ugly because we are taking a copy of the string, but hopefully shouldn't be too big if we are in this code-path
+        // If we didn't want this additional copy, we could make the parser take a Reader in addition to an InputStream
+        ByteArrayInputStream input = new ByteArrayInputStream(requestStr.getBytes());
+        requestStr = null;  // free it early at least
+
+        execute(action, input);
         successPage(action,"Update succeeded") ;
     }
     
-    private void execute(HttpActionUpdate action, UpdateRequest updateRequest)
+    private void execute(HttpActionUpdate action, InputStream input)
     {
-        processProtocol(action.request, updateRequest) ;
+        UsingList usingList = processProtocol(action.request) ;
+        
+        // If the dsg is transactional, then we can parse and execute the update in a streaming fashion.
+        // If it isn't, we need to read the entire update request before performing any updates, because
+        // we have to attempt to make the request atomic in the face of malformed queries
+        UpdateRequest req = null ;
+        if (!action.isTransactional())
+        {
+            try
+            {
+                // TODO implement a spill-to-disk version of this
+                req = UpdateFactory.read(usingList, input, Syntax.syntaxARQ);
+            }
+            catch (UpdateException ex) { errorBadRequest(ex.getMessage()) ; return ; }
+            catch (QueryParseException ex)  { errorBadRequest(messageForQPE(ex)) ; return ; }
+        }
         
         action.beginWrite() ;
-        try {
-            UpdateAction.execute(updateRequest, action.getActiveDSG()) ;
+        try
+        {
+            if (action.isTransactional())
+            {
+                UpdateAction.parseExecute(usingList, action.getActiveDSG(), input, Syntax.syntaxARQ);
+            }
+            else
+            {
+                UpdateAction.execute(req, action.getActiveDSG()) ;
+            }
+            
             action.commit() ;
-        }
-        catch ( UpdateException ex) { action.abort() ; errorBadRequest(ex.getMessage()) ; }
-        finally { action.endWrite() ; }
+        } 
+        catch (UpdateException ex) { action.abort(); errorBadRequest(ex.getMessage()) ; }
+        catch (QueryParseException ex)  { action.abort(); errorBadRequest(messageForQPE(ex)) ; }
+        finally { action.endWrite(); }
     }
 
     /* [It is an error to supply the using-graph-uri or using-named-graph-uri parameters 
      * when using this protocol to convey a SPARQL 1.1 Update request that contains an 
      * operation that uses the USING, USING NAMED, or WITH clause.]
+     * 
+     * We will simply capture any using parameters here and pass them to the parser, which will be
+     * responsible for throwing an UpdateException if the query violates the above requirement,
+     * and will also be responsible for adding the using parameters to update queries that can
+     * accept them.
      */
-    
-    private void processProtocol(HttpServletRequest request, UpdateRequest updateRequest)
+    private UsingList processProtocol(HttpServletRequest request)
     {
+        UsingList toReturn = new UsingList();
+        
         String[] usingArgs = request.getParameterValues(paramUsingGraphURI) ;
         String[] usingNamedArgs = request.getParameterValues(paramUsingNamedGraphURI) ;
         if ( usingArgs == null && usingNamedArgs == null )
-            return ;
+            return toReturn;
         if ( usingArgs == null )
             usingArgs = new String[0] ;
         if ( usingNamedArgs == null )
@@ -264,21 +295,17 @@ public class SPARQL_Update extends SPARQ
         // Impossible.
 //        if ( usingArgs.length == 0 && usingNamedArgs.length == 0 )
 //            return ;
-        // ---- check USING/USING NAMED/WITH not used.
-        // ---- update request to have USING/USING NAMED 
-        for ( Update up : updateRequest.getOperations() )
+        
+        for (String nodeUri : usingArgs)
         {
-            if ( up instanceof UpdateWithUsing )
-            {
-                UpdateWithUsing upu = (UpdateWithUsing)up ;
-                if ( upu.getUsing().size() != 0 || upu.getUsingNamed().size() != 0 || upu.getWithIRI() != null )
-                    errorBadRequest("SPARQL Update: Protocol using-graph-uri or using-named-graph-uri present where update request has USING, USING NAMED or WITH") ;
-                for ( String a : usingArgs )
-                    upu.addUsing(createNode(a)) ;
-                for ( String a : usingNamedArgs )
-                    upu.addUsingNamed(createNode(a)) ;
-            }
+            toReturn.addUsing(createNode(nodeUri));
+        }
+        for (String nodeUri : usingNamedArgs)
+        {
+            toReturn.addUsingNamed(createNode(nodeUri));
         }
+        
+        return toReturn;
     }
     
     private static Node createNode(String x)

Modified: jena/branches/streaming-update/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/modify/UpdateEngineTDB.java
URL: http://svn.apache.org/viewvc/jena/branches/streaming-update/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/modify/UpdateEngineTDB.java?rev=1415420&r1=1415419&r2=1415420&view=diff
==============================================================================
--- jena/branches/streaming-update/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/modify/UpdateEngineTDB.java (original)
+++ jena/branches/streaming-update/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/modify/UpdateEngineTDB.java Thu Nov 29 22:43:42 2012
@@ -23,6 +23,8 @@ import com.hp.hpl.jena.sparql.modify.Upd
 import com.hp.hpl.jena.sparql.modify.UpdateEngineFactory ;
 import com.hp.hpl.jena.sparql.modify.UpdateEngineMain ;
 import com.hp.hpl.jena.sparql.modify.UpdateEngineRegistry ;
+import com.hp.hpl.jena.sparql.modify.UpdateEngineStreaming ;
+import com.hp.hpl.jena.sparql.modify.UsingList ;
 import com.hp.hpl.jena.sparql.util.Context ;
 import com.hp.hpl.jena.tdb.store.DatasetGraphTDB ;
 import com.hp.hpl.jena.update.GraphStore ;
@@ -33,9 +35,9 @@ public class UpdateEngineTDB extends Upd
     public UpdateEngineTDB(DatasetGraphTDB graphStore, UpdateRequest request, Binding inputBinding, Context context)
     { super(graphStore, request, inputBinding, context) ; }
     
-    @Override
-    public void execute()
-    { super.execute() ; }
+    public UpdateEngineTDB(DatasetGraphTDB graphStore, UsingList usingList, Binding inputBinding, Context context)
+    { super(graphStore, usingList, inputBinding, context) ; }
+    
 
     // ---- Factory
     public static UpdateEngineFactory getFactory() { 
@@ -53,6 +55,17 @@ public class UpdateEngineTDB extends Upd
                 return new UpdateEngineTDB((DatasetGraphTDB)graphStore, request, inputBinding, context) ;
             }
 
+            @Override
+            public boolean acceptStreaming(GraphStore graphStore, Context context)
+            {
+                return (graphStore instanceof DatasetGraphTDB) ;
+            }
+            
+            @Override
+            public UpdateEngineStreaming createStreaming(UsingList usingList, GraphStore graphStore, Binding inputBinding, Context context)
+            {
+                return new UpdateEngineTDB((DatasetGraphTDB)graphStore, usingList, inputBinding, context);
+            }
         } ;
     }
 



Mime
View raw message