samza-dev mailing list archives

From "Chris Riccomini" <criccom...@apache.org>
Subject Re: Review Request 28016: SAMZA-226: Auto create Changelog stream topics
Date Fri, 14 Nov 2014 17:32:24 GMT

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28016/#review61445
-----------------------------------------------------------


Nit: space after //

Just to make code more uniform.


docs/learn/documentation/versioned/jobs/configuration-table.html
<https://reviews.apache.org/r/28016/#comment103025>

    Do we really need this config? What's the use case where you'd want to set the changelog partition count to something other than the correct size (which is automatically determined in the container)?
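The reviewer's point is that the changelog partition count is not a free parameter. Assume each task owns exactly one changelog partition and partition ids run densely from 0. Under that assumption, the count follows directly from the task-to-changelog-partition mapping, so a separate config value could only disagree with it. Below is a minimal Java sketch. `ChangelogPartitionCount` and `fromMapping` are hypothetical names, not Samza APIs:

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical helper (not a Samza API). Assumes each task owns exactly one
// changelog partition and that partition ids are dense and start at 0.
final class ChangelogPartitionCount {
    private ChangelogPartitionCount() {}

    static int fromMapping(Map<String, Integer> taskToChangelogPartition) {
        if (taskToChangelogPartition.isEmpty()) {
            throw new IllegalArgumentException("No tasks; cannot size the changelog stream");
        }
        // Ids are 0-based, so the partition count is the highest id plus one.
        return Collections.max(taskToChangelogPartition.values()) + 1;
    }
}
```

With three tasks mapped to partitions 0, 1, and 2, this yields 3. No user-supplied setting is involved.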



samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
<https://reviews.apache.org/r/28016/#comment103024>

    Should use "stream" instead of "topic" in this API. Samza doesn't use "topic".
    
    I think this method should be createChangelogStream(String streamName, int partitions). The reason for keeping it changelog-specific is that we might want custom configs for changelog streams (e.g. custom replication count, segment size in Kafka, etc.). If we don't make the API specific to the changelog, there's no way to have custom configs for changelogs (since we won't know if the topic being created is a changelog or not).
    
    Also, the pattern I'm following for SAMZA-448, when adding create*Stream methods, is to have a new interface that extends SystemAdmin with just the create method in it (e.g. ChangelogSystemAdmin). Then, only KafkaSystemAdmin would implement the new interface, whereas FileReaderSystemAdmin would not. This was based on Yan's feedback.



samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
<https://reviews.apache.org/r/28016/#comment103026>

    Don't need this if you have a ChangelogSystemAdmin



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment103039>

    Remove. Move getChangeLogOldestOffsetsForPartition into TaskStorageManager.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment103030>

    Not needed.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment103036>

    Prefer = Map() over null.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment103040>

    createStreams



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment103034>

    No need for :Unit =



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment103031>

    Remove newlines.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment103033>

    Add a message to the SamzaException.
    
    Space after ','.
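This illustrative sketch shows what the first comment asks for: an exception that carries a message identifying what failed. `SamzaException` below is a local stand-in for `org.apache.samza.SamzaException`, which is a `RuntimeException`. The partition-count check is a hypothetical example, not the code under review:

```java
// Local stand-in for org.apache.samza.SamzaException.
class SamzaException extends RuntimeException {
    SamzaException(String message) {
        super(message);
    }
}

final class ChangelogValidation {
    private ChangelogValidation() {}

    // Hypothetical check: the message names the stream and both counts,
    // so the failure can be diagnosed from the log alone.
    static void checkPartitionCount(String stream, int expected, int actual) {
        if (expected != actual) {
            throw new SamzaException(String.format(
                "Changelog stream %s has %d partitions, but %d are required.",
                stream, actual, expected));
        }
    }
}
```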



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment103032>

    Indentation.



samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103043>

    Don't need if we have a ChangelogSystemAdmin.



samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
<https://reviews.apache.org/r/28016/#comment103044>

    Don't need if we have a ChangelogSystemAdmin.



samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
<https://reviews.apache.org/r/28016/#comment103048>

    I don't think we need this, do we? Is there ever a case where you'd want to set the changelog partition count to a non-default number?



samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
<https://reviews.apache.org/r/28016/#comment103045>

    stores.%s.changelog.kafka
    
    (typo on changelog, and kafkaprops -> kafka)
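Below is a sketch of how a per-store key like `stores.%s.changelog.kafka` might be consumed. It assumes, without confirmation from the review, that the key acts as a prefix under which Kafka topic properties for that store's changelog are passed through. `ChangelogKafkaProps.forStore` is a hypothetical helper, and the config is modeled as a plain `Map<String, String>`:

```java
import java.util.HashMap;
import java.util.Map;

final class ChangelogKafkaProps {
    private ChangelogKafkaProps() {}

    // Key pattern from the review comment, treated here (by assumption) as a prefix.
    static final String PREFIX_FORMAT = "stores.%s.changelog.kafka.";

    // Returns the entries under the store's prefix, with the prefix stripped.
    static Map<String, String> forStore(Map<String, String> config, String storeName) {
        String prefix = String.format(PREFIX_FORMAT, storeName);
        Map<String, String> props = new HashMap<>();
        for (Map.Entry<String, String> e : config.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                props.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return props;
    }
}
```

For store `my-store`, a key such as `stores.my-store.changelog.kafka.segment.bytes` would surface as `segment.bytes`, while other stores' keys are ignored.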



samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
<https://reviews.apache.org/r/28016/#comment103047>

    map -> foreach



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103054>

    Javadocs.
    
    After seeing the implementation, I prefer not passing a config object into the constructor, and just having a constructor with a lot of defaults (as we had before). The reasons I don't like the config-object approach are:
    
    1. It's impossible to know what to set in the config to create the KafkaSystemAdmin without reading its code.
    2. It mixes wiring into the class itself. These are two separate concerns, and should be treated separately.
    3. It makes writing tests harder, since you have to create a config object, rather than just passing in the params.
    
    Recommend switching back to the old style, and just adding new params to the constructor.
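Here is a minimal Java sketch of the style recommended above. Dependencies are explicit constructor parameters, and an overloaded constructor supplies defaults, since Java has no default arguments, unlike the Scala original. `KafkaAdminSketch`, its fields, and the default values are hypothetical and do not reflect KafkaSystemAdmin's actual parameters:

```java
// Hypothetical class illustrating "explicit params plus defaults"; field names
// and default values are illustrative only.
final class KafkaAdminSketch {
    static final int DEFAULT_REPLICATION_FACTOR = 3; // hypothetical default
    static final int DEFAULT_TIMEOUT_MS = 30000;     // hypothetical default

    final String systemName;
    final String brokerList;
    final int changelogReplicationFactor;
    final int timeoutMs;

    // Convenience constructor: callers that don't care get the defaults.
    KafkaAdminSketch(String systemName, String brokerList) {
        this(systemName, brokerList, DEFAULT_REPLICATION_FACTOR, DEFAULT_TIMEOUT_MS);
    }

    // Full constructor: every dependency is visible in the signature,
    // so nothing has to be looked up in a Config object.
    KafkaAdminSketch(String systemName, String brokerList,
                     int changelogReplicationFactor, int timeoutMs) {
        this.systemName = systemName;
        this.brokerList = brokerList;
        this.changelogReplicationFactor = changelogReplicationFactor;
        this.timeoutMs = timeoutMs;
    }
}
```

Reading configuration and choosing values would then stay in the factory (KafkaSystemFactory in this patch). A test could construct the admin directly with literal arguments.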



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103057>

    Javadocs.



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103056>

    Javadocs.



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103065>

    This code looks to be pretty much copied from KafkaCheckpointManager. Can we just have the code once, and call it in both places? Maybe in KafkaUtil?



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103061>

    Don't think we need this. Should just use numKafkaChangeLogPartitions.
    
    Also, we should try and standardize on "Changelog" not "ChangeLog".



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103062>

    checkpoint topic -> changelog topic



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103063>

    Checkpoint -> changelog.



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103067>

    This looks to be copied from KafkaCheckpointManager as well. Can we converge on one util method in KafkaUtil, and call from both spots?



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103068>

    Checkpoint again.



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103059>

    What is this? Javadocs.



samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
<https://reviews.apache.org/r/28016/#comment103069>

    Not needed if we have ChangelogSystemAdmin.



samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
<https://reviews.apache.org/r/28016/#comment103071>

    Remove.


- Chris Riccomini


On Nov. 13, 2014, 11:58 p.m., Naveen Somasundaram wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/28016/
> -----------------------------------------------------------
> 
> (Updated Nov. 13, 2014, 11:58 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> I have added a new method to the system admin, as discussed in the JIRA. The task storage manager fetches all the necessary information and creates the changelog topic using the system admin.
> 
> PENDING: I have to update the Samza docs with the newly added configurations; I will update the RB with the doc changes.
> 
> 
> Diffs
> -----
> 
>   docs/learn/documentation/versioned/jobs/configuration-table.html 4266a137ae003e946e11c122d94061c31d643c77
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 571c60631357ea9a0b4fa24e7253008619ef2f32
>   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 38e313f3c39454110efd354e6ca025869fa930cd
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala d91d6d7940bd07a145dd3b782a9239f24bb5cf2e
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b8719c36c2b9346bcd3f291e23b33d2c00cebfa9
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 98e92bc12f3e2827cdec02f1ce94d7e2314e4b4e
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala a79eccaa8fc18d197b77f9363f1814fefc4ac40d
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 9fc1f56d4404ec7722c0d34fde2804e981b41309
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 5ac33ea36da451250655d9dd373692b964322b41
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 4ed5e881031e019d8df6de259cabb658820a3ba0
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 5ceb1093a66cb57e298d4b3ccdd24845dbb41b58
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java fa1d51b290013a3913d64884dc43907a76670849
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala 118f5eee22016db3b802c32fb26c5d72fa61f1a7
> 
> Diff: https://reviews.apache.org/r/28016/diff/
> 
> 
> Testing
> -------
> 
> Modified TestStatefulTask to disable auto-creation of topics, and the test seems to work.
> 
> 
> Thanks,
> 
> Naveen Somasundaram
> 
>

