flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject flume git commit: FLUME-2999. Kafka channel and sink should enable statically assigned partition per event via header
Date Mon, 10 Oct 2016 18:17:32 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk 7d5ceacac -> 585c4c92e


FLUME-2999. Kafka channel and sink should enable statically assigned partition per event via header

This feature is useful for anyone who needs greater control of which
partitions are being written to - normally in a situation where multiple
Flume agents are being deployed in order to horizontally scale, or
alternatively if there is a scenario where there is a skew in data that
might lead to one or more partitions hotspotting.

We also have the ability to specify custom partitions on to the Kafka
Producer itself using the kafka.* configuration properties. The Kafka
Producer provides the ability to set the partition ID using a particular
ProducerRecord constructor, this is just a matter of providing the
option to use this constructor.

Reviewers: Attila Simon, Mike Percy

(Tristan Stevens via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/585c4c92
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/585c4c92
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/585c4c92

Branch: refs/heads/trunk
Commit: 585c4c92ed65c529871765ab64a5b3637fbdb07d
Parents: 7d5ceac
Author: Tristan Stevens <tristan@cloudera.com>
Authored: Mon Oct 10 19:32:24 2016 +0200
Committer: Mike Percy <mpercy@apache.org>
Committed: Mon Oct 10 20:15:25 2016 +0200

----------------------------------------------------------------------
 flume-ng-channels/flume-kafka-channel/pom.xml   |   5 +
 .../flume/channel/kafka/KafkaChannel.java       |  31 ++-
 .../kafka/KafkaChannelConfiguration.java        |   3 +
 .../flume/channel/kafka/TestKafkaChannel.java   | 149 ++++++++++++-
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  16 ++
 flume-ng-sinks/flume-ng-kafka-sink/pom.xml      |   6 +
 .../org/apache/flume/sink/kafka/KafkaSink.java  |  38 +++-
 .../flume/sink/kafka/KafkaSinkConstants.java    |   3 +
 .../apache/flume/sink/kafka/TestKafkaSink.java  | 222 +++++++++++++++++++
 flume-ng-sources/flume-kafka-source/pom.xml     |   5 +
 flume-shared/flume-shared-kafka-test/pom.xml    |  85 +++++++
 .../kafka/test/KafkaPartitionTestUtil.java      | 213 ++++++++++++++++++
 .../shared/kafka/test/PartitionOption.java      |  25 +++
 .../kafka/test/PartitionTestScenario.java       |  26 +++
 flume-shared/pom.xml                            |  61 +++++
 pom.xml                                         |   9 +-
 16 files changed, 886 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/585c4c92/flume-ng-channels/flume-kafka-channel/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/pom.xml b/flume-ng-channels/flume-kafka-channel/pom.xml
index c1cc844..a3c6de2 100644
--- a/flume-ng-channels/flume-kafka-channel/pom.xml
+++ b/flume-ng-channels/flume-kafka-channel/pom.xml
@@ -38,6 +38,11 @@ limitations under the License.
       <artifactId>flume-ng-sdk</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.flume.flume-shared</groupId>
+      <artifactId>flume-shared-kafka-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
       <version>${kafka.version}</version>

http://git-wip-us.apache.org/repos/asf/flume/blob/585c4c92/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 66b553a..47c0634 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -98,6 +98,8 @@ public class KafkaChannel extends BasicChannelSemantics {
   private String zookeeperConnect = null;
   private String topicStr = DEFAULT_TOPIC;
   private String groupId = DEFAULT_GROUP_ID;
+  private String partitionHeader = null;
+  private Integer staticPartitionId;
   private boolean migrateZookeeperOffsets = DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
 
   //used to indicate if a rebalance has occurred during the current transaction
@@ -191,6 +193,9 @@ public class KafkaChannel extends BasicChannelSemantics {
     parseAsFlumeEvent = ctx.getBoolean(PARSE_AS_FLUME_EVENT, DEFAULT_PARSE_AS_FLUME_EVENT);
     pollTimeout = ctx.getLong(POLL_TIMEOUT, DEFAULT_POLL_TIMEOUT);
 
+    staticPartitionId = ctx.getInteger(STATIC_PARTITION_CONF);
+    partitionHeader = ctx.getString(PARTITION_HEADER_NAME);
+
     migrateZookeeperOffsets = ctx.getBoolean(MIGRATE_ZOOKEEPER_OFFSETS,
       DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS);
     zookeeperConnect = ctx.getString(ZOOKEEPER_CONNECT_FLUME_KEY);
@@ -421,10 +426,30 @@ public class KafkaChannel extends BasicChannelSemantics {
         producerRecords = Optional.of(new LinkedList<ProducerRecord<String, byte[]>>());
       }
       String key = event.getHeaders().get(KEY_HEADER);
+
+      Integer partitionId = null;
       try {
-        producerRecords.get().add(
-            new ProducerRecord<String, byte[]>(topic.get(), key,
-                                               serializeValue(event, parseAsFlumeEvent)));
+        if (staticPartitionId != null) {
+          partitionId = staticPartitionId;
+        }
+        //Allow a specified header to override a static ID
+        if (partitionHeader != null) {
+          String headerVal = event.getHeaders().get(partitionHeader);
+          if (headerVal != null) {
+            partitionId = Integer.parseInt(headerVal);
+          }
+        }
+        if (partitionId != null) {
+          producerRecords.get().add(
+              new ProducerRecord<String, byte[]>(topic.get(), partitionId, key,
+                                                 serializeValue(event, parseAsFlumeEvent)));
+        } else {
+          producerRecords.get().add(
+              new ProducerRecord<String, byte[]>(topic.get(), key,
+                                                 serializeValue(event, parseAsFlumeEvent)));
+        }
+      } catch (NumberFormatException e) {
+        throw new ChannelException("Non integer partition id specified", e);
       } catch (Exception e) {
         throw new ChannelException("Error while serializing event", e);
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/585c4c92/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
index 3ab807b..ad5a15b 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
@@ -49,6 +49,9 @@ public class KafkaChannelConfiguration {
   public static final String PARSE_AS_FLUME_EVENT = "parseAsFlumeEvent";
   public static final boolean DEFAULT_PARSE_AS_FLUME_EVENT = true;
 
+  public static final String PARTITION_HEADER_NAME = "partitionIdHeader";
+  public static final String STATIC_PARTITION_CONF = "defaultPartitionId";
+
   public static final String MIGRATE_ZOOKEEPER_OFFSETS = "migrateZookeeperOffsets";
   public static final boolean DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS = true;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/585c4c92/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
index 57c0b28..276fee1 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
@@ -28,6 +28,9 @@ import org.apache.flume.Event;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.shared.kafka.test.KafkaPartitionTestUtil;
+import org.apache.flume.shared.kafka.test.PartitionOption;
+import org.apache.flume.shared.kafka.test.PartitionTestScenario;
 import org.apache.flume.sink.kafka.util.TestUtil;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -69,6 +72,8 @@ public class TestKafkaChannel {
   private String topic = null;
   private final Set<String> usedTopics = new HashSet<String>();
 
+  private static final int DEFAULT_TOPIC_PARTITIONS = 5;
+
   @BeforeClass
   public static void setupClass() throws Exception {
     testUtil.prepare();
@@ -79,7 +84,7 @@ public class TestKafkaChannel {
   public void setup() throws Exception {
     topic = findUnusedTopic();
     try {
-      createTopic(topic, 5);
+      createTopic(topic, DEFAULT_TOPIC_PARTITIONS);
     } catch (Exception e) {
     }
     Thread.sleep(2500);
@@ -248,6 +253,41 @@ public class TestKafkaChannel {
     doTestMigrateZookeeperOffsets(true, true, "testMigrateOffsets-both");
   }
 
+  @Test
+  public void testPartitionHeaderSet() throws Exception {
+    doPartitionHeader(PartitionTestScenario.PARTITION_ID_HEADER_ONLY);
+  }
+
+  @Test
+  public void testPartitionHeaderNotSet() throws Exception {
+    doPartitionHeader(PartitionTestScenario.NO_PARTITION_HEADERS);
+  }
+
+  @Test
+  public void testStaticPartitionAndHeaderSet() throws Exception {
+    doPartitionHeader(PartitionTestScenario.STATIC_HEADER_AND_PARTITION_ID);
+  }
+
+  @Test
+  public void testStaticPartitionHeaderNotSet() throws Exception {
+    doPartitionHeader(PartitionTestScenario.STATIC_HEADER_ONLY);
+  }
+
+  @Test
+  public void testPartitionHeaderMissing() throws Exception {
+    doPartitionErrors(PartitionOption.NOTSET);
+  }
+
+  @Test(expected = org.apache.flume.ChannelException.class)
+  public void testPartitionHeaderOutOfRange() throws Exception {
+    doPartitionErrors(PartitionOption.VALIDBUTOUTOFRANGE);
+  }
+
+  @Test(expected = org.apache.flume.ChannelException.class)
+  public void testPartitionHeaderInvalid() throws Exception {
+    doPartitionErrors(PartitionOption.NOTANUMBER);
+  }
+
   public void doTestMigrateZookeeperOffsets(boolean hasZookeeperOffsets, boolean hasKafkaOffsets,
                                             String group) throws Exception {
     // create a topic with 1 partition for simplicity
@@ -328,6 +368,112 @@ public class TestKafkaChannel {
     }
   }
 
+  /**
+   * This function tests three scenarios:
+   * 1. PartitionOption.VALIDBUTOUTOFRANGE: An integer partition is provided,
+   *    however it exceeds the number of partitions available on the topic.
+   *    Expected behaviour: ChannelException thrown.
+   *
+   * 2. PartitionOption.NOTSET: The partition header is not actually set.
+   *    Expected behaviour: Exception is not thrown because the code avoids an NPE.
+   *
+   * 3. PartitionOption.NOTANUMBER: The partition header is set, but is not an Integer.
+   *    Expected behaviour: ChannelException thrown.
+   *
+   * @param option
+   * @throws Exception
+   */
+  private void doPartitionErrors(PartitionOption option) throws Exception {
+    Context context = prepareDefaultContext(false);
+    context.put(PARTITION_HEADER_NAME, KafkaPartitionTestUtil.PARTITION_HEADER);
+    String tempTopic = findUnusedTopic();
+    createTopic(tempTopic, 5);
+    final KafkaChannel channel = createChannel(context);
+    channel.start();
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+
+    Map<String, String> headers = new HashMap<String, String>();
+    switch (option) {
+      case VALIDBUTOUTOFRANGE:
+        headers.put(KafkaPartitionTestUtil.PARTITION_HEADER,
+            String.valueOf(DEFAULT_TOPIC_PARTITIONS + 2));
+        break;
+      case NOTSET:
+        headers.put("wrong-header", "2");
+        break;
+      case NOTANUMBER:
+        headers.put(KafkaPartitionTestUtil.PARTITION_HEADER, "not-a-number");
+        break;
+      default:
+        break;
+    }
+
+    Event event = EventBuilder.withBody(String.valueOf(9).getBytes(), headers);
+
+    channel.put(event);
+
+    tx.commit();
+
+    deleteTopic(tempTopic);
+  }
+
+  /**
+   * This method tests both the default behavior (usePartitionHeader=false)
+   * and the behaviour when the partitionId setting is used.
+   * Under the default behaviour, one would expect an even distribution of
+   * messages to partitions, however when partitionId is used we manually create
+   * a large skew to some partitions and then verify that this actually happened
+   * by reading messages directly using a Kafka Consumer.
+   *
+   * @param scenario the partition test scenario being exercised (determines
+   *                 whether the partition header and/or static partition ID is configured)
+   * @throws Exception
+   */
+  private void doPartitionHeader(PartitionTestScenario scenario) throws Exception {
+    final int numPtns = DEFAULT_TOPIC_PARTITIONS;
+    final int numMsgs = numPtns * 10;
+    final Integer staticPtn = DEFAULT_TOPIC_PARTITIONS - 2 ;
+    Context context = prepareDefaultContext(false);
+    if (scenario == PartitionTestScenario.PARTITION_ID_HEADER_ONLY ||
+        scenario == PartitionTestScenario.STATIC_HEADER_AND_PARTITION_ID) {
+      context.put(PARTITION_HEADER_NAME, "partition-header");
+    }
+    if (scenario == PartitionTestScenario.STATIC_HEADER_AND_PARTITION_ID ||
+        scenario == PartitionTestScenario.STATIC_HEADER_ONLY) {
+      context.put(STATIC_PARTITION_CONF, staticPtn.toString());
+    }
+    final KafkaChannel channel = createChannel(context);
+    channel.start();
+
+    // Create a map of PartitionId:List<Messages> according to the desired distribution
+    // Initialise with empty ArrayLists
+    Map<Integer, List<Event>> partitionMap = new HashMap<Integer, List<Event>>(numPtns);
+    for (int i = 0; i < numPtns; i++) {
+      partitionMap.put(i, new ArrayList<Event>());
+    }
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+
+    List<Event> orderedEvents = KafkaPartitionTestUtil.generateSkewedMessageList(scenario, numMsgs,
+                                                                 partitionMap, numPtns, staticPtn);
+
+    for (Event event : orderedEvents) {
+      channel.put(event);
+    }
+
+    tx.commit();
+
+    Map<Integer, List<byte[]>> resultsMap = KafkaPartitionTestUtil.retrieveRecordsFromPartitions(
+                                                       topic, numPtns, channel.getConsumerProps());
+
+    KafkaPartitionTestUtil.checkResultsAgainstSkew(scenario, partitionMap, resultsMap, staticPtn,
+                                                   numMsgs);
+
+    channel.stop();
+  }
+
   private Event takeEventWithoutCommittingTxn(KafkaChannel channel) {
     for (int i = 0; i < 5; i++) {
       Transaction txn = channel.getTransaction();
@@ -713,4 +859,5 @@ public class TestKafkaChannel {
         ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
     AdminUtils.deleteTopic(zkUtils, topicName);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/585c4c92/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 1cf4100..25db777 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2583,6 +2583,14 @@ useFlumeEventFormat              false                By default events are put
                                                       true to store events as the Flume Avro binary format. Used in conjunction with the same property
                                                       on the KafkaSource or with the parseAsFlumeEvent property on the Kafka Channel this will preserve
                                                       any Flume headers for the producing side.
+defaultPartitionId               --                   Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless
+                                                      overridden by ``partitionIdHeader``. By default, if this property is not set, events will be
+                                                      distributed by the Kafka Producer's partitioner - including by ``key`` if specified (or by a
+                                                      partitioner specified by ``kafka.partitioner.class``).
+partitionIdHeader                --                   When set, the sink will take the value of the field named using the value of this property
+                                                      from the event header and send the message to the specified partition of the topic. If the
+                                                      value represents an invalid partition, an EventDeliveryException will be thrown. If the header value
+                                                      is present then this setting overrides ``defaultPartitionId``.
 Other Kafka Producer Properties  --                   These properties are used to configure the Kafka Producer. Any producer property supported
                                                       by Kafka can be used. The only requirement is to prepend the property name with the prefix
                                                       ``kafka.producer``.
@@ -2786,6 +2794,14 @@ migrateZookeeperOffsets                  true                        When no Kaf
                                                                      configuration defines how offsets are handled.
 pollTimeout                              500                         The amount of time (in milliseconds) to wait in the "poll()" call of the consumer.
                                                                      https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)
+defaultPartitionId                       --                          Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless
+                                                                     overridden by ``partitionIdHeader``. By default, if this property is not set, events will be
+                                                                     distributed by the Kafka Producer's partitioner - including by ``key`` if specified (or by a 
+                                                                     partitioner specified by ``kafka.partitioner.class``).
+partitionIdHeader                        --                          When set, the producer will take the value of the field named using the value of this property
+                                                                     from the event header and send the message to the specified partition of the topic. If the
+                                                                     value represents an invalid partition the event will not be accepted into the channel. If the header value
+                                                                     is present then this setting overrides ``defaultPartitionId``.
 kafka.consumer.auto.offset.reset         latest                      What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server
                                                                      (e.g. because that data has been deleted):
                                                                      earliest: automatically reset the offset to the earliest offset

http://git-wip-us.apache.org/repos/asf/flume/blob/585c4c92/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
index 195c921..2ab4b4a 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
+++ b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
@@ -59,6 +59,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.flume.flume-shared</groupId>
+      <artifactId>flume-shared-kafka-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/flume/blob/585c4c92/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index 89bdd84..dd40224 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -114,6 +114,8 @@ public class KafkaSink extends AbstractSink implements Configurable {
   private List<Future<RecordMetadata>> kafkaFutures;
   private KafkaSinkCounter counter;
   private boolean useAvroEventFormat;
+  private String partitionHeader = null;
+  private Integer staticPartitionId = null;
   private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer =
           Optional.absent();
   private Optional<SpecificDatumReader<AvroFlumeEvent>> reader =
@@ -187,13 +189,34 @@ public class KafkaSink extends AbstractSink implements Configurable {
         // create a message and add to buffer
         long startTime = System.currentTimeMillis();
 
+        Integer partitionId = null;
         try {
-          kafkaFutures.add(producer.send(
-              new ProducerRecord<String, byte[]>(eventTopic, eventKey,
-                                                 serializeEvent(event, useAvroEventFormat)),
-              new SinkCallback(startTime)));
-        } catch (IOException ex) {
-          throw new EventDeliveryException("Could not serialize event", ex);
+          ProducerRecord<String, byte[]> record;
+          if (staticPartitionId != null) {
+            partitionId = staticPartitionId;
+          }
+          //Allow a specified header to override a static ID
+          if (partitionHeader != null) {
+            String headerVal = event.getHeaders().get(partitionHeader);
+            if (headerVal != null) {
+              partitionId = Integer.parseInt(headerVal);
+            }
+          }
+          if (partitionId != null) {
+            record = new ProducerRecord<String, byte[]>(eventTopic, partitionId, eventKey,
+                serializeEvent(event, useAvroEventFormat));
+          } else {
+            record = new ProducerRecord<String, byte[]>(eventTopic, eventKey,
+                serializeEvent(event, useAvroEventFormat));
+          }
+          kafkaFutures.add(producer.send(record, new SinkCallback(startTime)));
+        } catch (NumberFormatException ex) {
+          throw new EventDeliveryException("Non integer partition id specified", ex);
+        } catch (Exception ex) {
+          // N.B. The producer.send() method throws all sorts of RuntimeExceptions
+          // Catching Exception here to wrap them neatly in an EventDeliveryException
+          // which is what our consumers will expect
+          throw new EventDeliveryException("Could not send event", ex);
         }
       }
 
@@ -290,6 +313,9 @@ public class KafkaSink extends AbstractSink implements Configurable {
     useAvroEventFormat = context.getBoolean(KafkaSinkConstants.AVRO_EVENT,
                                             KafkaSinkConstants.DEFAULT_AVRO_EVENT);
 
+    partitionHeader = context.getString(KafkaSinkConstants.PARTITION_HEADER_NAME);
+    staticPartitionId = context.getInteger(KafkaSinkConstants.STATIC_PARTITION_CONF);
+
     if (logger.isDebugEnabled()) {
       logger.debug(KafkaSinkConstants.AVRO_EVENT + " set to: {}", useAvroEventFormat);
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/585c4c92/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
index 1bf380c..7c819f5 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
@@ -38,6 +38,9 @@ public class KafkaSinkConstants {
   public static final String AVRO_EVENT = "useFlumeEventFormat";
   public static final boolean DEFAULT_AVRO_EVENT = false;
 
+  public static final String PARTITION_HEADER_NAME = "partitionIdHeader";
+  public static final String STATIC_PARTITION_CONF = "defaultPartitionId";
+
   public static final String DEFAULT_KEY_SERIALIZER =
       "org.apache.kafka.common.serialization.StringSerializer";
   public static final String DEFAULT_VALUE_SERIAIZER =

http://git-wip-us.apache.org/repos/asf/flume/blob/585c4c92/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index 76eca37..7eccf76 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -19,11 +19,16 @@
 package org.apache.flume.sink.kafka;
 
 import com.google.common.base.Charsets;
+
+import kafka.admin.AdminUtils;
 import kafka.message.MessageAndMetadata;
+import kafka.utils.ZkUtils;
+
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.util.Utf8;
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -33,6 +38,9 @@ import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.shared.kafka.test.KafkaPartitionTestUtil;
+import org.apache.flume.shared.kafka.test.PartitionOption;
+import org.apache.flume.shared.kafka.test.PartitionTestScenario;
 import org.apache.flume.sink.kafka.util.TestUtil;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.kafka.clients.CommonClientConfigs;
@@ -46,9 +54,11 @@ import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.AVRO_EVENT;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.BATCH_SIZE;
@@ -71,6 +81,7 @@ import static org.junit.Assert.fail;
 public class TestKafkaSink {
 
   private static TestUtil testUtil = TestUtil.getInstance();
+  private final Set<String> usedTopics = new HashSet<String>();
 
   @BeforeClass
   public static void setup() {
@@ -298,6 +309,186 @@ public class TestKafkaSink {
     assertNull(testUtil.getNextMessageFromConsumer(DEFAULT_TOPIC));
   }
 
+  @Test
+  public void testPartitionHeaderSet() throws Exception {
+    doPartitionHeader(PartitionTestScenario.PARTITION_ID_HEADER_ONLY);
+  }
+
+  @Test
+  public void testPartitionHeaderNotSet() throws Exception {
+    doPartitionHeader(PartitionTestScenario.NO_PARTITION_HEADERS);
+  }
+
+  @Test
+  public void testStaticPartitionAndHeaderSet() throws Exception {
+    doPartitionHeader(PartitionTestScenario.STATIC_HEADER_AND_PARTITION_ID);
+  }
+
+  @Test
+  public void testStaticPartitionHeaderNotSet() throws Exception {
+    doPartitionHeader(PartitionTestScenario.STATIC_HEADER_ONLY);
+  }
+
+  @Test
+  public void testPartitionHeaderMissing() throws Exception {
+    doPartitionErrors(PartitionOption.NOTSET);
+  }
+
+  @Test(expected = org.apache.flume.EventDeliveryException.class)
+  public void testPartitionHeaderOutOfRange() throws Exception {
+    doPartitionErrors(PartitionOption.VALIDBUTOUTOFRANGE);
+  }
+
+  @Test(expected = org.apache.flume.EventDeliveryException.class)
+  public void testPartitionHeaderInvalid() throws Exception {
+    doPartitionErrors(PartitionOption.NOTANUMBER);
+  }
+
+  /**
+   * This function tests three scenarios:
+   * 1. PartitionOption.VALIDBUTOUTOFRANGE: An integer partition is provided,
+   *    however it exceeds the number of partitions available on the topic.
+   *    Expected behaviour: EventDeliveryException thrown.
+   *
+   * 2. PartitionOption.NOTSET: The partition header is not actually set.
+   *    Expected behaviour: Exception is not thrown because the code avoids an NPE.
+   *
+   * 3. PartitionOption.NOTANUMBER: The partition header is set, but is not an Integer.
+   *    Expected behaviour: EventDeliveryException thrown.
+   *
+   * @param option
+   * @throws Exception
+   */
+  private void doPartitionErrors(PartitionOption option) throws Exception {
+    Sink kafkaSink = new KafkaSink();
+    Context context = prepareDefaultContext();
+    context.put(KafkaSinkConstants.PARTITION_HEADER_NAME, "partition-header");
+
+    Configurables.configure(kafkaSink, context);
+    Channel memoryChannel = new MemoryChannel();
+    Configurables.configure(memoryChannel, context);
+    kafkaSink.setChannel(memoryChannel);
+    kafkaSink.start();
+
+    String topic = findUnusedTopic();
+    createTopic(topic, 5);
+
+    Transaction tx = memoryChannel.getTransaction();
+    tx.begin();
+
+    Map<String, String> headers = new HashMap<String, String>();
+    headers.put("topic", topic);
+    switch (option) {
+      case VALIDBUTOUTOFRANGE:
+        headers.put("partition-header", "9");
+        break;
+      case NOTSET:
+        headers.put("wrong-header", "2");
+        break;
+      case NOTANUMBER:
+        headers.put("partition-header", "not-a-number");
+        break;
+      default:
+        break;
+    }
+
+    Event event = EventBuilder.withBody(String.valueOf(9).getBytes(), headers);
+
+    memoryChannel.put(event);
+    tx.commit();
+    tx.close();
+
+    Sink.Status status = kafkaSink.process();
+    assertEquals(Sink.Status.READY, status);
+
+    deleteTopic(topic);
+
+  }
+
+  /**
+   * This method tests both the default behavior (usePartitionHeader=false)
+   * and the behaviour when the partitionId setting is used.
+   * Under the default behaviour, one would expect an even distribution of
+   * messages to partitions, however when partitionId is used we manually create
+   * a large skew to some partitions and then verify that this actually happened
+   * by reading messages directly using a Kafka Consumer.
+   *
+   * @param scenario the partition test scenario being exercised (determines
+   *                 whether the partition header and/or static partition ID is configured)
+   * @throws Exception
+   */
+  private void doPartitionHeader(PartitionTestScenario scenario) throws Exception {
+    final int numPtns = 5;
+    final int numMsgs = numPtns * 10;
+    final Integer staticPtn = 3;
+
+    String topic = findUnusedTopic();
+    createTopic(topic, numPtns);
+    Context context = prepareDefaultContext();
+    context.put(BATCH_SIZE, "100");
+
+    if (scenario == PartitionTestScenario.PARTITION_ID_HEADER_ONLY ||
+        scenario == PartitionTestScenario.STATIC_HEADER_AND_PARTITION_ID) {
+      context.put(KafkaSinkConstants.PARTITION_HEADER_NAME,
+                  KafkaPartitionTestUtil.PARTITION_HEADER);
+    }
+    if (scenario == PartitionTestScenario.STATIC_HEADER_AND_PARTITION_ID ||
+        scenario == PartitionTestScenario.STATIC_HEADER_ONLY) {
+      context.put(KafkaSinkConstants.STATIC_PARTITION_CONF, staticPtn.toString());
+    }
+    Sink kafkaSink = new KafkaSink();
+
+    Configurables.configure(kafkaSink, context);
+    Channel memoryChannel = new MemoryChannel();
+    Configurables.configure(memoryChannel, context);
+    kafkaSink.setChannel(memoryChannel);
+    kafkaSink.start();
+
+    //Create a map of PartitionId:List<Messages> according to the desired distribution
+    Map<Integer, List<Event>> partitionMap = new HashMap<Integer, List<Event>>(numPtns);
+    for (int i = 0; i < numPtns; i++) {
+      partitionMap.put(i, new ArrayList<Event>());
+    }
+    Transaction tx = memoryChannel.getTransaction();
+    tx.begin();
+
+    List<Event> orderedEvents = KafkaPartitionTestUtil.generateSkewedMessageList(scenario, numMsgs,
+                                                                 partitionMap, numPtns, staticPtn);
+
+    for (Event event : orderedEvents) {
+      event.getHeaders().put("topic", topic);
+      memoryChannel.put(event);
+    }
+
+    tx.commit();
+    tx.close();
+
+    Sink.Status status = kafkaSink.process();
+    assertEquals(Sink.Status.READY, status);
+
+    Properties props = new Properties();
+    props.put("bootstrap.servers", testUtil.getKafkaServerUrl());
+    props.put("group.id", "group_1");
+    props.put("enable.auto.commit", "true");
+    props.put("auto.commit.interval.ms", "1000");
+    props.put("session.timeout.ms", "30000");
+    props.put("key.deserializer",
+        "org.apache.kafka.common.serialization.StringDeserializer");
+    props.put("value.deserializer",
+        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    props.put("auto.offset.reset", "earliest");
+    Map<Integer, List<byte[]>> resultsMap =
+        KafkaPartitionTestUtil.retrieveRecordsFromPartitions(topic, numPtns, props);
+
+    KafkaPartitionTestUtil.checkResultsAgainstSkew(scenario, partitionMap, resultsMap, staticPtn,
+                                                   numMsgs);
+
+    memoryChannel.stop();
+    kafkaSink.stop();
+    deleteTopic(topic);
+
+  }
+
   private Context prepareDefaultContext() {
     // Prepares a default context with Kafka Server Properties
     Context context = new Context();
@@ -325,4 +516,35 @@ public class TestKafkaSink {
     return kafkaSink.process();
   }
 
+  public static void createTopic(String topicName, int numPartitions) {
+    int sessionTimeoutMs = 10000;
+    int connectionTimeoutMs = 10000;
+    ZkUtils zkUtils =
+        ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
+    int replicationFactor = 1;
+    Properties topicConfig = new Properties();
+    AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig);
+  }
+
+  public static void deleteTopic(String topicName) {
+    int sessionTimeoutMs = 10000;
+    int connectionTimeoutMs = 10000;
+    ZkUtils zkUtils =
+        ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
+    AdminUtils.deleteTopic(zkUtils, topicName);
+  }
+
+  public String findUnusedTopic() {
+    String newTopic = null;
+    boolean topicFound = false;
+    while (!topicFound) {
+      newTopic = RandomStringUtils.randomAlphabetic(8);
+      if (!usedTopics.contains(newTopic)) {
+        usedTopics.add(newTopic);
+        topicFound = true;
+      }
+    }
+    return newTopic;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/585c4c92/flume-ng-sources/flume-kafka-source/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/pom.xml b/flume-ng-sources/flume-kafka-source/pom.xml
index c89ea1a..80bb7f4 100644
--- a/flume-ng-sources/flume-kafka-source/pom.xml
+++ b/flume-ng-sources/flume-kafka-source/pom.xml
@@ -49,6 +49,11 @@
       <artifactId>flume-ng-core</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.flume.flume-shared</groupId>
+      <artifactId>flume-shared-kafka-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
       <version>${kafka.version}</version>

http://git-wip-us.apache.org/repos/asf/flume/blob/585c4c92/flume-shared/flume-shared-kafka-test/pom.xml
----------------------------------------------------------------------
diff --git a/flume-shared/flume-shared-kafka-test/pom.xml b/flume-shared/flume-shared-kafka-test/pom.xml
new file mode 100644
index 0000000..00260a9
--- /dev/null
+++ b/flume-shared/flume-shared-kafka-test/pom.xml
@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  You under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>flume-shared</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>1.7.0-SNAPSHOT</version>
+  </parent>
+  <groupId>org.apache.flume.flume-shared</groupId>
+  <artifactId>flume-shared-kafka-test</artifactId>
+  <name>Flume Shared Kafka Test Utils</name>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-configuration</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.10</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flume/blob/585c4c92/flume-shared/flume-shared-kafka-test/src/main/java/org/apache/flume/shared/kafka/test/KafkaPartitionTestUtil.java
----------------------------------------------------------------------
diff --git a/flume-shared/flume-shared-kafka-test/src/main/java/org/apache/flume/shared/kafka/test/KafkaPartitionTestUtil.java b/flume-shared/flume-shared-kafka-test/src/main/java/org/apache/flume/shared/kafka/test/KafkaPartitionTestUtil.java
new file mode 100644
index 0000000..cb1e258
--- /dev/null
+++ b/flume-shared/flume-shared-kafka-test/src/main/java/org/apache/flume/shared/kafka/test/KafkaPartitionTestUtil.java
@@ -0,0 +1,213 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flume.shared.kafka.test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Assert;
+
+public class KafkaPartitionTestUtil {
+
+  public static final String PARTITION_HEADER = "partition-header";
+
+  /**
+   * This method checks the retrieved messages (passed as resultsMap) against the expected
+   * results (passed as partitionMap). The behaviour of this method is slightly different
+   * depending on the scenario:
+   *  - STATIC_HEADER_ONLY: Don't check partitionMap, just check that all messages are in the
+   *                        passed staticPtn partition.
+   *  - NO_PARTITION_HEADERS: Check that messages are evenly distributed between partitions
+   *                          (requires numMsgs to be a multiple of the number of partitions)
+   *  - else: Check that the contents of each partition list in resultsMap is the same as that
+   *          specified in partitionMap.
+   *
+   *  As this is a testing method, it will issue JUnit AssertionExceptions if the results are not
+   *  as expected.
+   *
+   * @param scenario
+   * @param partitionMap
+   * @param resultsMap
+   * @param staticPtn
+   * @param numMsgs
+   */
+  public static void checkResultsAgainstSkew(PartitionTestScenario scenario,
+      Map<Integer,List<Event>> partitionMap, Map<Integer, List<byte[]>> resultsMap,
+                                 int staticPtn, int numMsgs) {
+    int numPtns = partitionMap.size();
+
+    if (scenario == PartitionTestScenario.NO_PARTITION_HEADERS && numMsgs % numPtns != 0) {
+      throw new IllegalArgumentException("This method is not designed to work with scenarios" +
+                " where there is expected to be a non-even distribution of messages");
+    }
+
+    for (int ptn = 0; ptn < numPtns; ptn++) {
+      List<Event> expectedResults = partitionMap.get(ptn);
+      List<byte[]> actualResults = resultsMap.get(ptn);
+      if (scenario == PartitionTestScenario.PARTITION_ID_HEADER_ONLY ||
+          scenario == PartitionTestScenario.STATIC_HEADER_AND_PARTITION_ID) {
+        // In these two scenarios we're checking against partitionMap
+        Assert.assertEquals(expectedResults.size(), actualResults.size());
+        //Go and check the message payload is what we wanted it to be
+        for (int idx = 0; idx < expectedResults.size(); idx++) {
+          Assert.assertArrayEquals(expectedResults.get(idx).getBody(), actualResults.get(idx));
+        }
+      } else if (scenario == PartitionTestScenario.STATIC_HEADER_ONLY) {
+        // Check that if we are looking in the statically assigned partition
+        // all messages are in it, else all other partitions are zero
+        if (ptn == staticPtn) {
+          Assert.assertEquals(numMsgs, actualResults.size());
+        } else {
+          Assert.assertEquals(0, actualResults.size());
+        }
+      } else if (scenario == PartitionTestScenario.NO_PARTITION_HEADERS) {
+        // Checking for an even distribution
+        Assert.assertEquals(numMsgs / numPtns, actualResults.size());
+      }
+    }
+  }
+
+  /**
+   * This method can be used to create a list of events for use in Kafka partition tests.
+   * Depending on the scenario it will deliberately generate an artificially skewed distribution
+   * of events-per-partition (populated into the passed partitionMap) and then ordered randomly
+   * into the resulting List of events.
+   * Four scenarios are catered for:
+   *  - STATIC_HEADER_ONLY: All events are put into the partition specified by the staticPtn param
+   *  - PARTITION_ID_HEADER_ONLY: Events are skewed into three partitions
+   *  - STATIC_HEADER_AND_PARTITION_ID: Events are skewed into two partitions, with all others
+   *                                    going into the partition specified by staticPtn
+   *  - NO_PARTITION_HEADERS: No partition header is set and the partitionMap is not populated
+   *
+   * @param scenario The scenario being catered for.
+   * @param numMsgs The number of messages to generate
+   * @param partitionMap A map of Integer (partitionId) and List of Events - to be populated
+   * @param numPtns The number of partitions to be populated.
+   * @param staticPtn The static partition to be assigned to.
+   * @return the ordered list of generated events
+   */
+  public static List<Event> generateSkewedMessageList(PartitionTestScenario scenario, int numMsgs,
+      Map<Integer, List<Event>> partitionMap, int numPtns, int staticPtn) {
+    List<Event> msgs = new ArrayList<Event>(numMsgs);
+
+    // Pre-conditions
+    if (numMsgs < 0) {
+      throw new IllegalArgumentException("Number of messages must be greater than zero");
+    }
+    if (staticPtn >= numPtns) {
+      throw new IllegalArgumentException("The static partition must be less than the " +
+                                         "number of partitions");
+    }
+    if (numPtns < 5) {
+      throw new IllegalArgumentException("This method is designed to work with at least 5 " +
+                                         "partitions");
+    }
+    if (partitionMap.size() != numPtns) {
+      throw new IllegalArgumentException("partitionMap has not been correctly initialised");
+    }
+
+    for (int i = 0; i < numMsgs; i++) {
+      Map<String, String> headers = new HashMap<String, String>();
+      Integer partition = null;
+      // Simple code to artificially create a skew. In this scenario, with 5 partitions
+      // and 50 messages we end up with a ratio of 0:0:27:13:10 however these will be
+      // interleaved
+
+      if (scenario == PartitionTestScenario.NO_PARTITION_HEADERS) {
+        // Don't bother adding a header or computing a partition
+      } else if (scenario == PartitionTestScenario.STATIC_HEADER_ONLY) {
+        partition = staticPtn;
+      } else {
+        // We're going to artificially create a skew by putting every 5th event
+        // into partition 4, every 3rd event into partition 3 and everything else into
+        // partition 2 (unless a static partition is provided, in which case we'll
+        // put it into that partition instead, but without setting the header).
+        if (i % 5 == 0) {
+          partition = 4;
+          headers.put(PARTITION_HEADER, String.valueOf(partition));
+        } else if (i % 3 == 0 ) {
+          partition = 3;
+          headers.put(PARTITION_HEADER, String.valueOf(partition));
+        } else if (scenario == PartitionTestScenario.STATIC_HEADER_AND_PARTITION_ID) {
+          // In this case we're not going to set the header, but we are going
+          // to set partition which will then populate the partitionMap
+          partition = staticPtn;
+        } else if (scenario == PartitionTestScenario.PARTITION_ID_HEADER_ONLY) {
+          partition = 2;
+          headers.put(PARTITION_HEADER, String.valueOf(partition));
+        } // Logically no other scenarios
+      }
+
+      // Build the event
+      Event event = EventBuilder.withBody(String.valueOf(i).getBytes(), headers);
+
+      if (scenario != PartitionTestScenario.NO_PARTITION_HEADERS) {
+        // Save into partitionMap
+        partitionMap.get(partition).add(event);
+      }
+
+      // Add to ordered list
+      msgs.add(event);
+
+    }
+    return msgs;
+  }
+
+  /**
+   * Return a map containing one List of records per partition.
+   * This internally creates a Kafka Consumer using the provided consumer properties.
+   *
+   * @param topic
+   * @param numPtns
+   * @param consumerProperties
+   * @return A Map of Partitions(Integer) and the resulting List of messages (byte[]) retrieved
+   */
+  public static Map<Integer, List<byte[]>> retrieveRecordsFromPartitions(String topic, int numPtns,
+                                                                   Properties consumerProperties) {
+
+    Map<Integer, List<byte[]>> resultsMap = new HashMap<Integer, List<byte[]>>();
+    for (int i = 0; i < numPtns; i++) {
+      List<byte[]> partitionResults = new ArrayList<byte[]>();
+      resultsMap.put(i, partitionResults);
+      KafkaConsumer<String, byte[]> consumer =
+          new KafkaConsumer<String, byte[]>(consumerProperties);
+
+      TopicPartition partition = new TopicPartition(topic, i);
+
+      consumer.assign(Arrays.asList(partition));
+
+      ConsumerRecords<String, byte[]> records = consumer.poll(1000);
+      for (ConsumerRecord<String, byte[]> record : records) {
+        partitionResults.add(record.value());
+      }
+      consumer.close();
+    }
+    return resultsMap;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/585c4c92/flume-shared/flume-shared-kafka-test/src/main/java/org/apache/flume/shared/kafka/test/PartitionOption.java
----------------------------------------------------------------------
diff --git a/flume-shared/flume-shared-kafka-test/src/main/java/org/apache/flume/shared/kafka/test/PartitionOption.java b/flume-shared/flume-shared-kafka-test/src/main/java/org/apache/flume/shared/kafka/test/PartitionOption.java
new file mode 100644
index 0000000..97bfe33
--- /dev/null
+++ b/flume-shared/flume-shared-kafka-test/src/main/java/org/apache/flume/shared/kafka/test/PartitionOption.java
@@ -0,0 +1,25 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flume.shared.kafka.test;
+
+public enum PartitionOption {
+  VALIDBUTOUTOFRANGE,
+  NOTSET,
+  NOTANUMBER
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/585c4c92/flume-shared/flume-shared-kafka-test/src/main/java/org/apache/flume/shared/kafka/test/PartitionTestScenario.java
----------------------------------------------------------------------
diff --git a/flume-shared/flume-shared-kafka-test/src/main/java/org/apache/flume/shared/kafka/test/PartitionTestScenario.java b/flume-shared/flume-shared-kafka-test/src/main/java/org/apache/flume/shared/kafka/test/PartitionTestScenario.java
new file mode 100644
index 0000000..8cafd14
--- /dev/null
+++ b/flume-shared/flume-shared-kafka-test/src/main/java/org/apache/flume/shared/kafka/test/PartitionTestScenario.java
@@ -0,0 +1,26 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flume.shared.kafka.test;
+
+public enum PartitionTestScenario {
+  STATIC_HEADER_ONLY,
+  PARTITION_ID_HEADER_ONLY,
+  STATIC_HEADER_AND_PARTITION_ID,
+  NO_PARTITION_HEADERS
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/585c4c92/flume-shared/pom.xml
----------------------------------------------------------------------
diff --git a/flume-shared/pom.xml b/flume-shared/pom.xml
new file mode 100644
index 0000000..c6ec4fd
--- /dev/null
+++ b/flume-shared/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>flume-parent</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>1.7.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.flume</groupId>
+  <artifactId>flume-shared</artifactId>
+  <name>Flume Shared Utils</name>
+  <packaging>pom</packaging>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <modules>
+    <module>flume-shared-kafka-test</module>
+  </modules>
+
+  <profiles>
+
+    <profile>
+      <id>hadoop-1.0</id>
+      <activation>
+        <property>
+          <name>flume.hadoop.profile</name>
+          <value>1</value>
+        </property>
+      </activation>
+    </profile>
+
+  </profiles>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flume/blob/585c4c92/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2332a29..ded7b7d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,7 @@ limitations under the License.
     <module>flume-ng-tests</module>
     <module>flume-tools</module>
     <module>flume-ng-auth</module>
+    <module>flume-shared</module>
   </modules>
 
   <profiles>
@@ -825,7 +826,6 @@ limitations under the License.
         <groupId>junit</groupId>
         <artifactId>junit</artifactId>
         <version>4.10</version>
-        <scope>test</scope>
       </dependency>
 
       <dependency>
@@ -1144,6 +1144,13 @@ limitations under the License.
      </dependency>
 
       <dependency>
+        <groupId>org.apache.flume.flume-shared</groupId>
+        <artifactId>flume-shared-kafka-test</artifactId>
+        <version>1.7.0-SNAPSHOT</version>
+        <scope>test</scope>
+      </dependency>
+
+      <dependency>
         <groupId>org.apache.flume.flume-ng-sinks</groupId>
         <artifactId>flume-hdfs-sink</artifactId>
         <version>1.7.0-SNAPSHOT</version>


Mime
View raw message