flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzulitai <...@git.apache.org>
Subject [GitHub] flink pull request #5193: [FLINK-8268][tests] Improve tests stability
Date Thu, 21 Dec 2017 22:51:25 GMT
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5193#discussion_r158394094
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
---
    @@ -277,88 +263,67 @@ public void testLogTimeoutAlmostReachedWarningDuringRecovery() throws
Exception
     
     	private void assertExactlyOnce(List<String> expectedValues) throws IOException
{
     		ArrayList<String> actualValues = new ArrayList<>();
    -		for (File file : targetDirectory.listFiles()) {
    -			actualValues.addAll(Files.readAllLines(file.toPath(), Charset.defaultCharset()));
    +		for (String name : targetDirectory.listFiles()) {
    +			actualValues.addAll(targetDirectory.read(name));
     		}
     		Collections.sort(actualValues);
     		Collections.sort(expectedValues);
     		assertEquals(expectedValues, actualValues);
     	}
     
    -	private class FileBasedSinkFunction extends TwoPhaseCommitSinkFunction<String, FileTransaction,
Void> {
    +	private class ContentDumpSinkFunction extends TwoPhaseCommitSinkFunction<String,
ContentTransaction, Void> {
     
    -		public FileBasedSinkFunction() {
    +		public ContentDumpSinkFunction() {
     			super(
    -				new KryoSerializer<>(FileTransaction.class, new ExecutionConfig()),
    +				new KryoSerializer<>(ContentTransaction.class, new ExecutionConfig()),
     				VoidSerializer.INSTANCE, clock);
    -
    -			if (!tmpDirectory.isDirectory() || !targetDirectory.isDirectory()) {
    -				throw new IllegalArgumentException();
    -			}
     		}
     
     		@Override
    -		protected void invoke(FileTransaction transaction, String value, Context context) throws
Exception {
    -			transaction.writer.write(value);
    +		protected void invoke(ContentTransaction transaction, String value, Context context)
throws Exception {
    +			transaction.tmpContentWriter.write(value);
     		}
     
     		@Override
    -		protected FileTransaction beginTransaction() throws Exception {
    -			File tmpFile = new File(tmpDirectory, UUID.randomUUID().toString());
    -			return new FileTransaction(tmpFile);
    +		protected ContentTransaction beginTransaction() throws Exception {
    +			return new ContentTransaction(tmpDirectory.createWriter(UUID.randomUUID().toString()));
     		}
     
     		@Override
    -		protected void preCommit(FileTransaction transaction) throws Exception {
    -			transaction.writer.flush();
    -			transaction.writer.close();
    +		protected void preCommit(ContentTransaction transaction) throws Exception {
    +			transaction.tmpContentWriter.flush();
    --- End diff --
    
    Should `preCommit` also call `transaction.tmpContentWriter.close()` after the flush, as the previous implementation did?


---

Mime
View raw message