Return-Path: X-Original-To: apmail-flume-commits-archive@www.apache.org Delivered-To: apmail-flume-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CB5D5177D7 for ; Tue, 7 Apr 2015 21:47:13 +0000 (UTC) Received: (qmail 23233 invoked by uid 500); 7 Apr 2015 21:47:13 -0000 Delivered-To: apmail-flume-commits-archive@flume.apache.org Received: (qmail 23202 invoked by uid 500); 7 Apr 2015 21:47:13 -0000 Mailing-List: contact commits-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flume.apache.org Delivered-To: mailing list commits@flume.apache.org Received: (qmail 23189 invoked by uid 99); 7 Apr 2015 21:47:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 Apr 2015 21:47:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 87E76E17A7; Tue, 7 Apr 2015 21:47:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hshreedharan@apache.org To: commits@flume.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: flume git commit: FLUME-2613. Add support in FileChannelIntegrityTool to remove invalid events from the channel. Date: Tue, 7 Apr 2015 21:47:13 +0000 (UTC) Repository: flume Updated Branches: refs/heads/flume-1.6 cc13102a3 -> 6019fcf4c FLUME-2613. Add support in FileChannelIntegrityTool to remove invalid events from the channel. (Ashish Paliwal via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/6019fcf4 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/6019fcf4 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/6019fcf4 Branch: refs/heads/flume-1.6 Commit: 6019fcf4c9f773b521813145e026c10c96584527 Parents: cc13102 Author: Hari Shreedharan Authored: Tue Apr 7 14:46:10 2015 -0700 Committer: Hari Shreedharan Committed: Tue Apr 7 14:46:10 2015 -0700 ---------------------------------------------------------------------- .../apache/flume/channel/file/EventUtils.java | 41 ++++++++ .../flume/channel/file/TestEventUtils.java | 44 +++++++++ .../org/apache/flume/tools/EventValidator.java | 49 ++++++++++ .../flume/tools/FileChannelIntegrityTool.java | 99 +++++++++++++++++++- .../tools/TestFileChannelIntegrityTool.java | 75 +++++++++++++++ 5 files changed, 306 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/6019fcf4/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventUtils.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventUtils.java new file mode 100644 index 0000000..ff5242a --- /dev/null +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventUtils.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flume.channel.file; + +import org.apache.flume.Event; + +/** + * + */ +public class EventUtils { + + /** + * Returns the Event encapsulated by a Put wrapper + * + * @param transactionEventRecord TransactionEvent + * @return Event if Put instance is present, null otherwise + */ + public static Event getEventFromTransactionEvent(TransactionEventRecord transactionEventRecord) { + if(transactionEventRecord instanceof Put) { + return ((Put)transactionEventRecord).getEvent(); + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/6019fcf4/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventUtils.java new file mode 100644 index 0000000..c72e3f2 --- /dev/null +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventUtils.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flume.channel.file; + +import junit.framework.Assert; +import org.apache.flume.Event; +import org.junit.Test; + +public class TestEventUtils { + + @Test + public void testPutEvent() { + FlumeEvent event = new FlumeEvent(null, new byte[5]); + Put put = new Put(1l, 1l, event); + Event returnEvent = EventUtils.getEventFromTransactionEvent(put); + Assert.assertNotNull(returnEvent); + Assert.assertEquals(5, returnEvent.getBody().length); + } + + @Test + public void testInvalidEvent() { + Take take = new Take(1l, 1l); + Event returnEvent = EventUtils.getEventFromTransactionEvent(take); + Assert.assertNull(returnEvent); + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/6019fcf4/flume-tools/src/main/java/org/apache/flume/tools/EventValidator.java ---------------------------------------------------------------------- diff --git a/flume-tools/src/main/java/org/apache/flume/tools/EventValidator.java b/flume-tools/src/main/java/org/apache/flume/tools/EventValidator.java new file mode 100644 index 0000000..10e677d --- /dev/null +++ b/flume-tools/src/main/java/org/apache/flume/tools/EventValidator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flume.tools; + +import org.apache.flume.Event; +import org.apache.flume.conf.Configurable; + +/** + * Event Validator interface to be used for validating Events + * per custom logic + */ +public interface EventValidator { + + /** + * Validate the Event in a application specific manner + * + * @param event Flume Event + * @return true if Event is valid as per App Logic + */ + boolean validateEvent(Event event); + + EventValidator NOOP_VALIDATOR = new EventValidator() { + @Override + public boolean validateEvent(Event event) { + return true; + } + }; + + interface Builder extends Configurable { + EventValidator build(); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/6019fcf4/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java ---------------------------------------------------------------------- diff --git a/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java b/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java index 1030442..7abb7eb 100644 --- a/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java +++ b/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java @@ -22,15 +22,21 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.flume.Context; +import org.apache.flume.Event; import org.apache.flume.FlumeException; import org.apache.flume.channel.file.CorruptEventException; +import org.apache.flume.channel.file.EventUtils; import org.apache.flume.channel.file.Log; import org.apache.flume.channel.file.LogFile; import org.apache.flume.channel.file.LogFileV3; import org.apache.flume.channel.file.LogRecord; import org.apache.flume.channel.file.Serialization; +import org.apache.flume.channel.file.TransactionEventRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +45,8 @@ import java.io.FilenameFilter; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Properties; +import java.util.Set; public class FileChannelIntegrityTool implements FlumeTool { public static final Logger LOG = LoggerFactory.getLogger @@ -46,6 +54,15 @@ public class FileChannelIntegrityTool implements FlumeTool { private final List dataDirs = new ArrayList(); + private EventValidator eventValidator = EventValidator.NOOP_VALIDATOR; + + private long totalPutEvents; + private long invalidEvents; + private long eventsWithException; + private long corruptEvents; + private long validEvents; + private long totalChannelEvents; + @Override public void run(String[] args) throws IOException, ParseException { boolean shouldContinue = parseCommandLineOpts(args); @@ -85,12 +102,39 @@ public class FileChannelIntegrityTool implements FlumeTool { // this will throw a CorruptEventException - so the real logic // is in the catch block below. LogRecord record = reader.next(); + totalChannelEvents++; if (record != null) { - record.getEvent(); + TransactionEventRecord recordEvent = record.getEvent(); + Event event = EventUtils.getEventFromTransactionEvent(recordEvent); + if(event != null) { + totalPutEvents++; + try { + if (!eventValidator.validateEvent(event)) { + if (!fileBackedup) { + Serialization.copyFile(dataFile, new File(dataFile.getParent(), + dataFile.getName() + ".bak")); + fileBackedup = true; + } + invalidEvents++; + updater.markRecordAsNoop(eventPosition); + } else { + validEvents++; + } + } catch (Exception e) { + // OOPS, didn't expected an exception + // considering as failure case + // marking as noop + System.err.println("Encountered Exception while validating event, marking as invalid"); + updater.markRecordAsNoop(eventPosition); + eventsWithException++; + } + } } else { fileDone = true; } } catch (CorruptEventException e) { + corruptEvents++; + totalChannelEvents++; LOG.warn("Corruption found in " + dataFile.toString() + " at " + eventPosition); if (!fileBackedup) { @@ -106,6 +150,7 @@ public class FileChannelIntegrityTool implements FlumeTool { } } } + printSummary(); } private boolean parseCommandLineOpts(String[] args) throws ParseException { @@ -113,7 +158,17 @@ public class FileChannelIntegrityTool implements FlumeTool { options .addOption("l", "dataDirs", true, "Comma-separated list of data " + "directories which the tool must verify. This option is mandatory") - .addOption("h", "help", false, "Display help"); + .addOption("h", "help", false, "Display help") + .addOption("e", "eventValidator", true, "Fully Qualified Name of Event Validator Implementation");; + + + Option property = OptionBuilder.withArgName("property=value") + .hasArgs(2) + .withValueSeparator() + .withDescription( "custom properties" ) + .create( "D" ); + + options.addOption(property); CommandLineParser parser = new GnuParser(); CommandLine commandLine = parser.parse(options, args); @@ -137,6 +192,46 @@ public class FileChannelIntegrityTool implements FlumeTool { dataDirs.add(f); } } + + if(commandLine.hasOption("eventValidator")) { + try { + Class eventValidatorClassName = + (Class)Class.forName( + commandLine.getOptionValue("eventValidator")); + EventValidator.Builder eventValidatorBuilder = eventValidatorClassName.newInstance(); + + // Pass on the configuration parameter + Properties systemProperties = commandLine.getOptionProperties("D"); + Context context = new Context(); + + Set keys = systemProperties.stringPropertyNames(); + for (String key : keys) { + context.put(key, systemProperties.getProperty(key)); + } + eventValidatorBuilder.configure(context); + eventValidator = eventValidatorBuilder.build(); + } catch (Exception e) { + System.err.println(String.format("Could find class %s in lib folder", + commandLine.getOptionValue("eventValidator"))); + e.printStackTrace(); + return false; + } + } return true; } + + /** + * Prints the summary of run. Following information is printed + * + */ + private void printSummary() { + System.out.println("---------- Summary --------------------"); + System.out.println("Number of Events in the Channel = "+totalChannelEvents++); + System.out.println("Number of Put Events Processed = "+totalPutEvents); + System.out.println("Number of Valid Put Events = "+validEvents); + System.out.println("Number of Invalid Put Events = "+invalidEvents); + System.out.println("Number of Put Events that threw Exception during validation = "+eventsWithException); + System.out.println("Number of Corrupt Events = "+corruptEvents); + System.out.println("---------------------------------------"); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/6019fcf4/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java ---------------------------------------------------------------------- diff --git a/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java b/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java index f24ae56..ac4dac4 100644 --- a/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java +++ b/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java @@ -58,6 +58,7 @@ public class TestFileChannelIntegrityTool { private File checkpointDir; private File dataDir; + private static int invalidEvent = 0; @BeforeClass public static void setUpClass() throws Exception{ @@ -97,6 +98,45 @@ public class TestFileChannelIntegrityTool { } @Test + public void testFixInvalidRecords() throws Exception { + doTestFixInvalidEvents(false, DummyEventVerifier.Builder.class.getName()); + } + @Test + public void testFixInvalidRecordsWithCheckpoint() throws Exception { + doTestFixInvalidEvents(true, DummyEventVerifier.Builder.class.getName()); + } + + public void doTestFixInvalidEvents(boolean withCheckpoint, String eventHandler) throws Exception { + FileChannelIntegrityTool tool = new FileChannelIntegrityTool(); + tool.run(new String[] {"-l", dataDir.toString(), "-e", eventHandler, "-DvalidatorValue=0"}); + FileChannel channel = new FileChannel(); + channel.setName("channel"); + String cp; + if(withCheckpoint) { + cp = origCheckpointDir.toString(); + } else { + FileUtils.deleteDirectory(checkpointDir); + Assert.assertTrue(checkpointDir.mkdirs()); + cp = checkpointDir.toString(); + } + ctx.put(FileChannelConfiguration.CHECKPOINT_DIR,cp); + ctx.put(FileChannelConfiguration.DATA_DIRS, dataDir.toString()); + channel.configure(ctx); + channel.start(); + Transaction tx = channel.getTransaction(); + tx.begin(); + int i = 0; + while(channel.take() != null) { + i++; + } + tx.commit(); + tx.close(); + channel.stop(); + Assert.assertTrue(invalidEvent != 0); + Assert.assertEquals(25 - invalidEvent, i); + } + + @Test public void testFixCorruptRecords() throws Exception { doTestFixCorruptEvents(false); } @@ -226,6 +266,12 @@ public class TestFileChannelIntegrityTool { Transaction tx = channel.getTransaction(); tx.begin(); for (int i = 0; i < 5; i++) { + if(i % 3 == 0) { + event.getBody()[0] = 0; + invalidEvent++; + } else { + event.getBody()[0] = 1; + } channel.put(event); } tx.commit(); @@ -244,4 +290,33 @@ public class TestFileChannelIntegrityTool { .invoke(true)); channel.stop(); } + + public static class DummyEventVerifier implements EventValidator { + + private int value = 0; + + private DummyEventVerifier(int val) { + value = val; + } + + @Override + public boolean validateEvent(Event event) { + return event.getBody()[0] != value; + } + + public static class Builder implements EventValidator.Builder { + + private int binaryValidator = 0; + + @Override + public EventValidator build() { + return new DummyEventVerifier(binaryValidator); + } + + @Override + public void configure(Context context) { + binaryValidator = context.getInteger("validatorValue"); + } + } + } }