flume-commits mailing list archives

From mpe...@apache.org
Subject [1/3] flume git commit: FLUME-2810. Add static Schema URL to AvroEventSerializer configuration
Date Tue, 19 Jul 2016 21:57:16 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk c7de4ba5c -> 9965dae7b


FLUME-2810. Add static Schema URL to AvroEventSerializer configuration

Currently the only way to pass a schema to the Avro event serializer is
via an event header. This change introduces an option to specify the schema
location directly in the Flume configuration.

(Jeff Holoman via Mike Percy)
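
For context, the sketch below (not part of this patch; the class name and URL are placeholders)
shows the header-based mechanism the commit message refers to: the client attaches a
"flume.avro.schema.url" header to each event, and the serializer resolves the record schema
from that URL.

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

// Sketch only: attach the schema location per event via the
// "flume.avro.schema.url" header. Before this patch, a header
// ("flume.avro.schema.url" or "flume.avro.schema.literal") was the only way
// to tell AvroEventSerializer which record schema to use.
public class SchemaHeaderSketch {
  public static Event eventWithSchemaUrl(byte[] avroEncodedBody, String schemaUrl) {
    Event event = EventBuilder.withBody(avroEncodedBody);
    event.getHeaders().put("flume.avro.schema.url", schemaUrl);
    return event;
  }

  public static void main(String[] args) {
    // Placeholder body and URL, for illustration only.
    Event event = eventWithSchemaUrl(new byte[0], "hdfs://namenode/path/to/schema.avsc");
    System.out.println(event.getHeaders());
  }
}

With the new schemaURL property introduced below, events written through a sink configured
that way no longer need to carry either schema header.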


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/dbf2e989
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/dbf2e989
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/dbf2e989

Branch: refs/heads/trunk
Commit: dbf2e989744a6b2921076be017102f75323a69f4
Parents: c7de4ba
Author: Jeff Holoman <jeff.holoman@gmail.com>
Authored: Tue Jul 19 12:29:08 2016 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Tue Jul 19 14:43:33 2016 -0700

----------------------------------------------------------------------
 ...roEventSerializerConfigurationConstants.java |  5 ++
 flume-ng-doc/sphinx/FlumeUserGuide.rst          | 51 ++++++++++++++++++--
 .../flume/sink/hdfs/AvroEventSerializer.java    | 45 +++++++++++------
 .../sink/hdfs/TestAvroEventSerializer.java      | 47 ++++++++++++++----
 4 files changed, 117 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/dbf2e989/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventSerializerConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventSerializerConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventSerializerConfigurationConstants.java
index cce6716..470fcea 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventSerializerConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventSerializerConfigurationConstants.java
@@ -35,4 +35,9 @@ public class AvroEventSerializerConfigurationConstants {
   public static final String COMPRESSION_CODEC = "compressionCodec";
   public static final String DEFAULT_COMPRESSION_CODEC = "null"; // no codec
 
+  /**
+   * Avro static Schema URL
+   */
+  public static final String STATIC_SCHEMA_URL = "schemaURL";
+  public static final String DEFAULT_STATIC_SCHEMA_URL = null;
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/dbf2e989/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index f9ca1b2..3937514 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -3235,19 +3235,59 @@ Example for agent named a1:
   a1.sinks.k1.sink.serializer = text
   a1.sinks.k1.sink.serializer.appendNewline = false
 
+"Flume Event" Avro Event Serializer
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Alias: ``avro_event``.
+
+This serializer writes Flume events into an Avro container file. The schema used is the same
+schema used for Flume events in the Avro RPC mechanism.
+
+This serializer inherits from the ``AbstractAvroEventSerializer`` class.
+
+Configuration options are as follows:
+
+==========================  ================  ===========================================================================
+Property Name               Default           Description
+==========================  ================  ===========================================================================
+syncIntervalBytes           2048000           Avro sync interval, in approximate bytes.
+compressionCodec            null              Avro compression codec. For supported codecs, see Avro's CodecFactory docs.
+==========================  ================  ===========================================================================
+
+Example for agent named a1:
+
+.. code-block:: properties
+
+  a1.sinks.k1.type = hdfs
+  a1.sinks.k1.channel = c1
+  a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
+  a1.sinks.k1.serializer = avro_event
+  a1.sinks.k1.serializer.compressionCodec = snappy
+
 Avro Event Serializer
 ~~~~~~~~~~~~~~~~~~~~~
 
-Alias: ``avro_event``. This interceptor serializes Flume events into an Avro
-container file. The schema used is the same schema used for Flume events
-in the Avro RPC mechanism. This serializers inherits from the
-``AbstractAvroEventSerializer`` class. Configuration options are as follows:
+Alias: This serializer does not have an alias, and must be specified using the fully-qualified
+class name.
+
+Like the "Flume Event" Avro Event Serializer, this serializer writes Flume events into an Avro
+container file; however, the record schema is configurable. The record schema may be specified
+either as a Flume configuration property or passed in an event header.
+
+To pass the record schema as part of the Flume configuration, use the property ``schemaURL``
+as listed below.
+
+To pass the record schema in an event header, specify either the event header ``flume.avro.schema.literal``
+containing a JSON-format representation of the schema or ``flume.avro.schema.url`` with a URL where
+the schema may be found (``hdfs:/...`` URIs are supported).
+
+This serializer implements the ``EventSerializer`` interface directly, rather than inheriting
+from the ``AbstractAvroEventSerializer`` class.
+
+Configuration options are as follows:
 
 ==========================  ================  ===========================================================================
 Property Name               Default           Description
 ==========================  ================  ===========================================================================
 syncIntervalBytes           2048000           Avro sync interval, in approximate bytes.
 compressionCodec            null              Avro compression codec. For supported codecs, see Avro's CodecFactory docs.
+schemaURL                   null              Avro schema URL. Schemas specified in the header override this option.
 ==========================  ================  ===========================================================================
 
 Example for agent named a1:
@@ -3257,8 +3297,9 @@ Example for agent named a1:
   a1.sinks.k1.type = hdfs
   a1.sinks.k1.channel = c1
   a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
-  a1.sinks.k1.serializer = avro_event
+  a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
   a1.sinks.k1.serializer.compressionCodec = snappy
+  a1.sinks.k1.serializer.schemaURL = hdfs://namenode/path/to/schema.avsc
 
 
 Flume Interceptors
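
To complement the configuration example above, the following standalone sketch (not part of
the patch; the class name, schema, and output path are hypothetical, and it roughly mirrors the
new testStaticSchemaUrl test further down) drives the serializer programmatically with the new
schemaURL option; the events it writes carry no schema headers.

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;

import com.google.common.base.Charsets;
import com.google.common.io.Files;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.flume.Context;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.serialization.EventSerializer;
import org.apache.flume.sink.hdfs.AvroEventSerializer;

public class StaticSchemaUrlSketch {
  public static void main(String[] args) throws Exception {
    // A simple record schema, written to a local file so it can be referenced by URL.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Msg\","
        + "\"fields\":[{\"name\":\"message\",\"type\":\"string\"}]}");
    File schemaFile = File.createTempFile("schema", ".avsc");
    Files.write(schema.toString(), schemaFile, Charsets.UTF_8);

    // Configure the serializer with the new static schemaURL option; the events
    // below carry no flume.avro.schema.* headers.
    Context ctx = new Context();
    ctx.put("schemaURL", schemaFile.toURI().toURL().toExternalForm());

    OutputStream out = new FileOutputStream("/tmp/events.avro");
    EventSerializer serializer = new AvroEventSerializer.Builder().build(ctx, out);
    serializer.afterCreate();

    // The event body must be an Avro-serialized datum matching the schema.
    GenericRecord record = new GenericData.Record(schema);
    record.put("message", "Hello");
    ByteArrayOutputStream body = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(body, null);
    new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
    encoder.flush();

    serializer.write(EventBuilder.withBody(body.toByteArray()));
    serializer.flush();
    serializer.beforeClose();
    out.close();
  }
}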

http://git-wip-us.apache.org/repos/asf/flume/blob/dbf2e989/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java
index fea6218..3231742 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java
@@ -18,14 +18,6 @@
  */
 package org.apache.flume.sink.hdfs;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URL;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Locale;
-import java.util.Map;
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
@@ -44,7 +36,21 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.*;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.COMPRESSION_CODEC;
+import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.DEFAULT_COMPRESSION_CODEC;
+import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.DEFAULT_STATIC_SCHEMA_URL;
+import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.DEFAULT_SYNC_INTERVAL_BYTES;
+import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.STATIC_SCHEMA_URL;
+import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.SYNC_INTERVAL_BYTES;
 
 /**
  * <p>
@@ -76,6 +82,7 @@ public class AvroEventSerializer implements EventSerializer, Configurable {
   private int syncIntervalBytes;
   private String compressionCodec;
   private Map<String, Schema> schemaCache = new HashMap<String, Schema>();
+  private String staticSchemaURL;
 
   private AvroEventSerializer(OutputStream out) {
     this.out = out;
@@ -87,6 +94,7 @@ public class AvroEventSerializer implements EventSerializer, Configurable {
         context.getInteger(SYNC_INTERVAL_BYTES, DEFAULT_SYNC_INTERVAL_BYTES);
     compressionCodec =
         context.getString(COMPRESSION_CODEC, DEFAULT_COMPRESSION_CODEC);
+    staticSchemaURL = context.getString(STATIC_SCHEMA_URL, DEFAULT_STATIC_SCHEMA_URL);
   }
 
   @Override
@@ -111,19 +119,24 @@ public class AvroEventSerializer implements EventSerializer, Configurable {
   private void initialize(Event event) throws IOException {
     Schema schema = null;
     String schemaUrl = event.getHeaders().get(AVRO_SCHEMA_URL_HEADER);
-    if (schemaUrl != null) {
+    String schemaString = event.getHeaders().get(AVRO_SCHEMA_LITERAL_HEADER);
+
+    if (schemaUrl != null) { // if URL_HEADER is there then use it
       schema = schemaCache.get(schemaUrl);
       if (schema == null) {
         schema = loadFromUrl(schemaUrl);
         schemaCache.put(schemaUrl, schema);
       }
-    }
-    if (schema == null) {
-      String schemaString = event.getHeaders().get(AVRO_SCHEMA_LITERAL_HEADER);
-      if (schemaString == null) {
-        throw new FlumeException("Could not find schema for event " + event);
-      }
+    } else if (schemaString != null) { // fallback to LITERAL_HEADER if it was there
       schema = new Schema.Parser().parse(schemaString);
+    } else if (staticSchemaURL != null) {   // fallback to static url if it was there
+      schema = schemaCache.get(staticSchemaURL);
+      if (schema == null) {
+        schema = loadFromUrl(staticSchemaURL);
+        schemaCache.put(staticSchemaURL, schema);
+      }
+    } else { // no other options so giving up
+      throw new FlumeException("Could not find schema for event " + event);
     }
 
     writer = new GenericDatumWriter<Object>(schema);
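
Regarding the ``hdfs:/...`` URL support mentioned in the user guide changes above, the following
is an approximate sketch (not the patch's exact loadFromUrl code; the class name is illustrative)
of fetching an Avro schema from either an HDFS location or a plain URL.

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
import java.util.Locale;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class SchemaUrlLoaderSketch {
  public static Schema load(String schemaUrl) throws IOException {
    InputStream in;
    if (schemaUrl.toLowerCase(Locale.ENGLISH).startsWith("hdfs")) {
      // Resolve the filesystem from the URI so an hdfs:// location is read
      // through the Hadoop FileSystem API.
      FileSystem fs = FileSystem.get(URI.create(schemaUrl), new Configuration());
      in = fs.open(new Path(schemaUrl));
    } else {
      // Anything else (file:, http:, ...) is read as a plain URL.
      in = new URL(schemaUrl).openStream();
    }
    try {
      return new Schema.Parser().parse(in);
    } finally {
      in.close();
    }
  }
}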

http://git-wip-us.apache.org/repos/asf/flume/blob/dbf2e989/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java
index 38af74d..6b38da2 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java
@@ -39,10 +39,12 @@ import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.serialization.AvroEventSerializerConfigurationConstants;
 import org.apache.flume.serialization.EventSerializer;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.After;
 
 public class TestAvroEventSerializer {
 
@@ -53,39 +55,55 @@ public class TestAvroEventSerializer {
     file = File.createTempFile(getClass().getSimpleName(), "");
   }
 
+  @After
+  public void tearDown() throws Exception {
+    file.delete();
+  }
+
   @Test
   public void testNoCompression() throws IOException {
-    createAvroFile(file, null, false);
+    createAvroFile(file, null, false, false);
     validateAvroFile(file);
   }
 
   @Test
   public void testNullCompression() throws IOException {
-    createAvroFile(file, "null", false);
+    createAvroFile(file, "null", false, false);
     validateAvroFile(file);
   }
 
   @Test
   public void testDeflateCompression() throws IOException {
-    createAvroFile(file, "deflate", false);
+    createAvroFile(file, "deflate", false, false);
     validateAvroFile(file);
   }
 
   @Test
   public void testSnappyCompression() throws IOException {
-    createAvroFile(file, "snappy", false);
+    createAvroFile(file, "snappy", false, false);
     validateAvroFile(file);
   }
 
   @Test
   public void testSchemaUrl() throws IOException {
-    createAvroFile(file, null, true);
+    createAvroFile(file, null, true, false);
     validateAvroFile(file);
   }
 
-  public void createAvroFile(File file, String codec, boolean useSchemaUrl) throws
-      IOException {
+  @Test
+  public void testStaticSchemaUrl() throws IOException {
+    createAvroFile(file, null, false, true);
+    validateAvroFile(file);
+  }
 
+  @Test
+  public void testBothUrls() throws IOException {
+    createAvroFile(file, null, true, true);
+    validateAvroFile(file);
+  }
+
+  public void createAvroFile(File file, String codec, boolean useSchemaUrl,
+                             boolean useStaticSchemaUrl) throws IOException {
     // serialize a few events using the reflection-based avro serializer
     OutputStream out = new FileOutputStream(file);
 
@@ -100,11 +118,16 @@ public class TestAvroEventSerializer {
     }));
     GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema);
     File schemaFile = null;
-    if (useSchemaUrl) {
+    if (useSchemaUrl || useStaticSchemaUrl) {
       schemaFile = File.createTempFile(getClass().getSimpleName(), ".avsc");
       Files.write(schema.toString(), schemaFile, Charsets.UTF_8);
     }
 
+    if (useStaticSchemaUrl) {
+      ctx.put(AvroEventSerializerConfigurationConstants.STATIC_SCHEMA_URL,
+              schemaFile.toURI().toURL().toExternalForm());
+    }
+
     EventSerializer.Builder builder = new AvroEventSerializer.Builder();
     EventSerializer serializer = builder.build(ctx, out);
 
@@ -112,10 +135,10 @@ public class TestAvroEventSerializer {
     for (int i = 0; i < 3; i++) {
       GenericRecord record = recordBuilder.set("message", "Hello " + i).build();
       Event event = EventBuilder.withBody(serializeAvro(record, schema));
-      if (schemaFile == null) {
+      if (schemaFile == null && !useSchemaUrl) {
         event.getHeaders().put(AvroEventSerializer.AVRO_SCHEMA_LITERAL_HEADER,
             schema.toString());
-      } else {
+      } else if (useSchemaUrl) {
         event.getHeaders().put(AvroEventSerializer.AVRO_SCHEMA_URL_HEADER,
             schemaFile.toURI().toURL().toExternalForm());
       }
@@ -125,6 +148,10 @@ public class TestAvroEventSerializer {
     serializer.beforeClose();
     out.flush();
     out.close();
+    if (schemaFile != null) {
+      schemaFile.delete();
+    }
+
   }
 
   private byte[] serializeAvro(Object datum, Schema schema) throws IOException {

