streams-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mfranklin <...@git.apache.org>
Subject [GitHub] incubator-streams pull request: Twitter Modifications
Date Mon, 05 May 2014 21:54:06 GMT
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/8#discussion_r12301958
  
    --- Diff: streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
---
    @@ -0,0 +1,141 @@
    +package org.apache.streams.s3;
    +
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Queues;
import org.apache.streams.core.*;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.math.BigInteger;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
    +
    +public class S3PersistReader implements StreamsPersistReader, DatumStatusCountable {
    +
    +    private final static Logger LOGGER = LoggerFactory.getLogger(S3PersistReader.class);
    +    public final static String STREAMS_ID = "S3PersistReader";
    +    protected final static char DELIMITER = '\t';
    +
    +    private S3ReaderConfiguration s3ReaderConfiguration;
    +    private AmazonS3Client amazonS3Client;
    +    private ObjectMapper mapper = new ObjectMapper();
    +    private Collection<String> files;
    +    private ExecutorService executor;
    +    protected volatile Queue<StreamsDatum> persistQueue;
    +
    +    protected DatumStatusCounter countersTotal = new DatumStatusCounter();
    +    protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
    +
    +    public AmazonS3Client getAmazonS3Client()                           { return this.amazonS3Client;
}
    +    public S3ReaderConfiguration getS3ReaderConfiguration()             { return this.s3ReaderConfiguration;
}
    +    public String getBucketName()                                       { return this.s3ReaderConfiguration.getBucket();
}
    +    public StreamsResultSet readNew(BigInteger sequence)                { return null;
}
    +    public StreamsResultSet readRange(DateTime start, DateTime end)     { return null;
}
    +    public DatumStatusCounter getDatumStatusCounter()                   { return countersTotal;
}
    +    public Collection<String> getFiles()                                { return
this.files; }
    +
    /**
     * Creates a reader that will pull objects from S3 as described by the
     * supplied configuration.
     *
     * @param s3ReaderConfiguration bucket, credential, and reader-path settings
     */
    public S3PersistReader(S3ReaderConfiguration s3ReaderConfiguration) {
        this.s3ReaderConfiguration = s3ReaderConfiguration;
    }
    +
    +    public void prepare(Object configurationObject) {
    +        // Connect to S3
    +        synchronized (this)
    +        {
    +            // Create the credentials Object
    +            AWSCredentials credentials = new BasicAWSCredentials(s3ReaderConfiguration.getKey(),
s3ReaderConfiguration.getSecretKey());
    +
    +            ClientConfiguration clientConfig = new ClientConfiguration();
    +            clientConfig.setProtocol(Protocol.valueOf(s3ReaderConfiguration.getProtocol().toUpperCase()));
    +
    +            // We want path style access
    +            S3ClientOptions clientOptions = new S3ClientOptions();
    +            clientOptions.setPathStyleAccess(true);
    +
    +            this.amazonS3Client = new AmazonS3Client(credentials, clientConfig);
    +            this.amazonS3Client.setS3ClientOptions(clientOptions);
    +        }
    +
    +        final ListObjectsRequest request = new ListObjectsRequest()
    +                .withBucketName(this.s3ReaderConfiguration.getBucket())
    +                .withPrefix(s3ReaderConfiguration.getReaderPath())
    +                .withMaxKeys(50);
    +
    +
    +        ObjectListing listing = this.amazonS3Client.listObjects(request);
    +
    +        this.files = new ArrayList<String>();
    +
    +        /**
    +         * If you can list files that are in this path, then you must be dealing with
a directory
    +         * if you cannot list files that are in this path, then you are most likely dealing
with
    +         * a simple file.
    +         */
    +        if(listing.getCommonPrefixes().size() > 0) {
    +            // Handle the 'directory' use case
    +            do
    +            {
    +                for (String file : listing.getCommonPrefixes())
    +                    this.files.add(file);
    +
    +                // get the next batch.
    +                listing = this.amazonS3Client.listNextBatchOfObjects(listing);
    +            } while (listing.isTruncated());
    +        }
    +        else {
    +            // handle the single file use-case
    +            this.files.add(s3ReaderConfiguration.getReaderPath());
    +        }
    +
    +        if(this.files.size() <= 0)
    +            LOGGER.error("There are no files to read");
    +
    +        this.persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
    +        this.executor = Executors.newSingleThreadExecutor();
    +    }
    +
    +    public void cleanUp() { }
    +
    +    public StreamsResultSet readAll() {
    +        startStream();
    +        return new StreamsResultSet(persistQueue);
    +    }
    +
    +    public void startStream() {
    +        LOGGER.debug("startStream");
    +        executor.submit(new S3PersistReaderTask(this));
    +    }
    +
    +    public StreamsResultSet readCurrent() {
    +
    +        StreamsResultSet current;
    +
    +        synchronized( S3PersistReader.class ) {
    +            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
    --- End diff --
    
    While this should work fine, you might want to consider just sending the current persist
queue and constructing a new instance.  That way, you don't have to incur the overhead of
copying and clearing the queue.
    
    Not critical


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message