flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject git commit: FLUME-2048. Add an Avro container file deserializer for Spool Directory Source.
Date Wed, 19 Jun 2013 20:35:42 GMT
Updated Branches:
  refs/heads/flume-1.4 0fefd95b8 -> 3ff4b829e


FLUME-2048. Add an Avro container file deserializer for Spool Directory Source.

(Mike Percy via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/3ff4b829
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/3ff4b829
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/3ff4b829

Branch: refs/heads/flume-1.4
Commit: 3ff4b829e4b99068fb0cdbc80d046e42031a623d
Parents: 0fefd95
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Wed Jun 19 13:33:44 2013 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Wed Jun 19 13:34:54 2013 -0700

----------------------------------------------------------------------
 flume-ng-core/pom.xml                           |   5 +
 .../serialization/AvroEventDeserializer.java    | 220 +++++++++++++++++++
 .../serialization/EventDeserializerType.java    |   1 +
 .../flume/serialization/LengthMeasurable.java   |  27 +++
 .../flume/serialization/RemoteMarkable.java     |  38 ++++
 .../ResettableFileInputStream.java              |  27 ++-
 .../TestAvroEventDeserializer.java              | 177 +++++++++++++++
 .../serialization/TransientPositionTracker.java |  50 +++++
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  27 +++
 pom.xml                                         |   6 +
 10 files changed, 577 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/3ff4b829/flume-ng-core/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml
index e37de5b..887a65d 100644
--- a/flume-ng-core/pom.xml
+++ b/flume-ng-core/pom.xml
@@ -279,6 +279,11 @@ limitations under the License.
     </dependency>
 
     <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flume/blob/3ff4b829/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventDeserializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventDeserializer.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventDeserializer.java
new file mode 100644
index 0000000..e44978f
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventDeserializer.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.serialization;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
+import org.apache.flume.event.EventBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * A deserializer that parses Avro container files, generating one Flume event
+ * per record in the Avro file, and storing binary avro-encoded records in
+ * the Flume event body.
+ * <p>
+ * Each event is annotated with the writer schema: by default the hex-encoded
+ * CRC-64-AVRO parsing fingerprint is stored under
+ * {@value #AVRO_SCHEMA_HEADER_HASH}; when {@code schemaType} is
+ * {@code LITERAL} the JSON schema itself is stored under
+ * {@value #AVRO_SCHEMA_HEADER_LITERAL}.
+ * <p>
+ * {@link #mark()} records the most recent Avro sync point the reader has
+ * passed and {@link #reset()} resumes from that sync point, so records read
+ * after it may be delivered again. The input stream must implement
+ * {@link RemoteMarkable}.
+ */
+public class AvroEventDeserializer implements EventDeserializer {
+
+  private static final Logger logger = LoggerFactory.getLogger
+      (AvroEventDeserializer.class);
+
+  private final AvroSchemaType schemaType;
+  private final ResettableInputStream ris;
+
+  private Schema schema;
+  private byte[] schemaHash;
+  private String schemaHashString;
+  private DataFileReader<GenericRecord> fileReader;
+  private GenericDatumWriter<GenericRecord> datumWriter;
+  private GenericRecord record;
+  private ByteArrayOutputStream out;
+  private BinaryEncoder encoder;
+
+  /** How the writer schema is recorded in each event's headers. */
+  @VisibleForTesting
+  public static enum AvroSchemaType {
+    /** Store the hex-encoded CRC-64-AVRO fingerprint of the schema. */
+    HASH,
+    /** Store the full JSON schema in every event. */
+    LITERAL;
+  }
+
+  public static final String CONFIG_SCHEMA_TYPE_KEY = "schemaType";
+  public static final String AVRO_SCHEMA_HEADER_HASH
+      = "flume.avro.schema.hash";
+  public static final String AVRO_SCHEMA_HEADER_LITERAL
+      = "flume.avro.schema.literal";
+
+  /**
+   * Reads the schema-type setting; the Avro file itself is opened later by
+   * {@link #initialize()}.
+   *
+   * @throws IllegalArgumentException if {@code schemaType} is not a valid
+   *     {@link AvroSchemaType} name (case-insensitive)
+   */
+  private AvroEventDeserializer(Context context, ResettableInputStream ris) {
+    this.ris = ris;
+
+    // Locale.ENGLISH keeps the case mapping locale-independent: under e.g. a
+    // Turkish default locale "literal" would upper-case to "LİTERAL" and
+    // valueOf() would reject it.
+    schemaType = AvroSchemaType.valueOf(
+        context.getString(CONFIG_SCHEMA_TYPE_KEY,
+            AvroSchemaType.HASH.toString()).toUpperCase(Locale.ENGLISH));
+    if (schemaType == AvroSchemaType.LITERAL) {
+      logger.warn(CONFIG_SCHEMA_TYPE_KEY + " set to " +
+          AvroSchemaType.LITERAL.toString() + ", so storing full Avro " +
+          "schema in the header of each event, which may be inefficient. " +
+          "Consider using the hash of the schema " +
+          "instead of the literal schema.");
+    }
+  }
+
+  /**
+   * Opens the Avro container over {@link #ris} and positions the reader at
+   * the next sync point following the stream's current position. Also
+   * prepares the record re-encoder and the schema fingerprint.
+   */
+  private void initialize() throws IOException, NoSuchAlgorithmException {
+    SeekableResettableInputBridge in = new SeekableResettableInputBridge(ris);
+    long pos = in.tell();
+    // The container header (schema, codec, sync marker) lives at offset 0, so
+    // rewind to let DataFileReader parse it, then skip back to where we were.
+    in.seek(0L);
+    fileReader = new DataFileReader<GenericRecord>(in,
+        new GenericDatumReader<GenericRecord>());
+    fileReader.sync(pos);
+
+    schema = fileReader.getSchema();
+    datumWriter = new GenericDatumWriter<GenericRecord>(schema);
+    out = new ByteArrayOutputStream();
+    encoder = EncoderFactory.get().binaryEncoder(out, encoder);
+
+    schemaHash = SchemaNormalization.parsingFingerprint("CRC-64-AVRO", schema);
+    schemaHashString = Hex.encodeHexString(schemaHash);
+  }
+
+  /**
+   * Reads the next Avro record and returns it as an event whose body is the
+   * record's bare binary encoding (no container framing).
+   *
+   * @return the next event, or {@code null} when the file has no more records
+   */
+  @Override
+  public Event readEvent() throws IOException {
+    if (!fileReader.hasNext()) {
+      return null;
+    }
+    record = fileReader.next(record);
+    out.reset();
+    datumWriter.write(record, encoder);
+    encoder.flush();
+    Event event = EventBuilder.withBody(out.toByteArray());
+    if (schemaType == AvroSchemaType.HASH) {
+      // annotate header with 64-bit schema CRC hash in hex
+      event.getHeaders().put(AVRO_SCHEMA_HEADER_HASH, schemaHashString);
+    } else {
+      event.getHeaders().put(AVRO_SCHEMA_HEADER_LITERAL, schema.toString());
+    }
+    return event;
+  }
+
+  /**
+   * Reads up to {@code numEvents} events; fewer are returned when the file
+   * runs out of records.
+   */
+  @Override
+  public List<Event> readEvents(int numEvents) throws IOException {
+    List<Event> events = Lists.newArrayList();
+    for (int i = 0; i < numEvents && fileReader.hasNext(); i++) {
+      Event event = readEvent();
+      if (event != null) {
+        events.add(event);
+      }
+    }
+    return events;
+  }
+
+  /**
+   * Saves the start of the most recent sync marker the reader has passed as
+   * the stream's mark position (clamped at 0).
+   */
+  @Override
+  public void mark() throws IOException {
+    // previousSync() is just past a sync marker; back up by the marker size so
+    // that reset()'s sync(pos) locates that same marker again.
+    long pos = Math.max(0L,
+        fileReader.previousSync() - DataFileConstants.SYNC_SIZE);
+    ((RemoteMarkable) ris).markPosition(pos);
+  }
+
+  /** Repositions the reader at the next sync point after the saved mark. */
+  @Override
+  public void reset() throws IOException {
+    long pos = ((RemoteMarkable) ris).getMarkPosition();
+    fileReader.sync(pos);
+  }
+
+  @Override
+  public void close() throws IOException {
+    ris.close();
+  }
+
+  /** Builds {@link AvroEventDeserializer}s; requires a RemoteMarkable stream. */
+  public static class Builder implements EventDeserializer.Builder {
+
+    /**
+     * @throws IllegalArgumentException if {@code in} is not RemoteMarkable
+     * @throws FlumeException if the Avro container cannot be opened
+     */
+    @Override
+    public EventDeserializer build(Context context, ResettableInputStream in) {
+      if (!(in instanceof RemoteMarkable)) {
+        throw new IllegalArgumentException("Cannot use this deserializer " +
+            "without a RemoteMarkable input stream");
+      }
+      AvroEventDeserializer deserializer
+          = new AvroEventDeserializer(context, in);
+      try {
+        deserializer.initialize();
+      } catch (Exception e) {
+        throw new FlumeException("Cannot instantiate deserializer", e);
+      }
+      return deserializer;
+    }
+  }
+
+  /** Adapts a ResettableInputStream to Avro's SeekableInput interface. */
+  private static class SeekableResettableInputBridge implements SeekableInput {
+
+    private final ResettableInputStream ris;
+
+    public SeekableResettableInputBridge(ResettableInputStream ris) {
+      this.ris = ris;
+    }
+
+    @Override
+    public void seek(long p) throws IOException {
+      ris.seek(p);
+    }
+
+    @Override
+    public long tell() throws IOException {
+      return ris.tell();
+    }
+
+    @Override
+    public long length() throws IOException {
+      if (ris instanceof LengthMeasurable) {
+        return ((LengthMeasurable) ris).length();
+      } else {
+        // FIXME: Avro doesn't seem to complain about this,
+        // but probably not a great idea...
+        return Long.MAX_VALUE;
+      }
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      return ris.read(b, off, len);
+    }
+
+    @Override
+    public void close() throws IOException {
+      ris.close();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/3ff4b829/flume-ng-core/src/main/java/org/apache/flume/serialization/EventDeserializerType.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/EventDeserializerType.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/EventDeserializerType.java
index 02abc80..ce18130 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/serialization/EventDeserializerType.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/EventDeserializerType.java
@@ -25,6 +25,7 @@ import org.apache.flume.annotations.InterfaceStability;
 @InterfaceStability.Unstable
 public enum EventDeserializerType {
   LINE(LineDeserializer.Builder.class),
+  AVRO(AvroEventDeserializer.Builder.class),
   OTHER(null);
 
   private final Class<? extends EventDeserializer.Builder> builderClass;

http://git-wip-us.apache.org/repos/asf/flume/blob/3ff4b829/flume-ng-core/src/main/java/org/apache/flume/serialization/LengthMeasurable.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/LengthMeasurable.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/LengthMeasurable.java
new file mode 100644
index 0000000..bc6647f
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/LengthMeasurable.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.serialization;
+
+import java.io.IOException;
+
+/**
+ * Implemented by input sources that can report their total length, e.g. so
+ * that a reader can tell how much data the underlying stream or file holds.
+ */
+public interface LengthMeasurable {
+
+  /**
+   * Returns the total length of the stream or file.
+   *
+   * @return the total length of the underlying stream or file
+   * @throws IOException if the implementation cannot determine the length
+   */
+  long length() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/3ff4b829/flume-ng-core/src/main/java/org/apache/flume/serialization/RemoteMarkable.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/RemoteMarkable.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/RemoteMarkable.java
new file mode 100644
index 0000000..774152e
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/RemoteMarkable.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.serialization;
+
+import java.io.IOException;
+
+/**
+ * Allows for calling mark() without a seek(): the caller supplies the position
+ * to remember instead of the stream's current read position.
+ */
+public interface RemoteMarkable {
+
+  /**
+   * Indicate that the specified position should be returned to in the case of
+   * {@link Resettable#reset()} being called.
+   * @param position the position to save as the mark
+   * @throws java.io.IOException if the implementation fails to record the
+   *     position
+   */
+  void markPosition(long position) throws IOException;
+
+  /**
+   * Return the saved mark position without moving the mark pointer.
+   * @return the most recently saved mark position
+   * @throws IOException if the implementation fails to read the position
+   */
+  long getMarkPosition() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/3ff4b829/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java
index 49521ab..09f490f 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java
@@ -21,6 +21,8 @@ package org.apache.flume.serialization;
 import com.google.common.base.Charsets;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -44,7 +46,10 @@ import java.nio.charset.CoderResult;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class ResettableFileInputStream extends ResettableInputStream {
+public class ResettableFileInputStream extends ResettableInputStream
+    implements RemoteMarkable, LengthMeasurable {
+
+  // One shared, immutable logger per class (not one per stream instance).
+  private static final Logger logger =
+      LoggerFactory.getLogger(ResettableFileInputStream.class);
 
   public static final int DEFAULT_BUF_SIZE = 16384;
 
@@ -126,6 +131,8 @@ public class ResettableFileInputStream extends ResettableInputStream {
 
   @Override
   public synchronized int read(byte[] b, int off, int len) throws IOException {
+    logger.trace("read(buf, {}, {})", off, len);
+
     if (position >= fileSize) {
       return -1;
     }
@@ -194,17 +201,35 @@ public class ResettableFileInputStream extends ResettableInputStream {
   }
 
+  /**
+   * Stores {@code position} as the mark through the position tracker,
+   * without moving the current read position.
+   */
   @Override
+  public void markPosition(long position) throws IOException {
+    tracker.storePosition(position);
+  }
+
+  /** Returns the mark position currently held by the position tracker. */
+  @Override
+  public long getMarkPosition() throws IOException {
+    return tracker.getPosition();
+  }
+
+  @Override
   public void reset() throws IOException {
     seek(tracker.getPosition());
   }
 
+  /**
+   * Returns the length of the underlying file.
+   * NOTE(review): java.io.File#length() returns 0L rather than throwing when
+   * the file does not exist or cannot be read.
+   */
   @Override
+  public long length() throws IOException {
+    return file.length();
+  }
+
+  @Override
   public long tell() throws IOException {
+    logger.trace("Tell position: {}", syncPosition);
+
     return syncPosition;
   }
 
   @Override
   public synchronized void seek(long newPos) throws IOException {
+    logger.trace("Seek to position: {}", newPos);
 
     // check to see if we can seek within our existing buffer
     long relativeChange = newPos - position;

http://git-wip-us.apache.org/repos/asf/flume/blob/3ff4b829/flume-ng-core/src/test/java/org/apache/flume/serialization/TestAvroEventDeserializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestAvroEventDeserializer.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestAvroEventDeserializer.java
new file mode 100644
index 0000000..6f9ddc2
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestAvroEventDeserializer.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.serialization;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collections;
+
+/**
+ * Tests for {@link AvroEventDeserializer}: decoding of event bodies,
+ * mark()/reset() behaviour at Avro sync points, and the schema headers.
+ */
+public class TestAvroEventDeserializer {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(TestAvroEventDeserializer.class);
+
+  /** Record schema with a single string field, "foo". */
+  private static final Schema schema;
+  static {
+    schema = Schema.createRecord("MyRecord", "", "org.apache.flume",  false);
+    Schema.Field field = new Schema.Field("foo",
+        Schema.create(Schema.Type.STRING), "", null);
+    schema.setFields(Collections.singletonList(field));
+  }
+
+  @Test
+  public void resetTest() throws IOException {
+    File tempFile = newTestFile(true);
+
+    String target = tempFile.getAbsolutePath();
+    logger.info("Target: {}", target);
+    TransientPositionTracker tracker = new TransientPositionTracker(target);
+
+    AvroEventDeserializer.Builder desBuilder =
+        new AvroEventDeserializer.Builder();
+    EventDeserializer deserializer = desBuilder.build(new Context(),
+        new ResettableFileInputStream(tempFile, tracker));
+    try {
+      DatumReader<GenericRecord> reader =
+          new GenericDatumReader<GenericRecord>(schema);
+
+      // The tracker starts at position 0, so reset() before any mark()
+      // rewinds to the first record.
+      assertEquals("bar", readFoo(deserializer, reader));
+      deserializer.reset();
+      assertEquals("bar", readFoo(deserializer, reader));
+
+      // newTestFile() ends a block after each record, so marking after
+      // "bar" makes reset() replay only "baz".
+      deserializer.mark();
+      assertEquals("baz", readFoo(deserializer, reader));
+      deserializer.reset();
+      assertEquals("baz", readFoo(deserializer, reader));
+
+      assertNull(deserializer.readEvent());
+    } finally {
+      deserializer.close();
+    }
+  }
+
+  @Test
+  public void testSchemaHash() throws IOException, NoSuchAlgorithmException {
+    File tempFile = newTestFile(true);
+
+    String target = tempFile.getAbsolutePath();
+    logger.info("Target: {}", target);
+    TransientPositionTracker tracker = new TransientPositionTracker(target);
+
+    Context context = new Context();
+    context.put(AvroEventDeserializer.CONFIG_SCHEMA_TYPE_KEY,
+        AvroEventDeserializer.AvroSchemaType.HASH.toString());
+
+    ResettableInputStream in =
+        new ResettableFileInputStream(tempFile, tracker);
+    EventDeserializer des =
+        new AvroEventDeserializer.Builder().build(context, in);
+    try {
+      Event event = des.readEvent();
+      String eventSchemaHash = event.getHeaders().get(
+          AvroEventDeserializer.AVRO_SCHEMA_HEADER_HASH);
+      String expectedSchemaHash = Hex.encodeHexString(
+          SchemaNormalization.parsingFingerprint("CRC-64-AVRO", schema));
+
+      Assert.assertEquals(expectedSchemaHash, eventSchemaHash);
+    } finally {
+      des.close();
+    }
+  }
+
+  @Test
+  public void testSchemaLiteral() throws IOException {
+    File tempFile = newTestFile(true);
+
+    String target = tempFile.getAbsolutePath();
+    logger.info("Target: {}", target);
+    TransientPositionTracker tracker = new TransientPositionTracker(target);
+
+    Context context = new Context();
+    context.put(AvroEventDeserializer.CONFIG_SCHEMA_TYPE_KEY,
+        AvroEventDeserializer.AvroSchemaType.LITERAL.toString());
+
+    ResettableInputStream in =
+        new ResettableFileInputStream(tempFile, tracker);
+    EventDeserializer des =
+        new AvroEventDeserializer.Builder().build(context, in);
+    try {
+      Event event = des.readEvent();
+      String eventSchema = event.getHeaders().get(
+          AvroEventDeserializer.AVRO_SCHEMA_HEADER_LITERAL);
+
+      Assert.assertEquals(schema.toString(), eventSchema);
+    } finally {
+      des.close();
+    }
+  }
+
+  /**
+   * Reads the next event from {@code deserializer}, decodes its body with
+   * {@code reader}, and returns the record's "foo" field as a String.
+   * Fails the test if the deserializer has no more events.
+   */
+  private static String readFoo(EventDeserializer deserializer,
+      DatumReader<GenericRecord> reader) throws IOException {
+    Event event = deserializer.readEvent();
+    Assert.assertNotNull("Expected another event", event);
+    BinaryDecoder decoder =
+        DecoderFactory.get().binaryDecoder(event.getBody(), null);
+    return reader.read(null, decoder).get("foo").toString();
+  }
+
+  /**
+   * Writes an Avro container file holding two records ("bar", then "baz"),
+   * calling writer.sync() after each so that each record ends its own block.
+   */
+  private File newTestFile(boolean deleteOnExit) throws IOException {
+    File tempFile = File.createTempFile("testDirectFile", "tmp");
+    if (deleteOnExit) {
+      tempFile.deleteOnExit();
+    }
+
+    DataFileWriter<GenericRecord> writer =
+        new DataFileWriter<GenericRecord>(
+            new GenericDatumWriter<GenericRecord>(schema));
+    try {
+      writer.create(schema, tempFile);
+      GenericRecordBuilder recordBuilder;
+      recordBuilder = new GenericRecordBuilder(schema);
+      recordBuilder.set("foo", "bar");
+      GenericRecord record = recordBuilder.build();
+      writer.append(record);
+      writer.sync();
+      recordBuilder = new GenericRecordBuilder(schema);
+      recordBuilder.set("foo", "baz");
+      record = recordBuilder.build();
+      writer.append(record);
+      writer.sync();
+      writer.flush();
+    } finally {
+      writer.close();
+    }
+
+    return tempFile;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/3ff4b829/flume-ng-core/src/test/java/org/apache/flume/serialization/TransientPositionTracker.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/TransientPositionTracker.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/TransientPositionTracker.java
new file mode 100644
index 0000000..ba50d54
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/TransientPositionTracker.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.serialization;
+
+import java.io.IOException;
+
+/**
+ * {@link PositionTracker} that keeps the stored position only in this object's
+ * memory; nothing is persisted and {@link #close()} has nothing to release.
+ */
+public class TransientPositionTracker implements PositionTracker {
+
+  private final String target;
+  private long position;
+
+  /**
+   * @param target the name reported by {@link #getTarget()}; the tracker
+   *     starts at position 0
+   */
+  public TransientPositionTracker(String target) {
+    this.target = target;
+    this.position = 0L;
+  }
+
+  @Override
+  public String getTarget() {
+    return target;
+  }
+
+  @Override
+  public long getPosition() {
+    return position;
+  }
+
+  /** Replaces the in-memory position with {@code newPosition}. */
+  @Override
+  public void storePosition(long newPosition) throws IOException {
+    position = newPosition;
+  }
+
+  /** Nothing to release: the position lives only in memory. */
+  @Override
+  public void close() throws IOException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/3ff4b829/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 74863d4..06f8efa 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -980,6 +980,33 @@ deserializer.maxLineLength      2048            Maximum number of characters to
 deserializer.outputCharset      UTF-8           Charset to use for encoding events put into the channel.
 ==============================  ==============  ==========================================================
 
+AVRO
+^^^^
+
+This deserializer is able to read an Avro container file, and it generates
+one event per Avro record in the file.
+Each event is annotated with a header that indicates the schema used.
+The body of the event is the binary Avro record data, not
+including the schema or the rest of the container file elements.
+
+Note that if the spool directory source must retry putting one of these events
+onto a channel (for example, because the channel is full), then it will reset
+and retry from the most recent Avro container file sync point. To reduce
+potential event duplication in such a failure scenario, write sync markers more
+frequently in your Avro input files.
+
+==============================  ==============  ======================================================================
+Property Name                   Default         Description
+==============================  ==============  ======================================================================
+deserializer.schemaType         HASH            How the schema is represented. By default, or when the value ``HASH``
+                                                is specified, the Avro schema is hashed and
+                                                the hash is stored in every event in the event header
+                                                "flume.avro.schema.hash". If ``LITERAL`` is specified, the JSON-encoded
+                                                schema itself is stored in every event in the event header
+                                                "flume.avro.schema.literal". Using ``LITERAL`` mode is relatively
+                                                inefficient compared to ``HASH`` mode.
+==============================  ==============  ======================================================================
+
 NetCat Source
 ~~~~~~~~~~~~~
 

http://git-wip-us.apache.org/repos/asf/flume/blob/3ff4b829/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 251f345..350d6a4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -780,6 +780,12 @@ limitations under the License.
       </dependency>
 
       <dependency>
+        <groupId>commons-codec</groupId>
+        <artifactId>commons-codec</artifactId>
+        <version>1.8</version>
+      </dependency>
+
+      <dependency>
         <groupId>commons-io</groupId>
         <artifactId>commons-io</artifactId>
         <version>2.1</version>


Mime
View raw message