streams-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mfranklin <...@git.apache.org>
Subject [GitHub] incubator-streams pull request: Streams 41
Date Wed, 21 May 2014 19:35:32 GMT
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/23#discussion_r12917440
  
    --- Diff: streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
---
    @@ -18,110 +38,129 @@
     import org.slf4j.LoggerFactory;
     
     import java.math.BigInteger;
    -import java.util.List;
    +import java.util.Map;
     import java.util.Queue;
    -import java.util.concurrent.*;
    -import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.ConcurrentLinkedQueue;
     
     /**
    - * Created by sblackmon on 12/10/13.
    + * Requires Java Version 1.7!
    + * {@code DatasiftStreamProvider} is an implementation of the {@link org.apache.streams.core.StreamsProvider}
interface.  The provider
    + * uses the Datasift java api to make connections. A single provider creates one connection
per StreamHash in the configuration.
      */
     public class DatasiftStreamProvider implements StreamsProvider {
     
         private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftStreamProvider.class);
     
    -    protected DatasiftConfiguration config = null;
    -
    -    protected DataSiftClient client;
    -
    -    private Class klass;
    -
    -    public DatasiftConfiguration getConfig() {
    -        return config;
    -    }
    -
    -    public void setConfig(DatasiftConfiguration config) {
    -        this.config = config;
    -    }
    -
    -    protected BlockingQueue inQueue = new LinkedBlockingQueue<Interaction>(10000);
    -
    -    protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
    -
    -    public BlockingQueue<Object> getInQueue() {
    -        return inQueue;
    -    }
    -
    -    protected ListeningExecutorService executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(100,
100));
    -
    -    protected List<String> streamHashes;
    -
    -    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int
queueSize) {
    -        return new ThreadPoolExecutor(nThreads, nThreads,
    -                5000L, TimeUnit.MILLISECONDS,
    -                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
    -    }
    -
    -    public DatasiftStreamProvider() {
    -        Config datasiftConfig = StreamsConfigurator.config.getConfig("datasift");
    -        this.config = DatasiftStreamConfigurator.detectConfiguration(datasiftConfig);
    -    }
    -
    -    public DatasiftStreamProvider(DatasiftConfiguration config) {
    -        this.config = config;
    -    }
    -
    -    public DatasiftStreamProvider(Class klass) {
    -        Config config = StreamsConfigurator.config.getConfig("datasift");
    -        this.config = DatasiftStreamConfigurator.detectConfiguration(config);
    -        this.klass = klass;
    -    }
    -
    -    public DatasiftStreamProvider(DatasiftConfiguration config, Class klass) {
    -        this.config = config;
    -        this.klass = klass;
    +    private DatasiftConfiguration config;
    +    private ConcurrentLinkedQueue<Interaction> interactions;
    +    private Map<String, DataSiftClient> clients;
    +    private StreamEventListener eventListener;
    +
    +    /**
    +     * Constructor that searches for available configurations
    +     * @param listener {@link com.datasift.client.stream.StreamEventListener} that handles
deletion notices received from twitter.
    +     */
    +    public DatasiftStreamProvider(StreamEventListener listener) {
    +        this(listener, null);
    +    }
    +
    +    /**
    +     *
    +     * @param listener {@link com.datasift.client.stream.StreamEventListener} that handles
deletion notices received from twitter.
    +     * @param config  Configuration to use
    +     */
    +    public DatasiftStreamProvider(StreamEventListener listener, DatasiftConfiguration
config) {
    +        if(config == null) {
    +            Config datasiftConfig = StreamsConfigurator.config.getConfig("datasift");
    +            this.config = DatasiftStreamConfigurator.detectConfiguration(datasiftConfig);
    +        } else {
    +            this.config = config;
    +        }
    +        this.eventListener = listener;
         }
     
         @Override
         public void startStream() {
     
    -        Preconditions.checkNotNull(this.klass);
    -
    -        Preconditions.checkNotNull(config);
    -
    -        Preconditions.checkNotNull(config.getStreamHash());
    -
    -        Preconditions.checkNotNull(config.getStreamHash().get(0));
    -
    -        for( String hash : config.getStreamHash()) {
    -
    -            client.liveStream().subscribe(new Subscription(Stream.fromString(hash)));
    +        Preconditions.checkNotNull(this.config);
    +        Preconditions.checkNotNull(this.config.getStreamHash());
    +        Preconditions.checkNotNull(this.config.getStreamHash().get(0));
    +        Preconditions.checkNotNull(this.config.getApiKey());
    +        Preconditions.checkNotNull(this.config.getUserName());
    +        Preconditions.checkNotNull(this.clients);
     
    +        for( String hash : this.config.getStreamHash()) {
    +            startStreamForHash(hash);
             }
     
    -        for( int i = 0; i < ((config.getStreamHash().size() / 5) + 1); i++ )
    -            executor.submit(new DatasiftEventProcessor(inQueue, providerQueue, klass));
    -
         }
     
    -    public void stop() {
    +    /**
    +     * Creates a connection to datasift and starts collection of data from the resulting
string.
    +     * @param streamHash
    +     */
    +    public void startStreamForHash(String streamHash) {
    +        shutDownStream(streamHash);
    +        DataSiftClient client = getNewClient(this.config.getUserName(), this.config.getApiKey());
    +        client.liveStream().onStreamEvent(this.eventListener);
    +        client.liveStream().onError(new ErrorHandler(this, streamHash));
     
    -        for( String hash : config.getStreamHash()) {
    -
    -            client.liveStream().subscribe(new Subscription(Stream.fromString(hash)));
    +        client.liveStream().subscribe(new Subscription(Stream.fromString(streamHash),
this.interactions));
    +        synchronized (this.clients) {
    +            this.clients.put(streamHash, client);
    +        }
    +    }
     
    +    /**
    +     * Exposed for testing purposes.
    +     * @param userName
    +     * @param apiKey
    +     * @return
    +     */
    +    protected DataSiftClient getNewClient(String userName, String apiKey) {
    +        return new DataSiftClient(new DataSiftConfig(userName, apiKey));
    +    }
    +
    +
    +    /**
    +     * If a stream has been opened for the supplied stream hash, that stream will be
shutdown.
    +     * @param streamHash
    +     */
    +    public void shutDownStream(String streamHash) {
    +        synchronized (clients) {
    +            if(!this.clients.containsKey(streamHash))
    +                return;
    +            DataSiftClient client = this.clients.get(streamHash);
    +            LOGGER.debug("Shutting down stream for hash: {}", streamHash);
    +            client.shutdown();
    +            this.clients.remove(client);
             }
         }
     
    -    public Queue<StreamsDatum> getProviderQueue() {
    -        return this.providerQueue;
    +    /**
    +     * Shuts down all open streams from datasift.
    +     */
    +    public void stop() {
    +        synchronized (clients) {
    +            for(DataSiftClient client : this.clients.values()) {
    +                client.shutdown();
    +            }
    +        }
         }
     
    -    @Override
    +    // PRIME EXAMPLE OF WHY WE NEED NEW INTERFACES FOR PROVIDERS
    +    @Override //This is a hack.  It is only like this because of how perpetual streams
work at the moment.  Read list server to debate/vote for new interfaces.
         public StreamsResultSet readCurrent() {
    +        Queue<StreamsDatum> datums = Queues.newConcurrentLinkedQueue();
     
    -        return (StreamsResultSet) providerQueue;
    +            while(!this.interactions.isEmpty()) {
    +                Interaction interaction = this.interactions.poll();
    +                while(!datums.offer(new StreamsDatum(interaction, interaction.getData().get("interaction").get("id").textValue())))
{
    --- End diff --
    
    There is an offerUntilSuccess utility under component utils that could be used here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message