guacamole-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jmuehl...@apache.org
Subject [2/9] incubator-guacamole-client git commit: GUACAMOLE-44: Extract logic of StreamInterceptingTunnel.
Date Mon, 06 Jun 2016 21:59:03 GMT
GUACAMOLE-44: Extract logic of StreamInterceptingTunnel.


Project: http://git-wip-us.apache.org/repos/asf/incubator-guacamole-client/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-guacamole-client/commit/f391f00c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-guacamole-client/tree/f391f00c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-guacamole-client/diff/f391f00c

Branch: refs/heads/master
Commit: f391f00c7ba9d146c5bb1717f80a0bb291fd5bac
Parents: 274145a
Author: Michael Jumper <mjumper@apache.org>
Authored: Sat Jun 4 01:58:01 2016 -0700
Committer: Michael Jumper <mjumper@apache.org>
Committed: Sat Jun 4 16:24:50 2016 -0700

----------------------------------------------------------------------
 .../guacamole/tunnel/InterceptedStream.java     |  86 ++++++
 .../guacamole/tunnel/InterceptedStreamMap.java  | 211 ++++++++++++++
 .../tunnel/OutputStreamInterceptingFilter.java  | 194 +++++++++++++
 .../tunnel/StreamInterceptingFilter.java        | 209 ++++++++++++++
 .../tunnel/StreamInterceptingTunnel.java        | 277 +------------------
 5 files changed, 712 insertions(+), 265 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-guacamole-client/blob/f391f00c/guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStream.java
----------------------------------------------------------------------
diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStream.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStream.java
new file mode 100644
index 0000000..b4f4d06
--- /dev/null
+++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStream.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.guacamole.tunnel;
+
+import java.io.Closeable;
+
+/**
+ * A simple pairing of the index of an intercepted Guacamole stream with the
+ * stream-type object which will produce or consume the data sent over the
+ * intercepted Guacamole stream.
+ *
+ * @author Michael Jumper
+ * @param <T>
+ *     The type of object which will produce or consume the data sent over the
+ *     intercepted Guacamole stream. Usually, this will be either InputStream
+ *     or OutputStream.
+ */
+public class InterceptedStream<T extends Closeable> {
+
+    /**
+     * The index of the Guacamole stream being intercepted.
+     */
+    private final String index;
+
+    /**
+     * The stream which will produce or consume the data sent over the
+     * intercepted Guacamole stream.
+     */
+    private final T stream;
+
+    /**
+     * Creates a new InterceptedStream which associates the given Guacamole
+     * stream index with the given stream object. Both are stored as given.
+     *
+     * @param index
+     *     The index of the Guacamole stream being intercepted.
+     *
+     * @param stream
+     *     The stream which will produce or consume the data sent over the
+     *     intercepted Guacamole stream.
+     */
+    public InterceptedStream(String index, T stream) {
+        this.index = index;
+        this.stream = stream;
+    }
+
+    /**
+     * Returns the index of the Guacamole stream being intercepted.
+     *
+     * @return
+     *     The index of the Guacamole stream being intercepted.
+     */
+    public String getIndex() {
+        return index;
+    }
+
+    /**
+     * Returns the stream which will produce or consume the data sent over the
+     * intercepted Guacamole stream.
+     *
+     * @return
+     *     The stream which will produce or consume the data sent over the
+     *     intercepted Guacamole stream.
+     */
+    public T getStream() {
+        return stream;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-guacamole-client/blob/f391f00c/guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStreamMap.java
----------------------------------------------------------------------
diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStreamMap.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStreamMap.java
new file mode 100644
index 0000000..344bc36
--- /dev/null
+++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStreamMap.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.guacamole.tunnel;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Map-like storage for intercepted Guacamole streams.
+ *
+ * @author Michael Jumper
+ * @param <T>
+ *     The type of object which will produce or consume the data sent over the
+ *     intercepted Guacamole stream. Usually, this will be either InputStream
+ *     or OutputStream.
+ */
+public class InterceptedStreamMap<T extends Closeable> {
+
+    /**
+     * Logger for this class.
+     */
+    private static final Logger logger = LoggerFactory.getLogger(InterceptedStreamMap.class);
+
+    /**
+     * The maximum number of milliseconds to wait for notification that a
+     * stream has closed before explicitly checking for closure ourselves.
+     */
+    private static final long STREAM_WAIT_TIMEOUT = 1000;
+
+    /**
+     * Mapping of the indexes of all streams whose associated "blob" and "end"
+     * instructions should be intercepted.
+     */
+    private final ConcurrentMap<String, InterceptedStream<T>> streams =
+            new ConcurrentHashMap<String, InterceptedStream<T>>();
+
+    /**
+     * Closes the given stream, logging any errors that occur during closure.
+     * Every thread waiting on the monitor of the stream is woken via
+     * notifyAll() once the attempt to close has been made.
+     *
+     * @param stream
+     *     The stream to close and notify.
+     */
+    private void close(T stream) {
+
+        // Attempt to close stream; failure is logged, never propagated
+        try {
+            stream.close();
+        }
+        catch (IOException e) {
+            logger.warn("Unable to close intercepted stream: {}", e.getMessage());
+            logger.debug("I/O error prevented closure of intercepted stream.", e);
+        }
+
+        // Wake ALL waiters: notify() would release only one waitFor() caller
+        synchronized (stream) {
+            stream.notifyAll();
+        }
+
+    }
+
+    /**
+     * Removes the stream stored under the given index from this map and
+     * closes its underlying stream object. Errors encountered while closing
+     * are logged rather than thrown, and threads blocked in waitFor() on that
+     * stream are woken. If nothing is stored under the given index, this
+     * function does nothing and returns null.
+     *
+     * @param index
+     *     The index of the stream which should be removed from this map and
+     *     closed.
+     *
+     * @return
+     *     The stream which was stored under the given index and has now been
+     *     closed, or null if no stream was stored under that index.
+     */
+    public InterceptedStream<T> close(String index) {
+
+        // Atomically detach whatever stream is stored under the index
+        InterceptedStream<T> removed = streams.remove(index);
+
+        // Only a stream which was actually stored needs to be closed
+        if (removed != null)
+            close(removed.getStream());
+
+        return removed;
+
+    }
+
+    /**
+     * Closes the given stream, logging any errors that occur during closure,
+     * and unblocking any in-progress calls to waitFor() for the given stream.
+     * If the given stream is stored within this map, it will also be removed.
+     *
+     * @param stream
+     *     The stream to close.
+     *
+     * @return
+     *     true if the given stream was stored within this map, false
+     *     otherwise.
+     */
+    public boolean close(InterceptedStream<T> stream) {
+
+        // Remove only if the index still maps to this stream (two-arg remove)
+        boolean wasRemoved = streams.remove(stream.getIndex(), stream);
+
+        // Close provided stream regardless of whether it was in the map
+        close(stream.getStream());
+
+        return wasRemoved;
+
+    }
+
+    /**
+     * Removes and closes all streams stored within this map, logging any errors
+     * that occur during closure, and unblocking any in-progress calls to
+     * waitFor().
+     */
+    public void closeAll() {
+
+        // Remove and close per index rather than close-all-then-clear(), which
+        // could drop a concurrently put() stream without ever closing it
+        while (!streams.isEmpty()) {
+            for (String index : streams.keySet())
+                close(index);
+        }
+
+    }
+
+    /**
+     * Blocks until the given stream is closed, or replaced by a stream with
+     * the same index. An interrupt received meanwhile is re-asserted on return.
+     *
+     * @param stream
+     *     The stream to wait for.
+     */
+    public void waitFor(InterceptedStream<T> stream) {
+        T underlyingStream = stream.getStream();
+        boolean interrupted = false;
+        // Wait for stream to close, deferring (not dropping) any interrupt
+        synchronized (underlyingStream) {
+            while (streams.get(stream.getIndex()) == stream) {
+                try {
+                    underlyingStream.wait(STREAM_WAIT_TIMEOUT);
+                }
+                catch (InterruptedException e) {
+                    interrupted = true;
+                }
+            }
+        }
+        if (interrupted)
+            Thread.currentThread().interrupt();
+    }
+
+    /**
+     * Returns the stream stored in this map under the given index.
+     *
+     * @param index
+     *     The index of the stream to return.
+     *
+     * @return
+     *     The stream having the given index, or null if no such stream is
+     *     stored within this map.
+     */
+    public InterceptedStream<T> get(String index) {
+        return streams.get(index);
+    }
+
+    /**
+     * Stores the given stream within this map under the index it reports via
+     * getIndex(). Any stream previously stored under that index is displaced
+     * and then closed, which also wakes threads blocked in waitFor() on it.
+     *
+     * @param stream
+     *     The stream to store within this map.
+     */
+    public void put(InterceptedStream<T> stream) {
+
+        // Swap the new stream in, capturing whatever it displaces
+        InterceptedStream<T> displaced = streams.put(stream.getIndex(), stream);
+
+        // Nothing further to do unless an earlier stream was displaced
+        if (displaced == null)
+            return;
+        close(displaced.getStream());
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-guacamole-client/blob/f391f00c/guacamole/src/main/java/org/apache/guacamole/tunnel/OutputStreamInterceptingFilter.java
----------------------------------------------------------------------
diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/OutputStreamInterceptingFilter.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/OutputStreamInterceptingFilter.java
new file mode 100644
index 0000000..7cdadba
--- /dev/null
+++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/OutputStreamInterceptingFilter.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.guacamole.tunnel;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import javax.xml.bind.DatatypeConverter;
+import org.apache.guacamole.GuacamoleException;
+import org.apache.guacamole.net.GuacamoleTunnel;
+import org.apache.guacamole.protocol.GuacamoleInstruction;
+import org.apache.guacamole.protocol.GuacamoleStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Filter which selectively intercepts "blob" and "end" instructions,
+ * automatically writing to or closing the stream given with
+ * interceptStream(). The required "ack" responses to received blobs are
+ * sent automatically.
+ *
+ * @author Michael Jumper
+ */
+public class OutputStreamInterceptingFilter
+        extends StreamInterceptingFilter<OutputStream> {
+
+    /**
+     * Logger for this class.
+     */
+    private static final Logger logger =
+            LoggerFactory.getLogger(OutputStreamInterceptingFilter.class);
+
+    /**
+     * Creates a new OutputStreamInterceptingFilter which selectively intercepts
+     * "blob" and "end" instructions. The required "ack" responses will
+     * automatically be sent over the given tunnel.
+     *
+     * @param tunnel
+     *     The GuacamoleTunnel over which any required "ack" instructions
+     *     should be sent.
+     */
+    public OutputStreamInterceptingFilter(GuacamoleTunnel tunnel) {
+        super(tunnel);
+    }
+
+    /**
+     * Sends an "ack" instruction over the tunnel on behalf of the connected
+     * client, acknowledging receipt of a stream or of one of its blobs. The
+     * "ack" carries the stream index, a message, and the numeric code of the
+     * given status, and is the only means of reporting success or failure.
+     *
+     * @param index
+     *     The index of the stream being acknowledged.
+     *
+     * @param message
+     *     An arbitrary human-readable message to include within the "ack"
+     *     instruction.
+     *
+     * @param status
+     *     The status to report. Any status other than SUCCESS first closes
+     *     the intercepted stream having the given index, via
+     *     closeInterceptedStream().
+     */
+    private void sendAck(String index, String message, GuacamoleStatus status) {
+
+        // Any non-success status implicitly ends the intercepted stream
+        if (status != GuacamoleStatus.SUCCESS)
+            closeInterceptedStream(index);
+
+        String code = Integer.toString(status.getGuacamoleStatusCode());
+        sendInstruction(new GuacamoleInstruction("ack", index, message, code));
+
+    }
+
+    /**
+     * Handles a single "blob" instruction, decoding its base64 data, writing
+     * that data to the associated OutputStream, acknowledging the result with
+     * an "ack", and dropping the "blob" so that the client never receives it.
+     * A blob whose data cannot be decoded is dropped without any "ack". If no
+     * OutputStream is associated with its index, the blob passes through.
+     *
+     * @param instruction
+     *     The "blob" instruction being handled.
+     *
+     * @return
+     *     The originally-provided "blob" instruction, if that instruction
+     *     should be passed through to the client, or null if the "blob"
+     *     instruction should be dropped.
+     */
+    private GuacamoleInstruction handleBlob(GuacamoleInstruction instruction) {
+
+        // Blobs lacking an index or data cannot be intercepted; pass through
+        List<String> args = instruction.getArgs();
+        if (args.size() < 2)
+            return instruction;
+
+        // Streams which are not being intercepted are passed through as-is
+        String index = args.get(0);
+        InterceptedStream<OutputStream> stream = getInterceptedStream(index);
+        if (stream == null)
+            return instruction;
+
+        // Decode blob, dropping it (with no "ack") if the data is not base64
+        byte[] blob;
+        try {
+            String data = args.get(1);
+            blob = DatatypeConverter.parseBase64Binary(data);
+        }
+        catch (IllegalArgumentException e) {
+            logger.warn("Received base64 data for intercepted stream was invalid.");
+            logger.debug("Decoding base64 data for intercepted stream failed.", e);
+            return null;
+        }
+
+        // Write data and report outcome; sendAck() closes the stream on error
+        try {
+            stream.getStream().write(blob);
+            sendAck(index, "OK", GuacamoleStatus.SUCCESS);
+        }
+        catch (IOException e) {
+            sendAck(index, "FAIL", GuacamoleStatus.SERVER_ERROR);
+            logger.debug("Write failed for intercepted stream.", e);
+        }
+
+        // Instruction was handled purely internally
+        return null;
+
+    }
+
+    /**
+     * Handles a single "end" instruction by closing the OutputStream being
+     * intercepted for the stream index given as its first argument. An "end"
+     * with no arguments, or for a stream not being intercepted, is ignored.
+     *
+     * @param instruction
+     *     The "end" instruction being handled.
+     */
+    private void handleEnd(GuacamoleInstruction instruction) {
+
+        List<String> arguments = instruction.getArgs();
+
+        // The stream index is the first argument; without it there is
+        // nothing to terminate
+        if (!arguments.isEmpty()) {
+            String index = arguments.get(0);
+            closeInterceptedStream(index);
+        }
+    }
+
+    @Override
+    public GuacamoleInstruction filter(GuacamoleInstruction instruction)
+            throws GuacamoleException {
+
+        String opcode = instruction.getOpcode();
+
+        // "blob" instructions may be consumed entirely by an intercepted stream
+        if (opcode.equals("blob"))
+            return handleBlob(instruction);
+
+        // "end" closes any intercepted stream, yet is still passed through
+        if (opcode.equals("end"))
+            handleEnd(instruction);
+
+        // Everything which reaches this point continues on to the client
+        return instruction;
+
+    }
+
+    @Override
+    protected void handleInterceptedStream(InterceptedStream<OutputStream> stream) {
+
+        // Tell the sender the stream is ready; blobs are handled by filter()
+        sendAck(stream.getIndex(), "OK", GuacamoleStatus.SUCCESS);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-guacamole-client/blob/f391f00c/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingFilter.java
----------------------------------------------------------------------
diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingFilter.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingFilter.java
new file mode 100644
index 0000000..e04de7d
--- /dev/null
+++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingFilter.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.guacamole.tunnel;
+
+import java.io.Closeable;
+import org.apache.guacamole.GuacamoleException;
+import org.apache.guacamole.io.GuacamoleWriter;
+import org.apache.guacamole.net.GuacamoleTunnel;
+import org.apache.guacamole.protocol.GuacamoleFilter;
+import org.apache.guacamole.protocol.GuacamoleInstruction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Filter which selectively intercepts stream-related instructions,
+ * automatically writing to, reading from, or closing the stream given with
+ * interceptStream(). Any instructions required by the Guacamole protocol to be
+ * sent in response to intercepted instructions will be sent automatically.
+ *
+ * @param <T>
+ *     The type of object which will produce or consume the data sent over the
+ *     intercepted Guacamole stream. Usually, this will be either InputStream
+ *     or OutputStream.
+ */
+public abstract class StreamInterceptingFilter<T extends Closeable>
+        implements GuacamoleFilter {
+
+    /**
+     * Logger for this class.
+     */
+    private static final Logger logger =
+            LoggerFactory.getLogger(StreamInterceptingFilter.class);
+
+    /**
+     * Mapping of the all streams whose related instructions should be
+     * intercepted.
+     */
+    private final InterceptedStreamMap<T> streams = new InterceptedStreamMap<T>();
+
+    /**
+     * The tunnel over which any required instructions should be sent.
+     */
+    private final GuacamoleTunnel tunnel;
+
+    /**
+     * Creates a new StreamInterceptingFilter which selectively intercepts
+     * stream-related instructions. Any instructions required by the Guacamole
+     * protocol to be sent in response to intercepted instructions will be sent
+     * automatically over the given tunnel.
+     *
+     * @param tunnel
+     *     The GuacamoleTunnel over which any required instructions should be
+     *     sent.
+     */
+    public StreamInterceptingFilter(GuacamoleTunnel tunnel) {
+        this.tunnel = tunnel;
+    }
+
+    /**
+     * Injects an arbitrary Guacamole instruction into the outbound Guacamole
+     * protocol stream (GuacamoleWriter) of the tunnel associated with this
+     * StreamInterceptingFilter, as if the instruction was sent by the connected
+     * client. The writer is always released again, even if writing fails.
+     *
+     * @param instruction
+     *     The Guacamole instruction to inject.
+     */
+    protected void sendInstruction(GuacamoleInstruction instruction) {
+
+        // Temporarily acquire writer to send the given instruction
+        GuacamoleWriter writer = tunnel.acquireWriter();
+
+        // A GuacamoleException while writing is logged, not propagated
+        try {
+            writer.writeInstruction(instruction);
+        }
+        catch (GuacamoleException e) {
+            logger.debug("Unable to send \"{}\" for intercepted stream.",
+                    instruction.getOpcode(), e);
+        }
+        finally {
+            tunnel.releaseWriter(); // always release, even on unchecked failure
+        }
+
+    }
+
+    /**
+     * Returns the stream having the given index and currently being intercepted
+     * by this filter.
+     *
+     * @param index
+     *     The index of the stream to return.
+     *
+     * @return
+     *     The stream having the given index, or null if no such stream is
+     *     being intercepted.
+     */
+    protected InterceptedStream<T> getInterceptedStream(String index) {
+        return streams.get(index);
+    }
+
+    /**
+     * Closes the stream having the given index and currently being intercepted
+     * by this filter, if any. If no such stream is being intercepted, then this
+     * function has no effect.
+     *
+     * @param index
+     *     The index of the stream to close.
+     *
+     * @return
+     *     The stream associated with the given index, if the stream is being
+     *     intercepted, or null if no such stream exists.
+     */
+    protected InterceptedStream<T> closeInterceptedStream(String index) {
+        return streams.close(index);
+    }
+
+    /**
+     * Closes the given stream.
+     *
+     * @param stream
+     *     The stream to close.
+     *
+     * @return
+     *     true if the given stream was being intercepted, false otherwise.
+     */
+    protected boolean closeInterceptedStream(InterceptedStream<T> stream) {
+        return streams.close(stream);
+    }
+
+    /**
+     * Closes all streams being intercepted by this filter.
+     */
+    public void closeAllInterceptedStreams() {
+        streams.closeAll();
+    }
+
+    /**
+     * Begins handling the data of the given intercepted stream. This function
+     * will automatically be invoked by interceptStream() for any valid stream.
+     * It is not required that this function block until all data is handled;
+     * interceptStream() will do this automatically. Implementations are free
+     * to use asynchronous approaches to data handling.
+     *
+     * @param stream
+     *     The stream being intercepted.
+     */
+    protected abstract void handleInterceptedStream(InterceptedStream<T> stream);
+
+    /**
+     * Intercept the stream having the given index, producing or consuming its
+     * data as appropriate, and block until that stream is closed or replaced.
+     * The given stream object is closed when the intercepted stream is closed.
+     * If the tunnel is not open, this function returns immediately and the
+     * given stream object is neither registered nor closed.
+     *
+     * @param index
+     *     The index of the stream to intercept.
+     *
+     * @param stream
+     *     The stream object which will produce or consume all data for the
+     *     stream having the given index.
+     */
+    public void interceptStream(int index, T stream) {
+
+        InterceptedStream<T> interceptedStream;
+        String indexString = Integer.toString(index);
+
+        // Atomically verify tunnel is open and add the given stream
+        synchronized (tunnel) {
+
+            // Do nothing if tunnel is not open (stream is NOT closed here)
+            if (!tunnel.isOpen())
+                return;
+
+            // Wrap stream
+            interceptedStream = new InterceptedStream<T>(indexString, stream);
+
+            // Replace (and thereby close) any existing stream at this index
+            streams.put(interceptedStream);
+
+        }
+
+        // Begin producing/consuming stream data (may return before done)
+        handleInterceptedStream(interceptedStream);
+
+        // Block until the stream is closed or replaced
+        streams.waitFor(interceptedStream);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-guacamole-client/blob/f391f00c/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java
----------------------------------------------------------------------
diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java
index a9a1355..333cdb1 100644
--- a/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java
+++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java
@@ -20,21 +20,12 @@
 package org.apache.guacamole.tunnel;
 
 import java.io.BufferedOutputStream;
-import java.io.IOException;
 import java.io.OutputStream;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import javax.xml.bind.DatatypeConverter;
 import org.apache.guacamole.GuacamoleException;
 import org.apache.guacamole.io.GuacamoleReader;
-import org.apache.guacamole.io.GuacamoleWriter;
 import org.apache.guacamole.net.DelegatingGuacamoleTunnel;
 import org.apache.guacamole.net.GuacamoleTunnel;
 import org.apache.guacamole.protocol.FilteredGuacamoleReader;
-import org.apache.guacamole.protocol.GuacamoleFilter;
-import org.apache.guacamole.protocol.GuacamoleInstruction;
-import org.apache.guacamole.protocol.GuacamoleStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,13 +42,8 @@ public class StreamInterceptingTunnel extends DelegatingGuacamoleTunnel {
     /**
      * Logger for this class.
      */
-    private static final Logger logger = LoggerFactory.getLogger(StreamInterceptingTunnel.class);
-
-    /**
-     * The maximum number of milliseconds to wait for notification that a
-     * stream has closed before explicitly checking for closure ourselves.
-     */
-    private static final long STREAM_WAIT_TIMEOUT = 1000;
+    private static final Logger logger =
+            LoggerFactory.getLogger(StreamInterceptingTunnel.class);
 
     /**
      * Creates a new StreamInterceptingTunnel which wraps the given tunnel,
@@ -73,206 +59,10 @@ public class StreamInterceptingTunnel extends DelegatingGuacamoleTunnel {
     }
 
     /**
-     * Mapping of the indexes of all streams whose associated "blob" and "end"
-     * instructions should be intercepted.
-     */
-    private final Map<String, OutputStream> streams =
-            new ConcurrentHashMap<String, OutputStream>();
-
-    /**
-     * Filter which selectively intercepts "blob" and "end" instructions,
-     * automatically writing to or closing the stream given with
-     * interceptStream(). The required "ack" responses to received blobs are
-     * sent automatically.
-     */
-    private final GuacamoleFilter STREAM_FILTER = new GuacamoleFilter() {
-
-        /**
-         * Handles a single "blob" instruction, decoding its base64 data,
-         * sending that data to the associated OutputStream, and ultimately
-         * dropping the "blob" instruction such that the client never receives
-         * it. If no OutputStream is associated with the stream index within
-         * the "blob" instruction, the instruction is passed through untouched.
-         *
-         * @param instruction
-         *     The "blob" instruction being handled.
-         *
-         * @return
-         *     The originally-provided "blob" instruction, if that instruction
-         *     should be passed through to the client, or null if the "blob"
-         *     instruction should be dropped.
-         */
-        private GuacamoleInstruction handleBlob(GuacamoleInstruction instruction) {
-
-            // Verify all required arguments are present
-            List<String> args = instruction.getArgs();
-            if (args.size() < 2)
-                return instruction;
-
-            // Pull associated stream
-            String index = args.get(0);
-            OutputStream stream = streams.get(index);
-            if (stream == null)
-                return instruction;
-
-            // Decode blob
-            byte[] blob;
-            try {
-                String data = args.get(1);
-                blob = DatatypeConverter.parseBase64Binary(data);
-            }
-            catch (IllegalArgumentException e) {
-                logger.warn("Received base64 data for intercepted stream was invalid.");
-                logger.debug("Decoding base64 data for intercepted stream failed.", e);
-                return null;
-            }
-
-            // Attempt to write data to stream
-            try {
-                stream.write(blob);
-                sendAck(index, "OK", GuacamoleStatus.SUCCESS);
-            }
-            catch (IOException e) {
-                sendAck(index, "FAIL", GuacamoleStatus.SERVER_ERROR);
-                logger.debug("Write failed for intercepted stream.", e);
-            }
-
-            // Instruction was handled purely internally
-            return null;
-
-        }
-
-        /**
-         * Handles a single "end" instruction, closing the associated
-         * OutputStream. If no OutputStream is associated with the stream index
-         * within the "end" instruction, this function has no effect.
-         *
-         * @param instruction
-         *     The "end" instruction being handled.
-         */
-        private void handleEnd(GuacamoleInstruction instruction) {
-
-            // Verify all required arguments are present
-            List<String> args = instruction.getArgs();
-            if (args.size() < 1)
-                return;
-
-            // Terminate stream
-            closeStream(args.get(0));
-
-        }
-
-        @Override
-        public GuacamoleInstruction filter(GuacamoleInstruction instruction)
-                throws GuacamoleException {
-
-            // Intercept "blob" instructions for in-progress streams
-            if (instruction.getOpcode().equals("blob"))
-                return handleBlob(instruction);
-
-            // Intercept "end" instructions for in-progress streams
-            if (instruction.getOpcode().equals("end")) {
-                handleEnd(instruction);
-                return instruction;
-            }
-
-            // Pass instruction through untouched
-            return instruction;
-
-        }
-
-    };
-
-    /**
-     * Closes the given OutputStream, logging any errors that occur during
-     * closure. The monitor of the OutputStream is notified via a single call
-     * to notify() once the attempt to close has been made.
-     *
-     * @param stream
-     *     The OutputStream to close and notify.
-     */
-    private void closeStream(OutputStream stream) {
-
-        // Attempt to close stream
-        try {
-            stream.close();
-        }
-        catch (IOException e) {
-            logger.warn("Unable to close intercepted stream: {}", e.getMessage());
-            logger.debug("I/O error prevented closure of intercepted stream.", e);
-        }
-
-        // Notify waiting threads that the stream has ended
-        synchronized (stream) {
-            stream.notify();
-        }
-
-    }
-
-    /**
-     * Closes the OutputStream associated with the stream having the given
-     * index, if any, logging any errors that occur during closure. If no such
-     * stream exists, this function has no effect. The monitor of the
-     * OutputStream is notified via a single call to notify() once the attempt
-     * to close has been made.
-     *
-     * @param index
-     *     The index of the stream whose associated OutputStream should be
-     *     closed and notified.
+     * The filter to use for rerouting received stream data to OutputStreams.
      */
-    private OutputStream closeStream(String index) {
-
-        // Remove associated stream
-        OutputStream stream = streams.remove(index);
-        if (stream == null)
-            return null;
-
-        // Close stream if it exists
-        closeStream(stream);
-        return stream;
-
-    }
-
-    /**
-     * Injects an "ack" instruction into the outbound Guacamole protocol
-     * stream, as if sent by the connected client. "ack" instructions are used
-     * to acknowledge the receipt of a stream and its subsequent blobs, and are
-     * the only means of communicating success/failure status.
-     *
-     * @param index
-     *     The index of the stream that this "ack" instruction relates to.
-     *
-     * @param message
-     *     An arbitrary human-readable message to include within the "ack"
-     *     instruction.
-     *
-     * @param status
-     *     The status of the stream operation being acknowledged via the "ack"
-     *     instruction. Error statuses will implicitly close the stream via
-     *     closeStream().
-     */
-    private void sendAck(String index, String message, GuacamoleStatus status) {
-
-        // Temporarily acquire writer to send "ack" instruction
-        GuacamoleWriter writer = acquireWriter();
-
-        // Send successful "ack"
-        try {
-            writer.writeInstruction(new GuacamoleInstruction("ack", index, message,
-                    Integer.toString(status.getGuacamoleStatusCode())));
-        }
-        catch (GuacamoleException e) {
-            logger.debug("Unable to send \"ack\" for intercepted stream.", e);
-        }
-
-        // Error "ack" instructions implicitly close the stream
-        if (status != GuacamoleStatus.SUCCESS)
-            closeStream(index);
-
-        // Done writing
-        releaseWriter();
-
-    }
+    private final OutputStreamInterceptingFilter outputStreamFilter =
+            new OutputStreamInterceptingFilter(this);
 
     /**
      * Intercept all data received along the stream having the given index,
@@ -290,57 +80,21 @@ public class StreamInterceptingTunnel extends DelegatingGuacamoleTunnel {
      */
     public void interceptStream(int index, OutputStream stream) {
 
-        String indexString = Integer.toString(index);
-
-        // Atomically verify tunnel is open and add the given stream
-        OutputStream oldStream;
-        synchronized (this) {
-
-            // Do nothing if tunnel is not open
-            if (!isOpen()) {
-                closeStream(stream);
-                return;
-            }
-
-            // Wrap stream
-            stream = new BufferedOutputStream(stream);
-
-            // Replace any existing stream
-            oldStream = streams.put(indexString, stream);
-
-        }
-
-        // If a previous stream DID exist, close it
-        if (oldStream != null)
-            closeStream(oldStream);
-
         // Log beginning of intercepted stream
-        logger.debug("Intercepting stream #{} of tunnel \"{}\".",
+        logger.debug("Intercepting output stream #{} of tunnel \"{}\".",
                 index, getUUID());
 
-        // Acknowledge stream receipt
-        sendAck(indexString, "OK", GuacamoleStatus.SUCCESS);
-
-        // Wait for stream to close
-        synchronized (stream) {
-            while (streams.get(indexString) == stream) {
-                try {
-                    stream.wait(STREAM_WAIT_TIMEOUT);
-                }
-                catch (InterruptedException e) {
-                    // Ignore
-                }
-            }
-        }
+        outputStreamFilter.interceptStream(index, new BufferedOutputStream(stream));
 
         // Log end of intercepted stream
-        logger.debug("Intercepted stream #{} of tunnel \"{}\" ended.", index, getUUID());
+        logger.debug("Intercepted output stream #{} of tunnel \"{}\" ended.",
+                index, getUUID());
 
     }
 
     @Override
     public GuacamoleReader acquireReader() {
-        return new FilteredGuacamoleReader(super.acquireReader(), STREAM_FILTER);
+        return new FilteredGuacamoleReader(super.acquireReader(), outputStreamFilter);
     }
 
     @Override
@@ -352,16 +106,9 @@ public class StreamInterceptingTunnel extends DelegatingGuacamoleTunnel {
             super.close();
         }
 
-        // Ensure all waiting threads are notified that all streams have ended
+        // Close all intercepted streams
         finally {
-
-            // Close any active streams
-            for (OutputStream stream : streams.values())
-                closeStream(stream);
-
-            // Remove now-useless references
-            streams.clear();
-
+            outputStreamFilter.closeAllInterceptedStreams();
         }
 
     }



Mime
View raw message