samza-dev mailing list archives

From "Yan Fang" <yanfang...@gmail.com>
Subject Re: Review Request 34974: SAMZA-676: implement broadcast stream
Date Mon, 22 Jun 2015 06:19:03 GMT


> On June 18, 2015, 10:37 p.m., Navina Ramesh wrote:
> > Mostly looks good. Have some questions:
> > * Have you tried moving the message "filtering" logic to the container level instead of
> >   the task level? Not sure which is simpler in terms of code change. Since the container has
> >   access to all the task instances and the systemAdmins, it seems convenient to have the caughtUp
> >   map within containerContext. I could be wrong :)
> > * I want to test the patch locally before confirming a ship it. Looks awesome for
> >   a first draft!

The container only initializes the task instances. At the beginning, the container knows all
the information about the tasks and systems. However, once the RunLoop has started, the
containerContext is no longer updated, so it is difficult to inform the task instances when
their offsets are caught up.
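
To illustrate the task-level approach, here is a minimal sketch (not the code in the patch). It assumes numeric, Kafka-style offsets and that a task's starting offset is the first offset it still has to process. The class name BroadcastFilterSketch and its methods are hypothetical; the patch compares offsets through the new offsetComparator in SystemAdmin rather than Long.compare.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of per-task filtering for a shared (broadcast) SSP. */
public class BroadcastFilterSketch {
    // Starting offset per SSP for this one task (assumed to be the next offset to process).
    private final Map<String, Long> startingOffsets = new HashMap<>();
    // Whether this task has reached its starting offset on each SSP.
    private final Map<String, Boolean> caughtUp = new HashMap<>();

    public void register(String ssp, long startingOffset) {
        startingOffsets.put(ssp, startingOffset);
        caughtUp.put(ssp, false);
    }

    /** Returns true if this task should process the message at the given offset. */
    public boolean shouldProcess(String ssp, long offset) {
        Boolean done = caughtUp.get(ssp);
        if (done == null) {
            throw new IllegalArgumentException("SSP not registered: " + ssp);
        }
        if (done) {
            return true; // already caught up, no comparison needed any more
        }
        // Stand-in for the SystemAdmin offsetComparator (assumes numeric offsets).
        if (Long.compare(offset, startingOffsets.get(ssp)) >= 0) {
            caughtUp.put(ssp, true);
            return true;
        }
        return false; // message was already processed by this task, skip it
    }
}
```

Because the flag lives with the task, it can flip as soon as the task sees its own starting offset, without the container having to push any update to it.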


> On June 18, 2015, 10:37 p.m., Navina Ramesh wrote:
> > samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala, line 412
> > <https://reviews.apache.org/r/34974/diff/4/?file=986051#file986051line412>
> >
> >     The exception message is inaccurate. It can also happen when the taskName is not in
> >     the startingOffsets map (although I am not sure if such a case will happen).

If the taskName is not in the startingOffsets map, this exception will not be thrown. (The
check is inside the loop that starts at line 374.)


> On June 18, 2015, 10:37 p.m., Navina Ramesh wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala, line 144
> > <https://reviews.apache.org/r/34974/diff/4/?file=986055#file986055line144>
> >
> >     Should we have a different metric for the number of messages received by process()
> >     than for the number of messages actually processed?
> >     We need to clarify the semantics of all our metrics, perhaps in a separate RB.
yes, fixed.


> On June 18, 2015, 10:37 p.m., Navina Ramesh wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala, line 132
> > <https://reviews.apache.org/r/34974/diff/4/?file=986055#file986055line132>
> >
> >     instead of getOrElse(null), try .orNull

fixed


> On June 18, 2015, 10:37 p.m., Navina Ramesh wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala, line 109
> > <https://reviews.apache.org/r/34974/diff/4/?file=986071#file986071line109>
> >
> >     We are registering with the offset in the method invocation in Line 105. Why
> >     do we need to update the topicPartitionsAndOffsets map with the replaced offset ?
> >
> >     I understand that all tasks within the same container may be at different offsets
> >     for broadcast stream ssps. But it looks like consumer.register is being invoked in multiple
> >     places - TaskStorageManager & CoordinatorStreamSystemConsumer. Will the change impact these
> >     other components ?

Why do we need to update the topicPartitionsAndOffsets map with the replaced offset ?
  -- Because line 105 only registers the offset in BlockingEnvelopeMap, which is used for
buffering. It has nothing to do with the starting offset from which the consumer will consume.

 Will the change impact these other components ?
   -- No. If the stream is a changelog, every task has its own changelog partition, so no
partition is assigned to more than one task and the replacement will not happen. The code
behaves the same as before this patch. If the stream is the coordinatorStream, we always
consume from the beginning, so the starting offset is always 0.
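
A rough sketch of the replacement idea, under assumptions: when several tasks register the same partition, the consumer keeps one starting offset per partition, and I assume here that the earlier offset wins so that no task misses messages. The class EarliestOffsetRegistrySketch and its method names are hypothetical and are not the KafkaSystemConsumer code; offsets are again assumed to be numeric.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: one starting offset per partition, earliest registration wins. */
public class EarliestOffsetRegistrySketch {
    private final Map<String, Long> startingOffsets = new HashMap<>();

    /** Registers an offset; an existing entry is replaced only by an earlier offset. */
    public void register(String partition, long offset) {
        startingOffsets.merge(partition, offset, Long::min);
    }

    /** Returns the offset the consumer would start from, or null if never registered. */
    public Long startingOffset(String partition) {
        return startingOffsets.get(partition);
    }
}
```

A partition that is registered only once (the changelog case) keeps the offset it was given, so nothing changes for it. A partition registered by two tasks at offsets 7 and 3 starts from 3.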


- Yan


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/#review88308
-----------------------------------------------------------


On June 22, 2015, 6:07 a.m., Yan Fang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34974/
> -----------------------------------------------------------
> 
> (Updated June 22, 2015, 6:07 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-676
>     https://issues.apache.org/jira/browse/SAMZA-676
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> 1. added offsetComparator method in SystemAdmin Interface
> 
> 2. added "task.global.inputs" config
> 
> 3. rewrote the Grouper classes in Java; they now allow assigning global streams during grouping
> 
> 4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer to preserve message order
> 
> 5. added taskNames to the offsets in OffsetManager
> 
> 6. allowed one SSP to be assigned to multiple taskInstances
> 
> 7. skipped already-processed messages in RunLoop
> 
> 8. unit tests for all changes
> 
> 
> Diffs
> -----
> 
>   checkstyle/import-control.xml 3374f0c 
>   docs/learn/documentation/versioned/container/samza-container.md 9f46414 
>   docs/learn/documentation/versioned/jobs/configuration-table.html 405e2ce 
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588eb 
>   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 20e5d26
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala e4b14f4
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala c292ae4
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala cbacd18
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala c5a5ea5
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 9dc7051
>   samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala 44e95fc
>   samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala 3c0acad
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f410
>   samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 8d54c46
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 64a5844
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 9fb1aa9
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 7caad28
>   samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala a14169b
>   samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala 74daf72
>   samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala deb3895
>   samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala d9ae187
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 35086f5
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala de00320
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 1629035
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala 2a84328
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b063366
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 1e936b4
> 
> Diff: https://reviews.apache.org/r/34974/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Yan Fang
> 
>

