Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A3DD718C77 for ; Fri, 20 Nov 2015 03:35:19 +0000 (UTC) Received: (qmail 82014 invoked by uid 500); 20 Nov 2015 03:35:19 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 81985 invoked by uid 500); 20 Nov 2015 03:35:19 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 81976 invoked by uid 99); 20 Nov 2015 03:35:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Nov 2015 03:35:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1E541E33D7; Fri, 20 Nov 2015 03:35:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vkulichenko@apache.org To: commits@ignite.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: Direct marshalling Date: Fri, 20 Nov 2015 03:35:19 +0000 (UTC) Repository: ignite Updated Branches: refs/heads/ignite-direct-marsh-opt d9ab919dd -> a34cb3765 Direct marshalling Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a34cb376 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a34cb376 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a34cb376 Branch: refs/heads/ignite-direct-marsh-opt Commit: a34cb37655fa71f02cc42bf3f4678814763b2aea Parents: d9ab919 Author: Valentin Kulichenko Authored: Wed Nov 18 16:42:25 2015 -0800 Committer: Valentin Kulichenko Committed: Thu Nov 19 19:22:32 2015 -0800 ---------------------------------------------------------------------- .../internal/direct/DirectMessageReader.java | 109 ++++++++++---- .../direct/DirectMessageReaderState.java | 150 ------------------- .../internal/direct/DirectMessageWriter.java | 43 ++++-- .../direct/DirectMessageWriterState.java | 123 --------------- .../direct/state/DirectMessageState.java | 98 ++++++++++++ .../direct/state/DirectMessageStateItem.java | 25 ++++ 6 files changed, 232 insertions(+), 316 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a34cb376/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java index e7db846..e0b7b22 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java @@ -22,7 +22,12 @@ import java.util.BitSet; import java.util.Collection; import java.util.Map; import java.util.UUID; +import org.apache.ignite.internal.direct.state.DirectMessageState; +import org.apache.ignite.internal.direct.state.DirectMessageStateItem; import org.apache.ignite.internal.direct.stream.DirectByteBufferStream; +import org.apache.ignite.internal.direct.stream.v1.DirectByteBufferStreamImplV1; +import org.apache.ignite.internal.direct.stream.v2.DirectByteBufferStreamImplV2; +import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; @@ -35,7 +40,7 @@ import org.jetbrains.annotations.Nullable; */ public class DirectMessageReader implements MessageReader { /** State. */ - private final DirectMessageReaderState state; + private final DirectMessageState state; /** Whether last field was fully read. */ private boolean lastRead; @@ -44,13 +49,17 @@ public class DirectMessageReader implements MessageReader { * @param msgFactory Message factory. * @param protoVer Protocol version. */ - public DirectMessageReader(MessageFactory msgFactory, byte protoVer) { - state = new DirectMessageReaderState(msgFactory, protoVer); + public DirectMessageReader(final MessageFactory msgFactory, final byte protoVer) { + state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure() { + @Override public StateItem apply() { + return new StateItem(msgFactory, protoVer); + } + }); } /** {@inheritDoc} */ @Override public void setBuffer(ByteBuffer buf) { - state.stream().setBuffer(buf); + state.item().stream.setBuffer(buf); } /** {@inheritDoc} */ @@ -71,7 +80,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public byte readByte(String name) { - DirectByteBufferStream stream = state.stream(); + DirectByteBufferStream stream = state.item().stream; byte val = stream.readByte(); @@ -82,7 +91,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public short readShort(String name) { - DirectByteBufferStream stream = state.stream(); + DirectByteBufferStream stream = state.item().stream; short val = stream.readShort(); @@ -93,7 +102,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public int readInt(String name) { - DirectByteBufferStream stream = state.stream(); + DirectByteBufferStream stream = state.item().stream; int val = stream.readInt(); @@ -104,7 +113,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public long readLong(String name) { - DirectByteBufferStream stream = state.stream(); + DirectByteBufferStream stream = state.item().stream; long val = stream.readLong(); @@ -115,7 +124,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public float readFloat(String name) { - DirectByteBufferStream stream = state.stream(); + DirectByteBufferStream stream = state.item().stream; float val = stream.readFloat(); @@ -126,7 +135,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public double readDouble(String name) { - DirectByteBufferStream stream = state.stream(); + DirectByteBufferStream stream = state.item().stream; double val = stream.readDouble(); @@ -137,7 +146,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public char readChar(String name) { - DirectByteBufferStream stream = state.stream(); + DirectByteBufferStream stream = state.item().stream; char val = stream.readChar(); @@ -148,7 +157,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public boolean readBoolean(String name) { - DirectByteBufferStream stream = state.stream(); + DirectByteBufferStream stream = state.item().stream; boolean val = stream.readBoolean(); @@ -159,7 +168,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Nullable @Override public byte[] readByteArray(String name) { - DirectByteBufferStream stream = state.stream(); + DirectByteBufferStream stream = state.item().stream; byte[] arr = stream.readByteArray(); @@ -170,7 +179,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Nullable @Override public short[] readShortArray(String name) { - DirectByteBufferStream stream = state.stream(); + DirectByteBufferStream stream = state.item().stream; short[] arr = stream.readShortArray(); @@ -181,7 +190,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Nullable @Override public int[] readIntArray(String name) { - DirectByteBufferStream stream = state.stream(); + DirectByteBufferStream stream = state.item().stream; int[] arr = stream.readIntArray(); @@ -192,7 +201,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Nullable @Override public long[] readLongArray(String name) { - DirectByteBufferStream stream = state.stream(); + DirectByteBufferStream stream = state.item().stream; long[] arr = stream.readLongArray(); @@ -203,7 +212,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Nullable @Override public float[] readFloatArray(String name) { - DirectByteBufferStream stream = state.stream(); + DirectByteBufferStream stream = state.item().stream; float[] arr = stream.readFloatArray(); @@ -214,7 +223,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Nullable @Override public double[] readDoubleArray(String name) { - DirectByteBufferStream stream = state.stream(); + DirectByteBufferStream stream = state.item().stream; double[] arr = stream.readDoubleArray(); @@ -225,7 +234,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Nullable @Override public char[] readCharArray(String name) { - DirectByteBufferStream stream = state.stream(); + DirectByteBufferStream stream = state.item().stream; char[] arr = stream.readCharArray(); @@ -236,7 +245,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Nullable @Override public boolean[] readBooleanArray(String name) { - DirectByteBufferStream stream = state.stream(); + DirectByteBufferStream stream = state.item().stream; boolean[] arr = stream.readBooleanArray(); @@ -247,7 +256,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public String readString(String name) { - DirectByteBufferStream stream = state.stream(); + DirectByteBufferStream stream = state.item().stream; String val = stream.readString(); @@ -258,7 +267,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public BitSet readBitSet(String name) { - DirectByteBufferStream stream = state.stream(); + DirectByteBufferStream stream = state.item().stream; BitSet val = stream.readBitSet(); @@ -269,7 +278,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public UUID readUuid(String name) { - DirectByteBufferStream stream = state.stream(); + DirectByteBufferStream stream = state.item().stream; UUID val = stream.readUuid(); @@ -280,7 +289,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public IgniteUuid readIgniteUuid(String name) { - DirectByteBufferStream stream = state.stream(); + DirectByteBufferStream stream = state.item().stream; IgniteUuid val = stream.readIgniteUuid(); @@ -291,7 +300,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Nullable @Override public T readMessage(String name) { - DirectByteBufferStream stream = state.stream(); + DirectByteBufferStream stream = state.item().stream; T msg = stream.readMessage(this); @@ -302,7 +311,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public T[] readObjectArray(String name, MessageCollectionItemType itemType, Class itemCls) { - DirectByteBufferStream stream = state.stream(); + DirectByteBufferStream stream = state.item().stream; T[] msg = stream.readObjectArray(itemType, itemCls, this); @@ -313,7 +322,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public > C readCollection(String name, MessageCollectionItemType itemType) { - DirectByteBufferStream stream = state.stream(); + DirectByteBufferStream stream = state.item().stream; C col = stream.readCollection(itemType, this); @@ -325,7 +334,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public > M readMap(String name, MessageCollectionItemType keyType, MessageCollectionItemType valType, boolean linked) { - DirectByteBufferStream stream = state.stream(); + DirectByteBufferStream stream = state.item().stream; M map = stream.readMap(keyType, valType, linked, this); @@ -341,26 +350,62 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public int state() { - return state.state(); + return state.item().state; } /** {@inheritDoc} */ @Override public void incrementState() { - state.incrementState(); + state.item().state++; } /** {@inheritDoc} */ @Override public void beforeInnerMessageRead() { - state.beforeInnerMessageRead(); + state.forward(); } /** {@inheritDoc} */ @Override public void afterInnerMessageRead(boolean finished) { - state.afterInnerMessageRead(finished); + state.backward(finished); } /** {@inheritDoc} */ @Override public void reset() { state.reset(); } + + /** + */ + private static class StateItem implements DirectMessageStateItem { + /** Stream. */ + private final DirectByteBufferStream stream; + + /** State. */ + private int state; + + /** + * @param msgFactory Message factory. + * @param protoVer Protocol version. + */ + public StateItem(MessageFactory msgFactory, byte protoVer) { + switch (protoVer) { + case 1: + stream = new DirectByteBufferStreamImplV1(msgFactory); + + break; + + case 2: + stream = new DirectByteBufferStreamImplV2(msgFactory); + + break; + + default: + throw new IllegalStateException("Invalid protocol version: " + protoVer); + } + } + + /** {@inheritDoc} */ + @Override public void reset() { + state = 0; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a34cb376/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java deleted file mode 100644 index 1b02213..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.direct; - -import org.apache.ignite.internal.direct.stream.DirectByteBufferStream; -import org.apache.ignite.internal.direct.stream.v1.DirectByteBufferStreamImplV1; -import org.apache.ignite.internal.direct.stream.v2.DirectByteBufferStreamImplV2; -import org.apache.ignite.plugin.extensions.communication.MessageFactory; - -/** - * Writer state. - */ -public class DirectMessageReaderState { - /** Initial array size. */ - private static final int INIT_SIZE = 10; - - /** Message factory. */ - private final MessageFactory msgFactory; - - /** Protocol version. */ - private final byte protoVer; - - /** Stack array. */ - private StateItem[] stack; - - /** Current position. */ - private int pos; - - /** - * @param msgFactory Message factory. - * @param protoVer Protocol version. - */ - public DirectMessageReaderState(MessageFactory msgFactory, byte protoVer) { - this.msgFactory = msgFactory; - this.protoVer = protoVer; - - stack = new StateItem[INIT_SIZE]; - - stack[0] = new StateItem(msgFactory, protoVer); - } - - /** - * @return Current state. - */ - public int state() { - return stack[pos].state; - } - - /** - * Increments state. - */ - public void incrementState() { - stack[pos].state++; - } - - /** - * @return Current stream. - */ - public DirectByteBufferStream stream() { - return stack[pos].stream; - } - - /** - * Callback called before inner message is written. - */ - public void beforeInnerMessageRead() { - pos++; - - // Growing never happen for Ignite messages, but we need - // to support it for custom messages from plugins. - if (pos == stack.length) { - StateItem[] stack0 = stack; - - stack = new StateItem[stack.length << 1]; - - System.arraycopy(stack0, 0, stack, 0, stack0.length); - } - - if (stack[pos] == null) - stack[pos] = new StateItem(msgFactory, protoVer); - } - - /** - * Callback called after inner message is written. - * - * @param finished Whether message was fully written. - */ - public void afterInnerMessageRead(boolean finished) { - if (finished) - stack[pos].state = 0; - - pos--; - } - - /** - * Resets state. - */ - public void reset() { - assert pos == 0; - - stack[0].state = 0; - } - - /** - * State item. - */ - private static class StateItem { - /** Stream. */ - private final DirectByteBufferStream stream; - - /** State. */ - private int state; - - /** - * @param msgFactory Message factory. - * @param protoVer Protocol version. - */ - public StateItem(MessageFactory msgFactory, byte protoVer) { - switch (protoVer) { - case 1: - stream = new DirectByteBufferStreamImplV1(msgFactory); - - break; - - case 2: - stream = new DirectByteBufferStreamImplV2(msgFactory); - - break; - - default: - throw new IllegalStateException("Invalid protocol version: " + protoVer); - } - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/a34cb376/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java index 07a037e..ad122ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java @@ -22,9 +22,12 @@ import java.util.BitSet; import java.util.Collection; import java.util.Map; import java.util.UUID; +import org.apache.ignite.internal.direct.state.DirectMessageState; +import org.apache.ignite.internal.direct.state.DirectMessageStateItem; import org.apache.ignite.internal.direct.stream.DirectByteBufferStream; import org.apache.ignite.internal.direct.stream.v1.DirectByteBufferStreamImplV1; import org.apache.ignite.internal.direct.stream.v2.DirectByteBufferStreamImplV2; +import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; @@ -39,7 +42,12 @@ public class DirectMessageWriter implements MessageWriter { private final DirectByteBufferStream stream; /** State. */ - private final DirectMessageWriterState state = new DirectMessageWriterState(); + private final DirectMessageState state = new DirectMessageState<>(StateItem.class, + new IgniteOutClosure() { + @Override public StateItem apply() { + return new StateItem(); + } + }); /** * @param protoVer Protocol version. @@ -236,10 +244,7 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeCollection(String name, Collection col, MessageCollectionItemType itemType) { -// if (col instanceof List && col instanceof RandomAccess) -// stream.writeRandomAccessList((List)col, itemType, this); -// else - stream.writeCollection(col, itemType, this); + stream.writeCollection(col, itemType, this); return stream.lastFinished(); } @@ -254,36 +259,52 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean isHeaderWritten() { - return state.isTypeWritten(); + return state.item().hdrWritten; } /** {@inheritDoc} */ @Override public void onHeaderWritten() { - state.onTypeWritten(); + state.item().hdrWritten = true; } /** {@inheritDoc} */ @Override public int state() { - return state.state(); + return state.item().state; } /** {@inheritDoc} */ @Override public void incrementState() { - state.incrementState(); + state.item().state++; } /** {@inheritDoc} */ @Override public void beforeInnerMessageWrite() { - state.beforeInnerMessageWrite(); + state.forward(); } /** {@inheritDoc} */ @Override public void afterInnerMessageWrite(boolean finished) { - state.afterInnerMessageWrite(finished); + state.backward(finished); } /** {@inheritDoc} */ @Override public void reset() { state.reset(); } + + /** + */ + private static class StateItem implements DirectMessageStateItem { + /** */ + private int state; + + /** */ + private boolean hdrWritten; + + /** {@inheritDoc} */ + @Override public void reset() { + state = 0; + hdrWritten = false; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a34cb376/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriterState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriterState.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriterState.java deleted file mode 100644 index 7b4cd9e..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriterState.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.direct; - -import java.util.Arrays; - -/** - * Writer state. - */ -public class DirectMessageWriterState { - /** Initial array size. */ - private static final int INIT_SIZE = 10; - - /** Initial value. */ - private static final int INIT_VAL = -1; - - /** Stack array. */ - private int[] stack; - - /** Current position. */ - private int pos; - - /** - * - */ - public DirectMessageWriterState() { - stack = new int[INIT_SIZE]; - - Arrays.fill(stack, INIT_VAL); - } - - /** - * @return Whether type is written. - */ - public boolean isTypeWritten() { - return stack[pos] >= 0; - } - - /** - * Callback called after type is written. - */ - public void onTypeWritten() { - assert stack[pos] == -1; - - stack[pos] = 0; - } - - /** - * @return Current state. - */ - public int state() { - return stack[pos]; - } - - /** - * Increments state. - */ - public void incrementState() { - stack[pos]++; - } - - /** - * @param val New state value. - */ - protected void setState(int val) { - stack[pos] = val; - } - - /** - * Callback called before inner message is written. - */ - public void beforeInnerMessageWrite() { - pos++; - - // Growing never happen for Ignite messages, but we need - // to support it for custom messages from plugins. - if (pos == stack.length) { - int[] stack0 = stack; - - stack = new int[stack.length << 1]; - - System.arraycopy(stack0, 0, stack, 0, stack0.length); - - Arrays.fill(stack, stack0.length, stack.length, INIT_VAL); - } - } - - /** - * Callback called after inner message is written. - * - * @param finished Whether message was fully written. - */ - public void afterInnerMessageWrite(boolean finished) { - if (finished) - stack[pos] = INIT_VAL; - - pos--; - } - - /** - * Resets state. - */ - public void reset() { - assert pos == 0; - - stack[0] = INIT_VAL; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/a34cb376/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageState.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageState.java new file mode 100644 index 0000000..15aa92b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageState.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.direct.state; + +import java.lang.reflect.Array; +import org.apache.ignite.lang.IgniteOutClosure; + +/** + * Writer state. + */ +public class DirectMessageState { + /** Initial array size. */ + private static final int INIT_SIZE = 10; + + /** Item factory. */ + private final IgniteOutClosure factory; + + /** Stack array. */ + private T[] stack; + + /** Current position. */ + private int pos; + + /** + * @param cls State item type. + * @param factory Item factory. + */ + @SuppressWarnings("unchecked") + public DirectMessageState(Class cls, IgniteOutClosure factory) { + this.factory = factory; + + stack = (T[])Array.newInstance(cls, INIT_SIZE); + + stack[0] = factory.apply(); + } + + /** + * @return Current item. + */ + public T item() { + return stack[pos]; + } + + /** + * Go forward. + */ + @SuppressWarnings("unchecked") + public void forward() { + pos++; + + if (pos == stack.length) { + T[] stack0 = stack; + + stack = (T[])Array.newInstance(stack.getClass().getComponentType(), stack.length << 1); + + System.arraycopy(stack0, 0, stack, 0, stack0.length); + } + + if (stack[pos] == null) + stack[pos] = factory.apply(); + } + + /** + * Go backward. + * + * @param reset Whether to reset current item. + */ + public void backward(boolean reset) { + if (reset) + stack[pos].reset(); + + pos--; + } + + /** + * Resets state. + */ + public void reset() { + assert pos == 0; + + stack[0].reset(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a34cb376/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageStateItem.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageStateItem.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageStateItem.java new file mode 100644 index 0000000..50a4879 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageStateItem.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.direct.state; + +/** + * TODO + */ +public interface DirectMessageStateItem { + public void reset(); +}