cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From devd...@apache.org
Subject [04/22] CLOUDSTACK-5344: Update to allow rdp console to access hyper-v vm virtual framebuffer.
Date Mon, 23 Dec 2013 09:12:58 GMT
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SyncLink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SyncLink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SyncLink.java
old mode 100644
new mode 100755
index 32c14bb..696402a
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SyncLink.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SyncLink.java
@@ -22,381 +22,393 @@ package streamer;
  */
 public class SyncLink implements Link {
 
-  /**
-   * When null packet is pulled from source element, then make slight delay to
-   * avoid consuming of 100% of CPU in main loop in cases when link is pauses or
-   * source element cannot produce data right now.
-   */
-  protected static final long STANDARD_DELAY_FOR_EMPTY_PACKET = 10; // Milliseconds
-
-  /**
-   * Delay for null packets in poll method when blocking is requested, in
-   * milliseconds.
-   */
-  protected long delay = STANDARD_DELAY_FOR_EMPTY_PACKET;
-
-  /**
-   * Set to true to print debugging messages.
-   */
-  protected boolean verbose = System.getProperty("streamer.Link.debug", "false").equals("true");;
-
-  /**
-   * ID of this link.
-   */
-  protected String id = null;
-
-  /**
-   * Buffer with data to hold because link is paused, or data is pushed back.
-   */
-  protected ByteBuffer cacheBuffer = null;
-
-  /**
-   * Size of expected packet. Data must be hold in link until full packet will
-   * be read.
-   */
-  protected int expectedPacketSize = 0;
-
-  /**
-   * Number of packets and packet header transferred to element.
-   */
-  protected int packetNumber = 0;
-
-  /**
-   * Set to true to hold all data in link until it will be set to false again.
-   */
-  protected boolean paused = false;
-
-  /**
-   * Element to pull data from, when in pull mode.
-   */
-  protected Element source = null;
-
-  /**
-   * Element to send data to in both pull and push modes.
-   */
-  protected Element sink = null;
-
-  /**
-   * When in loop, indicates that loop must be stopped.
-   * 
-   * @see run()
-   */
-  private boolean shutdown = false;
-
-  /**
-   * Indicates that event STREAM_START is passed over this link, so main loop
-   * can be started to pull data from source element.
-   */
-  protected boolean start;
-
-  /**
-   * Operate in pull mode.
-   */
-  protected boolean pullMode;
-
-  public SyncLink() {
-  }
-
-  public SyncLink(String id) {
-    this.id = id;
-  }
-
-  @Override
-  public void pushBack(ByteBuffer buf) {
-    if (verbose)
-      System.out.println("[" + this + "] INFO: Buffer pushed back: " + buf + ".");
-
-    if (cacheBuffer != null) {
-      ByteBuffer tmp = cacheBuffer.join(buf);
-      cacheBuffer.unref();
-      cacheBuffer = tmp;
-    } else {
-      cacheBuffer = buf;
-      cacheBuffer.ref();
+    /**
+     * When null packet is pulled from source element, then make slight delay to
+     * avoid consuming 100% of CPU in main loop in cases when link is paused or
+     * source element cannot produce data right now.
+     */
+    public static final long STANDARD_DELAY_FOR_EMPTY_PACKET = 10; // Milliseconds
+
+    /**
+     * Delay for null packets in poll method when blocking is requested, in
+     * milliseconds.
+     */
+    protected long delay = STANDARD_DELAY_FOR_EMPTY_PACKET;
+
+    /**
+     * Set to true to print debugging messages.
+     */
+    protected boolean verbose = System.getProperty("streamer.Link.debug", "false").equals("true");;
+
+    /**
+     * ID of this link.
+     */
+    protected String id = null;
+
+    /**
+     * Buffer with data to hold because link is paused, on hold, or data is pushed
+     * back from output element.
+     */
+    protected ByteBuffer cacheBuffer = null;
+
+    /**
+     * Size of expected packet. Data must be held in link until full packet is
+     * read.
+     */
+    protected int expectedPacketSize = 0;
+
+    /**
+     * Number of packets and packet header transferred to element.
+     */
+    protected int packetNumber = 0;
+
+    /**
+     * Element to pull data from, when in pull mode.
+     */
+    protected Element source = null;
+
+    /**
+     * Element to send data to in both pull and push modes.
+     */
+    protected Element sink = null;
+
+    /**
+     * When in loop, indicates that loop must be stopped.
+     * 
+     * @see #run()
+     */
+    private boolean shutdown = false;
+
+    /**
+     * Indicates that event STREAM_START is passed over this link, so main loop
+     * can be started to pull data from source element.
+     */
+    protected boolean started = false;
+
+    /**
+     * Set to true to hold all data in link until it will be set to false again.
+     */
+    protected boolean paused = false;
+
+    /**
+     * Used by pull() method to hold all data in this link to avoid recursion when
+     * source element is asked to push new data to its outputs.
+     */
+    protected boolean hold = false;
+
+    /**
+     * Operate in pull mode instead of default push mode. In pull mode, link will
+     * ask its source element for new data.
+     */
+    protected boolean pullMode = false;
+
+    public SyncLink() {
     }
 
-    resetCursor();
-  }
-
-  private void resetCursor() {
-    // Reset cursor
-    cacheBuffer.cursor = 0;
-  }
-
-  @Override
-  public void pushBack(ByteBuffer buf, int lengthOfFullPacket) {
-    pushBack(buf);
-    expectedPacketSize = lengthOfFullPacket;
-  }
-
-  @Override
-  public String toString() {
-    return "SyncLink(" + ((id != null) ? id + ", " : "") + source + ":" + sink + ")";
-  }
-
-  /**
-   * Push data to sink. Call with null to push cached data.
-   */
-  @Override
-  public void sendData(ByteBuffer buf) {
-    if (!paused && pullMode)
-      throw new RuntimeException("[" + this + "] ERROR: link is not in push mode.");
-
-    if (verbose)
-      System.out.println("[" + this + "] INFO: Incoming buffer: " + buf + ".");
-
-    if (buf == null && cacheBuffer == null)
-      return;
-
-    if (cacheBuffer != null && buf != null) {
-      // Join old data with fresh data
-      buf = cacheBuffer.join(buf);
-      cacheBuffer.unref();
-      cacheBuffer = buf;
+    public SyncLink(String id) {
+        this.id = id;
     }
 
-    // Store buffer in cache field to simplify following loop
-    if (buf != null)
-      cacheBuffer = buf;
-
-    // When data pushed back and length of data is less than length of full
-    // packet, then feed data to sink element immediately
-    while (cacheBuffer != null) {
-      if (paused) {
+    @Override
+    public void pushBack(ByteBuffer buf) {
         if (verbose)
-          System.out.println("[" + this + "] INFO: Transfer is paused. Data in cache buffer: " + cacheBuffer + ".");
+            System.out.println("[" + this + "] INFO: Buffer pushed back: " + buf + ".");
+
+        if (cacheBuffer != null) {
+            ByteBuffer tmp = cacheBuffer.join(buf);
+            cacheBuffer.unref();
+            cacheBuffer = tmp;
+        } else {
+            cacheBuffer = buf;
+            cacheBuffer.ref();
+        }
+
+        resetCursor();
+    }
+
+    private void resetCursor() {
+        // Reset cursor
+        cacheBuffer.cursor = 0;
+    }
 
-        // Wait until rest of packet will be read
-        return;
-      }
+    @Override
+    public void pushBack(ByteBuffer buf, int lengthOfFullPacket) {
+        pushBack(buf);
+        expectedPacketSize = lengthOfFullPacket;
+    }
+
+    @Override
+    public String toString() {
+        return "SyncLink(" + ((id != null) ? id + ", " : "") + source + ":" + sink + ")";
+    }
+
+    /**
+     * Push data to sink. Call with null to push cached data.
+     */
+    @Override
+    public void sendData(ByteBuffer buf) {
+        if (!hold && pullMode)
+            throw new RuntimeException("[" + this + "] ERROR: link is not in push mode.");
 
-      if (expectedPacketSize > 0 && cacheBuffer.length < expectedPacketSize) {
         if (verbose)
-          System.out.println("[" + this + "] INFO: Transfer is suspended because available data is less than expected packet size. Expected packet size: "
-              + expectedPacketSize + ", data in cache buffer: " + cacheBuffer + ".");
+            System.out.println("[" + this + "] INFO: Incoming buffer: " + buf + ".");
 
-        // Wait until rest of packet will be read
-        return;
-      }
+        if (buf == null && cacheBuffer == null)
+            return;
 
-      // Full packet or packet header is read, feed it to element
-      buf = cacheBuffer;
-      cacheBuffer = null;
-      expectedPacketSize = 0;
-      packetNumber++;
+        if (cacheBuffer != null && buf != null) {
+            // Join old data with fresh data
+            buf = cacheBuffer.join(buf);
+            cacheBuffer.unref();
+            cacheBuffer = buf;
+        }
 
-      if (sink == null)
-        throw new NullPointerException("[" + this + "] ERROR: Cannot send data to sink: sink is null. Data: " + buf + ".");
+        // Store buffer in cache field to simplify following loop
+        if (buf != null)
+            cacheBuffer = buf;
+
+        // When data pushed back and length of data is less than length of full
+        // packet, then feed data to sink element immediately
+        while (cacheBuffer != null) {
+            if (paused || hold) {
+                if (verbose)
+                    System.out.println("[" + this + "] INFO: Transfer is paused. Data in cache buffer: " + cacheBuffer + ".");
+
+                // Wait until rest of packet will be read
+                return;
+            }
+
+            if (expectedPacketSize > 0 && cacheBuffer.length < expectedPacketSize) {
+                if (verbose)
+                    System.out.println("[" + this + "] INFO: Transfer is suspended because available data is less than expected packet size. Expected packet size: "
+                            + expectedPacketSize + ", data in cache buffer: " + cacheBuffer + ".");
+
+                // Wait until rest of packet will be read
+                return;
+            }
+
+            // Full packet or packet header is read, feed it to element
+            buf = cacheBuffer;
+            cacheBuffer = null;
+            expectedPacketSize = 0;
+            packetNumber++;
+
+            if (sink == null)
+                throw new NullPointerException("[" + this + "] ERROR: Cannot send data to sink: sink is null. Data: " + buf + ".");
+
+            sink.handleData(buf, this);
+            // cacheBuffer and expectedPacketSize can be changed at this time
+        }
 
-      sink.handleData(buf, this);
-      // cacheBuffer and expectedPacketSize can be changed at this time
     }
 
-  }
+    @SuppressWarnings("incomplete-switch")
+    @Override
+    public void sendEvent(Event event, Direction direction) {
 
-  @SuppressWarnings("incomplete-switch")
-  @Override
-  public void sendEvent(Event event, Direction direction) {
+        if (verbose)
+            System.out.println("[" + this + "] INFO: Event " + event + " is received.");
+
+        // Shutdown main loop (if any) when STREAM_CLOSE event is received.
+        switch (event) {
+        case STREAM_START: {
+            if (!started)
+                started = true;
+            else
+                // Event already sent through this link
+                return;
+            break;
+        }
+        case STREAM_CLOSE: {
+            if (!shutdown)
+                shutdown = true;
+            else
+                // Event already sent through this link
+                return;
+            break;
+        }
+        case LINK_SWITCH_TO_PULL_MODE: {
+            setPullMode();
+            break;
+        }
 
-    if (verbose)
-      System.out.println("[" + this + "] INFO: Event " + event + " is received.");
+        }
 
-    // Shutdown main loop (if any) when STREAM_CLOSE event is received.
-    switch (event) {
-    case STREAM_START: {
-      if (!start)
-        start = true;
-      else
-        // Event already sent trough this link
-        return;
-      break;
-    }
-    case STREAM_CLOSE: {
-      if (!shutdown)
-        shutdown = true;
-      else
-        // Event already sent trough this link
-        return;
-      break;
-    }
-    case LINK_SWITCH_TO_PULL_MODE: {
-      setPullMode();
-      break;
+        switch (direction) {
+        case IN:
+            source.handleEvent(event, direction);
+            break;
+        case OUT:
+            sink.handleEvent(event, direction);
+            break;
+        }
     }
 
-    }
+    @Override
+    public ByteBuffer pull(boolean block) {
+        if (!pullMode)
+            throw new RuntimeException("[" + this + "] ERROR: This link is not in pull mode.");
+
+        if (hold)
+            throw new RuntimeException("[" + this + "] ERROR: This link is already on hold, waiting for data to be pulled in. Circular reference?");
+
+        if (paused) {
+            if (verbose)
+                System.out.println("[" + this + "] INFO: Cannot pull, link is paused.");
+
+            // Make slight delay in such case, to avoid consuming 100% of CPU
+            if (block) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                }
+            }
+            return null;
+        }
+        // If data in cache can be sent immediately,
+        // then return it instead of asking for more data from source
+        if (cacheBuffer != null && (expectedPacketSize == 0 || (expectedPacketSize > 0 && cacheBuffer.length >= expectedPacketSize))) {
+            if (verbose)
+                System.out.println("[" + this + "] INFO: Data pulled from cache buffer: " + cacheBuffer + ".");
+
+            ByteBuffer tmp = cacheBuffer;
+            cacheBuffer = null;
+            return tmp;
+        }
 
-    switch (direction) {
-    case IN:
-      source.handleEvent(event, direction);
-      break;
-    case OUT:
-      sink.handleEvent(event, direction);
-      break;
+        // Pause this link, so incoming data will not be sent to sink
+        // immediately, then ask source element for more data
+        try {
+            hold = true;
+            source.poll(block);
+        } finally {
+            hold = false;
+        }
+
+        // Can return something only when data was stored in buffer
+        if (cacheBuffer != null && (expectedPacketSize == 0 || (expectedPacketSize > 0 && cacheBuffer.length >= expectedPacketSize))) {
+            if (verbose)
+                System.out.println("[" + this + "] INFO: Data pulled from source: " + cacheBuffer + ".");
+
+            ByteBuffer tmp = cacheBuffer;
+            cacheBuffer = null;
+            return tmp;
+        } else {
+            return null;
+        }
     }
-  }
 
-  @Override
-  public ByteBuffer pull(boolean block) {
-    if (!pullMode)
-      throw new RuntimeException("This link is not in pull mode.");
+    @Override
+    public Element setSink(Element sink) {
+        if (sink != null && this.sink != null)
+            throw new RuntimeException("[" + this + "] ERROR: This link sink element is already set. Link: " + this + ", new sink: " + sink + ", existing sink: "
+                    + this.sink + ".");
 
-    if (paused) {
-      if (verbose)
-        System.out.println("[" + this + "] INFO: Cannot pull, link is paused.");
+        if (sink == null && cacheBuffer != null)
+            throw new RuntimeException("[" + this + "] ERROR: Cannot drop link: cache is not empty. Link: " + this + ", cache: " + cacheBuffer);
 
-      // Make slight delay in such case, to avoid consuming 100% of CPU
-      if (block) {
-        try {
-          Thread.sleep(100);
-        } catch (InterruptedException e) {
-        }
-      }
+        this.sink = sink;
 
-      return null;
+        return sink;
     }
 
-    // If data in cache can be sent immediately,
-    // then return it instead of asking for more data from source
-    if (cacheBuffer != null && (expectedPacketSize == 0 || (expectedPacketSize > 0 && cacheBuffer.length >= expectedPacketSize))) {
-      if (verbose)
-        System.out.println("[" + this + "] INFO: Data pulled from cache buffer: " + cacheBuffer + ".");
+    @Override
+    public Element setSource(Element source) {
+        if (this.source != null && source != null)
+            throw new RuntimeException("[" + this + "] ERROR: This link source element is already set. Link: " + this + ", new source: " + source
+                    + ", existing source: " + this.source + ".");
 
-      ByteBuffer tmp = cacheBuffer;
-      cacheBuffer = null;
-      return tmp;
+        this.source = source;
+        return source;
     }
 
-    // Pause this link, so incoming data will not be sent to sink
-    // immediately, then ask source element for more data
-    pause();
-    source.poll(block);
-    resume();
-
-    // Can return something only when data was stored in buffer
-    if (cacheBuffer != null && (expectedPacketSize == 0 || (expectedPacketSize > 0 && cacheBuffer.length >= expectedPacketSize))) {
-      if (verbose)
-        System.out.println("[" + this + "] INFO: Data pulled from source: " + cacheBuffer + ".");
-
-      ByteBuffer tmp = cacheBuffer;
-      cacheBuffer = null;
-      return tmp;
-    } else {
-      return null;
+    @Override
+    public Element getSource() {
+        return source;
     }
-  }
-
-  @Override
-  public Element setSink(Element sink) {
-    if (sink != null && this.sink != null)
-      throw new RuntimeException("This link sink element is already set. Link: " + this + ", new sink: " + sink + ", existing sink: " + this.sink + ".");
-
-    if (sink == null && cacheBuffer != null)
-      throw new RuntimeException("Cannot drop link: cache is not empty. Link: " + this + ", cache: " + cacheBuffer);
-
-    this.sink = sink;
-
-    return sink;
-  }
-
-  @Override
-  public Element setSource(Element source) {
-    if (this.source != null && source != null)
-      throw new RuntimeException("This link source element is already set. Link: " + this + ", new source: " + source + ", existing source: " + this.source
-          + ".");
-
-    this.source = source;
-    return source;
-  }
-
-  @Override
-  public Element getSource() {
-    return source;
-  }
-
-  @Override
-  public Element getSink() {
-    return sink;
-  }
-
-  @Override
-  public void pause() {
-    if (paused)
-      throw new RuntimeException("Link is already paused.");
-
-    paused = true;
-
-  }
-
-  @Override
-  public void resume() {
-    paused = false;
-  }
-
-  /**
-   * Run pull loop to actively pull data from source and push it to sink. It
-   * must be only one pull loop per thread.
-   * 
-   * Pull loop will start after event STREAM_START. This link and source element
-   * incomming links will be switched to pull mode before pull loop will be
-   * started using event LINK_SWITCH_TO_PULL_MODE.
-   */
-  @Override
-  public void run() {
-    // Wait until even STREAM_START will arrive
-    while (!start) {
-      delay();
+
+    @Override
+    public Element getSink() {
+        return sink;
     }
 
-    sendEvent(Event.LINK_SWITCH_TO_PULL_MODE, Direction.IN);
+    @Override
+    public void pause() {
+        if (paused)
+            throw new RuntimeException("[" + this + "] ERROR: Link is already paused.");
 
-    if (verbose)
-      System.out.println("[" + this + "] INFO: Starting pull loop.");
+        paused = true;
 
-    // Pull source in loop
-    while (!shutdown) {
-      // Pull data from source element and send it to sink element
-      ByteBuffer data = pull(true);
-      if (data != null)
-        sink.handleData(data, this);
+    }
 
-      if (!shutdown && data == null) {
-        // Make slight delay to avoid consuming of 100% of CPU
-        delay();
-      }
+    @Override
+    public void resume() {
+        paused = false;
     }
 
-    if (verbose)
-      System.out.println("[" + this + "] INFO: Pull loop finished.");
+    /**
+     * Run pull loop to actively pull data from source and push it to sink. It
+     * must be only one pull loop per thread.
+     * 
+     * Pull loop will start after event STREAM_START. This link and source element
+     * incoming links will be switched to pull mode before pull loop will be
+     * started using event LINK_SWITCH_TO_PULL_MODE.
+     */
+    @Override
+    public void run() {
+        // Wait until event STREAM_START arrives
+        while (!started) {
+            delay();
+        }
 
-  }
+        sendEvent(Event.LINK_SWITCH_TO_PULL_MODE, Direction.IN);
+
+        if (verbose)
+            System.out.println("[" + this + "] INFO: Starting pull loop.");
+
+        // Pull source in loop
+        while (!shutdown) {
+            // Pull data from source element and send it to sink element
+            ByteBuffer data = pull(true);
+            if (data != null)
+                sink.handleData(data, this);
+
+            if (!shutdown && data == null) {
+                // Make slight delay to avoid consuming of 100% of CPU
+                delay();
+            }
+        }
+
+        if (verbose)
+            System.out.println("[" + this + "] INFO: Pull loop finished.");
 
-  protected void delay() {
-    try {
-      Thread.sleep(delay);
-    } catch (InterruptedException e) {
-      e.printStackTrace(System.err);
-      throw new RuntimeException("Interrupted in main loop.", e);
     }
-  }
 
-  @Override
-  public void setPullMode() {
-    if (verbose)
-      System.out.println("[" + this + "] INFO: Switching to PULL mode.");
+    protected void delay() {
+        try {
+            Thread.sleep(delay);
+        } catch (InterruptedException e) {
+            throw new RuntimeException("[" + this + "] ERROR: Interrupted in main loop.", e);
+        }
+    }
 
-    this.pullMode = true;
-  }
+    @Override
+    public void setPullMode() {
+        if (verbose)
+            System.out.println("[" + this + "] INFO: Switching to PULL mode.");
 
-  @Override
-  public void drop() {
-    if (pullMode)
-      throw new RuntimeException("Cannot drop link in pull mode.");
+        pullMode = true;
+    }
 
-    if (cacheBuffer != null)
-      throw new RuntimeException("Cannot drop link when cache conatains data: " + cacheBuffer + ".");
+    @Override
+    public void drop() {
+        if (pullMode)
+            throw new RuntimeException("[" + this + "] ERROR: Cannot drop link in pull mode.");
 
-    source.dropLink(this);
-    sink.dropLink(this);
-  }
+        if (cacheBuffer != null)
+            throw new RuntimeException("[" + this + "] ERROR: Cannot drop link when cache conatains data: " + cacheBuffer + ".");
+
+        source.dropLink(this);
+        sink.dropLink(this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSink.java
new file mode 100755
index 0000000..edfe8db
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSink.java
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.apr;
+
+import org.apache.tomcat.jni.Socket;
+
+import streamer.BaseElement;
+import streamer.ByteBuffer;
+import streamer.DataSource;
+import streamer.Direction;
+import streamer.Event;
+import streamer.Link;
+
+public class AprSocketSink extends BaseElement {
+
+    protected AprSocketWrapperImpl socketWrapper;
+    protected Long socket;
+
+    public AprSocketSink(String id) {
+        super(id);
+    }
+
+    public AprSocketSink(String id, AprSocketWrapperImpl socketWrapper) {
+        super(id);
+        this.socketWrapper = socketWrapper;
+    }
+
+    public void setSocket(long socket) {
+        this.socket = socket;
+
+        // Resume links
+        resumeLinks();
+    }
+
+    /**
+     * Send incoming data to the APR socket.
+     */
+    @Override
+    public void handleData(ByteBuffer buf, Link link) {
+        if (buf == null)
+            return;
+
+        if (socketWrapper.shutdown)
+            return;
+
+        try {
+            if (verbose)
+                System.out.println("[" + this + "] INFO: Writing data to stream: " + buf + ".");
+
+            // FIXME: If pull is destroyed or socket is closed, segfault will happen here
+            Socket.send(socket, buf.data, buf.offset, buf.length);
+        } catch (Exception e) {
+            System.err.println("[" + this + "] ERROR: " + e.getMessage());
+            closeStream();
+        }
+    }
+
+    @Override
+    public void handleEvent(Event event, Direction direction) {
+        switch (event) {
+        case SOCKET_UPGRADE_TO_SSL:
+            socketWrapper.upgradeToSsl();
+            break;
+        case LINK_SWITCH_TO_PULL_MODE:
+            throw new RuntimeException("[" + this + "] ERROR: Unexpected event: sink recived LINK_SWITCH_TO_PULL_MODE event.");
+        default:
+            super.handleEvent(event, direction);
+        }
+    }
+
+    @Override
+    public void setLink(String padName, Link link, Direction direction) {
+        switch (direction) {
+        case IN:
+            super.setLink(padName, link, direction);
+
+            if (socket == null)
+                // Pause links until data stream will be ready
+                link.pause();
+            break;
+        case OUT:
+            throw new RuntimeException("Cannot assign link to output pad in sink element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
+        }
+    }
+
+    private void resumeLinks() {
+        for (DataSource source : inputPads.values())
+            ((Link)source).resume();
+    }
+
+    @Override
+    protected void onClose() {
+        closeStream();
+    }
+
+    private void closeStream() {
+        if (socketWrapper.shutdown)
+            return;
+
+        if (verbose)
+            System.out.println("[" + this + "] INFO: Closing stream.");
+
+        try {
+            sendEventToAllPads(Event.STREAM_CLOSE, Direction.IN);
+        } catch (Exception e) {
+        }
+        socketWrapper.shutdown();
+    }
+
+    @Override
+    public String toString() {
+        return "AprSocketSink(" + id + ")";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSource.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSource.java
new file mode 100755
index 0000000..178034e
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSource.java
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.apr;
+
+import org.apache.tomcat.jni.Socket;
+
+import streamer.BaseElement;
+import streamer.ByteBuffer;
+import streamer.DataSink;
+import streamer.Direction;
+import streamer.Event;
+import streamer.Link;
+
+/**
+ * Source element, which reads data from an APR socket.
+ */
+public class AprSocketSource extends BaseElement {
+
+    protected AprSocketWrapperImpl socketWrapper;
+    protected Long socket;
+
+    public AprSocketSource(String id) {
+        super(id);
+    }
+
+    public AprSocketSource(String id, AprSocketWrapperImpl socketWrapper) {
+        super(id);
+        this.socketWrapper = socketWrapper;
+    }
+
+    @Override
+    public void handleEvent(Event event, Direction direction) {
+        switch (event) {
+        case SOCKET_UPGRADE_TO_SSL:
+            socketWrapper.upgradeToSsl();
+            break;
+        case LINK_SWITCH_TO_PULL_MODE:
+            // Do nothing - this is the source
+            break;
+        default:
+            super.handleEvent(event, direction);
+        }
+    }
+
+    @Override
+    public void setLink(String padName, Link link, Direction direction) {
+        switch (direction) {
+        case OUT:
+            super.setLink(padName, link, direction);
+
+            if (socket == null) {
+                // Pause links until data stream will be ready
+                link.pause();
+            }
+            break;
+        case IN:
+            throw new RuntimeException("Cannot assign link to input pad in source element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
+        }
+    }
+
+    public void setSocket(long socket) {
+        this.socket = socket;
+
+        // Resume links
+        resumeLinks();
+    }
+
+    private void resumeLinks() {
+        for (DataSink sink : outputPads.values())
+            ((Link)sink).resume();
+    }
+
+    /**
+     * Read data from the APR socket.
+     */
+    @Override
+    public void poll(boolean block) {
+        if (socketWrapper.shutdown) {
+            socketWrapper.destroyPull();
+            return;
+        }
+
+        try {
+            // Create buffer of recommended size and with default offset
+            ByteBuffer buf = new ByteBuffer(incommingBufLength);
+
+            if (verbose)
+                System.out.println("[" + this + "] INFO: Reading data from stream.");
+
+            // FIXME: If pull is destroyed or socket is closed, segfault will happen here
+            int actualLength = (block) ? // Blocking read
+            Socket.recv(socket, buf.data, buf.offset, buf.data.length - buf.offset)
+                    : // Non-blocking read
+                    Socket.recvt(socket, buf.data, buf.offset, buf.data.length - buf.offset, 1);
+
+            if (socketWrapper.shutdown) {
+                socketWrapper.destroyPull();
+                return;
+            }
+
+            if (actualLength < 0) {
+                if (verbose)
+                    System.out.println("[" + this + "] INFO: End of stream.");
+
+                buf.unref();
+                closeStream();
+                sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+                return;
+            }
+
+            if (actualLength == 0) {
+                if (verbose)
+                    System.out.println("[" + this + "] INFO: Empty buffer is read from stream.");
+
+                buf.unref();
+                return;
+            }
+
+            buf.length = actualLength;
+
+            if (verbose)
+                System.out.println("[" + this + "] INFO: Data read from stream: " + buf + ".");
+
+            pushDataToAllOuts(buf);
+
+        } catch (Exception e) {
+            System.err.println("[" + this + "] ERROR: " + e.getMessage());
+            closeStream();
+        }
+    }
+
+    @Override
+    protected void onClose() {
+        closeStream();
+    }
+
+    private void closeStream() {
+
+        if (socketWrapper.shutdown)
+            return;
+
+        if (verbose)
+            System.out.println("[" + this + "] INFO: Closing stream.");
+
+        try {
+            sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+        } catch (Exception e) {
+        }
+        socketWrapper.shutdown();
+    }
+
+    @Override
+    public String toString() {
+        return "AprSocketSource(" + id + ")";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketWrapperImpl.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketWrapperImpl.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketWrapperImpl.java
new file mode 100755
index 0000000..2ee426b
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketWrapperImpl.java
@@ -0,0 +1,281 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.apr;
+
+import static streamer.debug.MockServer.Packet.PacketType.CLIENT;
+import static streamer.debug.MockServer.Packet.PacketType.SERVER;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+
+import org.apache.tomcat.jni.Address;
+import org.apache.tomcat.jni.Error;
+import org.apache.tomcat.jni.Library;
+import org.apache.tomcat.jni.Pool;
+import org.apache.tomcat.jni.SSL;
+import org.apache.tomcat.jni.SSLContext;
+import org.apache.tomcat.jni.SSLSocket;
+import org.apache.tomcat.jni.Socket;
+
+import streamer.BaseElement;
+import streamer.Direction;
+import streamer.Element;
+import streamer.Event;
+import streamer.Pipeline;
+import streamer.PipelineImpl;
+import streamer.Queue;
+import streamer.SocketWrapper;
+import streamer.debug.MockServer;
+import streamer.debug.MockServer.Packet;
+import streamer.ssl.SSLState;
+import sun.security.x509.X509CertImpl;
+
+/**
+ * Socket wrapper implemented on top of the Tomcat Native Library (Apache
+ * Portable Runtime). The wrapper is a pipeline with two elements: a source
+ * (OUT), which reads data from the socket, and a sink (IN), which sends all
+ * incoming data to the socket remote.
+ */
+public class AprSocketWrapperImpl extends PipelineImpl implements SocketWrapper {
+
+    /**
+     * Return code of SSLSocket.handshake() which is tolerated by
+     * upgradeToSsl(): bad certificate signature.
+     */
+    private static final int HANDSHAKE_BAD_CERTIFICATE_SIGNATURE = 20014;
+
+    static {
+        try {
+            Library.initialize(null);
+            SSL.initialize(null);
+        } catch (Exception e) {
+            throw new RuntimeException("Cannot load Tomcat Native Library (Apache Portable Runtime).", e);
+        }
+    }
+
+    /**
+     * Receives public key of server certificate after SSL upgrade. May be
+     * null (see main()), in which case the key is not stored.
+     */
+    private final SSLState sslState;
+
+    /**
+     * APR memory pool used for address, socket, and SSL context of this
+     * wrapper.
+     */
+    final long pool = Pool.create(0);
+
+    /**
+     * Native handle of resolved remote address, set by connect().
+     */
+    long inetAddress;
+
+    /**
+     * Native handle of socket, set by connect().
+     */
+    long socket;
+
+    // NOTE(review): source and sink are assigned in initElementMap(), which is
+    // presumably invoked from the PipelineImpl constructor -- confirm before
+    // adding initializers to these fields, because an initializer would run
+    // after the super constructor and reset them.
+    private AprSocketSource source;
+    private AprSocketSink sink;
+
+    /**
+     * Set to true when shutdown() is called.
+     */
+    boolean shutdown = false;
+
+    /**
+     * Set to true when native socket and pool are released by destroyPull(),
+     * so they are never released twice.
+     */
+    private boolean shutdowned = false;
+
+    /**
+     * @param id
+     *          ID of this pipeline
+     * @param sslState
+     *          holder for public key of server certificate, may be null
+     */
+    public AprSocketWrapperImpl(String id, SSLState sslState) {
+        super(id);
+        this.sslState = sslState;
+    }
+
+    /**
+     * Create source (OUT) and sink (IN) elements of this wrapper.
+     */
+    @Override
+    protected HashMap<String, Element> initElementMap(String id) {
+        HashMap<String, Element> map = new HashMap<String, Element>();
+
+        source = new AprSocketSource(id + "." + OUT, this);
+        sink = new AprSocketSink(id + "." + IN, this);
+
+        // Pass requests to read data to socket input stream
+        map.put(OUT, source);
+
+        // All incoming data, which is sent to this socket wrapper, will be sent
+        // to socket remote
+        map.put(IN, sink);
+
+        return map;
+    }
+
+    /**
+     * Connect this socket wrapper to remote server and start main loop on
+     * AprSocketSource stdout link, to watch for incoming data, and
+     * AprSocketSink stdin link, to pull for outgoing data.
+     *
+     * @throws IOException
+     *           when socket cannot be created or connected
+     */
+    @Override
+    public void connect(InetSocketAddress address) throws IOException {
+        try {
+            inetAddress = Address.info(address.getHostName(), Socket.APR_UNSPEC, address.getPort(), 0, pool);
+            socket = Socket.create(Address.getInfo(inetAddress).family, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
+        } catch (Exception e) {
+            throw new IOException("[" + this + "] ERROR: Cannot create socket for \"" + address + "\".", e);
+        }
+
+        int ret = Socket.connect(socket, inetAddress);
+        if (ret != 0)
+            throw new IOException("[" + this + "] ERROR: Cannot connect to remote host \"" + address + "\": " + Error.strerror(ret));
+
+        source.setSocket(socket);
+        sink.setSocket(socket);
+
+        // Start polling for data to send to remote server
+        runMainLoop(IN, STDIN, true, true);
+
+        // Push incoming data from server to handlers
+        runMainLoop(OUT, STDOUT, false, false);
+
+    }
+
+    /**
+     * Handle SOCKET_UPGRADE_TO_SSL event by upgrading socket to SSL, pass
+     * all other events to parent class.
+     */
+    @Override
+    public void handleEvent(Event event, Direction direction) {
+        switch (event) {
+        case SOCKET_UPGRADE_TO_SSL:
+            upgradeToSsl();
+            break;
+        default:
+            super.handleEvent(event, direction);
+            break;
+        }
+    }
+
+    /**
+     * Check that both input and output of this socket wrapper are connected.
+     */
+    @Override
+    public void validate() {
+
+        if (getPads(Direction.IN).size() == 0)
+            throw new RuntimeException("[ " + this + "] BUG: Input of socket is not connected.");
+
+        if (getPads(Direction.OUT).size() == 0)
+            throw new RuntimeException("[ " + this + "] BUG: Output of socket is not connected.");
+
+    }
+
+    /**
+     * Attach SSL context to connected socket, make SSL handshake, and store
+     * public key of server certificate into SSL state, when SSL state is
+     * given. Socket wrapper is shut down when upgrade fails.
+     */
+    @Override
+    public void upgradeToSsl() {
+
+        try {
+            long sslContext;
+            try {
+                sslContext = SSLContext.make(pool, SSL.SSL_PROTOCOL_TLSV1, SSL.SSL_MODE_CLIENT);
+            } catch (Exception e) {
+                throw new RuntimeException("Cannot create SSL context using Tomcat native library.", e);
+            }
+
+            SSLContext.setOptions(sslContext, SSL.SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS | SSL.SSL_OP_TLS_BLOCK_PADDING_BUG | SSL.SSL_OP_MICROSOFT_BIG_SSLV3_BUFFER
+                    | SSL.SSL_OP_MSIE_SSLV2_RSA_PADDING);
+            // FIXME: verify certificate by default
+            SSLContext.setVerify(sslContext, SSL.SSL_CVERIFY_NONE, 0);
+            int ret;
+            try {
+                ret = SSLSocket.attach(sslContext, socket);
+            } catch (Exception e) {
+                throw new RuntimeException("[" + this + "] ERROR: Cannot attach SSL context to socket: ", e);
+            }
+            if (ret != 0)
+                throw new RuntimeException("[" + this + "] ERROR: Cannot attach SSL context to socket(" + ret + "): " + SSL.getLastError());
+
+            try {
+                ret = SSLSocket.handshake(socket);
+            } catch (Exception e) {
+                throw new RuntimeException("[" + this + "] ERROR: Cannot make SSL handshake with server: ", e);
+            }
+            // FIXME: show prompt for self signed certificate
+            if (ret != 0 && ret != HANDSHAKE_BAD_CERTIFICATE_SIGNATURE)
+                throw new RuntimeException("[" + this + "] ERROR: Cannot make SSL handshake with server(" + ret + "): " + SSL.getLastError());
+
+            // SSL state is optional (main() passes null), so store the key
+            // only when there is a place to store it
+            if (sslState != null) {
+                try {
+                    byte[] key = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT);
+                    //*DEBUG*/System.out.println("DEBUG: Server cert:\n"+new ByteBuffer(key).dump());
+                    sslState.serverCertificateSubjectPublicKeyInfo = new X509CertImpl(key).getPublicKey().getEncoded();
+                } catch (Exception e) {
+                    throw new RuntimeException("[" + this + "] ERROR: Cannot get server public key: ", e);
+                }
+            }
+
+        } catch (RuntimeException e) {
+            shutdown();
+            throw e;
+        }
+    }
+
+    /**
+     * Mark this wrapper as shut down and send STREAM_CLOSE event in both
+     * directions. Repeated calls do nothing.
+     */
+    @Override
+    public void shutdown() {
+        if (shutdown)
+            return;
+
+        shutdown = true;
+
+        // Best effort: failure in one direction must not prevent closing of
+        // the other direction, so errors are reported but not rethrown
+        try {
+            handleEvent(Event.STREAM_CLOSE, Direction.IN);
+        } catch (Exception e) {
+            System.err.println("[" + this + "] ERROR: Cannot close input of socket: " + e.getMessage());
+        }
+        try {
+            handleEvent(Event.STREAM_CLOSE, Direction.OUT);
+        } catch (Exception e) {
+            System.err.println("[" + this + "] ERROR: Cannot close output of socket: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Close native socket and destroy memory pool. Resources are released
+     * only once, repeated calls do nothing.
+     */
+    void destroyPull() {
+        if (shutdowned)
+            return;
+
+        // Mark resources as released before releasing them, so native
+        // handles are never closed twice
+        shutdowned = true;
+
+        // Causes segfault in AprSocketSource.poll() method, so this function must be called from it
+        try {
+            Socket.close(socket);
+            // or
+            // Socket.shutdown(socket, Socket.APR_SHUTDOWN_READWRITE);
+            Pool.destroy(pool);
+        } catch (Exception e) {
+            System.err.println("[" + this + "] ERROR: Cannot release native socket: " + e.getMessage());
+        }
+
+    }
+
+    @Override
+    public String toString() {
+        return "AprSocketWrapper(" + id + ")";
+    }
+
+    /**
+     * Example: echo client, which is connected to mock server.
+     */
+    public static void main(String[] args) {
+
+        try {
+            System.setProperty("streamer.Link.debug", "true");
+            System.setProperty("streamer.Element.debug", "true");
+            System.setProperty("rdpclient.MockServer.debug", "true");
+
+            Pipeline pipeline = new PipelineImpl("echo client");
+
+            AprSocketWrapperImpl socketWrapper = new AprSocketWrapperImpl("socket", null);
+
+            pipeline.add(socketWrapper);
+            pipeline.add(new BaseElement("echo"));
+            pipeline.add(new Queue("queue")); // To decouple input and output
+
+            pipeline.link("socket", "echo", "queue", "socket");
+
+            final byte[] mockData = new byte[] {0x01, 0x02, 0x03};
+            MockServer server = new MockServer(new Packet[] {new Packet("Server hello") {
+                {
+                    type = SERVER;
+                    data = mockData;
+                }
+            }, new Packet("Client hello") {
+                {
+                    type = CLIENT;
+                    data = mockData;
+                }
+            }, new Packet("Server hello") {
+                {
+                    type = SERVER;
+                    data = mockData;
+                }
+            }, new Packet("Client hello") {
+                {
+                    type = CLIENT;
+                    data = mockData;
+                }
+            }});
+            server.start();
+            InetSocketAddress address = server.getAddress();
+
+            socketWrapper.connect(address);
+
+        } catch (Exception e) {
+            e.printStackTrace(System.err);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/bco/BcoSocketWrapperImpl.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/bco/BcoSocketWrapperImpl.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/bco/BcoSocketWrapperImpl.java
new file mode 100755
index 0000000..67e2dbd
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/bco/BcoSocketWrapperImpl.java
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.bco;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.SecureRandom;
+import java.security.Security;
+
+import org.bouncycastle.asn1.x509.X509CertificateStructure;
+import org.bouncycastle.crypto.tls.CertificateVerifyer;
+import org.bouncycastle.crypto.tls.TlsProtocolHandler;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+
+import streamer.Direction;
+import streamer.Event;
+import streamer.SocketWrapperImpl;
+import streamer.ssl.SSLState;
+
+/**
+ * Socket wrapper which upgrades socket to SSL using Bouncy Castle
+ * TlsProtocolHandler.
+ */
+@SuppressWarnings("deprecation")
+public class BcoSocketWrapperImpl extends SocketWrapperImpl {
+
+    static {
+        Security.addProvider(new BouncyCastleProvider());
+    }
+
+    /**
+     * TLS handler on top of socket streams, null until upgradeToSsl() is
+     * called.
+     */
+    private TlsProtocolHandler bcoSslSocket;
+
+    public BcoSocketWrapperImpl(String id, SSLState sslState) {
+        super(id, sslState);
+    }
+
+    /**
+     * Wrap socket streams into TLS handler, make TLS handshake, and switch
+     * source and sink to encrypted streams. Public key of server certificate
+     * is stored into SSL state, when SSL state is given.
+     *
+     * @throws RuntimeException
+     *           when socket cannot be upgraded
+     */
+    @Override
+    public void upgradeToSsl() {
+
+        // This class keeps its TLS handler in bcoSslSocket and never assigns
+        // sslSocket itself, so both must be checked to detect earlier upgrade
+        if (sslSocket != null || bcoSslSocket != null)
+            // Already upgraded
+            return;
+
+        if (verbose)
+            System.out.println("[" + this + "] INFO: Upgrading socket to SSL.");
+
+        try {
+
+            SecureRandom secureRandom = new SecureRandom();
+            bcoSslSocket = new TlsProtocolHandler(socket.getInputStream(), socket.getOutputStream(), secureRandom);
+
+            // NOTE: this verifier accepts any certificate chain, it only
+            // records public key of the first certificate in chain
+            CertificateVerifyer client = new CertificateVerifyer() {
+
+                @Override
+                public boolean isValid(X509CertificateStructure[] chain) {
+
+                    try {
+                        if (sslState != null) {
+                            sslState.serverCertificateSubjectPublicKeyInfo = chain[0].getSubjectPublicKeyInfo().getEncoded();
+                        }
+                    } catch (IOException e) {
+                        throw new RuntimeException("Cannot get server public key.", e);
+                    }
+
+                    return true;
+                }
+            };
+            bcoSslSocket.connect(client);
+
+            InputStream sis = bcoSslSocket.getInputStream();
+            source.setInputStream(sis);
+
+            OutputStream sos = bcoSslSocket.getOutputStream();
+            sink.setOutputStream(sos);
+
+        } catch (Exception e) {
+            throw new RuntimeException("Cannot upgrade socket to SSL: " + e.getMessage(), e);
+        }
+
+    }
+
+    /**
+     * Send STREAM_CLOSE event in both directions, then close TLS handler and
+     * socket. Every step is best effort: failure of one step does not
+     * prevent execution of the following steps.
+     */
+    @Override
+    public void shutdown() {
+        try {
+            handleEvent(Event.STREAM_CLOSE, Direction.IN);
+        } catch (Exception e) {
+            reportShutdownProblem(e);
+        }
+        try {
+            handleEvent(Event.STREAM_CLOSE, Direction.OUT);
+        } catch (Exception e) {
+            reportShutdownProblem(e);
+        }
+        try {
+            if (bcoSslSocket != null)
+                bcoSslSocket.close();
+        } catch (Exception e) {
+            reportShutdownProblem(e);
+        }
+        try {
+            // Socket may be missing when shutdown is requested before connect
+            if (socket != null)
+                socket.close();
+        } catch (Exception e) {
+            reportShutdownProblem(e);
+        }
+    }
+
+    /**
+     * Report exception, which is ignored during shutdown, in verbose mode.
+     */
+    private void reportShutdownProblem(Exception e) {
+        if (verbose)
+            System.err.println("[" + this + "] WARNING: Problem during shutdown is ignored: " + e.getMessage());
+    }
+
+    @Override
+    public String toString() {
+        return "BcoSocketWrapper(" + id + ")";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/AssertingByteBuffer.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/AssertingByteBuffer.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/AssertingByteBuffer.java
new file mode 100755
index 0000000..15449c3
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/AssertingByteBuffer.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.debug;
+
+import java.nio.charset.Charset;
+
+import streamer.ByteBuffer;
+
+/**
+ * Assert that writes to this buffer are matching expected data. Buffer is
+ * created with expected data; every write method reads the corresponding
+ * value from the buffer instead of writing to it, and throws
+ * RuntimeException when value to write differs from value which is read.
+ */
+public class AssertingByteBuffer extends ByteBuffer {
+
+    public AssertingByteBuffer(byte[] expectedData) {
+        super(expectedData);
+    }
+
+    /**
+     * Throw exception when expected number differs from actual number.
+     */
+    private void assertEquals(int expected, int actual) {
+        if (expected != actual)
+            throw new RuntimeException("Expected value does not match actual value. Expected value: " + expected + ", actual value: " + actual + ", buf: " + this + ".");
+    }
+
+    /**
+     * Read as many bytes from expected data as actual buffer holds, and
+     * throw exception when they differ from content of actual buffer.
+     */
+    private void verifyExpectedBytes(ByteBuffer actual) {
+        ByteBuffer expected = readBytes(actual.length);
+        if (!actual.equals(expected))
+            throw new RuntimeException("Expected value does not match actual value. Expected value: " + expected + ", actual value: " + actual + ".");
+    }
+
+    @Override
+    public void writeByte(int b) {
+        if (b < 0)
+            throw new RuntimeException("Byte value to write must not be negative. Actual value: " + b + ", buf: " + this + ".");
+        //*DEBUG*/System.out.println("WriteByte: "+b+", cursor:"+cursor+".");
+        assertEquals(readUnsignedByte(), b & 0xff);
+    }
+
+    @Override
+    public void writeShort(int x) {
+        //*DEBUG*/System.out.println("WriteShort: "+x+", cursor:"+cursor+".");
+        assertEquals(readUnsignedShort(), x & 0xFFff);
+    }
+
+    @Override
+    public void writeShortLE(int x) {
+        //*DEBUG*/System.out.println("WriteShortLE: "+x+", cursor:"+cursor+".");
+        assertEquals(readUnsignedShortLE(), x & 0xFFff);
+    }
+
+    @Override
+    public void writeInt(int i) {
+        //*DEBUG*/System.out.println("WriteInt: "+i+", cursor:"+cursor+".");
+        assertEquals(readSignedInt(), i);
+    }
+
+    @Override
+    public void writeIntLE(int i) {
+        //*DEBUG*/System.out.println("WriteIntLE: "+i+", cursor:"+cursor+".");
+        assertEquals(readSignedIntLE(), i);
+    }
+
+    @Override
+    public void writeVariableIntLE(int i) {
+        //*DEBUG*/System.out.println("WriteVariableIntLE: "+i+", cursor:"+cursor+".");
+        assertEquals(readVariableSignedIntLE(), i);
+    }
+
+    @Override
+    public void writeString(String actual, Charset charset) {
+        //*DEBUG*/System.out.println("WriteString: "+actual+", cursor:"+cursor+".");
+        // NOTE(review): number of characters is passed as length; if
+        // readString() expects number of bytes, then this is wrong for
+        // multi-byte charsets -- confirm against ByteBuffer.readString()
+        String expected = readString(actual.length(), charset);
+        if (!actual.equals(expected))
+            throw new RuntimeException("Expected value does not match actual value. Expected value: " + expected + ", actual value: " + actual + ".");
+    }
+
+    @Override
+    public void writeBytes(ByteBuffer actual) {
+        //*DEBUG*/System.out.println("WriteBytes: "+actual+", cursor:"+cursor+".");
+        verifyExpectedBytes(actual);
+    }
+
+    @Override
+    public void writeBytes(byte[] actualData) {
+        //*DEBUG*/System.out.println("WriteBytes: "+new ByteBuffer(actualData)+", cursor:"+cursor+".");
+        verifyExpectedBytes(new ByteBuffer(actualData));
+    }
+
+    @Override
+    public void writeBytes(byte[] actualData, int offset, int length) {
+        //*DEBUG*/System.out.println("WriteBytes: "+new ByteBuffer(actualData, offset, length)+", cursor:"+cursor+".");
+        verifyExpectedBytes(new ByteBuffer(actualData, offset, length));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/Dumper.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/Dumper.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/Dumper.java
new file mode 100755
index 0000000..c884cfb
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/Dumper.java
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.debug;
+
+import streamer.ByteBuffer;
+
+/**
+ * Debugging helper: implementations parse and dump content of a buffer.
+ */
+public interface Dumper {
+
+    /**
+     * Parse and dump content of buffer.
+     *
+     * @param buf
+     *          buffer to parse and dump
+     */
+    void dump(ByteBuffer buf);
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/FakeSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/FakeSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/FakeSink.java
new file mode 100755
index 0000000..9315488
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/FakeSink.java
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.debug;
+
+import streamer.BaseElement;
+import streamer.ByteBuffer;
+import streamer.Direction;
+import streamer.Element;
+import streamer.Event;
+import streamer.Link;
+import streamer.SyncLink;
+
+/**
+ * Sink for debugging: counts incoming buffers and releases them.
+ */
+public class FakeSink extends BaseElement {
+
+    public FakeSink(String id) {
+        super(id);
+    }
+
+    /**
+     * Log incoming buffer in verbose mode, then count and release it. Null
+     * buffer is logged, but not counted.
+     */
+    @Override
+    public void handleData(ByteBuffer buf, Link link) {
+        if (verbose) {
+            System.out.println("[" + this + "] INFO: Received buf #" + packetNumber + " " + buf + ".");
+        }
+
+        if (buf != null) {
+            // Use packetNumber variable to count incoming packets
+            packetNumber++;
+
+            buf.unref();
+        }
+    }
+
+    @Override
+    public String toString() {
+        final String prefix = "FakeSink(" + id;
+        return prefix + ")";
+    }
+
+    /**
+     * Log incoming event in verbose mode, event itself is ignored.
+     */
+    @Override
+    public void handleEvent(Event event, Direction direction) {
+        if (verbose) {
+            System.out.println("[" + this + "] INFO: Event received: " + event + ".");
+        }
+    }
+
+    /**
+     * Example: send one buffer with three bytes to verbose sink.
+     */
+    public static void main(String args[]) {
+
+        Element verboseSink = new FakeSink("sink") {
+            {
+                verbose = true;
+            }
+        };
+
+        ByteBuffer packet = new ByteBuffer(new byte[] {1, 2, 3});
+
+        verboseSink.setLink(STDIN, new SyncLink(), Direction.IN);
+
+        Link input = verboseSink.getLink(STDIN);
+        input.sendData(packet);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/FakeSource.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/FakeSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/FakeSource.java
new file mode 100755
index 0000000..adfa594
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/FakeSource.java
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.debug;
+
+import streamer.BaseElement;
+import streamer.ByteBuffer;
+import streamer.Direction;
+import streamer.Element;
+import streamer.Event;
+import streamer.Link;
+import streamer.SyncLink;
+
+public class FakeSource extends BaseElement {
+
+    /**
+     * Delay for null packets in poll method when blocking is requested, in
+     * milliseconds.
+     */
+    protected long delay = SyncLink.STANDARD_DELAY_FOR_EMPTY_PACKET;
+
+    public FakeSource(String id) {
+        super(id);
+    }
+
+    @Override
+    public void poll(boolean block) {
+        if (numBuffers > 0 && packetNumber >= numBuffers) {
+            // Close stream when limit of packets is reached
+            sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+            return;
+        }
+
+        // Prepare new packet
+        ByteBuffer buf = initializeData();
+
+        // Push it to output(s)
+        if (buf != null)
+            pushDataToAllOuts(buf);
+
+        // Make slight delay when blocking input was requested (to avoid
+        // consuming of 100% in parent loop)
+        if (block)
+            delay();
+
+    }
+
+    /**
+     * Make slight delay. Should be used when blocking input is requested in pull
+     * mode, but null packed was returned by input.
+     */
+    protected void delay() {
+        try {
+            Thread.sleep(delay);
+        } catch (InterruptedException e) {
+        }
+    }
+
+    /**
+     * Initialize data.
+     */
+    public ByteBuffer initializeData() {
+        ByteBuffer buf = new ByteBuffer(incommingBufLength);
+
+        // Set first byte of package to it sequance number
+        buf.data[buf.offset] = (byte)(packetNumber % 128);
+
+        // Initialize rest of bytes with sequential values, which are
+        // corresponding with their position in byte buffer
+        for (int i = buf.offset + 1; i < buf.length; i++)
+            buf.data[i] = (byte)(i % 128);
+
+        buf.putMetadata(ByteBuffer.SEQUENCE_NUMBER, packetNumber);
+        buf.putMetadata("src", id);
+
+        return buf;
+    }
+
+    @Override
+    public String toString() {
+        return "FakeSource(" + id + ")";
+    }
+
+    /**
+     * Example.
+     */
+    public static void main(String args[]) {
+
+        Element fakeSource = new FakeSource("source 3/10/100") {
+            {
+                verbose = true;
+                this.incommingBufLength = 3;
+                this.numBuffers = 10;
+                this.delay = 100;
+            }
+        };
+
+        Element fakeSink = new FakeSink("sink") {
+            {
+                this.verbose = true;
+            }
+        };
+
+        Element fakeSink2 = new FakeSink("sink2") {
+            {
+                this.verbose = true;
+            }
+        };
+
+        Link link = new SyncLink();
+
+        fakeSource.setLink(STDOUT, link, Direction.OUT);
+        fakeSink.setLink(STDIN, link, Direction.IN);
+
+        Link link2 = new SyncLink();
+
+        fakeSource.setLink("out2", link2, Direction.OUT);
+        fakeSink2.setLink(STDIN, link2, Direction.IN);
+
+        link.sendEvent(Event.STREAM_START, Direction.IN);
+        link.run();
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/MockServer.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/MockServer.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/MockServer.java
new file mode 100755
index 0000000..3f313b8
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/MockServer.java
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.debug;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Arrays;
+
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+
+public class MockServer implements Runnable {
+
+    private boolean shutdown = false;
+    private ServerSocket serverSocket;
+    private Packet[] packets;
+    private Throwable exception;
+    private boolean shutdowned;
+
+    /**
+     * Set to true to enable debugging messages.
+     */
+    protected boolean verbose = System.getProperty("rdpclient.MockServer.debug", "false").equals("true");
+
+    public MockServer(Packet packets[]) {
+        this.packets = packets;
+    }
+
+    public void start() throws IOException {
+        serverSocket = new ServerSocket(0);
+
+        shutdown = false;
+        exception = null;
+        shutdowned = false;
+
+        Thread thread = new Thread(this);
+        thread.setDaemon(true);
+        thread.start();
+    }
+
+    public void run() {
+
+        try {
+            Socket socket = serverSocket.accept();
+
+            if (verbose)
+                System.out.println("[" + this + "] INFO: Clien connected: " + socket.getRemoteSocketAddress() + ".");
+
+            InputStream is = socket.getInputStream();
+            OutputStream os = socket.getOutputStream();
+
+            try {
+                for (int i = 0; i < packets.length && !shutdown; i++) {
+
+                    Packet packet = packets[i];
+                    switch (packet.type) {
+                    case CLIENT: {
+                        // Read client data and compare it with mock data
+                        // (unless "ignore" option is set)
+                        byte actualData[] = new byte[packet.data.length];
+                        int actualDataLength = is.read(actualData);
+
+                        if (verbose)
+                            System.out.println("[" + this + "] INFO: Data is read: {" + Arrays.toString(Arrays.copyOf(actualData, actualDataLength)) + "}.");
+
+                        if (!packet.ignore) {
+                            // Compare actual data with expected data
+                            if (actualDataLength != packet.data.length) {
+                                throw new AssertionError("Actual length of client request for packet #" + (i + 1) + " (\"" + packet.id + "\")"
+                                        + " does not match length of expected client request. Actual length: " + actualDataLength + ", expected legnth: " + packet.data.length
+                                        + ".");
+                            }
+
+                            for (int j = 0; j < packet.data.length; j++) {
+
+                                if (packet.data[j] != actualData[j]) {
+                                    throw new AssertionError("Actual byte #" + (j + 1) + " of client request for packet #" + (i + 1) + " (\"" + packet.id + "\")"
+                                            + " does not match corresponding byte of expected client request. Actual byte: " + actualData[j] + ", expected byte: " + packet.data[j]
+                                            + ".");
+                                }
+                            }
+                        }
+                        break;
+                    }
+                    case SERVER: {
+                        // Send mock data to client
+                        os.write(packet.data);
+
+                        if (verbose)
+                            System.out.println("[" + this + "] INFO: Data is written: {" + Arrays.toString(packet.data) + "}.");
+
+                        break;
+                    }
+                    case UPGRADE_TO_SSL: {
+                        // Attach SSL context to socket
+
+                        final SSLSocketFactory sslSocketFactory = (SSLSocketFactory)SSLSocketFactory.getDefault();
+                        SSLSocket sslSocket = (SSLSocket)sslSocketFactory.createSocket(socket, null, serverSocket.getLocalPort(), true);
+                        sslSocket.setEnabledCipherSuites(sslSocket.getSupportedCipherSuites());
+                        sslSocket.setUseClientMode(false);
+                        sslSocket.startHandshake();
+                        is = sslSocket.getInputStream();
+                        os = sslSocket.getOutputStream();
+
+                        break;
+                    }
+                    default:
+                        throw new RuntimeException("Unknown packet type: " + packet.type);
+                    }
+
+                }
+            } finally {
+                try {
+                    is.close();
+                } catch (Throwable e) {
+                }
+                try {
+                    os.close();
+                } catch (Throwable e) {
+                }
+                try {
+                    socket.close();
+                } catch (Throwable e) {
+                }
+                try {
+                    serverSocket.close();
+                } catch (Throwable e) {
+                }
+            }
+        } catch (Throwable e) {
+            System.err.println("Error in mock server: " + e.getMessage());
+            e.printStackTrace(System.err);
+            exception = e;
+        }
+        shutdowned = true;
+        if (verbose)
+            System.out.println("[" + this + "] INFO: Mock server shutdowned.");
+
+    }
+
+    public void shutdown() {
+        shutdown = true;
+    }
+
+    public InetSocketAddress getAddress() {
+        return (InetSocketAddress)serverSocket.getLocalSocketAddress();
+    }
+
+    public Throwable getException() {
+        return exception;
+    }
+
+    public static class Packet {
+        public static enum PacketType {
+            SERVER, CLIENT, UPGRADE_TO_SSL;
+        }
+
+        public String id = "";
+
+        public Packet() {
+        }
+
+        public Packet(String id) {
+            this.id = id;
+        }
+
+        public PacketType type;
+
+        public boolean ignore = false;
+
+        public byte data[];
+    }
+
+    public boolean isShutdowned() {
+        return shutdowned;
+    }
+
+    public void waitUntilShutdowned(long timeToWaitMiliseconds) throws InterruptedException {
+        long deadline = System.currentTimeMillis() + timeToWaitMiliseconds;
+        while (!shutdowned && System.currentTimeMillis() < deadline) {
+            Thread.sleep(10);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/MockSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/MockSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/MockSink.java
new file mode 100755
index 0000000..3c1ce7c
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/MockSink.java
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.debug;
+
+import java.util.Arrays;
+import java.util.Set;
+
+import streamer.BaseElement;
+import streamer.ByteBuffer;
+import streamer.Direction;
+import streamer.Element;
+import streamer.Link;
+import streamer.SyncLink;
+
+/**
+ * Compare incoming packets with expected packets.
+ */
+public class MockSink extends BaseElement {
+
+    protected ByteBuffer bufs[];
+    protected Dumper dumper;
+
+    public MockSink(String id) {
+        super(id);
+    }
+
+    public MockSink(String id, ByteBuffer bufs[]) {
+        super(id);
+        this.bufs = bufs;
+    }
+
+    public MockSink(String id, ByteBuffer bufs[], Dumper dumper) {
+        super(id);
+        this.bufs = bufs;
+        this.dumper = dumper;
+    }
+
+    @Override
+    public void handleData(ByteBuffer buf, Link link) {
+        if (verbose)
+            System.out.println("[" + this + "] INFO: Received buf #" + (packetNumber) + " " + buf + ".");
+
+        if (buf == null)
+            return;
+
+        if (packetNumber >= bufs.length)
+            throw new AssertionError("[" + this + "] Incoming buffer #" + packetNumber + " is not expected. Number of expected buffers: " + bufs.length
+                    + ", unexpected buffer: " + buf + ".");
+
+        // Compare incoming buffer with expected buffer
+        ByteBuffer expectedBuf = bufs[packetNumber];
+        if (!Arrays.equals(expectedBuf.toByteArray(), buf.toByteArray())) {
+            dump(buf, expectedBuf);
+            throw new AssertionError("[" + this + "] Incoming buffer #" + packetNumber + " is not equal to expected buffer.\n  Actual bufer: " + buf
+                    + ",\n  expected buffer: " + expectedBuf + ".");
+        }
+
+        // If expected buffer has metadata, then compare it too
+        Set<String> metadataKeys = expectedBuf.getMetadataKeys();
+        if (metadataKeys.size() > 0) {
+            for (String key : metadataKeys) {
+                Object expectedValue = expectedBuf.getMetadata(key);
+                Object actualValue = buf.getMetadata(key);
+                if (actualValue == null)
+                    throw new AssertionError("[" + this + "] Incoming buffer #" + packetNumber + " is not equal to expected buffer in metadata for key \"" + key
+                            + "\".\n  Actual metadata value: " + actualValue + ",\n  expected value: \"" + expectedValue + "\".");
+
+                if (!expectedValue.equals(actualValue))
+                    throw new AssertionError("[" + this + "] Incoming buffer #" + packetNumber + " is not equal to expected buffer in metadata for key \"" + key
+                            + "\".\n  Actual metadata value: \"" + actualValue + "\",\n  expected value: \"" + expectedValue + "\".");
+            }
+        }
+
+        if (verbose)
+            System.out.println("[" + this + "] INFO: buffers are equal.");
+
+        // Use packetNumber variable to count incoming packets
+        packetNumber++;
+
+        buf.unref();
+    }
+
+    private void dump(ByteBuffer actualData, ByteBuffer expectedData) {
+        if (dumper != null) {
+            System.out.println("[" + this + "] INFO: Actual data:");
+            dumper.dump(actualData);
+            System.out.println("[" + this + "] INFO: Expected data:");
+            dumper.dump(expectedData);
+        }
+    }
+
+    @Override
+    protected void onClose() {
+        super.onClose();
+
+        if (packetNumber != bufs.length)
+            throw new AssertionError("[" + this + "] Number of expected buffers: " + bufs.length + ", number of actual buffers: " + packetNumber + ".");
+    }
+
+    @Override
+    public String toString() {
+        return "MockSink(" + id + ")";
+    }
+
+    /**
+     * Example.
+     */
+    public static void main(String args[]) {
+
+        Element mockSource = new MockSource("source") {
+            {
+                this.bufs = new ByteBuffer[] {new ByteBuffer(new byte[] {1, 1, 2, 3, 4, 5}), new ByteBuffer(new byte[] {2, 1, 2, 3, 4}),
+                        new ByteBuffer(new byte[] {3, 1, 2, 3}), new ByteBuffer(new byte[] {4, 1, 2}), new ByteBuffer(new byte[] {5, 1})};
+                this.verbose = true;
+                this.delay = 100;
+                this.numBuffers = this.bufs.length;
+            }
+        };
+
+        Element mockSink = new MockSink("sink") {
+            {
+                this.bufs = new ByteBuffer[] {new ByteBuffer(new byte[] {1, 1, 2, 3, 4, 5}), new ByteBuffer(new byte[] {2, 1, 2, 3, 4}),
+                        new ByteBuffer(new byte[] {3, 1, 2, 3}), new ByteBuffer(new byte[] {4, 1, 2}), new ByteBuffer(new byte[] {5, 1})};
+                this.verbose = true;
+            }
+        };
+
+        Link link = new SyncLink() {
+            {
+                this.verbose = true;
+            }
+        };
+
+        mockSource.setLink(STDOUT, link, Direction.OUT);
+        mockSink.setLink(STDIN, link, Direction.IN);
+
+        link.run();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/MockSource.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/MockSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/MockSource.java
new file mode 100755
index 0000000..2a70829
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/MockSource.java
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.debug;
+
+import streamer.ByteBuffer;
+import streamer.Direction;
+import streamer.Element;
+import streamer.Event;
+import streamer.Link;
+import streamer.SyncLink;
+
+/**
+ * Fake source which emits a predefined sequence of buffers, one buffer per
+ * request, and sends the STREAM_CLOSE event when the sequence is exhausted.
+ */
+public class MockSource extends FakeSource {
+
+    /**
+     * Buffers to emit, in order.
+     */
+    protected ByteBuffer[] bufs = null;
+
+    public MockSource(String id) {
+        super(id);
+    }
+
+    public MockSource(String id, ByteBuffer bufs[]) {
+        this(id);
+        this.bufs = bufs;
+    }
+
+    /**
+     * Return the predefined buffer for current packet number, tagged with its
+     * sequence number. When all buffers are sent already, send STREAM_CLOSE
+     * to all output pads and return null.
+     */
+    @Override
+    public ByteBuffer initializeData() {
+        if (packetNumber < bufs.length) {
+            ByteBuffer next = bufs[packetNumber];
+            next.putMetadata(ByteBuffer.SEQUENCE_NUMBER, packetNumber);
+            return next;
+        }
+
+        // No more buffers to send
+        sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+        return null;
+    }
+
+    /**
+     * Events are only logged (in verbose mode).
+     */
+    @Override
+    public void handleEvent(Event event, Direction direction) {
+        if (!verbose)
+            return;
+
+        System.out.println("[" + this + "] INFO: Event received: " + event + ".");
+    }
+
+    @Override
+    public String toString() {
+        String label = "MockSource(";
+        label += id;
+        label += ")";
+        return label;
+    }
+
+    /**
+     * Example: mock source sends five buffers to a verbose fake sink.
+     */
+    public static void main(String[] args) {
+
+        final ByteBuffer[] sample = {new ByteBuffer(new byte[] {1, 1, 2, 3, 4, 5}), new ByteBuffer(new byte[] {2, 1, 2, 3, 4}),
+                new ByteBuffer(new byte[] {3, 1, 2, 3}), new ByteBuffer(new byte[] {4, 1, 2}), new ByteBuffer(new byte[] {5, 1})};
+
+        Element source = new MockSource("source") {
+            {
+                this.bufs = sample;
+                this.verbose = true;
+                this.delay = 100;
+                // numBuffers is left untouched in this example
+            }
+        };
+
+        Element sink = new FakeSink("sink") {
+            {
+                this.verbose = true;
+            }
+        };
+
+        Link pipe = new SyncLink();
+
+        source.setLink(STDOUT, pipe, Direction.OUT);
+        sink.setLink(STDIN, pipe, Direction.IN);
+
+        pipe.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ssl/SSLState.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ssl/SSLState.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ssl/SSLState.java
new file mode 100755
index 0000000..f405088
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ssl/SSLState.java
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.ssl;
+
+public class SSLState {
+
+    /**
+     * Encoded public key of the server certificate, as returned by
+     * getPublicKey().getEncoded() for the first certificate of the server
+     * chain. It is stored by TrustAllX509TrustManager.checkServerTrusted() to
+     * be used for NTLMSSP negotiation, and stays null until then.
+     *
+     * NOTE(review): presumably this is the ASN.1 encoded SubjectPublicKeyInfo
+     * structure, as the field name suggests - confirm against the consumer.
+     */
+    public byte[] serverCertificateSubjectPublicKeyInfo;
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ssl/TrustAllX509TrustManager.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ssl/TrustAllX509TrustManager.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ssl/TrustAllX509TrustManager.java
new file mode 100755
index 0000000..4d9eac2
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ssl/TrustAllX509TrustManager.java
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.ssl;
+
+import java.security.cert.X509Certificate;
+
+import javax.net.ssl.X509TrustManager;
+
+/**
+ * Trust manager which accepts any client and any server certificate
+ * (insecure) and, when an SSL state holder is given, remembers the encoded
+ * public key of the server certificate in it.
+ */
+public class TrustAllX509TrustManager implements X509TrustManager {
+
+    /**
+     * Holder for server public key; may be null, then nothing is stored.
+     */
+    private final SSLState sslState;
+
+    /**
+     * @param sslState
+     *          holder to store server public key into, or null
+     */
+    public TrustAllX509TrustManager(SSLState sslState) {
+        this.sslState = sslState;
+    }
+
+    @Override
+    public void checkClientTrusted(final X509Certificate[] chain, final String authType) {
+        // TODO: ask user to confirm self-signed certificates
+        // Trust all (insecure)
+    }
+
+    /**
+     * Accept any server certificate and store encoded public key of the first
+     * certificate of the chain into SSL state, when SSL state is set.
+     *
+     * @throws IllegalArgumentException
+     *           when public key must be stored, but chain is null or empty
+     */
+    @Override
+    public void checkServerTrusted(final X509Certificate[] chain, final String authType) {
+        // TODO: ask user to confirm self-signed certificates
+        // Trust all (insecure)
+
+        if (sslState == null)
+            return;
+
+        // Fail with documented exception instead of NullPointerException or
+        // ArrayIndexOutOfBoundsException when there is no certificate
+        if (chain == null || chain.length == 0)
+            throw new IllegalArgumentException("Server certificate chain is null or empty.");
+
+        // Store public certificates to use for NTLMSSP negotiation
+        sslState.serverCertificateSubjectPublicKeyInfo = chain[0].getPublicKey().getEncoded();
+    }
+
+    /**
+     * @return empty array: no issuers are advertised. The contract of
+     *         X509TrustManager requires a non-null array here.
+     */
+    @Override
+    public X509Certificate[] getAcceptedIssuers() {
+        // TODO: use system CA certificates here
+        return new X509Certificate[0];
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ssl/UpgradeSocketToSSL.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ssl/UpgradeSocketToSSL.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ssl/UpgradeSocketToSSL.java
new file mode 100755
index 0000000..9d7c708
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ssl/UpgradeSocketToSSL.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.ssl;
+
+import streamer.ByteBuffer;
+import streamer.Direction;
+import streamer.Event;
+import streamer.Link;
+import streamer.OneTimeSwitch;
+
+/**
+ * One-time switch which, on start, sends the SOCKET_UPGRADE_TO_SSL event to
+ * all pads in IN direction and then switches itself off. Any data which is
+ * delivered to the switch itself is treated as an error.
+ */
+public class UpgradeSocketToSSL extends OneTimeSwitch {
+
+    public UpgradeSocketToSSL(String id) {
+        super(id);
+    }
+
+    /**
+     * Request SSL upgrade and switch off.
+     */
+    @Override
+    protected void onStart() {
+        sendEventToAllPads(Event.SOCKET_UPGRADE_TO_SSL, Direction.IN);
+        switchOff();
+    }
+
+    /**
+     * No data is expected by this element, so always fail.
+     */
+    @Override
+    protected void handleOneTimeData(ByteBuffer buf, Link link) {
+        final String message = "Unexpected data: " + buf + ".";
+        throw new RuntimeException(message);
+    }
+
+}


Mime
View raw message