streams-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sblackmon <>
Subject [DISCUSS] new interfaces for extracting ID, event time from documents
Date Thu, 01 Dec 2016 16:52:53 GMT

Streams doesn’t currently define an interface for implementing methods to determine ID or
event time.  

According to the activity streams spec, the ‘id’ and ‘published’ fields of Activity
and ActivityObject should contain this information, but it’s quite common for documents
at the front of a pipeline to not yet be in activity streams format.

By apache streams convention, getId() and getTimestamp() on StreamsDatum often (but not always)
contain this information.  
This leads to scenarios building streams where to determine the ID or event time of a document,
you have to do it manually, duplicating code embedded in the converter, or attempt a complete

Contrast this with the approach for conversion itself, where
and implementations, spread throughout the
provider modules, bind a conversion to a specific document type, and thus the appropriate
converter for an incoming document can be determined and applied by org.apache.streams.converter.ActivityConverterProcessor
and org.apache.streams.converter.ActivityObjectConverterProcessor respectively.  

Also a factor - frameworks such as flink and beam use Key and EventTime as core processing
concepts - the sooner those values are known the more flexibility the pipeline designer has,
and the more parallelism can occur.  So it would benefit streams users if we make it possible
to determine those values at the moment data enters the pipeline, prior to performing any
type conversion or other intensive processing. 

I think we’d be better off defining core interfaces for extracting unique identifiers and
event times that each provider module can implement, distinct from the provider converters
(but used by them). 

Here’s what I have in mind:  

public interface IdentifierExtractor extends Serializable {  
Class documentClass();
String extractIdentifier(T document);

public interface EventTimeExtractor extends Serializable {
Class documentClass();
java.time.Instant extractEventTime(T document);

And implementation for tweet:

public class TweetIdentifierExtractor implements IdentifierExtractor {  
public documentClass() {
return org.apache.streams.twitter.pojo.Tweet.class;
public extractIdentifier(org.apache.streams.twitter.pojo.Tweet tweet) {
return “id:twitter:post:”+tweet.getIdStr();

public class TweetEventTimeExtractor {  
public static final String TWITTER_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
public static final DateTimeFormatter TWITTER_FORMATTER = DateTimeFormat.forPattern(TWITTER_FORMAT);
public documentClass() {
return org.apache.streams.twitter.pojo.Tweet.class;
public java.time.Instant extractEventTime(org.apache.streams.twitter.pojo.Tweet tweet) {
return TWITTER_FORMAT.parseDateTime(tweet.getCreatedAt());

Does anyone disagree that this is a sensible thing to do with our existing code, and a reasonable
standard to expect from any new providers that get built?  Any other ideas or facts we should


View raw message