flume-commits mailing list archives

From hshreedha...@apache.org
Subject git commit: FLUME-2190. Add a source capable of feeding off of the Twitter Streaming API
Date Fri, 27 Sep 2013 20:24:40 GMT
Updated Branches:
  refs/heads/flume-1.5 30d9d56c1 -> 2cecd8f74


FLUME-2190. Add a source capable of feeding off of the Twitter Streaming API

(Roman Shaposhnik via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2cecd8f7
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2cecd8f7
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2cecd8f7

Branch: refs/heads/flume-1.5
Commit: 2cecd8f7479a4c510b2cae68738563ec982c92de
Parents: 30d9d56
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Fri Sep 27 13:21:48 2013 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Fri Sep 27 13:24:22 2013 -0700

----------------------------------------------------------------------
 flume-ng-dist/pom.xml                           |   4 +
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  41 +++
 flume-ng-sources/flume-twitter-source/pom.xml   |  61 ++++
 .../flume/source/twitter/TwitterSource.java     | 333 +++++++++++++++++++
 .../flume/source/twitter/TestTwitterSource.java | 112 +++++++
 .../src/test/resources/log4j.properties         |  33 ++
 .../src/test/resources/twitter-flume.conf       |  92 +++++
 flume-ng-sources/pom.xml                        |   1 +
 pom.xml                                         |  23 ++
 9 files changed, 700 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/2cecd8f7/flume-ng-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml
index 83332a9..2d0ee47 100644
--- a/flume-ng-dist/pom.xml
+++ b/flume-ng-dist/pom.xml
@@ -121,6 +121,10 @@
       <artifactId>flume-jms-source</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.flume.flume-ng-sources</groupId>
+      <artifactId>flume-twitter-source</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.flume.flume-ng-legacy-sources</groupId>
       <artifactId>flume-avro-source</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/flume/blob/2cecd8f7/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index dac3ce7..007436b 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -977,6 +977,47 @@ Example for an agent named agent-1:
   agent-1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
   agent-1.sources.src-1.fileHeader = true
 
+Twitter 1% firehose Source (experimental)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. warning::
+  This source is highly experimental and may change between minor versions of Flume.
+  Use at your own risk.
+
+Experimental source that connects via Streaming API to the 1% sample twitter
+firehose, continuously downloads tweets, converts them to Avro format and
+sends Avro events to a downstream Flume sink. Requires the consumer and 
+access tokens and secrets of a Twitter developer account.
+Required properties are in **bold**.
+
+==================     ===========  ===================================================
+Property Name          Default      Description
+==================     ===========  ===================================================
+**channels**           --
+**type**               --           The component type name, needs to be ``org.apache.flume.source.twitter.TwitterSource``
+**consumerKey**        --           OAuth consumer key
+**consumerSecret**     --           OAuth consumer secret
+**accessToken**        --           OAuth access token
+**accessTokenSecret**  --           OAuth token secret
+maxBatchSize           1000         Maximum number of twitter messages to put in a single batch
+maxBatchDurationMillis 1000         Maximum number of milliseconds to wait before closing a batch
+==================     ===========  ===================================================
+
+Example for agent named a1:
+
+.. code-block:: properties
+
+  a1.sources = r1
+  a1.channels = c1
+  a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource
+  a1.sources.r1.channels = c1
+  a1.sources.r1.consumerKey = YOUR_TWITTER_CONSUMER_KEY
+  a1.sources.r1.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
+  a1.sources.r1.accessToken = YOUR_TWITTER_ACCESS_TOKEN
+  a1.sources.r1.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
+  a1.sources.r1.maxBatchSize = 10
+  a1.sources.r1.maxBatchDurationMillis = 200
+
 Event Deserializers
 '''''''''''''''''''
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2cecd8f7/flume-ng-sources/flume-twitter-source/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-twitter-source/pom.xml b/flume-ng-sources/flume-twitter-source/pom.xml
new file mode 100644
index 0000000..a5a27cf
--- /dev/null
+++ b/flume-ng-sources/flume-twitter-source/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>flume-ng-sources</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>1.5.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.flume.flume-ng-sources</groupId>
+  <artifactId>flume-twitter-source</artifactId>
+  <name>Flume Twitter Source</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.twitter4j</groupId>
+      <artifactId>twitter4j-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.twitter4j</groupId>
+      <artifactId>twitter4j-media-support</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.twitter4j</groupId>
+      <artifactId>twitter4j-stream</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/flume/blob/2cecd8f7/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java b/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java
new file mode 100644
index 0000000..27b2c3f
--- /dev/null
+++ b/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source.twitter;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.AbstractSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import twitter4j.MediaEntity;
+import twitter4j.StallWarning;
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
+import twitter4j.TwitterStream;
+import twitter4j.TwitterStreamFactory;
+import twitter4j.User;
+import twitter4j.auth.AccessToken;
+
+/**
+ * Demo Flume source that connects via Streaming API to the 1% sample twitter
+ * firehose, continuously downloads tweets, converts them to Avro format and
+ * sends Avro events to a downstream Flume sink.
+ *
+ * Requires the consumer and access tokens and secrets of a Twitter developer
+ * account
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TwitterSource
+    extends AbstractSource
+    implements EventDrivenSource, Configurable, StatusListener {
+
+  private TwitterStream twitterStream;
+  private Schema avroSchema;
+
+  private long docCount = 0;
+  private long startTime = 0;
+  private long exceptionCount = 0;
+  private long totalTextIndexed = 0;
+  private long skippedDocs = 0;
+  private long batchEndTime = 0;
+  private final List<Record> docs = new ArrayList<Record>();
+  private final ByteArrayOutputStream serializationBuffer =
+      new ByteArrayOutputStream();
+  private DataFileWriter<GenericRecord> dataFileWriter;
+
+  private int maxBatchSize = 1000;
+  private int maxBatchDurationMillis = 1000;
+
+  // Fri May 14 02:52:55 +0000 2010
+  private SimpleDateFormat formatterTo =
+      new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
+  private DecimalFormat numFormatter = new DecimalFormat("###,###.###");
+
+  private static int REPORT_INTERVAL = 100;
+  private static int STATS_INTERVAL = REPORT_INTERVAL * 10;
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(TwitterSource.class);
+
+  public TwitterSource() {
+  }
+
+  @Override
+  public void configure(Context context) {
+    String consumerKey = context.getString("consumerKey");
+    String consumerSecret = context.getString("consumerSecret");
+    String accessToken = context.getString("accessToken");
+    String accessTokenSecret = context.getString("accessTokenSecret");
+
+    LOGGER.info("Consumer Key:        '" + consumerKey + "'");
+    LOGGER.info("Consumer Secret:     '" + consumerSecret + "'");
+    LOGGER.info("Access Token:        '" + accessToken + "'");
+    LOGGER.info("Access Token Secret: '" + accessTokenSecret + "'");
+
+    twitterStream = new TwitterStreamFactory().getInstance();
+    twitterStream.setOAuthConsumer(consumerKey, consumerSecret);
+    twitterStream.setOAuthAccessToken(new AccessToken(accessToken,
+                                                      accessTokenSecret));
+    twitterStream.addListener(this);
+    avroSchema = createAvroSchema();
+    dataFileWriter = new DataFileWriter<GenericRecord>(
+        new GenericDatumWriter<GenericRecord>(avroSchema));
+
+    maxBatchSize = context.getInteger("maxBatchSize", maxBatchSize);
+    maxBatchDurationMillis = context.getInteger("maxBatchDurationMillis",
+                                                maxBatchDurationMillis);
+  }
+
+  @Override
+  public synchronized void start() {
+    LOGGER.info("Starting twitter source {} ...", this);
+    docCount = 0;
+    startTime = System.currentTimeMillis();
+    exceptionCount = 0;
+    totalTextIndexed = 0;
+    skippedDocs = 0;
+    batchEndTime = System.currentTimeMillis() + maxBatchDurationMillis;
+    twitterStream.sample();
+    LOGGER.info("Twitter source {} started.", getName());
+    // This should happen at the end of the start method, since this will 
+    // change the lifecycle status of the component to tell the Flume 
+    // framework that this component has started. Doing this any earlier 
+    // tells the framework that the component started successfully, even 
+    // if the method actually fails later.
+    super.start();
+  }
+
+  @Override
+  public synchronized void stop() {
+    LOGGER.info("Twitter source {} stopping...", getName());
+    twitterStream.shutdown();
+    super.stop();
+    LOGGER.info("Twitter source {} stopped.", getName());
+  }
+
+  public void onStatus(Status status)  {
+    Record doc = extractRecord("", avroSchema, status);
+    if (doc == null) {
+      return; // skip
+    }
+    docs.add(doc);
+    if (docs.size() >= maxBatchSize ||
+        System.currentTimeMillis() >= batchEndTime) {
+      batchEndTime = System.currentTimeMillis() + maxBatchDurationMillis;
+      byte[] bytes;
+      try {
+        bytes = serializeToAvro(avroSchema, docs);
+      } catch (IOException e) {
+        LOGGER.error("Exception while serializing tweet", e);
+        return; //skip
+      }
+      Event event = EventBuilder.withBody(bytes);
+      getChannelProcessor().processEvent(event); // send event to the flume sink
+      docs.clear();
+    }
+    docCount++;
+    if ((docCount % REPORT_INTERVAL) == 0) {
+      LOGGER.info(String.format("Processed %s docs",
+                                numFormatter.format(docCount)));
+    }
+    if ((docCount % STATS_INTERVAL) == 0) {
+      logStats();
+    }
+  }
+
+  private Schema createAvroSchema() {
+    Schema avroSchema = Schema.createRecord("Doc", "adoc", null, false);
+    List<Field> fields = new ArrayList<Field>();
+    fields.add(new Field("id", Schema.create(Type.STRING), null, null));
+    fields.add(new Field("user_friends_count",
+                         createOptional(Schema.create(Type.INT)),
+                         null, null));
+    fields.add(new Field("user_location",
+                         createOptional(Schema.create(Type.STRING)),
+                         null, null));
+    fields.add(new Field("user_description",
+                         createOptional(Schema.create(Type.STRING)),
+                         null, null));
+    fields.add(new Field("user_statuses_count",
+                         createOptional(Schema.create(Type.INT)),
+                         null, null));
+    fields.add(new Field("user_followers_count",
+                         createOptional(Schema.create(Type.INT)),
+                         null, null));
+    fields.add(new Field("user_name",
+                         createOptional(Schema.create(Type.STRING)),
+                         null, null));
+    fields.add(new Field("user_screen_name",
+                         createOptional(Schema.create(Type.STRING)),
+                         null, null));
+    fields.add(new Field("created_at",
+                         createOptional(Schema.create(Type.STRING)),
+                         null, null));
+    fields.add(new Field("text",
+                         createOptional(Schema.create(Type.STRING)),
+                         null, null));
+    fields.add(new Field("retweet_count",
+                         createOptional(Schema.create(Type.LONG)),
+                         null, null));
+    fields.add(new Field("retweeted",
+                         createOptional(Schema.create(Type.BOOLEAN)),
+                         null, null));
+    fields.add(new Field("in_reply_to_user_id",
+                         createOptional(Schema.create(Type.LONG)),
+                         null, null));
+    fields.add(new Field("source",
+                         createOptional(Schema.create(Type.STRING)),
+                         null, null));
+    fields.add(new Field("in_reply_to_status_id",
+                         createOptional(Schema.create(Type.LONG)),
+                         null, null));
+    fields.add(new Field("media_url_https",
+                         createOptional(Schema.create(Type.STRING)),
+                         null, null));
+    fields.add(new Field("expanded_url",
+                         createOptional(Schema.create(Type.STRING)),
+                         null, null));
+    avroSchema.setFields(fields);
+    return avroSchema;
+  }
+
+  private Record extractRecord(String idPrefix, Schema avroSchema, Status status) {
+    User user = status.getUser();
+    Record doc = new Record(avroSchema);
+
+    doc.put("id", idPrefix + status.getId());
+    doc.put("created_at", formatterTo.format(status.getCreatedAt()));
+    doc.put("retweet_count", status.getRetweetCount());
+    doc.put("retweeted", status.isRetweet());
+    doc.put("in_reply_to_user_id", status.getInReplyToUserId());
+    doc.put("in_reply_to_status_id", status.getInReplyToStatusId());
+
+    addString(doc, "source", status.getSource());
+    addString(doc, "text", status.getText());
+
+    MediaEntity[] mediaEntities = status.getMediaEntities();
+    if (mediaEntities.length > 0) {
+      addString(doc, "media_url_https", mediaEntities[0].getMediaURLHttps());
+      addString(doc, "expanded_url", mediaEntities[0].getExpandedURL());
+    }
+
+    doc.put("user_friends_count", user.getFriendsCount());
+    doc.put("user_statuses_count", user.getStatusesCount());
+    doc.put("user_followers_count", user.getFollowersCount());
+    addString(doc, "user_location", user.getLocation());
+    addString(doc, "user_description", user.getDescription());
+    addString(doc, "user_screen_name", user.getScreenName());
+    addString(doc, "user_name", user.getName());
+    return doc;
+  }
+
+  private byte[] serializeToAvro(Schema avroSchema, List<Record> docList)
+      throws IOException {
+    serializationBuffer.reset();
+    dataFileWriter.create(avroSchema, serializationBuffer);
+    for (Record doc2 : docList) {
+      dataFileWriter.append(doc2);
+    }
+    dataFileWriter.close();
+    return serializationBuffer.toByteArray();
+  }
+
+  private Schema createOptional(Schema schema) {
+    return Schema.createUnion(Arrays.asList(
+        new Schema[] { schema, Schema.create(Type.NULL) }));
+  }
+
+  private void addString(Record doc, String avroField, String val) {
+    if (val == null) {
+      return;
+    }
+    doc.put(avroField, val);
+    totalTextIndexed += val.length();
+  }
+
+  private void logStats() {
+    double mbIndexed = totalTextIndexed / (1024 * 1024.0);
+    long seconds = (System.currentTimeMillis() - startTime) / 1000;
+    seconds = Math.max(seconds, 1);
+    LOGGER.info(String.format("Total docs indexed: %s, total skipped docs: %s",
+                numFormatter.format(docCount), numFormatter.format(skippedDocs)));
+    LOGGER.info(String.format("    %s docs/second",
+                numFormatter.format(docCount / seconds)));
+    LOGGER.info(String.format("Run took %s seconds and processed:",
+                numFormatter.format(seconds)));
+    LOGGER.info(String.format("    %s MB/sec sent to index",
+                numFormatter.format(((float) totalTextIndexed / (1024 * 1024)) / seconds)));
+    LOGGER.info(String.format("    %s MB text sent to index",
+                numFormatter.format(mbIndexed)));
+    LOGGER.info(String.format("There were %s exceptions ignored: ",
+                numFormatter.format(exceptionCount)));
+  }
+
+  public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
+    // Do nothing...
+  }
+
+  public void onScrubGeo(long userId, long upToStatusId) {
+    // Do nothing...
+  }
+
+  public void onStallWarning(StallWarning warning) {
+    // Do nothing...
+  }
+
+  public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
+    // Do nothing...
+  }
+
+  public void onException(Exception e) {
+    LOGGER.error("Exception while streaming tweets", e);
+  }
+}
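
Note on the batching rule in onStatus() above: a batch is closed when either
maxBatchSize records have accumulated or the wall-clock deadline batchEndTime
has passed. A minimal, stdlib-only sketch of that rule follows. BatchBuffer and
its method names are hypothetical and are not part of this commit; the clock
value is passed in explicitly so the behaviour can be followed by hand.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not in the commit): closes a batch by size or by age. */
final class BatchBuffer<T> {
  private final int maxBatchSize;
  private final long maxBatchDurationMillis;
  private final List<T> items = new ArrayList<T>();
  private long batchEndTime;

  BatchBuffer(int maxBatchSize, long maxBatchDurationMillis, long nowMillis) {
    this.maxBatchSize = maxBatchSize;
    this.maxBatchDurationMillis = maxBatchDurationMillis;
    this.batchEndTime = nowMillis + maxBatchDurationMillis;
  }

  /** Adds an item; returns the closed batch, or null while the batch stays open. */
  List<T> add(T item, long nowMillis) {
    items.add(item);
    if (items.size() < maxBatchSize && nowMillis < batchEndTime) {
      return null; // neither limit reached yet
    }
    // Same order as the source: move the deadline first, then hand off the batch.
    batchEndTime = nowMillis + maxBatchDurationMillis;
    List<T> closed = new ArrayList<T>(items);
    items.clear();
    return closed;
  }

  public static void main(String[] args) {
    BatchBuffer<String> buffer = new BatchBuffer<String>(3, 1000, 0);
    System.out.println(buffer.add("a", 10));   // null
    System.out.println(buffer.add("b", 20));   // null
    System.out.println(buffer.add("c", 30));   // [a, b, c]  (size limit)
    System.out.println(buffer.add("d", 500));  // null
    System.out.println(buffer.add("e", 1100)); // [d, e]     (deadline 1030 passed)
  }
}
```

As in the source, the deadline is only checked when a new item arrives, so on a
quiet stream a batch can stay open longer than maxBatchDurationMillis.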

http://git-wip-us.apache.org/repos/asf/flume/blob/2cecd8f7/flume-ng-sources/flume-twitter-source/src/test/java/org/apache/flume/source/twitter/TestTwitterSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-twitter-source/src/test/java/org/apache/flume/source/twitter/TestTwitterSource.java b/flume-ng-sources/flume-twitter-source/src/test/java/org/apache/flume/source/twitter/TestTwitterSource.java
new file mode 100644
index 0000000..f6cc2c9
--- /dev/null
+++ b/flume-ng-sources/flume-twitter-source/src/test/java/org/apache/flume/source/twitter/TestTwitterSource.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source.twitter;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Sink;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.sink.DefaultSinkProcessor;
+import org.apache.flume.sink.LoggerSink;
+import org.apache.flume.source.twitter.TwitterSource;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestTwitterSource extends Assert {
+
+  @BeforeClass
+  public static void setUp() {
+    try {
+      Assume.assumeNotNull(InetAddress.getByName("stream.twitter.com"));
+    } catch (UnknownHostException e) {
+      Assume.assumeTrue(false); // ignore Test if twitter is unreachable
+    }
+  }
+  
+  @Test
+  public void testBasic() throws Exception {
+    String consumerKey = System.getProperty("twitter.consumerKey");
+    Assume.assumeNotNull(consumerKey);
+
+    String consumerSecret = System.getProperty("twitter.consumerSecret");
+    Assume.assumeNotNull(consumerSecret);
+
+    String accessToken = System.getProperty("twitter.accessToken");
+    Assume.assumeNotNull(accessToken);
+
+    String accessTokenSecret = System.getProperty("twitter.accessTokenSecret");
+    Assume.assumeNotNull(accessTokenSecret);
+
+    Context context = new Context();
+    context.put("consumerKey", consumerKey);
+    context.put("consumerSecret", consumerSecret);
+    context.put("accessToken", accessToken);
+    context.put("accessTokenSecret", accessTokenSecret);
+    context.put("maxBatchDurationMillis", "1000");
+
+    TwitterSource source = new TwitterSource();
+    source.configure(context);
+
+    Map<String, String> channelContext = new HashMap<String, String>();
+    channelContext.put("capacity", "1000000");
+    channelContext.put("keep-alive", "0"); // for faster tests
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context(channelContext));
+
+    Sink sink = new LoggerSink();
+    sink.setChannel(channel);
+    sink.start();
+    DefaultSinkProcessor proc = new DefaultSinkProcessor();
+    proc.setSinks(Collections.singletonList(sink));
+    SinkRunner sinkRunner = new SinkRunner(proc);
+    sinkRunner.start();
+
+    ChannelSelector rcs = new ReplicatingChannelSelector();
+    rcs.setChannels(Collections.singletonList(channel));
+    ChannelProcessor chp = new ChannelProcessor(rcs);
+    source.setChannelProcessor(chp);
+    source.start();
+
+    Thread.sleep(5000);
+    source.stop();
+    sinkRunner.stop();
+    sink.stop();
+  }
+
+  @Test
+  public void testCarrotDateFormatBug() throws Exception {
+    SimpleDateFormat formatterFrom = new SimpleDateFormat("EEE MMM dd HH:mm:ss Z yyyy");
+    formatterFrom.parse("Fri Oct 26 22:53:55 +0000 2012");
+  }
+
+}
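
Note on the date handling above: the test parses a Twitter-style timestamp with
a SimpleDateFormat built without an explicit Locale, and TwitterSource formats
created_at with the pattern yyyy-MM-dd'T'HH:mm:ss'Z', where 'Z' is a quoted
literal rather than a time-zone field. The sketch below shows one way to make
both steps independent of the JVM's default locale and time zone. It assumes
the intended output is UTC; TweetDates and toIsoUtc are hypothetical names and
are not part of this commit.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

/** Hypothetical helper (not in the commit): Twitter date string to ISO-8601 UTC. */
final class TweetDates {
  private TweetDates() {
  }

  static String toIsoUtc(String twitterDate) {
    // Locale.US so that "Fri" and "Oct" parse regardless of the default locale.
    SimpleDateFormat from =
        new SimpleDateFormat("EEE MMM dd HH:mm:ss Z yyyy", Locale.US);
    // 'Z' is a literal here, so the formatter itself must be pinned to UTC.
    SimpleDateFormat to =
        new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.US);
    to.setTimeZone(TimeZone.getTimeZone("UTC"));
    try {
      Date parsed = from.parse(twitterDate);
      return to.format(parsed);
    } catch (ParseException e) {
      throw new IllegalArgumentException("Unparseable date: " + twitterDate, e);
    }
  }

  public static void main(String[] args) {
    // Prints 2012-10-26T22:53:55Z
    System.out.println(toIsoUtc("Fri Oct 26 22:53:55 +0000 2012"));
  }
}
```

SimpleDateFormat is not thread-safe, which is why this sketch creates the
formatters per call instead of sharing them across threads.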

http://git-wip-us.apache.org/repos/asf/flume/blob/2cecd8f7/flume-ng-sources/flume-twitter-source/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-twitter-source/src/test/resources/log4j.properties b/flume-ng-sources/flume-twitter-source/src/test/resources/log4j.properties
new file mode 100644
index 0000000..7755024
--- /dev/null
+++ b/flume-ng-sources/flume-twitter-source/src/test/resources/log4j.properties
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+log4j.rootLogger=WARN, A1
+
+log4j.logger.org.apache.flume.sink=INFO
+#log4j.logger.org.apache.flume.sink.solr=DEBUG
+log4j.logger.org.apache.solr=INFO
+#log4j.logger.org.apache.solr.hadoop=DEBUG
+log4j.logger.org.apache.solr.morphline=DEBUG
+log4j.logger.org.apache.solr.update.processor.LogUpdateProcessor=WARN
+log4j.logger.org.apache.solr.core.SolrCore=WARN
+log4j.logger.org.apache.solr.search.SolrIndexSearcher=ERROR
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flume/blob/2cecd8f7/flume-ng-sources/flume-twitter-source/src/test/resources/twitter-flume.conf
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-twitter-source/src/test/resources/twitter-flume.conf b/flume-ng-sources/flume-twitter-source/src/test/resources/twitter-flume.conf
new file mode 100644
index 0000000..72fe4ef
--- /dev/null
+++ b/flume-ng-sources/flume-twitter-source/src/test/resources/twitter-flume.conf
@@ -0,0 +1,92 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# The configuration file needs to define the sources, 
+# the channels and the sinks.
+# Sources, channels and sinks are defined per agent, 
+# in this case called 'agent'
+
+agent.sources = twitterSrc
+#agent.sources = httpSrc
+#agent.sources = spoolSrc
+#agent.sources = avroSrc
+agent.channels = memoryChannel
+agent.sinks = solrSink
+#agent.sinks = loggerSink
+
+agent.sources.twitterSrc.type = org.apache.flume.source.twitter.TwitterSource
+agent.sources.twitterSrc.consumerKey = YOUR_TWITTER_CONSUMER_KEY
+agent.sources.twitterSrc.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
+agent.sources.twitterSrc.accessToken = YOUR_TWITTER_ACCESS_TOKEN
+agent.sources.twitterSrc.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
+agent.sources.twitterSrc.maxBatchDurationMillis = 200
+agent.sources.twitterSrc.channels = memoryChannel
+
+agent.sources.httpSrc.type = org.apache.flume.source.http.HTTPSource
+agent.sources.httpSrc.port = 5140
+agent.sources.httpSrc.handler = org.apache.flume.sink.solr.morphline.BlobHandler
+agent.sources.httpSrc.handler.maxBlobLength = 2000000000
+#agent.sources.httpSrc.interceptors = uuidinterceptor
+#agent.sources.httpSrc.interceptors.uuidinterceptor.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
+#agent.sources.httpSrc.interceptors.uuidinterceptor.headerName = id
+##agent.sources.httpSrc.interceptors.uuidinterceptor.preserveExisting = false
+##agent.sources.httpSrc.interceptors.uuidinterceptor.prefix = myhostname
+agent.sources.httpSrc.channels = memoryChannel
+
+agent.sources.spoolSrc.type = spooldir
+agent.sources.spoolSrc.spoolDir = /tmp/myspooldir
+agent.sources.spoolSrc.ignorePattern = \.
+agent.sources.spoolSrc.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
+agent.sources.spoolSrc.deserializer.maxBlobLength = 2000000000
+agent.sources.spoolSrc.batchSize = 1
+agent.sources.spoolSrc.fileHeader = true
+agent.sources.spoolSrc.fileHeaderKey = resourceName
+#agent.sources.spoolSrc.interceptors = uuidinterceptor
+#agent.sources.spoolSrc.interceptors.uuidinterceptor.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
+#agent.sources.spoolSrc.interceptors.uuidinterceptor.headerName = id
+##agent.sources.spoolSrc.interceptors.uuidinterceptor.preserveExisting = false
+##agent.sources.spoolSrc.interceptors.uuidinterceptor.prefix = myhostname
+agent.sources.spoolSrc.channels = memoryChannel
+
+agent.sources.avroSrc.type = avro
+agent.sources.avroSrc.bind = 127.0.0.1
+agent.sources.avroSrc.port = 10000
+agent.sources.avroSrc.channels = memoryChannel
+agent.sources.avroSrc.interceptors = uuidinterceptor
+agent.sources.avroSrc.interceptors.uuidinterceptor.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
+agent.sources.avroSrc.interceptors.uuidinterceptor.headerName = id
+#agent.sources.avroSrc.interceptors.uuidinterceptor.preserveExisting = false
+#agent.sources.avroSrc.interceptors.uuidinterceptor.prefix = myhostname
+#agent.sources.avroSrc.interceptors = morphlineinterceptor
+#agent.sources.avroSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
+#agent.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf
+#agent.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1
+
+agent.channels.memoryChannel.type = memory
+agent.channels.memoryChannel.capacity = 10000
+agent.channels.memoryChannel.transactionCapacity = 1000
+
+#agent.channels.fileChannel.type = file
+#agent.channels.fileChannel.capacity = 1000000
+#agent.channels.fileChannel.transactionCapacity = 1000
+#agent.channels.fileChannel.write-timeout = 1
+
+agent.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
+agent.sinks.solrSink.channel = memoryChannel
+#agent.sinks.solrSink.batchSize = 1000
+#agent.sinks.solrSink.batchDurationMillis = 1000
+agent.sinks.solrSink.morphlineFile = /etc/flume-ng/conf/morphline.conf
+#agent.sinks.solrSink.morphlineId = morphline1
+
+#agent.sinks.loggerSink.type = logger
+#agent.sinks.loggerSink.channel = memoryChannel

http://git-wip-us.apache.org/repos/asf/flume/blob/2cecd8f7/flume-ng-sources/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sources/pom.xml b/flume-ng-sources/pom.xml
index 6006fa1..0b57d6d 100644
--- a/flume-ng-sources/pom.xml
+++ b/flume-ng-sources/pom.xml
@@ -43,6 +43,7 @@ limitations under the License.
   <modules>
     <module>flume-scribe-source</module>
     <module>flume-jms-source</module>
+    <module>flume-twitter-source</module>
   </modules>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/2cecd8f7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8b36402..f0fd22e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -968,6 +968,12 @@ limitations under the License.
       </dependency>
 
       <dependency>
+        <groupId>org.apache.flume.flume-ng-sources</groupId>
+        <artifactId>flume-twitter-source</artifactId>
+        <version>1.5.0-SNAPSHOT</version>
+      </dependency>
+
+      <dependency>
         <groupId>org.apache.flume.flume-ng-legacy-sources</groupId>
         <artifactId>flume-thrift-source</artifactId>
         <version>1.5.0-SNAPSHOT</version>
@@ -1058,6 +1064,23 @@ limitations under the License.
         <version>4.3.0</version>
       </dependency>
 
+      <!-- Dependencies of the Twitter source -->
+      <dependency>
+        <groupId>org.twitter4j</groupId>
+        <artifactId>twitter4j-core</artifactId>
+        <version>3.0.3</version>
+      </dependency>
+      <dependency>
+        <groupId>org.twitter4j</groupId>
+        <artifactId>twitter4j-media-support</artifactId>
+        <version>3.0.3</version>
+      </dependency>
+      <dependency>
+        <groupId>org.twitter4j</groupId>
+        <artifactId>twitter4j-stream</artifactId>
+        <version>3.0.3</version>
+      </dependency>
+
     </dependencies>
   </dependencyManagement>
 

