kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [2/4] kafka git commit: KAFKA-3607: Close KStreamTestDriver upon completing; follow-up fixes to be tracked in KAFKA-3623
Date Tue, 26 Apr 2016 18:39:52 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index f6ebbe1..16015fe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -19,17 +19,19 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
-import java.nio.file.Files;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
@@ -42,309 +44,291 @@ import static org.junit.Assert.assertTrue;
 
 public class KTableKTableJoinTest {
 
-    private String topic1 = "topic1";
-    private String topic2 = "topic2";
+    final private String topic1 = "topic1";
+    final private String topic2 = "topic2";
 
-    final private Serde<Integer> intSerde = new Serdes.IntegerSerde();
-    final private Serde<String> stringSerde = new Serdes.StringSerde();
+    final private Serde<Integer> intSerde = Serdes.Integer();
+    final private Serde<String> stringSerde = Serdes.String();
 
-    private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
-        @Override
-        public String apply(String value1, String value2) {
-            return value1 + "+" + value2;
-        }
-    };
+    private KStreamTestDriver driver = null;
+    private File stateDir = null;
 
-    private static class JoinedKeyValue extends KeyValue<Integer, String> {
-        public JoinedKeyValue(Integer key, String value) {
-            super(key, value);
+    @After
+    public void tearDown() {
+        if (driver != null) {
+            driver.close();
         }
+        driver = null;
+    }
+
+    @Before
+    public void setUp() throws IOException {
+        stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
     @Test
     public void testJoin() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-
-            KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            KTable<Integer, String> table1;
-            KTable<Integer, String> table2;
-            KTable<Integer, String> joined;
-            MockProcessorSupplier<Integer, String> processor;
+        KTable<Integer, String> table1;
+        KTable<Integer, String> table2;
+        KTable<Integer, String> joined;
+        MockProcessorSupplier<Integer, String> processor;
 
-            processor = new MockProcessorSupplier<>();
-            table1 = builder.table(intSerde, stringSerde, topic1);
-            table2 = builder.table(intSerde, stringSerde, topic2);
-            joined = table1.join(table2, joiner);
-            joined.toStream().process(processor);
+        processor = new MockProcessorSupplier<>();
+        table1 = builder.table(intSerde, stringSerde, topic1);
+        table2 = builder.table(intSerde, stringSerde, topic2);
+        joined = table1.join(table2, MockValueJoiner.STRING_JOINER);
+        joined.toStream().process(processor);
 
-            Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
 
-            assertEquals(1, copartitionGroups.size());
-            assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-            KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
+        KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-            driver.setTime(0L);
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(0L);
 
-            KTableValueGetter<Integer, String> getter = getterSupplier.get();
-            getter.init(driver.context());
+        KTableValueGetter<Integer, String> getter = getterSupplier.get();
+        getter.init(driver.context());
 
-            // push two items to the primary stream. the other table is empty
+        // push two items to the primary stream. the other table is empty
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
-
-            processor.checkAndClearProcessResult("0:null", "1:null");
-            checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, null), kv(3, null));
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            // push two items to the other stream. this should produce two items.
+        processor.checkAndClearProcessResult("0:null", "1:null");
+        checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, null), kv(3, null));
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        // push two items to the other stream. this should produce two items.
 
-            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
-            checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            // push all four items to the primary stream. this should produce four items.
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+        checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        // push all four items to the primary stream. this should produce four items.
 
-            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:null", "3:null");
-            checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            // push all items to the other stream. this should produce four items.
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:null", "3:null");
+        checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
 
-            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
-            checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+        // push all items to the other stream. this should produce four items.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            // push all four items to the primary stream. this should produce four items.
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+        checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        // push all four items to the primary stream. this should produce four items.
 
-            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
-            checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            // push two items with null to the other stream as deletes. this should produce two item.
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+        checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], null);
-            }
+        // push two items with null to the other stream as deletes. this should produce two items.
 
-            processor.checkAndClearProcessResult("0:null", "1:null");
-            checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], null);
+        }
 
-            // push all four items to the primary stream. this should produce four items.
+        processor.checkAndClearProcessResult("0:null", "1:null");
+        checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        // push all four items to the primary stream. this should produce four items.
 
-            processor.checkAndClearProcessResult("0:null", "1:null", "2:XX2+YY2", "3:XX3+YY3");
-            checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
-
-        } finally {
-            Utils.delete(baseDir);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
+
+        processor.checkAndClearProcessResult("0:null", "1:null", "2:XX2+YY2", "3:XX3+YY3");
+        checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
     }
 
     @Test
     public void testNotSendingOldValues() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-
-            KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            KTable<Integer, String> table1;
-            KTable<Integer, String> table2;
-            KTable<Integer, String> joined;
-            MockProcessorSupplier<Integer, String> proc;
+        KTable<Integer, String> table1;
+        KTable<Integer, String> table2;
+        KTable<Integer, String> joined;
+        MockProcessorSupplier<Integer, String> proc;
 
-            table1 = builder.table(intSerde, stringSerde, topic1);
-            table2 = builder.table(intSerde, stringSerde, topic2);
-            joined = table1.join(table2, joiner);
+        table1 = builder.table(intSerde, stringSerde, topic1);
+        table2 = builder.table(intSerde, stringSerde, topic2);
+        joined = table1.join(table2, MockValueJoiner.STRING_JOINER);
 
-            proc = new MockProcessorSupplier<>();
-            builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        proc = new MockProcessorSupplier<>();
+        builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-            driver.setTime(0L);
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(0L);
 
-            assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
-            assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
-            assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
+        assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
+        assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
+        assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
 
-            // push two items to the primary stream. the other table is empty
+        // push two items to the primary stream. the other table is empty
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
-
-            proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
-
-            // push two items to the other stream. this should produce two items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+        // push two items to the other stream. this should produce two items.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(null<-null)", "3:(null<-null)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push all items to the other stream. this should produce four items.
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+        proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(null<-null)", "3:(null<-null)");
 
-            // push all four items to the primary stream. this should produce four items.
+        // push all items to the other stream. this should produce four items.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push two items with null to the other stream as deletes. this should produce two item.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], null);
-            }
+        proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
-            proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
+        // push two items with null to the other stream as deletes. this should produce two items.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], null);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
 
-            proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
+        // push all four items to the primary stream. this should produce four items.
 
-        } finally {
-            Utils.delete(baseDir);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
+
+        proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
     }
 
     @Test
     public void testSendingOldValues() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-
-            KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            KTable<Integer, String> table1;
-            KTable<Integer, String> table2;
-            KTable<Integer, String> joined;
-            MockProcessorSupplier<Integer, String> proc;
+        KTable<Integer, String> table1;
+        KTable<Integer, String> table2;
+        KTable<Integer, String> joined;
+        MockProcessorSupplier<Integer, String> proc;
 
-            table1 = builder.table(intSerde, stringSerde, topic1);
-            table2 = builder.table(intSerde, stringSerde, topic2);
-            joined = table1.join(table2, joiner);
+        table1 = builder.table(intSerde, stringSerde, topic1);
+        table2 = builder.table(intSerde, stringSerde, topic2);
+        joined = table1.join(table2, MockValueJoiner.STRING_JOINER);
 
-            ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
+        ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
 
-            proc = new MockProcessorSupplier<>();
-            builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        proc = new MockProcessorSupplier<>();
+        builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-            driver.setTime(0L);
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(0L);
 
-            assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
-            assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
-            assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
-
-            // push two items to the primary stream. the other table is empty
-
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
+        assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
+        assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
 
-            proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
+        // push two items to the primary stream. the other table is empty
 
-            // push two items to the other stream. this should produce two items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+        // push two items to the other stream. this should produce two items.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(null<-null)", "3:(null<-null)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push all items to the other stream. this should produce four items.
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+        proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(null<-null)", "3:(null<-null)");
 
-            // push all four items to the primary stream. this should produce four items.
+        // push all items to the other stream. this should produce four items.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push two items with null to the other stream as deletes. this should produce two item.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], null);
-            }
+        proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
 
-            proc.checkAndClearProcessResult("0:(null<-X0+YY0)", "1:(null<-X1+YY1)");
+        // push two items with null to the other stream as deletes. this should produce two items.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], null);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(null<-X0+YY0)", "1:(null<-X1+YY1)");
 
-            proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
+        // push all four items to the primary stream. this should produce four items.
 
-        } finally {
-            Utils.delete(baseDir);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
+
+        proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
     }
 
-    private JoinedKeyValue kv(Integer key, String value) {
-        return new JoinedKeyValue(key, value);
+    private KeyValue<Integer, String> kv(Integer key, String value) {
+        return new KeyValue<>(key, value);
     }
 
-    private void checkJoinedValues(KTableValueGetter<Integer, String> getter, JoinedKeyValue... expected) {
-        for (JoinedKeyValue kv : expected) {
+    private void checkJoinedValues(KTableValueGetter<Integer, String> getter, KeyValue<Integer, String>... expected) {
+        for (KeyValue<Integer, String> kv : expected) {
             String value = getter.get(kv.key);
             if (kv.value == null) {
                 assertNull(value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 449ea05..5132ce3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -19,18 +19,19 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
-import java.nio.file.Files;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
@@ -43,313 +44,287 @@ import static org.junit.Assert.assertTrue;
 
 public class KTableKTableLeftJoinTest {
 
-    private String topic1 = "topic1";
-    private String topic2 = "topic2";
+    final private String topic1 = "topic1";
+    final private String topic2 = "topic2";
 
-    final private Serde<Integer> intSerde = new Serdes.IntegerSerde();
-    final private Serde<String> stringSerde = new Serdes.StringSerde();
+    final private Serde<Integer> intSerde = Serdes.Integer();
+    final private Serde<String> stringSerde = Serdes.String();
 
-    private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
-        @Override
-        public String apply(String value1, String value2) {
-            return value1 + "+" + value2;
-        }
-    };
-
-    private KeyValueMapper<Integer, String, KeyValue<Integer, String>> keyValueMapper =
-        new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
-            @Override
-            public KeyValue<Integer, String> apply(Integer key, String value) {
-                return KeyValue.pair(key, value);
-            }
-        };
+    private KStreamTestDriver driver = null;
+    private File stateDir = null;
 
-    private static class JoinedKeyValue extends KeyValue<Integer, String> {
-        public JoinedKeyValue(Integer key, String value) {
-            super(key, value);
+    @After
+    public void tearDown() {
+        if (driver != null) {
+            driver.close();
         }
+        driver = null;
+    }
+
+    @Before
+    public void setUp() throws IOException {
+        stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
     @Test
     public void testJoin() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-
-            KStreamBuilder builder = new KStreamBuilder();
-
-            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            KTable<Integer, String> table1 = builder.table(intSerde, stringSerde, topic1);
-            KTable<Integer, String> table2 = builder.table(intSerde, stringSerde, topic2);
-            KTable<Integer, String> joined = table1.leftJoin(table2, joiner);
-            MockProcessorSupplier<Integer, String> processor;
-            processor = new MockProcessorSupplier<>();
-            joined.toStream().process(processor);
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        KTable<Integer, String> table1 = builder.table(intSerde, stringSerde, topic1);
+        KTable<Integer, String> table2 = builder.table(intSerde, stringSerde, topic2);
+        KTable<Integer, String> joined = table1.leftJoin(table2, MockValueJoiner.STRING_JOINER);
+        MockProcessorSupplier<Integer, String> processor;
+        processor = new MockProcessorSupplier<>();
+        joined.toStream().process(processor);
 
-            assertEquals(1, copartitionGroups.size());
-            assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
 
-            KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-            driver.setTime(0L);
+        KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
 
-            KTableValueGetter<Integer, String> getter = getterSupplier.get();
-            getter.init(driver.context());
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(0L);
 
-            // push two items to the primary stream. the other table is empty
+        KTableValueGetter<Integer, String> getter = getterSupplier.get();
+        getter.init(driver.context());
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
-
-            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
-            checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null));
+        // push two items to the primary stream. the other table is empty
 
-            // push two items to the other stream. this should produce two items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+        checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null));
 
-            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
-            checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
+        // push two items to the other stream. this should produce two items.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+        checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
 
-            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
-            checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null"));
+        // push all four items to the primary stream. this should produce four items.
 
-            // push all items to the other stream. this should produce four items.
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
-            checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+        checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null"));
 
-            // push all four items to the primary stream. this should produce four items.
+        // push all items to the other stream. this should produce four items.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+        checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
-            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
-            checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+        // push all four items to the primary stream. this should produce four items.
 
-            // push two items with null to the other stream as deletes. this should produce two item.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], null);
-            }
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+        checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
-            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
-            checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+        // push two items with null to the other stream as deletes. this should produce two items.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], null);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+        checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
-            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
-            checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
+        // push all four items to the primary stream. this should produce four items.
 
-        } finally {
-            Utils.delete(baseDir);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
+
+        processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
+        checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
     }
 
     @Test
     public void testNotSendingOldValue() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-
-            KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            KTable<Integer, String> table1;
-            KTable<Integer, String> table2;
-            KTable<Integer, String> joined;
-            MockProcessorSupplier<Integer, String> proc;
+        KTable<Integer, String> table1;
+        KTable<Integer, String> table2;
+        KTable<Integer, String> joined;
+        MockProcessorSupplier<Integer, String> proc;
 
-            table1 = builder.table(intSerde, stringSerde, topic1);
-            table2 = builder.table(intSerde, stringSerde, topic2);
-            joined = table1.leftJoin(table2, joiner);
+        table1 = builder.table(intSerde, stringSerde, topic1);
+        table2 = builder.table(intSerde, stringSerde, topic2);
+        joined = table1.leftJoin(table2, MockValueJoiner.STRING_JOINER);
 
-            proc = new MockProcessorSupplier<>();
-            builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        proc = new MockProcessorSupplier<>();
+        builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-            driver.setTime(0L);
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(0L);
 
-            assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
-            assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
-            assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
+        assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
+        assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
+        assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
 
-            // push two items to the primary stream. the other table is empty
-
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        // push two items to the primary stream. the other table is empty
 
-            proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
-
-            // push two items to the other stream. this should produce two items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+        // push two items to the other stream. this should produce two items.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push all items to the other stream. this should produce four items.
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+        proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
-            // push all four items to the primary stream. this should produce four items.
+        // push all items to the other stream. this should produce four items.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push two items with null to the other stream as deletes. this should produce two item.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], null);
-            }
+        proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+        // push two items with null to the other stream as deletes. this should produce two items.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], null);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
 
-            proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
+        // push all four items to the primary stream. this should produce four items.
 
-        } finally {
-            Utils.delete(baseDir);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
+
+        proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
     }
 
     @Test
     public void testSendingOldValue() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-
-            KStreamBuilder builder = new KStreamBuilder();
-
-            final int[] expectedKeys = new int[]{0, 1, 2, 3};
-
-            KTable<Integer, String> table1;
-            KTable<Integer, String> table2;
-            KTable<Integer, String> joined;
-            MockProcessorSupplier<Integer, String> proc;
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            table1 = builder.table(intSerde, stringSerde, topic1);
-            table2 = builder.table(intSerde, stringSerde, topic2);
-            joined = table1.leftJoin(table2, joiner);
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
+        KTable<Integer, String> table1;
+        KTable<Integer, String> table2;
+        KTable<Integer, String> joined;
+        MockProcessorSupplier<Integer, String> proc;
 
-            proc = new MockProcessorSupplier<>();
-            builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        table1 = builder.table(intSerde, stringSerde, topic1);
+        table2 = builder.table(intSerde, stringSerde, topic2);
+        joined = table1.leftJoin(table2, MockValueJoiner.STRING_JOINER);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-            driver.setTime(0L);
+        ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
 
-            assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
-            assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
-            assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
+        proc = new MockProcessorSupplier<>();
+        builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
-            // push two items to the primary stream. the other table is empty
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(0L);
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
+        assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
+        assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
 
-            proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+        // push two items to the primary stream. the other table is empty
 
-            // push two items to the other stream. this should produce two items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)");
+        // push two items to the other stream. this should produce two items.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)");
 
-            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push all items to the other stream. this should produce four items.
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
+        proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
-            // push all four items to the primary stream. this should produce four items.
+        // push all items to the other stream. this should produce four items.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
 
-            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push two items with null to the other stream as deletes. this should produce two item.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], null);
-            }
+        proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
 
-            proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)");
+        // push two items with null to the other stream as deletes. this should produce two items.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], null);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)");
 
-            proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
+        // push all four items to the primary stream. this should produce four items.
 
-        } finally {
-            Utils.delete(baseDir);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
+
+        proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
     }
 
-    private JoinedKeyValue kv(Integer key, String value) {
-        return new JoinedKeyValue(key, value);
+    private KeyValue<Integer, String> kv(Integer key, String value) {
+        return new KeyValue<>(key, value);
     }
 
-    private void checkJoinedValues(KTableValueGetter<Integer, String> getter, JoinedKeyValue... expected) {
-        for (JoinedKeyValue kv : expected) {
+    private void checkJoinedValues(KTableValueGetter<Integer, String> getter, KeyValue<Integer, String>... expected) {
+        for (KeyValue<Integer, String> kv : expected) {
             String value = getter.get(kv.key);
             if (kv.value == null) {
                 assertNull(value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index ea7476a..3124556 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -19,17 +19,19 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
-import java.nio.file.Files;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
@@ -42,334 +44,316 @@ import static org.junit.Assert.assertTrue;
 
 public class KTableKTableOuterJoinTest {
 
-    private String topic1 = "topic1";
-    private String topic2 = "topic2";
+    final private String topic1 = "topic1";
+    final private String topic2 = "topic2";
 
-    final private Serde<Integer> intSerde = new Serdes.IntegerSerde();
-    final private Serde<String> stringSerde = new Serdes.StringSerde();
+    final private Serde<Integer> intSerde = Serdes.Integer();
+    final private Serde<String> stringSerde = Serdes.String();
 
-    private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
-        @Override
-        public String apply(String value1, String value2) {
-            return value1 + "+" + value2;
-        }
-    };
+    private KStreamTestDriver driver = null;
+    private File stateDir = null;
 
-    private static class JoinedKeyValue extends KeyValue<Integer, String> {
-        public JoinedKeyValue(Integer key, String value) {
-            super(key, value);
+    @After
+    public void tearDown() {
+        if (driver != null) {
+            driver.close();
         }
+        driver = null;
+    }
+
+    @Before
+    public void setUp() throws IOException {
+        stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
     @Test
     public void testJoin() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-
-            KStreamBuilder builder = new KStreamBuilder();
+        KStreamBuilder builder = new KStreamBuilder();
 
-            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            KTable<Integer, String> table1;
-            KTable<Integer, String> table2;
-            KTable<Integer, String> joined;
-            MockProcessorSupplier<Integer, String> processor;
+        KTable<Integer, String> table1;
+        KTable<Integer, String> table2;
+        KTable<Integer, String> joined;
+        MockProcessorSupplier<Integer, String> processor;
 
-            processor = new MockProcessorSupplier<>();
-            table1 = builder.table(intSerde, stringSerde, topic1);
-            table2 = builder.table(intSerde, stringSerde, topic2);
-            joined = table1.outerJoin(table2, joiner);
-            joined.toStream().process(processor);
+        processor = new MockProcessorSupplier<>();
+        table1 = builder.table(intSerde, stringSerde, topic1);
+        table2 = builder.table(intSerde, stringSerde, topic2);
+        joined = table1.outerJoin(table2, MockValueJoiner.STRING_JOINER);
+        joined.toStream().process(processor);
 
-            Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
 
-            assertEquals(1, copartitionGroups.size());
-            assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-            KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
+        KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-            driver.setTime(0L);
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(0L);
 
-            KTableValueGetter<Integer, String> getter = getterSupplier.get();
-            getter.init(driver.context());
+        KTableValueGetter<Integer, String> getter = getterSupplier.get();
+        getter.init(driver.context());
 
-            // push two items to the primary stream. the other table is empty
+        // push two items to the primary stream. the other table is empty
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
-
-            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
-            checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null));
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            // push two items to the other stream. this should produce two items.
+        processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+        checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null));
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        // push two items to the other stream. this should produce two items.
 
-            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
-            checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
-
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+        checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
 
-            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
-            checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null"));
+        // push all four items to the primary stream. this should produce four items.
 
-            // push all items to the other stream. this should produce four items.
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
-            checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+        checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null"));
 
-            // push all four items to the primary stream. this should produce four items.
+        // push all items to the other stream. this should produce four items.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+        checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
-            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
-            checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+        // push all four items to the primary stream. this should produce four items.
 
-            // push two items with null to the other stream as deletes. this should produce two item.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], null);
-            }
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+        checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
-            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
-            checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+        // push two items with null to the other stream as deletes. this should produce two items.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], null);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+        checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
-            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
-            checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
+        // push all four items to the primary stream. this should produce four items.
 
-            // push middle two items to the primary stream with null. this should produce two items.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            for (int i = 1; i < 3; i++) {
-                driver.process(topic1, expectedKeys[i], null);
-            }
+        processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
+        checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
 
-            processor.checkAndClearProcessResult("1:null", "2:null+YY2");
-            checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, null), kv(2, "null+YY2"), kv(3, "XX3+YY3"));
+        // push middle two items to the primary stream with null. this should produce two items.
 
-        } finally {
-            Utils.delete(baseDir);
+        for (int i = 1; i < 3; i++) {
+            driver.process(topic1, expectedKeys[i], null);
         }
+
+        processor.checkAndClearProcessResult("1:null", "2:null+YY2");
+        checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, null), kv(2, "null+YY2"), kv(3, "XX3+YY3"));
     }
 
     @Test
     public void testNotSendingOldValue() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-
-            KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            KTable<Integer, String> table1;
-            KTable<Integer, String> table2;
-            KTable<Integer, String> joined;
-            MockProcessorSupplier<Integer, String> proc;
+        KTable<Integer, String> table1;
+        KTable<Integer, String> table2;
+        KTable<Integer, String> joined;
+        MockProcessorSupplier<Integer, String> proc;
 
-            table1 = builder.table(intSerde, stringSerde, topic1);
-            table2 = builder.table(intSerde, stringSerde, topic2);
-            joined = table1.outerJoin(table2, joiner);
+        table1 = builder.table(intSerde, stringSerde, topic1);
+        table2 = builder.table(intSerde, stringSerde, topic2);
+        joined = table1.outerJoin(table2, MockValueJoiner.STRING_JOINER);
 
-            proc = new MockProcessorSupplier<>();
-            builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        proc = new MockProcessorSupplier<>();
+        builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-            driver.setTime(0L);
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(0L);
 
-            assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
-            assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
-            assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
+        assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
+        assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
+        assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
 
-            // push two items to the primary stream. the other table is empty
+        // push two items to the primary stream. the other table is empty
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
-
-            proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            // push two items to the other stream. this should produce two items.
+        proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        // push two items to the other stream. this should produce two items.
 
-            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
-
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push all items to the other stream. this should produce four items.
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+        proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
-            // push all four items to the primary stream. this should produce four items.
+        // push all items to the other stream. this should produce four items.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push two items with null to the other stream as deletes. this should produce two item.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], null);
-            }
+        proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+        // push two items with null to the other stream as deletes. this should produce two items.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], null);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
 
-            proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push middle two items to the primary stream with null. this should produce two items.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            for (int i = 1; i < 3; i++) {
-                driver.process(topic1, expectedKeys[i], null);
-            }
+        proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
 
-            proc.checkAndClearProcessResult("1:(null<-null)", "2:(null+YY2<-null)");
+        // push middle two items to the primary stream with null. this should produce two items.
 
-        } finally {
-            Utils.delete(baseDir);
+        for (int i = 1; i < 3; i++) {
+            driver.process(topic1, expectedKeys[i], null);
         }
+
+        proc.checkAndClearProcessResult("1:(null<-null)", "2:(null+YY2<-null)");
     }
 
     @Test
     public void testSendingOldValue() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-
-            KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            KTable<Integer, String> table1;
-            KTable<Integer, String> table2;
-            KTable<Integer, String> joined;
-            MockProcessorSupplier<Integer, String> proc;
+        KTable<Integer, String> table1;
+        KTable<Integer, String> table2;
+        KTable<Integer, String> joined;
+        MockProcessorSupplier<Integer, String> proc;
 
-            table1 = builder.table(intSerde, stringSerde, topic1);
-            table2 = builder.table(intSerde, stringSerde, topic2);
-            joined = table1.outerJoin(table2, joiner);
+        table1 = builder.table(intSerde, stringSerde, topic1);
+        table2 = builder.table(intSerde, stringSerde, topic2);
+        joined = table1.outerJoin(table2, MockValueJoiner.STRING_JOINER);
 
-            ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
+        ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
 
-            proc = new MockProcessorSupplier<>();
-            builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        proc = new MockProcessorSupplier<>();
+        builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-            driver.setTime(0L);
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(0L);
 
-            assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
-            assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
-            assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
-
-            // push two items to the primary stream. the other table is empty
-
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
+        assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
+        assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
 
-            proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+        // push two items to the primary stream. the other table is empty
 
-            // push two items to the other stream. this should produce two items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)");
+        // push two items to the other stream. this should produce two items.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)");
 
-            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push all items to the other stream. this should produce four items.
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
+        proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
-            // push all four items to the primary stream. this should produce four items.
+        // push all items to the other stream. this should produce four items.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
 
-            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push two items with null to the other stream as deletes. this should produce two item.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], null);
-            }
+        proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
 
-            proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)");
+        // push two items with null to the other stream as deletes. this should produce two items.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], null);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)");
 
-            proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push middle two items to the primary stream with null. this should produce two items.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            for (int i = 1; i < 3; i++) {
-                driver.process(topic1, expectedKeys[i], null);
-            }
+        proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
 
-            proc.checkAndClearProcessResult("1:(null<-XX1+null)", "2:(null+YY2<-XX2+YY2)");
+        // push middle two items to the primary stream with null. this should produce two items.
 
-        } finally {
-            Utils.delete(baseDir);
+        for (int i = 1; i < 3; i++) {
+            driver.process(topic1, expectedKeys[i], null);
         }
+
+        proc.checkAndClearProcessResult("1:(null<-XX1+null)", "2:(null+YY2<-XX2+YY2)");
     }
 
-    private JoinedKeyValue kv(Integer key, String value) {
-        return new JoinedKeyValue(key, value);
+    private KeyValue<Integer, String> kv(Integer key, String value) {
+        return new KeyValue<>(key, value);
     }
 
-    private void checkJoinedValues(KTableValueGetter<Integer, String> getter, JoinedKeyValue... expected) {
-        for (JoinedKeyValue kv : expected) {
+    private void checkJoinedValues(KTableValueGetter<Integer, String> getter, KeyValue<Integer, String>... expected) {
+        for (KeyValue<Integer, String> kv : expected) {
             String value = getter.get(kv.key);
             if (kv.value == null) {
                 assertNull(value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index ce1b9d6..cf74017 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -38,6 +39,16 @@ public class KTableMapKeysTest {
     final private Serde<String> stringSerde = new Serdes.StringSerde();
     final private Serde<Integer>  integerSerde = new Serdes.IntegerSerde();
 
+    private KStreamTestDriver driver = null;
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
     @Test
     public void testMapKeysConvertingToStream() {
         final KStreamBuilder builder = new KStreamBuilder();
@@ -70,7 +81,7 @@ public class KTableMapKeysTest {
 
         convertedStream.process(processor);
 
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
 
         for (int i = 0;  i < originalKeys.length; i++) {
             driver.process(topic1, originalKeys[i], values[i]);


Mime
View raw message