Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B7998114AE for ; Sun, 21 Sep 2014 02:13:13 +0000 (UTC) Received: (qmail 86772 invoked by uid 500); 21 Sep 2014 02:13:13 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 86735 invoked by uid 500); 21 Sep 2014 02:13:13 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 86711 invoked by uid 99); 21 Sep 2014 02:13:13 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 21 Sep 2014 02:13:13 +0000 X-ASF-Spam-Status: No, hits=-2000.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Sun, 21 Sep 2014 02:12:46 +0000 Received: (qmail 83182 invoked by uid 99); 21 Sep 2014 02:12:25 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 21 Sep 2014 02:12:25 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 9A40FA1DE01; Sun, 21 Sep 2014 02:12:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.incubator.apache.org Date: Sun, 21 Sep 2014 02:12:40 -0000 Message-Id: <0e6704df9c99434f91dfff4ec4f46579@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [16/63] [abbrv] Refactor job graph construction to incremental attachment based X-Virus-Checked: Checked by 
ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/api/common/io/FormatUtil.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FormatUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FormatUtil.java deleted file mode 100644 index 2f89a80..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FormatUtil.java +++ /dev/null @@ -1,186 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.api.common.io; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.BlockLocation; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.FileStatus; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; -import org.apache.flink.core.fs.FileSystem.WriteMode; -import org.apache.flink.core.io.InputSplit; -import org.apache.flink.util.ReflectionUtil; - -/** - * Provides convenience methods to deal with I/O operations related to {@link InputFormat} and {@link OutputFormat}. - */ -public class FormatUtil { - - - /** - * Creates an {@link InputFormat} from a given class for the specified file. The optional {@link Configuration} - * initializes the format. - * - * @param - * the class of the InputFormat - * @param inputFormatClass - * the class of the InputFormat - * @param path - * the path of the file - * @param configuration - * optional configuration of the InputFormat - * @return the created {@link InputFormat} - * @throws IOException - * if an I/O error occurred while accessing the file or initializing the InputFormat. - */ - public static > F openInput( - Class inputFormatClass, String path, Configuration configuration) - throws IOException - { - configuration = configuration == null ? 
new Configuration() : configuration; - - Path normalizedPath = normalizePath(new Path(path)); - final F inputFormat = ReflectionUtil.newInstance(inputFormatClass); - - inputFormat.setFilePath(normalizedPath); - inputFormat.setOpenTimeout(0); - inputFormat.configure(configuration); - - final FileSystem fs = FileSystem.get(normalizedPath.toUri()); - FileStatus fileStatus = fs.getFileStatus(normalizedPath); - - BlockLocation[] blocks = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); - inputFormat.open(new FileInputSplit(0, new Path(path), 0, fileStatus.getLen(), blocks[0].getHosts())); - return inputFormat; - } - - /** - * Creates {@link InputFormat}s from a given class for the specified file(s). The optional {@link Configuration} - * initializes the formats. - * - * @param - * the class of the InputFormat - * @param inputFormatClass - * the class of the InputFormat - * @param path - * the path of the file or to the directory containing the splits - * @param configuration - * optional configuration of the InputFormat - * @return the created {@link InputFormat}s for each file in the specified path - * @throws IOException - * if an I/O error occurred while accessing the files or initializing the InputFormat. - */ - @SuppressWarnings("unchecked") - public static > List openAllInputs( - Class inputFormatClass, String path, Configuration configuration) throws IOException { - Path nephelePath = new Path(path); - FileSystem fs = nephelePath.getFileSystem(); - FileStatus fileStatus = fs.getFileStatus(nephelePath); - if (!fileStatus.isDir()) { - return Arrays.asList(openInput(inputFormatClass, path, configuration)); - } - FileStatus[] list = fs.listStatus(nephelePath); - List formats = new ArrayList(); - for (int index = 0; index < list.length; index++) { - formats.add(openInput(inputFormatClass, list[index].getPath().toString(), configuration)); - } - return formats; - } - - /** - * Creates an {@link InputFormat} from a given class. 
The optional {@link Configuration} - * initializes the format. - * - * @param - * the class of the InputFormat - * @param inputFormatClass - * the class of the InputFormat - * @param configuration - * optional configuration of the InputFormat - * @return the created {@link InputFormat} - * @throws IOException - * if an I/O error occurred while accessing the file or initializing the InputFormat. - */ - public static > F openInput( - Class inputFormatClass, Configuration configuration) throws IOException { - configuration = configuration == null ? new Configuration() : configuration; - - final F inputFormat = ReflectionUtil.newInstance(inputFormatClass); - inputFormat.configure(configuration); - final IS[] splits = inputFormat.createInputSplits(1); - inputFormat.open(splits[0]); - return inputFormat; - } - - /** - * Creates an {@link OutputFormat} from a given class for the specified file. The optional {@link Configuration} - * initializes the format. - * - * @param - * the class of the OutputFormat - * @param outputFormatClass - * the class of the OutputFormat - * @param path - * the path of the file or to the directory containing the splits - * @param configuration - * optional configuration of the OutputFormat - * @return the created {@link OutputFormat} - * @throws IOException - * if an I/O error occurred while accessing the file or initializing the OutputFormat. - */ - public static > F openOutput( - Class outputFormatClass, String path, Configuration configuration) - throws IOException - { - final F outputFormat = ReflectionUtil.newInstance(outputFormatClass); - outputFormat.setOutputFilePath(new Path(path)); - outputFormat.setWriteMode(WriteMode.OVERWRITE); - - configuration = configuration == null ? new Configuration() : configuration; - - outputFormat.configure(configuration); - outputFormat.open(0, 1); - return outputFormat; - } - - /** - * Fixes the path if it denotes a local (relative) file without the proper protocol prefix. 
- */ - private static Path normalizePath(Path path) { - URI uri = path.toUri(); - if (uri.getScheme() == null) { - try { - uri = new URI("file", uri.getHost(), uri.getPath(), uri.getFragment()); - path = new Path(uri.toString()); - } catch (URISyntaxException e) { - throw new IllegalArgumentException("path is invalid", e); - } - } - return path; - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/api/common/io/GenericInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericInputFormat.java index 0ddeb64..c108471 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericInputFormat.java @@ -50,7 +50,6 @@ public abstract class GenericInputFormat implements InputFormat implements InputFormat getInputSplitType() { - return GenericInputSplit.class; + public DefaultInputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) { + return new DefaultInputSplitAssigner(splits); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/api/common/io/InitializeOnMaster.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/InitializeOnMaster.java b/flink-core/src/main/java/org/apache/flink/api/common/io/InitializeOnMaster.java index 1bb7815..5eaa657 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/InitializeOnMaster.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/InitializeOnMaster.java @@ -16,7 +16,6 @@ * limitations under the License. 
*/ - package org.apache.flink.api.common.io; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java index cb4019c..6845237 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.common.io; import java.io.IOException; @@ -25,6 +24,8 @@ import java.io.Serializable; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.core.io.InputSplitSource; /** * The base interface for data sources that produces records. @@ -59,7 +60,7 @@ import org.apache.flink.core.io.InputSplit; * @param The type of the produced records. * @param The type of input split. */ -public interface InputFormat extends Serializable { +public interface InputFormat extends InputSplitSource, Serializable { /** * Configures this input format. Since input formats are instantiated generically and hence parameterless, @@ -95,6 +96,7 @@ public interface InputFormat extends Serializable { * * @throws IOException Thrown, when the creation of the splits was erroneous. */ + @Override T[] createInputSplits(int minNumSplits) throws IOException; /** @@ -102,7 +104,8 @@ public interface InputFormat extends Serializable { * * @return The type of the input splits. 
*/ - Class getInputSplitType(); + @Override + InputSplitAssigner getInputSplitAssigner(T[] inputSplits); // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java new file mode 100644 index 0000000..6243681 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.io; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.core.io.LocatableInputSplit; + +/** + * The locatable input split assigner assigns to each host splits that are local, before assigning + * splits that are not local. + */ +public final class LocatableInputSplitAssigner implements InputSplitAssigner { + + private static final Logger LOG = LoggerFactory.getLogger(LocatableInputSplitAssigner.class); + + + private final Set unassigned = new HashSet(); + + private final ConcurrentHashMap> localPerHost = new ConcurrentHashMap>(); + + private int localAssignments; // lock protected by the unassigned set lock + + private int remoteAssignments; // lock protected by the unassigned set lock + + // -------------------------------------------------------------------------------------------- + + public LocatableInputSplitAssigner(Collection splits) { + this.unassigned.addAll(splits); + } + + public LocatableInputSplitAssigner(LocatableInputSplit[] splits) { + Collections.addAll(this.unassigned, splits); + } + + // -------------------------------------------------------------------------------------------- + + @Override + public LocatableInputSplit getNextInputSplit(String host) { + // for a null host, we return an arbitrary split + if (host == null) { + + synchronized (this.unassigned) { + Iterator iter = this.unassigned.iterator(); + if (iter.hasNext()) { + LocatableInputSplit next = iter.next(); + iter.remove(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Assigning arbitrary split to null host."); + } + + remoteAssignments++; + return next; + } else { + if (LOG.isDebugEnabled()) 
{ + LOG.debug("No more input splits remaining."); + } + return null; + } + } + } + + host = host.toLowerCase(Locale.US); + + // for any non-null host, we take the list of non-null splits + List localSplits = this.localPerHost.get(host); + + // if we have no list for this host yet, create one + if (localSplits == null) { + localSplits = new ArrayList(16); + + // lock the list, to be sure that others have to wait for that host's local list + synchronized (localSplits) { + List prior = this.localPerHost.putIfAbsent(host, localSplits); + + // if someone else beat us in the case to create this list, then we do not populate this one, but + // simply work with that other list + if (prior == null) { + // we are the first, we populate + + // first, copy the remaining splits to release the lock on the set early + // because that is shared among threads + LocatableInputSplit[] remaining; + synchronized (this.unassigned) { + remaining = (LocatableInputSplit[]) this.unassigned.toArray(new LocatableInputSplit[this.unassigned.size()]); + } + + for (LocatableInputSplit is : remaining) { + if (isLocal(host, is.getHostnames())) { + localSplits.add(is); + } + } + } + else { + // someone else was faster + localSplits = prior; + } + } + } + + // at this point, we have a list of local splits (possibly empty) + // we need to make sure no one else operates in the current list (that protects against + // list creation races) and that the unassigned set is consistent + // NOTE: we need to obtain the locks in this order, strictly!!! 
+ synchronized (localSplits) { + int size = localSplits.size(); + if (size > 0) { + synchronized (this.unassigned) { + do { + --size; + LocatableInputSplit split = localSplits.remove(size); + if (this.unassigned.remove(split)) { + + if (LOG.isDebugEnabled()) { + LOG.debug("Assigning local split to host " + host); + } + + localAssignments++; + return split; + } + } while (size > 0); + } + } + } + + // we did not find a local split, return any + synchronized (this.unassigned) { + Iterator iter = this.unassigned.iterator(); + if (iter.hasNext()) { + LocatableInputSplit next = iter.next(); + iter.remove(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Assigning remote split to host " + host); + } + + remoteAssignments++; + return next; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("No more input splits remaining."); + } + return null; + } + } + } + + private static final boolean isLocal(String host, String[] hosts) { + if (host == null || hosts == null) { + return false; + } + + for (String h : hosts) { + if (h != null && host.equals(h.toLowerCase())) { + return true; + } + } + + return false; + } + + public int getNumberOfLocalAssignments() { + return localAssignments; + } + + public int getNumberOfRemoteAssignments() { + return remoteAssignments; + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/api/common/io/NonParallelInput.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/NonParallelInput.java b/flink-core/src/main/java/org/apache/flink/api/common/io/NonParallelInput.java index 157df71..d4fce5b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/NonParallelInput.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/NonParallelInput.java @@ -18,10 +18,9 @@ package org.apache.flink.api.common.io; - /** * This interface acts as a marker for input formats for inputs which 
cannot be split. - * Data sources with a Sequential input formats are always executed with a degree-of-parallelism + * Data sources with a non-parallel input formats are always executed with a degree-of-parallelism * of one. * * @see InputFormat http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormat.java index cd02ac9..ddf9cbc 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormat.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.common.io; import java.io.IOException; @@ -24,7 +23,6 @@ import java.io.Serializable; import org.apache.flink.configuration.Configuration; - /** * The base interface for outputs that consumes records. The output format * describes how to store the final records, for example in a file. 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java index c4a69ba..ebee5d0 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java @@ -151,19 +151,12 @@ public class FileInputSplit extends LocatableInputSplit { if (obj == this) { return true; } - else if (obj != null && super.equals(obj) && obj instanceof FileInputSplit) { + else if (obj != null && obj instanceof FileInputSplit && super.equals(obj)) { FileInputSplit other = (FileInputSplit) obj; - if (this.file != null) { - if (!this.file.equals(other.file)) { - return false; - } - } - else if (other.file != null) { - return false; - } - - return this.start == other.start && this.length == other.length; + return this.start == other.start && + this.length == other.length && + (this.file == null ? 
other.file == null : (other.file != null && this.file.equals(other.file))); } else { return false; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java b/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java index 850ba1c..52018a1 100644 --- a/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java +++ b/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java @@ -78,7 +78,7 @@ public class GenericInputSplit implements InputSplit, java.io.Serializable { } @Override - public void read(final DataInputView in) throws IOException { + public void read(DataInputView in) throws IOException { this.partitionNumber = in.readInt(); this.totalNumberOfPartitions = in.readInt(); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java b/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java index c054fb8..1a51207 100644 --- a/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java +++ b/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java @@ -16,7 +16,6 @@ * limitations under the License. 
*/ - package org.apache.flink.core.io; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/core/io/InputSplitSource.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/io/InputSplitSource.java b/flink-core/src/main/java/org/apache/flink/core/io/InputSplitSource.java new file mode 100644 index 0000000..256b9c7 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/io/InputSplitSource.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.io; + +/** + * InputSplitSources create {@link InputSplit}s that define portions of data to be produced + * by {@link org.apache.flink.api.common.io.InputFormat}s. + * + * @param The type of the input splits created by the source. + */ +public interface InputSplitSource extends java.io.Serializable { + + /** + * Computes the input splits. The given minimum number of splits is a hint as to how + * many splits are desired. + * + * @param minNumSplits Number of minimal input splits, as a hint. + * @return An array of input splits. 
+ * + * @throws Exception Exceptions when creating the input splits may be forwarded and will cause the + * execution to permanently fail. + */ + T[] createInputSplits(int minNumSplits) throws Exception; + + /** + * Returns the assigner for the input splits. Assigner determines which parallel instance of the + * input format gets which input split. + * + * @return The input split assigner. + */ + InputSplitAssigner getInputSplitAssigner(T[] inputSplits); +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java new file mode 100644 index 0000000..25835f5 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.core.memory; + +import java.io.IOException; +import java.io.InputStream; + +public final class DataInputViewStream extends InputStream { + + private final DataInputView inputView; + + + public DataInputViewStream(DataInputView inputView) { + this.inputView = inputView; + } + + + @Override + public int read() throws IOException { + return inputView.readByte(); + } + + public int read(byte b[], int off, int len) throws IOException { + inputView.readFully(b, off, len); + return len; + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStream.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStream.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStream.java new file mode 100644 index 0000000..f3188aa --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStream.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.core.memory; + +import java.io.IOException; +import java.io.OutputStream; + +public final class DataOutputViewStream extends OutputStream { + + private final DataOutputView outputView; + + + public DataOutputViewStream(DataOutputView outputView) { + this.outputView = outputView; + } + + + @Override + public void write(int b) throws IOException { + outputView.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + outputView.write(b, off, len); + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java b/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java index 00ad8e5..7e891bb 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.util; import java.util.HashMap; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java new file mode 100644 index 0000000..8f9822e --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import java.io.PrintWriter; +import java.io.StringWriter; + +public class ExceptionUtils { + + /** + * Makes a string representation of the exception's stack trace. + * + * @param e The exception to stringify. + * @return A string with exception name and call stack. + */ + public static String stringifyException(final Throwable e) { + final StringWriter stm = new StringWriter(); + final PrintWriter wrt = new PrintWriter(stm); + e.printStackTrace(wrt); + wrt.close(); + return stm.toString(); + } + + /** + * Throws the given {@code Throwable} in scenarios where the signatures do not allow you to + * throw arbitrary Throwables. Errors and RuntimeExceptions are thrown directly, other exceptions + * are packed into runtime exceptions + * + * @param t The throwable to be thrown. + */ + public static void rethrow(Throwable t) { + if (t instanceof Error) { + throw (Error) t; + } + else if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } + else { + throw new RuntimeException(t); + } + } + + /** + * Throws the given {@code Throwable} in scenarios where the signatures do not allow you to + * throw arbitrary Throwables. Errors and RuntimeExceptions are thrown directly, other exceptions + * are packed into a parent RuntimeEception. + * + * @param t The throwable to be thrown. 
+ * @param parentMessage The message for the parent RuntimeException, if one is needed. + */ + public static void rethrow(Throwable t, String parentMessage) { + if (t instanceof Error) { + throw (Error) t; + } + else if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } + else { + throw new RuntimeException(parentMessage, t); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/util/NumberSequenceIterator.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/NumberSequenceIterator.java b/flink-core/src/main/java/org/apache/flink/util/NumberSequenceIterator.java index 2bdbc60..755e188 100644 --- a/flink-core/src/main/java/org/apache/flink/util/NumberSequenceIterator.java +++ b/flink-core/src/main/java/org/apache/flink/util/NumberSequenceIterator.java @@ -21,13 +21,10 @@ package org.apache.flink.util; import java.util.Iterator; import java.util.NoSuchElementException; - - -/** - * - */ /** - * + * The {@code NumberSequenceIterator} is an iterator that returns a sequence of numbers (as {@code Long}s). + * The iterator is splittable (as defined by {@link SplittableIterator}), i.e., it can be divided into multiple + * iterators that each return a subsequence of the number sequence. */ public class NumberSequenceIterator implements SplittableIterator { @@ -38,7 +35,12 @@ public class NumberSequenceIterator implements SplittableIterator { private long current; - + /** + * Creates a new iterator that returns the sequence of numbers from {@code from} to {@code to}. + * + * @param from The first number returned by the iterator. + * @param to The last number returned by the iterator.
+ */ public NumberSequenceIterator(long from, long to) { if (from > to) { throw new IllegalArgumentException("The 'to' value must not be smaller than the 'from' value."); @@ -52,11 +54,11 @@ public class NumberSequenceIterator implements SplittableIterator { /** * Internal constructor to allow for empty iterators. * - * @param from - * @param to - * @param mark + * @param from The first number returned by the iterator. + * @param to The last number returned by the iterator. + * @param unused A dummy parameter to disambiguate the constructor. */ - private NumberSequenceIterator(long from, long to, boolean mark) { + private NumberSequenceIterator(long from, long to, boolean unused) { this.current = from; this.to = to; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/util/SimpleStringUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/SimpleStringUtils.java b/flink-core/src/main/java/org/apache/flink/util/SimpleStringUtils.java index c3e6362..cfbd14e 100644 --- a/flink-core/src/main/java/org/apache/flink/util/SimpleStringUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/SimpleStringUtils.java @@ -16,14 +16,12 @@ * limitations under the License. */ - package org.apache.flink.util; import java.io.Serializable; import org.apache.flink.types.StringValue; - /** * Utility class for efficient string operations on strings. 
All methods in this class are * written to be optimized for efficiency and work only on strings whose characters are http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java b/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java index a15c31c..7d50d12 100644 --- a/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java +++ b/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java @@ -18,10 +18,17 @@ package org.apache.flink.util; +/** + * An exception indicating that an {@link java.lang.Iterable} can only be traversed once, but an attempt + * was made to traverse it an additional time. + */ public class TraversableOnceException extends RuntimeException { private static final long serialVersionUID = 7636881584773577290L; + /** + * Creates a new exception with a default message. + */ public TraversableOnceException() { super("The Iterable can be iterated over only once. Only the first call to 'iterator()' will succeed."); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/test/java/org/apache/flink/api/common/io/FormatUtil.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FormatUtil.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FormatUtil.java new file mode 100644 index 0000000..2f89a80 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FormatUtil.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.common.io; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.BlockLocation; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.FileSystem.WriteMode; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.util.ReflectionUtil; + +/** + * Provides convenience methods to deal with I/O operations related to {@link InputFormat} and {@link OutputFormat}. + */ +public class FormatUtil { + + + /** + * Creates an {@link InputFormat} from a given class for the specified file. The optional {@link Configuration} + * initializes the format. + * + * @param + * the class of the InputFormat + * @param inputFormatClass + * the class of the InputFormat + * @param path + * the path of the file + * @param configuration + * optional configuration of the InputFormat + * @return the created {@link InputFormat} + * @throws IOException + * if an I/O error occurred while accessing the file or initializing the InputFormat. 
+ */ + public static > F openInput( + Class inputFormatClass, String path, Configuration configuration) + throws IOException + { + configuration = configuration == null ? new Configuration() : configuration; + + Path normalizedPath = normalizePath(new Path(path)); + final F inputFormat = ReflectionUtil.newInstance(inputFormatClass); + + inputFormat.setFilePath(normalizedPath); + inputFormat.setOpenTimeout(0); + inputFormat.configure(configuration); + + final FileSystem fs = FileSystem.get(normalizedPath.toUri()); + FileStatus fileStatus = fs.getFileStatus(normalizedPath); + + BlockLocation[] blocks = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); + inputFormat.open(new FileInputSplit(0, new Path(path), 0, fileStatus.getLen(), blocks[0].getHosts())); + return inputFormat; + } + + /** + * Creates {@link InputFormat}s from a given class for the specified file(s). The optional {@link Configuration} + * initializes the formats. + * + * @param + * the class of the InputFormat + * @param inputFormatClass + * the class of the InputFormat + * @param path + * the path of the file or to the directory containing the splits + * @param configuration + * optional configuration of the InputFormat + * @return the created {@link InputFormat}s for each file in the specified path + * @throws IOException + * if an I/O error occurred while accessing the files or initializing the InputFormat. 
+ */ + @SuppressWarnings("unchecked") + public static > List openAllInputs( + Class inputFormatClass, String path, Configuration configuration) throws IOException { + Path nephelePath = new Path(path); + FileSystem fs = nephelePath.getFileSystem(); + FileStatus fileStatus = fs.getFileStatus(nephelePath); + if (!fileStatus.isDir()) { + return Arrays.asList(openInput(inputFormatClass, path, configuration)); + } + FileStatus[] list = fs.listStatus(nephelePath); + List formats = new ArrayList(); + for (int index = 0; index < list.length; index++) { + formats.add(openInput(inputFormatClass, list[index].getPath().toString(), configuration)); + } + return formats; + } + + /** + * Creates an {@link InputFormat} from a given class. The optional {@link Configuration} + * initializes the format. + * + * @param + * the class of the InputFormat + * @param inputFormatClass + * the class of the InputFormat + * @param configuration + * optional configuration of the InputFormat + * @return the created {@link InputFormat} + * @throws IOException + * if an I/O error occurred while accessing the file or initializing the InputFormat. + */ + public static > F openInput( + Class inputFormatClass, Configuration configuration) throws IOException { + configuration = configuration == null ? new Configuration() : configuration; + + final F inputFormat = ReflectionUtil.newInstance(inputFormatClass); + inputFormat.configure(configuration); + final IS[] splits = inputFormat.createInputSplits(1); + inputFormat.open(splits[0]); + return inputFormat; + } + + /** + * Creates an {@link OutputFormat} from a given class for the specified file. The optional {@link Configuration} + * initializes the format. 
+ * + * @param + * the class of the OutputFormat + * @param outputFormatClass + * the class of the OutputFormat + * @param path + * the path of the file or to the directory containing the splits + * @param configuration + * optional configuration of the OutputFormat + * @return the created {@link OutputFormat} + * @throws IOException + * if an I/O error occurred while accessing the file or initializing the OutputFormat. + */ + public static > F openOutput( + Class outputFormatClass, String path, Configuration configuration) + throws IOException + { + final F outputFormat = ReflectionUtil.newInstance(outputFormatClass); + outputFormat.setOutputFilePath(new Path(path)); + outputFormat.setWriteMode(WriteMode.OVERWRITE); + + configuration = configuration == null ? new Configuration() : configuration; + + outputFormat.configure(configuration); + outputFormat.open(0, 1); + return outputFormat; + } + + /** + * Fixes the path if it denotes a local (relative) file without the proper protocol prefix. 
+ */ + private static Path normalizePath(Path path) { + URI uri = path.toUri(); + if (uri.getScheme() == null) { + try { + uri = new URI("file", uri.getHost(), uri.getPath(), uri.getFragment()); + path = new Path(uri.toString()); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("path is invalid", e); + } + } + return path; + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/test/java/org/apache/flink/core/io/DefaultSplitAssignerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/core/io/DefaultSplitAssignerTest.java b/flink-core/src/test/java/org/apache/flink/core/io/DefaultSplitAssignerTest.java new file mode 100644 index 0000000..b88fcc5 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/core/io/DefaultSplitAssignerTest.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.core.io; + +import static org.junit.Assert.*; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.core.io.InputSplit; +import org.junit.Test; + + +public class DefaultSplitAssignerTest { + + @Test + public void testSerialSplitAssignment() { + try { + final int NUM_SPLITS = 50; + + Set splits = new HashSet(); + for (int i = 0; i < NUM_SPLITS; i++) { + splits.add(new GenericInputSplit(i, NUM_SPLITS)); + } + + DefaultInputSplitAssigner ia = new DefaultInputSplitAssigner(splits); + InputSplit is = null; + while ((is = ia.getNextInputSplit("")) != null) { + assertTrue(splits.remove(is)); + } + + assertTrue(splits.isEmpty()); + assertNull(ia.getNextInputSplit("")); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testConcurrentSplitAssignment() { + try { + final int NUM_THREADS = 10; + final int NUM_SPLITS = 500; + final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2; + + Set splits = new HashSet(); + for (int i = 0; i < NUM_SPLITS; i++) { + splits.add(new GenericInputSplit(i, NUM_SPLITS)); + } + + final DefaultInputSplitAssigner ia = new DefaultInputSplitAssigner(splits); + + final AtomicInteger splitsRetrieved = new AtomicInteger(0); + final AtomicInteger sumOfIds = new AtomicInteger(0); + + Runnable retriever = new Runnable() { + + @Override + public void run() { + String host = ""; + GenericInputSplit split; + while ((split = (GenericInputSplit) ia.getNextInputSplit(host)) != null) { + splitsRetrieved.incrementAndGet(); + sumOfIds.addAndGet(split.getSplitNumber()); + } + } + }; + + // create the threads + Thread[] threads = new Thread[NUM_THREADS]; + for (int i = 0; i < NUM_THREADS; i++) { + threads[i] = new Thread(retriever); + threads[i].setDaemon(true); + } + + // launch 
concurrently + for (int i = 0; i < NUM_THREADS; i++) { + threads[i].start(); + } + + // sync + for (int i = 0; i < NUM_THREADS; i++) { + threads[i].join(5000); + } + + // verify + for (int i = 0; i < NUM_THREADS; i++) { + if (threads[i].isAlive()) { + fail("The concurrency test case is erroneous, the thread did not respond in time."); + } + } + + assertEquals(NUM_SPLITS, splitsRetrieved.get()); + assertEquals(SUM_OF_IDS, sumOfIds.get()); + + // nothing left + assertNull(ia.getNextInputSplit("")); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java b/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java new file mode 100644 index 0000000..aa56c1c --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java @@ -0,0 +1,385 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.core.io; + +import static org.junit.Assert.*; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.flink.api.common.io.LocatableInputSplitAssigner; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.LocatableInputSplit; +import org.apache.flink.util.LogUtils; + +import org.junit.BeforeClass; +import org.junit.Test; + + +public class LocatableSplitAssignerTest { + + @Test + public void testSerialSplitAssignmentWithNullHost() { + try { + final int NUM_SPLITS = 50; + final String[][] hosts = new String[][] { + new String[] { "localhost" }, + new String[0], + null + }; + + // load some splits + Set splits = new HashSet(); + for (int i = 0; i < NUM_SPLITS; i++) { + splits.add(new LocatableInputSplit(i, hosts[i%3])); + } + + // get all available splits + LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits); + InputSplit is = null; + while ((is = ia.getNextInputSplit(null)) != null) { + assertTrue(splits.remove(is)); + } + + // check we had all + assertTrue(splits.isEmpty()); + assertNull(ia.getNextInputSplit("")); + assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments()); + assertEquals(0, ia.getNumberOfLocalAssignments()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testSerialSplitAssignmentAllForSameHost() { + try { + final int NUM_SPLITS = 50; + + // load some splits + Set splits = new HashSet(); + for (int i = 0; i < NUM_SPLITS; i++) { + splits.add(new LocatableInputSplit(i, "testhost")); + } + + // get all available splits + LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits); + InputSplit is = null; + while ((is = ia.getNextInputSplit("testhost")) != null) { + assertTrue(splits.remove(is)); + } + + // check we had all + assertTrue(splits.isEmpty()); + assertNull(ia.getNextInputSplit("")); + + assertEquals(0, 
ia.getNumberOfRemoteAssignments()); + assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testSerialSplitAssignmentAllForRemoteHost() { + try { + final String[] hosts = { "host1", "host1", "host1", "host2", "host2", "host3" }; + final int NUM_SPLITS = 10 * hosts.length; + + // load some splits + Set splits = new HashSet(); + for (int i = 0; i < NUM_SPLITS; i++) { + splits.add(new LocatableInputSplit(i, hosts[i % hosts.length])); + } + + // get all available splits + LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits); + InputSplit is = null; + while ((is = ia.getNextInputSplit("testhost")) != null) { + assertTrue(splits.remove(is)); + } + + // check we had all + assertTrue(splits.isEmpty()); + assertNull(ia.getNextInputSplit("anotherHost")); + + assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments()); + assertEquals(0, ia.getNumberOfLocalAssignments()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testSerialSplitAssignmentMixedLocalHost() { + try { + final String[] hosts = { "host1", "host1", "host1", "host2", "host2", "host3" }; + final int NUM_SPLITS = 10 * hosts.length; + + // load some splits + Set splits = new HashSet(); + for (int i = 0; i < NUM_SPLITS; i++) { + splits.add(new LocatableInputSplit(i, hosts[i % hosts.length])); + } + + // get all available splits + LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits); + InputSplit is = null; + int i = 0; + while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length])) != null) { + assertTrue(splits.remove(is)); + } + + // check we had all + assertTrue(splits.isEmpty()); + assertNull(ia.getNextInputSplit("anotherHost")); + + assertEquals(0, ia.getNumberOfRemoteAssignments()); + assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments()); + } + catch (Exception e) { + e.printStackTrace(); + 
fail(e.getMessage()); + } + } + + @Test + public void testConcurrentSplitAssignmentNullHost() { + try { + final int NUM_THREADS = 10; + final int NUM_SPLITS = 500; + final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2; + + final String[][] hosts = new String[][] { + new String[] { "localhost" }, + new String[0], + null + }; + + // load some splits + Set splits = new HashSet(); + for (int i = 0; i < NUM_SPLITS; i++) { + splits.add(new LocatableInputSplit(i, hosts[i%3])); + } + + final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits); + + final AtomicInteger splitsRetrieved = new AtomicInteger(0); + final AtomicInteger sumOfIds = new AtomicInteger(0); + + Runnable retriever = new Runnable() { + + @Override + public void run() { + LocatableInputSplit split; + while ((split = ia.getNextInputSplit(null)) != null) { + splitsRetrieved.incrementAndGet(); + sumOfIds.addAndGet(split.getSplitNumber()); + } + } + }; + + // create the threads + Thread[] threads = new Thread[NUM_THREADS]; + for (int i = 0; i < NUM_THREADS; i++) { + threads[i] = new Thread(retriever); + threads[i].setDaemon(true); + } + + // launch concurrently + for (int i = 0; i < NUM_THREADS; i++) { + threads[i].start(); + } + + // sync + for (int i = 0; i < NUM_THREADS; i++) { + threads[i].join(5000); + } + + // verify + for (int i = 0; i < NUM_THREADS; i++) { + if (threads[i].isAlive()) { + fail("The concurrency test case is erroneous, the thread did not respond in time."); + } + } + + assertEquals(NUM_SPLITS, splitsRetrieved.get()); + assertEquals(SUM_OF_IDS, sumOfIds.get()); + + // nothing left + assertNull(ia.getNextInputSplit("")); + + assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments()); + assertEquals(0, ia.getNumberOfLocalAssignments()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testConcurrentSplitAssignmentForSingleHost() { + try { + final int NUM_THREADS = 10; + final int NUM_SPLITS = 500; + final 
int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2; + + // load some splits + Set splits = new HashSet(); + for (int i = 0; i < NUM_SPLITS; i++) { + splits.add(new LocatableInputSplit(i, "testhost")); + } + + final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits); + + final AtomicInteger splitsRetrieved = new AtomicInteger(0); + final AtomicInteger sumOfIds = new AtomicInteger(0); + + Runnable retriever = new Runnable() { + + @Override + public void run() { + LocatableInputSplit split; + while ((split = ia.getNextInputSplit("testhost")) != null) { + splitsRetrieved.incrementAndGet(); + sumOfIds.addAndGet(split.getSplitNumber()); + } + } + }; + + // create the threads + Thread[] threads = new Thread[NUM_THREADS]; + for (int i = 0; i < NUM_THREADS; i++) { + threads[i] = new Thread(retriever); + threads[i].setDaemon(true); + } + + // launch concurrently + for (int i = 0; i < NUM_THREADS; i++) { + threads[i].start(); + } + + // sync + for (int i = 0; i < NUM_THREADS; i++) { + threads[i].join(5000); + } + + // verify + for (int i = 0; i < NUM_THREADS; i++) { + if (threads[i].isAlive()) { + fail("The concurrency test case is erroneous, the thread did not respond in time."); + } + } + + assertEquals(NUM_SPLITS, splitsRetrieved.get()); + assertEquals(SUM_OF_IDS, sumOfIds.get()); + + // nothing left + assertNull(ia.getNextInputSplit("testhost")); + + assertEquals(0, ia.getNumberOfRemoteAssignments()); + assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testConcurrentSplitAssignmentForMultipleHosts() { + try { + final int NUM_THREADS = 10; + final int NUM_SPLITS = 500; + final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2; + + final String[] hosts = { "host1", "host1", "host1", "host2", "host2", "host3" }; + + // load some splits + Set splits = new HashSet(); + for (int i = 0; i < NUM_SPLITS; i++) { + splits.add(new 
LocatableInputSplit(i, hosts[i%hosts.length])); + } + + final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits); + + final AtomicInteger splitsRetrieved = new AtomicInteger(0); + final AtomicInteger sumOfIds = new AtomicInteger(0); + + Runnable retriever = new Runnable() { + + @Override + public void run() { + final String threadHost = hosts[(int) (Math.random() * hosts.length)]; + + LocatableInputSplit split; + while ((split = ia.getNextInputSplit(threadHost)) != null) { + splitsRetrieved.incrementAndGet(); + sumOfIds.addAndGet(split.getSplitNumber()); + } + } + }; + + // create the threads + Thread[] threads = new Thread[NUM_THREADS]; + for (int i = 0; i < NUM_THREADS; i++) { + threads[i] = new Thread(retriever); + threads[i].setDaemon(true); + } + + // launch concurrently + for (int i = 0; i < NUM_THREADS; i++) { + threads[i].start(); + } + + // sync + for (int i = 0; i < NUM_THREADS; i++) { + threads[i].join(5000); + } + + // verify + for (int i = 0; i < NUM_THREADS; i++) { + if (threads[i].isAlive()) { + fail("The concurrency test case is erroneous, the thread did not respond in time."); + } + } + + assertEquals(NUM_SPLITS, splitsRetrieved.get()); + assertEquals(SUM_OF_IDS, sumOfIds.get()); + + // nothing left + assertNull(ia.getNextInputSplit("testhost")); + + // at least one fraction of hosts needs be local, no matter how bad the thread races + assertTrue(ia.getNumberOfLocalAssignments() >= NUM_SPLITS / hosts.length); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java index 3518927..a5d2b91 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java @@ -22,7 +22,7 @@ package org.apache.flink.api.java.record.io; import java.io.IOException; import org.junit.Assert; - +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.api.java.record.io.ExternalProcessFixedLengthInputFormat; import org.apache.flink.api.java.record.io.ExternalProcessInputFormat; @@ -286,8 +286,8 @@ private ExternalProcessFixedLengthInputFormat format; } @Override - public Class getInputSplitType() { - return GenericInputSplit.class; + public DefaultInputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) { + return new DefaultInputSplitAssigner(splits); } @Override http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java index 1b86ecb..96f6664 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java @@ -16,13 +16,12 @@ * limitations under the License. 
*/ - package org.apache.flink.api.java.record.io; import java.io.IOException; import org.junit.Assert; - +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.api.java.record.io.ExternalProcessInputFormat; import org.apache.flink.api.java.record.io.ExternalProcessInputSplit; @@ -225,10 +224,10 @@ public class ExternalProcessInputFormatTest { } @Override - public Class getInputSplitType() { - return GenericInputSplit.class; + public DefaultInputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) { + return new DefaultInputSplitAssigner(splits); } - + @Override public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { return null; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorInputFormatsTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorInputFormatsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorInputFormatsTest.java index 2ff49e0..7ec5a6a 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorInputFormatsTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorInputFormatsTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.*; import java.io.IOException; +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; import org.apache.flink.api.common.io.GenericInputFormat; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.statistics.BaseStatistics; @@ -141,7 +142,7 @@ public class TypeExtractorInputFormatsTest { public InputSplit[] createInputSplits(int minNumSplits) { return null; } @Override - public Class getInputSplitType() { return null; } + public 
DefaultInputSplitAssigner getInputSplitAssigner(InputSplit[] splits) { return null; } @Override public void open(InputSplit split) {} @@ -211,7 +212,7 @@ public class TypeExtractorInputFormatsTest { public InputSplit[] createInputSplits(int minNumSplits) { return null; } @Override - public Class getInputSplitType() { return null; } + public DefaultInputSplitAssigner getInputSplitAssigner(InputSplit[] splits) { return null; } @Override public void open(InputSplit split) {} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java index c187961..1fa9491 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java @@ -41,6 +41,7 @@ public class AbstractID implements IOReadableWritable, Comparable, j /** The size of the ID in byte */ public static final int SIZE = 2 * SIZE_OF_LONG; + /** The upper part of the actual ID */ private long upperPart; @@ -48,11 +49,13 @@ public class AbstractID implements IOReadableWritable, Comparable, j /** The lower part of the actual ID */ private long lowerPart; + // -------------------------------------------------------------------------------------------- + /** * Constructs a new ID with a specific bytes value. 
*/ public AbstractID(byte[] bytes) { - if (bytes.length != SIZE) { + if (bytes == null || bytes.length != SIZE) { throw new IllegalArgumentException("Argument bytes must by an array of " + SIZE + " bytes"); } @@ -80,6 +83,9 @@ public class AbstractID implements IOReadableWritable, Comparable, j * @param id the abstract ID to copy */ public AbstractID(AbstractID id) { + if (id == null) { + throw new IllegalArgumentException("Id must not be null."); + } this.lowerPart = id.lowerPart; this.upperPart = id.upperPart; } @@ -91,7 +97,19 @@ public class AbstractID implements IOReadableWritable, Comparable, j this.lowerPart = generateRandomLong(); this.upperPart = generateRandomLong(); } + + // -------------------------------------------------------------------------------------------- + + public long getLowerPart() { + return lowerPart; + } + + public long getUpperPart() { + return upperPart; + } + // -------------------------------------------------------------------------------------------- + /** * Generates a uniformly distributed random positive long. * http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/JobException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/JobException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/JobException.java new file mode 100644 index 0000000..c4a4211 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/JobException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime; + +public class JobException extends Exception { + + private static final long serialVersionUID = 1275864691743020176L; + + public JobException(String msg) { + super(msg); + } + + public JobException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/JobSubmissionException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/JobSubmissionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/JobSubmissionException.java deleted file mode 100644 index 4960d80..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/JobSubmissionException.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime; - -/** - * A job submission exception is thrown if an error occurs while submitting - * a job from the client to the job manager. - * - */ -public class JobSubmissionException extends Exception { - - /** - * Generated serial UID - */ - private static final long serialVersionUID = 1275864691743020176L; - - /** - * Constructs a new job submission exception with the given error message. - * - * @param msg - * the error message to be transported through this exception - */ - public JobSubmissionException(String msg) { - super(msg); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/client/AbstractJobResult.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/AbstractJobResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/AbstractJobResult.java index 2ca8b68..416453f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/AbstractJobResult.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/AbstractJobResult.java @@ -37,18 +37,13 @@ public abstract class AbstractJobResult implements IOReadableWritable { /** * The possible return codes for a job operation. - * */ public enum ReturnCode { - /** - * The success return code. - */ + /** The success return code. */ SUCCESS, - /** - * The error return code. - */ + /** The error return code. */ ERROR }; @@ -60,7 +55,7 @@ public abstract class AbstractJobResult implements IOReadableWritable { /** * An optional description which can provide further information in case of an error. */ - private String description = null; + private String description = ""; /** * Constructs a new abstract job result object and sets the description. 
@@ -70,7 +65,7 @@ public abstract class AbstractJobResult implements IOReadableWritable { * @param description * the optional error description */ - public AbstractJobResult(final ReturnCode returnCode, final String description) { + public AbstractJobResult(ReturnCode returnCode, String description) { this.returnCode = returnCode; this.description = description; } @@ -79,8 +74,7 @@ public abstract class AbstractJobResult implements IOReadableWritable { * Construct a new abstract job result object. This constructor is required * for the deserialization process. */ - public AbstractJobResult() { - } + public AbstractJobResult() {} @Override http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java index 79da72f..028713d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.runtime.client; import java.io.IOException; @@ -45,29 +44,19 @@ import org.apache.flink.util.StringUtils; /** * The job client is able to submit, control, and abort jobs. - *

- * This class is thread-safe. */ public class JobClient { - /** - * The logging object used for debugging. - */ + /** The logging object used for debugging. */ private static final Logger LOG = LoggerFactory.getLogger(JobClient.class); - /** - * The job management server stub. - */ + /** The job management server stub.*/ private final JobManagementProtocol jobSubmitClient; - /** - * The accumulator protocol stub to request accumulators from JobManager - */ + /** The accumulator protocol stub to request accumulators from JobManager */ private AccumulatorProtocol accumulatorProtocolProxy; - /** - * The job graph assigned with this job client. - */ + /** The job graph assigned with this job client. */ private final JobGraph jobGraph; /** @@ -121,7 +110,7 @@ public class JobClient { /** * Constructs a new job client object and instantiates a local - * RPC proxy for the {@link JobSubmissionProtocol}. + * RPC proxy for the JobSubmissionProtocol * * @param jobGraph * the job graph to run @@ -134,7 +123,7 @@ public class JobClient { /** * Constructs a new job client object and instantiates a local - * RPC proxy for the {@link JobSubmissionProtocol}. + * RPC proxy for the JobSubmissionProtocol * * @param jobGraph * the job graph to run @@ -160,7 +149,7 @@ public class JobClient { /** * Constructs a new job client object and instantiates a local - * RPC proxy for the {@link JobSubmissionProtocol}. 
+ * RPC proxy for the JobSubmissionProtocol * * @param jobGraph * the job graph to run @@ -335,7 +324,7 @@ public class JobClient { if (event instanceof JobEvent) { final JobEvent jobEvent = (JobEvent) event; final JobStatus jobStatus = jobEvent.getCurrentJobStatus(); - if (jobStatus == JobStatus.SCHEDULED) { + if (jobStatus == JobStatus.RUNNING) { startTimestamp = jobEvent.getTimestamp(); } if (jobStatus == JobStatus.FINISHED) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionResult.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionResult.java index c755dc6..53319d3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionResult.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionResult.java @@ -16,14 +16,8 @@ * limitations under the License. */ - package org.apache.flink.runtime.client; -import java.io.IOException; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - /** * A JobSubmissionResult is used to report the results * of a job submission. It contains a return code and a description. 
@@ -53,17 +47,4 @@ public class JobSubmissionResult extends AbstractJobResult { public JobSubmissionResult() { super(); } - - - @Override - public void read(final DataInputView in) throws IOException { - super.read(in); - } - - - @Override - public void write(final DataOutputView out) throws IOException { - super.write(out); - } - } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java index f89e999..b4a38f2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.runtime.deployment; import java.io.IOException; @@ -24,43 +23,30 @@ import java.io.IOException; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.executiongraph.ExecutionEdge2; import org.apache.flink.runtime.io.network.channels.ChannelID; /** * A channel deployment descriptor contains all the information necessary to deploy either an input or an output channel * as part of a task on a task manager. - *

- * This class is not thread-safe in general. - * */ public final class ChannelDeploymentDescriptor implements IOReadableWritable { - /** - * The ID of the output channel. - */ + /** The ID of the output channel. */ private final ChannelID outputChannelID; - /** - * The ID of the input channel. - */ + /** The ID of the input channel. */ private final ChannelID inputChannelID; /** * Constructs a new channel deployment descriptor. * - * @param outputChannelID - * the ID of the output channel - * @param inputChannelID - * the ID of the input channel + * @param outputChannelID The ID of the output channel + * @param inputChannelID The ID of the input channel */ - public ChannelDeploymentDescriptor(final ChannelID outputChannelID, final ChannelID inputChannelID) { - - if (outputChannelID == null) { - throw new IllegalArgumentException("Argument outputChannelID must not be null"); - } - - if (inputChannelID == null) { - throw new IllegalArgumentException("Argument inputChannelID must not be null"); + public ChannelDeploymentDescriptor(ChannelID outputChannelID, ChannelID inputChannelID) { + if (outputChannelID == null || inputChannelID == null) { + throw new IllegalArgumentException("Channel IDs must not be null"); } this.outputChannelID = outputChannelID; @@ -71,23 +57,20 @@ public final class ChannelDeploymentDescriptor implements IOReadableWritable { * Default constructor for serialization/deserialization. 
*/ public ChannelDeploymentDescriptor() { - this.outputChannelID = new ChannelID(); this.inputChannelID = new ChannelID(); } @Override - public void write(final DataOutputView out) throws IOException { - + public void write(DataOutputView out) throws IOException { this.outputChannelID.write(out); this.inputChannelID.write(out); } @Override - public void read(final DataInputView in) throws IOException { - + public void read(DataInputView in) throws IOException { this.outputChannelID.read(in); this.inputChannelID.read(in); } @@ -98,7 +81,6 @@ public final class ChannelDeploymentDescriptor implements IOReadableWritable { * @return the output channel ID attached to this deployment descriptor */ public ChannelID getOutputChannelID() { - return this.outputChannelID; } @@ -108,7 +90,12 @@ public final class ChannelDeploymentDescriptor implements IOReadableWritable { * @return the input channel ID attached to this deployment descriptor */ public ChannelID getInputChannelID() { - return this.inputChannelID; } + + // -------------------------------------------------------------------------------------------- + + public static ChannelDeploymentDescriptor fromExecutionEdge(ExecutionEdge2 edge) { + return new ChannelDeploymentDescriptor(edge.getOutputChannelId(), edge.getInputChannelId()); + } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java index dfec497..e4a447f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java @@ -16,74 +16,36 @@ * 
limitations under the License. */ - package org.apache.flink.runtime.deployment; import java.io.IOException; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.runtime.io.network.channels.ChannelType; -import org.apache.flink.runtime.io.network.gates.GateID; -import org.apache.flink.runtime.util.EnumUtils; +import org.apache.flink.runtime.executiongraph.ExecutionEdge2; /** - * A gate deployment descriptor contains all the information necessary to deploy either an input or an output gate as - * part of a task on a task manager. - *

- * This class is not thread-safe in general. - * + * A gate deployment descriptor contains the deployment descriptors for the channels associated with that gate. */ public final class GateDeploymentDescriptor implements IOReadableWritable { - /** - * The ID of the gate. - */ - private final GateID gateID; - - /** - * The channel type of the gate. - */ - private ChannelType channelType; - - /** - * The list of channel deployment descriptors attached to this gate. - */ + /** The list of channel deployment descriptors attached to this gate. */ private final List channels; /** * Constructs a new gate deployment descriptor * - * @param gateID - * the ID of the gate - * @param channelType - * the channel type of the gate - * @param compressionLevel - * the compression level of the gate * @param channels * the list of channel deployment descriptors attached to this gate */ - public GateDeploymentDescriptor(final GateID gateID, final ChannelType channelType, - List channels) { - - if (gateID == null) { - throw new IllegalArgumentException("Argument gateID must no be null"); - } - - if (channelType == null) { - throw new IllegalArgumentException("Argument channelType must no be null"); - } - + public GateDeploymentDescriptor(List channels) { if (channels == null) { - throw new IllegalArgumentException("Argument channels must no be null"); + throw new NullPointerException(); } - this.gateID = gateID; - this.channelType = channelType; this.channels = channels; } @@ -91,71 +53,49 @@ public final class GateDeploymentDescriptor implements IOReadableWritable { * Default constructor for serialization/deserialization. 
*/ public GateDeploymentDescriptor() { - - this.gateID = new GateID(); - this.channelType = null; this.channels = new ArrayList(); } - + + public List getChannels() { + return channels; + } + + // -------------------------------------------------------------------------------------------- + @Override public void write(final DataOutputView out) throws IOException { - - this.gateID.write(out); - EnumUtils.writeEnum(out, channelType); out.writeInt(this.channels.size()); - final Iterator it = this.channels.iterator(); - while (it.hasNext()) { - it.next().write(out); + for (ChannelDeploymentDescriptor cdd : this.channels) { + cdd.write(out); } } - @Override public void read(final DataInputView in) throws IOException { - - this.gateID.read(in); - this.channelType = EnumUtils.readEnum(in, ChannelType.class); final int nocdd = in.readInt(); for (int i = 0; i < nocdd; ++i) { - final ChannelDeploymentDescriptor cdd = new ChannelDeploymentDescriptor(); + ChannelDeploymentDescriptor cdd = new ChannelDeploymentDescriptor(); cdd.read(in); this.channels.add(cdd); } } - - /** - * Returns the ID of the gate. - * - * @return the ID of the gate - */ - public GateID getGateID() { - - return this.gateID; - } - - /** - * Returns the channel type of the gate. - * - * @return the channel type of the gate - */ - public ChannelType getChannelType() { - - return this.channelType; - } - - /** - * Returns the number of channel deployment descriptors attached to this gate descriptor. 
- * - * @return the number of channel deployment descriptors - */ - public int getNumberOfChannelDescriptors() { - - return this.channels.size(); + + // -------------------------------------------------------------------------------------------- + + public static GateDeploymentDescriptor fromEdges(List edges) { + List channels = new ArrayList(edges.size()); + for (ExecutionEdge2 edge : edges) { + channels.add(ChannelDeploymentDescriptor.fromExecutionEdge(edge)); + } + return new GateDeploymentDescriptor(channels); } - - public ChannelDeploymentDescriptor getChannelDescriptor(final int index) { - - return this.channels.get(index); + + public static GateDeploymentDescriptor fromEdges(ExecutionEdge2[] edges) { + List channels = new ArrayList(edges.length); + for (ExecutionEdge2 edge : edges) { + channels.add(ChannelDeploymentDescriptor.fromExecutionEdge(edge)); + } + return new GateDeploymentDescriptor(channels); } }