flume-commits mailing list archives

From: hshreedha...@apache.org
Subject: git commit: FLUME-2294. Add a sink for Kite Datasets.
Date: Sun, 19 Jan 2014 05:17:39 GMT
Updated Branches:
  refs/heads/trunk 9a4f04766 -> 68ba5cf71


FLUME-2294. Add a sink for Kite Datasets.

(Ryan Blue via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/68ba5cf7
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/68ba5cf7
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/68ba5cf7

Branch: refs/heads/trunk
Commit: 68ba5cf7185f333ad8723c3af5bcefe868c783cd
Parents: 9a4f047
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Sat Jan 18 21:16:12 2014 -0800
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Sat Jan 18 21:16:12 2014 -0800

----------------------------------------------------------------------
 flume-ng-dist/pom.xml                           |  18 +
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  33 ++
 flume-ng-sinks/flume-dataset-sink/pom.xml       | 139 ++++++
 .../org/apache/flume/sink/kite/DatasetSink.java | 367 ++++++++++++++
 .../flume/sink/kite/DatasetSinkConstants.java   |  56 +++
 .../apache/flume/sink/kite/TestDatasetSink.java | 475 +++++++++++++++++++
 flume-ng-sinks/pom.xml                          |  29 ++
 pom.xml                                         |  19 +-
 8 files changed, 1134 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/68ba5cf7/flume-ng-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml
index 2d0ee47..8b814b7 100644
--- a/flume-ng-dist/pom.xml
+++ b/flume-ng-dist/pom.xml
@@ -63,6 +63,24 @@
     </plugins>
   </build>
 
+  <profiles>
+    <profile>
+      <id>hadoop-2</id>
+      <activation>
+        <property>
+          <name>hadoop.profile</name>
+          <value>2</value>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.flume.flume-ng-sinks</groupId>
+          <artifactId>flume-dataset-sink</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.flume</groupId>

http://git-wip-us.apache.org/repos/asf/flume/blob/68ba5cf7/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index a2790d9..d120a74 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2018,6 +2018,39 @@ Example for agent named a1:
   a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
   a1.sinks.k1.channel = c1
 
+Kite Dataset Sink (experimental)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. warning::
+  This sink is experimental and may change between minor versions of Flume.
+  Use at your own risk.
+
+Experimental sink that writes events to a `Kite Dataset <http://kitesdk.org/docs/current/kite-data/guide.html>`_.
+This sink deserializes the body of each incoming event and stores the
+resulting record in a Kite Dataset. It determines the target Dataset by opening
+a repository URI, ``kite.repo.uri``, and loading a Dataset by name,
+``kite.dataset.name``.
+
+The only supported serialization is Avro, and the record schema must be passed
+in the event headers, using either ``flume.avro.schema.literal`` with the JSON
+schema representation or ``flume.avro.schema.url`` with a URL where the schema
+may be found (``hdfs:/...`` URIs are supported). This is compatible with the
+Log4jAppender Flume client and the spooling directory source's Avro
+deserializer using ``deserializer.schemaType = LITERAL``.
+
+Note: The ``flume.avro.schema.hash`` header is **not supported**.
+
+=====================  =======  ===========================================================
+Property Name          Default  Description
+=====================  =======  ===========================================================
+**channel**            --
+**type**               --       Must be org.apache.flume.sink.kite.DatasetSink
+**kite.repo.uri**      --       URI of the repository to open
+**kite.dataset.name**  --       Name of the Dataset where records will be written
+kite.batchSize         100      Number of records to process in each batch
+kite.rollInterval      30       Maximum wait time (seconds) before data files are released
+=====================  =======  ===========================================================
+
 Custom Sink
 ~~~~~~~~~~~
 

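For reference, a minimal agent configuration using the sink documented above might look like the following (the agent and channel names, repository URI, and dataset name are illustrative placeholders, not part of this commit; the property keys and sink type come from the table above):

  a1.channels = c1
  a1.sinks = k1
  a1.sinks.k1.type = org.apache.flume.sink.kite.DatasetSink
  a1.sinks.k1.channel = c1
  a1.sinks.k1.kite.repo.uri = repo:hdfs://namenode:8020/data/repo
  a1.sinks.k1.kite.dataset.name = events
  a1.sinks.k1.kite.batchSize = 100
  a1.sinks.k1.kite.rollInterval = 30
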
http://git-wip-us.apache.org/repos/asf/flume/blob/68ba5cf7/flume-ng-sinks/flume-dataset-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/pom.xml b/flume-ng-sinks/flume-dataset-sink/pom.xml
new file mode 100644
index 0000000..57fd0e4
--- /dev/null
+++ b/flume-ng-sinks/flume-dataset-sink/pom.xml
@@ -0,0 +1,139 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>flume-ng-sinks</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>1.5.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.flume.flume-ng-sinks</groupId>
+  <artifactId>flume-dataset-sink</artifactId>
+  <name>Flume NG Kite Dataset Sink</name>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <repositories>
+    <repository>
+      <id>cdh.repo</id>
+      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
+      <name>Cloudera Repositories</name>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+
+    <repository>
+      <id>cdh.snapshots.repo</id>
+      <url>https://repository.cloudera.com/artifactory/libs-snapshot-local</url>
+      <name>Cloudera Snapshots Repository</name>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+      <releases>
+        <enabled>false</enabled>
+      </releases>
+    </repository>
+  </repositories>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-configuration</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.kitesdk</groupId>
+      <artifactId>kite-data-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+
+    <dependency>
+      <!-- The build will fail if this is not hadoop-common 2.*,
+           because Kite uses hflush.
+           -->
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop2.version}</version>
+      <optional>true</optional>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <version>${hadoop2.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flume/blob/68ba5cf7/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
new file mode 100644
index 0000000..9a00fb1
--- /dev/null
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.kite;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.DatasetRepositories;
+import org.kitesdk.data.DatasetWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Experimental sink that writes events to a Kite Dataset. This sink
+ * deserializes the body of each incoming event and stores the resulting
+ * record in a Kite Dataset. It determines the target Dataset by opening a
+ * repository URI, {@code kite.repo.uri}, and loading a Dataset by name,
+ * {@code kite.dataset.name}.
+ */
+public class DatasetSink extends AbstractSink implements Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DatasetSink.class);
+
+  static Configuration conf = new Configuration();
+
+  /**
+   * Lock used to protect access to the current writer
+   */
+  private final ReentrantLock writerLock = new ReentrantLock(true);
+
+  private String repositoryURI = null;
+  private String datasetName = null;
+  private long batchSize = DatasetSinkConstants.DEFAULT_BATCH_SIZE;
+  private Dataset<Object> targetDataset = null;
+  private DatasetWriter<Object> writer = null;
+  private SinkCounter counter = null;
+
+  // for rolling files at a given interval
+  private ScheduledExecutorService rollTimer;
+  private int rollInterval = DatasetSinkConstants.DEFAULT_ROLL_INTERVAL;
+
+  // for working with avro serialized records
+  private Object datum = null;
+  private BinaryDecoder decoder = null;
+  private LoadingCache<Schema, ReflectDatumReader<Object>> readers =
+      CacheBuilder.newBuilder()
+      .build(new CacheLoader<Schema, ReflectDatumReader<Object>>() {
+        @Override
+        public ReflectDatumReader<Object> load(Schema schema) {
+          // must use the target dataset's schema for reading so that the
+          // resulting records can be stored with it
+          return new ReflectDatumReader<Object>(
+              schema, targetDataset.getDescriptor().getSchema());
+        }
+      });
+  private static LoadingCache<String, Schema> schemasFromLiteral = CacheBuilder
+      .newBuilder()
+      .build(new CacheLoader<String, Schema>() {
+        @Override
+        public Schema load(String literal) {
+          Preconditions.checkNotNull(literal,
+              "Schema literal cannot be null without a Schema URL");
+          return new Schema.Parser().parse(literal);
+        }
+      });
+  private static LoadingCache<String, Schema> schemasFromURL = CacheBuilder
+      .newBuilder()
+      .build(new CacheLoader<String, Schema>() {
+        @Override
+        public Schema load(String url) throws IOException {
+          Schema.Parser parser = new Schema.Parser();
+          InputStream is = null;
+          try {
+            FileSystem fs = FileSystem.get(URI.create(url), conf);
+            if (url.toLowerCase().startsWith("hdfs:/")) {
+              is = fs.open(new Path(url));
+            } else {
+              is = new URL(url).openStream();
+            }
+            return parser.parse(is);
+          } finally {
+            if (is != null) {
+              is.close();
+            }
+          }
+        }
+      });
+
+  protected List<String> allowedFormats() {
+    return Lists.newArrayList("avro");
+  }
+
+  @Override
+  public void configure(Context context) {
+    this.repositoryURI = context.getString(
+        DatasetSinkConstants.CONFIG_KITE_REPO_URI);
+    Preconditions.checkNotNull(repositoryURI, "Repository URI is missing");
+    this.datasetName = context.getString(
+        DatasetSinkConstants.CONFIG_KITE_DATASET_NAME);
+    Preconditions.checkNotNull(datasetName, "Dataset name is missing");
+    this.targetDataset = DatasetRepositories.open(repositoryURI)
+        .load(datasetName);
+
+    String formatName = targetDataset.getDescriptor().getFormat().getName();
+    Preconditions.checkArgument(allowedFormats().contains(formatName),
+        "Unsupported format: " + formatName);
+
+    // other configuration
+    this.batchSize = context.getLong(
+        DatasetSinkConstants.CONFIG_KITE_BATCH_SIZE,
+        DatasetSinkConstants.DEFAULT_BATCH_SIZE);
+    this.rollInterval = context.getInteger(
+        DatasetSinkConstants.CONFIG_KITE_ROLL_INTERVAL,
+        DatasetSinkConstants.DEFAULT_ROLL_INTERVAL);
+
+    this.counter = new SinkCounter(getName());
+  }
+
+  @Override
+  public synchronized void start() {
+    this.writer = openWriter(targetDataset);
+    if (rollInterval > 0) {
+      this.rollTimer = Executors.newSingleThreadScheduledExecutor(
+          new ThreadFactoryBuilder()
+              .setNameFormat(getName() + "-timed-roll-thread")
+              .build());
+      rollTimer.scheduleWithFixedDelay(new Runnable() {
+        @Override
+        public void run() {
+          roll();
+        }
+      }, rollInterval, rollInterval, TimeUnit.SECONDS);
+    }
+    counter.start();
+    // signal that this sink is ready to process
+    LOG.info("Started DatasetSink " + getName());
+    super.start();
+  }
+
+  void roll() {
+    // if the writer is null, nothing to do
+    if (writer == null) {
+      return;
+    }
+
+    // no need to open/close while the lock is held, just replace the reference
+    DatasetWriter toClose = null;
+    DatasetWriter newWriter = openWriter(targetDataset);
+
+    writerLock.lock();
+    try {
+      toClose = writer;
+      this.writer = newWriter;
+    } finally {
+      writerLock.unlock();
+    }
+
+    LOG.info("Rolled writer for dataset: " + datasetName);
+    toClose.close();
+  }
+
+  @Override
+  public synchronized void stop() {
+    counter.stop();
+    if (rollTimer != null) {
+      rollTimer.shutdown();
+      try {
+        while (!rollTimer.isTerminated()) {
+          rollTimer.awaitTermination(
+              DatasetSinkConstants.DEFAULT_TERMINATION_INTERVAL,
+              TimeUnit.MILLISECONDS);
+        }
+      } catch (InterruptedException ex) {
+        LOG.warn("Interrupted while waiting for shutdown: " + rollTimer);
+        Thread.interrupted();
+      }
+    }
+
+    if (writer != null) {
+      // any write problems invalidate the writer, which is immediately closed
+      writer.close();
+      this.writer = null;
+    }
+
+    // signal that this sink has stopped
+    LOG.info("Stopped dataset sink: " + getName());
+    super.stop();
+  }
+
+  @Override
+  public Status process() throws EventDeliveryException {
+    if (writer == null) {
+      throw new EventDeliveryException(
+          "Cannot recover after previous failure");
+    }
+
+    Channel channel = getChannel();
+    Transaction transaction = null;
+    try {
+      long processedEvents = 0;
+
+      // coarse locking to avoid waiting within the loop
+      writerLock.lock();
+      transaction = channel.getTransaction();
+      transaction.begin();
+      try {
+        for (; processedEvents < batchSize; processedEvents += 1) {
+          Event event = channel.take();
+          if (event == null) {
+            // no events available in the channel
+            break;
+          }
+
+          this.datum = deserialize(event, datum);
+
+          // writeEncoded would be an optimization in some cases, but HBase
+          // will not support it and partitioned Datasets need to get partition
+          // info from the entity Object. We may be able to avoid the
+          // serialization round-trip otherwise.
+          writer.write(datum);
+        }
+        // TODO: Add option to sync, depends on CDK-203
+        writer.flush();
+      } finally {
+        writerLock.unlock();
+      }
+
+      // commit after data has been written and flushed
+      transaction.commit();
+
+      if (processedEvents == 0) {
+        counter.incrementBatchEmptyCount();
+        return Status.BACKOFF;
+      } else if (processedEvents < batchSize) {
+        counter.incrementBatchUnderflowCount();
+      } else {
+        counter.incrementBatchCompleteCount();
+      }
+
+      counter.addToEventDrainSuccessCount(processedEvents);
+
+      return Status.READY;
+
+    } catch (Throwable th) {
+      // catch-all for any unhandled Throwable so that the transaction is
+      // correctly rolled back.
+      if (transaction != null) {
+        try {
+          transaction.rollback();
+        } catch (Exception ex) {
+          LOG.error("Transaction rollback failed", ex);
+          throw Throwables.propagate(ex);
+        }
+      }
+
+      // remove the writer's reference and close it
+      DatasetWriter toClose = null;
+      writerLock.lock();
+      try {
+        toClose = writer;
+        this.writer = null;
+      } finally {
+        writerLock.unlock();
+      }
+      toClose.close();
+
+      // handle the exception
+      Throwables.propagateIfInstanceOf(th, Error.class);
+      Throwables.propagateIfInstanceOf(th, EventDeliveryException.class);
+      throw new EventDeliveryException(th);
+
+    } finally {
+      if (transaction != null) {
+        transaction.close();
+      }
+    }
+  }
+
+  /**
+   * Deserialize the Avro-encoded body of an event. Not thread-safe.
+   *
+   * @param event the event whose body holds an Avro binary-encoded record
+   * @param reuse a previously returned record to reuse, or null
+   * @return the record, read using the target dataset's schema
+   */
+  private Object deserialize(Event event, Object reuse)
+      throws EventDeliveryException {
+    decoder = DecoderFactory.get().binaryDecoder(event.getBody(), decoder);
+    // no checked exception is thrown in the CacheLoader
+    ReflectDatumReader<Object> reader = readers.getUnchecked(schema(event));
+    try {
+      return reader.read(reuse, decoder);
+    } catch (IOException ex) {
+      throw new EventDeliveryException("Cannot deserialize event", ex);
+    }
+  }
+
+  private static Schema schema(Event event) throws EventDeliveryException {
+    Map<String, String> headers = event.getHeaders();
+    String schemaURL = headers.get(
+        DatasetSinkConstants.AVRO_SCHEMA_URL_HEADER);
+    try {
+      if (headers.get(DatasetSinkConstants.AVRO_SCHEMA_URL_HEADER) != null) {
+        return schemasFromURL.get(schemaURL);
+      } else {
+        return schemasFromLiteral.get(
+            headers.get(DatasetSinkConstants.AVRO_SCHEMA_LITERAL_HEADER));
+      }
+    } catch (ExecutionException ex) {
+      throw new EventDeliveryException("Cannot get schema", ex.getCause());
+    }
+  }
+
+  private static DatasetWriter<Object> openWriter(Dataset<Object> target) {
+    DatasetWriter<Object> writer = target.newWriter();
+    writer.open();
+    return writer;
+  }
+
+}

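To illustrate the event contract the sink expects, here is a rough sketch of building a compatible event on the client side. It mirrors the serialize() and event() helpers in TestDatasetSink below; the record fields ("id", "msg") are hypothetical and assume a matching dataset schema.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;

import com.google.common.collect.Maps;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;

public class DatasetSinkEventExample {

  /** Builds an event the DatasetSink can deserialize, given a record schema. */
  public static Event buildEvent(Schema schema) throws IOException {
    // hypothetical record; the fields must exist in the dataset's schema
    GenericData.Record record = new GenericRecordBuilder(schema)
        .set("id", "1").set("msg", "hello").build();

    // Avro binary-encode the record into the event body
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new ReflectDatumWriter<GenericData.Record>(schema).write(record, encoder);
    encoder.flush();

    // the sink resolves the writer schema from this header
    // (flume.avro.schema.url with an hdfs:/ or http URL also works)
    Map<String, String> headers = Maps.newHashMap();
    headers.put("flume.avro.schema.literal", schema.toString());

    Event event = new SimpleEvent();
    event.setHeaders(headers);
    event.setBody(out.toByteArray());
    return event;
  }
}
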
http://git-wip-us.apache.org/repos/asf/flume/blob/68ba5cf7/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
new file mode 100644
index 0000000..5087352
--- /dev/null
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.kite;
+
+public class DatasetSinkConstants {
+  /**
+   * URI of the Kite DatasetRepository.
+   */
+  public static final String CONFIG_KITE_REPO_URI = "kite.repo.uri";
+
+  /**
+   * Name of the Kite Dataset to write into.
+   */
+  public static final String CONFIG_KITE_DATASET_NAME = "kite.dataset.name";
+
+  /**
+   * Number of records to process from the incoming channel per call to process.
+   */
+  public static final String CONFIG_KITE_BATCH_SIZE = "kite.batchSize";
+  public static long DEFAULT_BATCH_SIZE = 100;
+
+  /**
+   * Maximum time to wait before finishing files.
+   */
+  public static final String CONFIG_KITE_ROLL_INTERVAL = "kite.rollInterval";
+  public static int DEFAULT_ROLL_INTERVAL = 30; // seconds
+
+  /**
+   * Interval to wait for thread termination
+   */
+  public static final int DEFAULT_TERMINATION_INTERVAL = 10000; // milliseconds
+
+  /**
+   * Headers in which the Avro schema for the event body is expected.
+   */
+  public static final String AVRO_SCHEMA_LITERAL_HEADER =
+      "flume.avro.schema.literal";
+  public static final String AVRO_SCHEMA_URL_HEADER = "flume.avro.schema.url";
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/68ba5cf7/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
new file mode 100644
index 0000000..5708f0c
--- /dev/null
+++ b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.kite;
+
+import com.google.common.base.Function;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.SimpleEvent;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.DatasetDescriptor;
+import org.kitesdk.data.DatasetReader;
+import org.kitesdk.data.DatasetRepositories;
+import org.kitesdk.data.DatasetRepository;
+import org.kitesdk.data.PartitionStrategy;
+
+public class TestDatasetSink {
+
+  public static final String FILE_REPO_URI = "repo:file:target/test-repo";
+  public static final String DATASET_NAME = "test";
+  public static final DatasetRepository REPO = DatasetRepositories
+      .open(FILE_REPO_URI);
+  public static final File SCHEMA_FILE = new File("target/record-schema.avsc");
+  public static final Schema RECORD_SCHEMA = new Schema.Parser().parse(
+      "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[" +
+          "{\"name\":\"id\",\"type\":\"string\"}," +
+          "{\"name\":\"msg\",\"type\":[\"string\",\"null\"]," +
+              "\"default\":\"default\"}]}");
+  public static final Schema COMPATIBLE_SCHEMA = new Schema.Parser().parse(
+      "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[" +
+          "{\"name\":\"id\",\"type\":\"string\"}]}");
+  public static final Schema INCOMPATIBLE_SCHEMA = new Schema.Parser().parse(
+      "{\"type\":\"record\",\"name\":\"user\",\"fields\":[" +
+          "{\"name\":\"username\",\"type\":\"string\"}]}");
+  public static final DatasetDescriptor DESCRIPTOR = new DatasetDescriptor
+      .Builder()
+      .schema(RECORD_SCHEMA)
+      .build();
+
+  Context config = null;
+  Channel in = null;
+  List<GenericData.Record> expected = null;
+  private static final String DFS_DIR = "target/test/dfs";
+  private static final String TEST_BUILD_DATA_KEY = "test.build.data";
+  private static String oldTestBuildDataProp = null;
+
+  @BeforeClass
+  public static void saveSchema() throws IOException {
+    oldTestBuildDataProp = System.getProperty(TEST_BUILD_DATA_KEY);
+    System.setProperty(TEST_BUILD_DATA_KEY, DFS_DIR);
+    FileWriter schema = new FileWriter(SCHEMA_FILE);
+    schema.append(RECORD_SCHEMA.toString());
+    schema.close();
+  }
+
+  @AfterClass
+  public static void tearDownClass() {
+    FileUtils.deleteQuietly(new File(DFS_DIR));
+    if (oldTestBuildDataProp != null) {
+      System.setProperty(TEST_BUILD_DATA_KEY, oldTestBuildDataProp);
+    }
+  }
+
+  @Before
+  public void setup() throws EventDeliveryException {
+    REPO.create(DATASET_NAME, DESCRIPTOR);
+
+    this.config = new Context();
+    config.put(DatasetSinkConstants.CONFIG_KITE_REPO_URI, FILE_REPO_URI);
+    config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_NAME, DATASET_NAME);
+
+    this.in = new MemoryChannel();
+    Configurables.configure(in, config);
+
+    GenericRecordBuilder builder = new GenericRecordBuilder(RECORD_SCHEMA);
+    expected = Lists.newArrayList(
+        builder.set("id", "1").set("msg", "msg1").build(),
+        builder.set("id", "2").set("msg", "msg2").build(),
+        builder.set("id", "3").set("msg", "msg3").build());
+
+    putToChannel(in, Iterables.transform(expected,
+        new Function<GenericData.Record, Event>() {
+          private int i = 0;
+
+          @Override
+          public Event apply(@Nullable GenericData.Record rec) {
+            this.i += 1;
+            boolean useURI = (i % 2) == 0;
+            return event(rec, RECORD_SCHEMA, SCHEMA_FILE, useURI);
+          }
+        }));
+  }
+
+  @After
+  public void teardown() {
+    REPO.delete(DATASET_NAME);
+  }
+
+  @Test
+  public void testFileStore() throws EventDeliveryException {
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+    sink.stop();
+
+    Assert.assertEquals(
+        Sets.newHashSet(expected),
+        read(REPO.<GenericData.Record>load(DATASET_NAME)));
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+  }
+
+  @Test
+  public void testPartitionedData() throws EventDeliveryException {
+    REPO.create("partitioned", new DatasetDescriptor.Builder(DESCRIPTOR)
+        .partitionStrategy(new PartitionStrategy.Builder()
+            .identity("id", String.class, 10) // partition by id
+            .build())
+        .build());
+
+    try {
+      config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_NAME, "partitioned");
+      DatasetSink sink = sink(in, config);
+
+      // run the sink
+      sink.start();
+      sink.process();
+      sink.stop();
+
+      Assert.assertEquals(
+          Sets.newHashSet(expected),
+          read(REPO.<GenericData.Record>load("partitioned")));
+      Assert.assertEquals("Should have committed", 0, remaining(in));
+    } finally {
+      if (REPO.exists("partitioned")) {
+        REPO.delete("partitioned");
+      }
+    }
+  }
+
+  @Test
+  public void testMiniClusterStore()
+      throws EventDeliveryException, IOException {
+    // setup a minicluster
+    MiniDFSCluster cluster = new MiniDFSCluster
+        .Builder(new Configuration())
+        .build();
+    DatasetRepository hdfsRepo = null;
+    try {
+      FileSystem dfs = cluster.getFileSystem();
+      Configuration conf = dfs.getConf();
+      String repoURI = "repo:" + conf.get("fs.defaultFS") + "/tmp/repo";
+
+      // create a repository and dataset in HDFS
+      hdfsRepo = DatasetRepositories.open(repoURI);
+      hdfsRepo.create(DATASET_NAME, DESCRIPTOR);
+
+      // update the config to use the HDFS repository
+      config.put(DatasetSinkConstants.CONFIG_KITE_REPO_URI, repoURI);
+
+      DatasetSink sink = sink(in, config);
+
+      // run the sink
+      sink.start();
+      sink.process();
+      sink.stop();
+
+      Assert.assertEquals(
+          Sets.newHashSet(expected),
+          read(hdfsRepo.<GenericData.Record>load(DATASET_NAME)));
+      Assert.assertEquals("Should have committed", 0, remaining(in));
+
+    } finally {
+      if (hdfsRepo != null && hdfsRepo.exists(DATASET_NAME)) {
+        hdfsRepo.delete(DATASET_NAME);
+      }
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testBatchSize() throws EventDeliveryException {
+    DatasetSink sink = sink(in, config);
+
+    // release one record per process call
+    config.put("kite.batchSize", "2");
+    Configurables.configure(sink, config);
+
+    sink.start();
+    sink.process(); // process the first and second
+    sink.roll(); // roll at the next process call
+    sink.process(); // roll and process the third
+    Assert.assertEquals(
+        Sets.newHashSet(expected.subList(0, 2)),
+        read(REPO.<GenericData.Record>load(DATASET_NAME)));
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+    sink.roll(); // roll at the next process call
+    sink.process(); // roll, the channel is empty
+    Assert.assertEquals(
+        Sets.newHashSet(expected),
+        read(REPO.<GenericData.Record>load(DATASET_NAME)));
+    sink.stop();
+  }
+
+  @Test
+  public void testTimedFileRolling()
+      throws EventDeliveryException, InterruptedException {
+    // use a new roll interval
+    config.put("kite.rollInterval", "1"); // in seconds
+
+    DatasetSink sink = sink(in, config);
+
+    Dataset<GenericData.Record> records = REPO.load(DATASET_NAME);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    Assert.assertEquals(Sets.<GenericData.Record>newHashSet(), read(records));
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+
+    Thread.sleep(1100); // sleep longer than the roll interval
+    sink.process(); // rolling happens in the process method
+
+    Assert.assertEquals(Sets.newHashSet(expected), read(records));
+
+    // wait until the end to stop because it would close the files
+    sink.stop();
+  }
+
+  @Test
+  public void testCompatibleSchemas() throws EventDeliveryException {
+    DatasetSink sink = sink(in, config);
+
+    // add a compatible record that is missing the msg field
+    GenericRecordBuilder compatBuilder = new GenericRecordBuilder(
+        COMPATIBLE_SCHEMA);
+    GenericData.Record compatibleRecord = compatBuilder.set("id", "0").build();
+
+    // add the record to the incoming channel
+    putToChannel(in, event(compatibleRecord, COMPATIBLE_SCHEMA, null, false));
+
+    // the record will be read using the real schema, so create the expected
+    // record using it, but without any data
+
+    GenericRecordBuilder builder = new GenericRecordBuilder(RECORD_SCHEMA);
+    GenericData.Record expectedRecord = builder.set("id", "0").build();
+    expected.add(expectedRecord);
+
+    // run the sink
+    sink.start();
+    sink.process();
+    sink.stop();
+
+    Assert.assertEquals(
+        Sets.newHashSet(expected),
+        read(REPO.<GenericData.Record>load(DATASET_NAME)));
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+  }
+
+  @Test
+  public void testIncompatibleSchemas() throws EventDeliveryException {
+    final DatasetSink sink = sink(in, config);
+
+    GenericRecordBuilder builder = new GenericRecordBuilder(
+        INCOMPATIBLE_SCHEMA);
+    GenericData.Record rec = builder.set("username", "koala").build();
+    putToChannel(in, event(rec, INCOMPATIBLE_SCHEMA, null, false));
+
+    // run the sink
+    sink.start();
+    assertThrows("Should fail", EventDeliveryException.class,
+        new Callable() {
+          @Override
+          public Object call() throws EventDeliveryException {
+            sink.process();
+            return null;
+          }
+        });
+    sink.stop();
+
+    Assert.assertEquals("Should have rolled back",
+        expected.size() + 1, remaining(in));
+  }
+
+  @Test
+  public void testMissingSchema() throws EventDeliveryException {
+    final DatasetSink sink = sink(in, config);
+
+    Event badEvent = new SimpleEvent();
+    badEvent.setHeaders(Maps.<String, String>newHashMap());
+    badEvent.setBody(serialize(expected.get(0), RECORD_SCHEMA));
+    putToChannel(in, badEvent);
+
+    // run the sink
+    sink.start();
+    assertThrows("Should fail", EventDeliveryException.class,
+        new Callable() {
+          @Override
+          public Object call() throws EventDeliveryException {
+            sink.process();
+            return null;
+          }
+        });
+    sink.stop();
+
+    Assert.assertEquals("Should have rolled back",
+        expected.size() + 1, remaining(in));
+  }
+
+  public static DatasetSink sink(Channel in, Context config) {
+    DatasetSink sink = new DatasetSink();
+    sink.setChannel(in);
+    Configurables.configure(sink, config);
+    return sink;
+  }
+
+  public static <T> HashSet<T> read(Dataset<T> dataset) {
+    DatasetReader<T> reader = dataset.newReader();
+    try {
+      reader.open();
+      return Sets.newHashSet(reader.iterator());
+    } finally {
+      reader.close();
+    }
+  }
+
+  public static int remaining(Channel ch) throws EventDeliveryException {
+    Transaction t = ch.getTransaction();
+    try {
+      t.begin();
+      int count = 0;
+      while (ch.take() != null) {
+        count += 1;
+      }
+      t.commit();
+      return count;
+    } catch (Throwable th) {
+      t.rollback();
+      Throwables.propagateIfInstanceOf(th, Error.class);
+      Throwables.propagateIfInstanceOf(th, EventDeliveryException.class);
+      throw new EventDeliveryException(th);
+    } finally {
+      t.close();
+    }
+  }
+
+  public static void putToChannel(Channel in, Event... records)
+      throws EventDeliveryException {
+    putToChannel(in, Arrays.asList(records));
+  }
+
+  public static void putToChannel(Channel in, Iterable<Event> records)
+      throws EventDeliveryException {
+    Transaction t = in.getTransaction();
+    try {
+      t.begin();
+      for (Event record : records) {
+        in.put(record);
+      }
+      t.commit();
+    } catch (Throwable th) {
+      t.rollback();
+      Throwables.propagateIfInstanceOf(th, Error.class);
+      Throwables.propagateIfInstanceOf(th, EventDeliveryException.class);
+      throw new EventDeliveryException(th);
+    } finally {
+      t.close();
+    }
+  }
+
+  public static Event event(
+      Object datum, Schema schema, File file, boolean useURI) {
+    Map<String, String> headers = Maps.newHashMap();
+    if (useURI) {
+      headers.put(DatasetSinkConstants.AVRO_SCHEMA_URL_HEADER,
+          file.getAbsoluteFile().toURI().toString());
+    } else {
+      headers.put(DatasetSinkConstants.AVRO_SCHEMA_LITERAL_HEADER,
+          schema.toString());
+    }
+    Event e = new SimpleEvent();
+    e.setBody(serialize(datum, schema));
+    e.setHeaders(headers);
+    return e;
+  }
+
+  @SuppressWarnings("unchecked")
+  public static byte[] serialize(Object datum, Schema schema) {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+    ReflectDatumWriter writer = new ReflectDatumWriter(schema);
+    try {
+      writer.write(datum, encoder);
+      encoder.flush();
+    } catch (IOException ex) {
+      Throwables.propagate(ex);
+    }
+    return out.toByteArray();
+  }
+
+  /**
+   * A convenience method to avoid a large number of @Test(expected=...) tests.
+   *
+   * This variant uses a Callable, which is allowed to throw checked Exceptions.
+   *
+   * @param message A String message to describe this assertion
+   * @param expected An Exception class that the Callable should throw
+   * @param callable A Callable that is expected to throw the exception
+   */
+  public static void assertThrows(
+      String message, Class<? extends Exception> expected, Callable callable) {
+    try {
+      callable.call();
+      Assert.fail("No exception was thrown (" + message + "), expected: " +
+          expected.getName());
+    } catch (Exception actual) {
+      Assert.assertEquals(message, expected, actual.getClass());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/68ba5cf7/flume-ng-sinks/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/pom.xml b/flume-ng-sinks/pom.xml
index d03576b..6ac2b4d 100644
--- a/flume-ng-sinks/pom.xml
+++ b/flume-ng-sinks/pom.xml
@@ -47,4 +47,33 @@ limitations under the License.
     <module>flume-ng-elasticsearch-sink</module>
     <module>flume-ng-morphline-solr-sink</module>
   </modules>
+
+  <profiles>
+
+    <profile>
+      <id>hadoop-1.0</id>
+      <activation>
+        <property>
+          <name>!hadoop.profile</name>
+        </property>
+      </activation>
+    </profile>
+
+    <profile>
+      <id>hadoop-2</id>
+      <activation>
+        <property>
+          <name>hadoop.profile</name>
+          <value>2</value>
+        </property>
+      </activation>
+      <!-- add the flume-dataset-sink, which is only compatible with hadoop-2
+           -->
+      <modules>
+        <module>flume-dataset-sink</module>
+      </modules>
+    </profile>
+
+  </profiles>
+
 </project>

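For reference, the flume-dataset-sink module (and the matching dependency added to flume-ng-dist above) is only built when the hadoop-2 profile is active, which the activation blocks tie to the hadoop.profile property; a typical invocation would be something along these lines:

  mvn clean install -Dhadoop.profile=2
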
http://git-wip-us.apache.org/repos/asf/flume/blob/68ba5cf7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 453be95..d71239a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,6 +48,8 @@ limitations under the License.
 
     <avro.version>1.7.3</avro.version>
     <elasticsearch.version>0.90.1</elasticsearch.version>
+
+    <hadoop2.version>2.1.0-beta</hadoop2.version>
   </properties>
 
   <modules>
@@ -106,7 +108,7 @@ limitations under the License.
         </property>
       </activation>
       <properties>
-        <hadoop.version>2.1.0-beta</hadoop.version>
+        <hadoop.version>${hadoop2.version}</hadoop.version>
         <hbase.version>0.94.2</hbase.version>
         <hadoop.common.artifact.id>hadoop-common</hadoop.common.artifact.id>
         <thrift.version>0.8.0</thrift.version>
@@ -143,6 +145,13 @@ limitations under the License.
             <artifactId>hadoop-auth</artifactId>
             <version>${hadoop.version}</version>
           </dependency>
+
+          <!-- only compatible with hadoop-2 -->
+          <dependency>
+            <groupId>org.apache.flume.flume-ng-sinks</groupId>
+            <artifactId>flume-dataset-sink</artifactId>
+            <version>1.5.0-SNAPSHOT</version>
+          </dependency>
         </dependencies>
       </dependencyManagement>
     </profile>
@@ -538,7 +547,7 @@ limitations under the License.
               </goals>
               <configuration>
                 <excludes>
-                  <exclude>.idea/</exclude>
+                  <exclude>**/.idea/</exclude>
                   <exclude>**/*.iml</exclude>
                   <exclude>**/nb-configuration.xml</exclude>
                   <exclude>.git/</exclude>
@@ -1087,6 +1096,12 @@ limitations under the License.
         <version>3.0.3</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.kitesdk</groupId>
+        <artifactId>kite-data-core</artifactId>
+        <version>0.10.1</version>
+      </dependency>
+
     </dependencies>
   </dependencyManagement>
 

