ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject [7/9] incubator-ignite git commit: IGNITE-61 - Direct marshalling
Date Mon, 09 Feb 2015 02:01:43 GMT
IGNITE-61 - Direct marshalling


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1c19aa4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1c19aa4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1c19aa4a

Branch: refs/heads/ignite-61
Commit: 1c19aa4a1448ccf54241ca6ecacc082d2d42af32
Parents: f62ee2d
Author: Valentin Kulichenko <vkulichenko@gridgain.com>
Authored: Sun Feb 8 17:56:25 2015 -0800
Committer: Valentin Kulichenko <vkulichenko@gridgain.com>
Committed: Sun Feb 8 17:56:25 2015 -0800

----------------------------------------------------------------------
 .../ignite/internal/GridPluginContext.java      |   6 -
 .../apache/ignite/internal/IgniteKernal.java    |   2 -
 .../internal/direct/DirectByteBufferStream.java |   4 +-
 .../internal/direct/DirectMessageReader.java    |   2 +-
 .../GridTcpCommunicationMessageFactory.java     | 419 --------------
 .../GridTcpCommunicationMessageProducer.java    |  33 --
 .../internal/direct/GridTcpMessageFactory.java  |  31 -
 .../internal/managers/GridManagerAdapter.java   |   3 +-
 .../managers/communication/GridIoManager.java   |  79 +--
 .../communication/GridIoMessageFactory.java     | 570 +++++++++++++++++++
 .../internal/util/nio/GridDirectParser.java     |   3 +-
 .../internal/util/nio/GridNioMessageReader.java |   3 +-
 .../ignite/internal/util/nio/GridNioServer.java |   2 +-
 .../org/apache/ignite/plugin/PluginContext.java |   7 -
 .../communication/MessageFactory.java           |  31 +
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |   3 +-
 .../org/apache/ignite/spi/IgniteSpiContext.java |   3 +-
 .../GridCommunicationSendMessageSelfTest.java   |   7 +-
 .../GridAbstractCommunicationSelfTest.java      |   8 +-
 ...mmunicationSpiConcurrentConnectSelfTest.java |  10 +-
 ...cpCommunicationSpiMultithreadedSelfTest.java |   8 +-
 ...dTcpCommunicationSpiRecoveryAckSelfTest.java |  10 +-
 ...GridTcpCommunicationSpiRecoverySelfTest.java |   9 +-
 .../testframework/GridSpiTestContext.java       |  14 +-
 24 files changed, 648 insertions(+), 619 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginContext.java
index 24e080e..31a4246 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginContext.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.direct.*;
 import org.apache.ignite.plugin.*;
 import org.apache.ignite.spi.*;
 
@@ -93,9 +92,4 @@ public class GridPluginContext implements PluginContext {
     @Override public void deregisterPorts(Class<?> cls) {
         ctx.ports().deregisterPorts(cls);
     }
-
-    /** {@inheritDoc} */
-    @Override public byte registerMessageProducer(GridTcpCommunicationMessageProducer producer) {
-        return ctx.io().registerMessageProducer(producer);
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 0330322..8bc846a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -768,8 +768,6 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit
                 provider.start(ctx.plugins().pluginContextForProvider(provider), attrs);
             }
 
-            ctx.io().initMessageFactory();
-
             if (ctx.isEnterprise()) {
                 security = new GridSecurityImpl(ctx);
                 portables = new GridPortablesImpl(ctx);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
index e35ab69..1b8b5ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
@@ -259,7 +259,7 @@ public class DirectByteBufferStream {
     }
 
     /** */
-    private final GridTcpMessageFactory msgFactory;
+    private final MessageFactory msgFactory;
 
     /** */
     private ByteBuffer buf;
@@ -318,7 +318,7 @@ public class DirectByteBufferStream {
     /**
      * @param msgFactory Message factory.
      */
-    public DirectByteBufferStream(@Nullable GridTcpMessageFactory msgFactory) {
+    public DirectByteBufferStream(@Nullable MessageFactory msgFactory) {
         this.msgFactory = msgFactory;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
index 71b9e63..15f113b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
@@ -37,7 +37,7 @@ public class DirectMessageReader implements MessageReader {
     /**
      * @param msgFactory Message factory.
      */
-    public DirectMessageReader(GridTcpMessageFactory msgFactory) {
+    public DirectMessageReader(MessageFactory msgFactory) {
         this.stream = new DirectByteBufferStream(msgFactory);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/direct/GridTcpCommunicationMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/GridTcpCommunicationMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/GridTcpCommunicationMessageFactory.java
deleted file mode 100644
index 600bd31..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/GridTcpCommunicationMessageFactory.java
+++ /dev/null
@@ -1,419 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.direct;
-
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.managers.checkpoint.*;
-import org.apache.ignite.internal.managers.communication.*;
-import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.managers.eventstorage.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
-import org.apache.ignite.internal.processors.cache.distributed.near.*;
-import org.apache.ignite.internal.processors.cache.query.*;
-import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.processors.clock.*;
-import org.apache.ignite.internal.processors.continuous.*;
-import org.apache.ignite.internal.processors.dataload.*;
-import org.apache.ignite.internal.processors.fs.*;
-import org.apache.ignite.internal.processors.rest.client.message.*;
-import org.apache.ignite.internal.processors.rest.handlers.task.*;
-import org.apache.ignite.internal.processors.rest.protocols.tcp.*;
-import org.apache.ignite.internal.processors.streamer.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-import org.apache.ignite.spi.collision.jobstealing.*;
-import org.apache.ignite.spi.communication.tcp.*;
-import org.jdk8.backport.*;
-
-import java.util.*;
-
-/**
- * Communication message factory.
- */
-public class GridTcpCommunicationMessageFactory {
-    /** Common message producers. */
-    private static final GridTcpCommunicationMessageProducer[] COMMON = new GridTcpCommunicationMessageProducer[89];
-
-    /**
-     * Custom messages registry. Used for test purposes.
-     */
-    private static final Map<Byte, GridTcpCommunicationMessageProducer> CUSTOM = new ConcurrentHashMap8<>();
-
-    /** */
-    public static final int MAX_COMMON_TYPE = 88;
-
-    static {
-        registerCommon(new GridTcpCommunicationMessageProducer() {
-            @Override public MessageAdapter create(byte type) {
-                switch (type) {
-                    case 0:
-                        return new GridJobCancelRequest();
-
-                    case 1:
-                        return new GridJobExecuteRequest();
-
-                    case 2:
-                        return new GridJobExecuteResponse();
-
-                    case 3:
-                        return new GridJobSiblingsRequest();
-
-                    case 4:
-                        return new GridJobSiblingsResponse();
-
-                    case 5:
-                        return new GridTaskCancelRequest();
-
-                    case 6:
-                        return new GridTaskSessionRequest();
-
-                    case 7:
-                        return new GridCheckpointRequest();
-
-                    case 8:
-                        return new GridIoMessage();
-
-                    case 9:
-                        return new GridIoUserMessage();
-
-                    case 10:
-                        return new GridDeploymentInfoBean();
-
-                    case 11:
-                        return new GridDeploymentRequest();
-
-                    case 12:
-                        return new GridDeploymentResponse();
-
-                    case 13:
-                        return new GridEventStorageMessage();
-
-                    case 14:
-                        return new GridCacheEvictionRequest();
-
-                    case 15:
-                        return new GridCacheEvictionResponse();
-
-                    case 16:
-                        return new GridCacheOptimisticCheckPreparedTxRequest();
-
-                    case 17:
-                        return new GridCacheOptimisticCheckPreparedTxResponse();
-
-                    case 18:
-                        return new GridCachePessimisticCheckCommittedTxRequest();
-
-                    case 19:
-                        return new GridCachePessimisticCheckCommittedTxResponse();
-
-                    case 20:
-                        return new GridCacheTtlUpdateRequest();
-
-                    case 21:
-                        return new GridDistributedLockRequest();
-
-                    case 22:
-                        return new GridDistributedLockResponse();
-
-                    case 23:
-                        return new GridDistributedTxFinishRequest();
-
-                    case 24:
-                        return new GridDistributedTxFinishResponse();
-
-                    case 25:
-                        return new GridDistributedTxPrepareRequest();
-
-                    case 26:
-                        return new GridDistributedTxPrepareResponse();
-
-                    case 27:
-                        return new GridDistributedUnlockRequest();
-
-                    case 28:
-                        return new GridDhtAffinityAssignmentRequest();
-
-                    case 29:
-                        return new GridDhtAffinityAssignmentResponse();
-
-                    case 30:
-                        return new GridDhtLockRequest();
-
-                    case 31:
-                        return new GridDhtLockResponse();
-
-                    case 32:
-                        return new GridDhtTxFinishRequest();
-
-                    case 33:
-                        return new GridDhtTxFinishResponse();
-
-                    case 34:
-                        return new GridDhtTxPrepareRequest();
-
-                    case 35:
-                        return new GridDhtTxPrepareResponse();
-
-                    case 36:
-                        return new GridDhtUnlockRequest();
-
-                    case 37:
-                        return new GridDhtAtomicDeferredUpdateResponse();
-
-                    case 38:
-                        return new GridDhtAtomicUpdateRequest();
-
-                    case 39:
-                        return new GridDhtAtomicUpdateResponse();
-
-                    case 40:
-                        return new GridNearAtomicUpdateRequest();
-
-                    case 41:
-                        return new GridNearAtomicUpdateResponse();
-
-                    case 42:
-                        return new GridDhtForceKeysRequest();
-
-                    case 43:
-                        return new GridDhtForceKeysResponse();
-
-                    case 44:
-                        return new GridDhtPartitionDemandMessage();
-
-                    case 45:
-                        return new GridDhtPartitionSupplyMessage();
-
-                    case 46:
-                        return new GridDhtPartitionsFullMessage();
-
-                    case 47:
-                        return new GridDhtPartitionsSingleMessage();
-
-                    case 48:
-                        return new GridDhtPartitionsSingleRequest();
-
-                    case 49:
-                        return new GridNearGetRequest();
-
-                    case 50:
-                        return new GridNearGetResponse();
-
-                    case 51:
-                        return new GridNearLockRequest();
-
-                    case 52:
-                        return new GridNearLockResponse();
-
-                    case 53:
-                        return new GridNearTxFinishRequest();
-
-                    case 54:
-                        return new GridNearTxFinishResponse();
-
-                    case 55:
-                        return new GridNearTxPrepareRequest();
-
-                    case 56:
-                        return new GridNearTxPrepareResponse();
-
-                    case 57:
-                        return new GridNearUnlockRequest();
-
-                    case 58:
-                        return new GridCacheQueryRequest();
-
-                    case 59:
-                        return new GridCacheQueryResponse();
-
-                    case 60:
-                        return new GridClockDeltaSnapshotMessage();
-
-                    case 61:
-                        return new GridContinuousMessage();
-
-                    case 62:
-                        return new GridDataLoadRequest();
-
-                    case 63:
-                        return new GridDataLoadResponse();
-
-                    case 64:
-                        return new GridGgfsAckMessage();
-
-                    case 65:
-                        return new GridGgfsBlockKey();
-
-                    case 66:
-                        return new GridGgfsBlocksMessage();
-
-                    case 67:
-                        return new GridGgfsDeleteMessage();
-
-                    case 68:
-                        return new GridGgfsFileAffinityRange();
-
-                    case 69:
-                        return new GridGgfsFragmentizerRequest();
-
-                    case 70:
-                        return new GridGgfsFragmentizerResponse();
-
-                    case 71:
-                        return new GridGgfsSyncMessage();
-
-                    case 72:
-                        return new GridClientHandshakeRequestWrapper();
-
-                    case 73:
-                        return new GridClientHandshakeResponseWrapper();
-
-                    case 74:
-                        return new GridClientMessageWrapper();
-
-                    case 75:
-                        return new GridClientPingPacketWrapper();
-
-                    case 76:
-                        return new GridTaskResultRequest();
-
-                    case 77:
-                        return new GridTaskResultResponse();
-
-                    case 78:
-                        return new GridMemcachedMessageWrapper();
-
-                    case 79:
-                        return new GridStreamerCancelRequest();
-
-                    case 80:
-                        return new GridStreamerExecutionRequest();
-
-                    case 81:
-                        return new GridStreamerResponse();
-
-                    case 82:
-                        return new JobStealingRequest();
-
-                    case 83:
-                        return new GridClockDeltaVersion();
-
-                    case 84:
-                        return new GridByteArrayList();
-
-                    case 85:
-                        return new GridLongList();
-
-                    case 86:
-                        return new GridCacheVersion();
-
-                    case 87:
-                        return new GridDhtPartitionExchangeId();
-
-                    case 88:
-                        return new GridCacheValueBytes();
-
-                    default:
-                        assert false : "Invalid message type.";
-
-                        return null;
-                }
-            }
-        },  0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
-           20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39,
-           40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59,
-           60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79,
-           80, 81, 82, 83, 84, 85, 86, 87, 88);
-    }
-
-    /**
-     * @param type Message type.
-     * @return New message.
-     */
-    public static MessageAdapter create(byte type) {
-        if (type == TcpCommunicationSpi.NODE_ID_MSG_TYPE)
-            return new TcpCommunicationSpi.NodeIdMessage();
-        else if (type == TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE)
-            return new TcpCommunicationSpi.RecoveryLastReceivedMessage();
-        else if (type == TcpCommunicationSpi.HANDSHAKE_MSG_TYPE)
-            return new TcpCommunicationSpi.HandshakeMessage();
-        else
-            return create0(type);
-    }
-
-    /**
-     * @param type Message type.
-     * @return New message.
-     */
-    private static MessageAdapter create0(byte type) {
-        if (type >= 0 && type < COMMON.length) {
-            GridTcpCommunicationMessageProducer producer = COMMON[type];
-
-            if (producer != null)
-                return producer.create(type);
-            else
-                throw new IllegalStateException("Common message type producer is not registered: " + type);
-        }
-        else {
-            GridTcpCommunicationMessageProducer c = CUSTOM.get(type);
-
-            if (c != null)
-                return c.create(type);
-            else
-                throw new IllegalStateException("Custom message type producer is not registered: " + type);
-        }
-    }
-
-    /**
-     * Register message producer for common message type.
-     *
-     * @param producer Producer.
-     * @param types Types applicable for this producer.
-     */
-    public static void registerCommon(GridTcpCommunicationMessageProducer producer, int... types) {
-        for (int type : types) {
-            assert type >= 0 && type < COMMON.length : "Common type being registered is out of common messages " +
-                "array length: " + type;
-
-            COMMON[type] = producer;
-        }
-    }
-
-    /**
-     * Registers factory for custom message. Used for test purposes.
-     *
-     * @param producer Message producer.
-     * @param type Message type.
-     */
-    public static void registerCustom(GridTcpCommunicationMessageProducer producer, byte type) {
-        assert producer != null;
-
-        CUSTOM.put(type, producer);
-    }
-
-    /**
-     * @return Common message producers.
-     */
-    public static GridTcpCommunicationMessageProducer[] commonProducers() {
-        return COMMON;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/direct/GridTcpCommunicationMessageProducer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/GridTcpCommunicationMessageProducer.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/GridTcpCommunicationMessageProducer.java
deleted file mode 100644
index 2932002..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/GridTcpCommunicationMessageProducer.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.direct;
-
-import org.apache.ignite.plugin.extensions.communication.*;
-
-/**
- * Message producer. Each component have to register it's own message producer.
- */
-public interface GridTcpCommunicationMessageProducer {
-    /**
-     * Create message.
-     *
-     * @param type Message type.
-     * @return Communication message.
-     */
-    public MessageAdapter create(byte type);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/direct/GridTcpMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/GridTcpMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/GridTcpMessageFactory.java
deleted file mode 100644
index 5c66a04..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/GridTcpMessageFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.direct;
-
-import org.apache.ignite.plugin.extensions.communication.*;
-
-/**
- *
- */
-public interface GridTcpMessageFactory {
-    /**
-     * @param type Message type.
-     * @return Message instance.
-     */
-    public MessageAdapter create(byte type);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 3e85199..117fbe9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -22,7 +22,6 @@ import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.direct.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
@@ -551,7 +550,7 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
                         return ctx.io().messageWriterFactory();
                     }
 
-                    @Override public GridTcpMessageFactory messageFactory() {
+                    @Override public MessageFactory messageFactory() {
                         return ctx.io().messageFactory();
                     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 2d7b67f..a44efd9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -132,20 +132,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     private final LongAdder workersCnt = new LongAdder();
 
     /** */
-    private int pluginMsg = GridTcpCommunicationMessageFactory.MAX_COMMON_TYPE;
-
-    /** */
-    private Map<Byte, GridTcpCommunicationMessageProducer> pluginMsgs;
-
-    /** */
-    private GridTcpMessageFactory msgFactory;
+    private MessageFactory msgFactory;
 
     /** */
     private MessageWriterFactory writerFactory;
 
-    /** */
-    private MessageReaderFactory readerFactory;
-
     /**
      * @param ctx Grid kernal context.
      */
@@ -161,71 +152,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
-     * @param producer Message producer.
-     * @return Message type code.
-     */
-    public byte registerMessageProducer(GridTcpCommunicationMessageProducer producer) {
-        int nextMsg = ++pluginMsg;
-
-        if (nextMsg > Byte.MAX_VALUE)
-            throw new IgniteException();
-
-        if (pluginMsgs == null)
-            pluginMsgs = new HashMap<>();
-
-        pluginMsgs.put((byte)nextMsg, producer);
-
-        return (byte)nextMsg;
-    }
-
-    /**
-     * Initializes manager (called prior to discovery start, but after all other components).
-     */
-    public void initMessageFactory() {
-        final GridTcpCommunicationMessageProducer[] common = GridTcpCommunicationMessageFactory.commonProducers();
-
-        final GridTcpCommunicationMessageProducer[] producers;
-
-        if (pluginMsgs != null) {
-            producers = Arrays.copyOf(common, pluginMsg + 1);
-
-            for (Map.Entry<Byte, GridTcpCommunicationMessageProducer> e : pluginMsgs.entrySet()) {
-                assert producers[e.getKey()] == null : e.getKey();
-
-                producers[e.getKey()] = e.getValue();
-            }
-
-            pluginMsgs = null;
-        }
-        else
-            producers = common;
-
-        msgFactory = new GridTcpMessageFactory() {
-            @Override public MessageAdapter create(byte type) {
-                MessageAdapter msg;
-
-                if (type < 0 || type >= producers.length)
-                    msg = GridTcpCommunicationMessageFactory.create(type);
-                else {
-                    GridTcpCommunicationMessageProducer producer = producers[type];
-
-                    if (producer == null)
-                        throw new IllegalStateException("Common message type producer is not registered: " + type);
-
-                    msg = producer.create(type);
-                }
-
-                msg.setReader(readerFactory.reader());
-
-                return msg;
-            }
-        };
-    }
-
-    /**
      * @return Message factory.
      */
-    public GridTcpMessageFactory messageFactory() {
+    public MessageFactory messageFactory() {
         assert msgFactory != null;
 
         return msgFactory;
@@ -291,6 +220,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             };
         }
 
+        MessageReaderFactory readerFactory;
+
         MessageReaderFactory[] readerExt = ctx.plugins().extensions(MessageReaderFactory.class);
 
         if (readerExt != null && readerExt.length > 0)
@@ -303,6 +234,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             };
         }
 
+        msgFactory = new GridIoMessageFactory(readerFactory, ctx.plugins().extensions(MessageFactory.class));
+
         if (log.isDebugEnabled())
             log.debug(startInfo());
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
new file mode 100644
index 0000000..0634b1f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.checkpoint.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.processors.clock.*;
+import org.apache.ignite.internal.processors.continuous.*;
+import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.fs.*;
+import org.apache.ignite.internal.processors.rest.client.message.*;
+import org.apache.ignite.internal.processors.rest.handlers.task.*;
+import org.apache.ignite.internal.processors.rest.protocols.tcp.*;
+import org.apache.ignite.internal.processors.streamer.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.spi.collision.jobstealing.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.jdk8.backport.*;
+
+import java.util.*;
+
+/**
+ * Message factory implementation.
+ */
+public class GridIoMessageFactory implements MessageFactory {
+    /** Custom messages registry. Used for test purposes. */
+    private static final Map<Byte, IgniteOutClosure<MessageAdapter>> CUSTOM = new ConcurrentHashMap8<>();
+
+    /** Message reader factory. */
+    private final MessageReaderFactory readerFactory;
+
+    /** Extensions. */
+    private final MessageFactory[] ext;
+
+    /**
+     * @param readerFactory Message reader factory.
+     * @param ext Extensions.
+     */
+    public GridIoMessageFactory(MessageReaderFactory readerFactory, MessageFactory[] ext) {
+        assert readerFactory != null;
+
+        this.readerFactory = readerFactory;
+        this.ext = ext;
+    }
+
+    /** {@inheritDoc} */
+    @Override public MessageAdapter create(byte type) {
+        MessageAdapter msg = null;
+
+        switch (type) {
+            case TcpCommunicationSpi.NODE_ID_MSG_TYPE:
+                return new TcpCommunicationSpi.NodeIdMessage();
+
+            case TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE:
+                return new TcpCommunicationSpi.RecoveryLastReceivedMessage();
+
+            case TcpCommunicationSpi.HANDSHAKE_MSG_TYPE:
+                return new TcpCommunicationSpi.HandshakeMessage();
+
+            case 0:
+                msg = new GridJobCancelRequest();
+
+                break;
+
+            case 1:
+                msg = new GridJobExecuteRequest();
+
+                break;
+
+            case 2:
+                msg = new GridJobExecuteResponse();
+
+                break;
+
+            case 3:
+                msg = new GridJobSiblingsRequest();
+
+                break;
+
+            case 4:
+                msg = new GridJobSiblingsResponse();
+
+                break;
+
+            case 5:
+                msg = new GridTaskCancelRequest();
+
+                break;
+
+            case 6:
+                msg = new GridTaskSessionRequest();
+
+                break;
+
+            case 7:
+                msg = new GridCheckpointRequest();
+
+                break;
+
+            case 8:
+                msg = new GridIoMessage();
+
+                break;
+
+            case 9:
+                msg = new GridIoUserMessage();
+
+                break;
+
+            case 10:
+                msg = new GridDeploymentInfoBean();
+
+                break;
+
+            case 11:
+                msg = new GridDeploymentRequest();
+
+                break;
+
+            case 12:
+                msg = new GridDeploymentResponse();
+
+                break;
+
+            case 13:
+                msg = new GridEventStorageMessage();
+
+                break;
+
+            case 14:
+                msg = new GridCacheEvictionRequest();
+
+                break;
+
+            case 15:
+                msg = new GridCacheEvictionResponse();
+
+                break;
+
+            case 16:
+                msg = new GridCacheOptimisticCheckPreparedTxRequest();
+
+                break;
+
+            case 17:
+                msg = new GridCacheOptimisticCheckPreparedTxResponse();
+
+                break;
+
+            case 18:
+                msg = new GridCachePessimisticCheckCommittedTxRequest();
+
+                break;
+
+            case 19:
+                msg = new GridCachePessimisticCheckCommittedTxResponse();
+
+                break;
+
+            case 20:
+                msg = new GridCacheTtlUpdateRequest();
+
+                break;
+
+            case 21:
+                msg = new GridDistributedLockRequest();
+
+                break;
+
+            case 22:
+                msg = new GridDistributedLockResponse();
+
+                break;
+
+            case 23:
+                msg = new GridDistributedTxFinishRequest();
+
+                break;
+
+            case 24:
+                msg = new GridDistributedTxFinishResponse();
+
+                break;
+
+            case 25:
+                msg = new GridDistributedTxPrepareRequest();
+
+                break;
+
+            case 26:
+                msg = new GridDistributedTxPrepareResponse();
+
+                break;
+
+            case 27:
+                msg = new GridDistributedUnlockRequest();
+
+                break;
+
+            case 28:
+                msg = new GridDhtAffinityAssignmentRequest();
+
+                break;
+
+            case 29:
+                msg = new GridDhtAffinityAssignmentResponse();
+
+                break;
+
+            case 30:
+                msg = new GridDhtLockRequest();
+
+                break;
+
+            case 31:
+                msg = new GridDhtLockResponse();
+
+                break;
+
+            case 32:
+                msg = new GridDhtTxFinishRequest();
+
+                break;
+
+            case 33:
+                msg = new GridDhtTxFinishResponse();
+
+                break;
+
+            case 34:
+                msg = new GridDhtTxPrepareRequest();
+
+                break;
+
+            case 35:
+                msg = new GridDhtTxPrepareResponse();
+
+                break;
+
+            case 36:
+                msg = new GridDhtUnlockRequest();
+
+                break;
+
+            case 37:
+                msg = new GridDhtAtomicDeferredUpdateResponse();
+
+                break;
+
+            case 38:
+                msg = new GridDhtAtomicUpdateRequest();
+
+                break;
+
+            case 39:
+                msg = new GridDhtAtomicUpdateResponse();
+
+                break;
+
+            case 40:
+                msg = new GridNearAtomicUpdateRequest();
+
+                break;
+
+            case 41:
+                msg = new GridNearAtomicUpdateResponse();
+
+                break;
+
+            case 42:
+                msg = new GridDhtForceKeysRequest();
+
+                break;
+
+            case 43:
+                msg = new GridDhtForceKeysResponse();
+
+                break;
+
+            case 44:
+                msg = new GridDhtPartitionDemandMessage();
+
+                break;
+
+            case 45:
+                msg = new GridDhtPartitionSupplyMessage();
+
+                break;
+
+            case 46:
+                msg = new GridDhtPartitionsFullMessage();
+
+                break;
+
+            case 47:
+                msg = new GridDhtPartitionsSingleMessage();
+
+                break;
+
+            case 48:
+                msg = new GridDhtPartitionsSingleRequest();
+
+                break;
+
+            case 49:
+                msg = new GridNearGetRequest();
+
+                break;
+
+            case 50:
+                msg = new GridNearGetResponse();
+
+                break;
+
+            case 51:
+                msg = new GridNearLockRequest();
+
+                break;
+
+            case 52:
+                msg = new GridNearLockResponse();
+
+                break;
+
+            case 53:
+                msg = new GridNearTxFinishRequest();
+
+                break;
+
+            case 54:
+                msg = new GridNearTxFinishResponse();
+
+                break;
+
+            case 55:
+                msg = new GridNearTxPrepareRequest();
+
+                break;
+
+            case 56:
+                msg = new GridNearTxPrepareResponse();
+
+                break;
+
+            case 57:
+                msg = new GridNearUnlockRequest();
+
+                break;
+
+            case 58:
+                msg = new GridCacheQueryRequest();
+
+                break;
+
+            case 59:
+                msg = new GridCacheQueryResponse();
+
+                break;
+
+            case 60:
+                msg = new GridClockDeltaSnapshotMessage();
+
+                break;
+
+            case 61:
+                msg = new GridContinuousMessage();
+
+                break;
+
+            case 62:
+                msg = new GridDataLoadRequest();
+
+                break;
+
+            case 63:
+                msg = new GridDataLoadResponse();
+
+                break;
+
+            case 64:
+                msg = new GridGgfsAckMessage();
+
+                break;
+
+            case 65:
+                msg = new GridGgfsBlockKey();
+
+                break;
+
+            case 66:
+                msg = new GridGgfsBlocksMessage();
+
+                break;
+
+            case 67:
+                msg = new GridGgfsDeleteMessage();
+
+                break;
+
+            case 68:
+                msg = new GridGgfsFileAffinityRange();
+
+                break;
+
+            case 69:
+                msg = new GridGgfsFragmentizerRequest();
+
+                break;
+
+            case 70:
+                msg = new GridGgfsFragmentizerResponse();
+
+                break;
+
+            case 71:
+                msg = new GridGgfsSyncMessage();
+
+                break;
+
+            case 72:
+                msg = new GridClientHandshakeRequestWrapper();
+
+                break;
+
+            case 73:
+                msg = new GridClientHandshakeResponseWrapper();
+
+                break;
+
+            case 74:
+                msg = new GridClientMessageWrapper();
+
+                break;
+
+            case 75:
+                msg = new GridClientPingPacketWrapper();
+
+                break;
+
+            case 76:
+                msg = new GridTaskResultRequest();
+
+                break;
+
+            case 77:
+                msg = new GridTaskResultResponse();
+
+                break;
+
+            case 78:
+                msg = new GridMemcachedMessageWrapper();
+
+                break;
+
+            case 79:
+                msg = new GridStreamerCancelRequest();
+
+                break;
+
+            case 80:
+                msg = new GridStreamerExecutionRequest();
+
+                break;
+
+            case 81:
+                msg = new GridStreamerResponse();
+
+                break;
+
+            case 82:
+                msg = new JobStealingRequest();
+
+                break;
+
+            case 83:
+                msg = new GridClockDeltaVersion();
+
+                break;
+
+            case 84:
+                msg = new GridByteArrayList();
+
+                break;
+
+            case 85:
+                msg = new GridLongList();
+
+                break;
+
+            case 86:
+                msg = new GridCacheVersion();
+
+                break;
+
+            case 87:
+                msg = new GridDhtPartitionExchangeId();
+
+                break;
+
+            case 88:
+                msg = new GridCacheValueBytes();
+
+                break;
+
+            default:
+                if (ext != null) {
+                    for (MessageFactory factory : ext) {
+                        msg = factory.create(type);
+
+                        if (msg != null)
+                            break;
+                    }
+                }
+
+                if (msg == null) {
+                    IgniteOutClosure<MessageAdapter> c = CUSTOM.get(type);
+
+                    if (c != null)
+                        msg = c.apply();
+                }
+        }
+
+        if (msg == null)
+            throw new IgniteException("Invalid message type: " + type);
+
+        msg.setReader(readerFactory.reader());
+
+        return msg;
+    }
+
+    /**
+     * Registers factory for custom message. Used for test purposes.
+     *
+     * @param type Message type.
+     * @param c Message producer.
+     */
+    public static void registerCustom(byte type, IgniteOutClosure<MessageAdapter> c) {
+        assert c != null;
+
+        CUSTOM.put(type, c);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
index 0071942..021aa91 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.util.nio;
 
 import org.apache.ignite.*;
-import org.apache.ignite.internal.direct.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.spi.*;
 import org.jetbrains.annotations.*;
@@ -37,7 +36,7 @@ public class GridDirectParser implements GridNioParser {
     private IgniteSpiAdapter spi;
 
     /** */
-    private GridTcpMessageFactory msgFactory;
+    private MessageFactory msgFactory;
 
     /**
      * @param spi Spi.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReader.java
index 17249fa..9ae5bbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReader.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.util.nio;
 
-import org.apache.ignite.internal.direct.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.jetbrains.annotations.*;
 
@@ -39,5 +38,5 @@ public interface GridNioMessageReader {
     /**
      * @return Optional message factory.
      */
-    @Nullable public GridTcpMessageFactory messageFactory();
+    @Nullable public MessageFactory messageFactory();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 42a540b..585bf6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -2288,7 +2288,7 @@ public class GridNioServer<T> {
         }
 
         /**
-         * @param messageWriterFactory Message writer factory.
+         * @param messageWriterFactory Message writer factory..
          * @return This for chaining.
          */
         public Builder<T> messageWriterFactory(MessageWriterFactory messageWriterFactory) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/plugin/PluginContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/PluginContext.java b/modules/core/src/main/java/org/apache/ignite/plugin/PluginContext.java
index a59d7ab..0704700 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/PluginContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/PluginContext.java
@@ -20,7 +20,6 @@ package org.apache.ignite.plugin;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.direct.*;
 import org.apache.ignite.spi.*;
 
 import java.util.*;
@@ -95,10 +94,4 @@ public interface PluginContext {
      * @param cls Class.
      */
     public void deregisterPorts(Class<?> cls);
-
-    /**
-     * @param producer Message producer.
-     * @return Message type code.
-     */
-    public byte registerMessageProducer(GridTcpCommunicationMessageProducer producer);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java
new file mode 100644
index 0000000..d443209
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.plugin.extensions.communication;
+
+import org.apache.ignite.plugin.*;
+
+/**
+ *
+ */
+public interface MessageFactory extends Extension {
+    /**
+     * @param type Message type.
+     * @return Message instance.
+     */
+    public MessageAdapter create(byte type);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index c3b4700..6ea655a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -21,7 +21,6 @@ import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.direct.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.spi.authentication.*;
@@ -730,7 +729,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         }
 
         /** {@inheritDoc} */
-        @Override public GridTcpMessageFactory messageFactory() {
+        @Override public MessageFactory messageFactory() {
             return null;
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index 1f68226..ce4c0c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -20,7 +20,6 @@ package org.apache.ignite.spi;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
-import org.apache.ignite.internal.direct.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.plugin.extensions.communication.*;
@@ -408,5 +407,5 @@ public interface IgniteSpiContext {
     /**
      * @return Message factory.
      */
-    public GridTcpMessageFactory messageFactory();
+    public MessageFactory messageFactory();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
index d99cdfd..7f39141 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.managers.communication;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.direct.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.spi.communication.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.*;
@@ -50,11 +51,11 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest
     private int bufSize;
 
     static {
-        GridTcpCommunicationMessageFactory.registerCustom(new GridTcpCommunicationMessageProducer() {
-            @Override public MessageAdapter create(byte type) {
+        GridIoMessageFactory.registerCustom(DIRECT_TYPE, new CO<MessageAdapter>() {
+            @Override public MessageAdapter apply() {
                 return new TestMessage();
             }
-        }, DIRECT_TYPE);
+        });
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
index 1aabf89..2b7dac3 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.spi.communication;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.direct.*;
+import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
@@ -62,11 +62,11 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
      *
      */
     static {
-        GridTcpCommunicationMessageFactory.registerCustom(new GridTcpCommunicationMessageProducer() {
-            @Override public MessageAdapter create(byte type) {
+        GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<MessageAdapter>() {
+            @Override public MessageAdapter apply() {
                 return new GridTestMessage();
             }
-        }, GridTestMessage.DIRECT_TYPE);
+        });
     }
 
     /** */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index 39bfe37..175a4c5 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -20,9 +20,10 @@ package org.apache.ignite.spi.communication.tcp;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.direct.*;
+import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
@@ -65,12 +66,11 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
      *
      */
     static {
-        GridTcpCommunicationMessageFactory.registerCustom(new GridTcpCommunicationMessageProducer() {
-            @Override
-            public MessageAdapter create(byte type) {
+        GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<MessageAdapter>() {
+            @Override public MessageAdapter apply() {
                 return new GridTestMessage();
             }
-        }, GridTestMessage.DIRECT_TYPE);
+        });
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index 5ae3d2b..159173e 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -20,7 +20,7 @@ package org.apache.ignite.spi.communication.tcp;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.direct.*;
+import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.nio.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -74,11 +74,11 @@ public abstract class GridTcpCommunicationSpiMultithreadedSelfTest extends GridS
     private static boolean reject;
 
     static {
-        GridTcpCommunicationMessageFactory.registerCustom(new GridTcpCommunicationMessageProducer() {
-            @Override public MessageAdapter create(byte type) {
+        GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<MessageAdapter>() {
+            @Override public MessageAdapter apply() {
                 return new GridTestMessage();
             }
-        }, GridTestMessage.DIRECT_TYPE);
+        });
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index 78e1b81..2ba9086 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -19,9 +19,10 @@ package org.apache.ignite.spi.communication.tcp;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.direct.*;
+import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
@@ -56,12 +57,11 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
      *
      */
     static {
-        GridTcpCommunicationMessageFactory.registerCustom(new GridTcpCommunicationMessageProducer() {
-            @Override
-            public MessageAdapter create(byte type) {
+        GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<MessageAdapter>() {
+            @Override public MessageAdapter apply() {
                 return new GridTestMessage();
             }
-        }, GridTestMessage.DIRECT_TYPE);
+        });
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index c2c1676..86b4e44 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -20,9 +20,10 @@ package org.apache.ignite.spi.communication.tcp;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.direct.*;
+import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
@@ -65,11 +66,11 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
      *
      */
     static {
-        GridTcpCommunicationMessageFactory.registerCustom(new GridTcpCommunicationMessageProducer() {
-            @Override public MessageAdapter create(byte type) {
+        GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<MessageAdapter>() {
+            @Override public MessageAdapter apply() {
                 return new GridTestMessage();
             }
-        }, GridTestMessage.DIRECT_TYPE);
+        });
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 76fd302..bfcfe2f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -525,16 +525,12 @@ public class GridSpiTestContext implements IgniteSpiContext {
     }
 
     /** {@inheritDoc} */
-    @Override public GridTcpMessageFactory messageFactory() {
-        return new GridTcpMessageFactory() {
-            @Override public MessageAdapter create(byte type) {
-                MessageAdapter msg = GridTcpCommunicationMessageFactory.create(type);
-
-                msg.setReader(new DirectMessageReader(null));
-
-                return msg;
+    @Override public MessageFactory messageFactory() {
+        return new GridIoMessageFactory(new MessageReaderFactory() {
+            @Override public MessageReader reader() {
+                return new DirectMessageReader(null);
             }
-        };
+        }, null);
     }
 
     /**


Mime
View raw message