kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6473) Add MockProcessorContext to public test-utils
Date Tue, 27 Mar 2018 21:04:01 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416246#comment-16416246 ] 

ASF GitHub Bot commented on KAFKA-6473:
---------------------------------------

mjsax closed pull request #4736: KAFKA-6473: Add MockProcessorContext to public test-utils
URL: https://github.com/apache/kafka/pull/4736
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/build.gradle b/build.gradle
index 8de03efeba0..d5fd7d5fd7b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1002,6 +1002,10 @@ project(':streams:examples') {
     compile project(':streams')
     compile project(':connect:json')  // this dependency should be removed after we unify data API
     compile libs.slf4jlog4j
+
+    testCompile project(':clients').sourceSets.test.output // for org.apache.kafka.test.IntegrationTest
+    testCompile project(':streams:test-utils')
+    testCompile libs.junit
   }
 
   javadoc {
diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index 34ac89ffe05..8552bcc8674 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -66,6 +66,7 @@
                 </ul>
                 </li>
                 <li><a class="reference internal" href="#writing-streams-back-to-kafka" id="id25">Writing streams back to Kafka</a></li>
+                <li><a class="reference internal" href="#testing-a-streams-app" id="id26">Testing a Streams application</a></li>
             </ul>
         </div>
         <div class="section" id="overview">
@@ -3154,6 +3155,10 @@ <h2><a class="toc-backref" href="#id7">Overview</a><a class="headerlink" href="#
                     retry on delivery failure or to prevent message duplication).</p>
 </div>
 </div>
+        <div class="section" id="testing-a-streams-app">
+            <a class="headerlink" href="#testing-a-streams-app" title="Permalink to this headline"><h2>Testing a Streams application</a></h2>
+            Kafka Streams comes with a <code>test-utils</code> module to help you test your application <a href="testing.html">here</a>.
+        </div>
 </div>
 
 
diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html
index b51bc22cfe2..e3432b79b7c 100644
--- a/docs/streams/developer-guide/processor-api.html
+++ b/docs/streams/developer-guide/processor-api.html
@@ -41,13 +41,16 @@
             <p class="topic-title first"><b>Table of Contents</b></p>
             <ul class="simple">
                 <li><a class="reference internal" href="#overview" id="id1">Overview</a></li>
-                <li><a class="reference internal" href="#defining-a-stream-processor" id="id2">Defining a Stream Processor</a></li>
-                <li><a class="reference internal" href="#state-stores" id="id3">State Stores</a><ul>
-                    <li><a class="reference internal" href="#defining-and-creating-a-state-store" id="id4">Defining and creating a State Store</a></li>
-                    <li><a class="reference internal" href="#fault-tolerant-state-stores" id="id5">Fault-tolerant State Stores</a></li>
-                    <li><a class="reference internal" href="#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs" id="id6">Enable or Disable Fault Tolerance of State Stores (Store Changelogs)</a></li>
-                    <li><a class="reference internal" href="#implementing-custom-state-stores" id="id7">Implementing Custom State Stores</a></li>
-                </ul>
+                <li><a class="reference internal" href="#defining-a-stream-processor" id="id2">Defining a Stream
+                    Processor</a></li>
+                <li><a class="reference internal" href="#unit-testing-processors" id="id9">Unit Testing Processors</a></li>
+                <li><a class="reference internal" href="#state-stores" id="id3">State Stores</a>
+                    <ul>
+                        <li><a class="reference internal" href="#defining-and-creating-a-state-store" id="id4">Defining and creating a State Store</a></li>
+                        <li><a class="reference internal" href="#fault-tolerant-state-stores" id="id5">Fault-tolerant State Stores</a></li>
+                        <li><a class="reference internal" href="#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs" id="id6">Enable or Disable Fault Tolerance of State Stores (Store Changelogs)</a></li>
+                        <li><a class="reference internal" href="#implementing-custom-state-stores" id="id7">Implementing Custom State Stores</a></li>
+                    </ul>
                 </li>
                 <li><a class="reference internal" href="#connecting-processors-and-state-stores" id="id8">Connecting Processors and State Stores</a></li>
             </ul>
@@ -98,11 +101,12 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
                 callbacks with different <code class="docutils literal"><span class="pre">PunctuationType</span></code> types within the same processor by calling <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> multiple
                 times inside <code class="docutils literal"><span class="pre">init()</span></code> method.</p>
             <div class="admonition attention">
-                <p class="first admonition-title">Attention</p>
+                <p class="first admonition-title"><b>Attention</b></p>
                 <p class="last">Stream-time is only advanced if all input partitions over all input topics have new data (with newer timestamps) available.
                     If at least one partition does not have any new data available, stream-time will not be advanced and thus <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified.
                     This behavior is independent of the configured timestamp extractor, i.e., using <code class="docutils literal"><span class="pre">WallclockTimestampExtractor</span></code> does not enable wall-clock triggering of <code class="docutils literal"><span class="pre">punctuate()</span></code>.</p>
             </div>
+            <p><b>Example</b></p>
             <p>The following example <code class="docutils literal"><span class="pre">Processor</span></code> defines a simple word-count algorithm and the following actions are performed:</p>
             <ul class="simple">
                 <li>In the <code class="docutils literal"><span class="pre">init()</span></code> method, schedule the punctuation every 1000 time units (the time unit is normally milliseconds, which in this example would translate to punctuation every 1 second) and retrieve the local state store by its name &#8220;Counts&#8221;.</li>
@@ -159,6 +163,16 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
                     arrived records for stateful processing needs like aggregations and joins. For more information, see the <a class="reference internal" href="#streams-developer-guide-state-store"><span class="std std-ref">state stores</span></a> documentation.</p>
             </div>
         </div>
+        <div class="section" id="unit-testing-processors">
+            <h2>
+                <a class="toc-backref" href="#id9">Unit Testing Processors</a>
+                <a class="headerlink" href="#unit-testing-processors" title="Permalink to this headline"></a>
+            </h2>
+            <p>
+                Kafka Streams comes with a <code>test-utils</code> module to help you write unit tests for your
+                processors <a href="testing.html#unit-testing-processors">here</a>.
+            </p>
+        </div>
         <div class="section" id="state-stores">
             <span id="streams-developer-guide-state-store"></span><h2><a class="toc-backref" href="#id3">State Stores</a><a class="headerlink" href="#state-stores" title="Permalink to this headline"></a></h2>
             <p>To implement a <strong>stateful</strong> <code class="docutils literal"><span class="pre">Processor</span></code> or <code class="docutils literal"><span class="pre">Transformer</span></code>, you must provide one or more state stores to the processor
diff --git a/docs/streams/developer-guide/testing.html b/docs/streams/developer-guide/testing.html
index e6886a1689f..ea2ae987c7e 100644
--- a/docs/streams/developer-guide/testing.html
+++ b/docs/streams/developer-guide/testing.html
@@ -18,26 +18,40 @@
 <script><!--#include virtual="../../js/templateData.js" --></script>
 
 <script id="content-template" type="text/x-handlebars-template">
-  <!-- h1>Developer Guide for Kafka Streams</h1 -->
-  <div class="sub-nav-sticky">
-    <div class="sticky-top">
-      <!-- div style="height:35px">
-        <a href="/{{version}}/documentation/streams/">Introduction</a>
-        <a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
-        <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
-        <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
-        <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
-      </div -->
+    <!-- h1>Developer Guide for Kafka Streams</h1 -->
+    <div class="sub-nav-sticky">
+        <div class="sticky-top">
+            <!-- div style="height:35px">
+              <a href="/{{version}}/documentation/streams/">Introduction</a>
+              <a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
+              <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
+              <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
+              <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
+            </div -->
+        </div>
     </div>
-  </div>
-
-  <div class="section" id="testing">
-    <span id="streams-developer-guide-testing"></span><h1>Testing a Streams Application<a class="headerlink" href="#testing" title="Permalink to this headline"></a></h1>
-    <p>
-      To test a Kafka Streams application, Kafka provides a test-utils artifact that can be added as regular dependency to your test code base.
-      Example <code>pom.xml</code> snippet when using Maven:
-    </p>
-    <pre>
+
+    <div class="section" id="testing">
+        <span id="streams-developer-guide-testing"></span>
+        <h1>Testing Kafka Streams<a class="headerlink" href="#testing" title="Permalink to this headline"></a></h1>
+        <div class="contents local topic" id="table-of-contents">
+            <p class="topic-title first"><b>Table of Contents</b></p>
+            <ul class="simple">
+                <li><a class="reference internal" href="#test-utils-artifact">Importing the test utilities</a></li>
+                <li><a class="reference internal" href="#testing-topologytestdriver">Testing Streams applications</a>
+                </li>
+                <li><a class="reference internal" href="#unit-testing-processors">Unit testing Processors</a>
+                </li>
+            </ul>
+        </div>
+        <div class="section" id="test-utils-artifact">
+            <h2><a class="toc-backref" href="#test-utils-artifact" title="Permalink to this headline">Importing the test
+                utilities</a></h2>
+            <p>
+                To test a Kafka Streams application, Kafka provides a test-utils artifact that can be added as regular
+                dependency to your test code base. Example <code>pom.xml</code> snippet when using Maven:
+            </p>
+            <pre>
 &lt;dependency&gt;
     &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
     &lt;artifactId&gt;kafka-streams-test-utils&lt;/artifactId&gt;
@@ -45,13 +59,21 @@
     &lt;scope&gt;test&lt;/scope&gt;
 &lt;/dependency&gt;
     </pre>
-    <p>
-    The test-utils package provides a <code>TopologyTestDriver</code> that can be used pipe data through a <code>Topology</code> that is either assembled manually
-    using Processor API or via the DSL using <code>StreamsBuilder</code>.
-    The test driver simulates the library runtime that continuously fetches records from input topics and processes them by traversing the topology.
-    You can use the test driver to verify that your specified processor topology computes the correct result with the manually piped in data records.
-    The test driver captures the results records and allows to query its embedded state stores.
-    <pre>
+        </div>
+        <div class="section" id="testing-topologytestdriver">
+            <h2><a class="toc-backref" href="#testing-topologytestdriver" title="Permalink to this headline">Testing a
+                Streams application</a></h2>
+
+            <p>
+                The test-utils package provides a <code>TopologyTestDriver</code> that can be used pipe data through a
+                <code>Topology</code> that is either assembled manually
+                using Processor API or via the DSL using <code>StreamsBuilder</code>.
+                The test driver simulates the library runtime that continuously fetches records from input topics and
+                processes them by traversing the topology.
+                You can use the test driver to verify that your specified processor topology computes the correct result
+                with the manually piped in data records.
+                The test driver captures the results records and allows to query its embedded state stores.
+            <pre>
 // Processor API
 Topology topology = new Topology();
 topology.addSource("sourceProcessor", "input-topic");
@@ -68,62 +90,66 @@
 config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
 config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
 TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);
-    </pre>
-    <p>
-    The test driver accepts <code>ConsumerRecord</code>s with key and value type <code>byte[]</code>.
-    Because <code>byte[]</code> types can be problematic, you can use the <code>ConsumerRecordFactory</code> to generate those records
-    by providing regular Java types for key and values and the corresponding serializers.
-    </p>
-    <pre>
+        </pre>
+            <p>
+                The test driver accepts <code>ConsumerRecord</code>s with key and value type <code>byte[]</code>.
+                Because <code>byte[]</code> types can be problematic, you can use the <code>ConsumerRecordFactory</code>
+                to generate those records
+                by providing regular Java types for key and values and the corresponding serializers.
+            </p>
+            <pre>
 ConsumerRecordFactory&lt;String, Integer&gt; factory = new ConsumerRecordFactory&lt;&gt;("input-topic", new StringSerializer(), new IntegerSerializer());
 testDriver.pipe(factory.create("key", 42L));
-    </pre>
-    <p>
-    To verify the output, the test driver produces <code>ProducerRecord</code>s with key and value type <code>byte[]</code>.
-    For result verification, you can specify corresponding deserializers when reading the output record from the driver.
-    <pre>
+        </pre>
+            <p>
+                To verify the output, the test driver produces <code>ProducerRecord</code>s with key and value type
+                <code>byte[]</code>.
+                For result verification, you can specify corresponding deserializers when reading the output record from
+                the driver.
+            <pre>
 ProducerRecord&lt;String, Integer&gt; outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new LongDeserializer());
-    </pre>
-    <p>
-    For result verification, you can use <code>OutputVerifier</code>.
-    It offers helper methods to compare only certain parts of the result record:
-    for example, you might only care about the key and value, but not the timestamp of the result record.
-    </p>
-    <pre>
+        </pre>
+            <p>
+                For result verification, you can use <code>OutputVerifier</code>.
+                It offers helper methods to compare only certain parts of the result record:
+                for example, you might only care about the key and value, but not the timestamp of the result record.
+            </p>
+            <pre>
 OutputVerifier.compareKeyValue(outputRecord, "key", 42L); // throws AssertionError if key or value does not match
-    </pre>
-    <p>
-    <code>TopologyTestDriver</code> supports punctuations, too.
-    Event-time punctuations are triggered automatically based on the processed records' timestamps.
-    Wall-clock-time punctuations can also be triggered by advancing the test driver's wall-clock-time (the driver mocks wall-clock-time internally to give users control over it).
-    </p>
-    <pre>
+        </pre>
+            <p>
+                <code>TopologyTestDriver</code> supports punctuations, too.
+                Event-time punctuations are triggered automatically based on the processed records' timestamps.
+                Wall-clock-time punctuations can also be triggered by advancing the test driver's wall-clock-time (the
+                driver mocks wall-clock-time internally to give users control over it).
+            </p>
+            <pre>
 testDriver.advanceWallClockTime(20L);
-    </pre>
-    </div>
-    <p>
-    Additionally, you can access state stores via the test driver before or after a test.
-    Accessing stores before a test is useful to pre-populate a store with some initial values.
-    After data was processed, expected updates to the store can be verified.
-    </p>
-    <pre>
+        </pre>
+            <p>
+                Additionally, you can access state stores via the test driver before or after a test.
+                Accessing stores before a test is useful to pre-populate a store with some initial values.
+                After data was processed, expected updates to the store can be verified.
+            </p>
+            <pre>
 KeyValueStore store = testDriver.getKeyValueStore("store-name");
-    </pre>
-    <p>
-    Note, that you should always close the test driver at the end to make sure all resources are release properly.
-    </p>
-    <pre>
+        </pre>
+            <p>
+                Note, that you should always close the test driver at the end to make sure all resources are release
+                properly.
+            </p>
+            <pre>
 testDriver.close();
-    </pre>
-
-    <h2>Example</h2>
-    <p>
-    The following example demonstrates how to use the test driver and helper classes.
-    The example creates a topology that computes the maximum value per key using a key-value-store.
-    While processing, no output is generated, but only the store is updated.
-    Output is only sent downstream based on event-time and wall-clock punctuations.
-    </p>
-    <pre>
+        </pre>
+
+            <h3>Example</h3>
+            <p>
+                The following example demonstrates how to use the test driver and helper classes.
+                The example creates a topology that computes the maximum value per key using a key-value-store.
+                While processing, no output is generated, but only the store is updated.
+                Output is only sent downstream based on event-time and wall-clock punctuations.
+            </p>
+            <pre>
 private TopologyTestDriver testDriver;
 private KeyValueStore&lt;String, Long&gt; store;
 
@@ -266,31 +292,147 @@ <h2>Example</h2>
     @Override
     public void close() {}
 }
-    </pre>
-  <div class="pagination">
-  <div class="pagination">
-    <a href="/{{version}}/documentation/streams/developer-guide/datatypes" class="pagination__btn pagination__btn__prev">Previous</a>
-    <a href="/{{version}}/documentation/streams/developer-guide/interactive-queries" class="pagination__btn pagination__btn__next">Next</a>
-  </div>
+        </pre>
+        </div>
+        <div class="section" id="unit-testing-processors">
+            <h2>
+                <a class="headerlink" href="#unit-testing-processors"
+                   title="Permalink to this headline">Unit Testing Processors</a>
+            </h2>
+            <p>
+                If you <a href="processor-api.html">write a Processor</a>, you will want to test it.
+            </p>
+            <p>
+                Because the <code>Processor</code> forwards its results to the context rather than returning them,
+                Unit testing requires a mocked context capable of capturing forwarded data for inspection.
+                For this reason, we provide a <code>MockProcessorContext</code> in <a href="#test-utils-artifact"><code>test-utils</code></a>.
+            </p>
+            <b>Construction</b>
+            <p>
+                To begin with, instantiate your processor and initialize it with the mock context:
+            <pre>
+final Processor processorUnderTest = ...;
+final MockProcessorContext context = new MockProcessorContext();
+processorUnderTest.init(context);
+                </pre>
+            If you need to pass configuration to your processor or set the default serdes, you can create the mock with
+            config:
+            <pre>
+final Properties config = new Properties();
+config.put(StreamsConfig.APPLICATION_ID_CONFIG, "unit-test");
+config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
+config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+config.put("some.other.config", "some config value");
+final MockProcessorContext context = new MockProcessorContext(config);
+                </pre>
+            </p>
+            <b>Captured data</b>
+            <p>
+                The mock will capture any values that your processor forwards. You can make assertions on them:
+            <pre>
+processorUnderTest.process("key", "value");
+
+final Iterator&lt;CapturedForward&gt; forwarded = context.forwarded().iterator();
+assertEquals(forwarded.next().keyValue(), new KeyValue&lt;&gt;(..., ...));
+assertFalse(forwarded.hasNext());
+
+// you can reset forwards to clear the captured data. This may be helpful in constructing longer scenarios.
+context.resetForwards();
+
+assertEquals(context.forwarded().size(), 0);
+            </pre>
+            If your processor forwards to specific child processors, you can query the context for captured data by
+            child name:
+            <pre>
+final List&lt;CapturedForward&gt; captures = context.forwarded("childProcessorName");
+            </pre>
+            The mock also captures whether your processor has called <code>commit()</code> on the context:
+            <pre>
+assertTrue(context.committed());
+
+// commit captures can also be reset.
+context.resetCommit();
+
+assertFalse(context.committed());
+            </pre>
+            </p>
+            <b>Setting record metadata</b>
+            <p>
+                In case your processor logic depends on the record metadata (topic, partition, offset, or timestamp),
+                you can set them on the context, either all together or individually:
+            <pre>
+context.setRecordMetadata("topicName", /*partition*/ 0, /*offset*/ 0L, /*timestamp*/ 0L);
+context.setTopic("topicName");
+context.setPartition(0);
+context.setOffset(0L);
+context.setTimestamp(0L);
+                </pre>
+            Once these are set, the context will continue returning the same values, until you set new ones.
+            </p>
+            <b>State stores</b>
+            <p>
+                In case your punctuator is stateful, the mock context allows you to register state stores.
+                You're encouraged to use a simple in-memory store of the appropriate type (KeyValue, Windowed, or
+                Session), since the mock context does <i>not</i> manage changelogs, state directories, etc.
+            </p>
+            <pre>
+final KeyValueStore&lt;String, Integer&gt; store =
+    Stores.keyValueStoreBuilder(
+            Stores.inMemoryKeyValueStore("myStore"),
+            Serdes.String(),
+            Serdes.Integer()
+        )
+        .withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
+        .build();
+store.init(context, store);
+context.register(store, /*deprecated parameter*/ false, /*parameter unused in mock*/ null);
+            </pre>
+            <b>Verifying punctuators</b>
+            <p>
+                Processors can schedule punctuators to handle periodic tasks.
+                The mock context does <i>not</i> automatically execute punctuators, but it does capture them to
+                allow you to unit test them as well:
+            <pre>
+final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0);
+final long interval = capturedPunctuator.getIntervalMs();
+final PunctuationType type = capturedPunctuator.getType();
+final boolean cancelled = capturedPunctuator.cancelled();
+final Punctuator punctuator = capturedPunctuator.getPunctuator();
+punctuator.punctuate(/*timestamp*/ 0L);
+                </pre>
+            If you need to write tests involving automatic firing of scheduled punctuators, we recommend creating a
+            simple topology with your processor and using the <a href="testing.html#testing-topologytestdriver"><code>TopologyTestDriver</code></a>.
+            </p>
+        </div>
+    </div>
+    <div class="pagination">
+        <div class="pagination">
+            <a href="/{{version}}/documentation/streams/developer-guide/datatypes"
+               class="pagination__btn pagination__btn__prev">Previous</a>
+            <a href="/{{version}}/documentation/streams/developer-guide/interactive-queries"
+               class="pagination__btn pagination__btn__next">Next</a>
+        </div>
+    </div>
 </script>
 
 <!--#include virtual="../../../includes/_header.htm" -->
 <!--#include virtual="../../../includes/_top.htm" -->
 <div class="content documentation documentation--current">
-  <!--#include virtual="../../../includes/_nav.htm" -->
-  <div class="right">
-    <!--#include virtual="../../../includes/_docs_banner.htm" -->
-    <ul class="breadcrumbs">
-      <li><a href="/documentation">Documentation</a></li>
-      <li><a href="/documentation/streams">Kafka Streams</a></li>
-      <li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li>
-    </ul>
-    <div class="p-content"></div>
-  </div>
+    <!--#include virtual="../../../includes/_nav.htm" -->
+    <div class="right">
+        <!--#include virtual="../../../includes/_docs_banner.htm" -->
+        <ul class="breadcrumbs">
+            <li><a href="/documentation">Documentation</a></li>
+            <li><a href="/documentation/streams">Kafka Streams</a></li>
+            <li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li>
+        </ul>
+        <div class="p-content"></div>
+    </div>
 </div>
 <!--#include virtual="../../../includes/_footer.htm" -->
 <script>
-    $(function() {
+    $(function () {
         // Show selected style on nav item
         $('.b-nav__streams').addClass('selected');
 
@@ -299,7 +441,7 @@ <h2>Example</h2>
             y_pos = $navbar.offset().top,
             height = $navbar.height();
 
-        $(window).scroll(function() {
+        $(window).scroll(function () {
             var scrollTop = $(window).scrollTop();
 
             if (scrollTop > y_pos - height) {
diff --git a/docs/streams/developer-guide/write-streams.html b/docs/streams/developer-guide/write-streams.html
index 1e4213d1b81..44cdb3fd9ba 100644
--- a/docs/streams/developer-guide/write-streams.html
+++ b/docs/streams/developer-guide/write-streams.html
@@ -37,6 +37,7 @@
       <ul class="simple">
           <li><a class="reference internal" href="#libraries-and-maven-artifacts" id="id1">Libraries and Maven artifacts</a></li>
           <li><a class="reference internal" href="#using-kafka-streams-within-your-application-code" id="id2">Using Kafka Streams within your application code</a></li>
+          <li><a class="reference internal" href="#testing-a-streams-app" id="id3">Testing a Streams application</a></li>
       </ul>
     <p>Any Java application that makes use of the Kafka Streams library is considered a Kafka Streams application.
       The computational logic of a Kafka Streams application is defined as a <a class="reference internal" href="../concepts.html#streams-concepts"><span class="std std-ref">processor topology</span></a>,
@@ -196,6 +197,11 @@ <h2>Using Kafka Streams within your application code<a class="headerlink" href="
       <p>After an application is stopped, Kafka Streams will migrate any tasks that had been running in this instance to available remaining
         instances.</p>
 </div>
+
+      <div class="section" id="testing-a-streams-app">
+          <a class="headerlink" href="#testing-a-streams-app" title="Permalink to this headline"><h2>Testing a Streams application</a></h2>
+          Kafka Streams comes with a <code>test-utils</code> module to help you test your application <a href="testing.html">here</a>.
+      </div>
 </div>
 
 
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index cfa2137c8fa..dbf2b707c3f 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -49,7 +49,7 @@
  */
 public class WordCountProcessorDemo {
 
-    private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
+    static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
 
         @Override
         public Processor<String, String> get() {
diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
new file mode 100644
index 00000000000..566b7d430ea
--- /dev/null
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.examples.wordcount;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.Test;
+
+import java.util.Iterator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Demonstrate the use of {@link MockProcessorContext} for testing the {@link Processor} in the {@link WordCountProcessorDemo}.
+ */
+public class WordCountProcessorTest {
+    @Test
+    public void test() {
+        final MockProcessorContext context = new MockProcessorContext();
+
+        // Create, initialize, and register the state store.
+        final KeyValueStore<String, Integer> store =
+            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("Counts"), Serdes.String(), Serdes.Integer())
+                .withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
+                .build();
+        store.init(context, store);
+        context.register(store, false, null);
+
+        // Create and initialize the processor under test
+        final Processor<String, String> processor = new WordCountProcessorDemo.MyProcessorSupplier().get();
+        processor.init(context);
+
+        // send a record to the processor
+        processor.process("key", "alpha beta gamma alpha");
+
+        // note that the processor commits, but does not forward, during process()
+        assertTrue(context.committed());
+        assertTrue(context.forwarded().isEmpty());
+
+        // now, we trigger the punctuator, which iterates over the state store and forwards the contents.
+        context.scheduledPunctuators().get(0).getPunctuator().punctuate(0L);
+
+        // finally, we can verify the output.
+        final Iterator<MockProcessorContext.CapturedForward> capturedForwards = context.forwarded().iterator();
+        assertEquals(new KeyValue<>("alpha", "2"), capturedForwards.next().keyValue());
+        assertEquals(new KeyValue<>("beta", "1"), capturedForwards.next().keyValue());
+        assertEquals(new KeyValue<>("gamma", "1"), capturedForwards.next().keyValue());
+        assertFalse(capturedForwards.hasNext());
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 572c0b02df5..56000234373 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -26,7 +26,7 @@
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
@@ -719,6 +719,6 @@ public void testAsymetricWindowingBefore() {
     }
 
     private void setRecordContext(final long time, final String topic) {
-        ((MockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
+        ((InternalMockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 39b318fec48..465082b7aa7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -26,7 +26,7 @@
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
@@ -304,6 +304,6 @@ public void testWindowing() {
     }
 
     private void setRecordContext(final long time, final String topic) {
-        ((MockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
+        ((InternalMockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 21dc4f037c9..212c48d43f7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -31,7 +31,7 @@
 import org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -81,13 +81,13 @@ public Long apply(final String aggKey, final Long aggOne, final Long aggTwo) {
     private final List<KeyValue> results = new ArrayList<>();
     private Processor<String, String> processor = sessionAggregator.get();
     private SessionStore<String, Long> sessionStore;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
 
 
     @Before
     public void initializeStore() {
         final File stateDir = TestUtils.tempDirectory();
-        context = new MockProcessorContext(stateDir,
+        context = new InternalMockProcessorContext(stateDir,
             Serdes.String(), Serdes.String(), new NoOpRecordCollector(), new ThreadCache(new LogContext("testCache "), 100000, new MockStreamsMetrics(new Metrics()))) {
             @Override
             public <K, V> void forward(final K key, final V value) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index aa660e08680..d3a74e336cc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -31,7 +31,7 @@
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
@@ -143,7 +143,7 @@ public void testAggBasic() {
     }
 
     private void setRecordContext(final long time, final String topic) {
-        ((MockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
+        ((InternalMockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 4569858e357..347e9c4fd75 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -32,7 +32,7 @@
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockRestoreCallback;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.TestUtils;
@@ -194,7 +194,7 @@ public void shouldDeleteAndRecreateStoreDirectoryOnReinitialize() throws IOExcep
         testFile4.createNewFile();
         assertTrue(testFile4.exists());
 
-        task.processorContext = new MockProcessorContext(stateDirectory.directoryForTask(task.id), streamsConfig);
+        task.processorContext = new InternalMockProcessorContext(stateDirectory.directoryForTask(task.id), streamsConfig);
 
         task.stateMgr.register(store1, new MockRestoreCallback());
         task.stateMgr.register(store2, new MockRestoreCallback());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index df8d2010d24..d19e63e0543 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -34,7 +34,7 @@
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.NoOpReadOnlyStore;
 import org.apache.kafka.test.TestUtils;
@@ -86,7 +86,7 @@
     private MockConsumer<byte[], byte[]> consumer;
     private File checkpointFile;
     private ProcessorTopology topology;
-    private MockProcessorContext mockProcessorContext;
+    private InternalMockProcessorContext processorContext;
 
     @Before
     public void before() throws IOException {
@@ -120,8 +120,8 @@ public void before() throws IOException {
             stateDirectory,
             stateRestoreListener,
             streamsConfig);
-        mockProcessorContext = new MockProcessorContext(stateDirectory.globalStateDir(), streamsConfig);
-        stateManager.setGlobalProcessorContext(mockProcessorContext);
+        processorContext = new InternalMockProcessorContext(stateDirectory.globalStateDir(), streamsConfig);
+        stateManager.setGlobalProcessorContext(processorContext);
         checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
     }
 
@@ -631,7 +631,7 @@ public void shouldDeleteAndRecreateStoreDirectoryOnReinitialize() throws IOExcep
         assertTrue(testFile4.exists());
 
         // only delete and recreate store 1 and 3 -- 2 and 4 must be untouched
-        stateManager.reinitializeStateStoresForPartitions(Utils.mkList(t1, t3), mockProcessorContext);
+        stateManager.reinitializeStateStoresForPartitions(Utils.mkList(t1, t3), processorContext);
 
         assertFalse(testFile1.exists());
         assertTrue(testFile2.exists());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 90ef771ed61..0dea1930be5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -24,7 +24,7 @@
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -110,7 +110,7 @@ public void testMetrics() {
         final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
 
         final Metrics metrics = new Metrics();
-        final MockProcessorContext context = new MockProcessorContext(anyStateSerde,  new RecordCollectorImpl(null, null, new LogContext("processnode-test "), new DefaultProductionExceptionHandler()), metrics);
+        final InternalMockProcessorContext context = new InternalMockProcessorContext(anyStateSerde,  new RecordCollectorImpl(null, null, new LogContext("processnode-test "), new DefaultProductionExceptionHandler()), metrics);
         final ProcessorNode node = new ProcessorNode("name", new NoOpProcessor(), Collections.emptySet());
         node.init(context);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 2fa1b591822..faf72e92d2d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -34,7 +34,7 @@
 import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.junit.After;
@@ -54,7 +54,7 @@
     private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
     private final String[] topics = {"topic"};
 
-    final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
+    final InternalMockProcessorContext context = new InternalMockProcessorContext(StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
             new RecordCollectorImpl(null, null,  new LogContext("record-queue-test "), new DefaultProductionExceptionHandler()));
     private final MockSourceNode mockSourceNodeWithMetrics = new MockSourceNode<>(topics, intDeserializer, intDeserializer);
     private final RecordQueue queue = new RecordQueue(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index 67927403fc2..4b48a17ab7c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -24,7 +24,7 @@
 import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -37,7 +37,7 @@
 public class SinkNodeTest {
     private final Serializer anySerializer = Serdes.Bytes().serializer();
     private final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
-    private final MockProcessorContext context = new MockProcessorContext(anyStateSerde,
+    private final InternalMockProcessorContext context = new InternalMockProcessorContext(anyStateSerde,
         new RecordCollectorImpl(new MockProducer<byte[], byte[]>(true, anySerializer, anySerializer), null, new LogContext("sinknode-test "), new DefaultProductionExceptionHandler()));
     private final SinkNode sink = new SinkNode<>("anyNodeName", "any-output-topic", anySerializer, anySerializer, null);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index a3425856b15..fc810e3bc3b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -34,7 +34,7 @@
 import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest;
 import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.TestUtils;
 
@@ -179,7 +179,7 @@
     private final Set<K> flushedRemovals = new HashSet<>();
     private final List<KeyValue<byte[], byte[]>> restorableEntries = new LinkedList<>();
 
-    private final MockProcessorContext context;
+    private final InternalMockProcessorContext context;
     private final StateSerdes<K, V> stateSerdes;
 
     private KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) {
@@ -227,7 +227,7 @@ private KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) {
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, serdes.valueSerde().getClass());
         props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDBKeyValueStoreTest.TheRocksDbConfigSetter.class);
 
-        context = new MockProcessorContext(stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) {
+        context = new InternalMockProcessorContext(stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) {
             ThreadCache cache = new ThreadCache(new LogContext("testCache "), 1024 * 1024L, metrics());
 
             @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 937b1d0b0bf..51c782ad9d5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -25,7 +25,7 @@
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -48,14 +48,14 @@
 
     protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context);
 
-    protected MockProcessorContext context;
+    protected InternalMockProcessorContext context;
     protected KeyValueStore<Integer, String> store;
     protected KeyValueStoreTestDriver<Integer, String> driver;
 
     @Before
     public void before() {
         driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-        context = (MockProcessorContext) driver.context();
+        context = (InternalMockProcessorContext) driver.context();
         context.setTime(10);
         store = createKeyValueStore(context);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 0e3b4e8223e..8705326b782 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -33,7 +33,7 @@
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -58,7 +58,7 @@
 public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
     private final int maxCacheSizeBytes = 150;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private CachingKeyValueStore<String, String> store;
     private InMemoryKeyValueStore<Bytes, byte[]> underlyingStore;
     private ThreadCache cache;
@@ -73,7 +73,7 @@ public void setUp() {
         store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(), Serdes.String());
         store.setFlushListener(cacheFlushListener, false);
         cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
-        context = new MockProcessorContext(null, null, null, (RecordCollector) null, cache);
+        context = new InternalMockProcessorContext(null, null, null, (RecordCollector) null, cache);
         topic = "topic";
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic));
         store.init(context, null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index 16ef47c1c1b..a9a66e9a7df 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -30,7 +30,7 @@
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -52,7 +52,7 @@
 public class CachingSessionStoreTest {
 
     private static final int MAX_CACHE_SIZE_BYTES = 600;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private RocksDBSegmentedBytesStore underlying;
     private CachingSessionStore<String, String> cachingStore;
     private ThreadCache cache;
@@ -75,7 +75,7 @@ public void setUp() {
                                                  Segments.segmentInterval(retention, numSegments)
                                                  );
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
-        context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic"));
         cachingStore.init(context, cachingStore);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index bbf9bef03f3..c25655b6b2e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -30,7 +30,7 @@
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -56,7 +56,7 @@
     private static final int MAX_CACHE_SIZE_BYTES = 150;
     private static final long DEFAULT_TIMESTAMP = 10L;
     private static final Long WINDOW_SIZE = 10000L;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private RocksDBSegmentedBytesStore underlying;
     private CachingWindowStore<String, String> cachingStore;
     private CachingKeyValueStoreTest.CacheFlushListenerStub<Windowed<String>, String> cacheListener;
@@ -80,7 +80,7 @@ public void setUp() {
         cachingStore.setFlushListener(cacheListener, false);
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         topic = "topic";
-        context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
+        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, topic));
         cachingStore.init(context, cachingStore);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 9360daef272..7342c93abcc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -23,7 +23,7 @@
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -41,7 +41,7 @@
 
 public class ChangeLoggingKeyValueBytesStoreTest {
 
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private final InMemoryKeyValueStore<Bytes, byte[]> inner = new InMemoryKeyValueStore<>("kv", Serdes.Bytes(), Serdes.ByteArray());
     private final ChangeLoggingKeyValueBytesStore store = new ChangeLoggingKeyValueBytesStore(inner);
     private final Map sent = new HashMap<>();
@@ -64,7 +64,7 @@ public void before() {
                 sent.put(key, value);
             }
         };
-        context = new MockProcessorContext(
+        context = new InternalMockProcessorContext(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 59c7ade8f97..4fd7f30a626 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -27,7 +27,7 @@
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
@@ -44,7 +44,7 @@
 import static org.junit.Assert.assertTrue;
 
 public class MeteredWindowStoreTest {
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     @SuppressWarnings("unchecked")
     private final WindowStore<Bytes, byte[]> innerStoreMock = EasyMock.createNiceMock(WindowStore.class);
     private final MeteredWindowStore<String, String> store = new MeteredWindowStore<>(innerStoreMock, "scope", new MockTime(), Serdes.String(), new SerdeThatDoesntHandleNull());
@@ -98,7 +98,7 @@ public Sensor addSensor(String name, Sensor.RecordingLevel recordLevel, Sensor..
 
         };
 
-        context = new MockProcessorContext(
+        context = new InternalMockProcessorContext(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
index 66cc9add145..098c3262e34 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
@@ -24,7 +24,7 @@
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -44,7 +44,7 @@
 
     private static final String STORE_NAME = "name";
     private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024, new MockStreamsMetrics(new Metrics()));
-    private final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
+    private final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
                                                                           Serdes.String(),
                                                                           Serdes.String(),
                                                                           new NoOpRecordCollector(),
@@ -73,7 +73,7 @@ public void shouldCreateLoggingEnabledStoreWhenStoreLogged() {
                 logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
             }
         };
-        final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
+        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
                                                                       Serdes.String(),
                                                                       Serdes.String(),
                                                                       collector,
@@ -100,7 +100,7 @@ public void shouldNotBeLoggingEnabledStoreWhenLoggingNotEnabled() {
                 logged.add(new ProducerRecord<>(topic, partition, timestamp, key, value));
             }
         };
-        final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
+        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
                                                                       Serdes.String(),
                                                                       Serdes.String(),
                                                                       collector,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index e34d3cccc67..388a2fc47ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -26,7 +26,7 @@
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -56,7 +56,7 @@
 
     private final long retention = 60000L;
     private final int numSegments = 3;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private final String storeName = "bytes-store";
     private RocksDBSegmentedBytesStore bytesStore;
     private File stateDir;
@@ -71,7 +71,7 @@ public void before() {
                                                     schema);
 
         stateDir = TestUtils.tempDirectory();
-        context = new MockProcessorContext(
+        context = new InternalMockProcessorContext(
             stateDir,
             Serdes.String(),
             Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
index 19bfdedb072..272e0b0f5e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
@@ -26,7 +26,7 @@
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -47,7 +47,7 @@
     private static final String STORE_NAME = "name";
     private final List<ProducerRecord> logged = new ArrayList<>();
     private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024, new MockStreamsMetrics(new Metrics()));
-    private final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
+    private final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
         Serdes.String(),
         Serdes.String(),
         new NoOpRecordCollector() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index b25d72586ca..64953153045 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -25,7 +25,7 @@
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -45,7 +45,7 @@
 public class RocksDBSessionStoreTest {
 
     private SessionStore<String, Long> sessionStore;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
 
     @Before
     public void before() {
@@ -59,7 +59,7 @@ public void before() {
                                                  Serdes.String(),
                                                  Serdes.Long());
 
-        context = new MockProcessorContext(TestUtils.tempDirectory(),
+        context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
                                            Serdes.String(),
                                            Serdes.Long(),
                                            new NoOpRecordCollector(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index a89dc60e665..a09d87dbad3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -31,7 +31,7 @@
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -62,14 +62,14 @@
     private Serializer<String> stringSerializer = new StringSerializer();
     private Deserializer<String> stringDeserializer = new StringDeserializer();
     private RocksDBStore rocksDBStore;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private File dir;
 
     @Before
     public void setUp() {
         rocksDBStore = new RocksDBStore("test");
         dir = TestUtils.tempDirectory();
-        context = new MockProcessorContext(dir,
+        context = new InternalMockProcessorContext(dir,
             Serdes.String(),
             Serdes.String(),
             new NoOpRecordCollector(),
@@ -115,7 +115,7 @@ public void verifyRocksDbConfigSetterIsCalled() {
         configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test-server:9092");
         configs.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
         MockRocksDbConfigSetter.called = false;
-        rocksDBStore.openDB(new MockProcessorContext(tempDir, new StreamsConfig(configs)));
+        rocksDBStore.openDB(new InternalMockProcessorContext(tempDir, new StreamsConfig(configs)));
 
         assertTrue(MockRocksDbConfigSetter.called);
     }
@@ -123,7 +123,7 @@ public void verifyRocksDbConfigSetterIsCalled() {
     @Test(expected = ProcessorStateException.class)
     public void shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir() throws IOException {
         final File tmpDir = TestUtils.tempDirectory();
-        MockProcessorContext tmpContext = new MockProcessorContext(tmpDir,
+        InternalMockProcessorContext tmpContext = new InternalMockProcessorContext(tmpDir,
             Serdes.String(),
             Serdes.Long(),
             new NoOpRecordCollector(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
index bca6949c6b3..a6ccfdf9337 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
@@ -24,7 +24,7 @@
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -45,7 +45,7 @@
     private static final String STORE_NAME = "name";
     private WindowStore<String, String> store;
     private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024, new MockStreamsMetrics(new Metrics()));
-    private final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
+    private final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
                                                                           Serdes.String(),
                                                                           Serdes.String(),
                                                                           new NoOpRecordCollector(),
@@ -75,7 +75,7 @@ public void shouldCreateLoggingEnabledStoreWhenWindowStoreLogged() {
                 logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
             }
         };
-        final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
+        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
                                                                       Serdes.String(),
                                                                       Serdes.String(),
                                                                       collector,
@@ -102,7 +102,7 @@ public void shouldNotBeLoggingEnabledStoreWhenLogginNotEnabled() {
                 logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
             }
         };
-        final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
+        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
                                                                       Serdes.String(),
                                                                       Serdes.String(),
                                                                       collector,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index c745e702b09..b3a60a965d8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -37,7 +37,7 @@
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -94,7 +94,7 @@
     };
 
     private final File baseDir = TestUtils.tempDirectory("test");
-    private final MockProcessorContext context = new MockProcessorContext(baseDir, Serdes.ByteArray(), Serdes.ByteArray(), recordCollector, cache);
+    private final InternalMockProcessorContext context = new InternalMockProcessorContext(baseDir, Serdes.ByteArray(), Serdes.ByteArray(), recordCollector, cache);
     private WindowStore<Integer, String> windowStore;
 
     private WindowStore<Integer, String> createWindowStore(final ProcessorContext context, final boolean retainDuplicates) {
@@ -842,7 +842,7 @@ public void shouldFetchAndIterateOverExactBinaryKeys() {
         assertThat(toList(windowStore.fetch(key3, 0, Long.MAX_VALUE)), equalTo(expectedKey3));
     }
 
-    private void putFirstBatch(final WindowStore<Integer, String> store, final long startTime, final MockProcessorContext context) {
+    private void putFirstBatch(final WindowStore<Integer, String> store, final long startTime, final InternalMockProcessorContext context) {
         context.setRecordContext(createRecordContext(startTime));
         store.put(0, "zero");
         context.setRecordContext(createRecordContext(startTime + 1L));
@@ -855,7 +855,7 @@ private void putFirstBatch(final WindowStore<Integer, String> store, final long
         store.put(5, "five");
     }
 
-    private void putSecondBatch(final WindowStore<Integer, String> store, final long startTime, MockProcessorContext context) {
+    private void putSecondBatch(final WindowStore<Integer, String> store, final long startTime, InternalMockProcessorContext context) {
         context.setRecordContext(createRecordContext(startTime + 3L));
         store.put(2, "two+1");
         context.setRecordContext(createRecordContext(startTime + 4L));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
index 9c150c58682..d61218eb9b5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
@@ -23,7 +23,7 @@
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -48,12 +48,12 @@ public boolean hasNext(final KeyValueIterator iterator) {
         }
     };
 
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private SegmentIterator iterator = null;
 
     @Before
     public void before() {
-        context = new MockProcessorContext(
+        context = new InternalMockProcessorContext(
                 TestUtils.tempDirectory(),
                 Serdes.String(),
                 Serdes.String(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index deb26f735c9..ec59a008112 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -20,7 +20,7 @@
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -44,7 +44,7 @@
 public class SegmentsTest {
 
     private static final int NUM_SEGMENTS = 5;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private Segments segments;
     private long segmentInterval;
     private File stateDirectory;
@@ -54,7 +54,7 @@
     @Before
     public void createContext() {
         stateDirectory = TestUtils.tempDirectory();
-        context = new MockProcessorContext(stateDirectory,
+        context = new InternalMockProcessorContext(stateDirectory,
                                            Serdes.String(),
                                            Serdes.Long(),
                                            new NoOpRecordCollector(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
index d30372fd80b..b1818c21ef1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
@@ -21,7 +21,7 @@
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 
 import java.util.Collections;
@@ -42,7 +42,7 @@
 
         final StateStore stateStore = supplier.get();
         stateStore.init(
-            new MockProcessorContext(
+            new InternalMockProcessorContext(
                 StateSerdes.withBuiltinTypes(
                     ProcessorStateManager.storeChangelogTopic(applicationId, name),
                     keyType,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 32b56bbecfb..c62b09b640e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -23,7 +23,7 @@
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.junit.After;
 import org.junit.Test;
 
@@ -39,7 +39,7 @@
 
     private final Map<Integer, String> logged = new HashMap<>();
 
-    private final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
+    private final InternalMockProcessorContext context = new InternalMockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
             new RecordCollectorImpl(null, "StoreChangeLoggerTest", new LogContext("StoreChangeLoggerTest "), new DefaultProductionExceptionHandler()) {
                 @Override
                 public <K1, V1> void send(final String topic,
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
similarity index 87%
rename from streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
rename to streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 6b0cb66bcb7..74bb5d14062 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -48,7 +48,7 @@
 import java.util.List;
 import java.util.Map;
 
-public class MockProcessorContext extends AbstractProcessorContext implements RecordCollector.Supplier {
+public class InternalMockProcessorContext extends AbstractProcessorContext implements RecordCollector.Supplier {
 
     private final File stateDir;
     private final Metrics metrics;
@@ -61,19 +61,19 @@
     private Serde<?> valSerde;
     private long timestamp = -1L;
 
-    public MockProcessorContext(final File stateDir,
-                                final StreamsConfig config) {
+    public InternalMockProcessorContext(final File stateDir,
+                                        final StreamsConfig config) {
         this(stateDir, null, null, new Metrics(), config, null, null);
     }
 
-    public MockProcessorContext(final StateSerdes<?, ?> serdes,
-                                final RecordCollector collector) {
+    public InternalMockProcessorContext(final StateSerdes<?, ?> serdes,
+                                        final RecordCollector collector) {
         this(null, serdes.keySerde(), serdes.valueSerde(), collector, null);
     }
 
-    public MockProcessorContext(final StateSerdes<?, ?> serdes,
-                                final RecordCollector collector,
-                                final Metrics metrics) {
+    public InternalMockProcessorContext(final StateSerdes<?, ?> serdes,
+                                        final RecordCollector collector,
+                                        final Metrics metrics) {
         this(null, serdes.keySerde(), serdes.valueSerde(), metrics, new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()), new RecordCollector.Supplier() {
             @Override
             public RecordCollector recordCollector() {
@@ -82,11 +82,11 @@ public RecordCollector recordCollector() {
         }, null);
     }
 
-    public MockProcessorContext(final File stateDir,
-                                final Serde<?> keySerde,
-                                final Serde<?> valSerde,
-                                final RecordCollector collector,
-                                final ThreadCache cache) {
+    public InternalMockProcessorContext(final File stateDir,
+                                        final Serde<?> keySerde,
+                                        final Serde<?> valSerde,
+                                        final RecordCollector collector,
+                                        final ThreadCache cache) {
         this(stateDir, keySerde, valSerde, new Metrics(), new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()), new RecordCollector.Supplier() {
             @Override
             public RecordCollector recordCollector() {
@@ -95,13 +95,13 @@ public RecordCollector recordCollector() {
         }, cache);
     }
 
-    private MockProcessorContext(final File stateDir,
-                                final Serde<?> keySerde,
-                                final Serde<?> valSerde,
-                                final Metrics metrics,
-                                final StreamsConfig config,
-                                final RecordCollector.Supplier collectorSupplier,
-                                final ThreadCache cache) {
+    private InternalMockProcessorContext(final File stateDir,
+                                         final Serde<?> keySerde,
+                                         final Serde<?> valSerde,
+                                         final Metrics metrics,
+                                         final StreamsConfig config,
+                                         final RecordCollector.Supplier collectorSupplier,
+                                         final ThreadCache cache) {
         super(new TaskId(0, 0),
               config,
               new MockStreamsMetrics(metrics),
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 3a9ed751e9c..39183d942ea 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -48,7 +48,7 @@
     private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L;
 
     private ProcessorTopology topology;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private ProcessorTopology globalTopology;
     private final LogContext logContext = new LogContext("testCache ");
 
@@ -85,7 +85,7 @@ public void setUp(final KStreamBuilder builder,
         topology = builder.build(null);
         globalTopology = builder.buildGlobalStateTopology();
         final ThreadCache cache = new ThreadCache(logContext, cacheSize, new MockStreamsMetrics(new Metrics()));
-        context = new MockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
+        context = new InternalMockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
         context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));
         // init global topology first as it will add stores to the
         // store map that are required for joins etc.
@@ -126,7 +126,7 @@ public void setUp(final StreamsBuilder builder,
         globalTopology = internalTopologyBuilder.buildGlobalStateTopology();
 
         final ThreadCache cache = new ThreadCache(logContext, cacheSize, new MockStreamsMetrics(new Metrics()));
-        context = new MockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
+        context = new InternalMockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
         context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));
 
         // init global topology first as it will add stores to the
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
new file mode 100644
index 00000000000..03f871a7284
--- /dev/null
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * <p>
+ * The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
+ * tests that serve as example usage.
+ * <p>
+ * Note that this class does not take any automated actions (such as firing scheduled punctuators).
+ * It simply captures any data it witnessess.
+ * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+@InterfaceStability.Evolving
+public class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
+    // Immutable fields ================================================
+    private final StreamsMetricsImpl metrics;
+    private final TaskId taskId;
+    private final StreamsConfig config;
+    private final File stateDir;
+
+    // settable record metadata ================================================
+    private String topic;
+    private Integer partition;
+    private Long offset;
+    private Long timestamp;
+
+    // mocks ================================================
+    private final Map<String, StateStore> stateStores = new HashMap<>();
+    private final List<CapturedPunctuator> punctuators = new LinkedList<>();
+    private final List<CapturedForward> capturedForwards = new LinkedList<>();
+    private boolean committed = false;
+
+    /**
+     * {@link CapturedPunctuator} holds captured punctuators, along with their scheduling information.
+     */
+    public static class CapturedPunctuator {
+        private final long intervalMs;
+        private final PunctuationType type;
+        private final Punctuator punctuator;
+        private boolean cancelled = false;
+
+        private CapturedPunctuator(final long intervalMs, final PunctuationType type, final Punctuator punctuator) {
+            this.intervalMs = intervalMs;
+            this.type = type;
+            this.punctuator = punctuator;
+        }
+
+        public long getIntervalMs() {
+            return intervalMs;
+        }
+
+        public PunctuationType getType() {
+            return type;
+        }
+
+        public Punctuator getPunctuator() {
+            return punctuator;
+        }
+
+        public void cancel() {
+            this.cancelled = true;
+        }
+
+        public boolean cancelled() {
+            return cancelled;
+        }
+    }
+
+
+    public static class CapturedForward {
+        private final String childName;
+        private final long timestamp;
+        private final KeyValue keyValue;
+
+        private CapturedForward(final To to, final KeyValue keyValue) {
+            if (keyValue == null) throw new IllegalArgumentException();
+
+            this.childName = to.childName;
+            this.timestamp = to.timestamp;
+            this.keyValue = keyValue;
+        }
+
+        /**
+         * The child this data was forwarded to.
+         *
+         * @return The child name, or {@code null} if it was broadcasted.
+         */
+        public String childName() {
+            return childName;
+        }
+
+        /**
+         * The timestamp attached to the forwarded record.
+         *
+         * @return A timestamp, or {@code -1} if none was forwarded.
+         */
+        public long timestamp() {
+            return timestamp;
+        }
+
+        /**
+         * The data forwarded.
+         *
+         * @return A key/value pair. Not null.
+         */
+        public KeyValue keyValue() {
+            return keyValue;
+        }
+    }
+
+    // contructors ================================================
+
+    /**
+     * Create a {@link MockProcessorContext} with dummy {@code config} and {@code taskId} and {@code null} {@code stateDir}.
+     * Most unit tests using this mock won't need to know the taskId,
+     * and most unit tests should be able to get by with the
+     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+     */
+    public MockProcessorContext() {
+        //noinspection DoubleBraceInitialization
+        this(
+            new Properties() {
+                {
+                    put(StreamsConfig.APPLICATION_ID_CONFIG, "");
+                    put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
+                }
+            },
+            new TaskId(0, 0),
+            null);
+    }
+
+    /**
+     * Create a {@link MockProcessorContext} with dummy {@code taskId} and {@code null} {@code stateDir}.
+     * Most unit tests using this mock won't need to know the taskId,
+     * and most unit tests should be able to get by with the
+     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+     *
+     * @param config a Properties object, used to configure the context and the processor.
+     */
+    public MockProcessorContext(final Properties config) {
+        this(config, new TaskId(0, 0), null);
+    }
+
+    /**
+     * Create a {@link MockProcessorContext} with a specified taskId and null stateDir.
+     *
+     * @param config   a {@link Properties} object, used to configure the context and the processor.
+     * @param taskId   a {@link TaskId}, which the context makes available via {@link MockProcessorContext#taskId()}.
+     * @param stateDir a {@link File}, which the context makes available viw {@link MockProcessorContext#stateDir()}.
+     */
+    public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
+        final StreamsConfig streamsConfig = new StreamsConfig(config);
+        this.taskId = taskId;
+        this.config = streamsConfig;
+        this.stateDir = stateDir;
+        this.metrics = new StreamsMetricsImpl(new Metrics(), "mock-processor-context", new HashMap<String, String>());
+    }
+
+    @Override
+    public String applicationId() {
+        return config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+    }
+
+    @Override
+    public TaskId taskId() {
+        return taskId;
+    }
+
+    @Override
+    public Map<String, Object> appConfigs() {
+        final Map<String, Object> combined = new HashMap<>();
+        combined.putAll(config.originals());
+        combined.putAll(config.values());
+        return combined;
+    }
+
+    @Override
+    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
+        return config.originalsWithPrefix(prefix);
+    }
+
+    @Override
+    public Serde<?> keySerde() {
+        return config.defaultKeySerde();
+    }
+
+    @Override
+    public Serde<?> valueSerde() {
+        return config.defaultValueSerde();
+    }
+
+    @Override
+    public File stateDir() {
+        return stateDir;
+    }
+
+    @Override
+    public StreamsMetrics metrics() {
+        return metrics;
+    }
+
+    // settable record metadata ================================================
+
+    /**
+     * The context exposes these metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set them directly.
+     *
+     * @param topic     A topic name
+     * @param partition A partition number
+     * @param offset    A record offset
+     * @param timestamp A record timestamp
+     */
+    public void setRecordMetadata(final String topic, final int partition, final long offset, final long timestamp) {
+        this.topic = topic;
+        this.partition = partition;
+        this.offset = offset;
+        this.timestamp = timestamp;
+    }
+
+
+    /**
+     * The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
+     *
+     * @param topic A topic name
+     */
+    public void setTopic(final String topic) {
+        this.topic = topic;
+    }
+
+    /**
+     * The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
+     *
+     * @param partition A partition number
+     */
+    public void setPartition(final int partition) {
+        this.partition = partition;
+    }
+
+
+    /**
+     * The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
+     *
+     * @param offset A record offset
+     */
+    public void setOffset(final long offset) {
+        this.offset = offset;
+    }
+
+
+    /**
+     * The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
+     *
+     * @param timestamp A record timestamp
+     */
+    public void setTimestamp(final long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    @Override
+    public String topic() {
+        if (topic == null) {
+            throw new IllegalStateException("Topic must be set before use via setRecordMetadata() or setTopic().");
+        }
+        return topic;
+    }
+
+    @Override
+    public int partition() {
+        if (partition == null) {
+            throw new IllegalStateException("Partition must be set before use via setRecordMetadata() or setPartition().");
+        }
+        return partition;
+    }
+
+    @Override
+    public long offset() {
+        if (offset == null) {
+            throw new IllegalStateException("Offset must be set before use via setRecordMetadata() or setOffset().");
+        }
+        return offset;
+    }
+
+    @Override
+    public long timestamp() {
+        if (timestamp == null) {
+            throw new IllegalStateException("Timestamp must be set before use via setRecordMetadata() or setTimestamp().");
+        }
+        return timestamp;
+    }
+
+    // mocks ================================================
+
+
+    @Override
+    public void register(final StateStore store,
+                         final boolean loggingEnabledIsDeprecatedAndIgnored,
+                         final StateRestoreCallback stateRestoreCallbackIsIgnoredInMock) {
+        stateStores.put(store.name(), store);
+    }
+
+    @Override
+    public StateStore getStateStore(final String name) {
+        return stateStores.get(name);
+    }
+
+    @Override
+    public Cancellable schedule(final long intervalMs, final PunctuationType type, final Punctuator callback) {
+        final CapturedPunctuator capturedPunctuator = new CapturedPunctuator(intervalMs, type, callback);
+
+        punctuators.add(capturedPunctuator);
+
+        return new Cancellable() {
+            @Override
+            public void cancel() {
+                capturedPunctuator.cancel();
+            }
+        };
+    }
+
+    @Override
+    public void schedule(final long interval) {
+        throw new UnsupportedOperationException(
+            "schedule() is deprecated and not supported in Mock. " +
+                "Use schedule(final long intervalMs, final PunctuationType type, final Punctuator callback) instead."
+        );
+    }
+
+    /**
+     * Get the punctuators scheduled so far. The returned list is not affected by subsequent calls to {@code schedule(...)}.
+     *
+     * @return A list of captured punctuators.
+     */
+    public List<CapturedPunctuator> scheduledPunctuators() {
+        final LinkedList<CapturedPunctuator> capturedPunctuators = new LinkedList<>();
+        capturedPunctuators.addAll(punctuators);
+        return capturedPunctuators;
+    }
+
+    @Override
+    public <K, V> void forward(final K key, final V value) {
+        //noinspection unchecked
+        capturedForwards.add(new CapturedForward(To.all(), new KeyValue(key, value)));
+    }
+
+    @Override
+    public <K, V> void forward(final K key, final V value, final To to) {
+        //noinspection unchecked
+        capturedForwards.add(new CapturedForward(to, new KeyValue(key, value)));
+    }
+
+    @Override
+    public <K, V> void forward(final K key, final V value, final int childIndex) {
+        throw new UnsupportedOperationException(
+            "Forwarding to a child by index is deprecated. " +
+                "Please transition processors to forward using a 'To' object instead."
+        );
+    }
+
+    @Override
+    public <K, V> void forward(final K key, final V value, final String childName) {
+        throw new UnsupportedOperationException(
+            "Forwarding to a child by name is deprecated. " +
+                "Please transition processors to forward using 'To.child(childName)' instead."
+        );
+    }
+
+    /**
+     * Get all the forwarded data this context has observed. The returned list will not be
+     * affected by subsequent interactions with the context. The data in the list is in the same order as the calls to
+     * {@code forward(...)}.
+     *
+     * @return A list of key/value pairs that were previously passed to the context.
+     */
+    public List<CapturedForward> forwarded() {
+        final LinkedList<CapturedForward> result = new LinkedList<>();
+        result.addAll(capturedForwards);
+        return result;
+    }
+
+    /**
+     * Get all the forwarded data this context has observed for a specific child by name.
+     * The returned list will not be affected by subsequent interactions with the context.
+     * The data in the list is in the same order as the calls to {@code forward(...)}.
+     *
+     * @param childName The child name to retrieve forwards for
+     * @return A list of key/value pairs that were previously passed to the context.
+     */
+    public List<CapturedForward> forwarded(final String childName) {
+        final LinkedList<CapturedForward> result = new LinkedList<>();
+        for (final CapturedForward capture : capturedForwards) {
+            if (capture.childName() == null || capture.childName().equals(childName)) {
+                result.add(capture);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Clear the captured forwarded data.
+     */
+    public void resetForwards() {
+        capturedForwards.clear();
+    }
+
+    @Override
+    public void commit() {
+        committed = true;
+    }
+
+    /**
+     * Whether {@link ProcessorContext#commit()} has been called in this context.
+     *
+     * @return {@code true} iff {@link ProcessorContext#commit()} has been called in this context since construction or reset.
+     */
+    public boolean committed() {
+        return committed;
+    }
+
+    /**
+     * Reset the commit capture to {@code false} (whether or not it was previously {@code true}).
+     */
+    public void resetCommit() {
+        committed = false;
+    }
+
+    @Override
+    public RecordCollector recordCollector() {
+        // This interface is assumed by state stores that add change-logging.
+        // Rather than risk a mysterious ClassCastException during unit tests, throw an explanatory exception.
+
+        throw new UnsupportedOperationException("MockProcessorContext does not provide record collection. " +
+            "For processor unit tests, use an in-memory state store with change-logging disabled. " +
+            "Alternatively, use the TopologyTestDriver for testing processor/store/topology integration.");
+    }
+}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
new file mode 100644
index 00000000000..8c5ec4604fa
--- /dev/null
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.MockProcessorContext.CapturedForward;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class MockProcessorContextTest {
+    @Test
+    public void shouldCaptureOutputRecords() {
+        final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+            @Override
+            public void process(final String key, final Long value) {
+                context().forward(key + value, key.length() + value);
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+        processor.init(context);
+
+        processor.process("foo", 5L);
+        processor.process("barbaz", 50L);
+
+        final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
+        assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue());
+        assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue());
+        assertFalse(forwarded.hasNext());
+
+        context.resetForwards();
+
+        assertEquals(0, context.forwarded().size());
+    }
+
+    @Test
+    public void shouldCaptureOutputRecordsUsingTo() {
+        final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+            @Override
+            public void process(final String key, final Long value) {
+                context().forward(key + value, key.length() + value, To.all());
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+
+        processor.init(context);
+
+        processor.process("foo", 5L);
+        processor.process("barbaz", 50L);
+
+        final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
+        assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue());
+        assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue());
+        assertFalse(forwarded.hasNext());
+
+        context.resetForwards();
+
+        assertEquals(0, context.forwarded().size());
+    }
+
+    @Test
+    public void shouldCaptureRecordsOutputToChildByName() {
+        final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+            private int count = 0;
+
+            @Override
+            public void process(final String key, final Long value) {
+                if (count == 0) {
+                    context().forward("start", -1L, To.all()); // broadcast
+                }
+                final To toChild = count % 2 == 0 ? To.child("george") : To.child("pete");
+                context().forward(key + value, key.length() + value, toChild);
+                count++;
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+
+        processor.init(context);
+
+        processor.process("foo", 5L);
+        processor.process("barbaz", 50L);
+
+        {
+            final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
+
+            final CapturedForward forward1 = forwarded.next();
+            assertEquals(new KeyValue<>("start", -1L), forward1.keyValue());
+            assertEquals(null, forward1.childName());
+
+            final CapturedForward forward2 = forwarded.next();
+            assertEquals(new KeyValue<>("foo5", 8L), forward2.keyValue());
+            assertEquals("george", forward2.childName());
+
+            final CapturedForward forward3 = forwarded.next();
+            assertEquals(new KeyValue<>("barbaz50", 56L), forward3.keyValue());
+            assertEquals("pete", forward3.childName());
+
+            assertFalse(forwarded.hasNext());
+        }
+
+        {
+            final Iterator<CapturedForward> forwarded = context.forwarded("george").iterator();
+            assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue());
+            assertFalse(forwarded.hasNext());
+        }
+
+        {
+            final Iterator<CapturedForward> forwarded = context.forwarded("pete").iterator();
+            assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue());
+            assertFalse(forwarded.hasNext());
+        }
+
+        {
+            final Iterator<CapturedForward> forwarded = context.forwarded("steve").iterator();
+            assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue());
+            assertFalse(forwarded.hasNext());
+        }
+    }
+
+    @Test
+    public void shouldThrowIfForwardedWithDeprecatedChildIndex() {
+        final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+            @Override
+            public void process(final String key, final Long value) {
+                //noinspection deprecation
+                context().forward(key, value, 0);
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+
+        processor.init(context);
+
+        try {
+            processor.process("foo", 5L);
+            fail("Should have thrown an UnsupportedOperationException.");
+        } catch (final UnsupportedOperationException expected) {
+            // expected
+        }
+    }
+
+    @Test
+    public void shouldThrowIfForwardedWithDeprecatedChildName() {
+        final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+            @Override
+            public void process(final String key, final Long value) {
+                //noinspection deprecation
+                context().forward(key, value, "child1");
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+
+        processor.init(context);
+
+        try {
+            processor.process("foo", 5L);
+            fail("Should have thrown an UnsupportedOperationException.");
+        } catch (final UnsupportedOperationException expected) {
+            // expected
+        }
+    }
+
+    @Test
+    public void shouldCaptureCommitsAndAllowReset() {
+        final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+            private int count = 0;
+
+            @Override
+            public void process(final String key, final Long value) {
+                if (++count > 2) context().commit();
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+
+        processor.init(context);
+
+        processor.process("foo", 5L);
+        processor.process("barbaz", 50L);
+
+        assertFalse(context.committed());
+
+        processor.process("foobar", 500L);
+
+        assertTrue(context.committed());
+
+        context.resetCommit();
+
+        assertFalse(context.committed());
+    }
+
+    @Test
+    public void shouldStoreAndReturnStateStores() {
+        final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+            @Override
+            public void process(final String key, final Long value) {
+                //noinspection unchecked
+                final KeyValueStore<String, Long> stateStore = (KeyValueStore<String, Long>) context().getStateStore("my-state");
+                stateStore.put(key, (stateStore.get(key) == null ? 0 : stateStore.get(key)) + value);
+                stateStore.put("all", (stateStore.get("all") == null ? 0 : stateStore.get("all")) + value);
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+        final KeyValueStore<String, Long> store = new InMemoryKeyValueStore<>("my-state", Serdes.String(), Serdes.Long());
+        context.register(store, false, null);
+
+        store.init(context, store);
+        processor.init(context);
+
+        processor.process("foo", 5L);
+        processor.process("bar", 50L);
+
+        assertEquals(5L, (long) store.get("foo"));
+        assertEquals(50L, (long) store.get("bar"));
+        assertEquals(55L, (long) store.get("all"));
+    }
+
+    @Test
+    public void shouldCaptureApplicationAndRecordMetadata() {
+        final Properties config = new Properties();
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testMetadata");
+        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
+
+        final AbstractProcessor<String, Object> processor = new AbstractProcessor<String, Object>() {
+            @Override
+            public void process(final String key, final Object value) {
+                context().forward("appId", context().applicationId());
+                context().forward("taskId", context().taskId());
+
+                context().forward("topic", context().topic());
+                context().forward("partition", context().partition());
+                context().forward("offset", context().offset());
+                context().forward("timestamp", context().timestamp());
+
+                context().forward("key", key);
+                context().forward("value", value);
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext(config);
+        processor.init(context);
+
+        try {
+            processor.process("foo", 5L);
+            fail("Should have thrown an exception.");
+        } catch (final IllegalStateException expected) {
+            // expected, since the record metadata isn't initialized
+        }
+
+        context.resetForwards();
+        context.setRecordMetadata("t1", 0, 0L, 0L);
+
+        {
+            processor.process("foo", 5L);
+            final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
+            assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("topic", "t1"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("partition", 0), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("offset", 0L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("timestamp", 0L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("key", "foo"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("value", 5L), forwarded.next().keyValue());
+        }
+
+        context.resetForwards();
+
+        // record metadata should be "sticky"
+        context.setOffset(1L);
+        context.setTimestamp(10L);
+
+        {
+            processor.process("bar", 50L);
+            final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
+            assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("topic", "t1"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("partition", 0), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("offset", 1L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("timestamp", 10L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("key", "bar"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("value", 50L), forwarded.next().keyValue());
+        }
+
+        context.resetForwards();
+        // record metadata should be "sticky"
+        context.setTopic("t2");
+        context.setPartition(30);
+
+        {
+            processor.process("baz", 500L);
+            final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
+            assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("topic", "t2"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("partition", 30), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("offset", 1L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("timestamp", 10L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("key", "baz"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("value", 500L), forwarded.next().keyValue());
+        }
+    }
+
+    @Test
+    public void shouldCapturePunctuator() {
+        final Processor<String, Long> processor = new Processor<String, Long>() {
+            @Override
+            public void init(final ProcessorContext context) {
+                context.schedule(
+                    1000L,
+                    PunctuationType.WALL_CLOCK_TIME,
+                    new Punctuator() {
+                        @Override
+                        public void punctuate(final long timestamp) {
+                            context.commit();
+                        }
+                    }
+                );
+            }
+
+            @Override
+            public void process(final String key, final Long value) {
+            }
+
+            @Override
+            public void punctuate(final long timestamp) {
+            }
+
+            @Override
+            public void close() {
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+
+        processor.init(context);
+
+        final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0);
+        assertEquals(1000L, capturedPunctuator.getIntervalMs());
+        assertEquals(PunctuationType.WALL_CLOCK_TIME, capturedPunctuator.getType());
+        assertFalse(capturedPunctuator.cancelled());
+
+        final Punctuator punctuator = capturedPunctuator.getPunctuator();
+        assertFalse(context.committed());
+        punctuator.punctuate(1234L);
+        assertTrue(context.committed());
+    }
+
+    @Test
+    public void fullConstructorShouldSetAllExpectedAttributes() {
+        final Properties config = new Properties();
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testFullConstructor");
+        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
+        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+
+        final File dummyFile = new File("");
+        final MockProcessorContext context = new MockProcessorContext(config, new TaskId(1, 1), dummyFile);
+
+        assertEquals("testFullConstructor", context.applicationId());
+        assertEquals(new TaskId(1, 1), context.taskId());
+        assertEquals("testFullConstructor", context.appConfigs().get(StreamsConfig.APPLICATION_ID_CONFIG));
+        assertEquals("testFullConstructor", context.appConfigsWithPrefix("application.").get("id"));
+        assertEquals(Serdes.String().getClass(), context.keySerde().getClass());
+        assertEquals(Serdes.Long().getClass(), context.valueSerde().getClass());
+        assertEquals(dummyFile, context.stateDir());
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add MockProcessorContext to public test-utils
> ---------------------------------------------
>
>                 Key: KAFKA-6473
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6473
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Matthias J. Sax
>            Assignee: John Roesler
>            Priority: Major
>              Labels: needs-kip, user-experience
>
> With KIP-247, we added public test-utils artifact with a TopologyTestDriver class. Using the test driver for a single Processor/Transformer/ValueTransformer it's required to specify a whole topology with source and sink and plus the Processor/Transformer/ValueTransformer into it.
> For unit testing, it might be more convenient to have a MockProcessorContext, that can be used to test the Processor/Transformer/ValueTransformer in isolation. Ie, the test itself creates new Processor/Transformer/ValueTransformer object and calls init() manually passing in the MockProcessorContext.
> This is a public API change and requires a KIP: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message