flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From StephanEwen <...@git.apache.org>
Subject [GitHub] flink pull request #5059: [FLINK-8125] [core] Introduce limiting of file sys...
Date Thu, 23 Nov 2017 17:47:35 GMT
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5059#discussion_r152853859
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
---
    @@ -0,0 +1,1097 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.core.fs;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.configuration.ConfigOption;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.CoreOptions;
    +import org.apache.flink.configuration.IllegalConfigurationException;
    +import org.apache.flink.util.function.SupplierWithException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +import javax.annotation.concurrent.GuardedBy;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.net.URI;
    +import java.util.HashSet;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.locks.Condition;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * A file system that limits the number of concurrently open input streams,
    + * output streams, and total streams for a target file system.
    + *
    + * <p>This file system can wrap another existing file system in cases where
    + * the target file system cannot handle certain connection spikes and connections
    + * would fail in that case. This happens, for example, for very small HDFS clusters
    + * with few RPC handlers, when a large Flink job tries to build up many connections during
    + * a checkpoint.
    + *
    + * <p>The file system may track the progress of streams and close streams that have been
    + * inactive for too long, to prevent blocked streams from taking up the complete pool.
    + * Rather than having a dedicated reaper thread, the calls that try to open a new stream
    + * periodically check the currently open streams once the limit of open streams is reached.
    + */
    +@Internal
    +public class LimitedConnectionsFileSystem extends FileSystem {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(LimitedConnectionsFileSystem.class);
    +
    +	/** The original file system to which connections are limited. */
    +	private final FileSystem originalFs;
    +
    +	/** The lock that synchronizes connection bookkeeping. */
    +	private final ReentrantLock lock;
    +
    +	/** Condition for threads that are blocking on the availability of new connections.
*/
    +	private final Condition available;
    +
    +	/** The maximum number of concurrently open output streams. */
    +	private final int maxNumOpenOutputStreams;
    +
    +	/** The maximum number of concurrently open input streams. */
    +	private final int maxNumOpenInputStreams;
    +
    +	/** The maximum number of concurrently open streams (input + output). */
    +	private final int maxNumOpenStreamsTotal;
    +
    +	/** The number of nanoseconds that opening a stream may wait for availability. */
    +	private final long streamOpenTimeoutNanos;
    +
    +	/** The nanoseconds that a stream may spend not writing any bytes before it is closed
as inactive. */
    +	private final long streamInactivityTimeoutNanos;
    +
    +	/** The set of currently open output streams. */
    +	@GuardedBy("lock")
    +	private final HashSet<OutStream> openOutputStreams;
    +
    +	/** The set of currently open input streams. */
    +	@GuardedBy("lock")
    +	private final HashSet<InStream> openInputStreams;
    +
    +	/** The number of output streams reserved to be opened. */
    +	@GuardedBy("lock")
    +	private int numReservedOutputStreams;
    +
    +	/** The number of input streams reserved to be opened. */
    +	@GuardedBy("lock")
    +	private int numReservedInputStreams;
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Creates a connection-limiting file system that caps only the total number of
    +	 * concurrently open streams.
    +	 *
    +	 * <p>The stream-opening timeout and the stream-inactivity timeout are both passed on
    +	 * as {@code 0}, which means that neither timeout is applied.
    +	 *
    +	 * @param originalFs              The original file system to which connections are limited.
    +	 * @param maxNumOpenStreamsTotal  The maximum number of concurrent open streams (0 means no limit).
    +	 */
    +	public LimitedConnectionsFileSystem(FileSystem originalFs, int maxNumOpenStreamsTotal) {
    +		this(
    +				originalFs,
    +				maxNumOpenStreamsTotal,
    +				0,   // stream open timeout
    +				0);  // stream inactivity timeout
    +	}
    +
    +	/**
    +	 * Creates a connection-limiting file system that caps the total number of concurrently
    +	 * open streams and applies the given timeouts. The separate per-direction limits for
    +	 * input and output streams are passed on as {@code 0} (no limit).
    +	 *
    +	 * <p>Streams that stay inactive for longer than the given inactivity timeout are
    +	 * terminated as "inactive", so that the limited number of connections does not get
    +	 * stuck on blocked threads only.
    +	 *
    +	 * @param originalFs              The original file system to which connections are limited.
    +	 * @param maxNumOpenStreamsTotal  The maximum number of concurrent open streams (0 means no limit).
    +	 * @param streamOpenTimeout       The maximum number of milliseconds that the file system will wait
    +	 *                                when no more connections are currently permitted.
    +	 * @param streamInactivityTimeout The milliseconds that a stream may spend without making progress
    +	 *                                before it is closed as inactive.
    +	 */
    +	public LimitedConnectionsFileSystem(
    +			FileSystem originalFs,
    +			int maxNumOpenStreamsTotal,
    +			long streamOpenTimeout,
    +			long streamInactivityTimeout) {
    +		this(
    +				originalFs,
    +				maxNumOpenStreamsTotal,
    +				0,  // no dedicated output stream limit
    +				0,  // no dedicated input stream limit
    +				streamOpenTimeout,
    +				streamInactivityTimeout);
    +	}
    +
    +	/**
    +	 * Creates a new connection-limiting file system, limiting input and output streams with
    +	 * potentially different quotas.
    +	 *
    +	 * <p>If streams are inactive for longer than the given timeout, then they are terminated
    +	 * as "inactive", to prevent the limited number of connections from getting stuck on
    +	 * blocked threads only.
    +	 *
    +	 * @param originalFs              The original file system to which connections are limited.
    +	 * @param maxNumOpenStreamsTotal  The maximum number of concurrent open streams (0 means no limit).
    +	 * @param maxNumOpenOutputStreams The maximum number of concurrent open output streams (0 means no limit).
    +	 * @param maxNumOpenInputStreams  The maximum number of concurrent open input streams (0 means no limit).
    +	 * @param streamOpenTimeout       The maximum number of milliseconds that the file system will wait when
    +	 *                                no more connections are currently permitted (0 means infinite timeout).
    +	 * @param streamInactivityTimeout The milliseconds that a stream may spend without making progress
    +	 *                                before it is closed as inactive (0 means infinite timeout).
    +	 *
    +	 * @throws IllegalArgumentException If any of the limits or timeouts is negative.
    +	 * @throws NullPointerException If the original file system is null.
    +	 */
    +	public LimitedConnectionsFileSystem(
    +			FileSystem originalFs,
    +			int maxNumOpenStreamsTotal,
    +			int maxNumOpenOutputStreams,
    +			int maxNumOpenInputStreams,
    +			long streamOpenTimeout,
    +			long streamInactivityTimeout) {
    +
    +		checkArgument(maxNumOpenStreamsTotal >= 0, "maxNumOpenStreamsTotal must be >= 0");
    +		checkArgument(maxNumOpenOutputStreams >= 0, "maxNumOpenOutputStreams must be >= 0");
    +		checkArgument(maxNumOpenInputStreams >= 0, "maxNumOpenInputStreams must be >= 0");
    +		checkArgument(streamOpenTimeout >= 0, "stream opening timeout must be >= 0 (0 means infinite timeout)");
    +		checkArgument(streamInactivityTimeout >= 0, "stream inactivity timeout must be >= 0 (0 means infinite timeout)");
    +
    +		this.originalFs = checkNotNull(originalFs, "originalFs");
    +		this.lock = new ReentrantLock(true);
    +		this.available = lock.newCondition();
    +		this.openOutputStreams = new HashSet<>();
    +		this.openInputStreams = new HashSet<>();
    +		this.maxNumOpenStreamsTotal = maxNumOpenStreamsTotal;
    +		this.maxNumOpenOutputStreams = maxNumOpenOutputStreams;
    +		this.maxNumOpenInputStreams = maxNumOpenInputStreams;
    +
    +		// Convert milliseconds to nanoseconds, saturating at Long.MAX_VALUE on overflow.
    +		// A hand-rolled "product >= factor" test is not sufficient here, because an overflowing
    +		// multiplication can wrap around to a positive value that is larger than the input.
    +		this.streamOpenTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(streamOpenTimeout);
    +		this.streamInactivityTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(streamInactivityTimeout);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Returns the configured limit of concurrently open output streams.
    +	 *
    +	 * @return The maximum number of open output streams, as passed to the constructor.
    +	 */
    +	public int getMaxNumOpenOutputStreams() {
    +		return this.maxNumOpenOutputStreams;
    +	}
    +
    +	/**
    +	 * Returns the configured limit of concurrently open input streams.
    +	 *
    +	 * @return The maximum number of open input streams, as passed to the constructor.
    +	 */
    +	public int getMaxNumOpenInputStreams() {
    +		return this.maxNumOpenInputStreams;
    +	}
    +
    +	/**
    +	 * Returns the configured limit of concurrently open streams (input + output).
    +	 *
    +	 * @return The maximum total number of open streams, as passed to the constructor.
    +	 */
    +	public int getMaxNumOpenStreamsTotal() {
    +		return this.maxNumOpenStreamsTotal;
    +	}
    +
    +	/**
    +	 * Gets the number of milliseconds that opening a stream may wait for availability in the
    +	 * connection pool.
    +	 *
    +	 * @return The stream opening timeout, converted from the internally stored nanoseconds.
    +	 */
    +	public long getStreamOpenTimeout() {
    +		return TimeUnit.NANOSECONDS.toMillis(streamOpenTimeoutNanos);
    +	}
    +
    +	/**
    +	 * Gets the milliseconds that a stream may spend without making progress before it is
    +	 * closed as inactive.
    +	 *
    +	 * @return The inactivity timeout, converted from the internally stored nanoseconds.
    +	 */
    +	public long getStreamInactivityTimeout() {
    +		return TimeUnit.NANOSECONDS.toMillis(streamInactivityTimeoutNanos);
    +	}
    +
    +	/**
    +	 * Gets the total number of open streams (input plus output).
    +	 */
    +	public int getTotalNumberOfOpenStreams() {
    +		lock.lock();
    +		try {
    +			return numReservedOutputStreams + numReservedInputStreams;
    +		} finally {
    +			lock.unlock();
    +		}
    +	}
    +
    +	/**
    +	 * Gets the number of currently open output streams.
    +	 *
    +	 * <p>The value is the reserved output stream counter, read while holding the
    +	 * bookkeeping lock.
    +	 */
    +	public int getNumberOfOpenOutputStreams() {
    +		final int count;
    +		lock.lock();
    +		try {
    +			count = numReservedOutputStreams;
    +		}
    +		finally {
    +			lock.unlock();
    +		}
    +		return count;
    +	}
    +
    +	/**
    +	 * Gets the number of currently open input streams.
    +	 */
    +	public int getNumberOfOpenInputStreams() {
    +		return numReservedInputStreams;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  input & output stream opening methods
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Creates an output stream on the wrapped file system, subject to the connection limits.
    +	 */
    +	@Override
    +	public FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException {
    +		final SupplierWithException<FSDataOutputStream, IOException> opener =
    +				() -> originalFs.create(f, overwriteMode);
    +		return createOutputStream(opener);
    +	}
    +
    +	/**
    +	 * Creates an output stream on the wrapped file system via its deprecated
    +	 * five-argument {@code create} method, subject to the connection limits.
    +	 */
    +	@Override
    +	@Deprecated
    +	@SuppressWarnings("deprecation")
    +	public FSDataOutputStream create(
    +			Path f,
    +			boolean overwrite,
    +			int bufferSize,
    +			short replication,
    +			long blockSize) throws IOException {
    +
    +		final SupplierWithException<FSDataOutputStream, IOException> opener =
    +				() -> originalFs.create(f, overwrite, bufferSize, replication, blockSize);
    +		return createOutputStream(opener);
    +	}
    +
    +	/**
    +	 * Opens an input stream with the given buffer size on the wrapped file system,
    +	 * subject to the connection limits.
    +	 */
    +	@Override
    +	public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    +		final SupplierWithException<FSDataInputStream, IOException> opener =
    +				() -> originalFs.open(f, bufferSize);
    +		return createInputStream(opener);
    +	}
    +
    +	/**
    +	 * Opens an input stream on the wrapped file system, subject to the connection limits.
    +	 */
    +	@Override
    +	public FSDataInputStream open(Path f) throws IOException {
    +		final SupplierWithException<FSDataInputStream, IOException> opener =
    +				() -> originalFs.open(f);
    +		return createInputStream(opener);
    +	}
    +
    +	/**
    +	 * Opens an output stream through the limiting logic. The stream produced by the given
    +	 * opener is wrapped in an {@code OutStream} bound to this file system and tracked in
    +	 * the set of open output streams.
    +	 */
    +	private FSDataOutputStream createOutputStream(
    +			final SupplierWithException<FSDataOutputStream, IOException> streamOpener) throws IOException {
    +
    +		return createStream(
    +				() -> new OutStream(streamOpener.get(), this),
    +				openOutputStreams,
    +				true);
    +	}
    +
    +	/**
    +	 * Opens an input stream through the limiting logic. The stream produced by the given
    +	 * opener is wrapped in an {@code InStream} bound to this file system and tracked in
    +	 * the set of open input streams.
    +	 */
    +	private FSDataInputStream createInputStream(
    +			final SupplierWithException<FSDataInputStream, IOException> streamOpener) throws IOException {
    +
    +		return createStream(
    +				() -> new InStream(streamOpener.get(), this),
    +				openInputStreams,
    +				false);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  other delegating file system methods
    +	// ------------------------------------------------------------------------
    +
    +	/** Forwards to the wrapped file system. */
    +	@Override
    +	public FileSystemKind getKind() {
    +		final FileSystemKind kind = originalFs.getKind();
    +		return kind;
    +	}
    +
    +	/** Forwards to the wrapped file system. */
    +	@Override
    +	public boolean isDistributedFS() {
    +		final boolean distributed = originalFs.isDistributedFS();
    +		return distributed;
    +	}
    +
    +	/** Forwards to the wrapped file system. */
    +	@Override
    +	public Path getWorkingDirectory() {
    +		final Path workingDirectory = originalFs.getWorkingDirectory();
    +		return workingDirectory;
    +	}
    +
    +	/** Forwards to the wrapped file system. */
    +	@Override
    +	public Path getHomeDirectory() {
    +		final Path homeDirectory = originalFs.getHomeDirectory();
    +		return homeDirectory;
    +	}
    +
    +	/** Forwards to the wrapped file system. */
    +	@Override
    +	public URI getUri() {
    +		final URI uri = originalFs.getUri();
    +		return uri;
    +	}
    +
    +	/** Forwards to the wrapped file system; this call is not counted against the stream limits. */
    +	@Override
    +	public FileStatus getFileStatus(Path f) throws IOException {
    +		final FileStatus status = originalFs.getFileStatus(f);
    +		return status;
    +	}
    +
    +	@Override
    +	public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len)
throws IOException {
    +		return originalFs.getFileBlockLocations(file, start, len);
    +	}
    +
    +	/** Forwards to the wrapped file system; this call is not counted against the stream limits. */
    +	@Override
    +	public FileStatus[] listStatus(Path f) throws IOException {
    +		final FileStatus[] statuses = originalFs.listStatus(f);
    +		return statuses;
    +	}
    +
    +	/** Forwards to the wrapped file system; this call is not counted against the stream limits. */
    +	@Override
    +	public boolean delete(Path f, boolean recursive) throws IOException {
    +		final boolean deleted = originalFs.delete(f, recursive);
    +		return deleted;
    +	}
    +
    +	/** Forwards to the wrapped file system; this call is not counted against the stream limits. */
    +	@Override
    +	public boolean mkdirs(Path f) throws IOException {
    +		final boolean created = originalFs.mkdirs(f);
    +		return created;
    +	}
    +
    +	/** Forwards to the wrapped file system; this call is not counted against the stream limits. */
    +	@Override
    +	public boolean rename(Path src, Path dst) throws IOException {
    +		final boolean renamed = originalFs.rename(src, dst);
    +		return renamed;
    +	}
    +
    +	/** Forwards to the wrapped file system; this call is not counted against the stream limits. */
    +	@Override
    +	public boolean exists(Path f) throws IOException {
    +		final boolean present = originalFs.exists(f);
    +		return present;
    +	}
    +
    +	/** Forwards to the (deprecated) method of the wrapped file system. */
    +	@Override
    +	@Deprecated
    +	@SuppressWarnings("deprecation")
    +	public long getDefaultBlockSize() {
    +		final long blockSize = originalFs.getDefaultBlockSize();
    +		return blockSize;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Opens a stream while respecting the configured connection limits.
    +	 *
    +	 * <p>The method works in three steps: (1) under the lock, wait until a stream may be opened
    +	 * and reserve a credit for it, (2) open the stream outside the lock, (3) re-acquire the lock
    +	 * and register the opened stream. If opening or registering fails, the reserved credit is
    +	 * returned and waiting threads are signalled.
    +	 *
    +	 * @param streamOpener The function that opens (and wraps) the actual stream.
    +	 * @param openStreams  The set that tracks the open streams of this kind (input or output).
    +	 * @param output       True, if an output stream is opened, false for an input stream.
    +	 * @return The opened and registered stream.
    +	 *
    +	 * @throws IOException Thrown if the thread is interrupted while waiting, if the wait for
    +	 *                     availability times out, or if opening the stream fails.
    +	 */
    +	private <T extends StreamWithTimeout> T createStream(
    +			final SupplierWithException<T, IOException> streamOpener,
    +			final HashSet<T> openStreams,
    +			final boolean output) throws IOException {
    +
    +		// the per-direction limit only applies to the direction being opened; a configured
    +		// value of zero means "no limit" (the output limit must be gated on the output setting)
    +		final int outputLimit = output && maxNumOpenOutputStreams > 0 ? maxNumOpenOutputStreams : Integer.MAX_VALUE;
    +		final int inputLimit = !output && maxNumOpenInputStreams > 0 ? maxNumOpenInputStreams : Integer.MAX_VALUE;
    +		final int totalLimit = maxNumOpenStreamsTotal > 0 ? maxNumOpenStreamsTotal : Integer.MAX_VALUE;
    +		final int outputCredit = output ? 1 : 0;
    +		final int inputCredit = output ? 0 : 1;
    +
    +		// because waiting for availability may take long, we need to be interruptible here
    +		// and handle interrupted exceptions as I/O errors
    +		// even though the code is written to make sure the lock is held for a short time only,
    +		// making the lock acquisition interruptible helps to guard against the cases where
    +		// a supposedly fast operation (like 'getPos()' on a stream) actually takes long.
    +		try {
    +			lock.lockInterruptibly();
    +			try {
    +				// some integrity checks
    +				assert openOutputStreams.size() <= numReservedOutputStreams;
    +				assert openInputStreams.size() <= numReservedInputStreams;
    +
    +				// wait until there are few enough streams so we can open another
    +				waitForAvailability(totalLimit, outputLimit, inputLimit);
    +
    +				// We do not open the stream here in the locked scope because opening a stream
    +				// could take a while. Holding the lock during that operation would block all concurrent
    +				// attempts to try and open a stream, effectively serializing all calls to open the streams.
    +				numReservedOutputStreams += outputCredit;
    +				numReservedInputStreams += inputCredit;
    +			}
    +			finally {
    +				lock.unlock();
    +			}
    +		}
    +		catch (InterruptedException e) {
    +			// restore interruption flag
    +			Thread.currentThread().interrupt();
    +			// keep the original exception as the cause so the interruption point stays visible
    +			throw new IOException("interrupted before opening stream", e);
    +		}
    +
    +		// open the stream outside the lock.
    +		boolean success = false;
    +		try {
    +			final T stream = streamOpener.get();
    +
    +			// add the stream to the set, need to re-acquire the lock
    +			lock.lock();
    +			try {
    +				openStreams.add(stream);
    +			} finally {
    +				lock.unlock();
    +			}
    +
    +			// good, can now return cleanly
    +			success = true;
    +			return stream;
    +		}
    +		finally {
    +			if (!success) {
    +				// remove the reserved credit
    +				// we must acquire the lock non-interruptibly, because this must succeed!
    +				lock.lock();
    +				try {
    +					numReservedOutputStreams -= outputCredit;
    +					numReservedInputStreams -= inputCredit;
    +					available.signalAll();
    +				} finally {
    +					lock.unlock();
    +				}
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Blocks the calling thread (which must hold the lock) until a stream may be opened
    +	 * under the given limits, or until the stream opening timeout has elapsed.
    +	 *
    +	 * <p>If an inactivity timeout is configured, the waiting thread wakes up at least every
    +	 * half inactivity timeout and tries to close one inactive stream to free up a connection.
    +	 *
    +	 * <p>NOTE(review): {@code System.nanoTime()} is documented as possibly returning negative
    +	 * values. The {@code deadlineNanos > 0} overflow check and the subtraction from a
    +	 * {@code Long.MAX_VALUE} deadline look like they assume a non-negative clock - confirm.
    +	 *
    +	 * @param totalLimit  The limit for the sum of reserved input and output streams.
    +	 * @param outputLimit The limit for reserved output streams.
    +	 * @param inputLimit  The limit for reserved input streams.
    +	 *
    +	 * @throws InterruptedException Thrown if the thread is interrupted while waiting on the condition.
    +	 * @throws IOException Thrown if the timeout expired and there is still no availability.
    +	 */
    +	@GuardedBy("lock")
    +	private void waitForAvailability(
    +			int totalLimit,
    +			int outputLimit,
    +			int inputLimit) throws InterruptedException, IOException {
    +
    +		checkState(lock.isHeldByCurrentThread());
    +
    +		// compute the deadline of this operations
    +		// (a configured timeout of zero means waiting without a deadline)
    +		final long deadline;
    +		if (streamOpenTimeoutNanos == 0) {
    +			deadline = Long.MAX_VALUE;
    +		} else {
    +			long deadlineNanos = System.nanoTime() + streamOpenTimeoutNanos;
    +			// check for overflow
    +			deadline = deadlineNanos > 0 ? deadlineNanos : Long.MAX_VALUE;
    +		}
    +
    +		// wait for available connections
    +		long timeLeft;
    +
    +		if (streamInactivityTimeoutNanos == 0) {
    +			// simple case: just wait
    +			while ((timeLeft = (deadline - System.nanoTime())) > 0 &&
    +					!hasAvailability(totalLimit, outputLimit, inputLimit)) {
    +
    +				available.await(timeLeft, TimeUnit.NANOSECONDS);
    +			}
    +		}
    +		else {
    +			// complex case: chase down inactive streams
    +			// the check interval is half the inactivity timeout (plus one, so it is never zero)
    +			final long checkIntervalNanos = (streamInactivityTimeoutNanos >>> 1) + 1;
    +
    +			long now;
    +			while ((timeLeft = (deadline - (now = System.nanoTime()))) > 0 && // while
still within timeout
    +					!hasAvailability(totalLimit, outputLimit, inputLimit)) {
    +
    +				// check all streams whether there in one that has been inactive for too long
    +				if (!(closeInactiveStream(openOutputStreams, now) || closeInactiveStream(openInputStreams,
now))) {
    +					// only wait if we did not manage to close any stream.
    +					// otherwise eagerly check again if we have availability now (we should have!)
    +					long timeToWait = Math.min(checkIntervalNanos, timeLeft);
    +					available.await(timeToWait, TimeUnit.NANOSECONDS);
    +				}
    +			}
    +		}
    +
    +		// check for timeout
    +		// we check availability again to catch cases where the timeout expired while waiting
    +		// to re-acquire the lock
    +		if (timeLeft <= 0 && !hasAvailability(totalLimit, outputLimit, inputLimit))
{
    +			throw new IOException(String.format(
    +					"Timeout while waiting for an available stream/connect. " +
    +					"limits: total=%d, input=%d, output=%d ; Open: input=%d, output=%d ; timeout: %d
ms",
    +					maxNumOpenStreamsTotal, maxNumOpenInputStreams, maxNumOpenOutputStreams,
    +					numReservedInputStreams, numReservedOutputStreams, getStreamOpenTimeout()));
    +		}
    +	}
    +
    +	/**
    +	 * Checks whether another stream may be reserved: the reserved output streams, the
    +	 * reserved input streams, and their sum must each be strictly below the given limit.
    +	 */
    +	@GuardedBy("lock")
    +	private boolean hasAvailability(int totalLimit, int outputLimit, int inputLimit) {
    +		if (numReservedOutputStreams >= outputLimit) {
    +			return false;
    +		}
    +		if (numReservedInputStreams >= inputLimit) {
    +			return false;
    +		}
    +		return numReservedOutputStreams + numReservedInputStreams < totalLimit;
    +	}
    +
    +	/**
    +	 * Scans the given streams and closes the first one that has made no progress since its
    +	 * previous inactivity check.
    +	 *
    +	 * <p>Streams that are already closed, or whose last check happened less than one
    +	 * inactivity timeout ago, are skipped individually; the scan continues with the
    +	 * remaining streams, so a single such stream cannot prevent the others from being checked.
    +	 *
    +	 * @param streams  The streams to check.
    +	 * @param nowNanos The current timestamp (nanoseconds) to compare and mark the streams with.
    +	 * @return True, if a stream was closed due to inactivity, false otherwise.
    +	 */
    +	@GuardedBy("lock")
    +	private boolean closeInactiveStream(HashSet<? extends StreamWithTimeout> streams, long nowNanos) {
    +		for (StreamWithTimeout stream : streams) {
    +			try {
    +				// If the stream is closed already, it will be removed anyways, so we
    +				// do not classify it as inactive. We also skip the check if another check happened too recently.
    +				if (stream.isClosed() || nowNanos < stream.getLastCheckTimestampNanos() + streamInactivityTimeoutNanos) {
    +					// nothing to do for this stream, but the other streams still need to be looked at
    +					continue;
    +				}
    +
    +				if (!stream.checkNewBytesAndMark(nowNanos)) {
    +					// Closing un-registers the stream from the set we are iterating over, so we
    +					// must leave the loop right away and not touch the iterator again.
    +					stream.closeDueToTimeout();
    +					return true;
    +				}
    +			}
    +			catch (StreamTimeoutException ignored) {
    +				// may happen due to races
    +			}
    +			catch (IOException e) {
    +				// only log on debug level here, to avoid log spamming
    +				LOG.debug("Could not check for stream progress to determine inactivity", e);
    +			}
    +		}
    +
    +		return false;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Atomically removes the given output stream from the set of currently open output
streams,
    +	 * and signals that new stream can now be opened.
    +	 */
    +	void unregisterOutputStream(OutStream stream) {
    +		lock.lock();
    +		try {
    +			// only decrement if we actually remove the stream
    +			if (openOutputStreams.remove(stream)) {
    +				numReservedOutputStreams--;
    +				available.signalAll();
    +			}
    +		}
    +		finally {
    +			lock.unlock();
    +		}
    +	}
    +
    +	/**
    +	 * Atomically removes the given input stream from the set of currently open input streams,
    +	 * and signals that new stream can now be opened.
    +	 */
    +	void unregisterInputStream(InStream stream) {
    +		lock.lock();
    +		try {
    +			// only decrement if we actually remove the stream
    +			if (openInputStreams.remove(stream)) {
    +				numReservedInputStreams--;
    +				available.signalAll();
    +			}
    +		}
    +		finally {
    +			lock.unlock();
    +		}
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * An {@link IOException} subtype that signals that a stream was closed because of the
    +	 * inactivity timeout.
    +	 */
    +	public static final class StreamTimeoutException extends IOException {
    +
    +		private static final long serialVersionUID = -8790922066795901928L;
    +
    +		/** The message used by freshly created timeout exceptions. */
    +		private static final String TIMEOUT_MESSAGE =
    +				"Stream closed due to inactivity timeout. " +
    +				"This is done to prevent inactive streams from blocking the full " +
    +				"pool of limited connections";
    +
    +		/** Creates a timeout exception with the standard message. */
    +		public StreamTimeoutException() {
    +			super(TIMEOUT_MESSAGE);
    +		}
    +
    +		/**
    +		 * Creates a timeout exception that takes over the message of the given exception
    +		 * and references that exception as its cause.
    +		 */
    +		public StreamTimeoutException(StreamTimeoutException other) {
    +			super(other.getMessage(), other);
    +		}
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Contract for streams whose progress can be inspected, so that they can be closed
    +	 * when they have been inactive for too long.
    +	 */
    +	private interface StreamWithTimeout extends Closeable {
    +
    +		/**
    +		 * Tells whether this stream has been closed already.
    +		 */
    +		boolean isClosed();
    +
    +		/**
    +		 * Returns the timestamp at which the most recent inactivity evaluation took place.
    +		 */
    +		long getLastCheckTimestampNanos();
    +
    +		/**
    +		 * Determines whether new bytes were processed since the previous invocation of this
    +		 * method, and records the given timestamp, which is then exposed through
    +		 * {@link #getLastCheckTimestampNanos()}.
    +		 *
    +		 * @return True, if there were new bytes, false if not.
    +		 */
    +		boolean checkNewBytesAndMark(long timestamp) throws IOException;
    +
    +		/**
    +		 * Closes the stream asynchronously, using a special exception that indicates that
    +		 * the stream was closed due to lack of progress.
    +		 */
    +		void closeDueToTimeout() throws IOException;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * A data output stream that wraps a given data output stream and un-registers
    +	 * from a given connection-limiting file system
    +	 * (via {@link LimitedConnectionsFileSystem#unregisterOutputStream(OutStream)})
    +	 * upon closing.
    +	 *
    +	 * <p>When the stream has been closed due to inactivity, I/O exceptions raised by the
    +	 * wrapped stream are reported as {@link StreamTimeoutException}.
    +	 */
    +	private static final class OutStream extends FSDataOutputStream implements StreamWithTimeout {
    +
    +		/** The original data output stream to write to. */
    +		private final FSDataOutputStream originalStream;
    +
    +		/** The connection-limiting file system to un-register from. */
    +		private final LimitedConnectionsFileSystem fs;
    +
    +		/** An exception with which the stream has been externally closed. */
    +		private volatile StreamTimeoutException timeoutException;
    +
    +		/** The number of bytes written the last time that the {@link #checkNewBytesAndMark(long)}
    +		 * method was called. It is important to initialize this with {@code -1} so that the
    +		 * first check (0 bytes) always appears to have made progress. */
    +		private volatile long lastCheckBytes = -1;
    +
    +		/** The timestamp when the last inactivity evaluation was made. */
    +		private volatile long lastCheckTimestampNanos;
    +
    +		/** Flag tracking whether the stream was already closed, for proper inactivity tracking. */
    +		private final AtomicBoolean closed = new AtomicBoolean();
    +
    +		/**
    +		 * Creates a new wrapping stream.
    +		 *
    +		 * @param originalStream The stream to which all calls are forwarded.
    +		 * @param fs             The file system to un-register from when this stream is closed.
    +		 */
    +		OutStream(
    +				FSDataOutputStream originalStream,
    +				LimitedConnectionsFileSystem fs) {
    +
    +			this.originalStream = checkNotNull(originalStream);
    +			this.fs = checkNotNull(fs);
    +		}
    +
    +		// --- FSDataOutputStream API implementation
    +
    +		@Override
    +		public void write(int b) throws IOException {
    +			try {
    +				originalStream.write(b);
    +			}
    +			catch (IOException e) {
    +				handleIOException(e);
    +			}
    +		}
    +
    +		@Override
    +		public void write(byte[] b, int off, int len) throws IOException {
    +			try {
    +				originalStream.write(b, off, len);
    +			}
    +			catch (IOException e) {
    +				handleIOException(e);
    +			}
    +		}
    +
    +		@Override
    +		public long getPos() throws IOException {
    +			try {
    +				return originalStream.getPos();
    +			}
    +			catch (IOException e) {
    +				handleIOException(e);
    +				return -1; // silence the compiler
    +			}
    +		}
    +
    +		@Override
    +		public void flush() throws IOException {
    +			try {
    +				originalStream.flush();
    +			}
    +			catch (IOException e) {
    +				handleIOException(e);
    +			}
    +		}
    +
    +		@Override
    +		public void sync() throws IOException {
    +			try {
    +				originalStream.sync();
    +			}
    +			catch (IOException e) {
    +				handleIOException(e);
    +			}
    +		}
    +
    +		/**
    +		 * Closes the wrapped stream and un-registers this stream from the file system.
    +		 * Only the first call has an effect; the un-registration happens even if closing
    +		 * the wrapped stream fails.
    +		 */
    +		@Override
    +		public void close() throws IOException {
    +			if (closed.compareAndSet(false, true)) {
    +				try {
    +					originalStream.close();
    +				}
    +				catch (IOException e) {
    +					handleIOException(e);
    +				}
    +				finally {
    +					fs.unregisterOutputStream(this);
    +				}
    +			}
    +		}
    +
    +		@Override
    +		public void closeDueToTimeout() throws IOException {
    +			// set the exception first, so that failures caused by the close are reported as timeouts
    +			this.timeoutException = new StreamTimeoutException();
    +			close();
    +		}
    +
    +		@Override
    +		public boolean isClosed() {
    +			return closed.get();
    +		}
    +
    +		@Override
    +		public long getLastCheckTimestampNanos() {
    +			return lastCheckTimestampNanos;
    +		}
    +
    +		@Override
    +		public boolean checkNewBytesAndMark(long timestamp) throws IOException {
    +			// remember the time when checked
    +			lastCheckTimestampNanos = timestamp;
    +
    +			// progress means that the position of the wrapped stream advanced since the last check
    +			final long bytesNow = originalStream.getPos();
    +			if (bytesNow > lastCheckBytes) {
    +				lastCheckBytes = bytesNow;
    +				return true;
    +			}
    +			else {
    +				return false;
    +			}
    +		}
    +
    +		/**
    +		 * Re-throws the given exception, or a {@link StreamTimeoutException} if this stream
    +		 * has been closed due to inactivity.
    +		 */
    +		private void handleIOException(IOException exception) throws IOException {
    +			if (timeoutException == null) {
    +				throw exception;
    +			} else {
    +				// throw a new exception to capture this call's stack trace
    +				throw new StreamTimeoutException(timeoutException);
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * A data input stream that wraps a given data input stream and un-registers
    +	 * from a given connection-limiting file system
    +	 * (via {@link LimitedConnectionsFileSystem#unregisterInputStream(InStream)}
    +	 * upon closing.
    +	 */
    +	private static final class InStream extends FSDataInputStream implements StreamWithTimeout
{
    +
    +		/** The original data input stream to read from. */
    +		private final FSDataInputStream originalStream;
    +
    +		/** The connection-limiting file system to un-register from. */
    +		private final LimitedConnectionsFileSystem fs;
    +
    +		/** An exception with which the stream has been externally closed. */
    +		private volatile StreamTimeoutException timeoutException;
    +
    +		/** The number of bytes read the last time that the {@link #checkNewBytesAndMark(long)}
    +		 * method was called. It is important to initialize this with {@code -1} so that the
    +		 * first check (0 bytes) always appears to have made progress. */
    +		private volatile long lastCheckBytes = -1;
    +
    +		/** The timestamp when the last inactivity evaluation was made. */
    +		private volatile long lastCheckTimestampNanos;
    +
    +		/** Flag tracking whether the stream was already closed, for proper inactivity tracking.
*/
    +		private AtomicBoolean closed = new AtomicBoolean();
    +
    +		InStream(
    +				FSDataInputStream originalStream,
    +				LimitedConnectionsFileSystem fs) {
    +
    +			this.originalStream = checkNotNull(originalStream);
    +			this.fs = checkNotNull(fs);
    +		}
    +
    +		// --- FSDataInputStream API implementation
    +
    +		@Override
    +		public int read() throws IOException {
    +			try {
    +				return originalStream.read();
    +			}
    +			catch (IOException e) {
    +				handleIOException(e);
    +				return 0; // silence the compiler
    +			}
    +		}
    +
    +		@Override
    +		public int read(byte[] b) throws IOException {
    +			try {
    +				return originalStream.read(b);
    +			}
    +			catch (IOException e) {
    +				handleIOException(e);
    +				return 0; // silence the compiler
    +			}
    +		}
    +
    +		@Override
    +		public int read(byte[] b, int off, int len) throws IOException {
    +			try {
    +				return originalStream.read(b, off, len);
    +			}
    +			catch (IOException e) {
    +				handleIOException(e);
    +				return 0; // silence the compiler
    +			}
    +		}
    +
    +		@Override
    +		public long skip(long n) throws IOException {
    +			try {
    +				return originalStream.skip(n);
    +			}
    +			catch (IOException e) {
    +				handleIOException(e);
    +				return 0L; // silence the compiler
    +			}
    +		}
    +
    +		@Override
    +		public int available() throws IOException {
    +			try {
    +				return originalStream.available();
    +			}
    +			catch (IOException e) {
    +				handleIOException(e);
    +				return 0; // silence the compiler
    +			}
    +		}
    +
    +		@Override
    +		public void mark(int readlimit) {
    +			originalStream.mark(readlimit);
    +		}
    +
    +		@Override
    +		public void reset() throws IOException {
    +			originalStream.reset();
    --- End diff --
    
    Nope, that is simply an oversight. Will fix that...


---

Mime
View raw message