From: gyfora@apache.org
To: commits@flink.apache.org
Message-Id: <5b2371ba62ae464385908375331213b8@git.apache.org>
Subject: flink git commit: [FLINK-3790] [streaming] Use proper hadoop config in rolling sink
Date: Thu, 21 Apr 2016 09:35:52 +0000 (UTC)

Repository: flink
Updated Branches:
  refs/heads/master d636bf78e -> d45cb69af


[FLINK-3790] [streaming] Use proper hadoop config in rolling sink

Closes #1919


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d45cb69a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d45cb69a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d45cb69a

Branch: refs/heads/master
Commit: d45cb69afd38e5dabf64f90e35a21e56a25ddd6f
Parents: d636bf7
Author: Gyula Fora
Authored: Wed Apr 20 22:22:55 2016 +0200
Committer: Gyula Fora
Committed: Thu Apr 21 11:34:30 2016 +0200

----------------------------------------------------------------------
 .../streaming/connectors/fs/RollingSink.java    | 18 ++++++++++--------
 .../connectors/fs/SequenceFileWriter.java       |  8 +++++---
 2 files changed, 15 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d45cb69a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index 76324d7..799e908 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -279,7 +280,9 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 	 * current part file path, the valid length of the in-progress files and pending part files.
 	 */
 	private transient BucketState bucketState;
-
+
+	private transient org.apache.hadoop.conf.Configuration hadoopConf;
+
 	/**
 	 * Creates a new {@code RollingSink} that writes files to the given base directory.
 	 *
@@ -317,7 +320,8 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 			bucketState = new BucketState();
 		}
 
-		FileSystem fs = new Path(basePath).getFileSystem(new org.apache.hadoop.conf.Configuration());
+		hadoopConf = HadoopFileSystem.getHadoopConfiguration();
+		FileSystem fs = new Path(basePath).getFileSystem(hadoopConf);
 		refTruncate = reflectTruncate(fs);
 
 		// delete pending/in-progress files that might be left if we fail while
@@ -412,9 +416,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 	private void openNewPartFile() throws Exception {
 		closeCurrentPartFile();
 
-		org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
-
-		FileSystem fs = new Path(basePath).getFileSystem(conf);
+		FileSystem fs = new Path(basePath).getFileSystem(hadoopConf);
 
 		Path newBucketDirectory = bucketer.getNextBucketPath(new Path(basePath));
 
@@ -466,7 +468,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 		if (currentPartPath != null) {
 			Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix);
 			Path pendingPath = new Path(currentPartPath.getParent(), pendingPrefix + currentPartPath.getName()).suffix(pendingSuffix);
-			FileSystem fs = inProgressPath.getFileSystem(new org.apache.hadoop.conf.Configuration());
+			FileSystem fs = inProgressPath.getFileSystem(hadoopConf);
 			fs.rename(inProgressPath, pendingPath);
 			LOG.debug("Moving in-progress bucket {} to pending file {}",
 				inProgressPath,
@@ -541,7 +543,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 						Path pendingPath = new Path(finalPath.getParent(),
 								pendingPrefix + finalPath.getName()).suffix(pendingSuffix);
 
-						FileSystem fs = pendingPath.getFileSystem(new org.apache.hadoop.conf.Configuration());
+						FileSystem fs = pendingPath.getFileSystem(hadoopConf);
 						fs.rename(pendingPath, finalPath);
 						LOG.debug(
 								"Moving pending file {} to final location after complete checkpoint {}.",
@@ -579,7 +581,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 		bucketState.pendingFiles.clear();
 		FileSystem fs = null;
 		try {
-			fs = new Path(basePath).getFileSystem(new org.apache.hadoop.conf.Configuration());
+			fs = new Path(basePath).getFileSystem(HadoopFileSystem.getHadoopConfiguration());
 		} catch (IOException e) {
 			LOG.error("Error while creating FileSystem in checkpoint restore.", e);
 			throw new RuntimeException("Error while creating FileSystem in checkpoint restore.", e);
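
A note on the pattern in the hunks above: a bare new org.apache.hadoop.conf.Configuration() only loads Hadoop's bundled defaults plus whatever core-site.xml/hdfs-site.xml happen to sit on the classpath, so each call site could silently end up with an effectively empty config. The commit instead resolves the configuration once in open() via Flink's HadoopFileSystem.getHadoopConfiguration() and caches it in a transient field for all later FileSystem lookups. Below is a minimal, self-contained sketch of that caching pattern; the class name, the basePath parameter, and the explicit addResource() call are hypothetical stand-ins for Flink's resolution logic, not the actual RollingSink code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch only: resolve the Hadoop configuration once, keep it in a
// transient field, and reuse it for every FileSystem lookup.
public class CachedHadoopConfSketch {

	// Transient, as in the commit: the Configuration is re-resolved on the
	// task manager in open() instead of being shipped with the function.
	private transient Configuration hadoopConf;

	public void open(String basePath) throws Exception {
		// Resolve once. A plain new Configuration() would only see Hadoop's
		// defaults plus site files that happen to be on the classpath.
		hadoopConf = new Configuration();
		hadoopConf.addResource(new Path("/etc/hadoop/conf/core-site.xml")); // made-up path

		FileSystem fs = new Path(basePath).getFileSystem(hadoopConf);
		fs.exists(new Path(basePath)); // stand-in for cleanup / truncate probing
	}

	public void openNewPartFile(String basePath) throws Exception {
		// Reuse the cached configuration rather than constructing a fresh,
		// possibly empty one on every file roll.
		FileSystem fs = new Path(basePath).getFileSystem(hadoopConf);
		fs.mkdirs(new Path(basePath)); // stand-in for creating the next part file
	}
}

Note that restoreState() (the last hunk above) calls HadoopFileSystem.getHadoopConfiguration() directly rather than using the cached field, which covers the path where state is restored before open() has populated hadoopConf.
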
http://git-wip-us.apache.org/repos/asf/flink/blob/d45cb69a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
index 32b8d49..c71e97f 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -86,9 +87,11 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> extends
 		}
 
 		CompressionCodec codec = null;
+
+		Configuration conf = HadoopFileSystem.getHadoopConfiguration();
 
 		if (!compressionCodecName.equals("None")) {
-			CompressionCodecFactory codecFactory = new CompressionCodecFactory(new Configuration());
+			CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
 			codec = codecFactory.getCodecByName(compressionCodecName);
 			if (codec == null) {
 				throw new RuntimeException("Codec " + compressionCodecName + " not found.");
@@ -96,7 +99,7 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> extends
 		}
 
 		// the non-deprecated constructor syntax is only available in recent hadoop versions...
-		writer = SequenceFile.createWriter(new Configuration(),
+		writer = SequenceFile.createWriter(conf,
 				getStream(),
 				keyClass,
 				valueClass,
@@ -119,7 +122,6 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> extends
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
 	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
 		if (!type.isTupleType()) {
 			throw new IllegalArgumentException("Input TypeInformation is not a tuple type.");
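
The SequenceFileWriter change follows the same rule: the codec lookup and the writer should share one resolved Configuration, otherwise a codec registered through io.compression.codecs could be visible to the factory but not to the writer (or vice versa). A small, self-contained sketch of that flow follows; the class and method names and the IntWritable/Text key/value types are made-up stand-ins for the connector's generic K and V, not the connector's actual code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

// Hypothetical helper mirroring the diff: one Configuration instance feeds
// both the codec lookup and the SequenceFile writer.
public class SequenceFileWriterSketch {

	static SequenceFile.Writer openWriter(Configuration conf,
			FSDataOutputStream stream,
			String compressionCodecName) throws Exception {

		CompressionCodec codec = null;
		if (!compressionCodecName.equals("None")) {
			// The factory reads io.compression.codecs from conf, so custom
			// codecs are only found if conf is the real cluster config.
			codec = new CompressionCodecFactory(conf).getCodecByName(compressionCodecName);
			if (codec == null) {
				throw new RuntimeException("Codec " + compressionCodecName + " not found.");
			}
		}

		// Compress only when a codec was requested.
		SequenceFile.CompressionType type = (codec == null)
				? SequenceFile.CompressionType.NONE
				: SequenceFile.CompressionType.BLOCK;

		// Same older createWriter() overload as the diff, kept for
		// compatibility across the Hadoop versions Flink supported.
		return SequenceFile.createWriter(conf, stream,
				IntWritable.class, Text.class, type, codec);
	}
}

Reusing the older createWriter(Configuration, FSDataOutputStream, ...) overload appears to be what the in-code comment about the "non-deprecated constructor syntax" refers to: the newer builder-style API is not available on all Hadoop versions the connector targets.
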