ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [1/4] ignite git commit: IGNITE-7253: Streaming mode for JDBC thin driver. This closes #3499.
Date Mon, 19 Feb 2018 15:26:21 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-7725 85c4db5ba -> 192b70782


http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java
new file mode 100644
index 0000000..b185535
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Container for connection properties passed by various drivers (JDBC drivers, possibly ODBC) having notion of an
+ * <b>SQL connection</b> - Ignite basically does not have one.<p>
+ * Also contains anything that a driver may need to share between threads processing queries of logically same
+ * client - see JDBC thin driver.
+ * <p>
+ * When streaming is on, per-cache data streamers are kept in a {@link ConcurrentHashMap} so that
+ * {@link #streamerForCache(String)} may be invoked by several threads at once.
+ */
+public class SqlClientContext implements AutoCloseable {
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /** Distributed joins flag. */
+    private final boolean distributedJoins;
+
+    /** Enforce join order flag. */
+    private final boolean enforceJoinOrder;
+
+    /** Collocated flag. */
+    private final boolean collocated;
+
+    /** Lazy query execution flag. */
+    private final boolean lazy;
+
+    /** Skip reducer on update flag. */
+    private final boolean skipReducerOnUpdate;
+
+    /** Allow overwrites for duplicate keys on streamed {@code INSERT}s. */
+    private final boolean streamAllowOverwrite;
+
+    /** Parallel ops count per node for data streamer. */
+    private final int streamNodeParOps;
+
+    /** Node buffer size for data streamer. */
+    private final int streamNodeBufSize;
+
+    /** Auto flush frequency for streaming. */
+    private final long streamFlushTimeout;
+
+    /** Streamers for various caches, keyed by cache name; {@code null} when streaming is off. */
+    private final Map<String, IgniteDataStreamer<?, ?>> streamers;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /**
+     * Creates the context and registers it with the query processor.
+     *
+     * @param ctx Kernal context.
+     * @param distributedJoins Distributed joins flag.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param collocated Collocated flag.
+     * @param lazy Lazy query execution flag.
+     * @param skipReducerOnUpdate Skip reducer on update flag.
+     * @param stream Streaming state flag; the streamers map is only created when this is {@code true}.
+     * @param streamAllowOverwrite Allow overwrites for duplicate keys on streamed {@code INSERT}s.
+     * @param streamNodeParOps Parallel ops count per node for data streamer.
+     * @param streamNodeBufSize Node buffer size for data streamer.
+     * @param streamFlushTimeout Auto flush frequency for streaming.
+     */
+    public SqlClientContext(GridKernalContext ctx, boolean distributedJoins, boolean enforceJoinOrder,
+        boolean collocated, boolean lazy, boolean skipReducerOnUpdate, boolean stream, boolean streamAllowOverwrite,
+        int streamNodeParOps, int streamNodeBufSize, long streamFlushTimeout) {
+        this.ctx = ctx;
+        this.distributedJoins = distributedJoins;
+        this.enforceJoinOrder = enforceJoinOrder;
+        this.collocated = collocated;
+        this.lazy = lazy;
+        this.skipReducerOnUpdate = skipReducerOnUpdate;
+        this.streamAllowOverwrite = streamAllowOverwrite;
+        this.streamNodeParOps = streamNodeParOps;
+        this.streamNodeBufSize = streamNodeBufSize;
+        this.streamFlushTimeout = streamFlushTimeout;
+
+        // Concurrent map: streamerForCache() relies on an atomic putIfAbsent() to resolve races.
+        streamers = stream ? new ConcurrentHashMap<String, IgniteDataStreamer<?, ?>>() : null;
+
+        log = ctx.log(SqlClientContext.class.getName());
+
+        // Registration goes last so that the context is fully initialized when it becomes visible.
+        ctx.query().registerClientContext(this);
+    }
+
+    /**
+     * @return Collocated flag.
+     */
+    public boolean isCollocated() {
+        return collocated;
+    }
+
+    /**
+     * @return Distributed joins flag.
+     */
+    public boolean isDistributedJoins() {
+        return distributedJoins;
+    }
+
+    /**
+     * @return Enforce join order flag.
+     */
+    public boolean isEnforceJoinOrder() {
+        return enforceJoinOrder;
+    }
+
+    /**
+     * @return Lazy query execution flag.
+     */
+    public boolean isLazy() {
+        return lazy;
+    }
+
+    /**
+     * @return Skip reducer on update flag.
+     */
+    public boolean isSkipReducerOnUpdate() {
+        return skipReducerOnUpdate;
+    }
+
+    /**
+     * @return Streaming state flag (on or off); {@code true} iff the streamers map has been created.
+     */
+    public boolean isStream() {
+        return streamers != null;
+    }
+
+    /**
+     * Returns the streamer registered for the given cache, creating and configuring one on first request.
+     *
+     * @param cacheName Cache name.
+     * @return Streamer for given cache, or {@code null} if streaming is off for this context.
+     */
+    public IgniteDataStreamer<?, ?> streamerForCache(String cacheName) {
+        Map<String, IgniteDataStreamer<?, ?>> curStreamers = streamers;
+
+        if (curStreamers == null)
+            return null;
+
+        IgniteDataStreamer<?, ?> res = curStreamers.get(cacheName);
+
+        if (res != null)
+            return res;
+
+        res = ctx.grid().dataStreamer(cacheName);
+
+        // Configure before publishing, so that no other thread can pick up a half-configured streamer.
+        res.autoFlushFrequency(streamFlushTimeout);
+
+        res.allowOverwrite(streamAllowOverwrite);
+
+        if (streamNodeBufSize > 0)
+            res.perNodeBufferSize(streamNodeBufSize);
+
+        if (streamNodeParOps > 0)
+            res.perNodeParallelOperations(streamNodeParOps);
+
+        IgniteDataStreamer<?, ?> exStreamer = curStreamers.putIfAbsent(cacheName, res);
+
+        if (exStreamer == null)
+            return res;
+
+        // Someone got ahead of us - drop our instance and use the one already published.
+        res.close();
+
+        return exStreamer;
+    }
+
+    /**
+     * Unregisters this context from the query processor and closes all streamers it has created.
+     *
+     * {@inheritDoc}
+     */
+    @Override public void close() throws Exception {
+        ctx.query().unregisterClientContext(this);
+
+        if (streamers == null)
+            return;
+
+        for (IgniteDataStreamer<?, ?> s : streamers.values())
+            U.close(s, log);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java
index 0627def..70f72a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java
@@ -122,7 +122,7 @@ public class SqlParser {
                         return cmd;
                     }
                     else
-                        throw errorUnexpectedToken(lex, CREATE, DROP, COPY, ALTER);
+                        throw errorUnexpectedToken(lex, CREATE, DROP, ALTER, COPY);
 
                 case QUOTED:
                 case MINUS:

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
index df27c5f..6d7e9ae 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.QueryField;
 import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl;
+import org.apache.ignite.internal.processors.query.SqlClientContext;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
@@ -244,12 +245,18 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT
 
         /** {@inheritDoc} */
         @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String
schemaName, SqlFieldsQuery qry,
-            boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel cancel) {
+            SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel
cancel) {
             return null;
         }
 
         /** {@inheritDoc} */
-        @Override public long streamUpdateQuery(String spaceName, String qry, @Nullable Object[]
params,
+        @Override public List<Long> streamBatchedUpdateQuery(String schemaName, String
qry, List<Object[]> params,
+            SqlClientContext cliCtx) throws IgniteCheckedException {
+            return Collections.emptyList();
+        }
+
+        /** {@inheritDoc} */
+        @Override public long streamUpdateQuery(String schemaName, String qry, @Nullable
Object[] params,
             IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException {
             return 0;
         }
@@ -372,8 +379,8 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT
         }
 
         /** {@inheritDoc} */
-        @Override public boolean isInsertStatement(PreparedStatement nativeStmt) {
-            return false;
+        @Override public void checkStatementStreamable(PreparedStatement nativeStmt) {
+            // No-op.
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index b114828..8d2a820 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -299,7 +299,7 @@ public final class GridTestUtils {
             call.call();
         }
         catch (Throwable e) {
-            if (cls != e.getClass()) {
+            if (cls != e.getClass() && !cls.isAssignableFrom(e.getClass())) {
                 if (e.getClass() == CacheException.class && e.getCause() != null
&& e.getCause().getClass() == cls)
                     e = e.getCause();
                 else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index ea6c7c1..284d50a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -385,6 +385,7 @@ public class DmlStatementsProcessor {
     /**
      * Perform given statement against given data streamer. Only rows based INSERT is supported.
      *
+     * @param schemaName Schema name.
      * @param streamer Streamer to feed data to.
      * @param stmt Statement.
      * @param args Statement arguments.
@@ -392,81 +393,74 @@ public class DmlStatementsProcessor {
      * @throws IgniteCheckedException if failed.
      */
     @SuppressWarnings({"unchecked", "ConstantConditions"})
-    long streamUpdateQuery(IgniteDataStreamer streamer, PreparedStatement stmt, final Object[]
args)
+    long streamUpdateQuery(String schemaName, IgniteDataStreamer streamer, PreparedStatement
stmt, final Object[] args)
         throws IgniteCheckedException {
+        idx.checkStatementStreamable(stmt);
+
         Prepared p = GridSqlQueryParser.prepared(stmt);
 
         assert p != null;
 
-        final UpdatePlan plan = UpdatePlanBuilder.planForStatement(p, true, idx, null, null,
null);
+        final UpdatePlan plan = getPlanForStatement(schemaName, null, p, null, true, null);
 
-        if (!F.eq(streamer.cacheName(), plan.cacheContext().name()))
-            throw new IgniteSQLException("Cross cache streaming is not supported, please
specify cache explicitly" +
-                " in connection options", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+        assert plan.isLocalSubquery();
 
-        if (plan.mode() == UpdateMode.INSERT && plan.rowCount() > 0) {
-            assert plan.isLocalSubquery();
+        final GridCacheContext cctx = plan.cacheContext();
 
-            final GridCacheContext cctx = plan.cacheContext();
+        QueryCursorImpl<List<?>> cur;
 
-            QueryCursorImpl<List<?>> cur;
+        final ArrayList<List<?>> data = new ArrayList<>(plan.rowCount());
 
-            final ArrayList<List<?>> data = new ArrayList<>(plan.rowCount());
-
-            QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new
Iterable<List<?>>() {
-                @Override public Iterator<List<?>> iterator() {
-                    try {
-                        Iterator<List<?>> it;
-
-                        if (!F.isEmpty(plan.selectQuery())) {
-                            GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()),
-                                plan.selectQuery(), F.asList(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)),
-                                null, false, 0, null);
+        QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>()
{
+            @Override public Iterator<List<?>> iterator() {
+                try {
+                    Iterator<List<?>> it;
 
-                            it = res.iterator();
-                        }
-                        else
-                            it = plan.createRows(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)).iterator();
+                    if (!F.isEmpty(plan.selectQuery())) {
+                        GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()),
+                            plan.selectQuery(), F.asList(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)),
+                            null, false, 0, null);
 
-                        return new GridQueryCacheObjectsIterator(it, idx.objectContext(),
cctx.keepBinary());
+                        it = res.iterator();
                     }
-                    catch (IgniteCheckedException e) {
-                        throw new IgniteException(e);
-                    }
-                }
-            }, null);
+                    else
+                        it = plan.createRows(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)).iterator();
 
-            data.addAll(stepCur.getAll());
-
-            cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
-                @Override public Iterator<List<?>> iterator() {
-                    return data.iterator();
+                    return new GridQueryCacheObjectsIterator(it, idx.objectContext(), cctx.keepBinary());
                 }
-            }, null);
-
-            if (plan.rowCount() == 1) {
-                IgniteBiTuple t = plan.processRow(cur.iterator().next());
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, null);
 
-                streamer.addData(t.getKey(), t.getValue());
+        data.addAll(stepCur.getAll());
 
-                return 1;
+        cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
+            @Override public Iterator<List<?>> iterator() {
+                return data.iterator();
             }
+        }, null);
 
-            Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount());
+        if (plan.rowCount() == 1) {
+            IgniteBiTuple t = plan.processRow(cur.iterator().next());
 
-            for (List<?> row : cur) {
-                final IgniteBiTuple t = plan.processRow(row);
+            streamer.addData(t.getKey(), t.getValue());
 
-                rows.put(t.getKey(), t.getValue());
-            }
+            return 1;
+        }
+
+        Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount());
 
-            streamer.addData(rows);
+        for (List<?> row : cur) {
+            final IgniteBiTuple t = plan.processRow(row);
 
-            return rows.size();
+            rows.put(t.getKey(), t.getValue());
         }
-        else
-            throw new IgniteSQLException("Only tuple based INSERT statements are supported
in streaming mode",
-                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+
+        streamer.addData(rows);
+
+        return rows.size();
     }
 
     /**
@@ -519,7 +513,7 @@ public class DmlStatementsProcessor {
                 .setPageSize(fieldsQry.getPageSize())
                 .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS);
 
-            cur = (QueryCursorImpl<List<?>>)idx.querySqlFields(schemaName, newFieldsQry,
true, true,
+            cur = (QueryCursorImpl<List<?>>)idx.querySqlFields(schemaName, newFieldsQry,
null, true, true,
                 cancel).get(0);
         }
         else if (plan.hasRows())
@@ -610,7 +604,7 @@ public class DmlStatementsProcessor {
      * @return Update plan.
      */
     @SuppressWarnings({"unchecked", "ConstantConditions"})
-    private UpdatePlan getPlanForStatement(String schema, Connection conn, Prepared p, SqlFieldsQuery
fieldsQry,
+    UpdatePlan getPlanForStatement(String schema, Connection conn, Prepared p, SqlFieldsQuery
fieldsQry,
         boolean loc, @Nullable Integer errKeysPos) throws IgniteCheckedException {
         H2CachedStatementKey planKey = H2CachedStatementKey.forDmlStatement(schema, p.getSQL(),
fieldsQry, loc);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 361623c..a0b2c34 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -92,6 +92,7 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryField;
 import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl;
 import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.SqlClientContext;
 import org.apache.ignite.internal.processors.query.h2.database.H2RowFactory;
 import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
 import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasInnerIO;
@@ -100,6 +101,7 @@ import org.apache.ignite.internal.processors.query.h2.database.io.H2InnerIO;
 import org.apache.ignite.internal.processors.query.h2.database.io.H2LeafIO;
 import org.apache.ignite.internal.processors.query.h2.ddl.DdlStatementsProcessor;
 import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
+import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2PlainRowFactory;
@@ -149,7 +151,6 @@ import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
 import org.h2.api.ErrorCode;
 import org.h2.api.JavaObjectSerializer;
 import org.h2.command.Prepared;
-import org.h2.command.dml.Insert;
 import org.h2.command.dml.NoOperation;
 import org.h2.engine.Session;
 import org.h2.engine.SysProperties;
@@ -191,7 +192,7 @@ import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType
 @SuppressWarnings({"UnnecessaryFullyQualifiedName", "NonFinalStaticVariableUsedInClassInitialization"})
 public class IgniteH2Indexing implements GridQueryIndexing {
     public static final Pattern INTERNAL_CMD_RE = Pattern.compile(
-        "^(create|drop)\\s+index|^alter\\s+table|^copy", Pattern.CASE_INSENSITIVE);
+        "^(create|drop)\\s+index|^alter\\s+table|^copy|^set|^flush", Pattern.CASE_INSENSITIVE);
 
     /*
      * Register IO for indexes.
@@ -500,10 +501,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @Override public PreparedStatement prepareNativeStatement(String schemaName, String sql)
throws SQLException {
+    @Override public PreparedStatement prepareNativeStatement(String schemaName, String sql)
{
         Connection conn = connectionForSchema(schemaName);
 
-        return prepareStatement(conn, sql, true);
+        return prepareStatementAndCaches(conn, sql);
     }
 
     /**
@@ -1013,7 +1014,60 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             throw new IgniteSQLException(e);
         }
 
-        return dmlProc.streamUpdateQuery(streamer, stmt, params);
+        return dmlProc.streamUpdateQuery(schemaName, streamer, stmt, params);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    @Override public List<Long> streamBatchedUpdateQuery(String schemaName, String
qry, List<Object[]> params,
+        SqlClientContext cliCtx) throws IgniteCheckedException {
+        if (cliCtx == null || !cliCtx.isStream()) {
+            U.warn(log, "Connection is not in streaming mode.");
+
+            return zeroBatchedStreamedUpdateResult(params.size());
+        }
+
+        final Connection conn = connectionForSchema(schemaName);
+
+        final PreparedStatement stmt = prepareStatementAndCaches(conn, qry);
+
+        if (GridSqlQueryParser.checkMultipleStatements(stmt))
+            throw new IgniteSQLException("Multiple statements queries are not supported for
streaming mode.",
+                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+
+        checkStatementStreamable(stmt);
+
+        Prepared p = GridSqlQueryParser.prepared(stmt);
+
+        UpdatePlan plan = dmlProc.getPlanForStatement(schemaName, conn, p, null, true, null);
+
+        IgniteDataStreamer<?, ?> streamer = cliCtx.streamerForCache(plan.cacheContext().name());
+
+        if (streamer != null) {
+            List<Long> res = new ArrayList<>(params.size());
+
+            for (int i = 0; i < params.size(); i++)
+                res.add(dmlProc.streamUpdateQuery(schemaName, streamer, stmt, params.get(i)));
+
+            return res;
+        }
+        else {
+            U.warn(log, "Streaming has been turned off by concurrent command.");
+
+            return zeroBatchedStreamedUpdateResult(params.size());
+        }
+    }
+
+    /**
+     * Builds the per-statement update counters returned when a batch is not streamed at all.
+     *
+     * @param size Result size.
+     * @return Fixed-size list of given size filled with 0Ls.
+     */
+    private static List<Long> zeroBatchedStreamedUpdateResult(int size) {
+        Long[] res = new Long[size];
+
+        // Must be a long literal: an int 0 would be boxed to Integer, which a Long[] rejects at runtime.
+        Arrays.fill(res, 0L);
+
+        return Arrays.asList(res);
+    }
 
     /**
@@ -1399,7 +1453,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             fqry.setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS);
 
         final QueryCursor<List<?>> res =
-            querySqlFields(schemaName, fqry, keepBinary, true, null).get(0);
+            querySqlFields(schemaName, fqry, null, keepBinary, true, null).get(0);
 
         final Iterable<Cache.Entry<K, V>> converted = new Iterable<Cache.Entry<K,
V>>() {
             @Override public Iterator<Cache.Entry<K, V>> iterator() {
@@ -1435,19 +1489,19 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * Try executing query using native facilities.
      *
      * @param schemaName Schema name.
-     * @param qry Query.
+     * @param sql Query.
      * @return Result or {@code null} if cannot parse/process this query.
      */
-    private List<FieldsQueryCursor<List<?>>> tryQueryDistributedSqlFieldsNative(String
schemaName, SqlFieldsQuery qry) {
+    private List<FieldsQueryCursor<List<?>>> tryQueryDistributedSqlFieldsNative(String
schemaName, String sql) {
         // Heuristic check for fast return.
-        if (!INTERNAL_CMD_RE.matcher(qry.getSql().trim()).find())
+        if (!INTERNAL_CMD_RE.matcher(sql.trim()).find())
             return null;
 
         // Parse.
         SqlCommand cmd;
 
         try {
-            SqlParser parser = new SqlParser(schemaName, qry.getSql());
+            SqlParser parser = new SqlParser(schemaName, sql);
 
             cmd = parser.nextCommand();
 
@@ -1455,15 +1509,20 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             if (parser.nextCommand() != null)
                 return null;
 
-            // Currently supported commands are: CREATE/DROP INDEX/COPY/ALTER TABLE
+            // Currently supported commands are:
+            // CREATE/DROP INDEX
+            // COPY
+            // ALTER TABLE
+            // SET STREAMING
+            // FLUSH STREAMER
             if (!(cmd instanceof SqlCreateIndexCommand || cmd instanceof SqlDropIndexCommand
||
-                cmd instanceof SqlBulkLoadCommand || cmd instanceof SqlAlterTableCommand))
+                cmd instanceof SqlAlterTableCommand || cmd instanceof SqlBulkLoadCommand))
                 return null;
         }
         catch (Exception e) {
             // Cannot parse, return.
             if (log.isDebugEnabled())
-                log.debug("Failed to parse SQL with native parser [qry=" + qry.getSql() +
", err=" + e + ']');
+                log.debug("Failed to parse SQL with native parser [qry=" + sql + ", err="
+ e + ']');
 
             if (!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK))
                 return null;
@@ -1473,24 +1532,24 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             if (e instanceof SqlParseException)
                 code = ((SqlParseException)e).code();
 
-            throw new IgniteSQLException("Failed to parse DDL statement: " + qry.getSql()
+ ": " + e.getMessage(),
+            throw new IgniteSQLException("Failed to parse DDL statement: " + sql + ": " +
e.getMessage(),
                 code, e);
         }
 
         // Execute.
         if (cmd instanceof SqlBulkLoadCommand) {
-            FieldsQueryCursor<List<?>> cursor = dmlProc.runNativeDmlStatement(qry.getSql(),
cmd);
+            FieldsQueryCursor<List<?>> cursor = dmlProc.runNativeDmlStatement(sql,
cmd);
 
             return Collections.singletonList(cursor);
         }
         else {
             try {
-                FieldsQueryCursor<List<?>> cursor = ddlProc.runDdlStatement(qry.getSql(),
cmd);
+                FieldsQueryCursor<List<?>> cursor = ddlProc.runDdlStatement(sql,
cmd);
 
                 return Collections.singletonList(cursor);
             }
             catch (IgniteCheckedException e) {
-                throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + qry.getSql()
+ "]: "
+                throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + sql
+ "]: "
                     + e.getMessage(), e);
             }
         }
@@ -1514,8 +1573,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /** {@inheritDoc} */
     @SuppressWarnings("StringEquality")
     @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String
schemaName, SqlFieldsQuery qry,
-        boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel cancel) {
-        List<FieldsQueryCursor<List<?>>> res = tryQueryDistributedSqlFieldsNative(schemaName,
qry);
+        SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel
cancel) {
+        List<FieldsQueryCursor<List<?>>> res = tryQueryDistributedSqlFieldsNative(schemaName,
qry.getSql());
 
         if (res != null)
             return res;
@@ -1553,8 +1612,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
                 // We may use this cached statement only for local queries and non queries.
                 if (qry.isLocal() || !prepared.isQuery())
-                    return (List<FieldsQueryCursor<List<?>>>)doRunPrepared(schemaName,
prepared, qry, null, null,
-                            keepBinary, cancel);
+                    return (List<FieldsQueryCursor<List<?>>>)doRunPrepared(schemaName,
prepared, qry, null, cliCtx,
+                        null, keepBinary, cancel);
             }
         }
 
@@ -1584,7 +1643,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             firstArg += prepared.getParameters().size();
 
-            res.addAll(doRunPrepared(schemaName, prepared, newQry, twoStepQry, meta, keepBinary,
cancel));
+            res.addAll(doRunPrepared(schemaName, prepared, newQry, twoStepQry, cliCtx, meta,
keepBinary, cancel));
 
             if (parseRes.twoStepQuery() != null && parseRes.twoStepQueryKey() !=
null &&
                     !parseRes.twoStepQuery().explain())
@@ -1600,14 +1659,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param prepared H2 command.
      * @param qry Fields query with flags.
      * @param twoStepQry Two-step query if this query must be executed in a distributed way.
+     * @param cliCtx Client context, or {@code null} if not applicable.
      * @param meta Metadata for {@code twoStepQry}.
      * @param keepBinary Whether binary objects must not be deserialized automatically.
-     * @param cancel Query cancel state holder.
-     * @return Query result.
+     * @param cancel Query cancel state holder.
+     * @return Query result.
      */
     private List<? extends FieldsQueryCursor<List<?>>> doRunPrepared(String
schemaName, Prepared prepared,
-        SqlFieldsQuery qry, GridCacheTwoStepQuery twoStepQry, List<GridQueryFieldMetadata>
meta, boolean keepBinary,
-        GridQueryCancel cancel) {
+        SqlFieldsQuery qry, GridCacheTwoStepQuery twoStepQry, @Nullable SqlClientContext
cliCtx,
+        List<GridQueryFieldMetadata> meta, boolean keepBinary, GridQueryCancel cancel)
{
         String sqlQry = qry.getSql();
 
         boolean loc = qry.isLocal();
@@ -2276,10 +2335,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean isInsertStatement(PreparedStatement nativeStmt) {
-        Prepared prep = GridSqlQueryParser.prepared(nativeStmt);
-
-        return prep instanceof Insert;
+    @Override public void checkStatementStreamable(PreparedStatement nativeStmt) {
+        if (!GridSqlQueryParser.isStreamableInsertStatement(nativeStmt))
+            throw new IgniteSQLException("Only tuple based INSERT statements are supported in streaming mode.",
+                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
index 6f5b51f..5441e36 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
@@ -106,7 +106,7 @@ public class DdlStatementsProcessor {
      * @throws IgniteCheckedException On error.
      */
     @SuppressWarnings("unchecked")
-    public FieldsQueryCursor<List<?>> runDdlStatement(String sql, SqlCommand
cmd) throws IgniteCheckedException{
+    public FieldsQueryCursor<List<?>> runDdlStatement(String sql, SqlCommand
cmd) throws IgniteCheckedException {
         IgniteInternalFuture fut;
 
         try {
@@ -211,12 +211,7 @@ public class DdlStatementsProcessor {
             if (fut != null)
                 fut.get();
 
-            QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new
QueryCursorImpl(Collections.singletonList
-                (Collections.singletonList(0L)), null, false);
-
-            resCur.fieldsMeta(UPDATE_RESULT_META);
-
-            return resCur;
+            return zeroCursor();
         }
         catch (SchemaOperationException e) {
             throw convert(e);
@@ -230,6 +225,19 @@ public class DdlStatementsProcessor {
     }
 
     /**
+     * @return Single-column, single-row cursor with 0 as number of updated records.
+     */
+    @SuppressWarnings("unchecked")
+    public static QueryCursorImpl<List<?>> zeroCursor() {
+        QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new
QueryCursorImpl(Collections.singletonList
+            (Collections.singletonList(0L)), null, false);
+
+        resCur.fieldsMeta(UPDATE_RESULT_META);
+
+        return resCur;
+    }
+
+    /**
      * Execute DDL statement.
      *
      * @param sql SQL.

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
index 10d485a..98fbb97 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
@@ -505,7 +505,7 @@ public final class UpdatePlan {
     /**
      * @return Local subquery flag.
      */
-    @Nullable public boolean isLocalSubquery() {
+    public boolean isLocalSubquery() {
         return isLocSubqry;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
index bced836..d897ac7 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
@@ -85,20 +85,21 @@ public final class UpdatePlanBuilder {
      * @param loc Local query flag.
      * @param idx Indexing.
      * @param conn Connection.
-     * @param fieldsQuery Original query.
+     * @param fieldsQry Original query.
      * @return Update plan.
      */
     public static UpdatePlan planForStatement(Prepared prepared, boolean loc, IgniteH2Indexing
idx,
-        @Nullable Connection conn, @Nullable SqlFieldsQuery fieldsQuery, @Nullable Integer
errKeysPos)
+        @Nullable Connection conn, @Nullable SqlFieldsQuery fieldsQry, @Nullable Integer
errKeysPos)
         throws IgniteCheckedException {
-        assert !prepared.isQuery();
-
         GridSqlStatement stmt = new GridSqlQueryParser(false).parse(prepared);
 
         if (stmt instanceof GridSqlMerge || stmt instanceof GridSqlInsert)
-            return planForInsert(stmt, loc, idx, conn, fieldsQuery);
+            return planForInsert(stmt, loc, idx, conn, fieldsQry);
+        else if (stmt instanceof GridSqlUpdate || stmt instanceof GridSqlDelete)
+            return planForUpdate(stmt, loc, idx, conn, fieldsQry, errKeysPos);
         else
-            return planForUpdate(stmt, loc, idx, conn, fieldsQuery, errKeysPos);
+            throw new IgniteSQLException("Unsupported operation: " + prepared.getSQL(),
+                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
index 04bc212..2d2c25c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
@@ -1993,6 +1993,18 @@ public class GridSqlQueryParser {
     }
 
     /**
+     * Check if passed statement is insert statement eligible for streaming.
+     *
+     * @param nativeStmt Native statement.
+     * @return {@code True} if streamable insert.
+     */
+    public static boolean isStreamableInsertStatement(PreparedStatement nativeStmt) {
+        Prepared prep = prepared(nativeStmt);
+
+        return prep instanceof Insert && INSERT_QUERY.get((Insert)prep) == null;
+    }
+
+    /**
      * @param cond Condition.
      * @param o Object.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
index 069bdd7..cf8bb2e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
@@ -140,7 +140,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest
{
         SqlFieldsQuery qry = new SqlFieldsQuery("select f.productId, p.name, f.price " +
             "from FactPurchase f, \"replicated-prod\".DimProduct p where p.id = f.productId ");
 
-        for (List<?> o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll())
{
+        for (List<?> o : qryProc.querySqlFields(cache.context(), qry, null, false,
true).get(0).getAll()) {
             X.println("___ -> " + o);
 
             set1.add((Integer)o.get(0));
@@ -154,7 +154,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest
{
 
         qry = new SqlFieldsQuery("select productId from FactPurchase group by productId");
 
-        for (List<?> o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll())
{
+        for (List<?> o : qryProc.querySqlFields(cache.context(), qry, null, false,
true).get(0).getAll()) {
             X.println("___ -> " + o);
 
             assertTrue(set0.add((Integer) o.get(0)));
@@ -173,7 +173,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest
{
             "where p.id = f.productId " +
             "group by f.productId, p.name");
 
-        for (List<?> o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll())
{
+        for (List<?> o : qryProc.querySqlFields(cache.context(), qry, null, false,
true).get(0).getAll()) {
             X.println("___ -> " + o);
 
             assertTrue(names.add((String)o.get(0)));
@@ -190,7 +190,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest
{
             "group by f.productId, p.name " +
             "having s >= 15");
 
-        for (List<?> o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll())
{
+        for (List<?> o : qryProc.querySqlFields(cache.context(), qry, null, false,
true).get(0).getAll()) {
             X.println("___ -> " + o);
 
             assertTrue(i(o, 1) >= 15);
@@ -203,7 +203,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest
{
         qry = new SqlFieldsQuery("select top 3 distinct productId " +
             "from FactPurchase f order by productId desc ");
 
-        for (List<?> o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll())
{
+        for (List<?> o : qryProc.querySqlFields(cache.context(), qry, null, false,
true).get(0).getAll()) {
             X.println("___ -> " + o);
 
             assertEquals(top--, o.get(0));
@@ -216,7 +216,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest
{
         qry = new SqlFieldsQuery("select distinct productId " +
             "from FactPurchase f order by productId desc limit 2 offset 1");
 
-        for (List<?> o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll())
{
+        for (List<?> o : qryProc.querySqlFields(cache.context(), qry, null, false,
true).get(0).getAll()) {
             X.println("___ -> " + o);
 
             assertEquals(top--, o.get(0));
@@ -256,13 +256,13 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest
{
         GridTestUtils.assertThrows(log,
             new Callable<Object>() {
                 @Override public Object call() throws Exception {
-                    qryProc.querySqlFields(cache.context(), qry, false, true);
+                    qryProc.querySqlFields(cache.context(), qry, null, false, true);
 
                     return null;
                 }
             }, IgniteSQLException.class, "Multiple statements queries are not supported");
 
-        List<FieldsQueryCursor<List<?>>> cursors = qryProc.querySqlFields(cache.context(),
qry, false, false);
+        List<FieldsQueryCursor<List<?>>> cursors = qryProc.querySqlFields(cache.context(),
qry, null, false, false);
 
         assertEquals(2, cursors.size());
 
@@ -274,7 +274,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest
{
         GridTestUtils.assertThrows(log,
             new Callable<Object>() {
                 @Override public Object call() throws Exception {
-                    qryProc.querySqlFields(cache.context(), qry, false, false);
+                    qryProc.querySqlFields(cache.context(), qry, null, false, false);
 
                     return null;
                 }


Mime
View raw message