ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject ignite git commit: Direct marshalling
Date Fri, 20 Nov 2015 03:35:19 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-direct-marsh-opt d9ab919dd -> a34cb3765


Direct marshalling


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a34cb376
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a34cb376
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a34cb376

Branch: refs/heads/ignite-direct-marsh-opt
Commit: a34cb37655fa71f02cc42bf3f4678814763b2aea
Parents: d9ab919
Author: Valentin Kulichenko <valentin.kulichenko@gmail.com>
Authored: Wed Nov 18 16:42:25 2015 -0800
Committer: Valentin Kulichenko <valentin.kulichenko@gmail.com>
Committed: Thu Nov 19 19:22:32 2015 -0800

----------------------------------------------------------------------
 .../internal/direct/DirectMessageReader.java    | 109 ++++++++++----
 .../direct/DirectMessageReaderState.java        | 150 -------------------
 .../internal/direct/DirectMessageWriter.java    |  43 ++++--
 .../direct/DirectMessageWriterState.java        | 123 ---------------
 .../direct/state/DirectMessageState.java        |  98 ++++++++++++
 .../direct/state/DirectMessageStateItem.java    |  25 ++++
 6 files changed, 232 insertions(+), 316 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a34cb376/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
index e7db846..e0b7b22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
@@ -22,7 +22,12 @@ import java.util.BitSet;
 import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.ignite.internal.direct.state.DirectMessageState;
+import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
 import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
+import org.apache.ignite.internal.direct.stream.v1.DirectByteBufferStreamImplV1;
+import org.apache.ignite.internal.direct.stream.v2.DirectByteBufferStreamImplV2;
+import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -35,7 +40,7 @@ import org.jetbrains.annotations.Nullable;
  */
 public class DirectMessageReader implements MessageReader {
     /** State. */
-    private final DirectMessageReaderState state;
+    private final DirectMessageState<StateItem> state;
 
     /** Whether last field was fully read. */
     private boolean lastRead;
@@ -44,13 +49,17 @@ public class DirectMessageReader implements MessageReader {
      * @param msgFactory Message factory.
      * @param protoVer Protocol version.
      */
-    public DirectMessageReader(MessageFactory msgFactory, byte protoVer) {
-        state = new DirectMessageReaderState(msgFactory, protoVer);
+    public DirectMessageReader(final MessageFactory msgFactory, final byte protoVer) {
+        state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure<StateItem>()
{
+            @Override public StateItem apply() {
+                return new StateItem(msgFactory, protoVer);
+            }
+        });
     }
 
     /** {@inheritDoc} */
     @Override public void setBuffer(ByteBuffer buf) {
-        state.stream().setBuffer(buf);
+        state.item().stream.setBuffer(buf);
     }
 
     /** {@inheritDoc} */
@@ -71,7 +80,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public byte readByte(String name) {
-        DirectByteBufferStream stream = state.stream();
+        DirectByteBufferStream stream = state.item().stream;
 
         byte val = stream.readByte();
 
@@ -82,7 +91,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public short readShort(String name) {
-        DirectByteBufferStream stream = state.stream();
+        DirectByteBufferStream stream = state.item().stream;
 
         short val = stream.readShort();
 
@@ -93,7 +102,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public int readInt(String name) {
-        DirectByteBufferStream stream = state.stream();
+        DirectByteBufferStream stream = state.item().stream;
 
         int val = stream.readInt();
 
@@ -104,7 +113,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public long readLong(String name) {
-        DirectByteBufferStream stream = state.stream();
+        DirectByteBufferStream stream = state.item().stream;
 
         long val = stream.readLong();
 
@@ -115,7 +124,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public float readFloat(String name) {
-        DirectByteBufferStream stream = state.stream();
+        DirectByteBufferStream stream = state.item().stream;
 
         float val = stream.readFloat();
 
@@ -126,7 +135,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public double readDouble(String name) {
-        DirectByteBufferStream stream = state.stream();
+        DirectByteBufferStream stream = state.item().stream;
 
         double val = stream.readDouble();
 
@@ -137,7 +146,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public char readChar(String name) {
-        DirectByteBufferStream stream = state.stream();
+        DirectByteBufferStream stream = state.item().stream;
 
         char val = stream.readChar();
 
@@ -148,7 +157,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public boolean readBoolean(String name) {
-        DirectByteBufferStream stream = state.stream();
+        DirectByteBufferStream stream = state.item().stream;
 
         boolean val = stream.readBoolean();
 
@@ -159,7 +168,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public byte[] readByteArray(String name) {
-        DirectByteBufferStream stream = state.stream();
+        DirectByteBufferStream stream = state.item().stream;
 
         byte[] arr = stream.readByteArray();
 
@@ -170,7 +179,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public short[] readShortArray(String name) {
-        DirectByteBufferStream stream = state.stream();
+        DirectByteBufferStream stream = state.item().stream;
 
         short[] arr = stream.readShortArray();
 
@@ -181,7 +190,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public int[] readIntArray(String name) {
-        DirectByteBufferStream stream = state.stream();
+        DirectByteBufferStream stream = state.item().stream;
 
         int[] arr = stream.readIntArray();
 
@@ -192,7 +201,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public long[] readLongArray(String name) {
-        DirectByteBufferStream stream = state.stream();
+        DirectByteBufferStream stream = state.item().stream;
 
         long[] arr = stream.readLongArray();
 
@@ -203,7 +212,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public float[] readFloatArray(String name) {
-        DirectByteBufferStream stream = state.stream();
+        DirectByteBufferStream stream = state.item().stream;
 
         float[] arr = stream.readFloatArray();
 
@@ -214,7 +223,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public double[] readDoubleArray(String name) {
-        DirectByteBufferStream stream = state.stream();
+        DirectByteBufferStream stream = state.item().stream;
 
         double[] arr = stream.readDoubleArray();
 
@@ -225,7 +234,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public char[] readCharArray(String name) {
-        DirectByteBufferStream stream = state.stream();
+        DirectByteBufferStream stream = state.item().stream;
 
         char[] arr = stream.readCharArray();
 
@@ -236,7 +245,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public boolean[] readBooleanArray(String name) {
-        DirectByteBufferStream stream = state.stream();
+        DirectByteBufferStream stream = state.item().stream;
 
         boolean[] arr = stream.readBooleanArray();
 
@@ -247,7 +256,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public String readString(String name) {
-        DirectByteBufferStream stream = state.stream();
+        DirectByteBufferStream stream = state.item().stream;
 
         String val = stream.readString();
 
@@ -258,7 +267,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public BitSet readBitSet(String name) {
-        DirectByteBufferStream stream = state.stream();
+        DirectByteBufferStream stream = state.item().stream;
 
         BitSet val = stream.readBitSet();
 
@@ -269,7 +278,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public UUID readUuid(String name) {
-        DirectByteBufferStream stream = state.stream();
+        DirectByteBufferStream stream = state.item().stream;
 
         UUID val = stream.readUuid();
 
@@ -280,7 +289,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public IgniteUuid readIgniteUuid(String name) {
-        DirectByteBufferStream stream = state.stream();
+        DirectByteBufferStream stream = state.item().stream;
 
         IgniteUuid val = stream.readIgniteUuid();
 
@@ -291,7 +300,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public <T extends Message> T readMessage(String name) {
-        DirectByteBufferStream stream = state.stream();
+        DirectByteBufferStream stream = state.item().stream;
 
         T msg = stream.readMessage(this);
 
@@ -302,7 +311,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public <T> T[] readObjectArray(String name, MessageCollectionItemType
itemType, Class<T> itemCls) {
-        DirectByteBufferStream stream = state.stream();
+        DirectByteBufferStream stream = state.item().stream;
 
         T[] msg = stream.readObjectArray(itemType, itemCls, this);
 
@@ -313,7 +322,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public <C extends Collection<?>> C readCollection(String name,
MessageCollectionItemType itemType) {
-        DirectByteBufferStream stream = state.stream();
+        DirectByteBufferStream stream = state.item().stream;
 
         C col = stream.readCollection(itemType, this);
 
@@ -325,7 +334,7 @@ public class DirectMessageReader implements MessageReader {
     /** {@inheritDoc} */
     @Override public <M extends Map<?, ?>> M readMap(String name, MessageCollectionItemType
keyType,
         MessageCollectionItemType valType, boolean linked) {
-        DirectByteBufferStream stream = state.stream();
+        DirectByteBufferStream stream = state.item().stream;
 
         M map = stream.readMap(keyType, valType, linked, this);
 
@@ -341,26 +350,62 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public int state() {
-        return state.state();
+        return state.item().state;
     }
 
     /** {@inheritDoc} */
     @Override public void incrementState() {
-        state.incrementState();
+        state.item().state++;
     }
 
     /** {@inheritDoc} */
     @Override public void beforeInnerMessageRead() {
-        state.beforeInnerMessageRead();
+        state.forward();
     }
 
     /** {@inheritDoc} */
     @Override public void afterInnerMessageRead(boolean finished) {
-        state.afterInnerMessageRead(finished);
+        state.backward(finished);
     }
 
     /** {@inheritDoc} */
     @Override public void reset() {
         state.reset();
     }
+
+    /**
+     */
+    private static class StateItem implements DirectMessageStateItem {
+        /** Stream. */
+        private final DirectByteBufferStream stream;
+
+        /** State. */
+        private int state;
+
+        /**
+         * @param msgFactory Message factory.
+         * @param protoVer Protocol version.
+         */
+        public StateItem(MessageFactory msgFactory, byte protoVer) {
+            switch (protoVer) {
+                case 1:
+                    stream = new DirectByteBufferStreamImplV1(msgFactory);
+
+                    break;
+
+                case 2:
+                    stream = new DirectByteBufferStreamImplV2(msgFactory);
+
+                    break;
+
+                default:
+                    throw new IllegalStateException("Invalid protocol version: " + protoVer);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void reset() {
+            state = 0;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34cb376/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java
deleted file mode 100644
index 1b02213..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.direct;
-
-import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
-import org.apache.ignite.internal.direct.stream.v1.DirectByteBufferStreamImplV1;
-import org.apache.ignite.internal.direct.stream.v2.DirectByteBufferStreamImplV2;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-
-/**
- * Writer state.
- */
-public class DirectMessageReaderState {
-    /** Initial array size. */
-    private static final int INIT_SIZE = 10;
-
-    /** Message factory. */
-    private final MessageFactory msgFactory;
-
-    /** Protocol version. */
-    private final byte protoVer;
-
-    /** Stack array. */
-    private StateItem[] stack;
-
-    /** Current position. */
-    private int pos;
-
-    /**
-     * @param msgFactory Message factory.
-     * @param protoVer Protocol version.
-     */
-    public DirectMessageReaderState(MessageFactory msgFactory, byte protoVer) {
-        this.msgFactory = msgFactory;
-        this.protoVer = protoVer;
-
-        stack = new StateItem[INIT_SIZE];
-
-        stack[0] = new StateItem(msgFactory, protoVer);
-    }
-
-    /**
-     * @return Current state.
-     */
-    public int state() {
-        return stack[pos].state;
-    }
-
-    /**
-     * Increments state.
-     */
-    public void incrementState() {
-        stack[pos].state++;
-    }
-
-    /**
-     * @return Current stream.
-     */
-    public DirectByteBufferStream stream() {
-        return stack[pos].stream;
-    }
-
-    /**
-     * Callback called before inner message is written.
-     */
-    public void beforeInnerMessageRead() {
-        pos++;
-
-        // Growing never happen for Ignite messages, but we need
-        // to support it for custom messages from plugins.
-        if (pos == stack.length) {
-            StateItem[] stack0 = stack;
-
-            stack = new StateItem[stack.length << 1];
-
-            System.arraycopy(stack0, 0, stack, 0, stack0.length);
-        }
-
-        if (stack[pos] == null)
-            stack[pos] = new StateItem(msgFactory, protoVer);
-    }
-
-    /**
-     * Callback called after inner message is written.
-     *
-     * @param finished Whether message was fully written.
-     */
-    public void afterInnerMessageRead(boolean finished) {
-        if (finished)
-            stack[pos].state = 0;
-
-        pos--;
-    }
-
-    /**
-     * Resets state.
-     */
-    public void reset() {
-        assert pos == 0;
-
-        stack[0].state = 0;
-    }
-
-    /**
-     * State item.
-     */
-    private static class StateItem {
-        /** Stream. */
-        private final DirectByteBufferStream stream;
-
-        /** State. */
-        private int state;
-
-        /**
-         * @param msgFactory Message factory.
-         * @param protoVer Protocol version.
-         */
-        public StateItem(MessageFactory msgFactory, byte protoVer) {
-            switch (protoVer) {
-                case 1:
-                    stream = new DirectByteBufferStreamImplV1(msgFactory);
-
-                    break;
-
-                case 2:
-                    stream = new DirectByteBufferStreamImplV2(msgFactory);
-
-                    break;
-
-                default:
-                    throw new IllegalStateException("Invalid protocol version: " + protoVer);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34cb376/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
index 07a037e..ad122ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
@@ -22,9 +22,12 @@ import java.util.BitSet;
 import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.ignite.internal.direct.state.DirectMessageState;
+import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
 import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
 import org.apache.ignite.internal.direct.stream.v1.DirectByteBufferStreamImplV1;
 import org.apache.ignite.internal.direct.stream.v2.DirectByteBufferStreamImplV2;
+import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -39,7 +42,12 @@ public class DirectMessageWriter implements MessageWriter {
     private final DirectByteBufferStream stream;
 
     /** State. */
-    private final DirectMessageWriterState state = new DirectMessageWriterState();
+    private final DirectMessageState<StateItem> state = new DirectMessageState<>(StateItem.class,
+        new IgniteOutClosure<StateItem>() {
+            @Override public StateItem apply() {
+                return new StateItem();
+            }
+        });
 
     /**
      * @param protoVer Protocol version.
@@ -236,10 +244,7 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public <T> boolean writeCollection(String name, Collection<T> col,
MessageCollectionItemType itemType) {
-//        if (col instanceof List && col instanceof RandomAccess)
-//            stream.writeRandomAccessList((List<T>)col, itemType, this);
-//        else
-            stream.writeCollection(col, itemType, this);
+        stream.writeCollection(col, itemType, this);
 
         return stream.lastFinished();
     }
@@ -254,36 +259,52 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean isHeaderWritten() {
-        return state.isTypeWritten();
+        return state.item().hdrWritten;
     }
 
     /** {@inheritDoc} */
     @Override public void onHeaderWritten() {
-        state.onTypeWritten();
+        state.item().hdrWritten = true;
     }
 
     /** {@inheritDoc} */
     @Override public int state() {
-        return state.state();
+        return state.item().state;
     }
 
     /** {@inheritDoc} */
     @Override public void incrementState() {
-        state.incrementState();
+        state.item().state++;
     }
 
     /** {@inheritDoc} */
     @Override public void beforeInnerMessageWrite() {
-        state.beforeInnerMessageWrite();
+        state.forward();
     }
 
     /** {@inheritDoc} */
     @Override public void afterInnerMessageWrite(boolean finished) {
-        state.afterInnerMessageWrite(finished);
+        state.backward(finished);
     }
 
     /** {@inheritDoc} */
     @Override public void reset() {
         state.reset();
     }
+
+    /**
+     */
+    private static class StateItem implements DirectMessageStateItem {
+        /** */
+        private int state;
+
+        /** */
+        private boolean hdrWritten;
+
+        /** {@inheritDoc} */
+        @Override public void reset() {
+            state = 0;
+            hdrWritten = false;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34cb376/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriterState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriterState.java
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriterState.java
deleted file mode 100644
index 7b4cd9e..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriterState.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.direct;
-
-import java.util.Arrays;
-
-/**
- * Writer state.
- */
-public class DirectMessageWriterState {
-    /** Initial array size. */
-    private static final int INIT_SIZE = 10;
-
-    /** Initial value. */
-    private static final int INIT_VAL = -1;
-
-    /** Stack array. */
-    private int[] stack;
-
-    /** Current position. */
-    private int pos;
-
-    /**
-     *
-     */
-    public DirectMessageWriterState() {
-        stack = new int[INIT_SIZE];
-
-        Arrays.fill(stack, INIT_VAL);
-    }
-
-    /**
-     * @return Whether type is written.
-     */
-    public boolean isTypeWritten() {
-        return stack[pos] >= 0;
-    }
-
-    /**
-     * Callback called after type is written.
-     */
-    public void onTypeWritten() {
-        assert stack[pos] == -1;
-
-        stack[pos] = 0;
-    }
-
-    /**
-     * @return Current state.
-     */
-    public int state() {
-        return stack[pos];
-    }
-
-    /**
-     * Increments state.
-     */
-    public void incrementState() {
-        stack[pos]++;
-    }
-
-    /**
-     * @param val New state value.
-     */
-    protected void setState(int val) {
-        stack[pos] = val;
-    }
-
-    /**
-     * Callback called before inner message is written.
-     */
-    public void beforeInnerMessageWrite() {
-        pos++;
-
-        // Growing never happen for Ignite messages, but we need
-        // to support it for custom messages from plugins.
-        if (pos == stack.length) {
-            int[] stack0 = stack;
-
-            stack = new int[stack.length << 1];
-
-            System.arraycopy(stack0, 0, stack, 0, stack0.length);
-
-            Arrays.fill(stack, stack0.length, stack.length, INIT_VAL);
-        }
-    }
-
-    /**
-     * Callback called after inner message is written.
-     *
-     * @param finished Whether message was fully written.
-     */
-    public void afterInnerMessageWrite(boolean finished) {
-        if (finished)
-            stack[pos] = INIT_VAL;
-
-        pos--;
-    }
-
-    /**
-     * Resets state.
-     */
-    public void reset() {
-        assert pos == 0;
-
-        stack[0] = INIT_VAL;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34cb376/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageState.java
b/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageState.java
new file mode 100644
index 0000000..15aa92b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageState.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.direct.state;
+
+import java.lang.reflect.Array;
+import org.apache.ignite.lang.IgniteOutClosure;
+
+/**
+ * Writer state.
+ */
+public class DirectMessageState<T extends DirectMessageStateItem> {
+    /** Initial array size. */
+    private static final int INIT_SIZE = 10;
+
+    /** Item factory. */
+    private final IgniteOutClosure<T> factory;
+
+    /** Stack array. */
+    private T[] stack;
+
+    /** Current position. */
+    private int pos;
+
+    /**
+     * @param cls State item type.
+     * @param factory Item factory.
+     */
+    @SuppressWarnings("unchecked")
+    public DirectMessageState(Class<T> cls, IgniteOutClosure<T> factory) {
+        this.factory = factory;
+
+        stack = (T[])Array.newInstance(cls, INIT_SIZE);
+
+        stack[0] = factory.apply();
+    }
+
+    /**
+     * @return Current item.
+     */
+    public T item() {
+        return stack[pos];
+    }
+
+    /**
+     * Go forward.
+     */
+    @SuppressWarnings("unchecked")
+    public void forward() {
+        pos++;
+
+        if (pos == stack.length) {
+            T[] stack0 = stack;
+
+            stack = (T[])Array.newInstance(stack.getClass().getComponentType(), stack.length
<< 1);
+
+            System.arraycopy(stack0, 0, stack, 0, stack0.length);
+        }
+
+        if (stack[pos] == null)
+            stack[pos] = factory.apply();
+    }
+
+    /**
+     * Go backward.
+     *
+     * @param reset Whether to reset current item.
+     */
+    public void backward(boolean reset) {
+        if (reset)
+            stack[pos].reset();
+
+        pos--;
+    }
+
+    /**
+     * Resets state.
+     */
+    public void reset() {
+        assert pos == 0;
+
+        stack[0].reset();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34cb376/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageStateItem.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageStateItem.java
b/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageStateItem.java
new file mode 100644
index 0000000..50a4879
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageStateItem.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.direct.state;
+
+/**
+ * TODO
+ */
+public interface DirectMessageStateItem {
+    public void reset();
+}


Mime
View raw message