nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ald...@apache.org
Subject [11/11] nifi-minifi-cpp git commit: MINIFI-350 Added pytest-based system integration test framework and initial test cases
Date Tue, 12 Sep 2017 17:51:34 GMT
MINIFI-350 Added pytest-based system integration test framework and initial test cases

This closes #126.

Signed-off-by: Aldrin Piri <aldrin@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/84f50b51
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/84f50b51
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/84f50b51

Branch: refs/heads/master
Commit: 84f50b5173b6d3cb88ddd8a8f366b80483081da2
Parents: 35a47c7
Author: Andrew I. Christianson <andy@andyic.org>
Authored: Thu Jul 13 10:42:35 2017 -0400
Committer: Aldrin Piri <aldrin@apache.org>
Committed: Tue Sep 12 13:51:14 2017 -0400

----------------------------------------------------------------------
 .gitignore                                      |   4 +
 README.md                                       |  18 ++
 cmake/DockerConfig.cmake                        |   4 +-
 docker/DockerVerify.sh                          |  38 +++
 docker/test/integration/.gitignore              |   1 +
 docker/test/integration/README.md               | 184 ++++++++++++
 docker/test/integration/minifi/__init__.py      | 298 +++++++++++++++++++
 docker/test/integration/minifi/test/__init__.py | 191 ++++++++++++
 docker/test/integration/test_filesystem_ops.py  |  54 ++++
 docker/test/integration/test_http.py            |  36 +++
 10 files changed, 827 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/84f50b51/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 2ba3164..c28b765 100644
--- a/.gitignore
+++ b/.gitignore
@@ -54,3 +54,7 @@ thirdparty/apache-rat/apache-rat*
 # Ignore source files that have been placed in the docker directory during build
 docker/minificppsource
 *.swp
+.cache
+.cproject
+.settings
+*.pyc

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/84f50b51/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index f971683..8830aaa 100644
--- a/README.md
+++ b/README.md
@@ -99,6 +99,11 @@ $ yum install cmake \
   boost-devel \
   libssl-dev \
   doxygen
+$ # (Optional) for building docker image
+$ yum install docker
+$ # (Optional) for system integration tests
+$ yum install docker python-virtualenv
+
 ```
 
 Aptitude based Linux Distributions
@@ -111,6 +116,10 @@ $ apt-get install cmake \
   uuid-dev uuid \
   libboost-all-dev libssl-dev \
   doxygen
+$ # (Optional) for building docker image
+$ apt-get install docker.io
+$ # (Optional) for system integration tests
+$ apt-get install docker.io python-virtualenv
 ```
 
 OS X Using Homebrew (with XCode Command Line Tools installed)
@@ -124,6 +133,9 @@ $ brew install cmake \
   doxygen
 $ brew install curl
 $ brew link curl --force
+$ # (Optional) for building docker image/running system integration tests
+$ # Install docker using instructions at https://docs.docker.com/docker-for-mac/install/
+$ sudo pip install virtualenv
 ```
 
 
@@ -221,6 +233,12 @@ Successfully built c390063d9bd1
 Built target docker
 ```
 
+- (Optional) Execute system integration tests using the docker image built locally on a docker
daemon running locally.
+```
+~/Development/code/apache/nifi-minifi-cpp/build
+$ make docker-verify
+```
+
 ### Cleaning
 Remove the build directory created above.
 ```

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/84f50b51/cmake/DockerConfig.cmake
----------------------------------------------------------------------
diff --git a/cmake/DockerConfig.cmake b/cmake/DockerConfig.cmake
index 41ca7f7..57270e4 100644
--- a/cmake/DockerConfig.cmake
+++ b/cmake/DockerConfig.cmake
@@ -21,4 +21,6 @@ add_custom_target(
     COMMAND ${CMAKE_SOURCE_DIR}/docker/DockerBuild.sh 1000 1000 ${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}.${PROJECT_VERSION_PATCH}
minificppsource ${CMAKE_SOURCE_DIR}
     WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/docker/)
 
-
+add_custom_target(
+    docker-verify
+    COMMAND ${CMAKE_SOURCE_DIR}/docker/DockerVerify.sh)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/84f50b51/docker/DockerVerify.sh
----------------------------------------------------------------------
diff --git a/docker/DockerVerify.sh b/docker/DockerVerify.sh
new file mode 100755
index 0000000..be62c1f
--- /dev/null
+++ b/docker/DockerVerify.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -e
+
+docker_dir="$( cd ${0%/*} && pwd )"
+
+# Create virutal environment for testing
+if [[ ! -d ./test-env-py2 ]]; then
+  echo "Creating virtual environment in ./test-env-py2" 1>&2
+  virtualenv ./test-env-py2
+fi
+
+echo "Activating virtual environment..." 1>&2
+. ./test-env-py2/bin/activate
+pip install --upgrade pip setuptools
+
+# Install test dependencies
+echo "Installing test dependencies..." 1>&2
+pip install --upgrade pytest docker PyYAML watchdog
+
+export MINIFI_VERSION=0.3.0
+export PYTHONPATH="${PYTHONPATH}:${docker_dir}/test/integration"
+pytest -s -v "${docker_dir}"/test/integration

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/84f50b51/docker/test/integration/.gitignore
----------------------------------------------------------------------
diff --git a/docker/test/integration/.gitignore b/docker/test/integration/.gitignore
new file mode 100644
index 0000000..bee8a64
--- /dev/null
+++ b/docker/test/integration/.gitignore
@@ -0,0 +1 @@
+__pycache__

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/84f50b51/docker/test/integration/README.md
----------------------------------------------------------------------
diff --git a/docker/test/integration/README.md b/docker/test/integration/README.md
new file mode 100644
index 0000000..6d8c066
--- /dev/null
+++ b/docker/test/integration/README.md
@@ -0,0 +1,184 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+# Apache MiNiFi Docker System Integration Tests
+
+Apache MiNiFi includes a suite of docker-based system integration tests. These
+tests are designed to test the integration between distinct MiNiFi instances as
+well as other systems which are available in docker, such as Apache NiFi.
+
+## Test Execution Lifecycle
+
+Each test involves the following stages as part of its execution lifecycle:
+
+### Definition of flows/Flow DSL
+
+Flows are defined using a python-native domain specific language (DSL). The DSL
+supports the standard primitives which make up a NiFi/MiNiFi flow, such as
+processors, connections, and controller services. Several processors defined in
+the DSL have optional, named parameters enabling concise flow expression.
+
+By default, all relationships are set to auto-terminate. If a relationship is
+used, it is automatically taken out of the auto\_terminate list.
+
+**Example Trivial Flow:**
+
+```python
+flow = GetFile('/tmp/input') >> LogAttribute() >> PutFile('/tmp/output')
+```
+
+#### Supported Processors
+
+The following processors/parameters are supported:
+
+**GetFile**
+
+- input\_dir
+
+**PutFile**
+
+- output\_dir
+
+**LogAttribute**
+
+**ListenHTTP**
+
+- port
+- cert=None
+
+**InvokeHTTP**
+
+- url
+- method='GET'
+- ssl\_context\_service=None
+
+### Definition of an output validator
+
+The output validator is responsible for checking the state of a cluster for
+valid output conditions. Currently, the only supported output validator is the
+SingleFileOutputValidator, which looks for a single file to be written to
+/tmp/output by a flow having a given string as its contents.
+
+**Example SingleFileOutputValidator:**
+
+```python
+SingleFileOutputValidator('example output')
+```
+
+This example SingleFileOutputValidator would validate that a single file is
+written with the contents 'example output.'
+
+### Creation of a DockerTestCluster
+
+DockerTestCluster instances are used to deploy one or more flow to a simulated
+or actual multi-host docker cluster. This enables testing of interactions
+between multiple system components, such as MiNiFi flows. Before the test
+cluster is destroyed, an assertion may be performed on the results of the
+*check\_output()* method of the cluster. This invokes the validator supplied at
+construction against the output state of the system.
+
+Creation of a DockerTestCluster is simple:
+
+**Example DockerTestCluster Instantiation:**
+
+```python
+with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
+  ...
+  # Perform test operations
+  ...
+  assert cluster.check_output()
+```
+
+Note that a docker cluster must be created inside of a *with* structure to
+ensure that all resources are ccreated and destroyed cleanly. 
+
+### Insertion of test input data
+
+Although arbitrary NiFi flows can ingest data from a multitude of sources, a
+MiNiFi system integration test is expected to receive input via deterministed,
+controlled channels. The primary supported method of providing input to a
+MiNiFi system integration test is to insert data into the filesystem at
+/tmp/input.
+
+To write a string to the contents of a file in /tmp/input, use the
+*put\_test\_data()* method.
+
+**Example put\_test\_data() Usage:**
+
+```python
+cluster.put_test_data('test')
+```
+
+This writes a file with a random name to /tmp/input, with the contents 'test.'
+
+To provide a resource to a container, such as a TLS certificate, use the
+*put\_test\_resource()* method to write a resource file to /tmp/resources.
+
+**Example put\_test\_resource() Usage:**
+
+```python
+cluster.put_test_resource('test-resource', 'resource contents')
+```
+
+This writes a file to /tmp/resources/test-resource with the contents 'resource
+contents.'
+
+### Deployment of one or more flows
+
+Deployment of flows to a test cluster is performed using the *deploy\_flow()*
+method of a cluster. Each flow is deployed as a separate docker service having
+its own DNS name. If a name is not provided upon deployment, a random name will
+be used.
+
+**Example deploy\_flow() Usage:**
+
+```python
+cluster.deploy_flow(flow, name='test-flow')
+```
+
+### Execution of one or more flows
+
+Flows are executed immediately upon deployment and according to schedule
+properties defined in the flow.yml. As such, to minimize test latency it is
+important to ensure that test inputs are added to the test cluster before flows
+are deployed. Filesystem events are monitored using event APIs, ensuring that
+flows are executed immediately upon input availability and output is validated
+immediately after it is written to disk.
+
+### Output validation
+
+As soon as data is written to /tmp/output, the OutputValidator (defined
+according to the documentation above) is executed on the output. The
+*check\_output()* cluster method waits for up to 5 seconds for valid output.
+
+### Cluster teardown/cleanup
+
+The deployment of a test cluster involves creating one or more docker
+containers and networks, as well as temporary files/directories on the host
+system. All resources are cleaned up automatically as long as clusters are
+created within a *with* block.
+
+```python
+
+# Using the with block ensures that all cluster resources are cleaned up after
+# the test cluster is no longer needed.
+
+with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
+  ...
+  # Perform test operations
+  ...
+  assert cluster.check_output()
+```
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/84f50b51/docker/test/integration/minifi/__init__.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/minifi/__init__.py b/docker/test/integration/minifi/__init__.py
new file mode 100644
index 0000000..557b9a8
--- /dev/null
+++ b/docker/test/integration/minifi/__init__.py
@@ -0,0 +1,298 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import uuid
+import logging
+
+import yaml
+import docker
+
+
+class Cluster(object):
+    """
+    Base Cluster class. This is intended to be a generic interface
+    to different types of clusters. Clusters could be Kubernetes clusters,
+    Docker swarms, or cloud compute/container services.
+    """
+
+    def deploy_flow(self, flow):
+        """
+        Deploys a flow to the cluster.
+        """
+
+    def __enter__(self):
+        """
+        Allocate ephemeral cluster resources.
+        """
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """
+        Clean up ephemeral cluster resources.
+        """
+
+class SingleNodeDockerCluster(Cluster):
+    """
+    A "cluster" which consists of a single docker node. Useful for
+    testing or use-cases which do not span multiple compute nodes.
+    """
+
+    def __init__(self):
+        self.network = None
+        self.containers = []
+        self.tmp_files = []
+
+        # Get docker client
+        self.client = docker.from_env()
+
+    def deploy_flow(self, flow, name=None, vols={}):
+        """
+        Compiles the flow to YAML and maps it into the container using
+        the docker volumes API.
+        """
+
+        logging.info('Deploying flow...')
+
+        if name is None:
+            name = 'minifi-' + str(uuid.uuid4())
+            logging.info('Flow name was not provided; using generated name \'%s\'', name)
+
+        minifi_version = os.environ['MINIFI_VERSION']
+        self.minifi_root = '/opt/minifi/nifi-minifi-cpp-' + minifi_version
+
+        # Write flow config
+        tmp_flow_file_name = '/tmp/.minifi-flow.' + str(uuid.uuid4())
+        self.tmp_files.append(tmp_flow_file_name)
+
+        yaml = flow_yaml(flow)
+
+        logging.info('Using generated flow config yml:\n%s', yaml)
+
+        with open(tmp_flow_file_name, 'w') as tmp_flow_file:
+            tmp_flow_file.write(yaml)
+
+        conf_file = tmp_flow_file_name
+
+        local_vols = {}
+        local_vols[conf_file] = {'bind': self.minifi_root + '/conf/config.yml', 'mode': 'ro'}
+        local_vols.update(vols)
+
+        logging.info('Creating and running docker container for flow...')
+
+        # Create network if necessary
+        if self.network is None:
+            net_name = 'minifi-' + str(uuid.uuid4())
+            logging.info('Creating network: %s', net_name)
+            self.network = self.client.networks.create(net_name)
+
+        container = self.client.containers.run(
+            'apacheminificpp:' + minifi_version,
+            detach=True,
+            name=name,
+            network=self.network.name,
+            volumes=local_vols)
+
+        logging.info('Started container \'%s\'', container.name)
+        self.containers.append(container)
+
+    def __enter__(self):
+        """
+        Allocate ephemeral cluster resources.
+        """
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """
+        Clean up ephemeral cluster resources
+        """
+
+        # Clean up containers
+        for container in self.containers:
+            logging.info('Cleaning up container: %s', container.name)
+            container.remove(v=True, force=True)
+
+        # Clean up network
+        if self.network is not None:
+            logging.info('Cleaning up network network: %s', self.network.name)
+            self.network.remove()
+
+        # Clean up tmp files
+        for tmp_file in self.tmp_files:
+            os.remove(tmp_file)
+
+
+class Processor(object):
+
+    def __init__(self,
+                 clazz,
+                 properties={},
+                 schedule={},
+                 name=None,
+                 auto_terminate=[]):
+        self.connections = {}
+        self.uuid = uuid.uuid4()
+
+        if name is None:
+            self.name = str(self.uuid)
+
+        self.clazz = clazz
+        self.properties = properties
+        self.auto_terminate = auto_terminate
+
+        self.out_proc = self
+
+        self.schedule = {
+            'scheduling strategy': 'EVENT_DRIVEN',
+            'scheduling period': '1 sec',
+            'penalization period': '30 sec',
+            'yield period': '1 sec',
+            'run duration nanos': 0
+        }
+        self.schedule.update(schedule)
+
+    def connect(self, connections):
+        for rel in connections:
+
+            # Ensure that rel is not auto-terminated
+            if rel in self.auto_terminate:
+                del self.auto_terminate[self.auto_terminate.index(rel)]
+
+            # Add to set of output connections for this rel
+            if not rel in self.connections:
+                self.connections[rel] = []
+            self.connections[rel].append(connections[rel])
+
+        return self
+
+    def __rshift__(self, other):
+        """
+        Right shift operator to support flow DSL, for example:
+
+            GetFile('/input') >> LogAttribute() >> PutFile('/output')
+
+        """
+
+        if (isinstance(other, tuple)):
+            if (isinstance(other[0], tuple)):
+                for rel_tuple in other:
+                    rel = {}
+                    rel[rel_tuple[0]] = rel_tuple[1]
+                    self.out_proc.connect(rel)
+            else:
+                rel = {}
+                rel[other[0]] = other[1]
+                self.out_proc.connect(rel)
+        else:
+            self.out_proc.connect({'success': other})
+            self.out_proc = other
+
+        return self
+
+
+def InvokeHTTP(url, method='GET'):
+    return Processor('InvokeHTTP',
+                     properties={'Remote URL': url,
+                                 'HTTP Method': method},
+                     auto_terminate=['success',
+                                     'response',
+                                     'retry',
+                                     'failure',
+                                     'no retry'])
+
+
+def ListenHTTP(port):
+    return Processor('ListenHTTP',
+                     properties={'Listening Port': port},
+                     auto_terminate=['success'])
+
+
+def LogAttribute():
+    return Processor('LogAttribute',
+                     auto_terminate=['success'])
+
+
+def GetFile(input_dir):
+    return Processor('GetFile',
+                     properties={'Input Directory': input_dir},
+                     schedule={'scheduling period': '0 sec'},
+                     auto_terminate=['success'])
+
+
+def PutFile(output_dir):
+    return Processor('PutFile',
+                     properties={'Output Directory': output_dir},
+                     auto_terminate=['success', 'failure'])
+
+
+def flow_yaml(processor, root=None, visited=[]):
+
+    if root is None:
+        res = {
+            'Flow Controller': {
+                'name': 'MiNiFi Flow'
+            },
+            'Processors': [],
+            'Connections': [],
+            'Remote Processing Groups': []
+        }
+    else:
+        res = root
+
+    visited.append(processor)
+
+    if hasattr(processor, 'name'):
+        proc_name = processor.name
+    else:
+        proc_name = str(processor.uuid)
+
+    res['Processors'].append({
+        'name': proc_name,
+        'id': str(processor.uuid),
+        'class': 'org.apache.nifi.processors.standard.' + processor.clazz,
+        'scheduling strategy': processor.schedule['scheduling strategy'],
+        'scheduling period': processor.schedule['scheduling period'],
+        'penalization period': processor.schedule['penalization period'],
+        'yield period': processor.schedule['yield period'],
+        'run duration nanos': processor.schedule['run duration nanos'],
+        'Properties': processor.properties,
+        'auto-terminated relationships list': processor.auto_terminate
+    })
+
+    for conn_name in processor.connections:
+        conn_procs = processor.connections[conn_name]
+
+        if isinstance(conn_procs, list):
+            for proc in conn_procs:
+                res['Connections'].append({
+                    'name': str(uuid.uuid4()),
+                    'source id': str(processor.uuid),
+                    'source relationship name': conn_name,
+                    'destination id': str(proc.uuid)
+                })
+                if proc not in visited:
+                    flow_yaml(proc, res, visited)
+        else:
+            res['Connections'].append({
+                'name': str(uuid.uuid4()),
+                'source id': str(processor.uuid),
+                'source relationship name': conn_name,
+                'destination id': str(conn_procs.uuid)
+            })
+            if conn_procs not in visited:
+                flow_yaml(conn_procs, res, visited)
+
+    if root is None:
+        return yaml.dump(res, default_flow_style=False)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/84f50b51/docker/test/integration/minifi/test/__init__.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/minifi/test/__init__.py b/docker/test/integration/minifi/test/__init__.py
new file mode 100644
index 0000000..a7a3030
--- /dev/null
+++ b/docker/test/integration/minifi/test/__init__.py
@@ -0,0 +1,191 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import logging
+import shutil
+import uuid
+
+from threading import Event
+
+from os import listdir
+from os.path import isfile, join
+
+from watchdog.observers import Observer
+from watchdog.events import FileSystemEventHandler
+
+from minifi import SingleNodeDockerCluster
+
+logging.basicConfig(level=logging.DEBUG)
+
+
+class DockerTestCluster(SingleNodeDockerCluster):
+
+    def __init__(self, output_validator):
+
+        # Create test input/output directories
+        test_cluster_id = str(uuid.uuid4())
+
+        self.tmp_test_output_dir = '/tmp/.minifi-test-output.' + test_cluster_id
+        self.tmp_test_input_dir = '/tmp/.minifi-test-input.' + test_cluster_id
+
+        logging.info('Creating tmp test input dir: %s', self.tmp_test_input_dir)
+        os.makedirs(self.tmp_test_input_dir, mode=0777)
+        logging.info('Creating tmp test output dir: %s', self.tmp_test_output_dir)
+        os.makedirs(self.tmp_test_output_dir, mode=0777)
+
+        # Point output validator to ephemeral output dir
+        self.output_validator = output_validator
+        output_validator.set_output_dir(self.tmp_test_output_dir)
+
+        # Start observing output dir
+        self.done_event = Event()
+        event_handler = OutputEventHandler(output_validator, self.done_event)
+        self.observer = Observer()
+        self.observer.schedule(event_handler, self.tmp_test_output_dir)
+        self.observer.start()
+
+        super(DockerTestCluster, self).__init__()
+
+    def deploy_flow(self, flow, name=None):
+        """
+        Performs a standard container flow deployment with the addition
+        of volumes supporting test input/output directories.
+        """
+
+        vols = {}
+        vols[self.tmp_test_input_dir] = {'bind': '/tmp/input', 'mode': 'rw'}
+        vols[self.tmp_test_output_dir] = {'bind': '/tmp/output', 'mode': 'rw'}
+
+        super(DockerTestCluster, self).deploy_flow(flow, vols=vols, name=name)
+
+    def put_test_data(self, contents):
+        """
+        Creates a randomly-named file in the test input dir and writes
+        the given content to it.
+        """
+
+        test_file_name = join(self.tmp_test_input_dir, str(uuid.uuid4()))
+        logging.info('Writing %d bytes of content to test file: %s', len(contents), test_file_name)
+
+        with open(test_file_name, 'w') as test_input_file:
+            test_input_file.write(contents)
+
+    def wait_for_output(self, timeout_seconds):
+        logging.info('Waiting up to %d seconds for test output...', timeout_seconds)
+        self.done_event.wait(timeout_seconds)
+        self.observer.stop()
+        self.observer.join()
+
+    def log_minifi_output(self):
+
+        for container in self.containers:
+            container = self.client.containers.get(container.id)
+            logging.info('Container logs for container \'%s\':\n%s', container.name, container.logs())
+            if container.status == 'running':
+                app_logs = container.exec_run('cat ' + self.minifi_root + '/minifi-app.log')
+                logging.info('MiNiFi app logs for container \'%s\':\n%s', container.name,
app_logs)
+            else:
+                logging.info(container.status)
+                logging.info('Could not cat app logs for container \'%s\' because it is not
running',
+                             container.name)
+            stats = container.stats(decode=True, stream=False)
+            logging.info('Container stats:\n%s', repr(stats))
+
+    def check_output(self):
+        """
+        Wait for flow output, validate it, and log minifi output.
+        """
+        self.wait_for_output(5)
+        self.log_minifi_output()
+
+        return self.output_validator.validate()
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """
+        Clean up ephemeral test resources.
+        """
+
+        logging.info('Removing tmp test input dir: %s', self.tmp_test_input_dir)
+        shutil.rmtree(self.tmp_test_input_dir)
+        logging.info('Removing tmp test output dir: %s', self.tmp_test_output_dir)
+        shutil.rmtree(self.tmp_test_output_dir)
+
+        super(DockerTestCluster, self).__exit__(exc_type, exc_val, exc_tb)
+
+
+class OutputEventHandler(FileSystemEventHandler):
+
+    def __init__(self, validator, done_event):
+        self.validator = validator
+        self.done_event = done_event
+
+    def on_created(self, event):
+        logging.info('Output file created: ' + event.src_path)
+        self.check(event)
+
+    def on_modified(self, event):
+        logging.info('Output file modified: ' + event.src_path)
+        self.check(event)
+
+    def check(self, event):
+        if self.validator.validate():
+            logging.info('Output file is valid')
+            self.done_event.set()
+        else:
+            logging.info('Output file is invalid')
+
+
+class OutputValidator(object):
+    """
+    Base output validator class. Validators must implement
+    method validate, which returns a boolean.
+    """
+
+    def validate(self):
+        """
+        Return True if output is valid; False otherwise.
+        """
+
+class SingleFileOutputValidator(OutputValidator):
+    """
+    Validates the content of a single file in the given directory.
+    """
+
+    def __init__(self, expected_content):
+        self.valid = False
+        self.expected_content = expected_content
+
+    def set_output_dir(self, output_dir):
+        self.output_dir = output_dir
+
+    def validate(self):
+
+        if self.valid:
+            return True
+
+        listing = listdir(self.output_dir)
+
+        if len(listing) > 0:
+            out_file_name = listing[0]
+
+            with open(join(self.output_dir, out_file_name), 'r') as out_file:
+                contents = out_file.read()
+
+                if contents == self.expected_content:
+                    self.valid = True
+                    return True
+
+        return False

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/84f50b51/docker/test/integration/test_filesystem_ops.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/test_filesystem_ops.py b/docker/test/integration/test_filesystem_ops.py
new file mode 100644
index 0000000..7a6f212
--- /dev/null
+++ b/docker/test/integration/test_filesystem_ops.py
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from minifi import *
+from minifi.test import *
+
+
+def test_get_put():
+    """
+    Verify basic file get/put operations.
+    """
+
+    flow = GetFile('/tmp/input') >> LogAttribute() >> PutFile('/tmp/output')
+
+    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
+
+        cluster.put_test_data('test')
+        cluster.deploy_flow(flow)
+
+        assert cluster.check_output()
+
+
+def test_file_exists_failure():
+    """
+    Verify that putting to a file that already exists fails.
+    """
+
+    flow = (GetFile('/tmp/input') >>
+
+            # First put should succeed
+            PutFile('/tmp') >>
+
+            # Second put should fail (file exists)
+            PutFile('/tmp') >> (('success', LogAttribute()),
+                                ('failure', LogAttribute() >> PutFile('/tmp/output'))))
+
+    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
+
+        cluster.put_test_data('test')
+        cluster.deploy_flow(flow)
+
+        assert cluster.check_output()

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/84f50b51/docker/test/integration/test_http.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/test_http.py b/docker/test/integration/test_http.py
new file mode 100644
index 0000000..72c80bd
--- /dev/null
+++ b/docker/test/integration/test_http.py
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from minifi import *
+from minifi.test import *
+
+
+def test_invoke_listen():
+    """
+    Verify sending using InvokeHTTP to a receiver using ListenHTTP.
+    """
+
+    invoke_flow = (GetFile('/tmp/input') >> LogAttribute() >>
+                   InvokeHTTP('http://minifi-listen:8080/contentListener', method='POST'))
+
+    listen_flow = ListenHTTP(8080) >> LogAttribute() >> PutFile('/tmp/output')
+
+    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
+
+        cluster.put_test_data('test')
+        cluster.deploy_flow(listen_flow, name='minifi-listen')
+        cluster.deploy_flow(invoke_flow, name='minifi-invoke')
+
+        assert cluster.check_output()


Mime
View raw message