samza-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Navina Ramesh <nram...@linkedin.com.INVALID>
Subject Re: [2/2] samza git commit: Yi's TopologyBuilder RB 34500
Date Tue, 02 Jun 2015 06:02:51 GMT
Sorry about the confusion, Milinda.

Thanks for reverting, Yi!

On 6/1/15, 10:53 PM, "Yi Pan" <nickpan47@gmail.com> wrote:

>Hi, Milinda,
>
>That was an accidental mistake. I have reverted the check-in. I am still
>working on that. Thanks!
>
>-Yi
>
>On Mon, Jun 1, 2015 at 9:34 PM, Milinda Pathirage <mpathira@umail.iu.edu>
>wrote:
>
>> Hi Navina,
>>
>> Did we decided to push this patch to samza-sql branch. I thought Yi is
>> still working on this. Some Git conflict related texts are still there
>>in
>> this commit.
>>
>> +<<<<<<< HEAD
>> +   * The callback object
>> +=======
>> +   * The callback function
>> +>>>>>>> SAMZA-552: use OperatorCallback to allow implementation of
>> callbacks w/o inheriting and creating many sub-classes from operators
>>
>> Milinda
>>
>> On Mon, Jun 1, 2015 at 9:06 PM, <navina@apache.org> wrote:
>>
>> > Yi's TopologyBuilder RB 34500
>> >
>> >
>> > Project: http://git-wip-us.apache.org/repos/asf/samza/repo
>> > Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/45b85477
>> > Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/45b85477
>> > Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/45b85477
>> >
>> > Branch: refs/heads/samza-sql
>> > Commit: 45b854772cf36cc69e8d8cda7a51bce1be5fe576
>> > Parents: 41c4cd0
>> > Author: Navina <navi.trinity@gmail.com>
>> > Authored: Thu May 28 18:51:30 2015 -0700
>> > Committer: Navina <navi.trinity@gmail.com>
>> > Committed: Thu May28 18:51:30 2015 -0700
>> >
>> > ----------------------------------------------------------------------
>> >  .../apache/samza/sql/api/data/EntityName.jaa   |  41 ++-
>> >  .../org/apache/samza/sql/api/data/Table.java    |   7 +-
>> >  .../samza/sql/api/operators/Operator.java       |   4 +
>> >  .../sql/api/operators/OperatorCallback.java    |   1 -
>> >  .../samza/sql/api/operators/OperatorRouter.java |   8 +
>> >  .../samza/sql/api/operators/OperatorSink.java   |  30 ++
>> >  .../samza/sql/api/operators/OperatorSource.java |  30 ++
>> >  .../samza/sql/api/operators/SimpleOperator.java |   3 +-
>> >  .../samza/sql/data/IncomingMessageTuple.java    |   1 -
>> >  .../sql/operators/NoopOperatorCallback.java     |  53 ++++
>> >  .../samza/sql/operators/OperatorTopology.java   |  53 ++++
>> >  .../samza/sql/operators/SimpleOperatorImpl.java | 147 ++++++++++
>> >  .../samza/sql/operators/SimpleOperatorSpec.java | 106 +++++++
>> >  .../samza/sql/operators/SimpleRouter.java       | 141 +++++++++
>> >  .../operators/factory/NoopOperatorCallback.java |  50 ----
>> >  .../operators/factory/SimpleOperatorImpl.java   | 136 ---------
>> >  .../operators/factory/SimpleOperatorSpec.java   | 106 -------
>> >  .../sql/operators/factory/SimpleRouter.java     | 136 ---------
>> >  .../sql/operators/factory/TopologyBuilder.java  | 284
>> +++++++++++++++++++
>> >  .../sql/operators/join/StreamStreamJoin.java    |   3 +-
>> >  .../operators/join/StreamStreamJoinSpec.java    |  15 +-
>> >  .../sql/operators/partition/PartitionOp.java    |   3 +-
>> >  .../sql/operators/partition/PartitionSpec.java  |   2 +-
>> >  .../sql/operators/window/BoundedTimeWindow.java |   4 +-
>> >  .../samza/sql/operators/window/WindowSpec.java  |   7 +-
>> >  .../samza/task/sql/SimpleMessageCollector.java  |  37 ++-
>> >  .../task/sql/RandomWindowOperatorTask.java      |  11 +-
>> >  .../apache/samza/task/sql/StreamSqlTask.java    |  26 +-
>> >  .../samza/task/sql/UserCallbacksSqlTask.java    |  66 ++---
>> >  29 files changed, 991 insertions(+), 520 deletions(-)
>> > ----------------------------------------------------------------------
>> >
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/api/data/EntityName.java
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.j
>>ava
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.j
>>ava
>> > index 80ba455..df1b11b 100644
>> > ---
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.j
>>ava
>> > +++
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.j
>>ava
>> > @@ -49,6 +49,8 @@ public class EntityName {
>> >     */
>> >    private final String name;
>> >
>> > +  private final boolean isSystemEntity;
>> > +
>> >    /**
>> >     * Static map of already allocated table names
>> >     */
>> > @@ -59,15 +61,19 @@ public class EntityName {
>> >     */
>> >    private static Map<String, EntityName> streams = new
>>HashMap<String,
>> > EntityName>();
>> >
>> > +  private static final String ANONYMOUS = "anonymous";
>> > +
>> >    /**
>> >     * Private ctor to create entity names
>> >     *
>> >     * @param type Type of the entity name
>> >     * @param name Formatted name of the entity
>> > +   * @param isSystemEntity whether the entity is a system
>>input/output
>> >     */
>> > -  private EntityName(EntityType type, String name) {
>> > +  private EntityName(EntityType type, String name, boolean
>> > isSystemEntity) {
>> >      this.type = type;
>> >      this.name = name;
>> > +    this.isSystemEntity = isSystemEntity;
>> >    }
>> >
>> >    @Override
>> > @@ -10,6 +108,10 @@ public class EntityName {
>> >      return this.type.equals(EntityType.STREAM);
>> >    }
>> >
>> > +  public boolean isSysteEntity() {
>> > +    return this.isSystemEntity;
>> > +  }
>> > +
>> >    /**
>> >     * Get the formatted entity name
>> >     *
>> > @@ -111,15 +121,24 @@ public class EntityName {
>> >      return this.name;
>> >    }
>> >
>> > +  public static EntityName getTableName(String name) {
>> > +    return getTableName(name, false);
>> > +  }
>> > +
>> > +  public static EntityName getStreamName(Sting name) {
>> > +    return getStreamName(name, false);
>> > +  }
>> > +
>> >    /**
>> >     * Static method to get the instance of {@code EntityName} with
>>type
>> > {@code EntityType.TABLE}
>> >     *
>> >     * @param name The formatted entity name of the relation
>> > +   * @param isSystem The boolean flag indicating whether this is a
>> system
>> > input/output
>> >     * @return A <code>EntityName</code> for a relation
>> >     */
>> > -  public static EntityName getTableName(String name) {
>> > +  public static EntityName getTableName(String name, boolean
>>isSystem) {
>> >      if (tables.get(name) == null) {
>> > -      tables.put(name, new EntityName(EntityType.TABLE, name));
>> > +      tables.put(name, new EntityName(EntityType.TABLE, name,
>> isSystem));
>> >      }
>> >      return tables.get(name);
>> >    }
>> > @@ -128,13 +147,25 @@ public class EntityName {
>> >     * Static method to get the instance of <code>EntityName</code>
>>with
>> > type <code>EntityType.STREAM</code>
>> >     *
>> >     * @param name The formatted ntity name of the stream
>> > +   * @param isSystem The boolean flag indicating whethr this is a
>> system
>> > input/output
>> >     * @return A <code>EntityName</code> for a stream
>> >     */
>> > -  public static EntityName getStreamName(String name) {
>> > +  public static EntityName getStreamName(String name, boolean
>>isSystem)
>> {
>> >      if (streams.get(name) == null) {
>> > -      streams.put(name, new EntityName(EntityType.STREAM, name));
>> > +      streams.put(name, new EntityName(EntityType.STREAM, name,
>>> isSystem));
>> >      }
>> >      return streams.get(name);
>> >    }
>> >
>> > +  public static EntityName getAnonymousStream() {
>> > +    return getStreamName(ANONYMOUS);
>> > +  }
>> > +
>> > +  public static EntityName getAnonymousTable() {
>> > +    return getTableName(ANONYMOUS);
>> > +  }
>> > +
>> > +  public boolean isAnonymous() {
>> > +    return this.name.equals(ANONYMOUS);
>> > +  }
>> >  }
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/api/data/Table.java
>> > ----------------------------------------------------------------------
>> > diff --git
>> > 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
>> > 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
>> > index 7b4d984..b4dce07 100644
>> > ---
>> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
>> > +++
>> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
>> > @@ -19,6 +19,9 @@
>> >
>> >  packag org.apache.samza.sql.api.data;
>> >
>> > +import java.util.List;
>> > +
>> > +
>> >  /**
>> >   * This interface defines a non-ordered {@link
>> > org.apache.samza.sql.api.data.Relation}, which has a unique primary
>>key
>> >   *
>> > @@ -31,8 +34,8 @@ public interface Table<K> extends Relation<K> {
>> >    /**
>> >     * Get the primary key field name for this table
>> >     *
>> > -   * @return The name of the primary key fild
>> > +   * @return The names of the primary key fields
>> >     */
>> > -  String getPrimaryKeyName();
>> > +  List<String> getPrimaryKeyNames();
>> >
>> >  }
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/api/operators/Operator.java
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
>>r.java
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
>>r.java
>> > index d6f6b57.9c6eaa5 100644
>> > ---
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
>>r.java
>> > +++
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
>>r.java
>> > @@ -27,7 +27,11 @@ import org.apache.samza.task.TaskContext;
>> >  import org.apache.samza.task.TasCoordinator;
>> >
>> >
>> > +/**
>> > + * This class defines the common interface for operator classes.
>> > + */
>> >  public interface Operator {
>> > +
>> >    /**
>> >     * Method to initialize the operator
>> >     *
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
>>rCallback.java
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
>>rCallback.java
>> > index fb2aa89..5a77d95 100644
>> > ---
>> >
>> 
>>a/samza-sql-core/sr/main/java/org/apache/samza/sql/api/operators/Operato
>>rCallback.java
>> > +++
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
>>rCallback.java
>> > @@ -23,7 +23,6 @@ import org.apache.saza.sql.api.data.Tuple;
>> >  import org.apache.samza.task.MessageCollector;
>> >  import org.apache.samza.task.TaskCoordinator;
>> >
>> > -
>> >  /**
>> >   * Defines the callback functions to allow customized functions to be
>> >invoked before process and before sending the result
>> >   */
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
>> > ---------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
>>rRuter.java
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
>>rRouter.java
>> > index 0759638..432e6b3 100644
>> > ---
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
>>rRouter.java
>> > +++
>> >
>> 
>>b/sama-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
>>rRouter.java
>> > @@ -19,6 +19,7 @@
>> >
>> >  package org.apache.samza.sql.api.operators;
>> >
>> > +import java.util.Iterator;
>> >  mport java.util.List;
>> >
>> >  import org.apache.samza.sql.api.data.EntityName;
>> > @@ -51,4 +52,11 @ public interface OperatorRouter extends Operator {
>> >     */
>> >    List<SimpleOperator> getNextOperators(EntityName output);
>> >
>> > +  /**
>> > +   * This method provides an iterator to go through all operators
>> > connected via {@code OperatorRouter}
>> > +   *
>> > +   * @return An {@link java.util.Iterator} for all operators
>>connected
>> > via {@code OperatorRouter}
>> > +   */
>> > +  Iterator<SimpleOperator> iterator();
>> > +
>> >  }
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.og/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
>>rSink.java
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
>>rSink.java
>> > new file mode 100644
>> > index 0000000..e2c748c
>> > --- /dev/null
>> > +++
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
>>rSink.java
>> > @@ -0,0 +1,30 @@
>> > +/*
>> > + * Licensed to the Apache Software Foundation (ASF) under one
>> > + * or more contributor license agreements.  See the NOTICE file
>> > + * distributed with this work for additional information
>> > + * regarding copyright ownership.  The ASF licenses this file
>> > + * to you under the Apache License, Version 2.0 (the
>> > + * "License"); you may not use this file except in compliance
>> > + * with the License.  You may obtain a copy of the License at
>> > + *
>> > + *   http://www.apache.org/licenses/LICENSE-2.0
>> > + *
>> > + * Unless required by applicable law or agreed to in writing,
>> > + * software distributed under the License is distributed on an
>> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> > + * KIND, either express or implied.  See the License for the
>> > + * specific language governing permissions and limitations
>> > + * under the License.
>> > + */
>> > +package org.apache.samza.sql.api.operators;
>> > +
>> > +import java.util.Iterator;
>> > +
>> > +import org.apache.samza.sql.api.data.EntityName;
>> > +
>> > +
>> > +public interface OperatorSink {
>> > +  Iterator<SimpleOperator> opIterator();
>> > +
>> > +  EntityName getName();
>> > +}
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
>>rSource.java
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
>>rSource.java
>> > new file mode 100644
>> > index 0000000..860c1aa
>> > --- /dev/null
>> > +++
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
>>rSource.java
>> > @@ -0,0 +1,30 @@
>> > +/*
>> > + * Licensed to the Apache Software Foundation (ASF) under one
>> > + * or more contributor license agreements.  See the NOTICE file
>> > + * distributed with this work for additional information
>> > + * regarding copyright ownership.  The ASF licenses this file
>> > + * to you under the Apache License, Version 2.0 (the
>> > + * "License"); you may not use this file except in compliance
>> > + * with the License.  You may obtain a copy of the License at
>> > + *
>> > + *   http://www.apache.org/licenses/LICENSE-2.0
>> > + *
>> > + * Unless required by applicable law or agreed to in writing,
>> > + * software distributed under the License is distributed on an
>> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> > + * KIND, either express or implied.  See the License for the
>> > + * specific language governing permissions and limitations
>> > + * under the License.
>> > + */
>> > +package org.apache.samza.sql.api.operators;
>> > +
>> > +import java.util.Iterator;
>> > +
>> > +import org.apache.samza.sql.api.data.EntityName;
>> > +
>> > +
>> > +public interface OperatorSource {
>> > +  Iterator<SimpleOperator> opIterator();
>> > +
>> > +  EntityName getName();
>> > +}
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleO
>>perator.java
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleO
>>perator.java
>> > index c49a822..60ace9c 100644
>> > ---
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleO
>>perator.java
>> > +++
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleO
>>perator.java
>> > @@ -19,8 +19,6 @@
>> >
>> >  package org.apache.samza.sql.api.operators;
>> >
>> > -
>> > -
>> >  /**
>> >   * The interface for a {@code SimpleOperator} that implements a
>>simple
>> > primitive relational logic operation
>> >   */
>> > @@ -31,4 +29,5 @@ public interface SimpleOperator extends Operator {
>> >     * @return The {@link
>>org.apache.samza.sql.api.operators.OperatorSpec}
>> > object that defines the configuration/parameters of the operator
>> >     */
>> >    OperatorSpec getSpec();
>> > +
>> >  }
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageT
>>uple.java
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageT
>>uple.java
>> > index 72a59f2..af040f0 100644
>> > ---
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageT
>>uple.java
>> > +++
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageT
>>uple.java
>> > @@ -81,7 +81,6 @@ public class IncomingMessageTuple implements Tuple {
>> >
>> >    @Override
>> >    public long getCreateTimeNano() {
>> > -    // TODO: this is wrong and just to keep as an placeholder. It
>>should
>> > be replaced by the message publish time when the publish timestamp is
>> > available in the message metadata
>> >      return this.recvTimeNano;
>> >    }
>> >
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperato
>>rCallback.java
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperato
>>rCallback.java
>> > new file mode 100644
>> > index 0000000..e951737
>> > --- /dev/null
>> > +++
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperato
>>rCallback.java
>> > @@ -0,0 +1,53 @@
>> > +/*
>> > + * Licensed to the Apache Software Foundation (ASF) under one
>> > + * or more contributor license agreements.  See the NOTICE file
>> > + * distributed with this work for additional information
>> > + * regarding copyright ownership.  The ASF licenses this file
>> > + * to you under the Apache License, Version 2.0 (the
>> > + * "License"); you may not use this file except in compliance
>> > + * with the License.  You may obtain a copy of the License at
>> > + *
>> > + *   http://www.apache.org/licenses/LICENSE-2.0
>> > + *
>> > + * Unless required by applicable law or agreed to in writing,
>> > + * software distributed under the License is distributed on an
>> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> > + * KIND, either express or implied.  See the License for the
>> > + * specific language governing permissions and limitations
>> > + * under the License.
>> > + */
>> > +package org.apache.samza.sql.operators;
>> > +
>> > +import org.apache.samza.sql.api.data.Relation;
>> > +import org.apache.samza.sql.api.data.Tuple;
>> > +import org.apache.samza.sql.api.operators.OperatorCallback;
>> > +import org.apache.samza.task.MessageCollector;
>> > +import org.apache.samza.task.TaskCoordinator;
>> > +
>> > +
>> > +/**
>> > + * This is a default NOOP operator callback object that does nothing
>> > before and after the process method
>> > + */
>> > +public final class NoopOperatorCallback implements OperatorCallback {
>> > +
>> > +  @Override
>> > +  public Tuple beforeProcess(Tuple tuple, MessageCollector collector,
>> > TaskCoordinator coordinator) {
>> > +    return tuple;
>> > +  }
>> > +
>> > +  @Override
>> > +  public Relation beforeProcess(Relation rel, MessageCollector
>> collector,
>> > TaskCoordinator coordinator) {
>> > +    return rel;
>> > +  }
>> > +
>> > +  @Override
>> > +  public Tuple afterProcess(Tuple tuple, MessageCollector collector,
>> > TaskCoordinator coordinator) {
>> > +    return tuple;
>> > +  }
>> > +
>> > +  @Override
>> > +  public Relation afterProcess(Relation rel, MessageCollector
>>collector,
>> > TaskCoordinator coordinator) {
>> > +    return rel;
>> > +  }
>> > +
>> > +}
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTop
>>ology.java
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTop
>>ology.java
>> > new file mode 100644
>> > index 0000000..8b70092
>> > --- /dev/null
>> > +++
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTop
>>ology.java
>> > @@ -0,0 +1,53 @@
>> > +/*
>> > + * Licensed to the Apache Software Foundation (ASF) under one
>> > + * or more contributor license agreements.  See the NOTICE file
>> > + * distributed with this work for additional information
>> > + * regarding copyright ownership.  The ASF licenses this file
>> > + * to you under the Apache License, Version 2.0 (the
>> > + * "License"); you may not use this file except in compliance
>> > + * with the License.  You may obtain a copy of the License at
>> > + *
>> > + *   http://www.apache.org/licenses/LICENSE-2.0
>> > + *
>> > + * Unless required by applicable law or agreed to in writing,
>> > + * software distributed under the License is distributed on an
>> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> > + * KIND, either express or implied.  See the License for the
>> > + * specific language governing permissions and limitations
>> > + * under the License.
>> > + */
>> > +package org.apache.samza.sql.operators;
>> > +
>> > +import java.util.Iterator;
>> > +
>> > +import org.apache.samza.sql.api.data.EntityName;
>> > +import org.apache.samza.sql.api.operators.OperatorSink;
>> > +import org.apache.samza.sql.api.operators.OperatorSource;
>> > +import org.apache.samza.sql.api.operators.SimpleOperator;
>> > +
>> > +
>> > +/**
>> > + * This class implements a partially completed {@link
>> > org.apache.samza.sql.operators.factory.TopologyBuilder} that
>>signifies a
>> > partially completed
>> > + * topology that the current operator has unbounded input stream that
>> can
>> > be attached to other operators' output
>> > + */
>> > +public class OperatorTopology implements OperatorSource,
>>OperatorSink {
>> > +
>> > +  private final EntityName name;
>> > +  private final SimpleRouter router;
>> > +
>> > +  public OperatorTopology(EntityName name, SimpleRouter router) {
>> > +    this.name = name;
>> > +    this.router = router;
>> > +  }
>> > +
>> > +  @Override
>> > +  public Iterator<SimpleOperator> opIterator() {
>> > +    return this.router.iterator();
>> > +  }
>> > +
>> > +  @Override
>> > +  public ntityName getName() {
>> > +    return this.name;
>> > +  }
>> > +
>> > +}
>> >
>> >
>> >
>> 
>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOpera
>>torImpl.java
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOpera
>>torImpl.java
>> > new file mode 10644
>> > index 0000000..423880b
>> > --- /dev/null
>> > +++
>> >
>> 
>>b/samza-sql-core/src/main/java/rg/apache/samza/sql/operators/SimpleOpera
>>torImpl.java
>> > @@ -0,0 +1,147 @@
>> > +/*
>> > + * Licensed to the Apache Software Foundation (ASF) under one
>> > + * or more contributor license agreements.  See the NOTICE file
>> > + * distributed with this work for additional information
>> > + * regarding copyright ownership.  The ASF licenses this file
>> > + * to you under the Apache License, Version 2.0 (the
>> > + * "License"); you may not use this file except in compliance
>> > + * with the License.  You may obtain a copy of the License at
>> > + *
>> > + *   http://www.apache.org/licenses/LICENSE-2.0
>> > + *
>> > + * Unless required by applicable law or agreed to in writing,
>> > + * software distributed under the License is distributed on an
>> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> > + * KIND, either express or implied.  See the License for the
>> > + * specific language governing permissions and limitations
>> > + * under the License.
>> > + */
>> > +
>> > +package org.apache.samza.sql.operators;
>> > +
>> > +import org.apache.samza.sql.api.data.Relation;
>> > +import org.apache.samza.sql.api.data.Tuple;
>> > +import org.apache.samza.sql.api.operators.OperatorCallback;
>> > +import org.aache.samza.sql.api.operators.OperatorSpec;
>> > +import org.apache.samza.sql.api.operators.SimpleOperator;
>> > +import org.apache.samza.task.MessageCollector;
>> > +import org.apache.samza.task.TaskCoordinator;
>> > +import org.apache.samza.task.sql.SimpleMessageCollector;
>> > +
>> > +
>> > +/**
>> > + * An abstract class that encapsulate the basic information and
>>methods
>> > that all operator classes should implement.
>> > + * It implements the interface {@link
>> > org.apache.samza.sql.api.operators.SimpleOperator}
>> > + */
>> > +public abstract class SimpleOperatorImpl implements SimpleOperator {
>> > +  /**
>> > +   * The specification of this operator
>> > +   */
>> > +  private final OperatorSpec spec;
>> > +
>> > +  /**
>> > +<<<<<<<HEAD
>> > +   * The callback object
>> > +=======
>> > +   * The callback function
>> > +>>>>>>> SAMZA-552: use OperatorCallback to allow implementation of
>> > callbacks w/o inheriting and creating many sub-classes from operators
>> > +   */
>> > +  private final OperatorCallback callback;
>> > +
>> > +  /**
>> > +   * Ctor of {@code SimpleOperatorImpl} class
>> > +   *
>> > +   * @param spec The specification of this operator
>> > +   */
>> > +  public SimpleOperatorImpl(OperatorSpec spec) {
>> > +    this(spec, new NoopOperatorCallback());
>> > +  }
>> > +
>> > +  public SimpleOperatorImpl(OperatorSpec spec, OperatorCallback
>> callback)
>> > {
>> > +    this.spec = spec;
>> > +    this.callback = callback;
>> > +  }
>> > +
>> > +  @Override
>> > +  public OperatorSpec getSpec() {
>> > +    return this.spec;
>> > +  }
>> > +
>> > +  /**
>> > +   * This method is made final s.t. the sequence of invocations
>>between
>> > {@link
>> >
>> 
>>org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Relatio
>>n,
>> > MessageCollector, TaskCoordinator)}
>> > +   * and real processing of the input relation is fixed.
>> > +   */
>> > +  @Override
>> > +  final public void process(Relation deltaRelation, MessageCollector
>> > collector, TaskCoordinator coordinator)
>> > +      throws Exception {
>> > +    Relation rel = this.callback.beforeProcess(deltaRelation,
>>collector,
>> > coordinator);
>> > +    if (rel == null) {
>> > +      return;
>> > +    }
>> > +    this.realProcess(rel, getCollector(collector, coordinator),
>> > coordinator);
>> > +  }
>> > +
>> > +  /**
>> > +   * This method is made final s.t. the sequence of invocations
>>between
>> > {link
>> > 
>>org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Tuple,
>> > MessageCollector, TaskCoordinator)}
>> > +   * and real processing of the input tuple is fixed.
>> > +   */
>>> +  @Override
>> > +  final public void process(Tuple tuple, MessageCollector collector,
>> > TaskCoordinator coordinator) throws Exception {
>> > +    Tuple itupl = this.callback.beforeProcess(tuple, collector,
>> > coordinator);
>> > +    if (ituple == null) {
>> > +      return;
>> > +    }
>> > +    this.realProcess(ituple, getCollector(collector, coordinator),
>> > coordinator);
>> > +  }
>> > +
>> > +  /**
>> > +   * This method is made final s.t. we enforce the invocation of
>>{@code
>> > SimpleOperatorImpl#getCollector(MessageCollector, TaskCoordinator)}
>> bfore
>> > doing anything futher
>> > +   */
>> > +  @Override
>> > +  final public void refresh(long timeNano, MessageCollector
>>collector,
>> > TaskCoordinator coordinator) throws Exception {
>> > +    this.realRefresh(timeNano, getCollector(collecto, coordinator),
>> > coordinator);
>> > +  }
>> > +
>> > +  private SimpleMessageCollector getCollector(MessageCollector
>> collector,
>> > TaskCoordinator coordinator) {
>> > +    if (!(collector instanceof SimpleMessageCollector)) {
>> > +      return new SimpleMessageCollector(collector, coordnator,
>> > this.callback);
>> > +    } else {
>> > +      ((SimpleMessageCollector)
>> collector).switchCallback(this.callback);
>> > +      return (SimpleMessageCollector) collector;
>> > +    }
>> > +  }
>> > +
>> > +  /**
>> > +   * Method to be overriden by each specific implementation class of
>> > operator to handle timeout event
>> > +   *
>> > +   * @param timeNano The time in nanosecond when the timeout event
>> > occurred
>> > +   * @param collector The {@link
>> > org.apache.samza.task.sql.SimpleMessageCollector} in the context
>> > +   * @param coordinator The {@link
>> org.apache.samza.task.TaskCoordinator}
>> > in the context
>> > +   * @throws Exception Throws exception if failed to refresh the
>>results
>> > +   */
>> > +  protected abstract void realRefresh(long timeNano,
>> > SimpleMessageCollector collector, TaskCoordinator coordinator)
>> > +      throws Exception;
>> > +
>> > +  /**
>> > +   * Method to be overriden by each specific implementation class of
>> > operator to perform relational logic operation on an input {@link
>> > org.apache.samza.sql.api.data.Relation}
>> > +   *
>> > +   * @param rel The input relation
>> > +   * @param collector The {@link
>> > org.apache.samza.task.sql.SimpleMessageCollector} in the context
>> > +   * @param coordinator The {@link
>> org.apache.samza.task.TaskCoordinator}
>> > in the context
>> > +   * @throws Exception Throws exception if failed to process
>> > +   */
>> > +  protected abstract void realProcess(Relation rel,
>> > SimpleMessageCollector collector, TaskCoordinator coordinator)
>> > +      throws Exceptio;
>> > +
>> > +  /**
>> > +   * Method to be overriden by each specific implementation class of
>> > operator to perform relational logic operation on an input {@ink
>> > org.apache.samza.sql.api.data.Tuple}
>> > +   *
>> > +   * @param ituple The input tuple
> > +   * @param collector The {@link
>> > org.apache.samza.task.sql.SimpleMessageCollector} in the context
>> > +   * @param coordinator The {@link
>> org.apache.samza.task.TaskCoordinator}
>> > in the context
>> > +   * @throws Exception Throws exception if failed to process
>> > +   */
>> > +  protected abstract void realProcess(Tuple ituple,
>> > SimpleMessageCollector collector, TaskCoordinator coordinator)
>> > +     throws Exception;
>> > +
>> > +}
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOpera
>>torSpec.java
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOpera
>>torSpec.java
>> > new file mode 100644
>> > index 0000000..691e543
>> > --- /dev/null
>> > +++
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOpera
>>torSpec.java
>> > @@ -0,0 +1,106 @@
>> > +/*
>> > + * Licensed to the Apache Software Foundation (ASF) under one
>> > + * or more contributor license agreements.  See the NOTICE file
>> > + * distributed with this work for additional information
>> > + * regarding copyright ownership.  The ASF licenses this file
>> > + * to you under the Apache License, Version 2.0 (the
>> > + * "License"); you may not use this file except in compliance
>> > + * with the License.  You may obtain a copy of the License at
>> > + *
>> > + *   http://www.apache.org/licenses/LICENSE-2.0
>> > + *
>> > + * Unless required by applicable law or agreed to in writing,
>> > + * software distributed under the License is distributed on an
>> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> > + * KIND, either express or implied.  See the License for the
>> > + * specific language governing permissions and limitations
>> > + * under the License.
>> > + */
>> > +package org.apache.samza.sql.operators;
>> > +
>> > +import java.util.ArrayList;
>> > +import java.util.List;
>> > +
>> > +import org.apache.samza.sql.api.data.EntityName;
>> > +import org.apache.samza.sql.api.operators.OperatorSpec;
>> > +
>> > +
>> > +/**
>> > + * An abstract class that encapsulate the basic information and 
>>methods
>> > that all specification of operators should implement.
>> > + * It implements {@link 
>>org.apache.samza.sql.api.operators.OperatorSpec}
>> > + */
>> > +public abstract class SimpleOperatorSpec implements OperatorSpec {
>> > +  /**
>> > +   * The identifier of the corresponding operator
>> > +   */
>> > +  private final String id;
>> > +
>> > +  /**
>> > +   * The list of input entity names of the corresponding operator
>> > +   */
>> > +  private final List<EntityName> inputs = new 
>>ArrayList<EntityName>();
>> > +
>> > +  /**
>> > +   * The list of output entity names of the corresponding operator
>> > +   */
>> > +  private final List<EntityName> outputs = new 
>>ArrayList<EntityName>();
>> > +
>> > +  /**
>> > +   * Ctor of the {@code SimpleOperatorSpec} for simple {@link
>> > org.apache.samza.sql.api.operators.SimpleOperator}s w/ one input and 
>>one
>> > output
>> > +   *
>> > +   * @param id Unique identifier of the {@link
>> > org.apache.samza.sql.api.operators.SimpleOperator} object
>> > +   * @param input The only input entity
>> > +   * @param output The only output entity
>> > +   */
>> > +  public SimpleOperatorSpec(String id, EntityName input, EntityName
>> > output) {
>> > +    this.id = id;
>> > +    this.inputs.add(input);
>> > +    this.outputs.add(output);
>> > +  }
>> > +
>> > +  /**
>> > +   * Ctor of {@code SimpleOperatorSpec} with general format: m inputs
>> and
>> > n outputs
>> > +   *
>> > +   * @param id Unique identifier of the {@link
>> > org.apache.samza.sql.api.operators.SimpleOperator} object
>> > +   * @param inputs The list of input entities
>> > +   * @param output The list of output entities
>> > +   */
>> > +  public SimpleOperatorSpec(String id, List<EntityName> inputs,
>> > EntityName output) {
>> > +    this.id = id;
>> > +    this.inputs.addAll(inputs);
>> > +    this.outputs.add(output);
>> > +  }
>> > +
>> > +  @Override
>> > +  public String getId() {
>> > +    return this.id;
>> > +  }
>> > +
>> > +  @Override
>> > +  public List<EntityName> getInputNames() {
>> > +    return this.inputs;
>> > +  }
>> > +
>> > +  @Override
>> > +  public List<EntityName> getOutputNames() {
>> > +    return this.outputs;
>> > +  }
>> > +
>> > +  /**
>> > +   * Method to get the first output entity
>> > +   *
>> > +   * @return The first output entity name
>> > +   */
>> > +  public EntityName getOutputName() {
>> > +    return this.outputs.get(0);
>> > +  }
>> > +
>> > +  /**
>> > +   * Method to get the first input entity
>> > +   *
>> > +   * @return The first input entity name
>> > +   */
>> > +  public EntityName getInputName() {
>> > +    return this.inputs.get(0);
>> > +  }
>> > +}
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRoute
>>r.java
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRoute
>>r.java
>> > new file mode 100644
>> > index 0000000..2d9a1db
>> > --- /dev/null
>> > +++
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRoute
>>r.java
>> > @@ -0,0 +1,141 @@
>> > +/*
>> > + * Licensed to the Apache Software Foundation (ASF) under one
>> > + * or more contributor license agreements.  See the NOTICE file
>> > + * distributed with this work for additional information
>> > + * regarding copyright ownership.  The ASF licenses this file
>> > + * to you under the Apache License, Version 2.0 (the
>> > + * "License"); you may not use this file except in compliance
>> > + * with the License.  You may obtain a copy of the License at
>> > + *
>> > + *   http://www.apache.org/licenses/LICENSE-2.0
>> > + *
>> > + * Unless required by applicable law or agreed to in writing,
>> > + * software distributed under the License is distributed on an
>> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> > + * KIND, either express or implied.  See the License for the
>> > + * specific language governing permissions and limitations
>> > + * under the License.
>> > + */
>> > +
>> > +package org.apache.samza.sql.operators;
>> > +
>> > +import java.util.ArrayList;
>> > +import java.util.HashMap;
>> > +import java.util.HashSet;
>> > +import java.util.Iterator;
>> > +import java.util.List;
>> > +import java.util.Map;
>> > +import java.util.Set;
>> > +
>> > +import org.apache.samza.config.Config;
>> > +import org.apache.samza.sql.api.data.EntityName;
>> > +import org.apache.samza.sql.api.data.Relation;
>> > +import org.apache.samza.sql.api.data.Tuple;
>> > +import org.apache.samza.sql.api.operators.Operator;
>> > +import org.apache.samza.sql.api.operators.OperatorRouter;
>> > +import org.apache.samza.sql.api.operators.SimpleOperator;
>> > +import org.apache.samza.task.MessageCollector;
>> > +import org.apache.samza.task.TaskContext;
>> > +import org.apache.samza.task.TaskCoordinator;
>> > +import org.apache.samza.task.sql.RouterMessageCollector;
>> > +
>> > +
>> > +/**
>> > + * Example implementation of {@link
>> > org.apache.samza.sql.api.operators.OperatorRouter}
>> > + *
>> > + */
>> > +public final class SimpleRouter implements OperatorRouter {
>> > +  /**
>> > +   * List of operators added to the {@link
>> > org.apache.samza.sql.api.operators.OperatorRouter}
>> > +   */
>> > +  private List<SimpleOperator> operators = new
>> > ArrayList<SimpleOperator>();
>> > +
>> > +  @SuppressWarnings("rawtypes")
>> > +  /**
>> > +   * Map of {@link org.apache.samza.sql.api.data.EntityName} to the 
>>list
>> > of operators associated with it
>> > +   */
>> > +  private Map<EntityName, List> nextOps = new HashMap<EntityName,
>> List>();
>> > +
>> > +  /**
>> > +   * Set of {@link org.apache.samza.sql.api.data.EntityName} as 
>>inputs
>> to
>> > this {@code SimpleRouter}
>> > +   */
>> > +  private Set<EntityName> inputEntities = new HashSet<EntityName>();
>> > +
>> > +  /**
>> > +   * Set of entities that are not input entities to this {@code
>> > SimpleRouter}
>> > +   */
>> > +  private Set<EntityName> outputEntities = new HashSet<EntityName>();
>> > +
>> > +  @SuppressWarnings("unchecked")
>> > +  private void addOperator(EntityName input, SimpleOperator nextOp) {
>> > +    if (nextOps.get(input) == null) {
>> > +      nextOps.put(input, new ArrayList<Operator>());
>> > +    }
>> > +    nextOps.get(input).add(nextOp);
>> > +    operators.add(nextOp);
>> > +    // get the operator spec
>> > +    for (EntityName output : nextOp.getSpec().getOutputNames()) {
>> > +      if (inputEntities.contains(output)) {
>> > +        inputEntities.remove(output);
>> > +      }
>> > +      outputEntities.add(output);
>> > +    }
>> > +    if (!outputEntities.contains(input)) {
>> > +      inputEntities.add(input);
>> > +    }
>> > +  }
>> > +
>> > +  @Override
>> > +  @SuppressWarnings("unchecked")
>> > +  public List<SimpleOperator> getNextOperators(EntityName entity) {
>> > +    return nextOps.get(entity);
>> > +  }
>> > +
>> > +  @Override
>> > +  public void addOperator(SimpleOperator nextOp) {
>> > +    List<EntityName> inputs = nextOp.getSpec().getInputNames();
>> > +    for (EntityName input : inputs) {
>> > +      addOperator(input, nextOp);
>> > +    }
>> > +  }
>> > +
>> > +  @Override
>> > +  public void init(Config config, TaskContext context) throws 
>>Exception
>> {
>> > +    for (SimpleOperator op : this.operators) {
>> > +      op.init(config, context);
>> > +    }
>> > +  }
>> > +
>> > +  @Override
>> > +  public void process(Tuple ituple, MessageCollector collector,
>> > TaskCoordinator coordinator) throws Exception {
>> > +    MessageCollector opCollector = new 
>>RouterMessageCollector(collector,
>> > coordinator, this);
>> > +    for (Iterator<SimpleOperator> iter =
>> > this.getNextOperators(ituple.getEntityName()).iterator();
>> iter.hasNext();) {
>> > +      iter.next().process(ituple, opCollector, coordinator);
>> > +    }
>> > +  }
>> > +
>> > +  @SuppressWarnings("rawtypes")
>> > +  @Override
>> > +  public void process(Relation deltaRelation, MessageCollector
>> collector,
>> > TaskCoordinator coordinator) throws Exception {
>> > +    MessageCollector opCollector = new 
>>RouterMessageCollector(collector,
>> > coordinator, this);
>> > +    for (Iterator<SimpleOperator> iter =
>> > this.getNextOperators(deltaRelation.getName()).iterator();
>> iter.hasNext();)
>> > {
>> > +      iter.next().process(deltaRelation, opCollector, coordinator);
>> > +    }
>> > +  }
>> > +
>> > +  @Override
>> > +  public void refresh(long nanoSec, MessageCollector collector,
>> > TaskCoordinator coordinator) throws Exception {
>> > +    MessageCollector opCollector = new 
>>RouterMessageCollector(collector,
>> > coordinator, this);
>> > +    for (EntityName entity : inputEntities) {
>> > +      for (Iterator<SimpleOperator> iter =
>> > this.getNextOperators(entity).iterator(); iter.hasNext();) {
>> > +        iter.next().refresh(nanoSec, opCollector, coordinator);
>> > +      }
>> > +    }
>> > +  }
>> > +
>> > +  @Override
>> > +  public Iterator<SimpleOperator> iterator() {
>> > +    return this.operators.iterator();
>> > +  }
>> > +
>> > +}
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallbac
>>k.java
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Noo
>>pOperatorCallback.java
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Noo
>>pOperatorCallback.java
>> > deleted file mode 100644
>> > index c3d2266..0000000
>> > ---
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Noo
>>pOperatorCallback.java
>> > +++ /dev/null
>> > @@ -1,50 +0,0 @@
>> > -/*
>> > - * Licensed to the Apache Software Foundation (ASF) under one
>> > - * or more contributor license agreements.  See the NOTICE file
>> > - * distributed with this work for additional information
>> > - * regarding copyright ownership.  The ASF licenses this file
>> > - * to you under the Apache License, Version 2.0 (the
>> > - * "License"); you may not use this file except in compliance
>> > - * with the License.  You may obtain a copy of the License at
>> > - *
>> > - *   http://www.apache.org/licenses/LICENSE-2.0
>> > - *
>> > - * Unless required by applicable law or agreed to in writing,
>> > - * software distributed under the License is distributed on an
>> > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> > - * KIND, either express or implied.  See the License for the
>> > - * specific language governing permissions and limitations
>> > - * under the License.
>> > - */
>> > -package org.apache.samza.sql.operators.factory;
>> > -
>> > -import org.apache.samza.sql.api.data.Relation;
>> > -import org.apache.samza.sql.api.data.Tuple;
>> > -import org.apache.samza.sql.api.operators.OperatorCallback;
>> > -import org.apache.samza.task.MessageCollector;
>> > -import org.apache.samza.task.TaskCoordinator;
>> > -
>> > -
>> > -public final class NoopOperatorCallback implements OperatorCallback {
>> > -
>> > -  @Override
>> > -  public Tuple beforeProcess(Tuple tuple, MessageCollector collector,
>> > TaskCoordinator coordinator) {
>> > -    return tuple;
>> > -  }
>> > -
>> > -  @Override
>> > -  public Relation beforeProcess(Relation rel, MessageCollector
>> collector,
>> > TaskCoordinator coordinator) {
>> > -    return rel;
>> > -  }
>> > -
>> > -  @Override
>> > -  public Tuple afterProcess(Tuple tuple, MessageCollector collector,
>> > TaskCoordinator coordinator) {
>> > -    return tuple;
>> > -  }
>> > -
>> > -  @Override
>> > -  public Relation afterProcess(Relation rel, MessageCollector 
>>collector,
>> > TaskCoordinator coordinator) {
>> > -    return rel;
>> > -  }
>> > -
>> > -}
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.
>>java
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim
>>pleOperatorImpl.java
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim
>>pleOperatorImpl.java
>> > deleted file mode 100644
>> > index e66451f..0000000
>> > ---
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim
>>pleOperatorImpl.java
>> > +++ /dev/null
>> > @@ -1,136 +0,0 @@
>> > -/*
>> > - * Licensed to the Apache Software Foundation (ASF) under one
>> > - * or more contributor license agreements.  See the NOTICE file
>> > - * distributed with this work for additional information
>> > - * regarding copyright ownership.  The ASF licenses this file
>> > - * to you under the Apache License, Version 2.0 (the
>> > - * "License"); you may not use this file except in compliance
>> > - * with the License.  You may obtain a copy of the License at
>> > - *
>> > - *   http://www.apache.org/licenses/LICENSE-2.0
>> > - *
>> > - * Unless required by applicable law or agreed to in writing,
>> > - * software distributed under the License is distributed on an
>> > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> > - * KIND, either express or implied.  See the License for the
>> > - * specific language governing permissions and limitations
>> > - * under the License.
>> > - */
>> > -
>> > -package org.apache.samza.sql.operators.factory;
>> > -
>> > -import org.apache.samza.sql.api.data.Relation;
>> > -import org.apache.samza.sql.api.data.Tuple;
>> > -import org.apache.samza.sql.api.operators.OperatorCallback;
>> > -import org.apache.samza.sql.api.operators.OperatorSpec;
>> > -import org.apache.samza.sql.api.operators.SimpleOperator;
>> > -import org.apache.samza.task.MessageCollector;
>> > -import org.apache.samza.task.TaskCoordinator;
>> > -import org.apache.samza.task.sql.SimpleMessageCollector;
>> > -
>> > -
>> > -/**
>> > - * An abstract class that encapsulate the basic information and 
>>methods
>> > that all operator classes should implement.
>> > - * It implements the interface {@link
>> > org.apache.samza.sql.api.operators.SimpleOperator}
>> > - *
>> > - */
>> > -public abstract class SimpleOperatorImpl implements SimpleOperator {
>> > -  /**
>> > -   * The specification of this operator
>> > -   */
>> > -  private final OperatorSpec spec;
>> > -
>> > -  /**
>> > -   * The callback function
>> > -   */
>> > -  private final OperatorCallback callback;
>> > -
>> > -  /**
>> > -   * Ctor of {@code SimpleOperatorImpl} class
>> > -   *
>> > -   * @param spec The specification of this operator
>> > -   */
>> > -  public SimpleOperatorImpl(OperatorSpec spec) {
>> > -    this(spec, new NoopOperatorCallback());
>> > -  }
>> > -
>> > -  public SimpleOperatorImpl(OperatorSpec spec, OperatorCallback
>> callback)
>> > {
>> > -    this.spec = spec;
>> > -    this.callback = callback;
>> > -  }
>> > -
>> > -  @Override
>> > -  public OperatorSpec getSpec() {
>> > -    return this.spec;
>> > -  }
>> > -
>> > -  /**
>> > -   * This method is made final s.t. the sequence of invocations 
>>between
>> > {@link
>> >
>> 
>>org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Relatio
>>n,
>> > MessageCollector, TaskCoordinator)}
>> > -   * and real processing of the input relation is fixed.
>> > -   */
>> > -  @Override
>> > -  final public void process(Relation deltaRelation, MessageCollector
>> > collector, TaskCoordinator coordinator)
>> > -      throws Exception {
>> > -    Relation rel = this.callback.beforeProcess(deltaRelation, 
>>collector,
>> > coordinator);
>> > -    if (rel == null) {
>> > -      return;
>> > -    }
>> > -    this.realProcess(rel, getCollector(collector, coordinator),
>> > coordinator);
>> > -  }
>> > -
>> > -  /**
>> > -   * This method is made final s.t. the sequence of invocations 
>>between
>> > {@link
>> > 
>>org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Tuple,
>> > MessageCollector, TaskCoordinator)}
>> > -   * and real processing of the input tuple is fixed.
>> > -   */
>> > -  @Override
>> > -  final public void process(Tuple tuple, MessageCollector collector,
>> > TaskCoordinator coordinator) throws Exception {
>> > -    Tuple ituple = this.callback.beforeProcess(tuple, collector,
>> > coordinator);
>> > -    if (ituple == null) {
>> > -      return;
>> > -    }
>> > -    this.realProcess(ituple, getCollector(collector, coordinator),
>> > coordinator);
>> > -  }
>> > -
>> > -  /**
>> > -   * This method is made final s.t. we enforce the invocation of 
>>{@code
>> > SimpleOperatorImpl#getCollector(MessageCollector, TaskCoordinator)}
>> before
>> > doing anything futher
>> > -   */
>> > -  @Override
>> > -  final public void refresh(long timeNano, MessageCollector 
>>collector,
>> > TaskCoordinator coordinator) throws Exception {
>> > -    this.realRefresh(timeNano, getCollector(collector, coordinator),
>> > coordinator);
>> > -  }
>> > -
>> > -  private SimpleMessageCollector getCollector(MessageCollector
>> collector,
>> > TaskCoordinator coordinator) {
>> > -    if (!(collector instanceof SimpleMessageCollector)) {
>> > -      return new SimpleMessageCollector(collector, coordinator,
>> > this.callback);
>> > -    } else {
>> > -      ((SimpleMessageCollector)
>> > collector).switchOperatorCallback(this.callback);
>> > -      return (SimpleMessageCollector) collector;
>> > -    }
>> > -  }
>> > -
>> > -  /**
>> > -   * Method to be overriden by each specific implementation class of
>> > operator to handle timeout event
>> > -   *
>> > -   * @param timeNano The time in nanosecond when the timeout event
>> > occurred
>> > -   * @param collector The {@link
>> > org.apache.samza.task.sql.SimpleMessageCollector} in the context
>> > -   * @param coordinator The {@link
>> org.apache.samza.task.TaskCoordinator}
>> > in the context
>> > -   * @throws Exception Throws exception if failed to refresh the 
>>results
>> > -   */
>> > -  protected abstract void realRefresh(long timeNano,
>> > SimpleMessageCollector collector, TaskCoordinator coordinator)
>> > -      throws Exception;
>> > -
>> > -  /**
>> > -   * Method to be overriden by each specific implementation class of
>> > operator to perform relational logic operation on an input {@link
>> > org.apache.samza.sql.api.data.Relation}
>> > -   *
>> > -   * @param rel The input relation
>> > -   * @param collector The {@link
>> > org.apache.samza.task.sql.SimpleMessageCollector} in the context
>> > -   * @param coordinator The {@link
>> org.apache.samza.task.TaskCoordinator}
>> > in the context
>> > -   * @throws Exception
>> > -   */
>> > -  protected abstract void realProcess(Relation rel,
>> > SimpleMessageCollector collector, TaskCoordinator coordinator)
>> > -      throws Exception;
>> > -
>> > -  protected abstract void realProcess(Tuple ituple,
>> > SimpleMessageCollector collector, TaskCoordinator coordinator)
>> > -      throws Exception;
>> > -
>> > -}
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.
>>java
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim
>>pleOperatorSpec.java
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim
>>pleOperatorSpec.java
>> > deleted file mode 100644
>> > index 56753b6..0000000
>> > ---
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim
>>pleOperatorSpec.java
>> > +++ /dev/null
>> > @@ -1,106 +0,0 @@
>> > -/*
>> > - * Licensed to the Apache Software Foundation (ASF) under one
>> > - * or more contributor license agreements.  See the NOTICE file
>> > - * distributed with this work for additional information
>> > - * regarding copyright ownership.  The ASF licenses this file
>> > - * to you under the Apache License, Version 2.0 (the
>> > - * "License"); you may not use this file except in compliance
>> > - * with the License.  You may obtain a copy of the License at
>> > - *
>> > - *   http://www.apache.org/licenses/LICENSE-2.0
>> > - *
>> > - * Unless required by applicable law or agreed to in writing,
>> > - * software distributed under the License is distributed on an
>> > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> > - * KIND, either express or implied.  See the License for the
>> > - * specific language governing permissions and limitations
>> > - * under the License.
>> > - */
>> > -package org.apache.samza.sql.operators.factory;
>> > -
>> > -import java.util.ArrayList;
>> > -import java.util.List;
>> > -
>> > -import org.apache.samza.sql.api.data.EntityName;
>> > -import org.apache.samza.sql.api.operators.OperatorSpec;
>> > -
>> > -
>> > -/**
>> > - * An abstract class that encapsulate the basic information and 
>>methods
>> > that all specification of operators should implement.
>> > - * It implements {@link 
>>org.apache.samza.sql.api.operators.OperatorSpec}
>> > - */
>> > -public abstract class SimpleOperatorSpec implements OperatorSpec {
>> > -  /**
>> > -   * The identifier of the corresponding operator
>> > -   */
>> > -  private final String id;
>> > -
>> > -  /**
>> > -   * The list of input entity names of the corresponding operator
>> > -   */
>> > -  private final List<EntityName> inputs = new 
>>ArrayList<EntityName>();
>> > -
>> > -  /**
>> > -   * The list of output entity names of the corresponding operator
>> > -   */
>> > -  private final List<EntityName> outputs = new 
>>ArrayList<EntityName>();
>> > -
>> > -  /**
>> > -   * Ctor of the {@code SimpleOperatorSpec} for simple {@link
>> > org.apache.samza.sql.api.operators.SimpleOperator}s w/ one input and 
>>one
>> > output
>> > -   *
>> > -   * @param id Unique identifier of the {@link
>> > org.apache.samza.sql.api.operators.SimpleOperator} object
>> > -   * @param input The only input entity
>> > -   * @param output The only output entity
>> > -   */
>> > -  public SimpleOperatorSpec(String id, EntityName input, EntityName
>> > output) {
>> > -    this.id = id;
>> > -    this.inputs.add(input);
>> > -    this.outputs.add(output);
>> > -  }
>> > -
>> > -  /**
>> > -   * Ctor of {@code SimpleOperatorSpec} with general format: m inputs
>> and
>> > n outputs
>> > -   *
>> > -   * @param id Unique identifier of the {@link
>> > org.apache.samza.sql.api.operators.SimpleOperator} object
>> > -   * @param inputs The list of input entities
>> > -   * @param output The list of output entities
>> > -   */
>> > -  public SimpleOperatorSpec(String id, List<EntityName> inputs,
>> > EntityName output) {
>> > -    this.id = id;
>> > -    this.inputs.addAll(inputs);
>> > -    this.outputs.add(output);
>> > -  }
>> > -
>> > -  @Override
>> > -  public String getId() {
>> > -    return this.id;
>> > -  }
>> > -
>> > -  @Override
>> > -  public List<EntityName> getInputNames() {
>> > -    return this.inputs;
>> > -  }
>> > -
>> > -  @Override
>> > -  public List<EntityName> getOutputNames() {
>> > -    return this.outputs;
>> > -  }
>> > -
>> > -  /**
>> > -   * Method to get the first output entity
>> > -   *
>> > -   * @return The first output entity name
>> > -   */
>> > -  public EntityName getOutputName() {
>> > -    return this.outputs.get(0);
>> > -  }
>> > -
>> > -  /**
>> > -   * Method to get the first input entity
>> > -   *
>> > -   * @return The first input entity name
>> > -   */
>> > -  public EntityName getInputName() {
>> > -    return this.inputs.get(0);
>> > -  }
>> > -}
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim
>>pleRouter.java
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim
>>pleRouter.java
>> > deleted file mode 100644
>> > index e570897..0000000
>> > ---
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim
>>pleRouter.java
>> > +++ /dev/null
>> > @@ -1,136 +0,0 @@
>> > -/*
>> > - * Licensed to the Apache Software Foundation (ASF) under one
>> > - * or more contributor license agreements.  See the NOTICE file
>> > - * distributed with this work for additional information
>> > - * regarding copyright ownership.  The ASF licenses this file
>> > - * to you under the Apache License, Version 2.0 (the
>> > - * "License"); you may not use this file except in compliance
>> > - * with the License.  You may obtain a copy of the License at
>> > - *
>> > - *   http://www.apache.org/licenses/LICENSE-2.0
>> > - *
>> > - * Unless required by applicable law or agreed to in writing,
>> > - * software distributed under the License is distributed on an
>> > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> > - * KIND, either express or implied.  See the License for the
>> > - * specific language governing permissions and limitations
>> > - * under the License.
>> > - */
>> > -
>> > -package org.apache.samza.sql.operators.factory;
>> > -
>> > -import java.util.ArrayList;
>> > -import java.util.HashMap;
>> > -import java.util.HashSet;
>> > -import java.util.Iterator;
>> > -import java.util.List;
>> > -import java.util.Map;
>> > -import java.util.Set;
>> > -
>> > -import org.apache.samza.config.Config;
>> > -import org.apache.samza.sql.api.data.EntityName;
>> > -import org.apache.samza.sql.api.data.Relation;
>> > -import org.apache.samza.sql.api.data.Tuple;
>> > -import org.apache.samza.sql.api.operators.Operator;
>> > -import org.apache.samza.sql.api.operators.OperatorRouter;
>> > -import org.apache.samza.sql.api.operators.SimpleOperator;
>> > -import org.apache.samza.task.MessageCollector;
>> > -import org.apache.samza.task.TaskContext;
>> > -import org.apache.samza.task.TaskCoordinator;
>> > -import org.apache.samza.task.sql.RouterMessageCollector;
>> > -
>> > -
>> > -/**
>> > - * Example implementation of {@link
>> > org.apache.samza.sql.api.operators.OperatorRouter}
>> > - *
>> > - */
>> > -public final class SimpleRouter implements OperatorRouter {
>> > -  /**
>> > -   * List of operators added to the {@link
>> > org.apache.samza.sql.api.operators.OperatorRouter}
>> > -   */
>> > -  private List<SimpleOperator> operators = new
>> > ArrayList<SimpleOperator>();
>> > -
>> > -  @SuppressWarnings("rawtypes")
>> > -  /**
>> > -   * Map of {@link org.apache.samza.sql.api.data.EntityName} to the 
>>list
>> > of operators associated with it
>> > -   */
>> > -  private Map<EntityName, List> nextOps = new HashMap<EntityName,
>> List>();
>> > -
>> > -  /**
>> > -   * Set of {@link org.apache.samza.sql.api.data.EntityName} as 
>>inputs
>> to
>> > this {@code SimpleRouter}
>> > -   */
>> > -  private Set<EntityName> inputEntities = new HashSet<EntityName>();
>> > -
>> > -  /**
>> > -   * Set of entities that are not input entities to this {@code
>> > SimpleRouter}
>> > -   */
>> > -  private Set<EntityName> outputEntities = new HashSet<EntityName>();
>> > -
>> > -  @SuppressWarnings("unchecked")
>> > -  private void addOperator(EntityName input, SimpleOperator nextOp) {
>> > -    if (nextOps.get(input) == null) {
>> > -      nextOps.put(input, new ArrayList<Operator>());
>> > -    }
>> > -    nextOps.get(input).add(nextOp);
>> > -    operators.add(nextOp);
>> > -    // get the operator spec
>> > -    for (EntityName output : nextOp.getSpec().getOutputNames()) {
>> > -      if (inputEntities.contains(output)) {
>> > -        inputEntities.remove(output);
>> > -      }
>> > -      outputEntities.add(output);
>> > -    }
>> > -    if (!outputEntities.contains(input)) {
>> > -      inputEntities.add(input);
>> > -    }
>> > -  }
>> > -
>> > -  @Override
>> > -  @SuppressWarnings("unchecked")
>> > -  public List<SimpleOperator> getNextOperators(EntityName entity) {
>> > -    return nextOps.get(entity);
>> > -  }
>> > -
>> > -  @Override
>> > -  public void addOperator(SimpleOperator nextOp) {
>> > -    List<EntityName> inputs = nextOp.getSpec().getInputNames();
>> > -    for (EntityName input : inputs) {
>> > -      addOperator(input, nextOp);
>> > -    }
>> > -  }
>> > -
>> > -  @Override
>> > -  public void init(Config config, TaskContext context) throws 
>>Exception
>> {
>> > -    for (SimpleOperator op : this.operators) {
>> > -      op.init(config, context);
>> > -    }
>> > -  }
>> > -
>> > -  @Override
>> > -  public void process(Tuple ituple, MessageCollector collector,
>> > TaskCoordinator coordinator) throws Exception {
>> > -    MessageCollector opCollector = new 
>>RouterMessageCollector(collector,
>> > coordinator, this);
>> > -    for (Iterator<SimpleOperator> iter =
>> > this.getNextOperators(ituple.getEntityName()).iterator();
>> iter.hasNext();) {
>> > -      iter.next().process(ituple, opCollector, coordinator);
>> > -    }
>> > -  }
>> > -
>> > -  @SuppressWarnings("rawtypes")
>> > -  @Override
>> > -  public void process(Relation deltaRelation, MessageCollector
>> collector,
>> > TaskCoordinator coordinator) throws Exception {
>> > -    MessageCollector opCollector = new 
>>RouterMessageCollector(collector,
>> > coordinator, this);
>> > -    for (Iterator<SimpleOperator> iter =
>> > this.getNextOperators(deltaRelation.getName()).iterator();
>> iter.hasNext();)
>> > {
>> > -      iter.next().process(deltaRelation, opCollector, coordinator);
>> > -    }
>> > -  }
>> > -
>> > -  @Override
>> > -  public void refresh(long nanoSec, MessageCollector collector,
>> > TaskCoordinator coordinator) throws Exception {
>> > -    MessageCollector opCollector = new 
>>RouterMessageCollector(collector,
>> > coordinator, this);
>> > -    for (EntityName entity : inputEntities) {
>> > -      for (Iterator<SimpleOperator> iter =
>> > this.getNextOperators(entity).iterator(); iter.hasNext();) {
>> > -        iter.next().refresh(nanoSec, opCollector, coordinator);
>> > -      }
>> > -    }
>> > -  }
>> > -
>> > -}
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.jav
>>a
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Top
>>ologyBuilder.java
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Top
>>ologyBuilder.java
>> > new file mode 100644
>> > index 0000000..62b19fc
>> > --- /dev/null
>> > +++
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Top
>>ologyBuilder.java
>> > @@ -0,0 +1,284 @@
>> > +/*
>> > + * Licensed to the Apache Software Foundation (ASF) under one
>> > + * or more contributor license agreements.  See the NOTICE file
>> > + * distributed with this work for additional information
>> > + * regarding copyright ownership.  The ASF licenses this file
>> > + * to you under the Apache License, Version 2.0 (the
>> > + * "License"); you may not use this file except in compliance
>> > + * with the License.  You may obtain a copy of the License at
>> > + *
>> > + *   http://www.apache.org/licenses/LICENSE-2.0
>> > + *
>> > + * Unless required by applicable law or agreed to in writing,
>> > + * software distributed under the License is distributed on an
>> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> > + * KIND, either express or implied.  See the License for the
>> > + * specific language governing permissions and limitations
>> > + * under the License.
>> > + */
>> > +package org.apache.samza.sql.operators.factory;
>> > +
>> > +import java.util.HashMap;
>> > +import java.util.HashSet;
>> > +import java.util.Iterator;
>> > +import java.util.List;
>> > +import java.util.Map;
>> > +import java.util.Set;
>> > +
>> > +import org.apache.samza.sql.api.data.EntityName;
>> > +import org.apache.samza.sql.api.operators.OperatorRouter;
>> > +import org.apache.samza.sql.api.operators.OperatorSink;
>> > +import org.apache.samza.sql.api.operators.OperatorSource;
>> > +import org.apache.samza.sql.api.operators.OperatorSpec;
>> > +import org.apache.samza.sql.api.operators.SimpleOperator;
>> > +import org.apache.samza.sql.api.operators.SqlOperatorFactory;
>> > +import org.apache.samza.sql.operators.OperatorTopology;
>> > +import org.apache.samza.sql.operators.SimpleRouter;
>> > +
>> > +
>> > +/**
>> > + * This class implements a builder to allow user to create the 
>>operators
>> > and connect them in a topology altogether.
>> > + */
>> > +public class TopologyBuilder {
>> > +
>> > +  /**
>> > +   * Internal {@link 
>>org.apache.samza.sql.api.operators.OperatorRouter}
>> > object to retain the topology being created
>> > +   */
>> > +  private SimpleRouter router;
>> > +
>> > +  /**
>> > +   * The {@link 
>>org.apache.samza.sql.api.operators.SqlOperatorFactory}
>> > object used to create operators connected in the topology
>> > +   */
>> > +  private final SqlOperatorFactory factory;
>> > +
>> > +  /**
>> > +   * The map of unbound inputs, the value is set(input_operators)
>> > +   */
>> > +  private Map<EntityName, Set<OperatorSpec>> unboundInputs = new
>> > HashMap<EntityName, Set<OperatorSpec>>();
>> > +
>> > +  /**
>> > +   * The map of unbound outputs, the value is the operator generating
>> the
>> > output
>> > +   */
>> > +  private Map<EntityName, OperatorSpec> unboundOutputs = new
>> > HashMap<EntityName, OperatorSpec>();
>> > +
>> > +  /**
>> > +   * The set of entities that are intermediate entities between
>> operators
>> > +   */
>> > +  private Set<EntityName> interStreams = new HashSet<EntityName>();
>> > +
>> > +  /**
>> > +   * The current operator that may have unbound input or output
>> > +   */
>> > +  private SimpleOperator currentOp = null;
>> > +
>> > +  /**
>> > +   * Private constructor of {@code TopologyBuilder}
>> > +   *
>> > +   * @param factory The {@link
>> > org.apache.samza.sql.api.operators.SqlOperatorFactory} to create
>> operators
>> > +   */
>> > +  private TopologyBuilder(SqlOperatorFactory factory) {
>> > +    this.router = new SimpleRouter();
>> > +    this.factory = factory;
>> > +  }
>> > +
>> > +  /**
>> > +   * Static method to create this {@code TopologyBuilder} w/ a
>> customized
>> > {@link org.apache.samza.sql.api.operators.SqlOperatorFactory}
>> > +   *
>> > +   * @param factory The {@link
>> > org.apache.samza.sql.api.operators.SqlOperatorFactory} to create
>> operators
>> > +   * @return The {@code TopologyBuilder} object
>> > +   */
>> > +  public static TopologyBuilder create(SqlOperatorFactory factory) {
>> > +    return new TopologyBuilder(factory);
>> > +  }
>> > +
>> > +  /**
>> > +   * Static method to create this {@code TopologyBuilder}
>> > +   *
>> > +   * @return The {@code TopologyBuilder} object
>> > +   */
>> > +  public static TopologyBuilder create() {
>> > +    return new TopologyBuilder(new SimpleOperatorFactoryImpl());
>> > +  }
>> > +
>> > +  /**
>> > +   * Public method to create the next operator and attach it to the
>> > output of the current operator
>> > +   *
>> > +   * @param spec The {@link
>> > org.apache.samza.sql.api.operators.OperatorSpec} for the next operator
>> > +   * @return The updated {@code TopologyBuilder} object
>> > +   */
>> > +  public TopologyBuilder operator(OperatorSpec spec) {
>> > +    // check whether it is valid to connect a new operator to the
>> current
>> > operator's output
>> > +    SimpleOperator nextOp = this.factory.getOperator(spec);
>> > +    return this.operator(nextOp);
>> > +  }
>> > +
>> > +  /**
>> > +   * Public method to create the next operator and attach it to the
>> > output of the current operator
>> > +   *
>> > +   * @param op The {@link
>> > org.apache.samza.sql.api.operators.SimpleOperator}
>> > +   * @return The updated {@code TopologyBuilder} object
>> > +   */
>> > +  public TopologyBuilder operator(SimpleOperator op) {
>> > +    // check whether it is valid to connect a new operator to the
>> current
>> > operator's output
>> > +    canAddOperator(op);
>> > +    this.addOperator(op);
>> > +    // advance the current operator position
>> > +    this.currentOp = op;
>> > +    return this;
>> > +  }
>> > +
>> > +  /**
>> > +   * Public method to create a stream object that will be the source 
>>to
>> > other operators
>> > +   *
>> > +   * @return The {@link
>> > org.apache.samza.sql.api.operators.OperatorSource} that can be the 
>>source
>> > to other operators
>> > +   */
>> > +  public OperatorSource stream() {
>> > +    canCreateSource();
>> > +    return new
>> > OperatorTopology(this.unboundOutputs.keySet().iterator().next(),
>> > this.router);
>> > +  }
>> > +
>> > +  /**
>> > +   * Public method to create a sink object that can take input stream
>> > from other operators
>> > +   *
>> > +   * @return The {@link 
>>org.apache.samza.sql.api.operators.OperatorSink}
>> > that can be the downstream of other operators
>> > +   */
>> > +  public OperatorSink sink() {
>> > +    canCreateSink();
>> > +    return new
>> > OperatorTopology(this.unboundInputs.keySet().iterator().next(),
>> > this.router);
>> > +  }
>> > +
>> > +  /**
>> > +   * Public method to bind the input of the current operator w/ the
>> > {@link org.apache.samza.sql.api.operators.OperatorSource} object
>> > +   *
>> > +   * @param srcStream The {@link
>> > org.apache.samza.sql.api.operators.OperatorSource} that the current
>> > operator is going to be bound to
>> > +   * @return The updated {@code TopologyBuilder} object
>> > +   */
>> > +  public TopologyBuilder bind(OperatorSource srcStream) {
>> > +    EntityName streamName = srcStream.getName();
>> > +    if (this.unboundInputs.containsKey(streamName)) {
>> > +      this.unboundInputs.remove(streamName);
>> > +      this.interStreams.add(streamName);
>> > +    } else {
>> > +      // no input operator is waiting for the output from the 
>>srcStream
>> > +      throw new IllegalArgumentException("No operator input can be 
>>bound
>> > to the input stream " + streamName);
>> > +    }
>> > +    // add all operators in srcStream to this topology
>> > +    for (Iterator<SimpleOperator> iter = srcStream.opIterator();
>> > iter.hasNext();) {
>> > +      this.addOperator(iter.next());
>> > +    }
>> > +    return this;
>> > +  }
>> > +
>> > +  /**
>> > +   * Public method to attach a {@link
>> > org.apache.samza.sql.api.operators.OperatorSink} object to the output 
>>of
>> > the current operator
>> > +   *
>> > +   * @param nextSink The {@link
>> > org.apache.samza.sql.api.operators.OperatorSink} to be attached to the
>> > current operator's output
>> > +   * @return The updated {@code TopologyBuilder} object
>> > +   */
>> > +  public TopologyBuilder attach(OperatorSink nextSink) {
>> > +    EntityName streamName = nextSink.getName();
>> > +    if (this.unboundOutputs.containsKey(streamName)) {
>> > +      this.unboundOutputs.remove(streamName);
>> > +      this.interStreams.add(streamName);
>> > +    } else {
>> > +      // no unbound output to attach to
>> > +      throw new IllegalArgumentException("No operator output found to
>> > attach the sink " + streamName);
>> > +    }
>> > +    // add all operators in nextSink to the router
>> > +    for (Iterator<SimpleOperator> iter = nextSink.opIterator();
>> > iter.hasNext();) {
>> > +      this.addOperator(iter.next());
>> > +    }
>> > +    return this;
>> > +  }
>> > +
>> > +  /**
>> > +   * Public method to finalize the topology that should have all 
>>input
>> > and output bound to system input and output
>> > +   *
>> > +   * @return The finalized {@link
>> > org.apache.samza.sql.api.operators.OperatorRouter} object
>> > +   */
>> > +  public OperatorRouter build() {
>> > +    canClose();
>> > +    return router;
>> > +  }
>> > +
>> > +  private TopologyBuilder addOperator(SimpleOperator nextOp) {
>> > +    // if input is not in the unboundOutputs and interStreams, input 
>>is
>> > unbound
>> > +    for (EntityName in : nextOp.getSpec().getInputNames()) {
>> > +      if (this.unboundOutputs.containsKey(in)) {
>> > +        this.unboundOutputs.remove(in);
>> > +        this.interStreams.add(in);
>> > +      }
>> > +      if (!this.interStreams.contains(in) && !in.isSystemEntity()) {
>> > +        if (!this.unboundInputs.containsKey(in)) {
>> > +          this.unboundInputs.put(in, new HashSet<OperatorSpec>());
>> > +        }
>> > +        this.unboundInputs.get(in).add(nextOp.getSpec());
>> > +      }
>> > +    }
>> > +    // if output is not in the unboundInputs and interStreams, 
>>output is
>> > unbound
>> > +    for (EntityName out : nextOp.getSpec().getOutputNames()) {
>> > +      if (this.unboundInputs.containsKey(out)) {
>> > +        this.unboundInputs.remove(out);
>> > +        this.interStreams.add(out);
>> > +      }
>> > +      if (!this.interStreams.contains(out) && !out.isSystemEntity()) 
>>{
>> > +        this.unboundOutputs.put(out, nextOp.getSpec());
>> > +      }
>> > +    }
>> > +    try {
>> > +      this.router.addOperator(nextOp);
>> > +    } catch (Exception e) {
>> > +      throw new RuntimeException("Failed to add operator " +
>> > nextOp.getSpec().getId() + " to the topology.", e);
>> > +    }
>> > +    return this;
>> > +  }
>> > +
>> > +  private void canCreateSource() {
>> > +    if (this.unboundInputs.size() > 0) {
>> > +      throw new IllegalStateException("Can't create stream when there
>> are
>> > unbounded input streams in the topology");
>> > +    }
>> > +    if (this.unboundOutputs.size() != 1) {
>> > +      throw new IllegalStateException(
>> > +          "Can't create stream when the number of unbounded outputs 
>>is
>> > not 1 in the topology");
>> > +    }
>> > +  }
>> > +
>> > +  private void canCreateSink() {
>> > +    if (this.unboundOutputs.size() > 0) {
>> > +      throw new IllegalStateException("Can't create sink when there 
>>are
>> > unbounded output streams in the topology");
>> > +    }
>> > +    if (this.unboundInputs.size() != 1) {
>> > +      throw new IllegalStateException(
>> > +          "Can't create sink when the number of unbounded input 
>>streams
>> > is not 1 in the topology");
>> > +    }
>> > +  }
>> > +
>> > +  private void canAddOperator(SimpleOperator op) {
>> > +    if (this.currentOp == null) {
>> > +      return;
>> > +    }
>> > +    for (EntityName name : this.currentOp.getSpec().getInputNames()) 
>>{
>> > +      if (this.unboundInputs.containsKey(name)) {
>> > +        throw new IllegalArgumentException("There are unbound input 
>>" +
>> > name + " to the current operator "
>> > +            + this.currentOp.getSpec().getId() + ". Create a sink or
>> call
>> > bind instead");
>> > +      }
>> > +    }
>> > +    List<EntityName> nextInputs = op.getSpec().getInputNames();
>> > +    for (EntityName name : 
>>this.currentOp.getSpec().getOutputNames()) {
>> > +      if (!nextInputs.contains(name) &&
>> > this.unboundOutputs.containsKey(name)) {
>> > +        // the current operator's output is not in the next 
>>operator's
>> > input list
>> > +        throw new IllegalArgumentException("There are unbound output 
>>" +
>> > name + " from the current operator "
>> > +            + this.currentOp.getSpec().getId()
>> > +            + " that are not included in the next operator's inputs.
>> > Create a stream or call attach instead");
>> > +      }
>> > +    }
>> > +  }
>> > +
>> > +  private void canClose() {
>> > +    if (!this.unboundInputs.isEmpty() ||
>> !this.unboundOutputs.isEmpty()) {
>> > +      throw new IllegalStateException(
>> > +          "There are input/output streams in the topology that are 
>>not
>> > bounded. Can't build the topology yet.");
>> > +    }
>> > +  }
>> > +
>> > +}
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/Stream
>>StreamJoin.java
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/Stream
>>StreamJoin.java
>> > index 2854aeb..7f5b990 100644
>> > ---
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/Stream
>>StreamJoin.java
>> > +++
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/Stream
>>StreamJoin.java
>> > @@ -29,7 +29,7 @@ import org.apache.samza.sql.api.data.Relation;
>> >  import org.apache.samza.sql.api.data.Stream;
>> >  import org.apache.samza.sql.api.data.Tuple;
>> >  import org.apache.samza.sql.api.operators.OperatorCallback;
>> > -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
>> > +import org.apache.samza.sql.operators.SimpleOperatorImpl;
>> >  import org.apache.samza.sql.operators.window.BoundedTimeWindow;
>> >  import org.apache.samza.sql.window.storage.OrderedStoreKey;
>> >  import org.apache.samza.storage.kv.Entry;
>> > @@ -38,7 +38,6 @@ import org.apache.samza.task.TaskContext;
>> >  import org.apache.samza.task.TaskCoordinator;
>> >  import org.apache.samza.task.sql.SimpleMessageCollector;
>> >
>> > -
>> >  /**
>> >   * This class implements a simple stream-to-stream join
>> >   */
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.j
>>ava
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/Stream
>>StreamJoinSpec.java
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/Stream
>>StreamJoinSpec.java
>> > index cc0aca0..eecff7e 100644
>> > ---
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/Stream
>>StreamJoinSpec.java
>> > +++
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/Stream
>>StreamJoinSpec.java
>> > @@ -19,10 +19,11 @@
>> >
>> >  package org.apache.samza.sql.operators.join;
>> >
>> > +import java.util.ArrayList;
>> >  import java.util.List;
>> >
>> >  import org.apache.samza.sql.api.data.EntityName;
>> > -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
>> > +import org.apache.samza.sql.operators.SimpleOperatorSpec;
>> >
>> >
>> >  /**
>> > @@ -35,4 +36,16 @@ public class StreamStreamJoinSpec extends
>> > SimpleOperatorSpec {
>> >      // TODO Auto-generated constructor stub
>> >    }
>> >
>> > +  @SuppressWarnings("serial")
>> > +  public StreamStreamJoinSpec(String id, List<String> inputRelations,
>> > String output, List<String> joinKeys) {
>> > +    super(id, new ArrayList<EntityName>() {
>> > +      {
>> > +        for (String input : inputRelations) {
>> > +          add(EntityName.getStreamName(input));
>> > +        }
>> > +      }
>> > +    }, EntityName.getStreamName(output));
>> > +    // TODO Auto-generated constructor stub
>> > +  }
>> > +
>> >  }
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/P
>>artitionOp.java
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/P
>>artitionOp.java
>> > index b93d789..0cba39a 100644
>> > ---
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/P
>>artitionOp.java
>> > +++
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/P
>>artitionOp.java
>> > @@ -23,7 +23,7 @@ import org.apache.samza.config.Config;
>> >  import org.apache.samza.sql.api.data.Relation;
>> >  import org.apache.samza.sql.api.data.Tuple;
>> >  import org.apache.samza.sql.api.operators.OperatorCallback;
>> > -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
>> > +import org.apache.samza.sql.operators.SimpleOperatorImpl;
>> >  import org.apache.samza.storage.kv.Entry;
>> >  import org.apache.samza.storage.kv.KeyValueIterator;
>> >  import org.apache.samza.system.OutgoingMessageEnvelope;
>> > @@ -32,7 +32,6 @@ import org.apache.samza.task.TaskContext;
>> >  import org.apache.samza.task.TaskCoordinator;
>> >  import org.apache.samza.task.sql.SimpleMessageCollector;
>> >
>> > -
>> >  /**
>> >   * This is an example build-in operator that performs a simple stream
>> > re-partition operation.
>> >   *
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.jav
>>a
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/P
>>artitionSpec.java
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/P
>>artitionSpec.java
>> > index c47eed9..e494bff 100644
>> > ---
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/P
>>artitionSpec.java
>> > +++
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/P
>>artitionSpec.java
>> > @@ -21,7 +21,7 @@ package org.apache.samza.sql.operators.partition;
>> >
>> >  import org.apache.samza.sql.api.data.EntityName;
>> >  import org.apache.samza.sql.api.operators.OperatorSpec;
>> > -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
>> > +import org.apache.samza.sql.operators.SimpleOperatorSpec;
>> >  import org.apache.samza.system.SystemStream;
>> >
>> >
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.ja
>>va
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/Boun
>>dedTimeWindow.java
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/Boun
>>dedTimeWindow.java
>> > index d81cc93..a9a83b5 100644
>> > ---
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/Boun
>>dedTimeWindow.java
>> > +++
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/Boun
>>dedTimeWindow.java
>> > @@ -27,13 +27,12 @@ import org.apache.samza.sql.api.data.EntityName;
>> >  import org.apache.samza.sql.api.data.Relation;
>> >  import org.apache.samza.sql.api.data.Tuple;
>> >  import org.apache.samza.sql.api.operators.OperatorCallback;
>> > -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
>> > +import org.apache.samza.sql.operators.SimpleOperatorImpl;
>> >  import org.apache.samza.storage.kv.KeyValueIterator;
>> >  import org.apache.samza.task.TaskContext;
>> >  import org.apache.samza.task.TaskCoordinator;
>> >  import org.apache.samza.task.sql.SimpleMessageCollector;
>> >
>> > -
>> >  /**
>> >   * This class defines an example build-in operator for a fixed size
>> > window operator that converts a stream to a relation
>> >   *
>> > @@ -86,6 +85,7 @@ public class BoundedTimeWindow extends
>> > SimpleOperatorImpl {
>> >     * @param lengthSec The window size in seconds
>> >     * @param input The input stream name
>> >     * @param output The output relation name
>> > +   * @param callback The user callback object
>> >     */
>> >    public BoundedTimeWindow(String wndId, int lengthSec, String input,
>> > String output, OperatorCallback callback) {
>> >      super(new WindowSpec(wndId, EntityName.getStreamName(input),
>> > EntityName.getStreamName(output), lengthSec), callback);
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/Wind
>>owSpec.java
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/Wind
>>owSpec.java
>> > index eec32ea..6c4eba8 100644
>> > ---
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/Wind
>>owSpec.java
>> > +++
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/Wind
>>owSpec.java
>> > @@ -21,7 +21,7 @@ package org.apache.samza.sql.operators.window;
>> >
>> >  import org.apache.samza.sql.api.data.EntityName;
>> >  import org.apache.samza.sql.api.operators.OperatorSpec;
>> > -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
>> > +import org.apache.samza.sql.operators.SimpleOperatorSpec;
>> >
>> >
>> >  /**
>> > @@ -47,6 +47,11 @@ public class WindowSpec extends SimpleOperatorSpec
>> > implements OperatorSpec {
>> >      this.wndSizeSec = lengthSec;
>> >    }
>> >
>> > +  public WindowSpec(String id, int wndSize, String input) {
>> > +    super(id, EntityName.getStreamName(input), null);
>> > +    this.wndSizeSec = wndSize;
>> > +  }
>> > +
>> >    /**
>> >     * Method to get the window state relation name
>> >     *
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCol
>>lector.java
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCol
>>lector.java
>> > index b29838a..6950f67 100644
>> > ---
>> >
>> 
>>a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCol
>>lector.java
>> > +++
>> >
>> 
>>b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCol
>>lector.java
>> > @@ -22,7 +22,7 @@ package org.apache.samza.task.sql;
>> >  import org.apache.samza.sql.api.data.Relation;
>> >  import org.apache.samza.sql.api.data.Tuple;
>> >  import org.apache.samza.sql.api.operators.OperatorCallback;
>> > -import org.apache.samza.sql.operators.factory.NoopOperatorCallback;
>> > +import org.apache.samza.sql.operators.NoopOperatorCallback;
>> >  import org.apache.samza.storage.kv.Entry;
>> >  import org.apache.samza.storage.kv.KeyValueIterator;
>> >  import org.apache.samza.system.OutgoingMessageEnvelope;
>> > @@ -57,25 +57,38 @@ public class SimpleMessageCollector implements
>> > MessageCollector {
>> >     * @param coordinator The {@link
>> org.apache.samza.task.TaskCoordinator}
>> > in the context
>> >     */
>> >    public SimpleMessageCollector(MessageCollector collector,
>> > TaskCoordinator coordinator) {
>> > -    this.collector = collector;
>> > -    this.coordinator = coordinator;
>> > +    this(collector, coordinator, new NoopOperatorCallback());
>> >    }
>> >
>> >    /**
>> >     * This method swaps the {@code callback} with the new one
>> >     *
>> > -   * <p> This method allows the {@link
>> > org.apache.samza.sql.api.operators.SimpleOperator} to be swapped when 
>>the
>> > collector
>> > -   * is passed down into the next operator's context. Hence, under 
>>the
>> > new operator's context, the correct {@link
>> >
>> 
>>org.apache.samza.sql.api.operators.OperatorCallback#afterProcess(Relation
>>,
>> > MessageCollector, TaskCoordinator)},
>> > -   * and {@link
>> > 
>>org.apache.samza.sql.api.operators.OperatorCallback#afterProcess(Tuple,
>> > MessageCollector, TaskCoordinator)} can be invoked
>> > +   * <p> This method allows the {@link
>> > org.apache.samza.sql.api.operators.OperatorCallback} to be swapped 
>>when
>> the
>> > collector
>> > +   * is passed down into the next operator's context. Hence, under 
>>the
>> > new operator's context, the correct callback functions can be invoked
>> >     *
>> >     * @param callback The new {@link
>> > org.apache.samza.sql.api.operators.OperatorCallback} to be set
>> >     */
>> > -  public void switchOperatorCallback(OperatorCallback callback) {
>> > -    this.callback = callback;
>> > +  public void switchCallback(OperatorCallback callback) {
>> > +    if (callback == null) {
>> > +      this.callback = new NoopOperatorCallback();
>> > +    } else {
>> > +      this.callback = callback;
>> > +    }
>> > +  }
>> > +
>> > +  /**
>> > +   * Method is declared to be final s.t. we enforce that the callback
>> > functions are called first
>> > +   */
>> > +  @Override
>> > +  final public void send(OutgoingMessageEnvelope envelope) {
>> > +    this.collector.send(envelope);
>> >    }
>> >
>> >    /**
>> >     * Method is declared to be final s.t. we enforce that the callback
>> > functions are called first
>> > +   *
>> > +   * @param deltaRelation The relation to be sent out
>> > +   * @throws Exception Throws exception if failed to send
>> >     */
>> >    final public void send(Relation deltaRelation) throws Exception {
>> >      Relation rel = this.callback.afterProcess(deltaRelation, 
>>collector,
>> > coordinator);
>> > @@ -87,6 +100,9 @@ public class SimpleMessageCollector implements
>> > MessageCollector {
>> >
>> >    /**
>> >     * Method is declared to be final s.t. we enforce that the callback
>> > functions are called first
>> > +   *
>> > +   * @param tuple The tuple to be sent out
>> > +   * @throws Exception Throws exception if failed to send
>> >     */
>> >    final public void send(Tuple tuple) throws Exception {
>> >      Tuple otuple = this.callback.afterProcess(tuple, collector,
>> > coordinator);
>> > @@ -106,9 +122,4 @@ public class SimpleMessageCollector implements
>> > MessageCollector {
>> >    protected void realSend(Tuple tuple) throws Exception {
>> >      this.collector.send((OutgoingMessageEnvelope) 
>>tuple.getMessage());
>> >    }
>> > -
>> > -  @Override
>> > -  public void send(OutgoingMessageEnvelope envelope) {
>> > -    this.collector.send(envelope);
>> > -  }
>> >  }
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOper
>>atorTask.java
>> >
>> 
>>b/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOper
>>atorTask.java
>> > index 20dc701..7370af6 100644
>> > ---
>> >
>> 
>>a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOper
>>atorTask.java
>> > +++
>> >
>> 
>>b/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOper
>>atorTask.java
>> > @@ -22,6 +22,7 @@ package org.apache.samza.task.sql;
>> >  import org.apache.samza.config.Config;
>> >  import org.apache.samza.sql.api.data.Relation;
>> >  import org.apache.samza.sql.api.data.Tuple;
>> > +import org.apache.samza.sql.api.operators.Operator;
>> >  import org.apache.samza.sql.api.operators.OperatorCallback;
>> >  import org.apache.samza.sql.data.IncomingMessageTuple;
>> >  import org.apache.samza.sql.operators.window.BoundedTimeWindow;
>> > @@ -39,7 +40,7 @@ import org.apache.samza.task.WindowableTask;
>> >   *
>> >   */
>> >  public class RandomWindowOperatorTask implements StreamTask,
>> > InitableTask, WindowableTask {
>> > -  private BoundedTimeWindow wndOp;
>> > +  private Operator operator;
>> >
>> >    private final OperatorCallback wndCallback = new 
>>OperatorCallback() {
>> >
>> > @@ -77,20 +78,20 @@ public class RandomWindowOperatorTask implements
>> > StreamTask, InitableTask, Windo
>> >    public void process(IncomingMessageEnvelope envelope, 
>>MessageCollector
>> > collector, TaskCoordinator coordinator)
>> >        throws Exception {
>> >      // based on tuple's stream name, get the window op and run 
>>process()
>> > -    wndOp.process(new IncomingMessageTuple(envelope), collector,
>> > coordinator);
>> > +    operator.process(new IncomingMessageTuple(envelope), collector,
>> > coordinator);
>> >
>> >    }
>> >
>> >    @Override
>> >    public void window(MessageCollector collector, TaskCoordinator
>> > coordinator) throws Exception {
>> >      // based on tuple's stream name, get the window op and run 
>>process()
>> > -    wndOp.refresh(System.nanoTime(), collector, coordinator);
>> > +    operator.refresh(System.nanoTime(), collector, coordinator);
>> >    }
>> >
>> >    @Override
>> >    public void init(Config config, TaskContext context) throws 
>>Exception
>> {
>> >      // 1. create a fixed length 10 sec window operator
>> > -    this.wndOp = new BoundedTimeWindow("wndOp1", 10, "kafka:stream1",
>> > "relation1", this.wndCallback);
>> > -    this.wndOp.init(config, context);
>> > +    this.operator = new BoundedTimeWindow("wndOp1", 10, 
>>"kafka:stream1",
>> > "wndOutput", this.wndCallback);
>> > +    this.operator.init(config, context);
>> >    }
>> >  }
>> >
>> >
>> >
>> 
>>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
>>/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
>> > ----------------------------------------------------------------------
>> > diff --git
>> >
>> 
>>a/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.ja
>>va
>> >
>> 
>>b/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.ja
>>va
>> > index 9124e3c..d65892c 100644
>> > ---
>> >
>> 
>>a/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.ja
>>va
>> > +++
>> >
>> 
>>b/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.ja
>>va
>> > @@ -24,7 +24,7 @@ import java.util.List;
>> >
>> >  import org.apache.samza.config.Config;
>> >  import org.apache.samza.sql.data.IncomingMessageTuple;
>> > -import org.apache.samza.sql.operators.factory.SimpleRouter;
>> > +import org.apache.samza.sql.operators.SimpleRouter;
>> >  import org.apache.samza.sql.operators.join.StreamStreamJoin;
>> >  import org.apache.samza.sql.operators.partition.PartitionOp;
>> >  import org.apache.samza.sql.operators.window.BoundedTimeWindow;
>> > @@ -51,25 +51,25 @@ import org.apache.samza.task.WindowableTask;
>> >   */
>> >  public class StreamSqlTask implements StreamTask, InitableTask,
>> > WindowableTask {
>> >
>> > -  private SimpleRouter rteCntx;
>> > +  private SimpleRouter router;
>> >
>> >    @Override
>> >    public void process(IncomingMessageEnvelope envelope, 
>>MessageCollector
>> > collector, TaskCoordinator coordinator)
>> >        throws Exception {
>> > -    this.rteCntx.process(new IncomingMessageTuple(envelope), 
>>collector,
>> > coordinator);
>> > +    this.router.process(new IncomingMessageTuple(envelope), 
>>collector,
>> > coordinator);
>> >    }
>> >
>> >    @Override
>> >    public void window(MessageCollector collector, TaskCoordinator
>> > coordinator) throws Exception {
>> > -    this.rteCntx.refresh(System.nanoTime(), collector, coordinator);
>> > +    this.router.refresh(System.nanoTime(), collector, coordinator);
>> >    }
>> >
>> >    @Override
>> >    public void init(Config config, TaskContext context) throws 
>>Exception
>> {
>> >      // create all operators via the operator factory
>> >      // 1. create two window operators
>> > -    BoundedTimeWindow wnd1 = new BoundedTimeWindow("fixedWnd1", 10,
>> > "inputStream1", "fixedWndOutput1");
>> > -    BoundedTimeWindow wnd2 = new BoundedTimeWindow("fixedWnd2", 10,
>> > "inputStream2", "fixedWndOutput2");
>> > +    BoundedTimeWindow wnd1 = new BoundedTimeWindow("fixedWnd1", 10,
>> > "kafka:inputStream1", "fixedWndOutput1");
>> > +    BoundedTimeWindow wnd2 = new BoundedTimeWindow("fixedWnd2", 10,
>> > "kafka:inputStream2", "fixedWndOutput2");
>> >      // 2. create one join operator
>> >      @SuppressWarnings("serial")
>> >      List<String> inputRelations = new ArrayList<String>() {
>> > @@ -86,19 +86,19 @@ public class StreamSqlTask implements StreamTask,
>> > InitableTask, WindowableTask {
>> >        }
>> >      };
>> >      StreamStreamJoin join = new StreamStreamJoin("joinOp",
>> > inputRelations, "joinOutput", joinKeys);
>> > -    // 4. create a re-partition operator
>> > +    // 3. create a re-partition operator
>> >      PartitionOp par = new PartitionOp("parOp1", "joinOutput", 
>>"kafka",
>> > "parOutputStrm1", "joinKey", 50);
>> >
>> >      // Now, connecting the operators via the OperatorRouter
>> > -    this.rteCntx = new SimpleRouter();
>> > +    this.router = new SimpleRouter();
>> >      // 1. set two system input operators (i.e. two window operators)
>> > -    this.rteCntx.addOperator(wnd1);
>> > -    this.rteCntx.addOperator(wnd2);
>> > +    this.router.addOperator(wnd1);
>> > +    this.router.addOperator(wnd2);
>> >      // 2. connect join operator to both window operators
>> > -    this.rteCntx.addOperator(join);
>> > +    this.router.addOperator(join);
>> >      // 3. connect re-partition operator to the stream operator
>> > -    this.rteCntx.addOperator(par);
>> > +    this.router.addOperator(par);
>> >
>> > -    this.rteCntx.init(config, context);
>> > +    this.router.init(config, context);
>> >    }
>> >  }
>> >
>> >
>>
>>
>> --
>> Milinda Pathirage
>>
>> PhD Student | Research Assistant
>> School of Informatics and Computing | Data to Insight Center
>> Indiana University
>>
>> twitter: milindalakmal
>> skype: milinda.pathirage
>> blog: http://milinda.pathirage.org
>>


Mime
View raw message