cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From devd...@apache.org
Subject [04/21] CLOUDSTACK-5344: Updated to allow rdp console to access hyper-v vm virtual framebuffer.
Date Thu, 02 Jan 2014 07:53:06 GMT
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Element.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Element.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Element.java
old mode 100644
new mode 100755
index d489b18..e78e301
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Element.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Element.java
@@ -19,7 +19,7 @@ package streamer;
 import java.util.Set;
 
 /**
- * Element is for processing of data. It has one or more contact pads, which can
+ * Element is basic building block for constructing data processing pipes. It has one or more contact pads, which can
  * be wired with other elements using links.
  */
 public interface Element {

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Event.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Event.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Event.java
old mode 100644
new mode 100755
index 9808f62..998274c
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Event.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Event.java
@@ -17,7 +17,8 @@
 package streamer;
 
 public enum Event {
-    STREAM_START, STREAM_CLOSE,
+    STREAM_START,
+    STREAM_CLOSE,
 
     /**
      * Upgrade socket to SSL.

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSink.java
deleted file mode 100644
index e373b19..0000000
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSink.java
+++ /dev/null
@@ -1,69 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package streamer;
-
-public class FakeSink extends BaseElement {
-
-    public FakeSink(String id) {
-        super(id);
-    }
-
-    @Override
-    public void handleData(ByteBuffer buf, Link link) {
-        if (verbose)
-            System.out.println("[" + this + "] INFO: Received buf #" + (packetNumber) + " " + buf + ".");
-
-        if (buf == null)
-            return;
-
-        // Use packetNumber variable to count incoming packets
-        packetNumber++;
-
-        buf.unref();
-    }
-
-    @Override
-    public String toString() {
-        return "FakeSink(" + id + ")";
-    }
-
-    @Override
-    public void handleEvent(Event event, Direction direction) {
-        if (verbose)
-            System.out.println("[" + this + "] INFO: Event received: " + event + ".");
-
-    }
-
-    /**
-     * Example.
-     */
-    public static void main(String args[]) {
-
-        Element sink = new FakeSink("sink") {
-            {
-                verbose = true;
-            }
-        };
-
-        byte[] data = new byte[] {1, 2, 3};
-        ByteBuffer buf = new ByteBuffer(data);
-        sink.setLink(STDIN, new SyncLink(), Direction.IN);
-        sink.getLink(STDIN).sendData(buf);
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSource.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSource.java
deleted file mode 100644
index 9056852..0000000
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSource.java
+++ /dev/null
@@ -1,125 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package streamer;
-
-public class FakeSource extends BaseElement {
-
-    /**
-     * Delay for null packets in poll method when blocking is requested, in
-     * milliseconds.
-     */
-    protected long delay = SyncLink.STANDARD_DELAY_FOR_EMPTY_PACKET;
-
-    public FakeSource(String id) {
-        super(id);
-    }
-
-    @Override
-    public void poll(boolean block) {
-        if (numBuffers > 0 && packetNumber >= numBuffers) {
-            // Close stream when limit of packets is reached
-            sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
-            return;
-        }
-
-        // Prepare new packet
-        ByteBuffer buf = initializeData();
-
-        // Push it to output(s)
-        pushDataToAllOuts(buf);
-
-        // Make slight delay when blocking input was requested (to avoid
-        // consuming of 100% in parent loop)
-        if (block)
-            delay();
-
-    }
-
-    /**
-     * Make slight delay. Should be used when blocking input is requested in pull
-     * mode, but null packed was returned by input.
-     */
-    protected void delay() {
-        try {
-            Thread.sleep(delay);
-        } catch (InterruptedException e) {
-        }
-    }
-
-    /**
-     * Initialize data.
-     */
-    public ByteBuffer initializeData() {
-        ByteBuffer buf = new ByteBuffer(incommingBufLength);
-
-        // Set first byte of package to it sequance number
-        buf.data[buf.offset] = (byte)(packetNumber % 128);
-
-        // Initialize rest of bytes with sequential values, which are
-        // corresponding with their position in byte buffer
-        for (int i = buf.offset + 1; i < buf.length; i++)
-            buf.data[i] = (byte)(i % 128);
-
-        buf.putMetadata(ByteBuffer.SEQUENCE_NUMBER, packetNumber);
-        buf.putMetadata("src", id);
-
-        return buf;
-    }
-
-    @Override
-    public String toString() {
-        return "FakeSource(" + id + ")";
-    }
-
-    public static void main(String args[]) {
-
-        Element fakeSource = new FakeSource("source 3/10/100") {
-            {
-                verbose = true;
-                this.incommingBufLength = 3;
-                this.numBuffers = 10;
-                this.delay = 100;
-            }
-        };
-
-        Element fakeSink = new FakeSink("sink") {
-            {
-                this.verbose = true;
-            }
-        };
-
-        Element fakeSink2 = new FakeSink("sink2") {
-            {
-                this.verbose = true;
-            }
-        };
-
-        Link link = new SyncLink();
-
-        fakeSource.setLink(STDOUT, link, Direction.OUT);
-        fakeSink.setLink(STDIN, link, Direction.IN);
-
-        Link link2 = new SyncLink();
-
-        fakeSource.setLink("out2", link2, Direction.OUT);
-        fakeSink2.setLink(STDIN, link2, Direction.IN);
-
-        link.run();
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/InputStreamSource.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/InputStreamSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/InputStreamSource.java
old mode 100644
new mode 100755
index 45155e1..0c8c97d
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/InputStreamSource.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/InputStreamSource.java
@@ -20,13 +20,15 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
+import streamer.debug.FakeSink;
+
 /**
  * Source element, which reads data from InputStream.
  */
 public class InputStreamSource extends BaseElement {
 
     protected InputStream is;
-    protected SocketWrapper socketWrapper;
+    protected SocketWrapperImpl socketWrapper;
 
     public InputStreamSource(String id) {
         super(id);
@@ -37,7 +39,7 @@ public class InputStreamSource extends BaseElement {
         this.is = is;
     }
 
-    public InputStreamSource(String id, SocketWrapper socketWrapper) {
+    public InputStreamSource(String id, SocketWrapperImpl socketWrapper) {
         super(id);
         this.socketWrapper = socketWrapper;
     }
@@ -45,27 +47,27 @@ public class InputStreamSource extends BaseElement {
     @Override
     public void handleEvent(Event event, Direction direction) {
         switch (event) {
-            case SOCKET_UPGRADE_TO_SSL:
-                socketWrapper.upgradeToSsl();
-                break;
-            default:
-                super.handleEvent(event, direction);
+        case SOCKET_UPGRADE_TO_SSL:
+            socketWrapper.upgradeToSsl();
+            break;
+        default:
+            super.handleEvent(event, direction);
         }
     }
 
     @Override
     public void setLink(String padName, Link link, Direction direction) {
         switch (direction) {
-            case OUT:
-                super.setLink(padName, link, direction);
-
-                if (is == null) {
-                    // Pause links until data stream will be ready
-                    link.pause();
-                }
-                break;
-            case IN:
-                throw new RuntimeException("Cannot assign link to input pad in source element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
+        case OUT:
+            super.setLink(padName, link, direction);
+
+            if (is == null) {
+                // Pause links until data stream will be ready
+                link.pause();
+            }
+            break;
+        case IN:
+            throw new RuntimeException("Cannot assign link to input pad in source element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Link.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Link.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Link.java
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java
deleted file mode 100644
index 8094c82..0000000
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java
+++ /dev/null
@@ -1,113 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package streamer;
-
-import java.util.Arrays;
-
-/**
- * Compare incoming packets with expected packets.
- */
-public class MockSink extends BaseElement {
-
-    protected ByteBuffer bufs[] = null;
-
-    public MockSink(String id) {
-        super(id);
-    }
-
-    public MockSink(String id, ByteBuffer bufs[]) {
-        super(id);
-        this.bufs = bufs;
-    }
-
-    @Override
-    public void handleData(ByteBuffer buf, Link link) {
-        if (verbose)
-            System.out.println("[" + this + "] INFO: Received buf #" + (packetNumber) + " " + buf + ".");
-
-        if (buf == null)
-            return;
-
-        if (packetNumber >= bufs.length)
-            throw new AssertionError("[" + this + "] Incoming buffer #" + packetNumber + " is not expected. Number of expected buffers: " + bufs.length +
-                ", unexpected buffer: " + buf + ".");
-
-        // Compare incoming buffer with expected buffer
-        if (!Arrays.equals(bufs[packetNumber].toByteArray(), buf.toByteArray()))
-            throw new AssertionError("[" + this + "] Incoming buffer #" + packetNumber + " is not equal to expected buffer.\n  Actual bufer: " + buf +
-                ",\n  expected buffer: " + bufs[packetNumber] + ".");
-
-        if (verbose)
-            System.out.println("[" + this + "] INFO: buffers are equal.");
-
-        // Use packetNumber variable to count incoming packets
-        packetNumber++;
-
-        buf.unref();
-    }
-
-    @Override
-    protected void onClose() {
-        super.onClose();
-
-        if (packetNumber != bufs.length)
-            throw new AssertionError("[" + this + "] Number of expected buffers: " + bufs.length + ", number of actual buffers: " + packetNumber + ".");
-    }
-
-    @Override
-    public String toString() {
-        return "MockSink(" + id + ")";
-    }
-
-    /**
-     * Example.
-     */
-    public static void main(String args[]) {
-
-        Element mockSource = new MockSource("source") {
-            {
-                this.bufs =
-                    new ByteBuffer[] {new ByteBuffer(new byte[] {1, 1, 2, 3, 4, 5}), new ByteBuffer(new byte[] {2, 1, 2, 3, 4}), new ByteBuffer(new byte[] {3, 1, 2, 3}),
-                        new ByteBuffer(new byte[] {4, 1, 2}), new ByteBuffer(new byte[] {5, 1})};
-                this.verbose = true;
-                this.delay = 100;
-                this.numBuffers = this.bufs.length;
-            }
-        };
-
-        Element mockSink = new MockSink("sink") {
-            {
-                this.bufs =
-                    new ByteBuffer[] {new ByteBuffer(new byte[] {1, 1, 2, 3, 4, 5}), new ByteBuffer(new byte[] {2, 1, 2, 3, 4}), new ByteBuffer(new byte[] {3, 1, 2, 3}),
-                        new ByteBuffer(new byte[] {4, 1, 2}), new ByteBuffer(new byte[] {5, 1})};
-                this.verbose = true;
-            }
-        };
-
-        Link link = new SyncLink() {
-            {
-                this.verbose = true;
-            }
-        };
-
-        mockSource.setLink(STDOUT, link, Direction.OUT);
-        mockSink.setLink(STDIN, link, Direction.IN);
-
-        link.run();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java
deleted file mode 100644
index f758bab..0000000
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java
+++ /dev/null
@@ -1,89 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package streamer;
-
-public class MockSource extends FakeSource {
-
-    protected ByteBuffer bufs[] = null;
-
-    public MockSource(String id) {
-        super(id);
-    }
-
-    public MockSource(String id, ByteBuffer bufs[]) {
-        super(id);
-        this.bufs = bufs;
-    }
-
-    /**
-     * Initialize data.
-     */
-    @Override
-    public ByteBuffer initializeData() {
-        if (packetNumber >= bufs.length) {
-            sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
-            return null;
-        }
-
-        ByteBuffer buf = bufs[packetNumber];
-
-        buf.putMetadata(ByteBuffer.SEQUENCE_NUMBER, packetNumber);
-        return buf;
-    }
-
-    @Override
-    public void handleEvent(Event event, Direction direction) {
-        if (verbose)
-            System.out.println("[" + this + "] INFO: Event received: " + event + ".");
-
-    }
-
-    @Override
-    public String toString() {
-        return "MockSource(" + id + ")";
-    }
-
-    /**
-     * Example.
-     */
-    public static void main(String args[]) {
-
-        Element mockSource = new MockSource("source") {
-            {
-                this.bufs =
-                    new ByteBuffer[] {new ByteBuffer(new byte[] {1, 1, 2, 3, 4, 5}), new ByteBuffer(new byte[] {2, 1, 2, 3, 4}), new ByteBuffer(new byte[] {3, 1, 2, 3}),
-                        new ByteBuffer(new byte[] {4, 1, 2}), new ByteBuffer(new byte[] {5, 1})};
-                this.verbose = true;
-                this.delay = 100;
-                // this.numBuffers = this.bufs.length;
-            }
-        };
-
-        Element fakeSink = new FakeSink("sink") {
-            {
-                this.verbose = true;
-            }
-        };
-
-        Link link = new SyncLink();
-
-        mockSource.setLink(STDOUT, link, Direction.OUT);
-        fakeSink.setLink(STDIN, link, Direction.IN);
-
-        link.run();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java
old mode 100644
new mode 100755
index d50995e..9cccb5b
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java
@@ -87,13 +87,16 @@ public abstract class OneTimeSwitch extends BaseElement {
         // Wake up next peer(s)
         sendEventToAllPads(Event.STREAM_START, Direction.OUT);
 
+        // Disconnect our stdin from this element
         stdin.setSink(null);
         inputPads.remove(STDIN);
 
+        // Replace next peer stdin (our stdout) by our stdin
         Element nextPeer = stdout.getSink();
         nextPeer.replaceLink(stdout, stdin);
         stdout.drop();
 
+        // Drop all other links
         for (Object link : inputPads.values().toArray())
             ((Link)link).drop();
         for (Object link : outputPads.values().toArray())

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java
old mode 100644
new mode 100755
index 7a55ea8..e66899d
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java
@@ -20,10 +20,12 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
+import streamer.debug.FakeSource;
+
 public class OutputStreamSink extends BaseElement {
 
     protected OutputStream os;
-    protected SocketWrapper socketWrapper;
+    protected SocketWrapperImpl socketWrapper;
 
     public OutputStreamSink(String id) {
         super(id);
@@ -34,7 +36,7 @@ public class OutputStreamSink extends BaseElement {
         this.os = os;
     }
 
-    public OutputStreamSink(String id, SocketWrapper socketWrapper) {
+    public OutputStreamSink(String id, SocketWrapperImpl socketWrapper) {
         super(id);
         this.socketWrapper = socketWrapper;
     }
@@ -68,26 +70,26 @@ public class OutputStreamSink extends BaseElement {
     @Override
     public void handleEvent(Event event, Direction direction) {
         switch (event) {
-            case SOCKET_UPGRADE_TO_SSL:
-                socketWrapper.upgradeToSsl();
-                break;
-            default:
-                super.handleEvent(event, direction);
+        case SOCKET_UPGRADE_TO_SSL:
+            socketWrapper.upgradeToSsl();
+            break;
+        default:
+            super.handleEvent(event, direction);
         }
     }
 
     @Override
     public void setLink(String padName, Link link, Direction direction) {
         switch (direction) {
-            case IN:
-                super.setLink(padName, link, direction);
-
-                if (os == null)
-                    // Pause links until data stream will be ready
-                    link.pause();
-                break;
-            case OUT:
-                throw new RuntimeException("Cannot assign link to output pad in sink element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
+        case IN:
+            super.setLink(padName, link, direction);
+
+            if (os == null)
+                // Pause links until data stream will be ready
+                link.pause();
+            break;
+        case OUT:
+            throw new RuntimeException("Cannot assign link to output pad in sink element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
         }
     }
 
@@ -126,10 +128,10 @@ public class OutputStreamSink extends BaseElement {
     public static void main(String args[]) {
         Element source = new FakeSource("source") {
             {
-                this.verbose = true;
-                this.numBuffers = 3;
-                this.incommingBufLength = 5;
-                this.delay = 100;
+                verbose = true;
+                numBuffers = 3;
+                incommingBufLength = 5;
+                delay = 100;
             }
         };
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java
old mode 100644
new mode 100755
index d2e52dd..0f6089b
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java
@@ -48,6 +48,8 @@ public interface Pipeline extends Element {
      * so when pipeline will be connected with other elements, outside of this
      * pipeline, they will be connected to IN and OUT elements.
      *
+     * Empty names are skipped.
+     *
      * Example:
      *
      * <pre>

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java
old mode 100644
new mode 100755
index 4e6fc0a..299d0a4
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java
@@ -16,10 +16,15 @@
 // under the License.
 package streamer;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import streamer.debug.FakeSink;
+import streamer.debug.FakeSource;
+
 public class PipelineImpl implements Pipeline {
 
     protected String id;
@@ -51,11 +56,11 @@ public class PipelineImpl implements Pipeline {
     @Override
     public Set<String> getPads(Direction direction) {
         switch (direction) {
-            case IN:
-                return elements.get(IN).getPads(direction);
+        case IN:
+            return elements.get(IN).getPads(direction);
 
-            case OUT:
-                return elements.get(OUT).getPads(direction);
+        case OUT:
+            return elements.get(OUT).getPads(direction);
         }
         return null;
     }
@@ -71,8 +76,8 @@ public class PipelineImpl implements Pipeline {
             int outPadsNumber = element.getPads(Direction.OUT).size();
             int inPadsNumber = element.getPads(Direction.IN).size();
             if ((outPadsNumber | inPadsNumber) > 0 && (outPadsNumber == 0 || inPadsNumber == 0))
-                throw new RuntimeException("[ " + this + "] Pads of input element of pipeline are not balanced. Element: " + element + ", output pads: " +
-                    element.getPads(Direction.OUT).toString() + ", input pads: " + element.getPads(Direction.IN).toString() + ".");
+                throw new RuntimeException("[ " + this + "] Pads of input element of pipeline are not balanced. Element: " + element + ", output pads: "
+                        + element.getPads(Direction.OUT).toString() + ", input pads: " + element.getPads(Direction.IN).toString() + ".");
         }
 
         // Check OUT element
@@ -81,8 +86,8 @@ public class PipelineImpl implements Pipeline {
             int outPadsNumber = element.getPads(Direction.OUT).size();
             int inPadsNumber = element.getPads(Direction.IN).size();
             if ((outPadsNumber | inPadsNumber) > 0 && (outPadsNumber == 0 || inPadsNumber == 0))
-                throw new RuntimeException("[ " + this + "] Pads of output element of pipeline are not balanced. Element: " + element + ", output pads: " +
-                    element.getPads(Direction.OUT).toString() + ", input pads: " + element.getPads(Direction.IN).toString() + ".");
+                throw new RuntimeException("[ " + this + "] Pads of output element of pipeline are not balanced. Element: " + element + ", output pads: "
+                        + element.getPads(Direction.OUT).toString() + ", input pads: " + element.getPads(Direction.IN).toString() + ".");
         }
 
     }
@@ -127,12 +132,12 @@ public class PipelineImpl implements Pipeline {
     @Override
     public void handleEvent(Event event, Direction direction) {
         switch (direction) {
-            case IN:
-                get(IN).handleEvent(event, direction);
-                break;
-            case OUT:
-                get(OUT).handleEvent(event, direction);
-                break;
+        case IN:
+            get(IN).handleEvent(event, direction);
+            break;
+        case OUT:
+            get(OUT).handleEvent(event, direction);
+            break;
         }
     }
 
@@ -142,8 +147,8 @@ public class PipelineImpl implements Pipeline {
             String id = element.getId();
 
             if (this.elements.containsKey(id))
-                throw new RuntimeException("This pipeline already contains element with same ID. New element: " + element + ", existing element: " +
-                    this.elements.get(id) + ".");
+                throw new RuntimeException("This pipeline already contains element with same ID. New element: " + element + ", existing element: "
+                        + this.elements.get(id) + ".");
 
             this.elements.put(id, element);
         }
@@ -152,6 +157,8 @@ public class PipelineImpl implements Pipeline {
     @Override
     public void link(String... elementNames) {
 
+        elementNames = filterOutEmptyStrings(elementNames);
+
         if (elementNames.length < 2)
             throw new RuntimeException("At least two elements are necessary to create link between them.");
 
@@ -180,8 +187,8 @@ public class PipelineImpl implements Pipeline {
             elements[i] = get(elementName);
 
             if (elements[i] == null)
-                throw new RuntimeException("Cannot find element by name in this pipeline. Element name: \"" + elementName + "\" (" + elementNames[i] + "), pipeline: " +
-                    this + ".");
+                throw new RuntimeException("Cannot find element by name in this pipeline. Element name: \"" + elementName + "\" (" + elementNames[i] + "), pipeline: "
+                        + this + ".");
 
             i++;
         }
@@ -204,6 +211,30 @@ public class PipelineImpl implements Pipeline {
         }
     }
 
+    /**
+     * Filter out empty strings from array and return new array with non-empty
+     * elements only. If array contains no empty string, returns same array.
+     */
+    private String[] filterOutEmptyStrings(String[] strings) {
+
+        boolean found = false;
+        for (String string : strings) {
+            if (string == null || string.isEmpty()) {
+                found = true;
+                break;
+            }
+        }
+
+        if (!found)
+            return strings;
+
+        List<String> filteredStrings = new ArrayList<String>(strings.length);
+        for (String string : strings)
+            if (string != null && !string.isEmpty())
+                filteredStrings.add(string);
+        return filteredStrings.toArray(new String[filteredStrings.size()]);
+    }
+
     @Override
     public void addAndLink(Element... elements) {
         add(elements);
@@ -281,20 +312,20 @@ public class PipelineImpl implements Pipeline {
         // Create elements
         pipeline.add(new FakeSource("source") {
             {
-                this.incommingBufLength = 3;
-                this.numBuffers = 10;
-                this.delay = 100;
+                incommingBufLength = 3;
+                numBuffers = 10;
+                delay = 100;
             }
         });
         pipeline.add(new BaseElement("tee"));
         pipeline.add(new FakeSink("sink") {
             {
-                this.verbose = true;
+                verbose = true;
             }
         });
         pipeline.add(new FakeSink("sink2") {
             {
-                this.verbose = true;
+                verbose = true;
             }
         });
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java
old mode 100644
new mode 100755
index 23c57e0..910e073
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java
@@ -19,6 +19,9 @@ package streamer;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import streamer.debug.FakeSink;
+import streamer.debug.FakeSource;
+
 /**
  * Message queue for safe transfer of packets between threads.
  */
@@ -30,7 +33,6 @@ public class Queue extends BaseElement {
         super(id);
     }
 
-    @SuppressWarnings("incomplete-switch")
     @Override
     public void poll(boolean block) {
         try {
@@ -67,12 +69,12 @@ public class Queue extends BaseElement {
     @Override
     public void handleEvent(Event event, Direction direction) {
         switch (event) {
-            case LINK_SWITCH_TO_PULL_MODE:
-                // Do not propagate this event, because this element is boundary between
-                // threads
-                break;
-            default:
-                super.handleEvent(event, direction);
+        case LINK_SWITCH_TO_PULL_MODE:
+            // Do not propagate this event, because this element is boundary between
+            // threads
+            break;
+        default:
+            super.handleEvent(event, direction);
         }
     }
 
@@ -104,17 +106,17 @@ public class Queue extends BaseElement {
 
         Element source1 = new FakeSource("source1") {
             {
-                this.delay = 100;
-                this.numBuffers = 10;
-                this.incommingBufLength = 10;
+                delay = 100;
+                numBuffers = 10;
+                incommingBufLength = 10;
             }
         };
 
         Element source2 = new FakeSource("source2") {
             {
-                this.delay = 100;
-                this.numBuffers = 10;
-                this.incommingBufLength = 10;
+                delay = 100;
+                numBuffers = 10;
+                incommingBufLength = 10;
             }
         };
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapper.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapper.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapper.java
old mode 100644
new mode 100755
index dfffd35..22d436e
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapper.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapper.java
@@ -16,224 +16,20 @@
 // under the License.
 package streamer;
 
-import static rdpclient.MockServer.Packet.PacketType.CLIENT;
-import static rdpclient.MockServer.Packet.PacketType.SERVER;
-
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.HashMap;
-
-import javax.net.SocketFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSocket;
-import javax.net.ssl.SSLSocketFactory;
-import javax.net.ssl.TrustManager;
-
-import rdpclient.MockServer;
-import rdpclient.MockServer.Packet;
-import rdpclient.TrustAllX509TrustManager;
-
-public class SocketWrapper extends PipelineImpl {
-
-    protected InputStreamSource source;
-    protected OutputStreamSink sink;
-    protected Socket socket;
-    protected InetSocketAddress address;
-
-    protected SSLSocket sslSocket;
-
-    //protected String SSL_VERSION_TO_USE = "TLSv1.2";
-    /*DEBUG*/protected static final String SSL_VERSION_TO_USE = "TLSv1";
-
-    public SocketWrapper(String id) {
-        super(id);
-    }
-
-    @Override
-    protected HashMap<String, Element> initElementMap(String id) {
-        HashMap<String, Element> map = new HashMap<String, Element>();
-
-        source = new InputStreamSource(id + "." + OUT, this);
-        sink = new OutputStreamSink(id + "." + IN, this);
-
-        // Pass requests to read data to socket input stream
-        map.put(OUT, source);
 
-        // All incoming data, which is sent to this socket wrapper, will be sent
-        // to socket remote
-        map.put(IN, sink);
-
-        return map;
-    }
+public interface SocketWrapper extends Element {
 
     /**
      * Connect this socket wrapper to remote server and start main loop on
-     * IputStreamSource stdout link, to watch for incoming data, and
-     * OutputStreamSink stdin link, to pull for outgoing data.
-     *
-     * @param address
-     * @throws IOException
+     * Source stdout link, to watch for incoming data, and
+     * Sink stdin link, to pull for outgoing data.
      */
-    public void connect(InetSocketAddress address) throws IOException {
-        this.address = address;
-
-        // Connect socket to server
-        socket = SocketFactory.getDefault().createSocket();
-        try {
-            socket.connect(address);
-
-            InputStream is = socket.getInputStream();
-            source.setInputStream(is);
-
-            OutputStream os = socket.getOutputStream();
-            sink.setOutputStream(os);
-
-            // Start polling for data to send to remote sever
-            runMainLoop(IN, STDIN, true, true);
-
-            // Push incoming data from server to handlers
-            runMainLoop(OUT, STDOUT, false, false);
-
-        } finally {
-            socket.close();
-        }
-    }
-
-    @Override
-    public void handleEvent(Event event, Direction direction) {
-        switch (event) {
-            case SOCKET_UPGRADE_TO_SSL:
-                upgradeToSsl();
-                break;
-            default:
-                super.handleEvent(event, direction);
-                break;
-        }
-    }
-
-    public void upgradeToSsl() {
-
-        if (sslSocket != null)
-            // Already upgraded
-            return;
-
-        if (verbose)
-            System.out.println("[" + this + "] INFO: Upgrading socket to SSL.");
-
-        try {
-            // Use most secure implementation of SSL available now.
-            // JVM will try to negotiate TLS1.2, then will fallback to TLS1.0, if
-            // TLS1.2 is not supported.
-            SSLContext sslContext = SSLContext.getInstance(SSL_VERSION_TO_USE);
-
-            // Trust all certificates (FIXME: insecure)
-            sslContext.init(null, new TrustManager[] {new TrustAllX509TrustManager()}, null);
-
-            SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
-            sslSocket = (SSLSocket)sslSocketFactory.createSocket(socket, address.getHostName(), address.getPort(), true);
-            sslSocket.startHandshake();
-
-            InputStream sis = sslSocket.getInputStream();
-            source.setInputStream(sis);
-
-            OutputStream sos = sslSocket.getOutputStream();
-            sink.setOutputStream(sos);
-
-        } catch (Exception e) {
-            throw new RuntimeException("Cannot upgrade socket to SSL: " + e.getMessage(), e);
-        }
-
-    }
-
-    @Override
-    public void validate() {
-        for (Element element : elements.values())
-            element.validate();
-
-        if (get(IN).getPads(Direction.IN).size() == 0)
-            throw new RuntimeException("[ " + this + "] Input of socket is not connected.");
-
-        if (get(OUT).getPads(Direction.OUT).size() == 0)
-            throw new RuntimeException("[ " + this + "] Output of socket is not connected.");
-
-    }
-
-    public void shutdown() {
-        try {
-            handleEvent(Event.STREAM_CLOSE, Direction.IN);
-        } catch (Exception e) {
-        }
-        try {
-            handleEvent(Event.STREAM_CLOSE, Direction.OUT);
-        } catch (Exception e) {
-        }
-        try {
-            if (sslSocket != null)
-                sslSocket.close();
-        } catch (Exception e) {
-        }
-        try {
-            socket.close();
-        } catch (Exception e) {
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "SocketWrapper(" + id + ")";
-    }
-
-    /**
-     * Example.
-     */
-    public static void main(String args[]) {
-        try {
-            System.setProperty("streamer.Link.debug", "true");
-            System.setProperty("streamer.Element.debug", "true");
-            System.setProperty("rdpclient.MockServer.debug", "true");
-
-            Pipeline pipeline = new PipelineImpl("echo client");
-
-            SocketWrapper socketWrapper = new SocketWrapper("socket");
-
-            pipeline.add(socketWrapper);
-            pipeline.add(new BaseElement("echo"));
-
-            pipeline.link("socket", "echo", "socket");
-
-            final byte[] mockData = new byte[] {0x01, 0x02, 0x03};
-            MockServer server = new MockServer(new Packet[] {new Packet("Server hello") {
-                {
-                    type = SERVER;
-                    data = mockData;
-                }
-            }, new Packet("Client hello") {
-                {
-                    type = CLIENT;
-                    data = mockData;
-                }
-            }, new Packet("Server hello") {
-                {
-                    type = SERVER;
-                    data = mockData;
-                }
-            }, new Packet("Client hello") {
-                {
-                    type = CLIENT;
-                    data = mockData;
-                }
-            }});
-            server.start();
-            InetSocketAddress address = server.getAddress();
+    public void connect(InetSocketAddress address) throws IOException;
 
-            socketWrapper.connect(address);
+    public void shutdown();
 
-        } catch (IOException e) {
-            e.printStackTrace(System.err);
-        }
+    void upgradeToSsl();
 
-    }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapperImpl.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapperImpl.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapperImpl.java
new file mode 100755
index 0000000..da89a0d
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapperImpl.java
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+import static streamer.debug.MockServer.Packet.PacketType.CLIENT;
+import static streamer.debug.MockServer.Packet.PacketType.SERVER;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.HashMap;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+
+import streamer.debug.MockServer;
+import streamer.debug.MockServer.Packet;
+import streamer.ssl.SSLState;
+import streamer.ssl.TrustAllX509TrustManager;
+
+public class SocketWrapperImpl extends PipelineImpl implements SocketWrapper {
+
+    protected InputStreamSource source;
+    protected OutputStreamSink sink;
+    protected Socket socket;
+    protected InetSocketAddress address;
+
+    protected SSLSocket sslSocket;
+
+    protected String sslVersionToUse = "TLSv1.2";
+
+    protected SSLState sslState;
+
+    public SocketWrapperImpl(String id, SSLState sslState) {
+        super(id);
+        this.sslState = sslState;
+    }
+
+    @Override
+    protected HashMap<String, Element> initElementMap(String id) {
+        HashMap<String, Element> map = new HashMap<String, Element>();
+
+        source = new InputStreamSource(id + "." + OUT, this);
+        sink = new OutputStreamSink(id + "." + IN, this);
+
+        // Pass requests to read data to socket input stream
+        map.put(OUT, source);
+
+        // All incoming data, which is sent to this socket wrapper, will be sent
+        // to socket remote
+        map.put(IN, sink);
+
+        return map;
+    }
+
+    /**
+     * Connect this socket wrapper to remote server and start main loop on
+     * IputStreamSource stdout link, to watch for incoming data, and
+     * OutputStreamSink stdin link, to pull for outgoing data.
+     *
+     * @param address
+     * @throws IOException
+     */
+    @Override
+    public void connect(InetSocketAddress address) throws IOException {
+        this.address = address;
+
+        // Connect socket to server
+        socket = SocketFactory.getDefault().createSocket();
+        try {
+            socket.connect(address);
+
+            InputStream is = socket.getInputStream();
+            source.setInputStream(is);
+
+            OutputStream os = socket.getOutputStream();
+            sink.setOutputStream(os);
+
+            // Start polling for data to send to remote sever
+            runMainLoop(IN, STDIN, true, true);
+
+            // Push incoming data from server to handlers
+            runMainLoop(OUT, STDOUT, false, false);
+
+        } finally {
+            socket.close();
+        }
+    }
+
+    @Override
+    public void handleEvent(Event event, Direction direction) {
+        switch (event) {
+        case SOCKET_UPGRADE_TO_SSL:
+            upgradeToSsl();
+            break;
+        default:
+            super.handleEvent(event, direction);
+            break;
+        }
+    }
+
+    @Override
+    public void upgradeToSsl() {
+
+        if (sslSocket != null)
+            // Already upgraded
+            return;
+
+        if (verbose)
+            System.out.println("[" + this + "] INFO: Upgrading socket to SSL.");
+
+        try {
+            // Use most secure implementation of SSL available now.
+            // JVM will try to negotiate TLS1.2, then will fallback to TLS1.0, if
+            // TLS1.2 is not supported.
+            SSLContext sslContext = SSLContext.getInstance(sslVersionToUse);
+
+            // Trust all certificates (FIXME: insecure)
+            sslContext.init(null, new TrustManager[] {new TrustAllX509TrustManager(sslState)}, null);
+
+            SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
+            sslSocket = (SSLSocket)sslSocketFactory.createSocket(socket, address.getHostName(), address.getPort(), true);
+
+            sslSocket.startHandshake();
+
+            InputStream sis = sslSocket.getInputStream();
+            source.setInputStream(sis);
+
+            OutputStream sos = sslSocket.getOutputStream();
+            sink.setOutputStream(sos);
+
+        } catch (Exception e) {
+            throw new RuntimeException("Cannot upgrade socket to SSL: " + e.getMessage(), e);
+        }
+
+    }
+
+    @Override
+    public void validate() {
+        for (Element element : elements.values())
+            element.validate();
+
+        if (get(IN).getPads(Direction.IN).size() == 0)
+            throw new RuntimeException("[ " + this + "] Input of socket is not connected.");
+
+        if (get(OUT).getPads(Direction.OUT).size() == 0)
+            throw new RuntimeException("[ " + this + "] Output of socket is not connected.");
+
+    }
+
+    @Override
+    public void shutdown() {
+        try {
+            handleEvent(Event.STREAM_CLOSE, Direction.IN);
+        } catch (Exception e) {
+        }
+        try {
+            handleEvent(Event.STREAM_CLOSE, Direction.OUT);
+        } catch (Exception e) {
+        }
+        try {
+            if (sslSocket != null)
+                sslSocket.close();
+        } catch (Exception e) {
+        }
+        try {
+            socket.close();
+        } catch (Exception e) {
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "SocketWrapper(" + id + ")";
+    }
+
+    /**
+     * Example.
+     */
+    public static void main(String args[]) {
+
+        try {
+            System.setProperty("streamer.Link.debug", "true");
+            System.setProperty("streamer.Element.debug", "true");
+            System.setProperty("rdpclient.MockServer.debug", "true");
+
+            Pipeline pipeline = new PipelineImpl("echo client");
+
+            SocketWrapperImpl socketWrapper = new SocketWrapperImpl("socket", null);
+
+            pipeline.add(socketWrapper);
+            pipeline.add(new BaseElement("echo"));
+            pipeline.add(new Queue("queue")); // To decouple input and output
+
+            pipeline.link("socket", "echo", "queue", "socket");
+
+            final byte[] mockData = new byte[] {0x01, 0x02, 0x03};
+            MockServer server = new MockServer(new Packet[] {new Packet("Server hello") {
+                {
+                    type = SERVER;
+                    data = mockData;
+                }
+            }, new Packet("Client hello") {
+                {
+                    type = CLIENT;
+                    data = mockData;
+                }
+            }, new Packet("Server hello") {
+                {
+                    type = SERVER;
+                    data = mockData;
+                }
+            }, new Packet("Client hello") {
+                {
+                    type = CLIENT;
+                    data = mockData;
+                }
+            }});
+            server.start();
+            InetSocketAddress address = server.getAddress();
+
+            /*DEBUG*/System.out.println("Address: " + address);
+            socketWrapper.connect(address);
+
+        } catch (Exception e) {
+            e.printStackTrace(System.err);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SyncLink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SyncLink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SyncLink.java
old mode 100644
new mode 100755
index 2bd4919..94281d2
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SyncLink.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SyncLink.java
@@ -27,7 +27,7 @@ public class SyncLink implements Link {
      * avoid consuming of 100% of CPU in main loop in cases when link is pauses or
      * source element cannot produce data right now.
      */
-    protected static final long STANDARD_DELAY_FOR_EMPTY_PACKET = 10; // Milliseconds
+    public static final long STANDARD_DELAY_FOR_EMPTY_PACKET = 10; // Milliseconds
 
     /**
      * Delay for null packets in poll method when blocking is requested, in
@@ -46,7 +46,8 @@ public class SyncLink implements Link {
     protected String id = null;
 
     /**
-     * Buffer with data to hold because link is paused, or data is pushed back.
+     * Buffer with data to hold because link is paused, on hold, or data is pushed
+     * back from output element.
      */
     protected ByteBuffer cacheBuffer = null;
 
@@ -62,11 +63,6 @@ public class SyncLink implements Link {
     protected int packetNumber = 0;
 
     /**
-     * Set to true to hold all data in link until it will be set to false again.
-     */
-    protected boolean paused = false;
-
-    /**
      * Element to pull data from, when in pull mode.
      */
     protected Element source = null;
@@ -87,12 +83,24 @@ public class SyncLink implements Link {
      * Indicates that event STREAM_START is passed over this link, so main loop
      * can be started to pull data from source element.
      */
-    protected boolean start;
+    protected boolean started = false;
+
+    /**
+     * Set to true to hold all data in link until it will be set to false again.
+     */
+    protected boolean paused = false;
+
+    /**
+     * Used by pull() method to hold all data in this link to avoid recursion when
+     * source element is asked to push new data to it outputs.
+     */
+    protected boolean hold = false;
 
     /**
-     * Operate in pull mode.
+     * Operate in pull mode instead of default push mode. In pull mode, link will
+     * ask it source element for new data.
      */
-    protected boolean pullMode;
+    protected boolean pullMode = false;
 
     public SyncLink() {
     }
@@ -139,7 +147,7 @@ public class SyncLink implements Link {
      */
     @Override
     public void sendData(ByteBuffer buf) {
-        if (!paused && pullMode)
+        if (!hold && pullMode)
             throw new RuntimeException("[" + this + "] ERROR: link is not in push mode.");
 
         if (verbose)
@@ -162,7 +170,7 @@ public class SyncLink implements Link {
         // When data pushed back and length of data is less than length of full
         // packet, then feed data to sink element immediately
         while (cacheBuffer != null) {
-            if (paused) {
+            if (paused || hold) {
                 if (verbose)
                     System.out.println("[" + this + "] INFO: Transfer is paused. Data in cache buffer: " + cacheBuffer + ".");
 
@@ -172,8 +180,8 @@ public class SyncLink implements Link {
 
             if (expectedPacketSize > 0 && cacheBuffer.length < expectedPacketSize) {
                 if (verbose)
-                    System.out.println("[" + this + "] INFO: Transfer is suspended because available data is less than expected packet size. Expected packet size: " +
-                        expectedPacketSize + ", data in cache buffer: " + cacheBuffer + ".");
+                    System.out.println("[" + this + "] INFO: Transfer is suspended because available data is less than expected packet size. Expected packet size: "
+                            + expectedPacketSize + ", data in cache buffer: " + cacheBuffer + ".");
 
                 // Wait until rest of packet will be read
                 return;
@@ -203,43 +211,46 @@ public class SyncLink implements Link {
 
         // Shutdown main loop (if any) when STREAM_CLOSE event is received.
         switch (event) {
-            case STREAM_START: {
-                if (!start)
-                    start = true;
-                else
-                    // Event already sent trough this link
-                    return;
-                break;
-            }
-            case STREAM_CLOSE: {
-                if (!shutdown)
-                    shutdown = true;
-                else
-                    // Event already sent trough this link
-                    return;
-                break;
-            }
-            case LINK_SWITCH_TO_PULL_MODE: {
-                setPullMode();
-                break;
-            }
+        case STREAM_START: {
+            if (!started)
+                started = true;
+            else
+                // Event already sent trough this link
+                return;
+            break;
+        }
+        case STREAM_CLOSE: {
+            if (!shutdown)
+                shutdown = true;
+            else
+                // Event already sent trough this link
+                return;
+            break;
+        }
+        case LINK_SWITCH_TO_PULL_MODE: {
+            setPullMode();
+            break;
+        }
 
         }
 
         switch (direction) {
-            case IN:
-                source.handleEvent(event, direction);
-                break;
-            case OUT:
-                sink.handleEvent(event, direction);
-                break;
+        case IN:
+            source.handleEvent(event, direction);
+            break;
+        case OUT:
+            sink.handleEvent(event, direction);
+            break;
         }
     }
 
     @Override
     public ByteBuffer pull(boolean block) {
         if (!pullMode)
-            throw new RuntimeException("This link is not in pull mode.");
+            throw new RuntimeException("[" + this + "] ERROR: This link is not in pull mode.");
+
+        if (hold)
+            throw new RuntimeException("[" + this + "] ERROR: This link is already on hold, waiting for data to be pulled in. Circular reference?");
 
         if (paused) {
             if (verbose)
@@ -269,9 +280,12 @@ public class SyncLink implements Link {
 
         // Pause this link, so incoming data will not be sent to sink
         // immediately, then ask source element for more data
-        pause();
-        source.poll(block);
-        resume();
+        try {
+            hold = true;
+            source.poll(block);
+        } finally {
+            hold = false;
+        }
 
         // Can return something only when data was stored in buffer
         if (cacheBuffer != null && (expectedPacketSize == 0 || (expectedPacketSize > 0 && cacheBuffer.length >= expectedPacketSize))) {
@@ -289,10 +303,11 @@ public class SyncLink implements Link {
     @Override
     public Element setSink(Element sink) {
         if (sink != null && this.sink != null)
-            throw new RuntimeException("This link sink element is already set. Link: " + this + ", new sink: " + sink + ", existing sink: " + this.sink + ".");
+            throw new RuntimeException("[" + this + "] ERROR: This link sink element is already set. Link: " + this + ", new sink: " + sink + ", existing sink: "
+                    + this.sink + ".");
 
         if (sink == null && cacheBuffer != null)
-            throw new RuntimeException("Cannot drop link: cache is not empty. Link: " + this + ", cache: " + cacheBuffer);
+            throw new RuntimeException("[" + this + "] ERROR: Cannot drop link: cache is not empty. Link: " + this + ", cache: " + cacheBuffer);
 
         this.sink = sink;
 
@@ -302,7 +317,8 @@ public class SyncLink implements Link {
     @Override
     public Element setSource(Element source) {
         if (this.source != null && source != null)
-            throw new RuntimeException("This link source element is already set. Link: " + this + ", new source: " + source + ", existing source: " + this.source + ".");
+            throw new RuntimeException("[" + this + "] ERROR: This link source element is already set. Link: " + this + ", new source: " + source
+                    + ", existing source: " + this.source + ".");
 
         this.source = source;
         return source;
@@ -321,7 +337,7 @@ public class SyncLink implements Link {
     @Override
     public void pause() {
         if (paused)
-            throw new RuntimeException("Link is already paused.");
+            throw new RuntimeException("[" + this + "] ERROR: Link is already paused.");
 
         paused = true;
 
@@ -343,7 +359,7 @@ public class SyncLink implements Link {
     @Override
     public void run() {
         // Wait until even STREAM_START will arrive
-        while (!start) {
+        while (!started) {
             delay();
         }
 
@@ -374,8 +390,7 @@ public class SyncLink implements Link {
         try {
             Thread.sleep(delay);
         } catch (InterruptedException e) {
-            e.printStackTrace(System.err);
-            throw new RuntimeException("Interrupted in main loop.", e);
+            throw new RuntimeException("[" + this + "] ERROR: Interrupted in main loop.", e);
         }
     }
 
@@ -384,16 +399,16 @@ public class SyncLink implements Link {
         if (verbose)
             System.out.println("[" + this + "] INFO: Switching to PULL mode.");
 
-        this.pullMode = true;
+        pullMode = true;
     }
 
     @Override
     public void drop() {
         if (pullMode)
-            throw new RuntimeException("Cannot drop link in pull mode.");
+            throw new RuntimeException("[" + this + "] ERROR: Cannot drop link in pull mode.");
 
         if (cacheBuffer != null)
-            throw new RuntimeException("Cannot drop link when cache conatains data: " + cacheBuffer + ".");
+            throw new RuntimeException("[" + this + "] ERROR: Cannot drop link when cache conatains data: " + cacheBuffer + ".");
 
         source.dropLink(this);
         sink.dropLink(this);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSink.java
new file mode 100755
index 0000000..edfe8db
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSink.java
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.apr;
+
+import org.apache.tomcat.jni.Socket;
+
+import streamer.BaseElement;
+import streamer.ByteBuffer;
+import streamer.DataSource;
+import streamer.Direction;
+import streamer.Event;
+import streamer.Link;
+
+public class AprSocketSink extends BaseElement {
+
+    protected AprSocketWrapperImpl socketWrapper;
+    protected Long socket;
+
+    public AprSocketSink(String id) {
+        super(id);
+    }
+
+    public AprSocketSink(String id, AprSocketWrapperImpl socketWrapper) {
+        super(id);
+        this.socketWrapper = socketWrapper;
+    }
+
+    public void setSocket(long socket) {
+        this.socket = socket;
+
+        // Resume links
+        resumeLinks();
+    }
+
+    /**
+     * Send incoming data to stream.
+     */
+    @Override
+    public void handleData(ByteBuffer buf, Link link) {
+        if (buf == null)
+            return;
+
+        if (socketWrapper.shutdown)
+            return;
+
+        try {
+            if (verbose)
+                System.out.println("[" + this + "] INFO: Writing data to stream: " + buf + ".");
+
+            // FIXME: If pull is destroyed or socket is closed, segfault will happen here
+            Socket.send(socket, buf.data, buf.offset, buf.length);
+        } catch (Exception e) {
+            System.err.println("[" + this + "] ERROR: " + e.getMessage());
+            closeStream();
+        }
+    }
+
+    @Override
+    public void handleEvent(Event event, Direction direction) {
+        switch (event) {
+        case SOCKET_UPGRADE_TO_SSL:
+            socketWrapper.upgradeToSsl();
+            break;
+        case LINK_SWITCH_TO_PULL_MODE:
+            throw new RuntimeException("[" + this + "] ERROR: Unexpected event: sink recived LINK_SWITCH_TO_PULL_MODE event.");
+        default:
+            super.handleEvent(event, direction);
+        }
+    }
+
+    @Override
+    public void setLink(String padName, Link link, Direction direction) {
+        switch (direction) {
+        case IN:
+            super.setLink(padName, link, direction);
+
+            if (socket == null)
+                // Pause links until data stream will be ready
+                link.pause();
+            break;
+        case OUT:
+            throw new RuntimeException("Cannot assign link to output pad in sink element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
+        }
+    }
+
+    private void resumeLinks() {
+        for (DataSource source : inputPads.values())
+            ((Link)source).resume();
+    }
+
+    @Override
+    protected void onClose() {
+        closeStream();
+    }
+
+    private void closeStream() {
+        if (socketWrapper.shutdown)
+            return;
+
+        if (verbose)
+            System.out.println("[" + this + "] INFO: Closing stream.");
+
+        try {
+            sendEventToAllPads(Event.STREAM_CLOSE, Direction.IN);
+        } catch (Exception e) {
+        }
+        socketWrapper.shutdown();
+    }
+
+    @Override
+    public String toString() {
+        return "AprSocketSink(" + id + ")";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSource.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSource.java
new file mode 100755
index 0000000..0298349
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSource.java
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.apr;
+
+import org.apache.tomcat.jni.Socket;
+
+import streamer.BaseElement;
+import streamer.ByteBuffer;
+import streamer.DataSink;
+import streamer.Direction;
+import streamer.Event;
+import streamer.Link;
+
+/**
+ * Source element, which reads data from InputStream.
+ */
+public class AprSocketSource extends BaseElement {
+
+    protected AprSocketWrapperImpl socketWrapper;
+    protected Long socket;
+
+    public AprSocketSource(String id) {
+        super(id);
+    }
+
+    public AprSocketSource(String id, AprSocketWrapperImpl socketWrapper) {
+        super(id);
+        this.socketWrapper = socketWrapper;
+    }
+
+    @Override
+    public void handleEvent(Event event, Direction direction) {
+        switch (event) {
+        case SOCKET_UPGRADE_TO_SSL:
+            socketWrapper.upgradeToSsl();
+            break;
+        case LINK_SWITCH_TO_PULL_MODE:
+            // Do nothing - this is the source
+            break;
+        default:
+            super.handleEvent(event, direction);
+        }
+    }
+
+    @Override
+    public void setLink(String padName, Link link, Direction direction) {
+        switch (direction) {
+        case OUT:
+            super.setLink(padName, link, direction);
+
+            if (socket == null) {
+                // Pause links until data stream will be ready
+                link.pause();
+            }
+            break;
+        case IN:
+            throw new RuntimeException("Cannot assign link to input pad in source element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
+        }
+    }
+
+    public void setSocket(long socket) {
+        this.socket = socket;
+
+        // Resume links
+        resumeLinks();
+    }
+
+    private void resumeLinks() {
+        for (DataSink sink : outputPads.values())
+            ((Link)sink).resume();
+    }
+
+    /**
+     * Read data from input stream.
+     */
+    @Override
+    public void poll(boolean block) {
+        if (socketWrapper.shutdown) {
+            socketWrapper.destroyPull();
+            return;
+        }
+
+        try {
+            // Create buffer of recommended size and with default offset
+            ByteBuffer buf = new ByteBuffer(incommingBufLength);
+
+            if (verbose)
+                System.out.println("[" + this + "] INFO: Reading data from stream.");
+
+            // FIXME: If pull is destroyed or socket is closed, segfault will happen here
+            int actualLength = (block) ? // Blocking read
+                    Socket.recv(socket, buf.data, buf.offset, buf.data.length - buf.offset)
+                    : // Non-blocking read
+                        Socket.recvt(socket, buf.data, buf.offset, buf.data.length - buf.offset, 1);
+
+                    if (socketWrapper.shutdown) {
+                        socketWrapper.destroyPull();
+                        return;
+                    }
+
+                    if (actualLength < 0) {
+                        if (verbose)
+                            System.out.println("[" + this + "] INFO: End of stream.");
+
+                        buf.unref();
+                        closeStream();
+                        sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+                        return;
+                    }
+
+                    if (actualLength == 0) {
+                        if (verbose)
+                            System.out.println("[" + this + "] INFO: Empty buffer is read from stream.");
+
+                        buf.unref();
+                        return;
+                    }
+
+                    buf.length = actualLength;
+
+                    if (verbose)
+                        System.out.println("[" + this + "] INFO: Data read from stream: " + buf + ".");
+
+                    pushDataToAllOuts(buf);
+
+        } catch (Exception e) {
+            System.err.println("[" + this + "] ERROR: " + e.getMessage());
+            closeStream();
+        }
+    }
+
+    @Override
+    protected void onClose() {
+        closeStream();
+    }
+
+    private void closeStream() {
+
+        if (socketWrapper.shutdown)
+            return;
+
+        if (verbose)
+            System.out.println("[" + this + "] INFO: Closing stream.");
+
+        try {
+            sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+        } catch (Exception e) {
+        }
+        socketWrapper.shutdown();
+    }
+
+    @Override
+    public String toString() {
+        return "AprSocketSource(" + id + ")";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketWrapperImpl.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketWrapperImpl.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketWrapperImpl.java
new file mode 100755
index 0000000..2ee426b
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketWrapperImpl.java
@@ -0,0 +1,281 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.apr;
+
+import static streamer.debug.MockServer.Packet.PacketType.CLIENT;
+import static streamer.debug.MockServer.Packet.PacketType.SERVER;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+
+import org.apache.tomcat.jni.Address;
+import org.apache.tomcat.jni.Error;
+import org.apache.tomcat.jni.Library;
+import org.apache.tomcat.jni.Pool;
+import org.apache.tomcat.jni.SSL;
+import org.apache.tomcat.jni.SSLContext;
+import org.apache.tomcat.jni.SSLSocket;
+import org.apache.tomcat.jni.Socket;
+
+import streamer.BaseElement;
+import streamer.Direction;
+import streamer.Element;
+import streamer.Event;
+import streamer.Pipeline;
+import streamer.PipelineImpl;
+import streamer.Queue;
+import streamer.SocketWrapper;
+import streamer.debug.MockServer;
+import streamer.debug.MockServer.Packet;
+import streamer.ssl.SSLState;
+import sun.security.x509.X509CertImpl;
+
+public class AprSocketWrapperImpl extends PipelineImpl implements SocketWrapper {
+
+    static {
+        try {
+            Library.initialize(null);
+            SSL.initialize(null);
+        } catch (Exception e) {
+            throw new RuntimeException("Cannot load Tomcat Native Library (Apache Portable Runtime).", e);
+        }
+    }
+
+    private final SSLState sslState;
+
+    final long pool = Pool.create(0);
+    long inetAddress;
+    long socket;
+    private AprSocketSource source;
+    private AprSocketSink sink;
+
+    boolean shutdown = false;
+
+    private final boolean shutdowned = false;
+
+    public AprSocketWrapperImpl(String id, SSLState sslState) {
+        super(id);
+        this.sslState = sslState;
+    }
+
+    @Override
+    protected HashMap<String, Element> initElementMap(String id) {
+        HashMap<String, Element> map = new HashMap<String, Element>();
+
+        source = new AprSocketSource(id + "." + OUT, this);
+        sink = new AprSocketSink(id + "." + IN, this);
+
+        // Pass requests to read data to socket input stream
+        map.put(OUT, source);
+
+        // All incoming data, which is sent to this socket wrapper, will be sent
+        // to socket remote
+        map.put(IN, sink);
+
+        return map;
+    }
+
+    /**
+     * Connect this socket wrapper to remote server and start main loop on
+     * AprSocketSource stdout link, to watch for incoming data, and
+     * AprSocketSink stdin link, to pull for outgoing data.
+     */
+    @Override
+    public void connect(InetSocketAddress address) throws IOException {
+        try {
+            inetAddress = Address.info(address.getHostName(), Socket.APR_UNSPEC, address.getPort(), 0, pool);
+            socket = Socket.create(Address.getInfo(inetAddress).family, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
+        } catch (Exception e) {
+            throw new IOException("[" + this + "] ERROR: Cannot create socket for \"" + address + "\".", e);
+        }
+
+        int ret = Socket.connect(socket, inetAddress);
+        if (ret != 0)
+            throw new IOException("[" + this + "] ERROR: Cannot connect to remote host \"" + address + "\": " + Error.strerror(ret));
+
+        source.setSocket(socket);
+        sink.setSocket(socket);
+
+        // Start polling for data to send to remote sever
+        runMainLoop(IN, STDIN, true, true);
+
+        // Push incoming data from server to handlers
+        runMainLoop(OUT, STDOUT, false, false);
+
+    }
+
+    @Override
+    public void handleEvent(Event event, Direction direction) {
+        switch (event) {
+        case SOCKET_UPGRADE_TO_SSL:
+            upgradeToSsl();
+            break;
+        default:
+            super.handleEvent(event, direction);
+            break;
+        }
+    }
+
+    @Override
+    public void validate() {
+
+        if (getPads(Direction.IN).size() == 0)
+            throw new RuntimeException("[ " + this + "] BUG: Input of socket is not connected.");
+
+        if (getPads(Direction.OUT).size() == 0)
+            throw new RuntimeException("[ " + this + "] BUG: Output of socket is not connected.");
+
+    }
+
+    @Override
+    public void upgradeToSsl() {
+
+        try {
+            long sslContext;
+            try {
+                sslContext = SSLContext.make(pool, SSL.SSL_PROTOCOL_TLSV1, SSL.SSL_MODE_CLIENT);
+            } catch (Exception e) {
+                throw new RuntimeException("Cannot create SSL context using Tomcat native library.", e);
+            }
+
+            SSLContext.setOptions(sslContext, SSL.SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS | SSL.SSL_OP_TLS_BLOCK_PADDING_BUG | SSL.SSL_OP_MICROSOFT_BIG_SSLV3_BUFFER
+                    | SSL.SSL_OP_MSIE_SSLV2_RSA_PADDING);
+            // FIXME: verify certificate by default
+            SSLContext.setVerify(sslContext, SSL.SSL_CVERIFY_NONE, 0);
+            int ret;
+            try {
+                ret = SSLSocket.attach(sslContext, socket);
+            } catch (Exception e) {
+                throw new RuntimeException("[" + this + "] ERROR: Cannot attach SSL context to socket: ", e);
+            }
+            if (ret != 0)
+                throw new RuntimeException("[" + this + "] ERROR: Cannot attach SSL context to socket(" + ret + "): " + SSL.getLastError());
+
+            try {
+                ret = SSLSocket.handshake(socket);
+            } catch (Exception e) {
+                throw new RuntimeException("[" + this + "] ERROR: Cannot make SSL handshake with server: ", e);
+            }
+            if (ret != 0 && ret != 20014) // 20014: bad certificate signature FIXME: show prompt for self signed certificate
+                throw new RuntimeException("[" + this + "] ERROR: Cannot make SSL handshake with server(" + ret + "): " + SSL.getLastError());
+
+            try {
+                byte[] key = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT);
+                //*DEBUG*/System.out.println("DEBUG: Server cert:\n"+new ByteBuffer(key).dump());
+                sslState.serverCertificateSubjectPublicKeyInfo = new X509CertImpl(key).getPublicKey().getEncoded();
+            } catch (Exception e) {
+                throw new RuntimeException("[" + this + "] ERROR: Cannot get server public key: ", e);
+            }
+
+        } catch (RuntimeException e) {
+            shutdown();
+            throw e;
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        if (shutdown)
+            return;
+
+        shutdown = true;
+
+        try {
+            handleEvent(Event.STREAM_CLOSE, Direction.IN);
+        } catch (Exception e) {
+        }
+        try {
+            handleEvent(Event.STREAM_CLOSE, Direction.OUT);
+        } catch (Exception e) {
+        }
+    }
+
+    void destroyPull() {
+        if (shutdowned)
+            return;
+
+        // Causes segfault in AprSocketSource.poll() method, so this function must be called from it
+        try {
+            Socket.close(socket);
+            // or
+            // Socket.shutdown(socket, Socket.APR_SHUTDOWN_READWRITE);
+            Pool.destroy(pool);
+        } catch (Exception e) {
+        }
+
+    }
+
+    @Override
+    public String toString() {
+        return "AprSocketWrapper(" + id + ")";
+    }
+
+    /**
+     * Example.
+     */
+    public static void main(String args[]) {
+
+        try {
+            System.setProperty("streamer.Link.debug", "true");
+            System.setProperty("streamer.Element.debug", "true");
+            System.setProperty("rdpclient.MockServer.debug", "true");
+
+            Pipeline pipeline = new PipelineImpl("echo client");
+
+            AprSocketWrapperImpl socketWrapper = new AprSocketWrapperImpl("socket", null);
+
+            pipeline.add(socketWrapper);
+            pipeline.add(new BaseElement("echo"));
+            pipeline.add(new Queue("queue")); // To decouple input and output
+
+            pipeline.link("socket", "echo", "queue", "socket");
+
+            final byte[] mockData = new byte[] {0x01, 0x02, 0x03};
+            MockServer server = new MockServer(new Packet[] {new Packet("Server hello") {
+                {
+                    type = SERVER;
+                    data = mockData;
+                }
+            }, new Packet("Client hello") {
+                {
+                    type = CLIENT;
+                    data = mockData;
+                }
+            }, new Packet("Server hello") {
+                {
+                    type = SERVER;
+                    data = mockData;
+                }
+            }, new Packet("Client hello") {
+                {
+                    type = CLIENT;
+                    data = mockData;
+                }
+            }});
+            server.start();
+            InetSocketAddress address = server.getAddress();
+
+            socketWrapper.connect(address);
+
+        } catch (Exception e) {
+            e.printStackTrace(System.err);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/bco/BcoSocketWrapperImpl.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/bco/BcoSocketWrapperImpl.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/bco/BcoSocketWrapperImpl.java
new file mode 100755
index 0000000..67e2dbd
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/bco/BcoSocketWrapperImpl.java
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.bco;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.SecureRandom;
+import java.security.Security;
+
+import org.bouncycastle.asn1.x509.X509CertificateStructure;
+import org.bouncycastle.crypto.tls.CertificateVerifyer;
+import org.bouncycastle.crypto.tls.TlsProtocolHandler;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+
+import streamer.Direction;
+import streamer.Event;
+import streamer.SocketWrapperImpl;
+import streamer.ssl.SSLState;
+
+@SuppressWarnings("deprecation")
+public class BcoSocketWrapperImpl extends SocketWrapperImpl {
+
+    static {
+        Security.addProvider(new BouncyCastleProvider());
+    }
+
+    private TlsProtocolHandler bcoSslSocket;
+
+    public BcoSocketWrapperImpl(String id, SSLState sslState) {
+        super(id, sslState);
+    }
+
+    @Override
+    public void upgradeToSsl() {
+
+        if (sslSocket != null)
+            // Already upgraded
+            return;
+
+        if (verbose)
+            System.out.println("[" + this + "] INFO: Upgrading socket to SSL.");
+
+        try {
+
+            SecureRandom secureRandom = new SecureRandom();
+            bcoSslSocket = new TlsProtocolHandler(socket.getInputStream(), socket.getOutputStream(), secureRandom);
+
+            CertificateVerifyer client = new CertificateVerifyer() {
+
+                @Override
+                public boolean isValid(X509CertificateStructure[] chain) {
+
+                    try {
+                        if (sslState != null) {
+                            sslState.serverCertificateSubjectPublicKeyInfo = chain[0].getSubjectPublicKeyInfo().getEncoded();
+                        }
+                    } catch (IOException e) {
+                        throw new RuntimeException("Cannot get server public key.", e);
+                    }
+
+                    return true;
+                }
+            };
+            bcoSslSocket.connect(client);
+
+            InputStream sis = bcoSslSocket.getInputStream();
+            source.setInputStream(sis);
+
+            OutputStream sos = bcoSslSocket.getOutputStream();
+            sink.setOutputStream(sos);
+
+        } catch (Exception e) {
+            throw new RuntimeException("Cannot upgrade socket to SSL: " + e.getMessage(), e);
+        }
+
+    }
+
+    @Override
+    public void shutdown() {
+        try {
+            handleEvent(Event.STREAM_CLOSE, Direction.IN);
+        } catch (Exception e) {
+        }
+        try {
+            handleEvent(Event.STREAM_CLOSE, Direction.OUT);
+        } catch (Exception e) {
+        }
+        try {
+            if (bcoSslSocket != null)
+                bcoSslSocket.close();
+        } catch (Exception e) {
+        }
+        try {
+            socket.close();
+        } catch (Exception e) {
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "BcoSocketWrapper(" + id + ")";
+    }
+
+}


Mime
View raw message