From: hshreedharan@apache.org
To: commits@flume.apache.org
Reply-To: dev@flume.apache.org
Subject: git commit: FLUME-1844. HDFSEventSink should have option to use RawLocalFileSystem.
Date: Wed, 16 Jan 2013 01:42:56 +0000 (UTC)

Updated Branches:
  refs/heads/trunk 750809c70 -> 118752374


FLUME-1844. HDFSEventSink should have option to use RawLocalFileSystem.

(Brock Noland via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/11875237
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/11875237
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/11875237

Branch: refs/heads/trunk
Commit: 11875237420eef3e47ad84e7998e2caff6fc2de6
Parents: 750809c
Author: Hari Shreedharan
Authored: Tue Jan 15 17:31:51 2013 -0800
Committer: Hari Shreedharan
Committed: Tue Jan 15 17:42:41 2013 -0800

----------------------------------------------------------------------
 .../org/apache/flume/sink/hdfs/BucketWriter.java   |    3 +-
 .../flume/sink/hdfs/HDFSCompressedDataStream.java  |   15 ++
 .../org/apache/flume/sink/hdfs/HDFSDataStream.java |   21 +++-
 .../apache/flume/sink/hdfs/HDFSSequenceFile.java   |   19 +++-
 .../apache/flume/sink/hdfs/TestHDFSEventSink.java  |   17 ++-
 .../flume/sink/hdfs/TestUseRawLocalFileSystem.java |  105 +++++++++++++++
 6 files changed, 174 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
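
The new flag is an ordinary per-sink property. As a usage sketch (the agent
and sink names below are illustrative and not part of this commit; the
property keys are the ones read in the configure() methods in the diff
that follows):

    agent1.sinks.local-sink.type = hdfs
    agent1.sinks.local-sink.hdfs.path = file:///var/log/flume/events
    agent1.sinks.local-sink.hdfs.fileType = DataStream
    agent1.sinks.local-sink.hdfs.useRawLocalFileSystem = true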

http://git-wip-us.apache.org/repos/asf/flume/blob/11875237/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index f14f7cb..3f31ef2 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -34,6 +34,7 @@ import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.hdfs.HDFSEventSink.WriterCallback;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -128,7 +129,7 @@ class BucketWriter {
     fileExtensionCounter = new AtomicLong(clock.currentTimeMillis());
 
     isOpen = false;
-    writer.configure(context);
+    this.writer.configure(context);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flume/blob/11875237/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
index 18fe6d4..95eb252 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
@@ -26,6 +26,7 @@ import org.apache.flume.serialization.EventSerializerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -46,12 +47,17 @@ public class HDFSCompressedDataStream implements HDFSWriter {
   private String serializerType;
   private Context serializerContext;
   private EventSerializer serializer;
+  private boolean useRawLocalFileSystem;
 
   @Override
   public void configure(Context context) {
     serializerType = context.getString("serializer", "TEXT");
+    useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem",
+        false);
     serializerContext = new Context(
         context.getSubProperties(EventSerializer.CTX_PREFIX));
+    logger.info("Serializer = " + serializerType + ", UseRawLocalFileSystem = "
+        + useRawLocalFileSystem);
   }
 
   @Override
@@ -67,6 +73,14 @@ public class HDFSCompressedDataStream implements HDFSWriter {
     Configuration conf = new Configuration();
     Path dstPath = new Path(filePath);
     FileSystem hdfs = dstPath.getFileSystem(conf);
+    if(useRawLocalFileSystem) {
+      if(hdfs instanceof LocalFileSystem) {
+        hdfs = ((LocalFileSystem)hdfs).getRaw();
+      } else {
+        logger.warn("useRawLocalFileSystem is set to true but file system "
+            + "is not of type LocalFileSystem: " + hdfs.getClass().getName());
+      }
+    }
     boolean appending = false;
     if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile
         (dstPath)) {
@@ -76,6 +90,7 @@ public class HDFSCompressedDataStream implements HDFSWriter {
     } else {
       fsOut = hdfs.create(dstPath);
     }
+    logger.debug("fsOut = " + fsOut);
     cmpOut = codec.createOutputStream(fsOut);
     serializer = EventSerializerFactory.getInstance(serializerType,
         serializerContext, cmpOut);
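
The same LocalFileSystem-to-raw unwrap recurs in each writer in this
commit, so here is a standalone sketch of what it does (assuming only
hadoop-common on the classpath; the path below is illustrative). Hadoop
resolves file:// URIs to LocalFileSystem, a checksumming wrapper whose
output streams buffer data, so bytes the sink has flushed may not yet be
visible in the target file. getRaw() returns the underlying
RawLocalFileSystem, which writes straight through; the new tests at the
end of this commit rely on that when they assert the file is non-empty
after sync().

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocalFileSystem;
    import org.apache.hadoop.fs.Path;

    public class RawFsSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("file:///tmp/flume-events"); // illustrative path
        // file:// URIs resolve to the checksumming LocalFileSystem.
        FileSystem fs = path.getFileSystem(conf);
        if (fs instanceof LocalFileSystem) {
          // Bypass the checksumming layer so flushed bytes reach the file.
          fs = ((LocalFileSystem) fs).getRaw();
        }
        System.out.println(fs.getClass().getName());
      }
    }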

http://git-wip-us.apache.org/repos/asf/flume/blob/11875237/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
index bd40a88..04120ec 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
@@ -26,22 +26,33 @@ import org.apache.flume.serialization.EventSerializerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HDFSDataStream implements HDFSWriter {
 
-  private FSDataOutputStream outStream;
+  private static final Logger logger =
+      LoggerFactory.getLogger(HDFSDataStream.class);
+
+  private FSDataOutputStream outStream;
   private String serializerType;
   private Context serializerContext;
   private EventSerializer serializer;
+  private boolean useRawLocalFileSystem;
 
   @Override
   public void configure(Context context) {
     serializerType = context.getString("serializer", "TEXT");
+    useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem",
+        false);
     serializerContext = new Context(context.getSubProperties(EventSerializer.CTX_PREFIX));
+    logger.info("Serializer = " + serializerType + ", UseRawLocalFileSystem = "
+        + useRawLocalFileSystem);
   }
 
   @Override
@@ -49,6 +60,14 @@ public class HDFSDataStream implements HDFSWriter {
     Configuration conf = new Configuration();
     Path dstPath = new Path(filePath);
     FileSystem hdfs = dstPath.getFileSystem(conf);
+    if(useRawLocalFileSystem) {
+      if(hdfs instanceof LocalFileSystem) {
+        hdfs = ((LocalFileSystem)hdfs).getRaw();
+      } else {
+        logger.warn("useRawLocalFileSystem is set to true but file system "
+            + "is not of type LocalFileSystem: " + hdfs.getClass().getName());
+      }
+    }
     boolean appending = false;
     if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile
         (dstPath)) {

http://git-wip-us.apache.org/repos/asf/flume/blob/11875237/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
index 1e6d68f..e127f6a 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
@@ -24,17 +24,23 @@ import org.apache.flume.Event;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HDFSSequenceFile implements HDFSWriter {
+  private static final Logger logger =
+      LoggerFactory.getLogger(HDFSSequenceFile.class);
   private SequenceFile.Writer writer;
   private String writeFormat;
   private Context serializerContext;
   private SeqFileFormatter formatter;
+  private boolean useRawLocalFileSystem;
 
   public HDFSSequenceFile() {
     writer = null;
@@ -44,10 +50,14 @@ public class HDFSSequenceFile implements HDFSWriter {
   public void configure(Context context) {
     // use binary writable format by default
     writeFormat = context.getString("hdfs.writeFormat",
         SeqFileFormatterType.Writable.name());
+    useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem",
+        false);
     serializerContext = new Context(
         context.getSubProperties(SeqFileFormatterFactory.CTX_PREFIX));
     formatter = SeqFileFormatterFactory
         .getFormatter(writeFormat, serializerContext);
+    logger.info("writeFormat = " + writeFormat + ", UseRawLocalFileSystem = "
+        + useRawLocalFileSystem);
   }
 
   @Override
@@ -61,7 +71,14 @@ public class HDFSSequenceFile implements HDFSWriter {
     Configuration conf = new Configuration();
     Path dstPath = new Path(filePath);
     FileSystem hdfs = dstPath.getFileSystem(conf);
-
+    if(useRawLocalFileSystem) {
+      if(hdfs instanceof LocalFileSystem) {
+        hdfs = ((LocalFileSystem)hdfs).getRaw();
+      } else {
+        logger.warn("useRawLocalFileSystem is set to true but file system "
+            + "is not of type LocalFileSystem: " + hdfs.getClass().getName());
+      }
+    }
     if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile
         (dstPath)) {
       FSDataOutputStream outStream = hdfs.append(dstPath);

http://git-wip-us.apache.org/repos/asf/flume/blob/11875237/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index 1035ac3..cdddd50 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -112,8 +112,17 @@ public class TestHDFSEventSink {
   }
 
   @Test
-  public void testTextBatchAppend() throws InterruptedException, LifecycleException,
-      EventDeliveryException, IOException {
+  public void testTextBatchAppend() throws Exception {
+    doTestTextBatchAppend(false);
+  }
+
+  @Test
+  public void testTextBatchAppendRawFS() throws Exception {
+    doTestTextBatchAppend(true);
+  }
+
+  public void doTestTextBatchAppend(boolean useRawLocalFileSystem)
+      throws Exception {
     LOG.debug("Starting...");
 
     final long rollCount = 10;
@@ -140,6 +149,8 @@ public class TestHDFSEventSink {
     context.put("hdfs.rollSize", "0");
     context.put("hdfs.batchSize", String.valueOf(batchSize));
     context.put("hdfs.writeFormat", "Text");
+    context.put("hdfs.useRawLocalFileSystem",
+        Boolean.toString(useRawLocalFileSystem));
     context.put("hdfs.fileType", "DataStream");
     Configurables.configure(sink, context);
@@ -154,7 +165,7 @@ public class TestHDFSEventSink {
 
     List bodies = Lists.newArrayList();
 
-    // push the event batches into channel to roll twice
-    for (i = 1; i <= rollCount*2/batchSize; i++) {
+    // push enough event batches into the channel to roll several times
+    for (i = 1; i <= (rollCount*10)/batchSize; i++) {
       Transaction txn = channel.getTransaction();
       txn.begin();
       for (j = 1; j <= batchSize; j++) {

http://git-wip-us.apache.org/repos/asf/flume/blob/11875237/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java
new file mode 100644
index 0000000..ffbdde0
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.Clock;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+public class TestUseRawLocalFileSystem {
+
+  private static Logger logger =
+      LoggerFactory.getLogger(TestUseRawLocalFileSystem.class);
+  private Context context;
+
+  private File baseDir;
+  private File testFile;
+  private Event event;
+
+  @Before
+  public void setup() throws Exception {
+    baseDir = Files.createTempDir();
+    testFile = new File(baseDir.getAbsoluteFile(), "test");
+    context = new Context();
+    event = EventBuilder.withBody("test", Charsets.UTF_8);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    FileUtils.deleteQuietly(baseDir);
+  }
+
+  @Test
+  public void testTestFile() throws Exception {
+    String file = testFile.getCanonicalPath();
+    HDFSDataStream stream = new HDFSDataStream();
+    context.put("hdfs.useRawLocalFileSystem", "true");
+    stream.configure(context);
+    stream.open(file);
+    stream.append(event);
+    stream.sync();
+    Assert.assertTrue(testFile.length() > 0);
+  }
+  @Test
+  public void testCompressedFile() throws Exception {
+    String file = testFile.getCanonicalPath();
+    HDFSCompressedDataStream stream = new HDFSCompressedDataStream();
+    context.put("hdfs.useRawLocalFileSystem", "true");
+    stream.configure(context);
+    stream.open(file, new GzipCodec(), CompressionType.RECORD);
+    stream.append(event);
+    stream.sync();
+    Assert.assertTrue(testFile.length() > 0);
+  }
+  @Test
+  public void testSequenceFile() throws Exception {
+    String file = testFile.getCanonicalPath();
+    HDFSSequenceFile stream = new HDFSSequenceFile();
+    context.put("hdfs.useRawLocalFileSystem", "true");
+    stream.configure(context);
+    stream.open(file);
+    stream.append(event);
+    stream.sync();
+    Assert.assertTrue(testFile.length() > 0);
+  }
+}
\ No newline at end of file
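
To try the new tests locally, a standard Surefire invocation from the
source root should work (a sketch; this assumes the usual Maven reactor
setup for the flume-hdfs-sink module whose paths appear above):

    mvn -pl flume-ng-sinks/flume-hdfs-sink test -Dtest=TestUseRawLocalFileSystem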