ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [27/41] ignite git commit: IGNITE-1308: Moved regular (not continuous!) queries to Ignite.
Date Fri, 28 Aug 2015 12:49:45 GMT
IGNITE-1308: Moved regular (not continuous!) queries to Ignite.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8529e108
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8529e108
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8529e108

Branch: refs/heads/ignite-1093
Commit: 8529e10855e71c63c4fc5a83a9cdb2300109bb19
Parents: 975f47e
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Wed Aug 26 15:43:34 2015 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Wed Aug 26 15:43:34 2015 +0300

----------------------------------------------------------------------
 .../query/PlatformAbstractQueryCursor.java      | 192 +++++++++++++++++++
 .../cache/query/PlatformFieldsQueryCursor.java  |  50 +++++
 .../cache/query/PlatformQueryCursor.java        |  46 +++++
 3 files changed, 288 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8529e108/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
new file mode 100644
index 0000000..cdd29fd
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.query;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+public abstract class PlatformAbstractQueryCursor<T> extends PlatformAbstractTarget
implements AutoCloseable {
+    /** Get multiple entries. */
+    private static final int OP_GET_ALL = 1;
+
+    /** Get all entries. */
+    private static final int OP_GET_BATCH = 2;
+
+    /** Get single entry. */
+    private static final int OP_GET_SINGLE = 3;
+
+    /** Underlying cursor. */
+    private final QueryCursorEx<T> cursor;
+
+    /** Batch size size. */
+    private final int batchSize;
+
+    /** Underlying iterator. */
+    private Iterator<T> iter;
+
+    /**
+     * Constructor.
+     *
+     * @param interopCtx Interop context.
+     * @param cursor Underlying cursor.
+     * @param batchSize Batch size.
+     */
+    public PlatformAbstractQueryCursor(PlatformContext interopCtx, QueryCursorEx<T>
cursor, int batchSize) {
+        super(interopCtx);
+
+        this.cursor = cursor;
+        this.batchSize = batchSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void processOutOp(int type, final PortableRawWriterEx writer) throws
IgniteCheckedException {
+        switch (type) {
+            case OP_GET_BATCH: {
+                assert iter != null : "iterator() has not been called";
+
+                try {
+                    int cntPos = writer.reserveInt();
+
+                    int cnt;
+
+                    for (cnt = 0; cnt < batchSize; cnt++) {
+                        if (iter.hasNext())
+                            write(writer, iter.next());
+                        else
+                            break;
+                    }
+
+                    writer.writeInt(cntPos, cnt);
+                }
+                catch (Exception err) {
+                    throw PlatformUtils.unwrapQueryException(err);
+                }
+
+                break;
+            }
+
+            case OP_GET_SINGLE: {
+                assert iter != null : "iterator() has not been called";
+
+                try {
+                    if (iter.hasNext()) {
+                        write(writer, iter.next());
+
+                        return;
+                    }
+                }
+                catch (Exception err) {
+                    throw PlatformUtils.unwrapQueryException(err);
+                }
+
+                throw new IgniteCheckedException("No more data available.");
+            }
+
+            case OP_GET_ALL: {
+                try {
+                    int pos = writer.reserveInt();
+
+                    Consumer<T> consumer = new Consumer<>(this, writer);
+
+                    cursor.getAll(consumer);
+
+                    writer.writeInt(pos, consumer.cnt);
+                }
+                catch (Exception err) {
+                    throw PlatformUtils.unwrapQueryException(err);
+                }
+
+                break;
+            }
+
+            default:
+                throwUnsupported(type);
+        }
+    }
+
+    /**
+     * Get cursor iterator.
+     */
+    public void iterator() {
+        iter = cursor.iterator();
+    }
+
+    /**
+     * Check whether next iterator entry exists.
+     *
+     * @return {@code True} if exists.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public boolean iteratorHasNext() {
+        assert iter != null : "iterator() has not been called";
+
+        return iter.hasNext();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws Exception {
+        cursor.close();
+    }
+
+    /**
+     * Write value to the stream. Extension point to perform conversions on the object before
writing it.
+     *
+     * @param writer Writer.
+     * @param val Value.
+     */
+    protected abstract void write(PortableRawWriterEx writer, T val);
+
+    /**
+     * Query cursor consumer.
+     */
+    private static class Consumer<T> implements QueryCursorEx.Consumer<T> {
+        /** Current query cursor. */
+        private final PlatformAbstractQueryCursor<T> cursor;
+
+        /** Writer. */
+        private final PortableRawWriterEx writer;
+
+        /** Count. */
+        private int cnt;
+
+        /**
+         * Constructor.
+         *
+         * @param writer Writer.
+         */
+        public Consumer(PlatformAbstractQueryCursor<T> cursor, PortableRawWriterEx
writer) {
+            this.cursor = cursor;
+            this.writer = writer;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void consume(T val) throws IgniteCheckedException {
+            cursor.write(writer, val);
+
+            cnt++;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8529e108/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java
b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java
new file mode 100644
index 0000000..f18a79a
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.query;
+
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.platform.*;
+
+import java.util.*;
+
+/**
+ * Interop cursor for fields query.
+ */
+public class PlatformFieldsQueryCursor extends PlatformAbstractQueryCursor<List<?>>
{
+    /**
+     * Constructor.
+     *
+     * @param interopCtx Interop context.
+     * @param cursor Backing cursor.
+     * @param batchSize Batch size.
+     */
+    public PlatformFieldsQueryCursor(PlatformContext interopCtx, QueryCursorEx<List<?>>
cursor, int batchSize) {
+        super(interopCtx, cursor, batchSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void write(PortableRawWriterEx writer, List vals) {
+        assert vals != null;
+
+        writer.writeInt(vals.size());
+
+        for (Object val : vals)
+            writer.writeObjectDetached(val);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8529e108/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformQueryCursor.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformQueryCursor.java
b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformQueryCursor.java
new file mode 100644
index 0000000..cc96d6f
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformQueryCursor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.query;
+
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.platform.*;
+
+import javax.cache.*;
+
+/**
+ * Interop cursor for regular queries.
+ */
+public class PlatformQueryCursor extends PlatformAbstractQueryCursor<Cache.Entry> {
+    /**
+     * Constructor.
+     *
+     * @param interopCtx Interop context.
+     * @param cursor Backing cursor.
+     * @param batchSize Batch size.
+     */
+    public PlatformQueryCursor(PlatformContext interopCtx, QueryCursorEx<Cache.Entry>
cursor, int batchSize) {
+        super(interopCtx, cursor, batchSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void write(PortableRawWriterEx writer, Cache.Entry val) {
+        writer.writeObjectDetached(val.getKey());
+        writer.writeObjectDetached(val.getValue());
+    }
+}


Mime
View raw message