flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject incubator-flink git commit: Minor code cleanups - left from previous patches
Date Sat, 22 Nov 2014 20:32:46 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master cf54a1c2a -> 42fe87494


Minor code cleanups - left from previous patches

This closes #225


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/42fe8749
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/42fe8749
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/42fe8749

Branch: refs/heads/master
Commit: 42fe87494ace54bdb45dbc5ed7d19a28c3060b47
Parents: cf54a1c
Author: Suneel Marthi <suneel.marthi@gmail.com>
Authored: Fri Nov 21 13:13:04 2014 -0500
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sat Nov 22 20:57:50 2014 +0100

----------------------------------------------------------------------
 .../flink/api/avro/DataOutputEncoder.java       |  2 +-
 .../flink/api/common/io/BinaryInputFormat.java  | 17 +++---
 .../runtime/fs/hdfs/DistributedFileSystem.java  | 55 ++++++++++----------
 3 files changed, 35 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/42fe8749/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
index 6b5e6b4..0102cc1 100644
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
@@ -169,7 +169,7 @@ public final class DataOutputEncoder extends Encoder implements java.io.Serializ
 	// --------------------------------------------------------------------------------------------
 		
 	
-	public static final void writeVarLongCount(DataOutput out, long val) throws IOException
{
+	public static void writeVarLongCount(DataOutput out, long val) throws IOException {
 		if (val < 0) {
 			throw new IOException("Illegal count (must be non-negative): " + val);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/42fe8749/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
index 11a7f28..ad4f52a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
@@ -65,8 +65,6 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
{
 
 	private DataInputStream dataInputStream;
 
-	private BlockBasedInput blockBasedInput;
-
 	private BlockInfo blockInfo;
 
 	private long readRecords;
@@ -95,9 +93,8 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
{
 
 		final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>(minNumSplits);
 		for (FileStatus file : files) {
-			long splitSize = blockSize;
-			for (long pos = 0, length = file.getLen(); pos < length; pos += splitSize) {
-				long remainingLength = Math.min(pos + splitSize, length) - pos;
+			for (long pos = 0, length = file.getLen(); pos < length; pos += blockSize) {
+				long remainingLength = Math.min(pos + blockSize, length) - pos;
 
 				// get the block locations and make sure they are in order with respect to their offset
 				final BlockLocation[] blocks = fs.getFileBlockLocations(file, pos, remainingLength);
@@ -132,9 +129,9 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
{
 		if (pathFile.isDir()) {
 			// input is directory. list all contained files
 			final FileStatus[] partials = fs.listStatus(this.filePath);
-			for (int i = 0; i < partials.length; i++) {
-				if (!partials[i].isDir()) {
-					files.add(partials[i]);
+			for (FileStatus partial : partials) {
+				if (!partial.isDir()) {
+					files.add(partial);
 				}
 			}
 		} else {
@@ -258,8 +255,8 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
{
 		}
 
 		this.stream.seek(this.splitStart + this.blockInfo.getFirstRecordStart());
-		this.blockBasedInput = new BlockBasedInput(this.stream, (int) blockSize);
-		this.dataInputStream = new DataInputStream(this.blockBasedInput);
+		BlockBasedInput blockBasedInput = new BlockBasedInput(this.stream, (int) blockSize);
+		this.dataInputStream = new DataInputStream(blockBasedInput);
 		this.readRecords = 0;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/42fe8749/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileSystem.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileSystem.java
index d5f370f..0836651 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileSystem.java
@@ -197,25 +197,23 @@ public final class DistributedFileSystem extends FileSystem {
 			possibleHadoopConfPaths[2] = System.getenv("HADOOP_HOME")+"/conf";
 			possibleHadoopConfPaths[3] = System.getenv("HADOOP_HOME")+"/etc/hadoop"; // hadoop 2.2
 		}
-		
-		for (int i = 0; i < possibleHadoopConfPaths.length; i++) {
-			if (possibleHadoopConfPaths[i] == null) {
-				continue;
-			}
-			
-			if (new File(possibleHadoopConfPaths[i]).exists()) {
-				if (new File(possibleHadoopConfPaths[i]+"/core-site.xml").exists()) {
-					retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPaths[i]+"/core-site.xml"));
-					
-					if (LOG.isDebugEnabled()) {
-						LOG.debug("Adding "+possibleHadoopConfPaths[i]+"/core-site.xml to hadoop configuration");
+
+		for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
+			if (possibleHadoopConfPath != null) {
+				if (new File(possibleHadoopConfPath).exists()) {
+					if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
+						retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
+
+						if (LOG.isDebugEnabled()) {
+							LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration");
+						}
 					}
-				}
-				if (new File(possibleHadoopConfPaths[i]+"/hdfs-site.xml").exists()) {
-					retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPaths[i]+"/hdfs-site.xml"));
-					
-					if (LOG.isDebugEnabled()) {
-						LOG.debug("Adding "+possibleHadoopConfPaths[i]+"/hdfs-site.xml to hadoop configuration");
+					if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
+						retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
+
+						if (LOG.isDebugEnabled()) {
+							LOG.debug("Adding " + possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration");
+						}
 					}
 				}
 			}
@@ -287,14 +285,15 @@ public final class DistributedFileSystem extends FileSystem {
 						try {
 							this.fs.initialize(initURI, this.conf);
 						}
-						catch (Exception e) {
-							throw new IOException(getMissingAuthorityErrorPrefix(path) + "Could not initialize
the file system connection with the given address of the HDFS Namenode"
-								+ e.getMessage() != null ? ": " + e.getMessage() : ".", e);
+						catch (IOException e) {
+							throw new IOException(getMissingAuthorityErrorPrefix(path) +
+									"Could not initialize the file system connection with the given address of the HDFS
NameNode: " + e.getMessage(), e);
 						}
 					}
 				}
 				catch (IllegalArgumentException e) {
-					throw new IOException(getMissingAuthorityErrorPrefix(path) + "The configuration contains
an invalid hdfs default name (fs.default.name or fs.defaultFS): " + configEntry);
+					throw new IOException(getMissingAuthorityErrorPrefix(path) +
+							"The configuration contains an invalid hdfs default name (fs.default.name or fs.defaultFS):
" + configEntry);
 				}
 			} 
 		}
@@ -304,7 +303,7 @@ public final class DistributedFileSystem extends FileSystem {
 				this.fs.initialize(path, this.conf);
 			}
 			catch (UnknownHostException e) {
-				String message = "The HDFS namenode host at '" + path.getAuthority()
+				String message = "The HDFS NameNode host at '" + path.getAuthority()
 						+ "', specified by file path '" + path.toString() + "', cannot be resolved"
 						+ (e.getMessage() != null ? ": " + e.getMessage() : ".");
 				
@@ -315,15 +314,15 @@ public final class DistributedFileSystem extends FileSystem {
 				throw new IOException(message, e);
 			}
 			catch (Exception e) {
-				throw new IOException("The given file URI (" + path.toString() + ") points to the HDFS
Namenode at "
+				throw new IOException("The given file URI (" + path.toString() + ") points to the HDFS
NameNode at "
 						+ path.getAuthority() + ", but the File System could not be initialized with that address"
 					+ (e.getMessage() != null ? ": " + e.getMessage() : "."), e);
 			}
 		}
 	}
 	
-	private static final String getMissingAuthorityErrorPrefix(URI path) {
-		return "The given HDFS file URI (" + path.toString() + ") did not describe the HDFS Namenode."
+
+	private static String getMissingAuthorityErrorPrefix(URI path) {
+		return "The given HDFS file URI (" + path.toString() + ") did not describe the HDFS NameNode."
+
 				" The attempt to use a default HDFS configuration, as specified in the '" + ConfigConstants.HDFS_DEFAULT_CONFIG
+ "' or '" + 
 				ConfigConstants.HDFS_SITE_CONFIG + "' config parameter failed due to the following problem:
";
 	}
@@ -385,9 +384,9 @@ public final class DistributedFileSystem extends FileSystem {
 
 	@Override
 	public FSDataOutputStream create(final Path f, final boolean overwrite) throws IOException
{
-		final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs
+		final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream = this.fs
 			.create(new org.apache.hadoop.fs.Path(f.toString()), overwrite);
-		return new DistributedDataOutputStream(fdos);
+		return new DistributedDataOutputStream(fsDataOutputStream);
 	}
 
 	@Override


Mime
View raw message