samza-dev mailing list archives

From "Chris Riccomini" <criccom...@apache.org>
Subject Re: Review Request 27649: SAMZA-448
Date Thu, 13 Nov 2014 17:33:55 GMT


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java, line 57
> > <https://reviews.apache.org/r/27649/diff/1/?file=760109#file760109line57>
> >
> >     a little concerned about adding this method in the interface.
> >     1. it breaks backwards compatibility for all existing systems, though we only have KafkaSystem and FileSystem.
> >     2. it "implies" all systemAdmins should implement this. Actually only a few systemAdmins may be used to generate coordinatorStream.

Yea, I think you're right. I was considering adding a more specific SystemAdmin interface,
like CoordinatorSystemAdmin, which would mix in this method. Does that sound good?
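
A minimal Java sketch of that mix-in, for discussion only. SystemAdmin below is a simplified stand-in for the real interface (its actual methods are omitted), and CoordinatorSystemAdmin, createCoordinatorStream, KafkaLikeAdmin and tryCreateCoordinatorStream are hypothetical names. Existing admins keep compiling because only systems that opt in implement the extra method.

```java
import java.util.ArrayList;
import java.util.List;

public class CoordinatorAdminSketch {
  // Simplified stand-in for org.apache.samza.system.SystemAdmin; the real
  // interface's methods are omitted so only the mix-in shape is shown.
  interface SystemAdmin {
    String systemName();
  }

  // Hypothetical mix-in: only systems that can host a coordinator stream
  // implement it, so existing SystemAdmin implementations compile unchanged.
  interface CoordinatorSystemAdmin extends SystemAdmin {
    void createCoordinatorStream(String streamName);
  }

  // An existing admin that never hosts a coordinator stream.
  static class FileReaderAdmin implements SystemAdmin {
    public String systemName() {
      return "filereader";
    }
  }

  // An admin that opts in; here it only records the streams it was asked to create.
  static class KafkaLikeAdmin implements CoordinatorSystemAdmin {
    final List<String> created = new ArrayList<>();

    public String systemName() {
      return "kafka";
    }

    public void createCoordinatorStream(String streamName) {
      created.add(streamName);
    }
  }

  // Callers check for the capability instead of assuming every admin has it.
  static boolean tryCreateCoordinatorStream(SystemAdmin admin, String streamName) {
    if (admin instanceof CoordinatorSystemAdmin) {
      ((CoordinatorSystemAdmin) admin).createCoordinatorStream(streamName);
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    KafkaLikeAdmin kafka = new KafkaLikeAdmin();
    System.out.println(tryCreateCoordinatorStream(kafka, "coordinator-stream")); // true
    System.out.println(tryCreateCoordinatorStream(new FileReaderAdmin(), "coordinator-stream")); // false
  }
}
```

The instanceof check keeps the capability explicit at the call site, at the cost of a runtime check rather than a compile-time guarantee.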


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java, lines 56-57
> > <https://reviews.apache.org/r/27649/diff/1/?file=760111#file760111line56>
> >
> >     a little doc here? I am still a little confused about what each map contains.

Done.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java, line 71
> > <https://reviews.apache.org/r/27649/diff/1/?file=760111#file760111line71>
> >
> >     what is this "user.name" for?

It tracks the username of the account that sent the coordinator stream message. This is mostly
useful when we have control-job.sh, and developers are manually modifying their coordinator
stream. It should help with debugging when a confusing message shows up in the coordinator stream.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java, line 77
> > <https://reviews.apache.org/r/27649/diff/1/?file=760111#file760111line77>
> >
> >     maybe some messages?

Decided to switch this to a warn(), and just setHost to "".
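
Roughly, the username tracking and the host fallback from the last two replies could look like the sketch below. It is hypothetical: the key names "username" and "host" and the helper names are assumptions, and it logs through java.util.logging only to stay self-contained. The sender comes from the JVM's user.name system property; if InetAddress.getLocalHost() fails, it logs a warning and records an empty host instead of failing the send.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

public class CoordinatorMessageMetadataSketch {
  private static final Logger LOG =
      Logger.getLogger(CoordinatorMessageMetadataSketch.class.getName());

  // Hypothetical key names; the real CoordinatorStreamMessage layout may differ.
  static final String USERNAME_KEY = "username";
  static final String HOST_KEY = "host";

  // Resolves the local host name. On failure it logs a warning and returns ""
  // so that sending the message does not fail.
  static String localHostOrEmpty() {
    try {
      return InetAddress.getLocalHost().getHostName();
    } catch (UnknownHostException e) {
      LOG.warning("Unable to resolve the local host name, using an empty host: " + e);
      return "";
    }
  }

  // Metadata for an outgoing coordinator stream message. The sender's account
  // name comes from the JVM's "user.name" system property, which helps trace
  // manual edits back to whoever made them.
  static Map<String, String> buildMetadata() {
    Map<String, String> metadata = new HashMap<>();
    metadata.put(USERNAME_KEY, System.getProperty("user.name"));
    metadata.put(HOST_KEY, localHostOrEmpty());
    return metadata;
  }

  public static void main(String[] args) {
    System.out.println(buildMetadata());
  }
}
```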


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java, line 114
> > <https://reviews.apache.org/r/27649/diff/1/?file=760111#file760111line114>
> >
> >     the name of this method is too ambiguous.

Not quite sure what to call it. I tried getMessageValue.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java, line 119
> > <https://reviews.apache.org/r/27649/diff/1/?file=760111#file760111line119>
> >
> >     the name of this method is a little ambiguous. I was thinking it may put value into KeyMap.

Same as above.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java, line 156
> > <https://reviews.apache.org/r/27649/diff/1/?file=760111#file760111line156>
> >
> >     duplicate code in getValue()
> >     "(Map<String, String>) messageMap.get("values")"

Good catch. Added getMessageValues().


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java, line 199
> > <https://reviews.apache.org/r/27649/diff/1/?file=760111#file760111line199>
> >
> >     what does this "value" mean?

I switched to putMessageValue, and added a lot more docs in the CoordinatorStreamMessage's
Javadoc section to explain this.
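
A simplified sketch of how the three helpers relate, assuming the message body is a Map<String, Object> whose "values" entry holds the payload (as in the cast the review quoted). Only getMessageValues, getMessageValue and putMessageValue come from this thread; the class name and the example key are made up.

```java
import java.util.HashMap;
import java.util.Map;

public class CoordinatorMessageValuesSketch {
  // The whole message body; its "values" entry holds the key/value payload.
  private final Map<String, Object> messageMap = new HashMap<>();

  public CoordinatorMessageValuesSketch() {
    messageMap.put("values", new HashMap<String, String>());
  }

  // The single place for the unchecked cast that used to be duplicated.
  @SuppressWarnings("unchecked")
  protected Map<String, String> getMessageValues() {
    return (Map<String, String>) messageMap.get("values");
  }

  // Reads one entry from the "values" payload.
  protected String getMessageValue(String key) {
    return getMessageValues().get(key);
  }

  // Writes one entry into the "values" payload (not into the message key).
  protected void putMessageValue(String key, String value) {
    getMessageValues().put(key, value);
  }

  public static void main(String[] args) {
    CoordinatorMessageValuesSketch message = new CoordinatorMessageValuesSketch();
    message.putMessageValue("exampleKey", "exampleValue");
    System.out.println(message.getMessageValue("exampleKey")); // exampleValue
  }
}
```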


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java, line 46
> > <https://reviews.apache.org/r/27649/diff/1/?file=760112#file760112line46>
> >
> >     remove "reads"?

Done.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java, line 91
> > <https://reviews.apache.org/r/27649/diff/1/?file=760112#file760112line91>
> >
> >     in the comment, it says "the earliest offset" while the method called is "getOldestOffset()". Can they use the same word?

Done. Standardized on "oldest", since that's what we use for config as well. Agree this is
confusing.
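
To illustrate why the consumer starts from the oldest offset: replaying the whole stream lets later writes override earlier ones and lets deletes take effect. The sketch below models the stream as an in-memory list indexed by offset and treats a null value as a delete; both are simplifying assumptions, not how CoordinatorStreamSystemConsumer actually reads messages.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CoordinatorReplaySketch {
  // One entry in a simplified, in-memory model of the coordinator stream.
  // A null value is treated as a delete for that key.
  static final class StreamEntry {
    final String key;
    final String value;

    StreamEntry(String key, String value) {
      this.key = key;
      this.value = value;
    }
  }

  // Replays from the oldest offset through newestOffset (inclusive), so later
  // writes override earlier ones and deletes remove keys.
  static Map<String, String> bootstrap(List<StreamEntry> stream, int oldestOffset, int newestOffset) {
    Map<String, String> config = new LinkedHashMap<>();
    for (int offset = oldestOffset; offset <= newestOffset; offset++) {
      StreamEntry entry = stream.get(offset);
      if (entry.value == null) {
        config.remove(entry.key);
      } else {
        config.put(entry.key, entry.value);
      }
    }
    return config;
  }

  public static void main(String[] args) {
    List<StreamEntry> stream = new ArrayList<>();
    stream.add(new StreamEntry("job.name", "old-name"));
    stream.add(new StreamEntry("job.container.count", "2"));
    stream.add(new StreamEntry("job.name", "new-name"));
    stream.add(new StreamEntry("job.container.count", null)); // delete
    System.out.println(bootstrap(stream, 0, stream.size() - 1)); // {job.name=new-name}
  }
}
```

Starting from any later offset would miss earlier writes that were never overwritten, which is why "oldest" matters here.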


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java, line 34
> > <https://reviews.apache.org/r/27649/diff/1/?file=760113#file760113line34>
> >
> >     remove "reads" here?

Done.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala, lines 41-42
> > <https://reviews.apache.org/r/27649/diff/1/?file=760115#file760115line41>
> >
> >     shall we follow the same naming fashion as JOB_COORDINATOR_SYSTEM and JOB_CONTAINER_COUNT?

Good call. Done.
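
For reference, a Java sketch of the naming pattern (JobConfig itself is Scala). The job.container.count and yarn.container.count keys come from this patch's commit log, which keeps the YARN key as a fallback; the string behind JOB_COORDINATOR_SYSTEM and the default of 1 are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

public class JobConfigSketch {
  // Constant names follow the JOB_* pattern; the value of JOB_COORDINATOR_SYSTEM is assumed.
  static final String JOB_COORDINATOR_SYSTEM = "job.coordinator.system";
  static final String JOB_CONTAINER_COUNT = "job.container.count";
  // Legacy key, read only as a fallback.
  static final String YARN_CONTAINER_COUNT = "yarn.container.count";

  // Prefers job.container.count, falls back to yarn.container.count,
  // and otherwise uses an assumed default of 1.
  static int getContainerCount(Map<String, String> config) {
    String count = config.get(JOB_CONTAINER_COUNT);
    if (count == null) {
      count = config.get(YARN_CONTAINER_COUNT);
    }
    return count == null ? 1 : Integer.parseInt(count);
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put(YARN_CONTAINER_COUNT, "4");
    System.out.println(getContainerCount(config)); // 4 (fallback)
    config.put(JOB_CONTAINER_COUNT, "2");
    System.out.println(getContainerCount(config)); // 2 (job.* key wins)
  }
}
```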


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala, line 59
> > <https://reviews.apache.org/r/27649/diff/1/?file=760117#file760117line59>
> >
> >     some javadocs here?

Done.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala, line 61
> > <https://reviews.apache.org/r/27649/diff/1/?file=760117#file760117line61>
> >
> >     do we want to have a metric such as JobCoordinatorMetric ?

Yes. I was planning to leave that for the JobCoordinator refactor, though.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala, lines 59-63
> > <https://reviews.apache.org/r/27649/diff/1/?file=760118#file760118line59>
> >
> >     a lot of duplicate code here. can we get rid of some of them?

Done.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala, line 57
> > <https://reviews.apache.org/r/27649/diff/1/?file=760120#file760120line57>
> >
> >     "coorindatorSystemConfig" -> "coordinatorSystemConfig" ? typo . lol

Ooof. My hands have the hardest time typing "coordinator". Fixed.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 155
> > <https://reviews.apache.org/r/27649/diff/1/?file=760123#file760123line155>
> >
> >     space

Done.


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala, line 112
> > <https://reviews.apache.org/r/27649/diff/1/?file=760130#file760130line112>
> >
> >     this will throw exceptions when we have fewer than 3 brokers.

Yea, I ran into this while testing. I'm hesitant to default it to 1, though, since this is
unsafe (don't want to lose coordinator stream). On the other hand, we default it to 3 in the
KafkaSystemFactory anyway, so the only time this constructor's default value is used would
be in a test. Switched to 1.
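
A sketch of the two defaults, plus an optional fail-fast check that is not part of this patch. The config key, the class names and checkBrokerCount are hypothetical. The point is that tests which call the constructor directly get a replication factor of 1, while an admin built through the factory gets 3 and could report a clear error when the cluster has fewer brokers.

```java
import java.util.Collections;
import java.util.Map;

public class CoordinatorReplicationSketch {
  // Hypothetical config key; the defaults mirror the reply above.
  static final String REPLICATION_FACTOR_KEY = "coordinator.replication.factor";
  static final int FACTORY_DEFAULT = 3;     // used when the admin is built from config
  static final int CONSTRUCTOR_DEFAULT = 1; // used when tests call the constructor directly

  static final class KafkaAdminLike {
    final int replicationFactor;

    KafkaAdminLike() {
      this(CONSTRUCTOR_DEFAULT);
    }

    KafkaAdminLike(int replicationFactor) {
      this.replicationFactor = replicationFactor;
    }

    // Optional fail-fast check (not in the patch): report a clear error
    // instead of a broker-side failure when there are too few brokers.
    void checkBrokerCount(int brokerCount) {
      if (replicationFactor > brokerCount) {
        throw new IllegalStateException("Replication factor " + replicationFactor
            + " exceeds the " + brokerCount + " available broker(s)");
      }
    }
  }

  // What a factory might do: read the setting, defaulting to 3.
  static KafkaAdminLike fromConfig(Map<String, String> config) {
    String value = config.get(REPLICATION_FACTOR_KEY);
    return new KafkaAdminLike(value == null ? FACTORY_DEFAULT : Integer.parseInt(value));
  }

  public static void main(String[] args) {
    new KafkaAdminLike().checkBrokerCount(1); // OK: 1 <= 1
    try {
      fromConfig(Collections.<String, String>emptyMap()).checkBrokerCount(1);
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // Replication factor 3 exceeds the 1 available broker(s)
    }
  }
}
```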


> On Nov. 13, 2014, 9:55 a.m., Yan Fang wrote:
> > samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java, line 67
> > <https://reviews.apache.org/r/27649/diff/1/?file=760134#file760134line67>
> >
> >     space

Fixed.


- Chris


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27649/#review61222
-----------------------------------------------------------


On Nov. 12, 2014, 6:50 p.m., Chris Riccomini wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27649/
> -----------------------------------------------------------
> 
> (Updated Nov. 12, 2014, 6:50 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-448
>     https://issues.apache.org/jira/browse/SAMZA-448
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> make deletes actually work
> 
> 
> add javadocs to mock coordinator stream classes.
> 
> 
> delete old configs when job runner publishes new configs
> 
> 
> add rewriting into job coordinator
> 
> 
> make process job properly set only coordinator stream config
> 
> 
> all tests pass
> 
> 
> fix samza container performance test
> 
> 
> explicitly flush all buffers when closing the kafka producer. fix stateful task test.
> 
> 
> fix kafka tests
> 
> 
> all core tests work
> 
> 
> fix test checkpoint tool by adding a mock coordinator consumer that dumps the entire config
> 
> 
> working on fixing checkpoint tool tests
> 
> 
> fleshing out the coordinator stream message javadocs
> 
> 
> remove duplicate code from coordinator system factory
> 
> 
> add more javadocs. clean up todos in kafka system admin.
> 
> 
> remove yarn.container.count from yarn config, but use it as a fallback to job.container.count
> 
> 
> add some docs and headers to the coordinator stream and system admin
> 
> 
> refactoring to add coordinator stream system consumer
> 
> 
> cleanup source in job runner
> 
> 
> abstract coordinator system producer creation into a factory
> 
> 
> add todos
> 
> 
> config stream works
> 
> 
> create coordinator stream in system admin
> 
> 
> connecting job coordinator to job runner via coordinator stream
> 
> 
> add util and logging methods
> 
> 
> adding coordinator message and system producer wrapper
> 
> 
> Diffs
> -----
> 
>   build.gradle 828bce9913db00161971607e4c9ac19c63cecb95
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 571c60631357ea9a0b4fa24e7253008619ef2f32
>   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 38e313f3c39454110efd354e6ca025869fa930cd
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala ddc30af7c52d8a4d5c5de02f6757c040b1f31c93
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 3b6685e00837a4aaf809813e62b7e52823bc07a9
>   samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 1a2dd4413f56e53dbeeb47b5637d7b0c50522f02
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala c14f2f623bb4bae911dd3085ce428175930e4545
>   samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 16345cd1c1354a0d25a0000d81a307dbe3abbe81
>   samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala 6985af6e7cc0d408fa07fbac60141d1126323777
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 530255e5866bc49ec5ce1a0b7437470cd4e17010
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 98e92bc12f3e2827cdec02f1ce94d7e2314e4b4e
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 1a67586eeec95dabfeb3b6881af9b3865c3029ca
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java PRE-CREATION
>   samza-core/src/test/resources/test.properties 9348c7de956ebf02f58a163dc6fb391a7e29ae64
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala af800dfeedbfea75abaac3f15fd53bc55b743daf
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala a79eccaa8fc18d197b77f9363f1814fefc4ac40d
>   samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 1eb0edab1bc792ccf8c503b03687284451ab0f34
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 5ac33ea36da451250655d9dd373692b964322b41
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 4ed5e881031e019d8df6de259cabb658820a3ba0
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala a0e1ccbfe9dc4fd26ca6b30fc2d1348fb7d007e4
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 5ceb1093a66cb57e298d4b3ccdd24845dbb41b58
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java fa1d51b290013a3913d64884dc43907a76670849
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala 118f5eee22016db3b802c32fb26c5d72fa61f1a7
>   samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala d589d762a18f9425aa8d8dd589011a151bcb59a4
>   samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 03395e2efa0fec723e354177d06bfacf7d2a9215
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala 91aff3c5e0a2bcea45120d794371fca1c638ccfe
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 8ba435ef2ccf2af64d01eb4bc3b1c362fb03779d
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b0b6543856cb87888c5a719182ad9576b51bba1a
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 24b11da06a69da734c85720ef39d65ee46d821d5
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala 765f72f4c10bd0f1d1adab28c8ec54d9cbea5fb4
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 81dea9d6d1921462b200c62dbdf016c0eb2f01b2
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala cab5101c5c9e2a979bca545fa8046e93dcfe46e2
> 
> Diff: https://reviews.apache.org/r/27649/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Chris Riccomini
> 
>

