flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [02/12] flink git commit: [FLINK-3700] [core] Remove Guava dependency from flink-core
Date Fri, 15 Apr 2016 17:39:47 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/types/StringValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/StringValue.java b/flink-core/src/main/java/org/apache/flink/types/StringValue.java
index e20083e..92f364e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/StringValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/StringValue.java
@@ -28,7 +28,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 
-import com.google.common.base.Preconditions;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Mutable string data type that implements the Key interface.
@@ -147,7 +147,7 @@ public class StringValue implements NormalizableKey<StringValue>, CharSequence,
 	 * @param value The new string value.
 	 */
 	public void setValue(CharSequence value) {
-		Preconditions.checkNotNull(value);
+		checkNotNull(value);
 		setValue(value, 0, value.length());
 	}
 	
@@ -158,7 +158,7 @@ public class StringValue implements NormalizableKey<StringValue>, CharSequence,
 	 */
 	@Override
 	public void setValue(StringValue value) {
-		Preconditions.checkNotNull(value);
+		checkNotNull(value);
 		setValue(value.value, 0, value.len);
 	}
 
@@ -170,7 +170,7 @@ public class StringValue implements NormalizableKey<StringValue>, CharSequence,
 	 * @param len The length of the substring.
 	 */
 	public void setValue(StringValue value, int offset, int len) {
-		Preconditions.checkNotNull(value);
+		checkNotNull(value);
 		setValue(value.value, offset, len);
 	}
 	
@@ -182,7 +182,7 @@ public class StringValue implements NormalizableKey<StringValue>, CharSequence,
 	 * @param len The length of the substring.
 	 */
 	public void setValue(CharSequence value, int offset, int len) {
-		Preconditions.checkNotNull(value);
+		checkNotNull(value);
 		if (offset < 0 || len < 0 || offset > value.length() - len) {
 			throw new IndexOutOfBoundsException("offset: " + offset + " len: " + len + " value.len: " + len);
 		}
@@ -204,7 +204,7 @@ public class StringValue implements NormalizableKey<StringValue>, CharSequence,
 	 * @param buffer The character buffer to read the characters from.
 	 */
 	public void setValue(CharBuffer buffer) {
-		Preconditions.checkNotNull(buffer);
+		checkNotNull(buffer);
 		final int len = buffer.length();
 		ensureSize(len);
 		buffer.get(this.value, 0, len);
@@ -220,7 +220,7 @@ public class StringValue implements NormalizableKey<StringValue>, CharSequence,
 	 * @param len The length of the substring.
 	 */
 	public void setValue(char[] chars, int offset, int len) {
-		Preconditions.checkNotNull(chars);
+		checkNotNull(chars);
 		if (offset < 0 || len < 0 || offset > chars.length - len) {
 			throw new IndexOutOfBoundsException();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
new file mode 100644
index 0000000..078599d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * This is a utility class to deal with temporary files.
+ */
+public final class FileUtils {
+
+	/**
+	 * The alphabet to construct the random part of the filename from.
+	 */
+	private static final char[] ALPHABET = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 'b', 'c', 'd',
+		'e', 'f' };
+
+	/**
+	 * The length of the random part of the filename.
+	 */
+	private static final int LENGTH = 12;
+
+	
+
+	/**
+	 * Constructs a random filename with the given prefix and
+	 * a random part generated from hex characters.
+	 * 
+	 * @param prefix
+	 *        the prefix to the filename to be constructed
+	 * @return the generated random filename with the given prefix
+	 */
+	public static String getRandomFilename(final String prefix) {
+
+		final StringBuilder stringBuilder = new StringBuilder(prefix);
+
+		for (int i = 0; i < LENGTH; i++) {
+			stringBuilder.append(ALPHABET[(int) Math.floor(Math.random() * (double) ALPHABET.length)]);
+		}
+
+		return stringBuilder.toString();
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Simple reading and writing of files
+	// ------------------------------------------------------------------------
+	
+	public static String readFile(File file, String charsetName) throws IOException {
+		byte[] bytes = Files.readAllBytes(file.toPath());
+		return new String(bytes, charsetName);
+	}
+
+	public static String readFileUtf8(File file) throws IOException {
+		return readFile(file, "UTF-8");
+	}
+
+	public static void writeFile(File file, String contents, String encoding) throws IOException {
+		byte[] bytes = contents.getBytes(encoding);
+		Files.write(file.toPath(), bytes, StandardOpenOption.WRITE);
+	}
+	
+	public static void writeFileUtf8(File file, String contents) throws IOException {
+		writeFile(file, contents, "UTF-8");
+	}
+	
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Private default constructor to avoid instantiation.
+	 */
+	private FileUtils() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
new file mode 100644
index 0000000..12d70ce
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.Socket;
+
+import org.slf4j.Logger;
+
+/**
+ * An utility class for I/O related functionality.
+ * 
+ */
+public final class IOUtils {
+
+	/** The block size for byte operations in byte. */
+	private static final int BLOCKSIZE = 4096;
+	
+	// ------------------------------------------------------------------------
+	//  Byte copy operations
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Copies from one stream to another.
+	 * 
+	 * @param in
+	 *        InputStream to read from
+	 * @param out
+	 *        OutputStream to write to
+	 * @param buffSize
+	 *        the size of the buffer
+	 * @param close
+	 *        whether or not close the InputStream and OutputStream at the end. The streams are closed in the finally
+	 *        clause.
+	 * @throws IOException
+	 *         thrown if an error occurred while writing to the output stream
+	 */
+	public static void copyBytes(final InputStream in, final OutputStream out, final int buffSize, final boolean close)
+			throws IOException {
+
+		@SuppressWarnings("resource")
+		final PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null;
+		final byte[] buf = new byte[buffSize];
+		try {
+			int bytesRead = in.read(buf);
+			while (bytesRead >= 0) {
+				out.write(buf, 0, bytesRead);
+				if ((ps != null) && ps.checkError()) {
+					throw new IOException("Unable to write to output stream.");
+				}
+				bytesRead = in.read(buf);
+			}
+		} finally {
+			if (close) {
+				out.close();
+				in.close();
+			}
+		}
+	}
+
+	/**
+	 * Copies from one stream to another. <strong>closes the input and output
+	 * streams at the end</strong>.
+	 * 
+	 * @param in
+	 *        InputStream to read from
+	 * @param out
+	 *        OutputStream to write to
+	 * @throws IOException
+	 *         thrown if an I/O error occurs while copying
+	 */
+	public static void copyBytes(final InputStream in, final OutputStream out) throws IOException {
+		copyBytes(in, out, BLOCKSIZE, true);
+	}
+
+	/**
+	 * Copies from one stream to another.
+	 * 
+	 * @param in
+	 *        InputStream to read from
+	 * @param out
+	 *        OutputStream to write to
+	 * @param close
+	 *        whether or not close the InputStream and OutputStream at the
+	 *        end. The streams are closed in the finally clause.
+	 * @throws IOException
+	 *         thrown if an I/O error occurs while copying
+	 */
+	public static void copyBytes(final InputStream in, final OutputStream out, final boolean close) throws IOException {
+		copyBytes(in, out, BLOCKSIZE, close);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Stream input skipping
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Reads len bytes in a loop.
+	 * 
+	 * @param in
+	 *        The InputStream to read from
+	 * @param buf
+	 *        The buffer to fill
+	 * @param off
+	 *        offset from the buffer
+	 * @param len
+	 *        the length of bytes to read
+	 * @throws IOException
+	 *         if it could not read requested number of bytes for any reason (including EOF)
+	 */
+	public static void readFully(final InputStream in, final byte[] buf, int off, final int len)
+			throws IOException {
+		int toRead = len;
+		while (toRead > 0) {
+			final int ret = in.read(buf, off, toRead);
+			if (ret < 0) {
+				throw new IOException("Premeture EOF from inputStream");
+			}
+			toRead -= ret;
+			off += ret;
+		}
+	}
+
+	/**
+	 * Similar to readFully(). Skips bytes in a loop.
+	 * 
+	 * @param in
+	 *        The InputStream to skip bytes from
+	 * @param len
+	 *        number of bytes to skip
+	 * @throws IOException
+	 *         if it could not skip requested number of bytes for any reason (including EOF)
+	 */
+	public static void skipFully(final InputStream in, long len) throws IOException {
+		while (len > 0) {
+			final long ret = in.skip(len);
+			if (ret < 0) {
+				throw new IOException("Premeture EOF from inputStream");
+			}
+			len -= ret;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Silent I/O cleanup / closing
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Close the Closeable objects and <b>ignore</b> any {@link IOException} or
+	 * null pointers. Must only be used for cleanup in exception handlers.
+	 * 
+	 * @param log
+	 *        the log to record problems to at debug level. Can be <code>null</code>.
+	 * @param closeables
+	 *        the objects to close
+	 */
+	public static void cleanup(final Logger log, final java.io.Closeable... closeables) {
+		for (java.io.Closeable c : closeables) {
+			if (c != null) {
+				try {
+					c.close();
+				} catch (IOException e) {
+					if (log != null && log.isDebugEnabled()) {
+						log.debug("Exception in closing " + c, e);
+					}
+				}
+			}
+		}
+	}
+
+	/**
+	 * Closes the stream ignoring {@link IOException}. Must only be called in
+	 * cleaning up from exception handlers.
+	 * 
+	 * @param stream
+	 *        the stream to close
+	 */
+	public static void closeStream(final java.io.Closeable stream) {
+		cleanup(null, stream);
+	}
+
+	/**
+	 * Closes the socket ignoring {@link IOException}.
+	 * 
+	 * @param sock
+	 *        the socket to close
+	 */
+	public static void closeSocket(final Socket sock) {
+		// avoids try { close() } dance
+		if (sock != null) {
+			try {
+				sock.close();
+			} catch (IOException ignored) {
+			}
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Private constructor to prevent instantiation.
+	 */
+	private IOUtils() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
new file mode 100644
index 0000000..2bdddc8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+/**
+ * Collection of simple mathematical routines.
+ */
+public final class MathUtils {
+	
+	/**
+	 * Computes the logarithm of the given value to the base of 2, rounded down. It corresponds to the
+	 * position of the highest non-zero bit. The position is counted, starting with 0 from the least
+	 * significant bit to the most significant bit. For example, <code>log2floor(16) = 4</code>, and
+	 * <code>log2floor(10) = 3</code>.
+	 * 
+	 * @param value The value to compute the logarithm for.
+	 * @return The logarithm (rounded down) to the base of 2.
+	 * @throws ArithmeticException Thrown, if the given value is zero.
+	 */
+	public static int log2floor(int value) throws ArithmeticException {
+		if (value == 0) {
+			throw new ArithmeticException("Logarithm of zero is undefined.");
+		}
+		
+		int log = 0;
+		while ((value = value >>> 1) != 0) {
+			log++;
+		}
+		
+		return log;
+	}
+	
+	/**
+	 * Computes the logarithm of the given value to the base of 2. This method throws an error,
+	 * if the given argument is not a power of 2.
+	 * 
+	 * @param value The value to compute the logarithm for.
+	 * @return The logarithm to the base of 2.
+	 * @throws ArithmeticException Thrown, if the given value is zero.
+	 * @throws IllegalArgumentException Thrown, if the given value is not a power of two.
+	 */
+	public static int log2strict(int value) throws ArithmeticException, IllegalArgumentException {
+		if (value == 0) {
+			throw new ArithmeticException("Logarithm of zero is undefined.");
+		}
+		if ((value & (value - 1)) != 0) {
+			throw new IllegalArgumentException("The given value " + value + " is not a power of two.");
+		}
+		
+		int log = 0;
+		while ((value = value >>> 1) != 0) {
+			log++;
+		}
+		
+		return log;
+	}
+	
+	/**
+	 * Decrements the given number down to the closest power of two. If the argument is a
+	 * power of two, it remains unchanged.
+	 * 
+	 * @param value The value to round down.
+	 * @return The closest value that is a power of to and less or equal than the given value.
+	 */
+	public static int roundDownToPowerOf2(int value) {
+		return Integer.highestOneBit(value);
+	}
+	
+	/**
+	 * Casts the given value to a 32 bit integer, if it can be safely done. If the cast would change the numeric
+	 * value, this method raises an exception.
+	 * <p>
+	 * This method is a protection in places where one expects to be able to safely case, but where unexpected
+	 * situations could make the cast unsafe and would cause hidden problems that are hard to track down.
+	 * 
+	 * @param value The value to be cast to an integer.
+	 * @return The given value as an integer.
+	 */
+	public static int checkedDownCast(long value) {
+		if (value > Integer.MAX_VALUE) {
+			throw new IllegalArgumentException("Cannot downcast long value " + value + " to integer.");
+		}
+		return (int) value;
+	}
+
+	/**
+	 * Checks whether the given value is a power of two.
+	 *
+	 * @param value The value to check.
+	 * @return True, if the value is a power of two, false otherwise.
+	 */
+	public static boolean isPowerOf2(long value) {
+		return (value & (value - 1)) == 0;
+	}
+
+	/**
+	 * This function hashes an integer value. It is adapted from Bob Jenkins' website
+	 * <a href="http://www.burtleburtle.net/bob/hash/integer.html">http://www.burtleburtle.net/bob/hash/integer.html</a>.
+	 * The hash function has the <i>full avalanche</i> property, meaning that every bit of the value to be hashed
+	 * affects every bit of the hash value.
+	 *
+	 * It is crucial to use different hash functions to partition data across machines and the internal partitioning of
+	 * data structures. This hash function is intended for partitioning internally in data structures.
+	 *
+	 * @param code The integer to be hashed.
+	 * @return The non-negative hash code for the integer.
+	 */
+	public static int jenkinsHash(int code) {
+		code = (code + 0x7ed55d16) + (code << 12);
+		code = (code ^ 0xc761c23c) ^ (code >>> 19);
+		code = (code + 0x165667b1) + (code << 5);
+		code = (code + 0xd3a2646c) ^ (code << 9);
+		code = (code + 0xfd7046c5) + (code << 3);
+		code = (code ^ 0xb55a4f09) ^ (code >>> 16);
+		return code >= 0 ? code : -(code + 1);
+	}
+
+	/**
+	 * This function hashes an integer value.
+	 *
+	 * It is crucial to use different hash functions to partition data across machines and the internal partitioning of
+	 * data structures. This hash function is intended for partitioning across machines.
+	 *
+	 * @param code The integer to be hashed.
+	 * @return The non-negative hash code for the integer.
+	 */
+	public static int murmurHash(int code) {
+		code *= 0xcc9e2d51;
+		code = Integer.rotateLeft(code, 15);
+		code *= 0x1b873593;
+
+		code = Integer.rotateLeft(code, 13);
+		code = code * 5 + 0xe6546b64;
+
+		code ^= 4;
+		code ^= code >>> 16;
+		code *= 0x85ebca6b;
+		code ^= code >>> 13;
+		code *= 0xc2b2ae35;
+		code ^= code >>> 16;
+
+		if (code >= 0) {
+			return code;
+		}
+		else if (code != Integer.MIN_VALUE) {
+			return -code;
+		}
+		else {
+			return 0;
+		}
+	}
+
+	// ============================================================================================
+	
+	/**
+	 * Prevent Instantiation through private constructor.
+	 */
+	private MathUtils() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index d9c4d3c..6f63eb4 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -18,9 +18,8 @@
 
 package org.apache.flink.util;
 
-import com.google.common.collect.Iterators;
-import com.google.common.net.InetAddresses;
 import org.apache.flink.annotation.Internal;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,9 +32,9 @@ import java.net.MalformedURLException;
 import java.net.ServerSocket;
 import java.net.URL;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
 
 @Internal
 public class NetUtils {
@@ -116,8 +115,6 @@ public class NetUtils {
 	/**
 	 * Encodes an IP address properly as a URL string. This method makes sure that IPv6 addresses
 	 * have the proper formatting to be included in URLs.
-	 * <p>
-	 * This method internally uses Guava's functionality to properly encode IPv6 addresses.
 	 * 
 	 * @param address The IP address to encode.
 	 * @return The proper URL string encoded IP address.
@@ -130,7 +127,7 @@ public class NetUtils {
 			return address.getHostAddress();
 		}
 		else if (address instanceof Inet6Address) {
-			return '[' + InetAddresses.toAddrString(address) + ']';
+			return getIPv6UrlRepresentation((Inet6Address) address);
 		}
 		else {
 			throw new IllegalArgumentException("Unrecognized type of InetAddress: " + address);
@@ -178,6 +175,70 @@ public class NetUtils {
 	}
 
 	/**
+	 * Creates a compressed URL style representation of an Inet6Address.
+	 * 
+	 * <p>This method copies and adopts code from Google's Guava library.
+	 * We re-implement this here in order to reduce dependency on Guava.
+	 * The Guava library has frequently caused dependency conflicts in the past.
+	 */
+	private static String getIPv6UrlRepresentation(Inet6Address address) {
+		// first, convert bytes to 16 bit chunks
+		byte[] addressBytes = address.getAddress();
+		int[] hextets = new int[8];
+		for (int i = 0; i < hextets.length; i++) {
+			hextets[i] = (addressBytes[2 * i] & 0xFF) << 8 | (addressBytes[2 * i + 1] & 0xFF);
+		}
+
+		// now, find the sequence of zeros that should be compressed
+		int bestRunStart = -1;
+		int bestRunLength = -1;
+		int runStart = -1;
+		for (int i = 0; i < hextets.length + 1; i++) {
+			if (i < hextets.length && hextets[i] == 0) {
+				if (runStart < 0) {
+					runStart = i;
+				}
+			} else if (runStart >= 0) {
+				int runLength = i - runStart;
+				if (runLength > bestRunLength) {
+					bestRunStart = runStart;
+					bestRunLength = runLength;
+				}
+				runStart = -1;
+			}
+		}
+		if (bestRunLength >= 2) {
+			Arrays.fill(hextets, bestRunStart, bestRunStart + bestRunLength, -1);
+		}
+
+		// convert into text form
+		StringBuilder buf = new StringBuilder(40);
+		buf.append('[');
+		
+		boolean lastWasNumber = false;
+		for (int i = 0; i < hextets.length; i++) {
+			boolean thisIsNumber = hextets[i] >= 0;
+			if (thisIsNumber) {
+				if (lastWasNumber) {
+					buf.append(':');
+				}
+				buf.append(Integer.toHexString(hextets[i]));
+			} else {
+				if (i == 0 || lastWasNumber) {
+					buf.append("::");
+				}
+			}
+			lastWasNumber = thisIsNumber;
+		}
+		buf.append(']');
+		return buf.toString();
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Port range parsing
+	// ------------------------------------------------------------------------
+	
+	/**
 	 * Returns an iterator over available ports defined by the range definition.
 	 *
 	 * @param rangeDefinition String describing a single port, a range of ports or multiple ranges.
@@ -186,14 +247,16 @@ public class NetUtils {
 	 */
 	public static Iterator<Integer> getPortRangeFromString(String rangeDefinition) throws NumberFormatException {
 		final String[] ranges = rangeDefinition.trim().split(",");
-		List<Iterator<Integer>> iterators = new ArrayList<>(ranges.length);
-		for(String rawRange: ranges) {
+		
+		UnionIterator<Integer> iterators = new UnionIterator<>();
+		
+		for (String rawRange: ranges) {
 			Iterator<Integer> rangeIterator;
 			String range = rawRange.trim();
 			int dashIdx = range.indexOf('-');
 			if (dashIdx == -1) {
 				// only one port in range:
-				rangeIterator = Iterators.singletonIterator(Integer.valueOf(range));
+				rangeIterator = Collections.singleton(Integer.valueOf(range)).iterator();
 			} else {
 				// evaluate range
 				final int start = Integer.valueOf(range.substring(0, dashIdx));
@@ -218,7 +281,8 @@ public class NetUtils {
 			}
 			iterators.add(rangeIterator);
 		}
-		return Iterators.concat(iterators.iterator());
+		
+		return iterators;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/util/Preconditions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/Preconditions.java b/flink-core/src/main/java/org/apache/flink/util/Preconditions.java
index 135038b..a9bd166 100644
--- a/flink-core/src/main/java/org/apache/flink/util/Preconditions.java
+++ b/flink-core/src/main/java/org/apache/flink/util/Preconditions.java
@@ -107,7 +107,7 @@ public final class Preconditions {
 	}
 
 	// ------------------------------------------------------------------------
-	//  Boolean Condition Checking
+	//  Boolean Condition Checking (Argument)
 	// ------------------------------------------------------------------------
 	
 	/**
@@ -162,6 +162,61 @@ public final class Preconditions {
 		}
 	}
 
+	// ------------------------------------------------------------------------
+	//  Boolean Condition Checking (State)
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Checks the given boolean condition, and throws an {@code IllegalStateException} if
+	 * the condition is not met (evaluates to {@code false}).
+	 *
+	 * @param condition The condition to check
+	 *
+	 * @throws IllegalStateException Thrown, if the condition is violated.
+	 */
+	public static void checkState(boolean condition) {
+		if (!condition) {
+			throw new IllegalStateException();
+		}
+	}
+
+	/**
+	 * Checks the given boolean condition, and throws an {@code IllegalStateException} if
+	 * the condition is not met (evaluates to {@code false}). The exception will have the
+	 * given error message.
+	 *
+	 * @param condition The condition to check
+	 * @param errorMessage The message for the {@code IllegalStateException} that is thrown if the check fails.
+	 *
+	 * @throws IllegalStateException Thrown, if the condition is violated.
+	 */
+	public static void checkState(boolean condition, @Nullable Object errorMessage) {
+		if (!condition) {
+			throw new IllegalStateException(String.valueOf(errorMessage));
+		}
+	}
+
+	/**
+	 * Checks the given boolean condition, and throws an {@code IllegalStateException} if
+	 * the condition is not met (evaluates to {@code false}).
+	 *
+	 * @param condition The condition to check
+	 * @param errorMessageTemplate The message template for the {@code IllegalStateException}
+	 *                             that is thrown if the check fails. The template substitutes its
+	 *                             {@code %s} placeholders with the error message arguments.
+	 * @param errorMessageArgs The arguments for the error message, to be inserted into the
+	 *                         message template for the {@code %s} placeholders.
+	 *
+	 * @throws IllegalStateException Thrown, if the condition is violated.
+	 */
+	public static void checkState(boolean condition,
+			@Nullable String errorMessageTemplate,
+			@Nullable Object... errorMessageArgs) {
+
+		if (!condition) {
+			throw new IllegalStateException(format(errorMessageTemplate, errorMessageArgs));
+		}
+	}
 
 	// ------------------------------------------------------------------------
 	//  Utilities

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/util/UnionIterator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/UnionIterator.java b/flink-core/src/main/java/org/apache/flink/util/UnionIterator.java
new file mode 100644
index 0000000..17204ce
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/UnionIterator.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public class UnionIterator<T> implements Iterator<T>, Iterable<T> {
+	
+	private Iterator<T> currentIterator;
+	
+	private ArrayList<Iterator<T>> furtherIterators = new ArrayList<>();
+	
+	private int nextIterator;
+	
+	private boolean iteratorAvailable = true;
+
+	// ------------------------------------------------------------------------
+	
+	public void clear() {
+		currentIterator = null;
+		furtherIterators.clear();
+		nextIterator = 0;
+		iteratorAvailable = true;
+	}
+	
+	public void addList(List<T> list) {
+		add(list.iterator());
+	}
+
+	public void add(Iterator<T> iterator) {
+		if (currentIterator == null) {
+			currentIterator = iterator;
+		}
+		else {
+			furtherIterators.add(iterator);
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public Iterator<T> iterator() {
+		if (iteratorAvailable) {
+			iteratorAvailable = false;
+			return this;
+		} else {
+			throw new TraversableOnceException();
+		}
+	}
+
+	@Override
+	public boolean hasNext() {
+		while (currentIterator != null) {
+			if (currentIterator.hasNext()) {
+				return true;
+			}
+			else if (nextIterator < furtherIterators.size()) {
+				currentIterator = furtherIterators.get(nextIterator);
+				nextIterator++;
+			}
+			else {
+				currentIterator = null;
+			}
+		}
+		
+		return false;
+	}
+
+	@Override
+	public T next() {
+		if (hasNext()) {
+			return currentIterator.next();
+		}
+		else {
+			throw new NoSuchElementException();
+		}
+	}
+
+	@Override
+	public void remove() {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/util/XORShiftRandom.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/XORShiftRandom.java b/flink-core/src/main/java/org/apache/flink/util/XORShiftRandom.java
index 47ed561..b3ec393 100644
--- a/flink-core/src/main/java/org/apache/flink/util/XORShiftRandom.java
+++ b/flink-core/src/main/java/org/apache/flink/util/XORShiftRandom.java
@@ -15,9 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.util;
 
-import com.google.common.hash.Hashing;
 import org.apache.flink.annotation.Public;
 
 import java.util.Random;
@@ -42,7 +42,7 @@ public class XORShiftRandom extends Random {
 
 	public XORShiftRandom(long input) {
 		super(input);
-		this.seed = Hashing.murmur3_128().hashLong(input).asLong();
+		this.seed = MathUtils.murmurHash((int) input) ^ MathUtils.murmurHash((int) (input >>> 32));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
index 679e4ce..69159f2 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
@@ -18,12 +18,10 @@
 
 package org.apache.flink.api.common.operators.base;
 
-
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 
-import com.google.common.base.Joiner;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
@@ -39,7 +37,7 @@ public class OuterJoinOperatorBaseTest implements Serializable {
 	private final FlatJoinFunction<String, String, String> joiner = new FlatJoinFunction<String, String, String>() {
 		@Override
 		public void join(String first, String second, Collector<String> out) throws Exception {
-			out.collect(Joiner.on(',').join(String.valueOf(first), String.valueOf(second)));
+			out.collect(String.valueOf(first) + ',' + String.valueOf(second));
 		}
 	};
 

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java
index e783a1d..ee37194 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.api.java.tuple;
 
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
+import org.apache.flink.util.FileUtils;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -85,7 +84,7 @@ class TupleGenerator {
 	}
 
 	private static void insertCodeIntoFile(String code, File file) throws IOException {
-		String fileContent = Files.toString(file, Charsets.UTF_8);
+		String fileContent = FileUtils.readFileUtf8(file);
 		
 		try (Scanner s = new Scanner(fileContent)) {
 			StringBuilder sb = new StringBuilder();
@@ -126,7 +125,7 @@ class TupleGenerator {
 			while (s.hasNextLine() && (line = s.nextLine()) != null) {
 				sb.append(line).append("\n");
 			}
-			Files.write(sb.toString(), file, Charsets.UTF_8);
+			FileUtils.writeFileUtf8(file, sb.toString());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
index bc11848..ffcfd52 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.MapFunction;
@@ -37,8 +38,6 @@ import org.apache.flink.api.java.typeutils.TypeInfoParserTest.MyWritable;
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.google.common.collect.HashMultiset;
-
 /**
  *  Pojo Type tests
  *
@@ -87,7 +86,7 @@ public class PojoTypeExtractionTest {
 	// all public test
 	public static class AllPublic extends ComplexNestedClass {
 		public ArrayList<String> somethingFancy; // generic type
-		public HashMultiset<Integer> fancyIds; // generic type
+		public FancyCollectionSubtype<Integer> fancyIds; // generic type
 		public String[]	fancyArray;			 // generic type
 	}
 
@@ -436,7 +435,7 @@ public class PojoTypeExtractionTest {
 				}
 				multisetSeen = true;
 				Assert.assertTrue(field.getTypeInformation() instanceof GenericTypeInfo);
-				Assert.assertEquals(HashMultiset.class, field.getTypeInformation().getTypeClass());
+				Assert.assertEquals(FancyCollectionSubtype.class, field.getTypeInformation().getTypeClass());
 			} else if(name.equals("fancyArray")) {
 				if(strArraySeen) {
 					Assert.fail("already seen");
@@ -809,4 +808,10 @@ public class PojoTypeExtractionTest {
 		Assert.assertTrue(tti.getTypeAt(0) instanceof PojoTypeInfo);
 		Assert.assertTrue(tti.getTypeAt(1) instanceof PojoTypeInfo);
 	}
+	
+	// ------------------------------------------------------------------------
+	
+	public static class FancyCollectionSubtype<T> extends HashSet<T> {
+		private static final long serialVersionUID = -3494469602638179921L;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index 260f7e9..4712ed1 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.java.typeutils.runtime;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.Objects;
 import java.util.Random;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -36,11 +37,10 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.google.common.base.Objects;
-
 /**
  * A test for the {@link PojoSerializer}.
  */
@@ -104,7 +104,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 
 		@Override
 		public int hashCode() {
-			return Objects.hashCode(dumm1, dumm2, dumm3, dumm4, nestedClass);
+			return Objects.hash(dumm1, dumm2, dumm3, dumm4, nestedClass);
 		}
 
 		@Override
@@ -162,7 +162,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 
 		@Override
 		public int hashCode() {
-			return Objects.hashCode(dumm1, dumm2, dumm3, dumm4);
+			return Objects.hash(dumm1, dumm2, dumm3, dumm4);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
index 8c61a19..ddf1d0e 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
@@ -18,14 +18,15 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
-import com.google.common.base.Objects;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+
 import org.junit.Test;
 
+import java.util.Objects;
 import java.util.Random;
 
 /**
@@ -88,7 +89,7 @@ public class PojoSubclassSerializerTest extends SerializerTestBase<PojoSubclassS
 
 		@Override
 		public int hashCode() {
-			return Objects.hashCode(dumm1, dumm2);
+			return Objects.hash(dumm1, dumm2);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java
index b797090..e6ffd07 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java
@@ -18,15 +18,16 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
-import com.google.common.base.Objects;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+
 import org.junit.Test;
 
+import java.util.Objects;
 import java.util.Random;
 
 /**
@@ -90,7 +91,7 @@ public class SubclassFromInterfaceSerializerTest extends SerializerTestBase<Subc
 
 		@Override
 		public int hashCode() {
-			return Objects.hashCode(dumm1, dumm2);
+			return Objects.hash(dumm1, dumm2);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryRule.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryRule.java b/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryRule.java
index 2b3a37a..782f4fb 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryRule.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryRule.java
@@ -22,11 +22,12 @@ import org.junit.Test;
 import org.junit.rules.TestRule;
 import org.junit.runner.Description;
 import org.junit.runners.model.Statement;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A rule to retry failed tests for a fixed number of times.

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java b/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
new file mode 100644
index 0000000..7917a7b
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.util.MathUtils;
+import org.junit.Test;
+
+public class MathUtilTest {
+
+	@Test
+	public void testLog2Computation() {
+		assertEquals(0, MathUtils.log2floor(1));
+		assertEquals(1, MathUtils.log2floor(2));
+		assertEquals(1, MathUtils.log2floor(3));
+		assertEquals(2, MathUtils.log2floor(4));
+		assertEquals(2, MathUtils.log2floor(5));
+		assertEquals(2, MathUtils.log2floor(7));
+		assertEquals(3, MathUtils.log2floor(8));
+		assertEquals(3, MathUtils.log2floor(9));
+		assertEquals(4, MathUtils.log2floor(16));
+		assertEquals(4, MathUtils.log2floor(17));
+		assertEquals(13, MathUtils.log2floor((0x1 << 13) + 1));
+		assertEquals(30, MathUtils.log2floor(Integer.MAX_VALUE));
+		assertEquals(31, MathUtils.log2floor(-1));
+		
+		try {
+			MathUtils.log2floor(0);
+			fail();
+		}
+		catch (ArithmeticException aex) {}
+	}
+	
+	@Test
+	public void testRoundDownToPowerOf2() {
+		assertEquals(0, MathUtils.roundDownToPowerOf2(0));
+		assertEquals(1, MathUtils.roundDownToPowerOf2(1));
+		assertEquals(2, MathUtils.roundDownToPowerOf2(2));
+		assertEquals(2, MathUtils.roundDownToPowerOf2(3));
+		assertEquals(4, MathUtils.roundDownToPowerOf2(4));
+		assertEquals(4, MathUtils.roundDownToPowerOf2(5));
+		assertEquals(4, MathUtils.roundDownToPowerOf2(6));
+		assertEquals(4, MathUtils.roundDownToPowerOf2(7));
+		assertEquals(8, MathUtils.roundDownToPowerOf2(8));
+		assertEquals(8, MathUtils.roundDownToPowerOf2(9));
+		assertEquals(8, MathUtils.roundDownToPowerOf2(15));
+		assertEquals(16, MathUtils.roundDownToPowerOf2(16));
+		assertEquals(16, MathUtils.roundDownToPowerOf2(17));
+		assertEquals(16, MathUtils.roundDownToPowerOf2(31));
+		assertEquals(32, MathUtils.roundDownToPowerOf2(32));
+		assertEquals(32, MathUtils.roundDownToPowerOf2(33));
+		assertEquals(32, MathUtils.roundDownToPowerOf2(42));
+		assertEquals(32, MathUtils.roundDownToPowerOf2(63));
+		assertEquals(64, MathUtils.roundDownToPowerOf2(64));
+		assertEquals(64, MathUtils.roundDownToPowerOf2(125));
+		assertEquals(16384, MathUtils.roundDownToPowerOf2(25654));
+		assertEquals(33554432, MathUtils.roundDownToPowerOf2(34366363));
+		assertEquals(33554432, MathUtils.roundDownToPowerOf2(63463463));
+		assertEquals(1073741824, MathUtils.roundDownToPowerOf2(1852987883));
+		assertEquals(1073741824, MathUtils.roundDownToPowerOf2(Integer.MAX_VALUE));
+	}
+
+	@Test
+	public void testPowerOfTwo() {
+		assertTrue(MathUtils.isPowerOf2(1));
+		assertTrue(MathUtils.isPowerOf2(2));
+		assertTrue(MathUtils.isPowerOf2(4));
+		assertTrue(MathUtils.isPowerOf2(8));
+		assertTrue(MathUtils.isPowerOf2(32768));
+		assertTrue(MathUtils.isPowerOf2(65536));
+		assertTrue(MathUtils.isPowerOf2(1 << 30));
+		assertTrue(MathUtils.isPowerOf2(1L + Integer.MAX_VALUE));
+		assertTrue(MathUtils.isPowerOf2(1L << 41));
+		assertTrue(MathUtils.isPowerOf2(1L << 62));
+
+		assertFalse(MathUtils.isPowerOf2(3));
+		assertFalse(MathUtils.isPowerOf2(5));
+		assertFalse(MathUtils.isPowerOf2(567923));
+		assertFalse(MathUtils.isPowerOf2(Integer.MAX_VALUE));
+		assertFalse(MathUtils.isPowerOf2(Long.MAX_VALUE));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
index e367e8b..72cc89e 100644
--- a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
@@ -18,17 +18,13 @@
 
 package org.apache.flink.util;
 
-import com.google.common.collect.DiscreteDomain;
-import com.google.common.collect.Iterators;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Set;
 
 import static org.hamcrest.core.IsCollectionContaining.hasItems;

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/test/java/org/apache/flink/util/UnionIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/UnionIteratorTest.java b/flink-core/src/test/java/org/apache/flink/util/UnionIteratorTest.java
new file mode 100644
index 0000000..4c1fc41
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/UnionIteratorTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.util.TraversableOnceException;
+import org.apache.flink.util.UnionIterator;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.*;
+
+public class UnionIteratorTest {
+
+	@Test
+	public void testUnion() {
+		try {
+			UnionIterator<Integer> iter = new UnionIterator<>();
+
+			// should succeed and be empty
+			assertFalse(iter.iterator().hasNext());
+
+			iter.clear();
+			
+			try {
+				iter.iterator().next();
+				fail("should fail with an exception");
+			} catch (NoSuchElementException e) {
+				// expected
+			}
+
+			iter.clear();
+			iter.addList(Arrays.asList(1, 2, 3, 4, 5, 6, 7));
+			iter.addList(Collections.<Integer>emptyList());
+			iter.addList(Arrays.asList(8, 9, 10, 11));
+			
+			int val = 1;
+			for (int i : iter) {
+				assertEquals(val++, i);
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testTraversableOnce() {
+		try {
+			UnionIterator<Integer> iter = new UnionIterator<>();
+			
+			// should succeed
+			iter.iterator();
+			
+			// should fail
+			try {
+				iter.iterator();
+				fail("should fail with an exception");
+			} catch (TraversableOnceException e) {
+				// expected
+			}
+
+			// should fail again
+			try {
+				iter.iterator();
+				fail("should fail with an exception");
+			} catch (TraversableOnceException e) {
+				// expected
+			}
+
+			// reset the thing, keep it empty
+			iter.clear();
+
+			// should succeed
+			iter.iterator();
+
+			// should fail
+			try {
+				iter.iterator();
+				fail("should fail with an exception");
+			} catch (TraversableOnceException e) {
+				// expected
+			}
+
+			// should fail again
+			try {
+				iter.iterator();
+				fail("should fail with an exception");
+			} catch (TraversableOnceException e) {
+				// expected
+			}
+
+			// reset the thing, add some data
+			iter.clear();
+			iter.addList(Arrays.asList(1, 2, 3, 4, 5, 6, 7));
+			
+			// should succeed
+			Iterator<Integer> ints = iter.iterator();
+			assertNotNull(ints.next());
+			assertNotNull(ints.next());
+			assertNotNull(ints.next());
+			
+			// should fail if called in the middle of operations
+			try {
+				iter.iterator();
+				fail("should fail with an exception");
+			} catch (TraversableOnceException e) {
+				// expected
+			}
+
+			// reset the thing, keep it empty
+			iter.clear();
+
+			// should succeed again
+			assertFalse(iter.iterator().hasNext());
+			
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index d8f744b..99237a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -23,7 +23,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.util.IOUtils;
+import org.apache.flink.util.IOUtils;
 import org.slf4j.Logger;
 
 import java.io.EOFException;

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index efdb003..5f65564 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.util.IOUtils;
+import org.apache.flink.util.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FileSystemStateStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FileSystemStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FileSystemStateStore.java
index 8d9d3a7..73a094a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FileSystemStateStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FileSystemStateStore.java
@@ -22,7 +22,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle;
-import org.apache.flink.runtime.util.FileUtils;
+import org.apache.flink.util.FileUtils;
 
 import java.io.IOException;
 import java.io.ObjectOutputStream;

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index 1b8b1d9..b9ee2c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -42,7 +42,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
-import org.apache.flink.runtime.util.IOUtils;
+import org.apache.flink.util.IOUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
index 2ca7f78..992631b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
@@ -29,7 +29,7 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
 import org.apache.flink.runtime.memory.AbstractPagedInputView;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.MathUtils;
 
 /**
  * A {@link org.apache.flink.core.memory.DataInputView} that is backed by a {@link BlockChannelReader},

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
index 23dccb0..2ff6c94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
@@ -24,7 +24,7 @@ import java.util.ArrayList;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.SeekableDataInputView;
 import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.MathUtils;
 
 
 public class RandomAccessInputView extends AbstractPagedInputView implements SeekableDataInputView {

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessOutputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessOutputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessOutputView.java
index 427fe84..b0ad184 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessOutputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessOutputView.java
@@ -24,7 +24,7 @@ import java.io.EOFException;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.SeekableDataOutputView;
 import org.apache.flink.runtime.memory.AbstractPagedOutputView;
-import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.MathUtils;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
index 355b2eb..e768c77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.AbstractPagedInputView;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.MathUtils;
 
 /**
  * A {@link org.apache.flink.core.memory.DataInputView} that is backed by a {@link BlockChannelReader},

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SimpleCollectingOutputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SimpleCollectingOutputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SimpleCollectingOutputView.java
index a107e79..1a45ff2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SimpleCollectingOutputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SimpleCollectingOutputView.java
@@ -26,7 +26,7 @@ import java.util.List;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentSource;
 import org.apache.flink.runtime.memory.AbstractPagedOutputView;
-import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.MathUtils;
 
 /**
  * The list with the full segments contains at any point all completely full segments, plus the segment that is

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index 094d065..3ade753 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -33,7 +33,7 @@ import org.apache.flink.core.memory.HeapMemorySegment;
 import org.apache.flink.core.memory.HybridMemorySegment;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.MathUtils;
 
 /**
  * The memory manager governs the memory that Flink uses for sorting, hashing, and caching. Memory

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
index b4d03e7..0ff2ce8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
@@ -33,7 +33,7 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.memory.ListMemorySegmentSource;
 import org.apache.flink.runtime.util.IntArrayList;
 import org.apache.flink.runtime.util.LongArrayList;
-import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.MutableObjectIterator;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
index 44ee163..9221e7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
@@ -37,7 +37,7 @@ import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.AbstractPagedInputView;
 import org.apache.flink.runtime.memory.AbstractPagedOutputView;
-import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.MutableObjectIterator;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index 1495ee1..351cd3f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -42,7 +42,7 @@ import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.operators.util.BloomFilter;
-import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.MutableObjectIterator;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
index fdbcd9f..4e20842 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.MathUtils;
 
 /**
  * The output emitter decides to which of the possibly multiple output channels a record is sent.

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/FileUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/FileUtils.java
deleted file mode 100644
index 42994d3..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/FileUtils.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-/**
- * This is a utility class to deal with temporary files.
- */
-public final class FileUtils {
-
-	/**
-	 * The alphabet to construct the random part of the filename from.
-	 */
-	private static final char[] ALPHABET = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 'b', 'c', 'd',
-		'e', 'f' };
-
-	/**
-	 * The length of the random part of the filename.
-	 */
-	private static final int LENGTH = 12;
-
-	/**
-	 * Empty private constructor to avoid instantiation.
-	 */
-	private FileUtils() {
-	}
-
-	/**
-	 * Constructs a random filename with the given prefix and
-	 * a random part generated from hex characters.
-	 * 
-	 * @param prefix
-	 *        the prefix to the filename to be constructed
-	 * @return the generated random filename with the given prefix
-	 */
-	public static String getRandomFilename(final String prefix) {
-
-		final StringBuilder stringBuilder = new StringBuilder(prefix);
-
-		for (int i = 0; i < LENGTH; i++) {
-			stringBuilder.append(ALPHABET[(int) Math.floor(Math.random() * (double) ALPHABET.length)]);
-		}
-
-		return stringBuilder.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/util/IOUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/IOUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/IOUtils.java
deleted file mode 100644
index d03b72c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/IOUtils.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.util;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.net.Socket;
-
-import org.slf4j.Logger;
-
-/**
- * An utility class for I/O related functionality.
- * 
- */
-public final class IOUtils {
-
-	/**
-	 * The block size for byte operations in byte.
-	 */
-	private static final int BLOCKSIZE = 4096;
-
-	/**
-	 * Private constructor to overwrite the public one.
-	 */
-	private IOUtils() {
-	}
-
-	/**
-	 * Copies from one stream to another.
-	 * 
-	 * @param in
-	 *        InputStream to read from
-	 * @param out
-	 *        OutputStream to write to
-	 * @param buffSize
-	 *        the size of the buffer
-	 * @param close
-	 *        whether or not close the InputStream and OutputStream at the end. The streams are closed in the finally
-	 *        clause.
-	 * @throws IOException
-	 *         thrown if an error occurred while writing to the output stream
-	 */
-	public static void copyBytes(final InputStream in, final OutputStream out, final int buffSize, final boolean close)
-			throws IOException {
-
-		@SuppressWarnings("resource")
-		final PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null;
-		final byte[] buf = new byte[buffSize];
-		try {
-			int bytesRead = in.read(buf);
-			while (bytesRead >= 0) {
-				out.write(buf, 0, bytesRead);
-				if ((ps != null) && ps.checkError()) {
-					throw new IOException("Unable to write to output stream.");
-				}
-				bytesRead = in.read(buf);
-			}
-		} finally {
-			if (close) {
-				out.close();
-				in.close();
-			}
-		}
-	}
-
-	/**
-	 * Copies from one stream to another. <strong>closes the input and output
-	 * streams at the end</strong>.
-	 * 
-	 * @param in
-	 *        InputStream to read from
-	 * @param out
-	 *        OutputStream to write to
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while copying
-	 */
-	public static void copyBytes(final InputStream in, final OutputStream out) throws IOException {
-		copyBytes(in, out, BLOCKSIZE, true);
-	}
-
-	/**
-	 * Copies from one stream to another.
-	 * 
-	 * @param in
-	 *        InputStream to read from
-	 * @param out
-	 *        OutputStream to write to
-	 * @param close
-	 *        whether or not close the InputStream and OutputStream at the
-	 *        end. The streams are closed in the finally clause.
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while copying
-	 */
-	public static void copyBytes(final InputStream in, final OutputStream out, final boolean close) throws IOException {
-		copyBytes(in, out, BLOCKSIZE, close);
-	}
-
-	/**
-	 * Reads len bytes in a loop.
-	 * 
-	 * @param in
-	 *        The InputStream to read from
-	 * @param buf
-	 *        The buffer to fill
-	 * @param off
-	 *        offset from the buffer
-	 * @param len
-	 *        the length of bytes to read
-	 * @throws IOException
-	 *         if it could not read requested number of bytes for any reason (including EOF)
-	 */
-	public static void readFully(final InputStream in, final byte[] buf, int off, final int len)
-			throws IOException {
-		int toRead = len;
-		while (toRead > 0) {
-			final int ret = in.read(buf, off, toRead);
-			if (ret < 0) {
-				throw new IOException("Premeture EOF from inputStream");
-			}
-			toRead -= ret;
-			off += ret;
-		}
-	}
-
-	/**
-	 * Similar to readFully(). Skips bytes in a loop.
-	 * 
-	 * @param in
-	 *        The InputStream to skip bytes from
-	 * @param len
-	 *        number of bytes to skip
-	 * @throws IOException
-	 *         if it could not skip requested number of bytes for any reason (including EOF)
-	 */
-	public static void skipFully(final InputStream in, long len) throws IOException {
-		while (len > 0) {
-			final long ret = in.skip(len);
-			if (ret < 0) {
-				throw new IOException("Premeture EOF from inputStream");
-			}
-			len -= ret;
-		}
-	}
-
-	/**
-	 * Close the Closeable objects and <b>ignore</b> any {@link IOException} or
-	 * null pointers. Must only be used for cleanup in exception handlers.
-	 * 
-	 * @param log
-	 *        the log to record problems to at debug level. Can be <code>null</code>.
-	 * @param closeables
-	 *        the objects to close
-	 */
-	public static void cleanup(final Logger log, final java.io.Closeable... closeables) {
-		for (java.io.Closeable c : closeables) {
-			if (c != null) {
-				try {
-					c.close();
-				} catch (IOException e) {
-					if (log != null && log.isDebugEnabled()) {
-						log.debug("Exception in closing " + c, e);
-					}
-				}
-			}
-		}
-	}
-
-	/**
-	 * Closes the stream ignoring {@link IOException}. Must only be called in
-	 * cleaning up from exception handlers.
-	 * 
-	 * @param stream
-	 *        the stream to close
-	 */
-	public static void closeStream(final java.io.Closeable stream) {
-		cleanup(null, stream);
-	}
-
-	/**
-	 * Closes the socket ignoring {@link IOException}.
-	 * 
-	 * @param sock
-	 *        the socket to close
-	 */
-	public static void closeSocket(final Socket sock) {
-		// avoids try { close() } dance
-		if (sock != null) {
-			try {
-				sock.close();
-			} catch (IOException ignored) {
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java
deleted file mode 100644
index 5d26186..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.util;
-
-/**
- * Collection of simple mathematical routines.
- */
-public final class MathUtils {
-	
-	/**
-	 * Computes the logarithm of the given value to the base of 2, rounded down. It corresponds to the
-	 * position of the highest non-zero bit. The position is counted, starting with 0 from the least
-	 * significant bit to the most significant bit. For example, <code>log2floor(16) = 4</code>, and
-	 * <code>log2floor(10) = 3</code>.
-	 * 
-	 * @param value The value to compute the logarithm for.
-	 * @return The logarithm (rounded down) to the base of 2.
-	 * @throws ArithmeticException Thrown, if the given value is zero.
-	 */
-	public static int log2floor(int value) throws ArithmeticException {
-		if (value == 0) {
-			throw new ArithmeticException("Logarithm of zero is undefined.");
-		}
-		
-		int log = 0;
-		while ((value = value >>> 1) != 0) {
-			log++;
-		}
-		
-		return log;
-	}
-	
-	/**
-	 * Computes the logarithm of the given value to the base of 2. This method throws an error,
-	 * if the given argument is not a power of 2.
-	 * 
-	 * @param value The value to compute the logarithm for.
-	 * @return The logarithm to the base of 2.
-	 * @throws ArithmeticException Thrown, if the given value is zero.
-	 * @throws IllegalArgumentException Thrown, if the given value is not a power of two.
-	 */
-	public static int log2strict(int value) throws ArithmeticException, IllegalArgumentException {
-		if (value == 0) {
-			throw new ArithmeticException("Logarithm of zero is undefined.");
-		}
-		if ((value & (value - 1)) != 0) {
-			throw new IllegalArgumentException("The given value " + value + " is not a power of two.");
-		}
-		
-		int log = 0;
-		while ((value = value >>> 1) != 0) {
-			log++;
-		}
-		
-		return log;
-	}
-	
-	/**
-	 * Decrements the given number down to the closest power of two. If the argument is a
-	 * power of two, it remains unchanged.
-	 * 
-	 * @param value The value to round down.
-	 * @return The closest value that is a power of to and less or equal than the given value.
-	 */
-	public static int roundDownToPowerOf2(int value) {
-		return Integer.highestOneBit(value);
-	}
-	
-	/**
-	 * Casts the given value to a 32 bit integer, if it can be safely done. If the cast would change the numeric
-	 * value, this method raises an exception.
-	 * <p>
-	 * This method is a protection in places where one expects to be able to safely case, but where unexpected
-	 * situations could make the cast unsafe and would cause hidden problems that are hard to track down.
-	 * 
-	 * @param value The value to be cast to an integer.
-	 * @return The given value as an integer.
-	 */
-	public static int checkedDownCast(long value) {
-		if (value > Integer.MAX_VALUE) {
-			throw new IllegalArgumentException("Cannot downcast long value " + value + " to integer.");
-		}
-		return (int) value;
-	}
-
-	/**
-	 * Checks whether the given value is a power of two.
-	 *
-	 * @param value The value to check.
-	 * @return True, if the value is a power of two, false otherwise.
-	 */
-	public static boolean isPowerOf2(long value) {
-		return (value & (value - 1)) == 0;
-	}
-
-	/**
-	 * This function hashes an integer value. It is adapted from Bob Jenkins' website
-	 * <a href="http://www.burtleburtle.net/bob/hash/integer.html">http://www.burtleburtle.net/bob/hash/integer.html</a>.
-	 * The hash function has the <i>full avalanche</i> property, meaning that every bit of the value to be hashed
-	 * affects every bit of the hash value.
-	 *
-	 * It is crucial to use different hash functions to partition data across machines and the internal partitioning of
-	 * data structures. This hash function is intended for partitioning internally in data structures.
-	 *
-	 * @param code The integer to be hashed.
-	 * @return The non-negative hash code for the integer.
-	 */
-	public static int jenkinsHash(int code) {
-		code = (code + 0x7ed55d16) + (code << 12);
-		code = (code ^ 0xc761c23c) ^ (code >>> 19);
-		code = (code + 0x165667b1) + (code << 5);
-		code = (code + 0xd3a2646c) ^ (code << 9);
-		code = (code + 0xfd7046c5) + (code << 3);
-		code = (code ^ 0xb55a4f09) ^ (code >>> 16);
-		return code >= 0 ? code : -(code + 1);
-	}
-
-	/**
-	 * This function hashes an integer value.
-	 *
-	 * It is crucial to use different hash functions to partition data across machines and the internal partitioning of
-	 * data structures. This hash function is intended for partitioning across machines.
-	 *
-	 * @param code The integer to be hashed.
-	 * @return The non-negative hash code for the integer.
-	 */
-	public static int murmurHash(int code) {
-		code *= 0xcc9e2d51;
-		code = Integer.rotateLeft(code, 15);
-		code *= 0x1b873593;
-
-		code = Integer.rotateLeft(code, 13);
-		code = code * 5 + 0xe6546b64;
-
-		code ^= 4;
-		code ^= code >>> 16;
-		code *= 0x85ebca6b;
-		code ^= code >>> 13;
-		code *= 0xc2b2ae35;
-		code ^= code >>> 16;
-
-		if (code >= 0) {
-			return code;
-		}
-		else if (code != Integer.MIN_VALUE) {
-			return -code;
-		}
-		else {
-			return 0;
-		}
-	}
-
-	// ============================================================================================
-	
-	/**
-	 * Prevent Instantiation through private constructor.
-	 */
-	private MathUtils() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/util/UnionIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/UnionIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/UnionIterator.java
deleted file mode 100644
index c279adf..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/UnionIterator.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.util.TraversableOnceException;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-public class UnionIterator<T> implements Iterator<T>, Iterable<T> {
-	
-	private Iterator<T> currentIterator;
-	
-	private ArrayList<List<T>> furtherLists = new ArrayList<>();
-	
-	private int nextList;
-	
-	private boolean iteratorAvailable = true;
-
-	// ------------------------------------------------------------------------
-	
-	public void clear() {
-		currentIterator = null;
-		furtherLists.clear();
-		nextList = 0;
-		iteratorAvailable = true;
-	}
-	
-	public void addList(List<T> list) {
-		if (currentIterator == null) {
-			currentIterator = list.iterator();
-		}
-		else {
-			furtherLists.add(list);
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public Iterator<T> iterator() {
-		if (iteratorAvailable) {
-			iteratorAvailable = false;
-			return this;
-		} else {
-			throw new TraversableOnceException();
-		}
-	}
-
-	@Override
-	public boolean hasNext() {
-		while (currentIterator != null) {
-			if (currentIterator.hasNext()) {
-				return true;
-			}
-			else if (nextList < furtherLists.size()) {
-				currentIterator = furtherLists.get(nextList).iterator();
-				nextList++;
-			}
-			else {
-				currentIterator = null;
-			}
-		}
-		
-		return false;
-	}
-
-	@Override
-	public T next() {
-		if (hasNext()) {
-			return currentIterator.next();
-		}
-		else {
-			throw new NoSuchElementException();
-		}
-	}
-
-	@Override
-	public void remove() {
-		throw new UnsupportedOperationException();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
index d6b69e4..bb7d94c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
@@ -24,7 +24,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle;
-import org.apache.flink.runtime.util.FileUtils;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.runtime.zookeeper.StateStorageHelper;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 6d765b3..49953a6 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -67,7 +67,7 @@ import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
-import org.apache.flink.util.NetUtils
+import org.apache.flink.util.{MathUtils, NetUtils}
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index bbb6a89..6bd8b34 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.testutils;
 
-import org.apache.flink.runtime.util.FileUtils;
+import org.apache.flink.util.FileUtils;
 
 import java.io.File;
 import java.io.FileWriter;


Mime
View raw message