streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sblack...@apache.org
Subject [03/11] incubator-streams git commit: /streams/webhooks/* confirmed working
Date Thu, 12 Feb 2015 16:18:12 GMT
/streams/webhooks/* confirmed working


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6840547b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6840547b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6840547b

Branch: refs/heads/master
Commit: 6840547bf7cd49e4e395557eca8f509a7fd8d391
Parents: 0896cd5
Author: sblackmon <sblackmon@apache.org>
Authored: Thu Nov 20 15:28:31 2014 -0600
Committer: sblackmon <sblackmon@w2odigital.com>
Committed: Fri Feb 6 18:06:25 2015 -0600

----------------------------------------------------------------------
 .../apache/streams/datasift/DatasiftPush.json   |  35 ---
 streams-core/pom.xml                            |   5 +
 .../apache/streams/core/StreamsResource.java    |  17 ++
 .../streams-runtime-dropwizard/pom.xml          |   5 -
 .../dropwizard/GenericWebhookResource.java      | 217 ++++++++++++++++++
 .../dropwizard/StreamDropwizardBuilder.java     |  39 ++++
 .../streams/dropwizard/StreamsApplication.java  |  84 +++++--
 .../dropwizard/StreamsDropwizardModule.java     |  31 +--
 .../streams/dropwizard/WebhookResource.java     | 222 -------------------
 .../streams/dropwizard/GenericWebhookData.json  |  35 +++
 10 files changed, 382 insertions(+), 308 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6840547b/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftPush.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftPush.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftPush.json
deleted file mode 100644
index 39565ad..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftPush.json
+++ /dev/null
@@ -1,35 +0,0 @@
-{
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "$license": [
-        "http://www.apache.org/licenses/LICENSE-2.0"
-    ],
-    "id": "#",
-    "type": "object",
-    "javaType": "org.apache.streams.datasift.DatasiftPush",
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "id": {
-            "type": "string"
-        },
-        "hash": {
-            "type": "string"
-        },
-        "hash_type": {
-            "type": "string"
-        },
-        "count": {
-            "type": "long"
-        },
-        "delivered_at": {
-            "type": "string",
-            "format": "date-time"
-        },
-        "interactions": {
-            "type": "array",
-            "items": {
-                "type": "object",
-                "javaType": "org.apache.streams.datasift.Datasift"
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6840547b/streams-core/pom.xml
----------------------------------------------------------------------
diff --git a/streams-core/pom.xml b/streams-core/pom.xml
index 83c4f1a..fe1c142 100644
--- a/streams-core/pom.xml
+++ b/streams-core/pom.xml
@@ -39,6 +39,11 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
+            <groupId>javax.ws.rs</groupId>
+            <artifactId>jsr311-api</artifactId>
+            <version>1.1.1</version>
+        </dependency>
+        <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6840547b/streams-core/src/main/java/org/apache/streams/core/StreamsResource.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsResource.java b/streams-core/src/main/java/org/apache/streams/core/StreamsResource.java
new file mode 100644
index 0000000..4bd18e2
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamsResource.java
@@ -0,0 +1,17 @@
+package org.apache.streams.core;
+
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+
+/**
+ * Created by sblackmon on 11/20/14.
+ */
+public interface StreamsResource {
+
+    public Response json(HttpHeaders headers, String body);
+
+    public Response json_new_line(HttpHeaders headers, String body);
+
+    public Response json_meta(HttpHeaders headers, String body);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6840547b/streams-runtimes/streams-runtime-dropwizard/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/pom.xml b/streams-runtimes/streams-runtime-dropwizard/pom.xml
index ab6e927..045fa86 100644
--- a/streams-runtimes/streams-runtime-dropwizard/pom.xml
+++ b/streams-runtimes/streams-runtime-dropwizard/pom.xml
@@ -138,11 +138,6 @@
             <artifactId>streams-runtime-local</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-persist-console</artifactId>
-            <version>${project.version}</version>
-        </dependency>
 
         <dependency>
             <groupId>ch.qos.logback</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6840547b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
new file mode 100644
index 0000000..56d57b0
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
@@ -0,0 +1,217 @@
+package org.apache.streams.dropwizard;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Queues;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResource;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.ComponentUtils;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Resource;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
+
+@Resource
+@Path("/streams/webhooks")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+public class GenericWebhookResource implements StreamsProvider, StreamsResource {
+
+    public GenericWebhookResource() {
+    }
+
+    private static final Logger log = LoggerFactory
+            .getLogger(GenericWebhookResource.class);
+
+    private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    protected Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>();
+
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    private static Pattern newLinePattern = Pattern.compile("(\\r\\n?|\\n)", Pattern.MULTILINE);
+
+    @POST
+    @Path("json")
+    public Response json(@Context HttpHeaders headers,
+                                  String body) {
+
+        ObjectNode response = mapper.createObjectNode();
+
+        StreamsDatum datum = new StreamsDatum(body);
+
+        lock.writeLock().lock();
+        ComponentUtils.offerUntilSuccess(datum, providerQueue);
+        lock.writeLock().unlock();
+
+        Boolean success = true;
+
+        response.put("success", success);
+
+        return Response.status(200).entity(response).build();
+
+    }
+
+    @POST
+    @Path("json_new_line")
+    public Response json_new_line(@Context HttpHeaders headers,
+                                           String body) {
+
+        ObjectNode response = mapper.createObjectNode();
+
+        if (body.equalsIgnoreCase("{}")) {
+
+            Boolean success = true;
+
+            response.put("success", success);
+
+            return Response.status(200).entity(response).build();
+        }
+
+        try {
+
+            for( String item : Splitter.on(newLinePattern).split(body)) {
+                StreamsDatum datum = new StreamsDatum(item);
+
+                lock.writeLock().lock();
+                ComponentUtils.offerUntilSuccess(datum, providerQueue);
+                lock.writeLock().unlock();
+
+            }
+
+            Boolean success = true;
+
+            response.put("success", success);
+
+            return Response.status(200).entity(response).build();
+
+        } catch (Exception e) {
+            log.warn(e.toString(), e);
+
+            Boolean success = false;
+
+            response.put("success", success);
+
+            return Response.status(500).entity(response).build();
+
+        }
+
+    }
+
+    @POST
+    @Path("json_meta")
+    public Response json_meta(@Context HttpHeaders headers,
+                                       String body) {
+
+        //log.debug(headers.toString(), headers);
+
+        //log.debug(body.toString(), body);
+
+        ObjectNode response = mapper.createObjectNode();
+
+        if (body.equalsIgnoreCase("{}")) {
+
+            Boolean success = true;
+
+            response.put("success", success);
+
+            return Response.status(200).entity(response).build();
+        }
+
+        try {
+
+            GenericWebhookData objectWrapper = mapper.readValue(body, GenericWebhookData.class);
+
+            for( ObjectNode item : objectWrapper.getData()) {
+
+                String json = mapper.writeValueAsString(item);
+
+                StreamsDatum datum = new StreamsDatum(json);
+
+                lock.writeLock().lock();
+                ComponentUtils.offerUntilSuccess(datum, providerQueue);
+                lock.writeLock().unlock();
+            }
+
+            Boolean success = true;
+
+            response.put("success", success);
+
+            return Response.status(200).entity(response).build();
+
+        } catch (Exception e) {
+            log.warn(e.toString(), e);
+        }
+
+        return Response.status(500).build();
+    }
+
+    public List<ObjectNode> getData(GenericWebhookData wrapper) {
+        return wrapper.getData();
+    }
+
+    @Override
+    public void startStream() {
+        return;
+    }
+
+    @Override
+    public StreamsResultSet readCurrent() {
+
+        StreamsResultSet current;
+
+        lock.writeLock().lock();
+        current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+        providerQueue.clear();
+        lock.writeLock().unlock();
+
+        return current;
+
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return true;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6840547b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
new file mode 100644
index 0000000..4292900
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
@@ -0,0 +1,39 @@
+package org.apache.streams.dropwizard;
+
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.Map;
+
+/**
+ * Created by sblackmon on 11/20/14.
+ */
+public class StreamDropwizardBuilder extends LocalStreamBuilder implements StreamBuilder {
+
+    public StreamDropwizardBuilder() {
+        super();
+    }
+
+    public StreamDropwizardBuilder(Map<String, Object> streamConfig) {
+        super(streamConfig);
+    }
+
+    public StreamDropwizardBuilder(int maxQueueCapacity) {
+        super(maxQueueCapacity);
+    }
+
+    public StreamDropwizardBuilder(int maxQueueCapacity, Map<String, Object> streamConfig) {
+        super(maxQueueCapacity, streamConfig);
+    }
+
+    @Override
+    public StreamBuilder newPerpetualStream(String streamId, StreamsProvider provider) {
+        return super.newPerpetualStream(streamId, provider);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6840547b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
index 733b078..67d0446 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
@@ -6,7 +6,9 @@ import com.fasterxml.jackson.datatype.guava.GuavaModule;
 import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.hubspot.dropwizard.guice.GuiceBundle;
+import com.sun.jersey.api.core.ResourceConfig;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigRenderOptions;
@@ -14,22 +16,37 @@ import io.dropwizard.Application;
 import io.dropwizard.jackson.GuavaExtrasModule;
 import io.dropwizard.setup.Bootstrap;
 import io.dropwizard.setup.Environment;
+import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.console.ConsolePersistWriter;
 import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.local.builders.LocalStreamBuilder;
 import org.apache.streams.pojo.json.Activity;
+import org.joda.time.DateTime;
+import org.reflections.Reflections;
+import org.reflections.util.ConfigurationBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import com.google.inject.Inject;
+
+import javax.annotation.Resource;
+import javax.ws.rs.Path;
+
 public class StreamsApplication extends Application<StreamsDropwizardConfiguration> {
 
     private static final Logger LOGGER = LoggerFactory
@@ -37,11 +54,11 @@ public class StreamsApplication extends Application<StreamsDropwizardConfigurati
 
     private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    private StreamBuilder builder;
+    protected StreamBuilder builder;
 
-    private WebhookResource webhook;
+    private static StreamsConfiguration streamsConfiguration;
 
-    private String broadcastURI;
+    private Set<StreamsProvider> resourceProviders = Sets.newConcurrentHashSet();
 
     private Executor executor = Executors.newSingleThreadExecutor();
 
@@ -70,33 +87,63 @@ public class StreamsApplication extends Application<StreamsDropwizardConfigurati
     @Override
     public void run(StreamsDropwizardConfiguration streamsDropwizardConfiguration, Environment environment) throws Exception {
 
-        webhook = new WebhookResource();
-
         executor = Executors.newSingleThreadExecutor();
 
-        executor.execute(new StreamsDropwizardRunner());
+        for( Class<?> resourceProviderClass : environment.jersey().getResourceConfig().getRootResourceClasses() ) {
+            StreamsProvider provider = (StreamsProvider)resourceProviderClass.newInstance();
+            if( StreamsProvider.class.isInstance(provider))
+                resourceProviders.add(provider);
+        }
+
+        streamsConfiguration = mapper.convertValue(streamsDropwizardConfiguration, StreamsConfiguration.class);
+
+        builder = setup(streamsConfiguration, resourceProviders);
+
+        executor.execute(new StreamsDropwizardRunner(builder, streamsConfiguration));
 
         // wait for streams to start up
         Thread.sleep(10000);
 
-        //environment.jersey().register(webhook);
+        for (StreamsProvider resource : resourceProviders) {
+            environment.jersey().register(resource);
+            LOGGER.info("Added resource class: {}", resource);
+        }
+
+    }
+
+    public StreamBuilder setup(StreamsConfiguration streamsConfiguration, Set<StreamsProvider> resourceProviders) {
+
+        Map<String, Object> streamConfig = Maps.newHashMap();
+        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 20 * 60 * 1000 * 1000);
+        if(! Strings.isNullOrEmpty(streamsConfiguration.getBroadcastURI()) ) streamConfig.put("broadcastURI", streamsConfiguration.getBroadcastURI());
+        StreamBuilder builder = new StreamDropwizardBuilder(1000, streamConfig);
 
+        List<String> providers = new ArrayList<>();
+        for( StreamsProvider provider: resourceProviders) {
+            String providerId = provider.getClass().getSimpleName();
+            builder.newPerpetualStream(providerId, provider);
+            providers.add(providerId);
+        }
+
+        return builder;
     }
 
     private class StreamsDropwizardRunner implements Runnable {
 
+        private StreamsConfiguration streamsConfiguration;
+
+        private StreamBuilder builder;
+
+        protected StreamsDropwizardRunner(StreamBuilder builder, StreamsConfiguration streamsConfiguration) {
+            this.streamsConfiguration = streamsConfiguration;
+            this.builder = builder;
+        }
+
         @Override
         public void run() {
 
-            Map<String, Object> streamConfig = Maps.newHashMap();
-            streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 20 * 60 * 1000 * 1000);
-            if(! Strings.isNullOrEmpty(broadcastURI) ) streamConfig.put("broadcastURI", broadcastURI);
-            builder = new LocalStreamBuilder(1000, streamConfig);
+            builder.start();
 
-            // prepare stream components
-            builder.newPerpetualStream("webhooks", webhook);
-
-            builder.addStreamsPersistWriter("console", new ConsolePersistWriter(), 1, "webhooks");
         }
     }
 
@@ -104,9 +151,8 @@ public class StreamsApplication extends Application<StreamsDropwizardConfigurati
     public static void main(String[] args) throws Exception
     {
 
-        StreamsApplication application = new StreamsApplication();
-        if( args.length == 1 ) application.broadcastURI = args[0];
-        application.run(args);
+        new StreamsApplication().run(args);
 
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6840547b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
index 4264dbb..f5cd020 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
@@ -3,6 +3,7 @@ package org.apache.streams.dropwizard;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
+import com.google.inject.Singleton;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigRenderOptions;
@@ -18,37 +19,13 @@ public class StreamsDropwizardModule extends AbstractModule {
 
     @Override
     protected void configure() {
-        // anything you'd like to configure
+        requestStaticInjection(StreamsConfiguration.class);
     }
 
     @Provides
-    public StreamsConfiguration providesStreamsConfiguration(StreamsDropwizardConfiguration configuration) {
+    @Singleton
+    public StreamsConfiguration providesStreamsConfiguration() {
         return StreamsConfigurator.detectConfiguration();
     }
 
-//    private StreamsDropwizardConfiguration reconfigure(StreamsDropwizardConfiguration streamsConfiguration) {
-//
-//        // config from dropwizard
-//        Config configDropwizard = null;
-//        try {
-//            configDropwizard = ConfigFactory.parseString(mapper.writeValueAsString(streamsConfiguration));
-//        } catch (JsonProcessingException e) {
-//            e.printStackTrace();
-//            LOGGER.error("Invalid Configuration: " + streamsConfiguration);
-//        }
-//
-//        Config combinedConfig = configTypesafe.withFallback(configDropwizard);
-//        String combinedConfigJson = combinedConfig.root().render(ConfigRenderOptions.concise());
-//
-//        StreamsDropwizardConfiguration combinedDropwizardConfig = null;
-//        try {
-//            combinedDropwizardConfig = mapper.readValue(combinedConfigJson, StreamsDropwizardConfiguration.class);
-//        } catch (IOException e) {
-//            e.printStackTrace();
-//            LOGGER.error("Invalid Configuration after merge: " + streamsConfiguration);
-//        }
-//
-//        return  combinedDropwizardConfig;
-//
-//    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6840547b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/WebhookResource.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/WebhookResource.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/WebhookResource.java
deleted file mode 100644
index 1f80c5c..0000000
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/WebhookResource.java
+++ /dev/null
@@ -1,222 +0,0 @@
-package org.apache.streams.dropwizard;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Queues;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.ComponentUtils;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Resource;
-import javax.inject.Inject;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.math.BigInteger;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
-
-@Resource
-@Path("/streams/webhooks")
-@Produces(MediaType.APPLICATION_JSON)
-@Consumes(MediaType.APPLICATION_JSON)
-public class WebhookResource implements StreamsProvider {
-
-    public WebhookResource() {
-    }
-
-    private static final Logger log = LoggerFactory
-            .getLogger(WebhookResource.class);
-
-    private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-    protected Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>();
-
-    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
-    private static Pattern newLinePattern = Pattern.compile("(\\r\\n?|\\n)", Pattern.MULTILINE);
-
-    @POST
-    @Path("json")
-    public Response json(@Context HttpHeaders headers,
-                                  String body) {
-
-        ObjectNode response = mapper.createObjectNode();
-
-        StreamsDatum datum = new StreamsDatum(body);
-
-        lock.writeLock().lock();
-        ComponentUtils.offerUntilSuccess(datum, providerQueue);
-        lock.writeLock().unlock();
-
-        Boolean success = true;
-
-        response.put("success", success);
-
-        return Response.status(200).entity(response).build();
-
-    }
-
-    @POST
-    @Path("json_new_line")
-    public Response json_new_line(@Context HttpHeaders headers,
-                                           String body) {
-
-        ObjectNode response = mapper.createObjectNode();
-
-        if (body.equalsIgnoreCase("{}")) {
-
-            Boolean success = true;
-
-            response.put("success", success);
-
-            return Response.status(200).entity(response).build();
-        }
-
-        try {
-
-            for( String item : Splitter.on('\n').split(body)) {
-                StreamsDatum datum = new StreamsDatum(item);
-
-                lock.writeLock().lock();
-                ComponentUtils.offerUntilSuccess(datum, providerQueue);
-                lock.writeLock().unlock();
-
-            }
-
-            Boolean success = true;
-
-            response.put("success", success);
-
-            return Response.status(200).entity(response).build();
-
-        } catch (Exception e) {
-            log.warn(e.toString(), e);
-
-            Boolean success = false;
-
-            response.put("success", success);
-
-            return Response.status(500).entity(response).build();
-
-        }
-
-    }
-
-//    @POST
-//    @Path("json_meta")
-//    public Response json_meta(@Context HttpHeaders headers,
-//                                       String body) {
-//
-//        //log.debug(headers.toString(), headers);
-//
-//        //log.debug(body.toString(), body);
-//
-//        ObjectNode response = mapper.createObjectNode();
-//
-//        if (body.equalsIgnoreCase("{}")) {
-//
-//            Boolean success = true;
-//
-//            response.put("success", success);
-//
-//            return Response.status(200).entity(response).build();
-//        }
-//
-//        try {
-//
-//            ObjectNode objectWrapper = mapper.readValue(body, ObjectNode.class);
-//
-//            for( ObjectNode item : objectWrapper.getData()) {
-//
-//                String json = mapper.writeValueAsString(item);
-//
-//                StreamsDatum datum = new StreamsDatum(json, item.getInteraction().getId(), item.getInteraction().getCreatedAt());
-//
-//                lock.writeLock().lock();
-//                ComponentUtils.offerUntilSuccess(datum, providerQueue);
-//                lock.writeLock().unlock();
-//            }
-//
-//            log.info("interactionQueue: " + providerQueue.size());
-//
-//            Boolean success = true;
-//
-//            response.put("success", success);
-//
-//            return Response.status(200).entity(response).build();
-//
-//        } catch (Exception e) {
-//            log.warn(e.toString(), e);
-//        }
-//
-//        return Response.status(500).build();
-//    }
-
-    @Override
-    public void startStream() {
-        return;
-    }
-
-    @Override
-    public StreamsResultSet readCurrent() {
-
-        StreamsResultSet current;
-
-        lock.writeLock().lock();
-        current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
-        providerQueue.clear();
-        lock.writeLock().unlock();
-
-        return current;
-
-    }
-
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return null;
-    }
-
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return null;
-    }
-
-    @Override
-    public boolean isRunning() {
-        return false;
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-
-    }
-
-    @Override
-    public void cleanUp() {
-
-    }
-
-    public void addDatum(StreamsDatum datum) {
-        try {
-            lock.readLock().lock();
-            ComponentUtils.offerUntilSuccess(datum, providerQueue);
-        } finally {
-            lock.readLock().unlock();
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6840547b/streams-runtimes/streams-runtime-dropwizard/src/main/jsonschema/org/apache/streams/dropwizard/GenericWebhookData.json
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/jsonschema/org/apache/streams/dropwizard/GenericWebhookData.json b/streams-runtimes/streams-runtime-dropwizard/src/main/jsonschema/org/apache/streams/dropwizard/GenericWebhookData.json
new file mode 100644
index 0000000..adcafe2
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/jsonschema/org/apache/streams/dropwizard/GenericWebhookData.json
@@ -0,0 +1,35 @@
+{
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "id": "#",
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "javaType": "org.apache.streams.dropwizard.GenericWebhookData",
+    "properties": {
+        "id": {
+            "type": "string"
+        },
+        "hash": {
+            "type": "string"
+        },
+        "hash_type": {
+            "type": "string"
+        },
+        "count": {
+            "type": "long"
+        },
+        "delivered_at": {
+            "type": "string",
+            "format": "date-time"
+        },
+        "data": {
+            "type": "array",
+            "items": {
+                "type": "object",
+                "javaType": "com.fasterxml.jackson.databind.node.ObjectNode"
+            }
+        }
+    }
+}


Mime
View raw message