gearpump-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Weihua Jiang (JIRA)" <>
Subject [jira] [Commented] (GEARPUMP-24) refactor DataSource API
Date Mon, 11 Apr 2016 01:23:25 GMT


Weihua Jiang commented on GEARPUMP-24:

Hi [~mauzhang], for your concerns, I think:
1. Maybe we shall change the read() signature to 
def read(batchSize: Int): java.util.List[Message]
2. We shall mention clearly in the API DOC that this list may be reused by this DataSource
API. So, it will be unsafe for user to keep this list across multiple read invocations. In
this way, DataSource implementation has the flexibity to reuse the buffer and avoid reallocation.
This shall eliminate your concern about performance. 
3. It is a standard convention that if I ask for a batch, I may get less than that batch size
in return. A typical example is So, as long as we clearly documented this behavior,
I don't think it will be a problem.
4. User can always get the size of the returned list by calling list.size() method cheaply.

> refactor DataSource API
> -----------------------
>                 Key: GEARPUMP-24
>                 URL:
>             Project: Apache Gearpump
>          Issue Type: Improvement
>          Components: streaming
>    Affects Versions: 0.8.0
>            Reporter: Manu Zhang
>            Assignee: Manu Zhang
>             Fix For: 0.8.1
> From []:
> The current DataSource API
> {code}
> trait DataSource extends {
>   /**
>    * open connection to data source
>    * invoked in onStart() method of [[io.gearpump.streaming.source.DataSourceTask]]
>    * @param context is the task context at runtime
>    * @param startTime is the start time of system
>    */
>   def open(context: TaskContext, startTime: Option[TimeStamp]): Unit
>   /**
>    * read a number of messages from data source.
>    * invoked in each onNext() method of [[io.gearpump.streaming.source.DataSourceTask]]
>    * @param batchSize max number of messages to read
>    * @return a list of messages wrapped in [[io.gearpump.Message]]
>    */
>   def read(batchSize: Int): List[Message]
>   /**
>    * close connection to data source.
>    * invoked in onStop() method of [[io.gearpump.streaming.source.DataSourceTask]]
>    */
>   def close(): Unit
> }
> {code}
> has several issues
> 1. read returns a scala list of Message which is unfriendly to Java DataSources. Same
for Option parameter in open
> 2. the number of read messages may not be the same as the passed in batchSize which leaves
uncertainty to users (users may access out of boundary list positions)
> 3. to return a list an extra buffer could be needed in read (e.g. KafkaSource) which
is not best for performance

This message was sent by Atlassian JIRA

View raw message