flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [15/50] flink git commit: [FLINK-1712] Remove "flink-staging" module
Date Thu, 14 Jan 2016 16:16:12 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java
deleted file mode 100644
index 6dcee0b..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime.output;
-
-import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-
-public class TezOutputEmitter<T> implements TezChannelSelector<T> {
-
-	private final ShipStrategyType strategy;		// the shipping strategy used by this output emitter
-
-	private int[] channels;						// the reused array defining target channels
-
-	private int nextChannelToSendTo = 0;		// counter to go over channels round robin
-
-	private final TypeComparator<T> comparator;	// the comparator for hashing / sorting
-
-	// ------------------------------------------------------------------------
-	// Constructors
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a new channel selector that distributes data round robin.
-	 */
-	public TezOutputEmitter() {
-		this(ShipStrategyType.NONE);
-	}
-
-	/**
-	 * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...).
-	 *
-	 * @param strategy The distribution strategy to be used.
-	 */
-	public TezOutputEmitter(ShipStrategyType strategy) {
-		this(strategy, null);
-	}
-
-	/**
-	 * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...)
-	 * and uses the supplied comparator to hash / compare records for partitioning them deterministically.
-	 *
-	 * @param strategy The distribution strategy to be used.
-	 * @param comparator The comparator used to hash / compare the records.
-	 */
-	public TezOutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator) {
-		this(strategy, comparator, null);
-	}
-
-	/**
-	 * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...)
-	 * and uses the supplied comparator to hash / compare records for partitioning them deterministically.
-	 *
-	 * @param strategy The distribution strategy to be used.
-	 * @param comparator The comparator used to hash / compare the records.
-	 * @param distr The distribution pattern used in the case of a range partitioning.
-	 */
-	public TezOutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator, DataDistribution distr) {
-		if (strategy == null) {
-			throw new NullPointerException();
-		}
-
-		this.strategy = strategy;
-		this.comparator = comparator;
-
-		switch (strategy) {
-			case FORWARD:
-			case PARTITION_HASH:
-			case PARTITION_RANGE:
-			case PARTITION_RANDOM:
-			case PARTITION_FORCED_REBALANCE:
-			case BROADCAST:
-				break;
-			default:
-				throw new IllegalArgumentException("Invalid shipping strategy for OutputEmitter: " + strategy.name());
-		}
-
-		if ((strategy == ShipStrategyType.PARTITION_RANGE) && distr == null) {
-			throw new NullPointerException("Data distribution must not be null when the ship strategy is range partitioning.");
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	// Channel Selection
-	// ------------------------------------------------------------------------
-
-	@Override
-	public final int[] selectChannels(T record, int numberOfChannels) {
-		switch (strategy) {
-			case FORWARD:
-			case PARTITION_RANDOM:
-			case PARTITION_FORCED_REBALANCE:
-				return robin(numberOfChannels);
-			case PARTITION_HASH:
-				return hashPartitionDefault(record, numberOfChannels);
-			case PARTITION_RANGE:
-				return rangePartition(record, numberOfChannels);
-			case BROADCAST:
-				return broadcast(numberOfChannels);
-			default:
-				throw new UnsupportedOperationException("Unsupported distribution strategy: " + strategy.name());
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	private final int[] robin(int numberOfChannels) {
-		if (this.channels == null || this.channels.length != 1) {
-			this.channels = new int[1];
-		}
-
-		int nextChannel = nextChannelToSendTo + 1;
-		nextChannel = nextChannel < numberOfChannels ? nextChannel : 0;
-
-		this.nextChannelToSendTo = nextChannel;
-		this.channels[0] = nextChannel;
-		return this.channels;
-	}
-
-	private final int[] broadcast(int numberOfChannels) {
-		if (channels == null || channels.length != numberOfChannels) {
-			channels = new int[numberOfChannels];
-			for (int i = 0; i < numberOfChannels; i++) {
-				channels[i] = i;
-			}
-		}
-
-		return channels;
-	}
-
-	private final int[] hashPartitionDefault(T record, int numberOfChannels) {
-		if (channels == null || channels.length != 1) {
-			channels = new int[1];
-		}
-
-		int hash = this.comparator.hash(record);
-
-		hash = murmurHash(hash);
-
-		if (hash >= 0) {
-			this.channels[0] = hash % numberOfChannels;
-		}
-		else if (hash != Integer.MIN_VALUE) {
-			this.channels[0] = -hash % numberOfChannels;
-		}
-		else {
-			this.channels[0] = 0;
-		}
-
-		return this.channels;
-	}
-
-	private final int murmurHash(int k) {
-		k *= 0xcc9e2d51;
-		k = Integer.rotateLeft(k, 15);
-		k *= 0x1b873593;
-
-		k = Integer.rotateLeft(k, 13);
-		k *= 0xe6546b64;
-
-		k ^= 4;
-		k ^= k >>> 16;
-		k *= 0x85ebca6b;
-		k ^= k >>> 13;
-		k *= 0xc2b2ae35;
-		k ^= k >>> 16;
-
-		return k;
-	}
-
-	private final int[] rangePartition(T record, int numberOfChannels) {
-		throw new UnsupportedOperationException();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java
deleted file mode 100644
index 39d247c..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.util;
-
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-public class DummyInvokable extends AbstractInvokable {
-
-	private ExecutionConfig executionConfig;
-
-	public DummyInvokable() {
-	}
-
-	public DummyInvokable(ExecutionConfig executionConfig) {
-		this.executionConfig = executionConfig;
-	}
-
-	public void setExecutionConfig(ExecutionConfig executionConfig) {
-		this.executionConfig = executionConfig;
-	}
-
-	@Override
-	public void registerInputOutput() {}
-
-
-	@Override
-	public void invoke() throws Exception {}
-
-	@Override
-	public ExecutionConfig getExecutionConfig() {
-		return executionConfig;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java
deleted file mode 100644
index 202cb24..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.util;
-
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.commons.codec.binary.Base64;
-
-import java.io.IOException;
-
-public class EncodingUtils {
-
-	public static Object decodeObjectFromString(String encoded, ClassLoader cl) {
-
-		try {
-			if (encoded == null) {
-				return null;
-			}
-			byte[] bytes = Base64.decodeBase64(encoded);
-
-			return InstantiationUtil.deserializeObject(bytes, cl);
-		}
-		catch (IOException e) {
-			e.printStackTrace();
-			System.exit(-1);
-			throw new RuntimeException();
-		}
-		catch (ClassNotFoundException e) {
-			e.printStackTrace();
-			System.exit(-1);
-			throw new RuntimeException();
-		}
-	}
-
-	public static String encodeObjectToString(Object o) {
-
-		try {
-			byte[] bytes = InstantiationUtil.serializeObject(o);
-
-			String encoded = Base64.encodeBase64String(bytes);
-			return encoded;
-		}
-		catch (IOException e) {
-			e.printStackTrace();
-			System.exit(-1);
-			throw new RuntimeException();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java
deleted file mode 100644
index 07c5f97..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.util;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serialization;
-import org.apache.hadoop.io.serializer.Serializer;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-public class FlinkSerialization<T> extends Configured implements Serialization<T>{
-
-	@Override
-	public boolean accept(Class<?> c) {
-		TypeSerializer<T> typeSerializer = (TypeSerializer) EncodingUtils.decodeObjectFromString(this.getConf().get("io.flink.typeserializer"), getClass().getClassLoader());
-		T instance = typeSerializer.createInstance();
-		return instance.getClass().isAssignableFrom(c);
-	}
-
-	@Override
-	public Serializer<T> getSerializer(Class<T> c) {
-		TypeSerializer<T> typeSerializer = (TypeSerializer) EncodingUtils.decodeObjectFromString(this.getConf().get("io.flink.typeserializer"), getClass().getClassLoader());
-		return new FlinkSerializer<T>(typeSerializer);
-	}
-
-	@Override
-	public Deserializer<T> getDeserializer(Class<T> c) {
-		TypeSerializer<T> typeSerializer = (TypeSerializer) EncodingUtils.decodeObjectFromString(this.getConf().get("io.flink.typeserializer"), getClass().getClassLoader());
-		return new FlinkDeserializer<T>(typeSerializer);
-	}
-
-	public static class FlinkSerializer<T> implements Serializer<T> {
-
-		private OutputStream dataOut;
-		private DataOutputViewOutputStreamWrapper dataOutputView;
-		private TypeSerializer<T> typeSerializer;
-
-		public FlinkSerializer(TypeSerializer<T> typeSerializer) {
-			this.typeSerializer = typeSerializer;
-		}
-
-		@Override
-		public void open(OutputStream out) throws IOException {
-			this.dataOut = out;
-			this.dataOutputView = new DataOutputViewOutputStreamWrapper(out);
-		}
-
-		@Override
-		public void serialize(T t) throws IOException {
-			typeSerializer.serialize(t, dataOutputView);
-		}
-
-		@Override
-		public void close() throws IOException {
-			this.dataOut.close();
-		}
-	}
-
-	public static class FlinkDeserializer<T> implements Deserializer<T> {
-
-		private InputStream dataIn;
-		private TypeSerializer<T> typeSerializer;
-		private DataInputViewInputStreamWrapper dataInputView;
-
-		public FlinkDeserializer(TypeSerializer<T> typeSerializer) {
-			this.typeSerializer = typeSerializer;
-		}
-
-		@Override
-		public void open(InputStream in) throws IOException {
-			this.dataIn = in;
-			this.dataInputView = new DataInputViewInputStreamWrapper(in);
-		}
-
-		@Override
-		public T deserialize(T t) throws IOException {
-			T reuse = t;
-			if (reuse == null) {
-				reuse = typeSerializer.createInstance();
-			}
-			return typeSerializer.deserialize(reuse, dataInputView);
-		}
-
-		@Override
-		public void close() throws IOException {
-			this.dataIn.close();
-		}
-	}
-
-	private static final class DataOutputViewOutputStreamWrapper implements DataOutputView {
-
-		private final DataOutputStream dos;
-
-		public DataOutputViewOutputStreamWrapper(OutputStream output) {
-			this.dos = new DataOutputStream(output);
-		}
-
-		@Override
-		public void write(int b) throws IOException {
-			dos.write(b);
-		}
-
-		@Override
-		public void write(byte[] b) throws IOException {
-			dos.write(b);
-		}
-
-		@Override
-		public void write(byte[] b, int off, int len) throws IOException {
-			dos.write(b, off, len);
-		}
-
-		@Override
-		public void writeBoolean(boolean v) throws IOException {
-			dos.writeBoolean(v);
-		}
-
-		@Override
-		public void writeByte(int v) throws IOException {
-			dos.writeByte(v);
-		}
-
-		@Override
-		public void writeShort(int v) throws IOException {
-			dos.writeShort(v);
-		}
-
-		@Override
-		public void writeChar(int v) throws IOException {
-			dos.writeChar(v);
-		}
-
-		@Override
-		public void writeInt(int v) throws IOException {
-			dos.writeInt(v);
-		}
-
-		@Override
-		public void writeLong(long v) throws IOException {
-			dos.writeLong(v);
-		}
-
-		@Override
-		public void writeFloat(float v) throws IOException {
-			dos.writeFloat(v);
-		}
-
-		@Override
-		public void writeDouble(double v) throws IOException {
-			dos.writeDouble(v);
-		}
-
-		@Override
-		public void writeBytes(String s) throws IOException {
-			dos.writeBytes(s);
-		}
-
-		@Override
-		public void writeChars(String s) throws IOException {
-			dos.writeChars(s);
-		}
-
-		@Override
-		public void writeUTF(String s) throws IOException {
-			dos.writeUTF(s);
-		}
-
-		@Override
-		public void skipBytesToWrite(int num) throws IOException {
-			for (int i = 0; i < num; i++) {
-				dos.write(0);
-			}
-		}
-
-		@Override
-		public void write(DataInputView inview, int num) throws IOException {
-			for (int i = 0; i < num; i++) {
-				dos.write(inview.readByte());
-			}
-		}
-	}
-
-	private static final class DataInputViewInputStreamWrapper implements DataInputView {
-
-		private final DataInputStream dis;
-
-
-		public DataInputViewInputStreamWrapper(InputStream input) {
-			this.dis = new DataInputStream(input);
-		}
-
-		@Override
-		public void readFully(byte[] b) throws IOException {
-			dis.readFully(b);
-		}
-
-		@Override
-		public void readFully(byte[] b, int off, int len) throws IOException {
-			dis.readFully(b, off, len);
-		}
-
-		@Override
-		public int skipBytes(int n) throws IOException {
-			return dis.skipBytes(n);
-		}
-
-		@Override
-		public boolean readBoolean() throws IOException {
-			return dis.readBoolean();
-		}
-
-		@Override
-		public byte readByte() throws IOException {
-			return dis.readByte();
-		}
-
-		@Override
-		public int readUnsignedByte() throws IOException {
-			return dis.readUnsignedByte();
-		}
-
-		@Override
-		public short readShort() throws IOException {
-			return dis.readShort();
-		}
-
-		@Override
-		public int readUnsignedShort() throws IOException {
-			return dis.readUnsignedShort();
-		}
-
-		@Override
-		public char readChar() throws IOException {
-			return dis.readChar();
-		}
-
-		@Override
-		public int readInt() throws IOException {
-			return dis.readInt();
-		}
-
-		@Override
-		public long readLong() throws IOException {
-			return dis.readLong();
-		}
-
-		@Override
-		public float readFloat() throws IOException {
-			return dis.readFloat();
-		}
-
-		@Override
-		public double readDouble() throws IOException {
-			return dis.readDouble();
-		}
-
-		@Override
-		public String readLine() throws IOException {
-			return dis.readLine();
-		}
-
-		@Override
-		public String readUTF() throws IOException {
-			return dis.readUTF();
-		}
-
-		@Override
-		public void skipBytesToRead(int numBytes) throws IOException {
-			while (numBytes > 0) {
-				numBytes -= dis.skipBytes(numBytes);
-			}
-		}
-
-		@Override
-		public int read(byte[] b, int off, int len) throws IOException {
-			dis.readFully(b, off, len);
-			return len;
-		}
-
-		@Override
-		public int read(byte[] b) throws IOException {
-			return read(b, 0, b.length);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/resources/log4j.properties b/flink-staging/flink-tez/src/main/resources/log4j.properties
deleted file mode 100644
index 0845c81..0000000
--- a/flink-staging/flink-tez/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,30 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to OFF to not flood build logs
-# set manually to INFO for debugging purposes
-log4j.rootLogger=INFO, testlogger
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java
deleted file mode 100644
index 9124faa..0000000
--- a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.test;
-
-import org.apache.flink.test.testdata.ConnectedComponentsData;
-import org.apache.flink.tez.examples.ConnectedComponentsStep;
-import org.junit.Assert;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.util.regex.Pattern;
-
-/*
- * Note: This does not test whether the program computes one step of the
- * Weakly Connected Components program correctly. It only tests whether
- * the program assigns a wrong component to a vertex.
- */
-
-public class ConnectedComponentsStepITCase extends TezProgramTestBase {
-
-    private static final long SEED = 0xBADC0FFEEBEEFL;
-
-    private static final int NUM_VERTICES = 1000;
-
-    private static final int NUM_EDGES = 10000;
-
-
-    private String verticesPath;
-    private String edgesPath;
-    private String resultPath;
-
-
-    @Override
-    protected void preSubmit() throws Exception {
-        verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
-        edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
-        resultPath = getTempFilePath("results");
-    }
-
-    @Override
-    protected void testProgram() throws Exception {
-        ConnectedComponentsStep.main(verticesPath, edgesPath, resultPath, "100");
-    }
-
-    @Override
-    protected void postSubmit() throws Exception {
-        for (BufferedReader reader : getResultReader(resultPath)) {
-            checkOddEvenResult(reader);
-        }
-    }
-
-    private static void checkOddEvenResult(BufferedReader result) throws IOException {
-        Pattern split = Pattern.compile(" ");
-        String line;
-        while ((line = result.readLine()) != null) {
-            String[] res = split.split(line);
-            Assert.assertEquals("Malformed result: Wrong number of tokens in line.", 2, res.length);
-            try {
-                int vertex = Integer.parseInt(res[0]);
-                int component = Integer.parseInt(res[1]);
-                Assert.assertTrue(((vertex % 2) == (component % 2)));
-            } catch (NumberFormatException e) {
-                Assert.fail("Malformed result.");
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java
deleted file mode 100644
index 9a203fe..0000000
--- a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.test;
-
-import org.apache.flink.test.testdata.PageRankData;
-import org.apache.flink.tez.examples.PageRankBasicStep;
-
-public class PageRankBasicStepITCase extends TezProgramTestBase {
-
-    private String verticesPath;
-    private String edgesPath;
-    private String resultPath;
-    private String expectedResult;
-
-    public static final String RANKS_AFTER_1_ITERATION = "1 0.2\n" +
-            "2 0.25666666666666665\n" +
-            "3 0.1716666666666667\n" +
-            "4 0.1716666666666667\n" +
-            "5 0.2";
-
-    @Override
-    protected void preSubmit() throws Exception {
-        resultPath = getTempDirPath("result");
-        verticesPath = createTempFile("vertices.txt", PageRankData.VERTICES);
-        edgesPath = createTempFile("edges.txt", PageRankData.EDGES);
-    }
-
-    @Override
-    protected void testProgram() throws Exception {
-        PageRankBasicStep.main(new String[]{verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "-1"});
-        expectedResult = RANKS_AFTER_1_ITERATION;
-    }
-
-    @Override
-    protected void postSubmit() throws Exception {
-        compareKeyValuePairsWithDelta(expectedResult, resultPath, " ", 0.001);
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
deleted file mode 100644
index eda9d1a..0000000
--- a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.test;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.tez.client.LocalTezEnvironment;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-public abstract class TezProgramTestBase extends AbstractTestBase {
-
-    private static final int DEFAULT_DEGREE_OF_PARALLELISM = 4;
-
-    private JobExecutionResult latestExecutionResult;
-
-    private int degreeOfParallelism = DEFAULT_DEGREE_OF_PARALLELISM;
-
-
-    public TezProgramTestBase() {
-        this(new Configuration());
-    }
-
-    public TezProgramTestBase(Configuration config) {
-        super (config);
-    }
-
-
-    public void setParallelism(int degreeOfParallelism) {
-        this.degreeOfParallelism = degreeOfParallelism;
-    }
-
-    public JobExecutionResult getLatestExecutionResult() {
-        return this.latestExecutionResult;
-    }
-
-
-    protected abstract void testProgram() throws Exception;
-
-    protected void preSubmit() throws Exception {}
-
-    protected void postSubmit() throws Exception {}
-
-    // --------------------------------------------------------------------------------------------
-    //  Test entry point
-    // --------------------------------------------------------------------------------------------
-
-    // Ignored due to deadlocks in Tez 0.6.1 (https://s3.amazonaws.com/archive.travis-ci.org/jobs/67848151/log.txt)
-    // TODO Reactivate with future Tez versions
-    @Ignore
-    @Test
-    public void testJob() throws Exception {
-        // pre-submit
-        try {
-            preSubmit();
-        }
-        catch (Exception e) {
-            System.err.println(e.getMessage());
-            e.printStackTrace();
-            Assert.fail("Pre-submit work caused an error: " + e.getMessage());
-        }
-
-        // prepare the test environment
-        LocalTezEnvironment env = LocalTezEnvironment.create();
-        env.setParallelism(degreeOfParallelism);
-        env.setAsContext();
-
-        // call the test program
-        try {
-            testProgram();
-        }
-        catch (Exception e) {
-            System.err.println(e.getMessage());
-            e.printStackTrace();
-            Assert.fail("Error while calling the test program: " + e.getMessage());
-        }
-
-        // post-submit
-        try {
-            postSubmit();
-        }
-        catch (Exception e) {
-            System.err.println(e.getMessage());
-            e.printStackTrace();
-            Assert.fail("Post-submit work caused an error: " + e.getMessage());
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java
deleted file mode 100644
index 35aa54a..0000000
--- a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.test;
-
-
-import org.apache.flink.examples.java.relational.WebLogAnalysis;
-import org.apache.flink.test.testdata.WebLogAnalysisData;
-
-public class WebLogAnalysisITCase extends TezProgramTestBase {
-
-    private String docsPath;
-    private String ranksPath;
-    private String visitsPath;
-    private String resultPath;
-
-    @Override
-    protected void preSubmit() throws Exception {
-        docsPath = createTempFile("docs", WebLogAnalysisData.DOCS);
-        ranksPath = createTempFile("ranks", WebLogAnalysisData.RANKS);
-        visitsPath = createTempFile("visits", WebLogAnalysisData.VISITS);
-        resultPath = getTempDirPath("result");
-    }
-
-    @Override
-    protected void postSubmit() throws Exception {
-        compareResultsByLinesInMemory(WebLogAnalysisData.EXCEPTED_RESULT, resultPath);
-    }
-    @Override
-    protected void testProgram() throws Exception {
-        WebLogAnalysis.main(new String[]{docsPath, ranksPath, visitsPath, resultPath});
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java
deleted file mode 100644
index d73aa8b..0000000
--- a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.test;
-
-import org.apache.flink.examples.java.wordcount.WordCount;
-import org.apache.flink.test.testdata.WordCountData;
-
-public class WordCountITCase extends TezProgramTestBase {
-
-    protected String textPath;
-    protected String resultPath;
-
-    public WordCountITCase(){
-    }
-
-    @Override
-    protected void preSubmit() throws Exception {
-        textPath = createTempFile("text.txt", WordCountData.TEXT);
-        resultPath = getTempDirPath("result");
-    }
-
-    @Override
-    protected void postSubmit() throws Exception {
-        compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath);
-    }
-
-    @Override
-    protected void testProgram() throws Exception {
-        WordCount.main(new String[]{textPath, resultPath});
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/test/resources/log4j-test.properties b/flink-staging/flink-tez/src/test/resources/log4j-test.properties
deleted file mode 100644
index 0845c81..0000000
--- a/flink-staging/flink-tez/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,30 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to OFF to not flood build logs
-# set manually to INFO for debugging purposes
-log4j.rootLogger=INFO, testlogger
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/test/resources/logback-test.xml b/flink-staging/flink-tez/src/test/resources/logback-test.xml
deleted file mode 100644
index 48e4374..0000000
--- a/flink-staging/flink-tez/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,37 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} %-5level [%thread] %logger{60} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-
-    <!--<logger name="org.apache.flink.runtime.operators.BatchTask" level="OFF"/>-->
-    <!--<logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>-->
-    <!--<logger name="org.apache.flink.runtime.taskmanager.Task" level="OFF"/>-->
-    <!--<logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/>-->
-    <!--<logger name="org.apache.flink.runtime.taskmanager.TaskManager" level="OFF"/>-->
-    <!--<logger name="org.apache.flink.runtime.executiongraph.ExecutionGraph" level="OFF"/>-->
-    <!--<logger name="org.apache.flink.runtime.jobmanager.EventCollector" level="OFF"/>-->
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml
deleted file mode 100644
index 535c910..0000000
--- a/flink-staging/pom.xml
+++ /dev/null
@@ -1,72 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-    <modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-parent</artifactId>
-		<version>1.0-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-
-	<artifactId>flink-staging</artifactId>
-	<name>flink-staging</name>
-	<packaging>pom</packaging>
-
-	<modules>
-		<module>flink-avro</module>
-		<module>flink-jdbc</module>
-		<module>flink-hadoop-compatibility</module>
-		<module>flink-hbase</module>
-		<module>flink-hcatalog</module>
-		<module>flink-table</module>
-		<module>flink-ml</module>
-		<module>flink-scala-shell</module>
-	</modules>
-	
-	<!-- See main pom.xml for explanation of profiles -->
-	<profiles>
-		<profile>
-			<id>hadoop-2</id>
-			<activation>
-				<property>
-					<!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
-					<!--hadoop2--><name>!hadoop.profile</name>
-				</property>
-			</activation>
-			<modules>
-				<!-- Include the flink-fs-tests project only for HD2.
-				 	The HDFS minicluster interfaces changed between the two versions.
-				 -->
-				<module>flink-fs-tests</module>
-			</modules>
-		</profile>
-		<profile>
-			<id>include-tez</id>
-			<modules>
-				<module>flink-tez</module>
-			</modules>
-		</profile>
-	</profiles>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6cba956..e503a11 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,12 +62,13 @@ under the License.
 		<module>flink-streaming-java</module>
 		<module>flink-streaming-scala</module>
 		<module>flink-streaming-connectors</module>
+		<module>flink-batch-connectors</module>
 		<module>flink-examples</module>
 		<module>flink-clients</module>
 		<module>flink-tests</module>
 		<module>flink-test-utils</module>
-		<module>flink-staging</module>
 		<module>flink-libraries</module>
+		<module>flink-scala-shell</module>
 		<module>flink-quickstart</module>
 		<module>flink-contrib</module>
 		<module>flink-dist</module>
@@ -428,6 +429,10 @@ under the License.
 			</properties>
 			<modules>
 				<module>flink-yarn</module>
+				<!-- Include the flink-fs-tests project only for HD2.
+				 	The HDFS minicluster interfaces changed between the two versions.
+				 -->
+				<module>flink-fs-tests</module>
 			</modules>
 		</profile>
 
@@ -802,13 +807,13 @@ under the License.
 
 						<!-- Test Data. -->
 						<exclude>flink-tests/src/test/resources/testdata/terainput.txt</exclude>
-						<exclude>flink-staging/flink-avro/src/test/resources/avro/*.avsc</exclude>
+						<exclude>flink-batch-connectors/flink-avro/src/test/resources/avro/*.avsc</exclude>
 						<exclude>out/test/flink-avro/avro/user.avsc</exclude>
-						<exclude>flink-staging/flink-table/src/test/scala/resources/*.out</exclude>
+						<exclude>flink-libraries/flink-table/src/test/scala/resources/*.out</exclude>
 						<!-- TweetInputFormat Test Data-->
 						<exclude>flink-contrib/flink-tweet-inputformat/src/main/resources/HashTagTweetSample.json</exclude>
-						<exclude>flink-staging/flink-avro/src/test/resources/testdata.avro</exclude>
-						<exclude>flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/*.java</exclude>
+						<exclude>flink-batch-connectors/flink-avro/src/test/resources/testdata.avro</exclude>
+						<exclude>flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/*.java</exclude>
 						<exclude>flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_csv</exclude>
 						<exclude>flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_text</exclude>
 						<!-- Configuration Files. -->


Mime
View raw message