kudu-commits mailing list archives

From mpe...@apache.org
Subject kudu git commit: Add AvroKuduEventProducer to Kudu-Flume integration
Date Tue, 06 Sep 2016 05:55:34 GMT
Repository: kudu
Updated Branches:
  refs/heads/master d69b68d71 -> 346b63d9c


Add AvroKuduEventProducer to Kudu-Flume integration

This adds a KuduEventProducer that accepts events with Avro-encoded
bodies and allows the KuduSink to push them to Kudu. It accepts only
record-type schemas whose fields hold values compatible with Kudu's
column types. The schema is provided to the EventProducer either ahead
of time through configuration or per event in the event header, as a
URL or as a literal.
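
For reference, a minimal Flume agent configuration wiring this producer
into the KuduSink might look like the sketch below. The agent, channel,
and sink names are illustrative, and the masterAddresses and tableName
keys are assumed to match the existing KuduSink configuration; only the
producer.* keys come from this commit.

    agent1.sinks.kudu1.type = org.apache.kudu.flume.sink.KuduSink
    agent1.sinks.kudu1.masterAddresses = kudu-master-1:7051
    agent1.sinks.kudu1.tableName = avro_events
    agent1.sinks.kudu1.producer = org.apache.kudu.flume.sink.AvroKuduEventProducer
    agent1.sinks.kudu1.producer.operation = upsert
    agent1.sinks.kudu1.producer.schemaPath = hdfs://namenode/schemas/event.avsc
    agent1.sinks.kudu1.channel = channel1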

Change-Id: I6715df72e447e72f4801a2e026f6840d09b401e1
Reviewed-on: http://gerrit.cloudera.org:8080/4034
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mpercy@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/346b63d9
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/346b63d9
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/346b63d9

Branch: refs/heads/master
Commit: 346b63d9cbf30aeb603920904b3cbb02074d9c75
Parents: d69b68d
Author: Will Berkeley <wdberkeley@gmail.com>
Authored: Sat Aug 6 20:26:14 2016 -0400
Committer: Mike Percy <mpercy@apache.org>
Committed: Tue Sep 6 05:55:17 2016 +0000

----------------------------------------------------------------------
 java/kudu-flume-sink/pom.xml                    |  50 ++++
 .../kudu/flume/sink/AvroKuduEventProducer.java  | 284 +++++++++++++++++++
 .../src/test/avro/testAvroEventProducer.avsc    |  11 +
 .../flume/sink/AvroKuduEventProducerTest.java   | 215 ++++++++++++++
 4 files changed, 560 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/346b63d9/java/kudu-flume-sink/pom.xml
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/pom.xml b/java/kudu-flume-sink/pom.xml
index a212995..7bb193c 100644
--- a/java/kudu-flume-sink/pom.xml
+++ b/java/kudu-flume-sink/pom.xml
@@ -21,7 +21,9 @@
   <name>Kudu Flume NG Sink</name>
 
   <properties>
+    <avro.version>1.8.1</avro.version>
     <flume.version>1.6.0</flume.version>
+    <hadoop.version>2.7.0</hadoop.version>
   </properties>
 
   <build>
@@ -44,6 +46,41 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>2.3</version>
+        <configuration>
+          <relocations>
+            <relocation>
+              <pattern>org.apache.hadoop</pattern>
+              <shadedPattern>org.apache.kudu.flume.sink.shaded.org.apache.hadoop</shadedPattern>
+            </relocation>
+          </relocations>
+          <shadeTestJar>true</shadeTestJar>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <version>${avro.version}</version>
+        <executions>
+          <execution>
+            <phase>generate-test-sources</phase>
+              <goals>
+                <goal>schema</goal>
+              </goals>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 
@@ -69,6 +106,19 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>${avro.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
       <version>${slf4j.version}</version>
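
The shade relocation above presumably exists because hadoop-client,
pulled in so that schemaPath can point at an hdfs:// URL, may collide
with whatever Hadoop version is already on a Flume agent's classpath.
As an illustrative sanity check (not part of this commit), the
relocated class name can be resolved against the shaded jar:

    // Illustrative check, assuming the shaded kudu-flume-sink jar is on the
    // classpath: Hadoop's Configuration resolves under the relocated package.
    public class ShadeCheck {
      public static void main(String[] args) throws ClassNotFoundException {
        Class<?> c = Class.forName(
            "org.apache.kudu.flume.sink.shaded.org.apache.hadoop.conf.Configuration");
        System.out.println("found relocated class: " + c.getName());
      }
    }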

http://git-wip-us.apache.org/repos/asf/kudu/blob/346b63d9/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduEventProducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduEventProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduEventProducer.java
new file mode 100644
index 0000000..f9ee683
--- /dev/null
+++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduEventProducer.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kudu.flume.sink;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+
+/**
+ * <p>An Avro serializer that generates one insert or upsert operation per event by deserializing the event
+ * body as an Avro record and mapping its fields to columns in a Kudu table.
+ * <p><strong>Avro Kudu Event Producer configuration parameters</strong>
+ * <table cellpadding=3 cellspacing=0 border=1>
+ * <tr><th>Property Name</th>
+ * <th>Default</th>
+ * <th>Required?</th>
+ * <th>Description</th></tr>
+ * <tr><td>producer.operation</td>
+ * <td>upsert</td>
+ * <td>No</td>
+ * <td>The operation used to write events to Kudu. Supported operations are insert and upsert.</td>
+ * </tr>
+ * <tr><td>producer.schemaPath</td>
+ * <td></td>
+ * <td>No</td>
+ * <td>The location of the Avro schema file used to deserialize the Avro-encoded event bodies.
+ * It's used whenever an event does not include its own schema. If not specified, the
+ * schema must be specified on a per-event basis, either by URL or as a literal.
+ * The schema must be of record type.
+ * </td>
+ * </tr>
+ * </table>
+ */
+public class AvroKuduEventProducer implements KuduEventProducer {
+  public static final String OPERATION_PROP = "operation";
+  public static final String SCHEMA_PROP = "schemaPath";
+  public static final String DEFAULT_OPERATION = "upsert";
+  public static final String SCHEMA_URL_HEADER = "flume.avro.schema.url";
+  public static final String SCHEMA_LITERAL_HEADER = "flume.avro.schema.literal";
+
+  private String operation;
+  private DatumReader<GenericRecord> reader;
+  private GenericRecord reuse;
+  private KuduTable table;
+  private byte[] payload;
+  private String defaultSchemaURL;
+
+  /**
+   * The binary decoder to reuse for event parsing.
+   */
+  private BinaryDecoder decoder = null;
+
+  public AvroKuduEventProducer() {
+  }
+
+  private static final Configuration conf = new Configuration();
+
+  /**
+   * A cache of schemas retrieved by URL to avoid re-parsing the schema.
+   */
+  private static final LoadingCache<String, Schema> schemasFromURL =
+      CacheBuilder.newBuilder()
+          .build(new CacheLoader<String, Schema>() {
+            @Override
+            public Schema load(String url) throws IOException {
+              Schema.Parser parser = new Schema.Parser();
+              InputStream is = null;
+              try {
+                FileSystem fs = FileSystem.get(URI.create(url), conf);
+                if (url.toLowerCase(Locale.ENGLISH).startsWith("hdfs:/")) {
+                  is = fs.open(new Path(url));
+                } else {
+                  is = new URL(url).openStream();
+                }
+                return parser.parse(is);
+              } finally {
+                if (is != null) {
+                  is.close();
+                }
+              }
+            }
+          });
+
+  /**
+   * A cache of literal schemas to avoid re-parsing the schema.
+   */
+  private static final LoadingCache<String, Schema> schemasFromLiteral =
+      CacheBuilder.newBuilder()
+          .build(new CacheLoader<String, Schema>() {
+            @Override
+            public Schema load(String literal) {
+              Preconditions.checkNotNull(literal,
+                  "Schema literal cannot be null without a Schema URL");
+              return new Schema.Parser().parse(literal);
+            }
+          });
+
+  /**
+   * A cache of DatumReaders per schema.
+   */
+  private final LoadingCache<Schema, DatumReader<GenericRecord>> readers =
+      CacheBuilder.newBuilder()
+          .build(new CacheLoader<Schema, DatumReader<GenericRecord>>() {
+            @Override
+            public DatumReader<GenericRecord> load(Schema schema) {
+              return new GenericDatumReader<>(schema);
+            }
+          });
+
+  @Override
+  public void configure(Context context) {
+    this.operation = context.getString(OPERATION_PROP, DEFAULT_OPERATION);
+
+    String schemaPath = context.getString(SCHEMA_PROP);
+    if (schemaPath != null) {
+      defaultSchemaURL = schemaPath;
+    }
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) {
+  }
+
+  // NB: this is called once per event
+  @Override
+  public void initialize(Event event, KuduTable table) {
+    this.payload = event.getBody();
+    this.table = table;
+    this.reader = readers.getUnchecked(getSchema(event));
+  }
+
+  @Override
+  public List<Operation> getOperations() throws FlumeException {
+    decoder = DecoderFactory.get().binaryDecoder(payload, decoder);
+    try {
+      reuse = reader.read(reuse, decoder);
+    } catch (IOException e) {
+      throw new FlumeException("Cannot deserialize event", e);
+    }
+    Operation op;
+    switch (operation.toLowerCase()) {
+      case "upsert":
+        op = table.newUpsert();
+        break;
+      case "insert":
+        op = table.newInsert();
+        break;
+      default:
+        throw new FlumeException(String.format("Unexpected operation %s", operation));
+    }
+    setupOp(op, reuse);
+    return Collections.singletonList(op);
+  }
+
+  private void setupOp(Operation op, GenericRecord record) {
+    PartialRow row = op.getRow();
+    for (ColumnSchema col : table.getSchema().getColumns()) {
+      String name = col.getName();
+      Object value = record.get(name);
+      if (value == null) {
+        if (col.isNullable()) {
+          row.setNull(name);
+        } else {
+          // leave unset for possible Kudu default
+        }
+      } else {
+        // Avro doesn't support 8- or 16-bit integer types, but we'll allow them to be passed as
+        // a larger type.
+        try {
+          switch (col.getType()) {
+            case BOOL:
+              row.addBoolean(name, (boolean) value);
+              break;
+            case INT8:
+              row.addByte(name, (byte) value);
+              break;
+            case INT16:
+              row.addShort(name, (short) value);
+              break;
+            case INT32:
+              row.addInt(name, (int) value);
+              break;
+            case INT64: // Fall through
+            case TIMESTAMP:
+              row.addLong(name, (long) value);
+              break;
+            case FLOAT:
+              row.addFloat(name, (float) value);
+              break;
+            case DOUBLE:
+              row.addDouble(name, (double) value);
+              break;
+            case STRING:
+              row.addString(name, value.toString());
+              break;
+            case BINARY:
+              row.addBinary(name, (byte[]) value);
+              break;
+            default:
+              throw new FlumeException(String.format(
+                  "Unrecognized type %s for column %s", col.getType().toString(), name));
+          }
+        } catch (ClassCastException e) {
+          throw new FlumeException(String.format("Failed to coerce value for column %s to type %s",
+              col.getName(),
+              col.getType()));
+        }
+      }
+    }
+  }
+
+  private Schema getSchema(Event event) throws FlumeException {
+    Map<String, String> headers = event.getHeaders();
+    String schemaURL = headers.get(SCHEMA_URL_HEADER);
+    String schemaLiteral = headers.get(SCHEMA_LITERAL_HEADER);
+    try {
+      if (schemaURL != null) {
+        return schemasFromURL.get(schemaURL);
+      } else if (schemaLiteral != null) {
+        return schemasFromLiteral.get(schemaLiteral);
+      } else if (defaultSchemaURL != null) {
+        return schemasFromURL.get(defaultSchemaURL);
+      } else {
+        throw new FlumeException(
+            String.format("No schema for event! Specify configuration property %s or event
header %s",
+                SCHEMA_PROP,
+                SCHEMA_URL_HEADER));
+      }
+    } catch (ExecutionException ex) {
+      throw new FlumeException("Cannot get schema", ex);
+    } catch (UncheckedExecutionException ex) {
+      throw new FlumeException("Cannot parse schema", ex);
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+}
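
Putting the class above in context: the KuduSink calls configure() once,
then initialize() and getOperations() for each event it takes from the
channel. The sketch below drives that lifecycle by hand; the master
address, table name, schema URL, and the encodeRecord() helper are
hypothetical placeholders, not part of this commit.

    import com.google.common.collect.ImmutableMap;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;
    import org.apache.kudu.client.KuduClient;
    import org.apache.kudu.client.KuduSession;
    import org.apache.kudu.client.KuduTable;
    import org.apache.kudu.client.Operation;
    import org.apache.kudu.flume.sink.AvroKuduEventProducer;

    public class ProducerLifecycleSketch {
      public static void main(String[] args) throws Exception {
        KuduClient client =
            new KuduClient.KuduClientBuilder("kudu-master-1:7051").build();
        KuduTable table = client.openTable("avro_events");

        AvroKuduEventProducer producer = new AvroKuduEventProducer();
        // Configure once: a default schema URL plus the write operation.
        producer.configure(new Context(ImmutableMap.of(
            AvroKuduEventProducer.SCHEMA_PROP, "file:///tmp/event.avsc",
            AvroKuduEventProducer.OPERATION_PROP, "upsert")));

        // The body must be a single raw Avro-encoded record (see the
        // encoding sketch after the test schema below).
        Event event = EventBuilder.withBody(encodeRecord());

        producer.initialize(event, table);  // the sink does this per event
        KuduSession session = client.newSession();
        for (Operation op : producer.getOperations()) {
          session.apply(op);
        }
        session.close();
        producer.close();
        client.close();
      }

      // Hypothetical helper standing in for real Avro encoding.
      private static byte[] encodeRecord() {
        throw new UnsupportedOperationException("see encoding sketch below");
      }
    }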

http://git-wip-us.apache.org/repos/asf/kudu/blob/346b63d9/java/kudu-flume-sink/src/test/avro/testAvroEventProducer.avsc
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/avro/testAvroEventProducer.avsc b/java/kudu-flume-sink/src/test/avro/testAvroEventProducer.avsc
new file mode 100644
index 0000000..d86955c
--- /dev/null
+++ b/java/kudu-flume-sink/src/test/avro/testAvroEventProducer.avsc
@@ -0,0 +1,11 @@
+{"namespace": "org.apache.kudu.flume.sink",
+  "type": "record",
+  "name": "AvroKuduEventProducerTestRecord",
+  "fields": [
+    {"name": "key", "type": "int"},
+    {"name": "longField",  "type": "long"},
+    {"name": "doubleField",  "type": "double"},
+    {"name": "nullableField",  "type": ["string", "null"]},
+    {"name": "stringField", "type": "string"}
+  ]
+}
\ No newline at end of file
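
The producer decodes event bodies with DecoderFactory.binaryDecoder(),
so a body must be one raw Avro-encoded datum, not an Avro container
file. A minimal encoding sketch against the test schema above (the
file path is an assumption, relative to the kudu-flume-sink module):

    import java.io.ByteArrayOutputStream;
    import java.io.File;
    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.EncoderFactory;

    public class EncodeRecordSketch {
      public static void main(String[] args) throws IOException {
        Schema schema = new Schema.Parser().parse(
            new File("src/test/avro/testAvroEventProducer.avsc"));

        GenericRecord record = new GenericData.Record(schema);
        record.put("key", 1);
        record.put("longField", 2L);
        record.put("doubleField", 2.71828);
        record.put("nullableField", null);
        record.put("stringField", "hello");

        // Raw binary encoding of a single datum, matching what
        // AvroKuduEventProducer.getOperations() expects to decode.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();
        byte[] eventBody = out.toByteArray();  // pass to EventBuilder.withBody()
        System.out.println("encoded " + eventBody.length + " bytes");
      }
    }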

http://git-wip-us.apache.org/repos/asf/kudu/blob/346b63d9/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduEventProducerTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduEventProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduEventProducerTest.java
new file mode 100644
index 0000000..c1de448
--- /dev/null
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduEventProducerTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kudu.flume.sink;
+
+import static org.apache.kudu.flume.sink.AvroKuduEventProducer.SCHEMA_LITERAL_HEADER;
+import static org.apache.kudu.flume.sink.AvroKuduEventProducer.SCHEMA_PROP;
+import static org.apache.kudu.flume.sink.AvroKuduEventProducer.SCHEMA_URL_HEADER;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER_PREFIX;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Sink;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.BaseKuduTest;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduTable;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class AvroKuduEventProducerTest extends BaseKuduTest {
+  private static final String schemaPath = "src/test/avro/testAvroEventProducer.avsc";
+  private static String schemaLiteral;
+
+  private static org.apache.avro.Schema schema;
+
+  enum SchemaLocation {
+    GLOBAL, URL, LITERAL
+  }
+
+  @BeforeClass
+  public static void setupAvroSchemaBeforeClass() {
+    org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser();
+    try {
+      schemaLiteral = Files.toString(new File(schemaPath), Charsets.UTF_8);
+      schema = parser.parse(new File(schemaPath));
+    } catch (IOException e) {
+      throw new FlumeException("Unable to open and parse schema file!", e);
+    }
+  }
+
+  @Test
+  public void testEmptyChannel() throws Exception {
+    testEvents(0, SchemaLocation.GLOBAL);
+  }
+
+  @Test
+  public void testOneEvent() throws Exception {
+    testEvents(1, SchemaLocation.GLOBAL);
+  }
+
+  @Test
+  public void testThreeEvents() throws Exception {
+    testEvents(3, SchemaLocation.GLOBAL);
+  }
+
+  @Test
+  public void testThreeEventsSchemaURLInEvent() throws Exception {
+    testEvents(3, SchemaLocation.URL);
+  }
+
+  @Test
+  public void testThreeEventsSchemaLiteralInEvent() throws Exception {
+    testEvents(3, SchemaLocation.LITERAL);
+  }
+
+  private void testEvents(int eventCount, SchemaLocation schemaLocation)
+      throws Exception {
+    KuduTable table = createNewTable(
+        String.format("test%sevents%s", eventCount, schemaLocation));
+    String tableName = table.getName();
+    String schemaURI = new File(schemaPath).getAbsoluteFile().toURI().toString();
+    Context ctx = schemaLocation != SchemaLocation.GLOBAL ? new Context()
+        : new Context(ImmutableMap.of(PRODUCER_PREFIX + SCHEMA_PROP, schemaURI));
+    KuduSink sink = createSink(tableName, ctx);
+
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    writeEventsToChannel(channel, eventCount, schemaLocation);
+    tx.commit();
+    tx.close();
+
+    Sink.Status status = sink.process();
+    if (eventCount == 0) {
+      assertEquals("incorrect status for empty channel", status, Sink.Status.BACKOFF);
+    } else {
+      assertEquals("incorrect status for non-empty channel", status, Sink.Status.READY);
+    }
+
+    List<String> answers = makeAnswers(eventCount);
+    List<String> rows = scanTableToStrings(table);
+    assertEquals("wrong number of rows inserted", answers.size(), rows.size());
+    assertArrayEquals("wrong rows inserted", answers.toArray(), rows.toArray());
+  }
+
+  private KuduTable createNewTable(String tableName) throws Exception {
+    ArrayList<ColumnSchema> columns = new ArrayList<>(5);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("longField", Type.INT64).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("doubleField", Type.DOUBLE).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("nullableField", Type.STRING).nullable(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("stringField", Type.STRING).build());
+    CreateTableOptions createOptions =
+        new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key"))
+            .setNumReplicas(1);
+    return createTable(tableName, new Schema(columns), createOptions);
+  }
+
+  private KuduSink createSink(String tableName, Context ctx) {
+    KuduSink sink = new KuduSink(syncClient);
+    HashMap<String, String> parameters = new HashMap<>();
+    parameters.put(TABLE_NAME, tableName);
+    parameters.put(MASTER_ADDRESSES, getMasterAddresses());
+    parameters.put(PRODUCER,
+        "org.apache.kudu.flume.sink.AvroKuduEventProducer");
+    Context context = new Context(parameters);
+    context.putAll(ctx.getParameters());
+    Configurables.configure(sink, context);
+
+    return sink;
+  }
+
+  private void writeEventsToChannel(Channel channel, int eventCount,
+                                    SchemaLocation schemaLocation) throws Exception {
+    for (int i = 0; i < eventCount; i++) {
+      AvroKuduEventProducerTestRecord record = new AvroKuduEventProducerTestRecord();
+      record.setKey(10 * i);
+      record.setLongField(2L * i);
+      record.setDoubleField(2.71828 * i);
+      record.setNullableField(i % 2 == 0 ? null : "taco");
+      record.setStringField(String.format("hello %d", i));
+      ByteArrayOutputStream out = new ByteArrayOutputStream();
+      Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+      DatumWriter<AvroKuduEventProducerTestRecord> writer =
+          new SpecificDatumWriter<>(AvroKuduEventProducerTestRecord.class);
+      writer.write(record, encoder);
+      encoder.flush();
+      Event e = EventBuilder.withBody(out.toByteArray());
+      if (schemaLocation == SchemaLocation.URL) {
+        String schemaURI = new File(schemaPath).getAbsoluteFile().toURI().toString();
+        e.setHeaders(ImmutableMap.of(SCHEMA_URL_HEADER, schemaURI));
+      } else if (schemaLocation == SchemaLocation.LITERAL) {
+        e.setHeaders(ImmutableMap.of(SCHEMA_LITERAL_HEADER, schemaLiteral));
+      }
+      channel.put(e);
+    }
+  }
+
+  private List<String> makeAnswers(int eventCount) {
+    List<String> answers = Lists.newArrayList();
+    for (int i = 0; i < eventCount; i++) {
+      answers.add(String.format(
+          "INT32 key=%s, INT64 longField=%s, DOUBLE doubleField=%s, " +
+              "STRING nullableField=%s, STRING stringField=hello %s",
+          10 * i,
+          2 * i,
+          2.71828 * i,
+          i % 2 == 0 ? "NULL" : "taco",
+          i));
+    }
+    Collections.sort(answers);
+    return answers;
+  }
+}

