directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dran...@apache.org
Subject [1/3] directory-kerby git commit: Add kerby-event library and the event-network support back in the branch
Date Sat, 13 Jun 2015 10:02:33 GMT
Repository: directory-kerby
Updated Branches:
  refs/heads/event-network-support [created] f3645ba1d


http://git-wip-us.apache.org/repos/asf/directory-kerby/blob/f3645ba1/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkBase.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkBase.java
b/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkBase.java
new file mode 100644
index 0000000..2ef7241
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkBase.java
@@ -0,0 +1,64 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.network;
+
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.event.NetworkUtil;
+import org.apache.kerby.transport.tcp.DecodingCallback;
+import org.apache.kerby.transport.tcp.StreamingDecoder;
+
+import java.nio.ByteBuffer;
+
+public class TestNetworkBase {
+    protected String serverHost = "127.0.0.1";
+    protected int tcpPort = 0;
+    protected int udpPort = 0;
+    protected String TEST_MESSAGE = "Hello world!";
+    protected String clientRecvedMessage;
+
+    protected enum TestEventType implements EventType {
+        FINISHED
+    }
+
+    protected void preparePorts() {
+        tcpPort = NetworkUtil.getServerPort();
+        udpPort = NetworkUtil.getServerPort();
+    }
+
+    protected String recvBuffer2String(ByteBuffer buffer) {
+        byte[] bytes = new byte[buffer.remaining()];
+        buffer.get(bytes);
+        return new String(bytes);
+    }
+
+    protected StreamingDecoder createStreamingDecoder() {
+        return new StreamingDecoder() {
+            @Override
+            public void decode(ByteBuffer streamingBuffer, DecodingCallback callback) {
+                int expectedMessageLength = TEST_MESSAGE.getBytes().length;
+                if (streamingBuffer.remaining() >= expectedMessageLength) {
+                    callback.onMessageComplete(expectedMessageLength, -1);
+                } else {
+                    callback.onMoreDataNeeded(expectedMessageLength);
+                }
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/directory-kerby/blob/f3645ba1/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkClient.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkClient.java
b/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkClient.java
new file mode 100644
index 0000000..7d00d58
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkClient.java
@@ -0,0 +1,213 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.network;
+
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventHandler;
+import org.apache.kerby.event.EventHub;
+import org.apache.kerby.event.EventWaiter;
+import org.apache.kerby.transport.MessageHandler;
+import org.apache.kerby.transport.Network;
+import org.apache.kerby.transport.Transport;
+import org.apache.kerby.transport.event.MessageEvent;
+import org.apache.kerby.transport.event.TransportEvent;
+import org.apache.kerby.transport.event.TransportEventType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.ByteBuffer;
+import java.nio.channels.*;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestNetworkClient extends TestNetworkBase {
+
+    private EventHub eventHub;
+    private EventWaiter eventWaiter;
+
+    @Before
+    public void setUp() throws IOException {
+        preparePorts();
+
+        setUpServer();
+        setUpClient();
+    }
+
+    private void setUpServer() {
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    doRunTcpServer();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }).start();
+
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    doRunUdpServer();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }).start();
+    }
+
+    private void doRunTcpServer() throws IOException {
+        Selector selector = Selector.open();
+        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+        serverSocketChannel.configureBlocking(false);
+        ServerSocket serverSocket = serverSocketChannel.socket();
+        serverSocket.bind(new InetSocketAddress(tcpPort));
+        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+
+        SocketChannel socketChannel;
+        while (true) {
+            if (selector.selectNow() > 0) {
+                Set<SelectionKey> selectionKeys = selector.selectedKeys();
+                Iterator<SelectionKey> iterator = selectionKeys.iterator();
+                while (iterator.hasNext()) {
+                    SelectionKey selectionKey = iterator.next();
+                    iterator.remove();
+
+                    if (selectionKey.isAcceptable()) {
+                        while ((socketChannel = serverSocketChannel.accept()) != null) {
+                            socketChannel.configureBlocking(false);
+                            socketChannel.socket().setTcpNoDelay(true);
+                            socketChannel.socket().setKeepAlive(true);
+                            socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE,
socketChannel);
+                            //selectionKey.attach(socketChannel);
+                        }
+                    } else if (selectionKey.isReadable()) {
+                        ByteBuffer recvBuffer = ByteBuffer.allocate(65536);
+                        socketChannel = (SocketChannel) selectionKey.attachment();
+                        if (socketChannel.read(recvBuffer) > 0) {
+                            recvBuffer.flip();
+                            socketChannel.write(recvBuffer);
+                        }
+                    }
+                }
+
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    private void doRunUdpServer() throws IOException {
+        Selector selector = Selector.open();
+        DatagramChannel serverSocketChannel = DatagramChannel.open();
+        serverSocketChannel.configureBlocking(false);
+        DatagramSocket serverSocket = serverSocketChannel.socket();
+        serverSocket.bind(new InetSocketAddress(udpPort));
+        serverSocketChannel.register(selector, SelectionKey.OP_READ);
+
+        while (true) {
+            if (selector.selectNow() > 0) {
+                Set<SelectionKey> selectionKeys = selector.selectedKeys();
+                Iterator<SelectionKey> iterator = selectionKeys.iterator();
+                while (iterator.hasNext()) {
+                    SelectionKey selectionKey = iterator.next();
+                    iterator.remove();
+                    if (selectionKey.isReadable()) {
+                        ByteBuffer recvBuffer = ByteBuffer.allocate(65536);
+                        InetSocketAddress fromAddress = (InetSocketAddress) serverSocketChannel.receive(recvBuffer);
+                        if (fromAddress != null) {
+                            recvBuffer.flip();
+                            serverSocketChannel.send(recvBuffer, fromAddress);
+                        }
+                    }
+                }
+
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    private void setUpClient() throws IOException {
+        eventHub = new EventHub();
+
+        EventHandler messageHandler = new MessageHandler() {
+            @Override
+            protected void handleMessage(MessageEvent event) {
+                if (event.getEventType() == TransportEventType.INBOUND_MESSAGE) {
+                    ByteBuffer buffer = event.getMessage();
+                    if (buffer != null) {
+                        clientRecvedMessage = recvBuffer2String(buffer);
+                        System.out.println("Recved clientRecvedMessage: " + clientRecvedMessage);
+                        Boolean result = TEST_MESSAGE.equals(clientRecvedMessage);
+                        dispatch(new Event(TestEventType.FINISHED, result));
+                    }
+                }
+            }
+        };
+        eventHub.register(messageHandler);
+
+        Network network = new Network();
+        network.setStreamingDecoder(createStreamingDecoder());
+        eventHub.register(network);
+
+        eventWaiter = eventHub.waitEvent(
+                TestEventType.FINISHED,
+                TransportEventType.NEW_TRANSPORT);
+
+        eventHub.start();
+        network.tcpConnect(serverHost, tcpPort);
+        network.udpConnect(serverHost, udpPort);
+    }
+
+    @Test
+    public void testNetworkClient() {
+        Event event = eventWaiter.waitEvent(TransportEventType.NEW_TRANSPORT);
+        Transport transport = ((TransportEvent) event).getTransport();
+        transport.sendMessage(ByteBuffer.wrap(TEST_MESSAGE.getBytes()));
+        event = eventWaiter.waitEvent(TestEventType.FINISHED);
+        assertThat((Boolean) event.getEventData()).isTrue();
+
+        event = eventWaiter.waitEvent(TransportEventType.NEW_TRANSPORT);
+        transport = ((TransportEvent) event).getTransport();
+        transport.sendMessage(ByteBuffer.wrap(TEST_MESSAGE.getBytes()));
+        event = eventWaiter.waitEvent(TestEventType.FINISHED);
+        assertThat((Boolean) event.getEventData()).isTrue();
+    }
+
+    @After
+    public void cleanup() {
+        eventHub.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/directory-kerby/blob/f3645ba1/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkServer.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkServer.java
b/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkServer.java
new file mode 100644
index 0000000..b68745d
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkServer.java
@@ -0,0 +1,115 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.network;
+
+import org.apache.kerby.event.EventHandler;
+import org.apache.kerby.event.EventHub;
+import org.apache.kerby.transport.MessageHandler;
+import org.apache.kerby.transport.Network;
+import org.apache.kerby.transport.event.MessageEvent;
+import org.apache.kerby.transport.event.TransportEventType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SocketChannel;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestNetworkServer extends TestNetworkBase {
+
+    private EventHub eventHub;
+
+    @Before
+    public void setUp() throws IOException {
+        preparePorts();
+
+        setUpServer();
+    }
+
+    private void setUpServer() throws IOException {
+        eventHub = new EventHub();
+
+        EventHandler messageHandler = new MessageHandler() {
+            @Override
+            protected void handleMessage(MessageEvent msgEvent) {
+                if (msgEvent.getEventType() == TransportEventType.INBOUND_MESSAGE) {
+                    msgEvent.getTransport().sendMessage(msgEvent.getMessage());
+                }
+            }
+        };
+        eventHub.register(messageHandler);
+
+        Network network = new Network();
+        network.setStreamingDecoder(createStreamingDecoder());
+        eventHub.register(network);
+
+        eventHub.start();
+
+        network.tcpListen(serverHost, tcpPort);
+        network.udpListen(serverHost, udpPort);
+    }
+
+    @Test
+    public void testNetworkServer() throws IOException, InterruptedException {
+        testTcpTransport();
+        testUdpTransport();
+    }
+
+    private void testTcpTransport() throws IOException, InterruptedException {
+        Thread.sleep(10);
+
+        SocketChannel socketChannel = SocketChannel.open();
+        socketChannel.configureBlocking(true);
+        SocketAddress sa = new InetSocketAddress(serverHost, tcpPort);
+        socketChannel.connect(sa);
+        socketChannel.write(ByteBuffer.wrap(TEST_MESSAGE.getBytes()));
+        ByteBuffer byteBuffer = ByteBuffer.allocate(65536);
+        socketChannel.read(byteBuffer);
+        byteBuffer.flip();
+        clientRecvedMessage = recvBuffer2String(byteBuffer);
+        assertThat(clientRecvedMessage).isEqualTo(TEST_MESSAGE);
+    }
+
+    private void testUdpTransport() throws IOException, InterruptedException {
+        Thread.sleep(10);
+
+        DatagramChannel socketChannel = DatagramChannel.open();
+        socketChannel.configureBlocking(true);
+        SocketAddress sa = new InetSocketAddress(serverHost, udpPort);
+        socketChannel.send(ByteBuffer.wrap(TEST_MESSAGE.getBytes()), sa);
+        ByteBuffer byteBuffer = ByteBuffer.allocate(65536);
+        socketChannel.receive(byteBuffer);
+        byteBuffer.flip();
+        clientRecvedMessage = recvBuffer2String(byteBuffer);
+        assertThat(clientRecvedMessage).isEqualTo(TEST_MESSAGE);
+    }
+
+    @After
+    public void cleanup() {
+        eventHub.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/directory-kerby/blob/f3645ba1/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpBase.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpBase.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpBase.java
new file mode 100644
index 0000000..ed060ef
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpBase.java
@@ -0,0 +1,62 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.tcp;
+
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.event.NetworkUtil;
+import org.apache.kerby.transport.tcp.DecodingCallback;
+import org.apache.kerby.transport.tcp.StreamingDecoder;
+
+import java.nio.ByteBuffer;
+
+public class TestTcpBase {
+    protected String serverHost = "127.0.0.1";
+    protected int serverPort = 0;
+    protected String TEST_MESSAGE = "Hello world!";
+    protected String clientRecvedMessage;
+
+    protected enum TestEventType implements EventType {
+        FINISHED
+    }
+
+    protected void preparePort() {
+        serverPort = NetworkUtil.getServerPort();
+    }
+
+    protected String recvBuffer2String(ByteBuffer buffer) {
+        byte[] bytes = new byte[buffer.remaining()];
+        buffer.get(bytes);
+        return new String(bytes);
+    }
+
+    protected StreamingDecoder createStreamingDecoder() {
+        return new StreamingDecoder() {
+            @Override
+            public void decode(ByteBuffer streamingBuffer, DecodingCallback callback) {
+                int expectedMessageLength = TEST_MESSAGE.getBytes().length;
+                if (streamingBuffer.remaining() >= expectedMessageLength) {
+                    callback.onMessageComplete(expectedMessageLength, -1);
+                } else {
+                    callback.onMoreDataNeeded(expectedMessageLength);
+                }
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/directory-kerby/blob/f3645ba1/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpClient.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpClient.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpClient.java
new file mode 100644
index 0000000..823ae7b
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpClient.java
@@ -0,0 +1,162 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.tcp;
+
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventHandler;
+import org.apache.kerby.event.EventHub;
+import org.apache.kerby.event.EventWaiter;
+import org.apache.kerby.transport.Connector;
+import org.apache.kerby.transport.MessageHandler;
+import org.apache.kerby.transport.Transport;
+import org.apache.kerby.transport.event.MessageEvent;
+import org.apache.kerby.transport.event.TransportEvent;
+import org.apache.kerby.transport.event.TransportEventType;
+import org.apache.kerby.transport.tcp.TcpConnector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestTcpClient extends TestTcpBase {
+
+    private EventHub eventHub;
+    private EventWaiter eventWaiter;
+
+    @Before
+    public void setUp() throws IOException {
+        preparePort();
+
+        setUpServer();
+        setUpClient();
+    }
+
+    private void setUpServer() {
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    doRunServer();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }).start();
+    }
+
+    private void doRunServer() throws IOException {
+        Selector selector = Selector.open();
+        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+        serverSocketChannel.configureBlocking(false);
+        ServerSocket serverSocket = serverSocketChannel.socket();
+        serverSocket.bind(new InetSocketAddress(serverPort));
+        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+
+        SocketChannel socketChannel;
+        while (true) {
+            if (selector.selectNow() > 0) {
+                Set<SelectionKey> selectionKeys = selector.selectedKeys();
+                Iterator<SelectionKey> iterator = selectionKeys.iterator();
+                while (iterator.hasNext()) {
+                    SelectionKey selectionKey = iterator.next();
+                    iterator.remove();
+
+                    if (selectionKey.isAcceptable()) {
+                        while ((socketChannel = serverSocketChannel.accept()) != null) {
+                            socketChannel.configureBlocking(false);
+                            socketChannel.socket().setTcpNoDelay(true);
+                            socketChannel.socket().setKeepAlive(true);
+                            socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE,
socketChannel);
+                            //selectionKey.attach(socketChannel);
+                        }
+                    } else if (selectionKey.isReadable()) {
+                        ByteBuffer recvBuffer = ByteBuffer.allocate(65536);
+                        socketChannel = (SocketChannel) selectionKey.attachment();
+                        if (socketChannel.read(recvBuffer) > 0) {
+                            recvBuffer.flip();
+                            socketChannel.write(recvBuffer);
+                        }
+                    }
+                }
+
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    private void setUpClient() throws IOException {
+        eventHub = new EventHub();
+
+        EventHandler messageHandler = new MessageHandler() {
+            @Override
+            protected void handleMessage(MessageEvent event) {
+                if (event.getEventType() == TransportEventType.INBOUND_MESSAGE) {
+                    ByteBuffer buffer = event.getMessage();
+                    clientRecvedMessage = recvBuffer2String(buffer);
+                    System.out.println("Recved clientRecvedMessage: " + clientRecvedMessage);
+                    Boolean result = TEST_MESSAGE.equals(clientRecvedMessage);
+                    dispatch(new Event(TestEventType.FINISHED, result));
+                }
+            }
+        };
+        eventHub.register(messageHandler);
+
+        Connector connector = new TcpConnector(createStreamingDecoder());
+        eventHub.register(connector);
+
+        eventWaiter = eventHub.waitEvent(
+                TestEventType.FINISHED,
+                TransportEventType.NEW_TRANSPORT);
+
+        eventHub.start();
+        connector.connect(serverHost, serverPort);
+    }
+
+    @Test
+    public void testTcpTransport() {
+        Event event = eventWaiter.waitEvent(TransportEventType.NEW_TRANSPORT);
+        Transport transport = ((TransportEvent) event).getTransport();
+        transport.sendMessage(ByteBuffer.wrap(TEST_MESSAGE.getBytes()));
+
+        event = eventWaiter.waitEvent(TestEventType.FINISHED);
+        assertThat((Boolean) event.getEventData()).isTrue();
+    }
+
+    @After
+    public void cleanup() {
+        eventHub.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/directory-kerby/blob/f3645ba1/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpServer.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpServer.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpServer.java
new file mode 100644
index 0000000..d95580a
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpServer.java
@@ -0,0 +1,94 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.tcp;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.kerby.event.EventHandler;
+import org.apache.kerby.event.EventHub;
+import org.apache.kerby.transport.Acceptor;
+import org.apache.kerby.transport.MessageHandler;
+import org.apache.kerby.transport.event.MessageEvent;
+import org.apache.kerby.transport.event.TransportEventType;
+import org.apache.kerby.transport.tcp.TcpAcceptor;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestTcpServer extends TestTcpBase {
+
+    private EventHub eventHub;
+
+    @Before
+    public void setUp() throws IOException {
+        preparePort();
+
+        setUpServer();
+    }
+
+    private void setUpServer() throws IOException {
+        eventHub = new EventHub();
+
+        EventHandler messageHandler = new MessageHandler() {
+            @Override
+            protected void handleMessage(MessageEvent msgEvent) {
+                if (msgEvent.getEventType() == TransportEventType.INBOUND_MESSAGE) {
+                    msgEvent.getTransport().sendMessage(msgEvent.getMessage());
+                }
+            }
+        };
+        eventHub.register(messageHandler);
+
+        Acceptor acceptor = new TcpAcceptor(createStreamingDecoder());
+        eventHub.register(acceptor);
+
+        eventHub.start();
+        acceptor.listen(serverHost, serverPort);
+    }
+
+    @Test
+    public void testTcpTransport() throws IOException, InterruptedException {
+        Thread.sleep(15);
+
+        SocketChannel socketChannel = SocketChannel.open();
+        socketChannel.configureBlocking(true);
+        SocketAddress sa = new InetSocketAddress(serverHost, serverPort);
+        socketChannel.connect(sa);
+        socketChannel.write(ByteBuffer.wrap(TEST_MESSAGE.getBytes()));
+        ByteBuffer byteBuffer = ByteBuffer.allocate(65536);
+        socketChannel.read(byteBuffer);
+        byteBuffer.flip();
+        clientRecvedMessage = recvBuffer2String(byteBuffer);
+
+        assertThat(clientRecvedMessage).isEqualTo(TEST_MESSAGE);
+    }
+
+    @After
+    public void cleanup() {
+        eventHub.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/directory-kerby/blob/f3645ba1/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpBase.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpBase.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpBase.java
new file mode 100644
index 0000000..1887043
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpBase.java
@@ -0,0 +1,46 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.udp;
+
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.event.NetworkUtil;
+
+import java.nio.ByteBuffer;
+
+public class TestUdpBase {
+    protected String serverHost = "127.0.0.1";
+    protected int serverPort = 0;
+    protected String TEST_MESSAGE = "Hello world!";
+    protected String clientRecvedMessage;
+
+    protected enum TestEventType implements EventType {
+        FINISHED
+    }
+
+    protected void preparePort() {
+        serverPort = NetworkUtil.getServerPort();
+    }
+
+    protected String recvBuffer2String(ByteBuffer buffer) {
+        byte[] bytes = new byte[buffer.remaining()];
+        buffer.get(bytes);
+        return new String(bytes);
+    }
+}

http://git-wip-us.apache.org/repos/asf/directory-kerby/blob/f3645ba1/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpClient.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpClient.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpClient.java
new file mode 100644
index 0000000..6f3453e
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpClient.java
@@ -0,0 +1,151 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.udp;
+
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventHandler;
+import org.apache.kerby.event.EventHub;
+import org.apache.kerby.event.EventWaiter;
+import org.apache.kerby.transport.Connector;
+import org.apache.kerby.transport.MessageHandler;
+import org.apache.kerby.transport.Transport;
+import org.apache.kerby.transport.event.MessageEvent;
+import org.apache.kerby.transport.udp.UdpConnector;
+import org.apache.kerby.transport.event.TransportEvent;
+import org.apache.kerby.transport.event.TransportEventType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestUdpClient extends TestUdpBase {
+
+    private EventHub eventHub;
+    private EventWaiter eventWaiter;
+
+    @Before
+    public void setUp() throws IOException {
+        preparePort();
+
+        setUpServer();
+        setUpClient();
+    }
+
+    private void setUpServer() {
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    doRunServer();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }).start();
+    }
+
+    private void doRunServer() throws IOException {
+        Selector selector = Selector.open();
+        DatagramChannel serverSocketChannel = DatagramChannel.open();
+        serverSocketChannel.configureBlocking(false);
+        DatagramSocket serverSocket = serverSocketChannel.socket();
+        serverSocket.bind(new InetSocketAddress(serverPort));
+        serverSocketChannel.register(selector, SelectionKey.OP_READ);
+
+        while (true) {
+            if (selector.selectNow() > 0) {
+                Set<SelectionKey> selectionKeys = selector.selectedKeys();
+                Iterator<SelectionKey> iterator = selectionKeys.iterator();
+                while (iterator.hasNext()) {
+                    SelectionKey selectionKey = iterator.next();
+                    iterator.remove();
+                    if (selectionKey.isReadable()) {
+                        ByteBuffer recvBuffer = ByteBuffer.allocate(65536);
+                        InetSocketAddress fromAddress = (InetSocketAddress) serverSocketChannel.receive(recvBuffer);
+                        if (fromAddress != null) {
+                            recvBuffer.flip();
+                            serverSocketChannel.send(recvBuffer, fromAddress);
+                        }
+                    }
+                }
+
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    private void setUpClient() throws IOException {
+        eventHub = new EventHub();
+
+        EventHandler messageHandler = new MessageHandler() {
+            @Override
+            protected void handleMessage(MessageEvent msgEvent) {
+                if (msgEvent.getEventType() == TransportEventType.INBOUND_MESSAGE) {
+                    ByteBuffer buffer = msgEvent.getMessage();
+                    clientRecvedMessage = recvBuffer2String(buffer);
+                    System.out.println("Recved clientRecvedMessage: " + clientRecvedMessage);
+                    Boolean result = TEST_MESSAGE.equals(clientRecvedMessage);
+                    dispatch(new Event(TestEventType.FINISHED, result));
+                }
+            }
+        };
+        eventHub.register(messageHandler);
+
+        Connector connector = new UdpConnector();
+        eventHub.register(connector);
+
+        eventWaiter = eventHub.waitEvent(
+                TestEventType.FINISHED,
+                TransportEventType.NEW_TRANSPORT);
+
+        eventHub.start();
+        connector.connect(serverHost, serverPort);
+    }
+
+    @Test
+    public void testUdpTransport() {
+        Event event = eventWaiter.waitEvent(TransportEventType.NEW_TRANSPORT);
+        Transport transport = ((TransportEvent) event).getTransport();
+        transport.sendMessage(ByteBuffer.wrap(TEST_MESSAGE.getBytes()));
+
+        event = eventWaiter.waitEvent(TestEventType.FINISHED);
+        assertThat((Boolean) event.getEventData()).isTrue();
+    }
+
+    @After
+    public void cleanup() {
+        eventHub.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/directory-kerby/blob/f3645ba1/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpServer.java
----------------------------------------------------------------------
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpServer.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpServer.java
new file mode 100644
index 0000000..f44aa53
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpServer.java
@@ -0,0 +1,93 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.udp;
+
+import org.apache.kerby.event.EventHandler;
+import org.apache.kerby.event.EventHub;
+import org.apache.kerby.transport.Acceptor;
+import org.apache.kerby.transport.MessageHandler;
+import org.apache.kerby.transport.event.MessageEvent;
+import org.apache.kerby.transport.event.TransportEventType;
+import org.apache.kerby.transport.udp.UdpAcceptor;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestUdpServer extends TestUdpBase {
+
+    private EventHub eventHub;
+
+    @Before
+    public void setUp() throws IOException {
+        preparePort();
+
+        setUpServer();
+    }
+
+    private void setUpServer() throws IOException {
+        eventHub = new EventHub();
+
+        EventHandler messageHandler = new MessageHandler() {
+            @Override
+            protected void handleMessage(MessageEvent msgEvent) {
+                if (msgEvent.getEventType() == TransportEventType.INBOUND_MESSAGE) {
+                    msgEvent.getTransport().sendMessage(msgEvent.getMessage());
+                }
+            }
+        };
+        eventHub.register(messageHandler);
+
+        Acceptor acceptor = new UdpAcceptor();
+        eventHub.register(acceptor);
+
+        eventHub.start();
+        acceptor.listen(serverHost, serverPort);
+    }
+
+    @Test
+    public void testUdpTransport() throws IOException, InterruptedException {
+        Thread.sleep(10);
+
+        DatagramChannel socketChannel = DatagramChannel.open();
+        socketChannel.configureBlocking(true);
+        SocketAddress sa = new InetSocketAddress(serverHost, serverPort);
+        socketChannel.send(ByteBuffer.wrap(TEST_MESSAGE.getBytes()), sa);
+        ByteBuffer byteBuffer = ByteBuffer.allocate(65536);
+        socketChannel.receive(byteBuffer);
+        byteBuffer.flip();
+        clientRecvedMessage = recvBuffer2String(byteBuffer);
+
+        assertThat(clientRecvedMessage).isEqualTo(TEST_MESSAGE);
+    }
+
+    @After
+    public void cleanup() {
+        eventHub.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/directory-kerby/blob/f3645ba1/lib/pom.xml
----------------------------------------------------------------------
diff --git a/lib/pom.xml b/lib/pom.xml
index 1b9af80..d2859f3 100644
--- a/lib/pom.xml
+++ b/lib/pom.xml
@@ -28,6 +28,7 @@
 
   <modules>
     <module>kerby-config</module>
+    <module>kerby-event</module>
     <module>kerby-util</module>
   </modules>
 


Mime
View raw message