Return-Path:
X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io
Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io
Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183])
by cust-asf2.ponee.io (Postfix) with ESMTP id 833DD200CE7
for ; Fri, 1 Sep 2017 17:07:29 +0200 (CEST)
Received: by cust-asf.ponee.io (Postfix)
id 81A1F16D384; Fri, 1 Sep 2017 15:07:29 +0000 (UTC)
Delivered-To: archive-asf-public@cust-asf.ponee.io
Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
by cust-asf.ponee.io (Postfix) with SMTP id D781116D385
for ; Fri, 1 Sep 2017 17:07:27 +0200 (CEST)
Received: (qmail 85516 invoked by uid 500); 1 Sep 2017 15:07:27 -0000
Mailing-List: contact commits-help@nifi.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: dev@nifi.apache.org
Delivered-To: mailing list commits@nifi.apache.org
Received: (qmail 85429 invoked by uid 99); 1 Sep 2017 15:07:26 -0000
Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23)
by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Sep 2017 15:07:26 +0000
Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33)
id 07A7DF563C; Fri, 1 Sep 2017 15:07:26 +0000 (UTC)
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
From: bbende@apache.org
To: commits@nifi.apache.org
Date: Fri, 01 Sep 2017 15:07:27 -0000
Message-Id: <6ab4a3cb7dcd474bb23d387df08d171a@git.apache.org>
In-Reply-To: <6dacb5f0601044bcbd03809db35a32d0@git.apache.org>
References: <6dacb5f0601044bcbd03809db35a32d0@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [3/3] nifi-registry git commit: NIFIREG-10 Implementing Registry
service layer and connecting to REST end-points - Adding
FlowSnapshotSerializer and JAXBFlowSnapshotSerializer - Adding bean
validation 2.0.0 using Hibernate validator as the implementation
archived-at: Fri, 01 Sep 2017 15:07:29 -0000
NIFIREG-10 Implementing Registry service layer and connecting to REST end-points
- Adding FlowSnapshotSerializer and JAXBFlowSnapshotSerializer
- Adding bean validation 2.0.0 using Hibernate validator as the implementation
- Adding a test that can be run manual to exercise some of the REST API
This closes #7.
Project: http://git-wip-us.apache.org/repos/asf/nifi-registry/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-registry/commit/a1629c86
Tree: http://git-wip-us.apache.org/repos/asf/nifi-registry/tree/a1629c86
Diff: http://git-wip-us.apache.org/repos/asf/nifi-registry/diff/a1629c86
Branch: refs/heads/master
Commit: a1629c86df62930f603f264495f63a2429af7f1a
Parents: 88cae4f
Author: Bryan Bende
Authored: Tue Aug 22 16:15:39 2017 -0400
Committer: Bryan Bende
Committed: Fri Sep 1 11:06:49 2017 -0400
----------------------------------------------------------------------
build-and-run.sh | 28 +
nifi-registry-data-model/pom.xml | 5 +-
.../org/apache/nifi/registry/bucket/Bucket.java | 16 +-
.../apache/nifi/registry/bucket/BucketItem.java | 27 +-
.../nifi/registry/flow/VersionedComponent.java | 4 +
.../nifi/registry/flow/VersionedFlow.java | 12 +-
.../registry/flow/VersionedFlowSnapshot.java | 10 +
.../flow/VersionedFlowSnapshotMetadata.java | 13 +
nifi-registry-framework/pom.xml | 19 +
.../exception/ResourceNotFoundException.java | 32 +
.../flow/StandardFlowSnapshotContext.java | 15 +
.../provider/StandardProviderFactory.java | 2 +-
.../serialization/FlowSnapshotSerializer.java | 101 ++
.../serialization/SerializationException.java | 35 +
.../nifi/registry/serialization/Serializer.java | 43 +
.../jaxb/JAXBFlowSnapshotSerializer.java | 30 +
.../serialization/jaxb/JAXBSerializer.java | 80 ++
.../nifi/registry/service/DataModelMapper.java | 125 +++
.../nifi/registry/service/RegistryService.java | 511 ++++++++++
.../registry/provider/MockMetadataProvider.java | 14 +-
.../TestFlowSnapshotSerializer.java | 66 ++
.../jaxb/TestJAXBFlowSnapshotSerializer.java | 63 ++
.../registry/service/TestDataModelMapper.java | 174 ++++
.../registry/service/TestRegistryService.java | 936 +++++++++++++++++++
.../registry/metadata/MetadataProvider.java | 21 +-
.../metadata/FileSystemMetadataProvider.java | 37 +-
.../nifi/registry/metadata/MetadataHolder.java | 38 +-
.../TestFileSystemMetadataProvider.java | 119 ++-
nifi-registry-web-api/pom.xml | 10 +-
.../web/NiFiRegistryResourceConfig.java | 33 +
.../registry/web/api/BucketFlowResource.java | 14 +-
.../nifi/registry/web/api/BucketResource.java | 58 +-
.../nifi/registry/web/api/FlowResource.java | 121 ++-
.../web/mapper/IllegalStateExceptionMapper.java | 44 +
.../mapper/ResourceNotFoundExceptionMapper.java | 45 +
.../mapper/SerializationExceptionMapper.java | 45 +
.../apache/nifi/registry/web/TestRestAPI.java | 152 +++
pom.xml | 37 +
38 files changed, 3014 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/build-and-run.sh
----------------------------------------------------------------------
diff --git a/build-and-run.sh b/build-and-run.sh
new file mode 100755
index 0000000..7c7d3d6
--- /dev/null
+++ b/build-and-run.sh
@@ -0,0 +1,28 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+REGISTRY_SCRIPT=`find nifi-registry-assembly/target/ -name nifi-registry.sh | head -1`
+REGISTRY_BIN_DIR=$(dirname "${REGISTRY_SCRIPT}")
+REGISTRY_DIR=$REGISTRY_BIN_DIR/..
+
+./${REGISTRY_SCRIPT} stop
+
+mvn clean install -Pcontrib-check
+
+./${REGISTRY_SCRIPT} start
+
+tail -n 500 -f ${REGISTRY_DIR}/logs/nifi-registry-app.log
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-data-model/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-registry-data-model/pom.xml b/nifi-registry-data-model/pom.xml
index dbdda37..438fe01 100644
--- a/nifi-registry-data-model/pom.xml
+++ b/nifi-registry-data-model/pom.xml
@@ -25,7 +25,10 @@
io.swagger
swagger-annotations
- 1.5.16
+
+
+ javax.validation
+ validation-api
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/Bucket.java
----------------------------------------------------------------------
diff --git a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/Bucket.java b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/Bucket.java
index 2797213..9928cd8 100644
--- a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/Bucket.java
+++ b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/Bucket.java
@@ -20,18 +20,32 @@ import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import org.apache.nifi.registry.flow.VersionedFlow;
+import javax.validation.Valid;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotBlank;
+import javax.xml.bind.annotation.XmlRootElement;
import java.util.Objects;
import java.util.Set;
+@XmlRootElement
@ApiModel(value = "bucket")
public class Bucket {
+ @NotBlank
private String identifier;
+
+ @NotBlank
private String name;
+
+ @Min(1)
private long createdTimestamp;
+
private String description;
+
+ @Valid
private Set versionedFlows;
+
@ApiModelProperty("The id of the bucket. This is set by the server at creation time.")
public String getIdentifier() {
return identifier;
@@ -68,7 +82,7 @@ public class Bucket {
this.description = description;
}
- @ApiModelProperty("The versioned flows in the bucket.")
+ @ApiModelProperty(value = "The versioned flows in the bucket.", readOnly = true)
public Set getVersionedFlows() {
return versionedFlows;
}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/BucketItem.java
----------------------------------------------------------------------
diff --git a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/BucketItem.java b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/BucketItem.java
index eec4fee..d57f07b 100644
--- a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/BucketItem.java
+++ b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/BucketItem.java
@@ -19,17 +19,36 @@ package org.apache.nifi.registry.bucket;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
import java.util.Objects;
@ApiModel("bucketItem")
-public class BucketItem {
+public abstract class BucketItem {
+ @NotBlank
private String identifier;
+
+ @NotBlank
private String name;
+
+ @NotBlank
private String bucketIdentifier;
+
+ @Min(1)
private long createdTimestamp;
+
+ @Min(1)
private long modifiedTimestamp;
- private BucketItemType type;
+
+ @NotNull
+ private final BucketItemType type;
+
+ public BucketItem(final BucketItemType type) {
+ this.type = type;
+ }
+
@ApiModelProperty("An ID to uniquely identify this object.")
public String getIdentifier() {
@@ -81,10 +100,6 @@ public class BucketItem {
return type;
}
- public void setType(BucketItemType type) {
- this.type = type;
- }
-
@Override
public int hashCode() {
return Objects.hashCode(this.identifier);
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedComponent.java
----------------------------------------------------------------------
diff --git a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedComponent.java b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedComponent.java
index bef9557..af9b3d7 100644
--- a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedComponent.java
+++ b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedComponent.java
@@ -64,4 +64,8 @@ public abstract class VersionedComponent {
}
public abstract ComponentType getComponentType();
+
+ public void setComponentType(ComponentType type) {
+ // purposely do nothing here, this just to allow unmarshalling
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlow.java
----------------------------------------------------------------------
diff --git a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlow.java b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlow.java
index d1b1108..8bbb040 100644
--- a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlow.java
+++ b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlow.java
@@ -19,7 +19,10 @@ package org.apache.nifi.registry.flow;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import org.apache.nifi.registry.bucket.BucketItem;
+import org.apache.nifi.registry.bucket.BucketItemType;
+import javax.validation.Valid;
+import javax.xml.bind.annotation.XmlRootElement;
import java.util.SortedSet;
/**
@@ -31,12 +34,19 @@ import java.util.SortedSet;
*
* @see VersionedFlowSnapshot
*/
+@XmlRootElement
@ApiModel(value = "versionedFlow")
public class VersionedFlow extends BucketItem {
private String description;
+
+ @Valid
private SortedSet snapshotMetadata;
+ public VersionedFlow() {
+ super(BucketItemType.FLOW);
+ }
+
@ApiModelProperty("A description of the flow.")
public String getDescription() {
return description;
@@ -46,7 +56,7 @@ public class VersionedFlow extends BucketItem {
this.description = description;
}
- @ApiModelProperty("The metadata for each snapshot of this flow.")
+ @ApiModelProperty(value = "The metadata for each snapshot of this flow.", readOnly = true)
public SortedSet getSnapshotMetadata() {
return snapshotMetadata;
}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java
----------------------------------------------------------------------
diff --git a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java
index c3b368c..9f5d973 100644
--- a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java
+++ b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java
@@ -20,6 +20,9 @@ package org.apache.nifi.registry.flow;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
+import javax.validation.Valid;
+import javax.validation.constraints.NotNull;
+import javax.xml.bind.annotation.XmlRootElement;
import java.util.Objects;
/**
@@ -30,12 +33,19 @@ import java.util.Objects;
* version of the flow, the timestamp when it was saved, the contents of the flow, etc.
*
*/
+@XmlRootElement
@ApiModel(value = "versionedFlowSnapshot")
public class VersionedFlowSnapshot {
+ @Valid
+ @NotNull
private VersionedFlowSnapshotMetadata snapshotMetadata;
+
+ @Valid
+ @NotNull
private VersionedProcessGroup flowContents;
+
@ApiModelProperty("The metadata for this snapshot")
public VersionedFlowSnapshotMetadata getSnapshotMetadata() {
return snapshotMetadata;
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshotMetadata.java
----------------------------------------------------------------------
diff --git a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshotMetadata.java b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshotMetadata.java
index 4cd3de9..e60dcfd 100644
--- a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshotMetadata.java
+++ b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshotMetadata.java
@@ -19,6 +19,8 @@ package org.apache.nifi.registry.flow;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotBlank;
import java.util.Objects;
/**
@@ -28,13 +30,24 @@ import java.util.Objects;
@ApiModel(value = "versionedFlowSnapshot")
public class VersionedFlowSnapshotMetadata implements Comparable {
+ @NotBlank
private String bucketIdentifier;
+
+ @NotBlank
private String flowIdentifier;
+
+ @NotBlank
private String flowName;
+
+ @Min(1)
private int version;
+
+ @Min(1)
private long timestamp;
+
private String comments;
+
@ApiModelProperty("The identifier of the bucket this snapshot belongs to.")
public String getBucketIdentifier() {
return bucketIdentifier;
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/pom.xml b/nifi-registry-framework/pom.xml
index 4f36538..00bd2d6 100644
--- a/nifi-registry-framework/pom.xml
+++ b/nifi-registry-framework/pom.xml
@@ -62,6 +62,11 @@
org.apache.nifi.registry
+ nifi-registry-data-model
+ 0.0.1-SNAPSHOT
+
+
+ org.apache.nifi.registry
nifi-registry-provider-api
0.0.1-SNAPSHOT
@@ -71,10 +76,24 @@
0.0.1-SNAPSHOT
+ org.hibernate
+ hibernate-validator
+
+
+ org.glassfish
+ javax.el
+
+
+
junit
junit
4.12
test
+
+ org.mockito
+ mockito-core
+ test
+
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/ResourceNotFoundException.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/ResourceNotFoundException.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/ResourceNotFoundException.java
new file mode 100644
index 0000000..a83e9e2
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/ResourceNotFoundException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.exception;
+
+/**
+ * An exception that is thrown when an entity is not found.
+ */
+public class ResourceNotFoundException extends RuntimeException {
+
+ public ResourceNotFoundException(String message) {
+ super(message);
+ }
+
+ public ResourceNotFoundException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/flow/StandardFlowSnapshotContext.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/flow/StandardFlowSnapshotContext.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/flow/StandardFlowSnapshotContext.java
index 3b29e97..3527355 100644
--- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/flow/StandardFlowSnapshotContext.java
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/flow/StandardFlowSnapshotContext.java
@@ -17,6 +17,7 @@
package org.apache.nifi.registry.flow;
import org.apache.commons.lang3.Validate;
+import org.apache.nifi.registry.bucket.Bucket;
/**
* Standard implementation of FlowSnapshotContext.
@@ -96,6 +97,20 @@ public class StandardFlowSnapshotContext implements FlowSnapshotContext {
private String comments;
private long snapshotTimestamp;
+ public Builder() {
+
+ }
+
+ public Builder(final Bucket bucket, final VersionedFlowSnapshotMetadata snapshotMetadata) {
+ bucketId(bucket.getIdentifier());
+ bucketName(bucket.getName());
+ flowId(snapshotMetadata.getFlowIdentifier());
+ flowName(snapshotMetadata.getFlowName());
+ version(snapshotMetadata.getVersion());
+ comments(snapshotMetadata.getComments());
+ snapshotTimestamp(snapshotMetadata.getTimestamp());
+ }
+
public Builder bucketId(final String bucketId) {
this.bucketId = bucketId;
return this;
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
index 1a83d68..03b5a86 100644
--- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
@@ -58,7 +58,7 @@ public class StandardProviderFactory implements ProviderFactory {
try {
return JAXBContext.newInstance(JAXB_GENERATED_PATH, StandardProviderFactory.class.getClassLoader());
} catch (JAXBException e) {
- throw new RuntimeException("Unable to create JAXBContext.");
+ throw new RuntimeException("Unable to create JAXBContext.", e);
}
}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/FlowSnapshotSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/FlowSnapshotSerializer.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/FlowSnapshotSerializer.java
new file mode 100644
index 0000000..ae2482b
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/FlowSnapshotSerializer.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.serialization;
+
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.serialization.jaxb.JAXBFlowSnapshotSerializer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A serializer for VersionedFlowSnapshots that maps a "version" of the data model to a serializer. The version
+ * will be written to a header at the beginning of the OutputStream, and then the object and the OutputStream will
+ * be passed on to the real serializer for the given version. Similarly, when deserializing, the header will first be
+ * read from the InputStream to determine the version, and then the InputStream will be passed to the deserializer
+ * for the given version.
+ */
+public class FlowSnapshotSerializer implements Serializer {
+
+ static final String MAGIC_HEADER = "Flows";
+ static final byte[] MAGIC_HEADER_BYTES = MAGIC_HEADER.getBytes(StandardCharsets.UTF_8);
+
+ static final Integer CURRENT_VERSION = 1;
+
+ private final Map> serializersByVersion;
+
+ public FlowSnapshotSerializer() {
+ final Map> tempSerializers = new HashMap<>();
+ tempSerializers.put(CURRENT_VERSION, new JAXBFlowSnapshotSerializer());
+ this.serializersByVersion = Collections.unmodifiableMap(tempSerializers);
+ }
+
+ @Override
+ public void serialize(final VersionedFlowSnapshot versionedFlowSnapshot, final OutputStream out) throws SerializationException {
+ final ByteBuffer byteBuffer = ByteBuffer.allocate(9);
+ byteBuffer.put(MAGIC_HEADER_BYTES);
+ byteBuffer.putInt(CURRENT_VERSION);
+
+ try {
+ out.write(byteBuffer.array());
+ } catch (final IOException e) {
+ throw new SerializationException("Unable to write header while serializing snapshot", e);
+ }
+
+ final Serializer serializer = serializersByVersion.get(CURRENT_VERSION);
+ if (serializer == null) {
+ throw new SerializationException("No flow snapshot serializer for version " + CURRENT_VERSION);
+ }
+
+ serializer.serialize(versionedFlowSnapshot, out);
+ }
+
+ @Override
+ public VersionedFlowSnapshot deserialize(final InputStream input) throws SerializationException {
+ final int headerLength = 9;
+ final byte[] buffer = new byte[headerLength];
+
+ int bytesRead = -1;
+ try {
+ bytesRead = input.read(buffer, 0, headerLength);
+ } catch (final IOException e) {
+ throw new SerializationException("Unable to read header while deserializing snapshot", e);
+ }
+
+ if (bytesRead < headerLength) {
+ throw new SerializationException("Unable to read header while deserializing snapshot, expected"
+ + headerLength + " bytes, but found " + bytesRead);
+ }
+
+ final ByteBuffer bb = ByteBuffer.wrap(buffer);
+ final int version = bb.getInt(MAGIC_HEADER_BYTES.length);
+
+ final Serializer serializer = serializersByVersion.get(Integer.valueOf(version));
+ if (serializer == null) {
+ throw new SerializationException("No flow snapshot serializer for version " + version);
+ }
+
+ return serializer.deserialize(input);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/SerializationException.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/SerializationException.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/SerializationException.java
new file mode 100644
index 0000000..dd05e77
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/SerializationException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.serialization;
+
+/**
+ * An error that can occur during serialization or deserialization.
+ */
+public class SerializationException extends RuntimeException {
+
+ public SerializationException(String message) {
+ super(message);
+ }
+
+ public SerializationException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public SerializationException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/Serializer.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/Serializer.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/Serializer.java
new file mode 100644
index 0000000..ca424ad
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/Serializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.serialization;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Serializes and de-serializes objects.
+ */
+public interface Serializer {
+
+ /**
+ * Serializes a snapshot to the given output stream.
+ *
+ * @param t the object to serialize
+ * @param out the output stream to serialize to
+ */
+ void serialize(T t, OutputStream out) throws SerializationException;
+
+ /**
+ * Deserializes the given InputStream back to an object of the given type.
+ *
+ * @param input the InputStream to deserialize
+ * @return the deserialized object
+ */
+ T deserialize(InputStream input) throws SerializationException;
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBFlowSnapshotSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBFlowSnapshotSerializer.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBFlowSnapshotSerializer.java
new file mode 100644
index 0000000..83984b1
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBFlowSnapshotSerializer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.serialization.jaxb;
+
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+
+/**
+ * A JAXB serializer for VersionedFlowSnapshots.
+ */
+public class JAXBFlowSnapshotSerializer extends JAXBSerializer {
+
+ public JAXBFlowSnapshotSerializer() {
+ super(VersionedFlowSnapshot.class);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java
new file mode 100644
index 0000000..eff655d
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.serialization.jaxb;
+
+import org.apache.nifi.registry.serialization.SerializationException;
+import org.apache.nifi.registry.serialization.Serializer;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * A Serializer that uses JAXB for serializing/deserializing.
+ */
+public class JAXBSerializer implements Serializer {
+
+ private final JAXBContext jaxbContext;
+
+ /**
+ * Load the JAXBContext.
+ */
+ public JAXBSerializer(final Class clazz) {
+ try {
+ this.jaxbContext = JAXBContext.newInstance(clazz);
+ } catch (JAXBException e) {
+ throw new RuntimeException("Unable to create JAXBContext.", e);
+ }
+ }
+
+ @Override
+ public void serialize(final T t, final OutputStream out) throws SerializationException {
+ if (t == null) {
+ throw new IllegalArgumentException("The object to serialize cannot be null");
+ }
+
+ if (out == null) {
+ throw new IllegalArgumentException("OutputStream cannot be null");
+ }
+
+ try {
+ final Marshaller marshaller = jaxbContext.createMarshaller();
+ marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
+ marshaller.marshal(t, out);
+ } catch (JAXBException e) {
+ throw new SerializationException("Unable to serialize object", e);
+ }
+ }
+
+ @Override
+ public T deserialize(final InputStream input) throws SerializationException {
+ if (input == null) {
+ throw new IllegalArgumentException("InputStream cannot be null");
+ }
+
+ try {
+ final Unmarshaller unmarshaller = jaxbContext.createUnmarshaller();
+ return (T) unmarshaller.unmarshal(input);
+ } catch (JAXBException e) {
+ throw new SerializationException("Unable to deserialize object", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java
new file mode 100644
index 0000000..c80dc21
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.service;
+
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.metadata.BucketMetadata;
+import org.apache.nifi.registry.metadata.FlowMetadata;
+import org.apache.nifi.registry.metadata.FlowSnapshotMetadata;
+import org.apache.nifi.registry.metadata.StandardBucketMetadata;
+import org.apache.nifi.registry.metadata.StandardFlowMetadata;
+import org.apache.nifi.registry.metadata.StandardFlowSnapshotMetadata;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Utility for mapping between Provider API and the registry data model.
+ */
+public class DataModelMapper {
+
+ public static Bucket map(final BucketMetadata bucketMetadata) {
+ final Bucket bucket = new Bucket();
+ bucket.setIdentifier(bucketMetadata.getIdentifier());
+ bucket.setName(bucketMetadata.getName());
+ bucket.setDescription(bucketMetadata.getDescription());
+ bucket.setCreatedTimestamp(bucketMetadata.getCreatedTimestamp());
+
+ if (bucketMetadata.getFlowMetadata() != null) {
+ final Set flows = new LinkedHashSet<>();
+ bucketMetadata.getFlowMetadata().stream().forEach(f -> flows.add(map(f)));
+ bucket.setVersionedFlows(flows);
+ }
+
+ return bucket;
+ }
+
+ public static BucketMetadata map(final Bucket bucket) {
+ final StandardBucketMetadata.Builder builder = new StandardBucketMetadata.Builder()
+ .identifier(bucket.getIdentifier())
+ .name(bucket.getName())
+ .description(bucket.getDescription())
+ .created(bucket.getCreatedTimestamp());
+
+ if (bucket.getVersionedFlows() != null) {
+ bucket.getVersionedFlows().stream().forEach(f -> builder.addFlow(map(f)));
+ }
+
+ return builder.build();
+ }
+
+ public static VersionedFlow map(final FlowMetadata flowMetadata) {
+ final VersionedFlow versionedFlow = new VersionedFlow();
+ versionedFlow.setIdentifier(flowMetadata.getIdentifier());
+ versionedFlow.setName(flowMetadata.getName());
+ versionedFlow.setBucketIdentifier(flowMetadata.getBucketIdentifier());
+ versionedFlow.setDescription(flowMetadata.getDescription());
+ versionedFlow.setCreatedTimestamp(flowMetadata.getCreatedTimestamp());
+ versionedFlow.setModifiedTimestamp(flowMetadata.getModifiedTimestamp());
+
+ if (flowMetadata.getSnapshotMetadata() != null) {
+ final SortedSet snapshots = new TreeSet<>();
+ flowMetadata.getSnapshotMetadata().stream().forEach(s -> snapshots.add(map(s)));
+ versionedFlow.setSnapshotMetadata(snapshots);
+ }
+
+ return versionedFlow;
+ }
+
+ public static FlowMetadata map(final VersionedFlow versionedFlow) {
+ final StandardFlowMetadata.Builder builder = new StandardFlowMetadata.Builder()
+ .identifier(versionedFlow.getIdentifier())
+ .name(versionedFlow.getName())
+ .bucketIdentifier(versionedFlow.getBucketIdentifier())
+ .description(versionedFlow.getDescription())
+ .created(versionedFlow.getCreatedTimestamp())
+ .modified(versionedFlow.getModifiedTimestamp());
+
+ if (versionedFlow.getSnapshotMetadata() != null) {
+ versionedFlow.getSnapshotMetadata().stream().forEach(s -> builder.addSnapshot(map(s)));
+ }
+
+ return builder.build();
+ }
+
+ public static VersionedFlowSnapshotMetadata map(final FlowSnapshotMetadata flowSnapshotMetadata) {
+ final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
+ metadata.setBucketIdentifier(flowSnapshotMetadata.getBucketIdentifier());
+ metadata.setFlowIdentifier(flowSnapshotMetadata.getFlowIdentifier());
+ metadata.setFlowName(flowSnapshotMetadata.getFlowName());
+ metadata.setComments(flowSnapshotMetadata.getComments());
+ metadata.setTimestamp(flowSnapshotMetadata.getCreatedTimestamp());
+ metadata.setVersion(flowSnapshotMetadata.getVersion());
+ return metadata;
+ }
+
+ public static FlowSnapshotMetadata map(final VersionedFlowSnapshotMetadata metadata) {
+ return new StandardFlowSnapshotMetadata.Builder()
+ .bucketIdentifier(metadata.getBucketIdentifier())
+ .flowIdentifier(metadata.getFlowIdentifier())
+ .flowName(metadata.getFlowName())
+ .comments(metadata.getComments())
+ .created(metadata.getTimestamp())
+ .version(metadata.getVersion())
+ .build();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
new file mode 100644
index 0000000..6f53957
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.service;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.exception.ResourceNotFoundException;
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.flow.FlowSnapshotContext;
+import org.apache.nifi.registry.flow.StandardFlowSnapshotContext;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.metadata.BucketMetadata;
+import org.apache.nifi.registry.metadata.FlowMetadata;
+import org.apache.nifi.registry.metadata.FlowSnapshotMetadata;
+import org.apache.nifi.registry.metadata.MetadataProvider;
+import org.apache.nifi.registry.metadata.StandardBucketMetadata;
+import org.apache.nifi.registry.metadata.StandardFlowMetadata;
+import org.apache.nifi.registry.serialization.Serializer;
+
+import javax.validation.ConstraintViolation;
+import javax.validation.ConstraintViolationException;
+import javax.validation.Validator;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+public class RegistryService {
+
+ private final MetadataProvider metadataProvider;
+ private final FlowPersistenceProvider flowPersistenceProvider;
+ private final Serializer snapshotSerializer;
+ private final Validator validator;
+
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final Lock readLock = lock.readLock();
+ private final Lock writeLock = lock.writeLock();
+
+ public RegistryService(final MetadataProvider metadataProvider,
+ final FlowPersistenceProvider flowPersistenceProvider,
+ final Serializer snapshotSerializer,
+ final Validator validator) {
+ this.metadataProvider = metadataProvider;
+ this.flowPersistenceProvider = flowPersistenceProvider;
+ this.snapshotSerializer = snapshotSerializer;
+ this.validator = validator;
+ }
+
+ private void validate(T t, String invalidMessage) {
+ final Set> violations = validator.validate(t);
+ if (violations.size() > 0) {
+ throw new ConstraintViolationException(invalidMessage, violations);
+ }
+ }
+
+ // ---------------------- Bucket methods ---------------------------------------------
+
+ public Bucket createBucket(final Bucket bucket) {
+ if (bucket == null) {
+ throw new IllegalArgumentException("Bucket cannot be null");
+ }
+
+ // set an id, the created time, and clear out the flows since its read-only
+ bucket.setIdentifier(UUID.randomUUID().toString());
+ bucket.setCreatedTimestamp(System.currentTimeMillis());
+ bucket.setVersionedFlows(null);
+
+ validate(bucket, "Bucket is not valid");
+
+ writeLock.lock();
+ try {
+ final BucketMetadata existingBucketWithSameName = metadataProvider.getBucketByName(bucket.getName());
+ if (existingBucketWithSameName != null) {
+ throw new IllegalStateException("A bucket with the same name already exists: " + existingBucketWithSameName.getIdentifier());
+ }
+
+ final BucketMetadata createdBucket = metadataProvider.createBucket(DataModelMapper.map(bucket));
+ return DataModelMapper.map(createdBucket);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public Bucket getBucket(final String bucketIdentifier) {
+ if (bucketIdentifier == null) {
+ throw new IllegalArgumentException("Bucket Identifier cannot be null");
+ }
+
+ readLock.lock();
+ try {
+ final BucketMetadata bucket = metadataProvider.getBucketById(bucketIdentifier);
+ if (bucket == null) {
+ throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucketIdentifier);
+ }
+
+ return DataModelMapper.map(bucket);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public Set getBuckets() {
+ readLock.lock();
+ try {
+ final Set buckets = metadataProvider.getBuckets();
+ return buckets.stream().map(b -> DataModelMapper.map(b)).collect(Collectors.toSet());
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public Bucket updateBucket(final Bucket bucket) {
+ if (bucket == null) {
+ throw new IllegalArgumentException("Bucket cannot be null");
+ }
+
+ if (bucket.getIdentifier() == null) {
+ throw new IllegalArgumentException("Bucket Identifier cannot be null");
+ }
+
+ writeLock.lock();
+ try {
+ // ensure a bucket with the given id exists
+ final BucketMetadata existingBucketById = metadataProvider.getBucketById(bucket.getIdentifier());
+ if (existingBucketById == null) {
+ throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucket.getIdentifier());
+ }
+
+ // ensure a different bucket with the same name does not exist
+ // since we're allowing partial updates here, only check this if a non-null name is provided
+ if (StringUtils.isNotBlank(bucket.getName())) {
+ final BucketMetadata existingBucketWithSameName = metadataProvider.getBucketByName(bucket.getName());
+ if (existingBucketWithSameName != null && !existingBucketWithSameName.getIdentifier().equals(existingBucketById.getIdentifier())) {
+ throw new IllegalStateException("A bucket with the same name already exists: " + bucket.getName());
+ }
+ }
+
+ final StandardBucketMetadata.Builder builder = new StandardBucketMetadata.Builder(existingBucketById);
+
+ // transfer over the new values to the existing bucket
+ if (StringUtils.isNotBlank(bucket.getName())) {
+ builder.name(bucket.getName());
+ }
+
+ if (bucket.getDescription() != null) {
+ builder.description(bucket.getDescription());
+ }
+
+ // perform the actual update
+ final BucketMetadata updatedBucket = metadataProvider.updateBucket(builder.build());
+ return DataModelMapper.map(updatedBucket);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public Bucket deleteBucket(final String bucketIdentifier) {
+ if (bucketIdentifier == null) {
+ throw new IllegalArgumentException("Bucket Identifier cannot be null");
+ }
+
+ writeLock.lock();
+ try {
+ // ensure the bucket exists
+ final BucketMetadata existingBucket = metadataProvider.getBucketById(bucketIdentifier);
+ if (existingBucket == null) {
+ throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucketIdentifier);
+ }
+
+ // retrieve the versioned flows that are in this bucket
+ final Set bucketFlows = metadataProvider.getFlows(bucketIdentifier);
+
+ // for each flow in the bucket, delete all snapshots from the flow persistence provider
+ for (final FlowMetadata bucketFlow : bucketFlows) {
+ flowPersistenceProvider.deleteSnapshots(bucketIdentifier, bucketFlow.getIdentifier());
+ }
+
+ // now delete the bucket from the metadata provider, which deletes all flows referencing it
+ metadataProvider.deleteBucket(bucketIdentifier);
+
+ return DataModelMapper.map(existingBucket);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ // ---------------------- VersionedFlow methods ---------------------------------------------
+
+ public VersionedFlow createFlow(final String bucketIdentifier, final VersionedFlow versionedFlow) {
+ if (StringUtils.isBlank(bucketIdentifier)) {
+ throw new IllegalArgumentException("Bucket Identifier cannot be null or blank");
+ }
+
+ if (versionedFlow == null) {
+ throw new IllegalArgumentException("VersionedFlow cannot be null");
+ }
+
+ if (versionedFlow.getBucketIdentifier() != null && !bucketIdentifier.equals(versionedFlow.getBucketIdentifier())) {
+ throw new IllegalArgumentException("Bucket identifiers must match");
+ }
+
+ if (versionedFlow.getBucketIdentifier() == null) {
+ versionedFlow.setBucketIdentifier(bucketIdentifier);
+ }
+
+ versionedFlow.setIdentifier(UUID.randomUUID().toString());
+
+ final long timestamp = System.currentTimeMillis();
+ versionedFlow.setCreatedTimestamp(timestamp);
+ versionedFlow.setModifiedTimestamp(timestamp);
+
+ // clear out the snapshots since they are read-only
+ versionedFlow.setSnapshotMetadata(null);
+
+ validate(versionedFlow, "VersionedFlow is not valid");
+
+ writeLock.lock();
+ try {
+ // ensure the bucket exists
+ final BucketMetadata existingBucket = metadataProvider.getBucketById(bucketIdentifier);
+ if (existingBucket == null) {
+ throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucketIdentifier);
+ }
+
+ final FlowMetadata existingFlowWithSameName = metadataProvider.getFlowByName(versionedFlow.getName());
+ if (existingFlowWithSameName != null) {
+ throw new IllegalStateException("A VersionedFlow with the same name already exists: " + existingFlowWithSameName.getIdentifier());
+ }
+
+ // create the flow
+ final FlowMetadata createdFlow = metadataProvider.createFlow(bucketIdentifier, DataModelMapper.map(versionedFlow));
+ return DataModelMapper.map(createdFlow);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public VersionedFlow getFlow(final String flowIdentifier) {
+ if (StringUtils.isBlank(flowIdentifier)) {
+ throw new IllegalArgumentException("Flow Identifier cannot be null or blank");
+ }
+
+ readLock.lock();
+ try {
+ final FlowMetadata flowMetadata = metadataProvider.getFlowById(flowIdentifier);
+ if (flowMetadata == null) {
+ throw new ResourceNotFoundException("VersionedFlow does not exist for identifier: " + flowIdentifier);
+ }
+
+ return DataModelMapper.map(flowMetadata);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public Set getFlows() {
+ readLock.lock();
+ try {
+ final Set flows = metadataProvider.getFlows();
+ return flows.stream().map(f -> DataModelMapper.map(f)).collect(Collectors.toSet());
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public Set getFlows(String bucketId) {
+ if (StringUtils.isBlank(bucketId)) {
+ throw new IllegalArgumentException("Bucket Identifier cannot be null");
+ }
+
+ readLock.lock();
+ try {
+ final BucketMetadata existingBucket = metadataProvider.getBucketById(bucketId);
+ if (existingBucket == null) {
+ throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucketId);
+ }
+
+ final Set flows = metadataProvider.getFlows(bucketId);
+ return flows.stream().map(f -> DataModelMapper.map(f)).collect(Collectors.toSet());
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public VersionedFlow updateFlow(final VersionedFlow versionedFlow) {
+ if (versionedFlow == null) {
+ throw new IllegalArgumentException("VersionedFlow cannot be null");
+ }
+
+ if (StringUtils.isBlank(versionedFlow.getIdentifier())) {
+ throw new IllegalArgumentException("VersionedFlow identifier cannot be null or blank");
+ }
+
+ writeLock.lock();
+ try {
+ // ensure a flow with the given id exists
+ final FlowMetadata existingFlow = metadataProvider.getFlowById(versionedFlow.getIdentifier());
+ if (existingFlow == null) {
+ throw new ResourceNotFoundException("VersionedFlow does not exist for identifier: " + versionedFlow.getIdentifier());
+ }
+
+ // ensure a different flow with the same name does not exist
+ // since we're allowing partial updates here, only check this if a non-null name is provided
+ if (StringUtils.isNotBlank(versionedFlow.getName())) {
+ final FlowMetadata existingFlowWithSameName = metadataProvider.getFlowByName(versionedFlow.getName());
+ if (existingFlowWithSameName != null && !existingFlowWithSameName.getIdentifier().equals(existingFlow.getIdentifier())) {
+ throw new IllegalStateException("A VersionedFlow with the same name already exists: " + versionedFlow.getName());
+ }
+ }
+
+ final StandardFlowMetadata.Builder builder = new StandardFlowMetadata.Builder(existingFlow);
+
+ // transfer over the new values to the existing flow
+ if (StringUtils.isNotBlank(versionedFlow.getName())) {
+ builder.name(versionedFlow.getName());
+ }
+
+ if (versionedFlow.getDescription() != null) {
+ builder.description(versionedFlow.getDescription());
+ }
+
+ builder.modified(System.currentTimeMillis());
+
+ // perform the actual update
+ final FlowMetadata updatedFlow = metadataProvider.updateFlow(builder.build());
+ return DataModelMapper.map(updatedFlow);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public VersionedFlow deleteFlow(final String flowIdentifier) {
+ if (StringUtils.isBlank(flowIdentifier)) {
+ throw new IllegalArgumentException("Flow Identifier cannot be null or blank");
+ }
+
+ writeLock.lock();
+ try {
+ // ensure the flow exists
+ final FlowMetadata existingFlow = metadataProvider.getFlowById(flowIdentifier);
+ if (existingFlow == null) {
+ throw new ResourceNotFoundException("VersionedFlow does not exist for identifier: " + flowIdentifier);
+ }
+
+ // delete all snapshots from the flow persistence provider
+ flowPersistenceProvider.deleteSnapshots(existingFlow.getBucketIdentifier(), existingFlow.getIdentifier());
+
+ // now delete the flow from the metadata provider
+ metadataProvider.deleteFlow(flowIdentifier);
+
+ return DataModelMapper.map(existingFlow);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ // ---------------------- VersionedFlowSnapshot methods ---------------------------------------------
+
+ public VersionedFlowSnapshot createFlowSnapshot(final VersionedFlowSnapshot flowSnapshot) {
+ if (flowSnapshot == null) {
+ throw new IllegalArgumentException("VersionedFlowSnapshot cannot be null");
+ }
+
+ // validation will ensure that the metadata and contents are not null
+ if (flowSnapshot.getSnapshotMetadata() != null) {
+ flowSnapshot.getSnapshotMetadata().setTimestamp(System.currentTimeMillis());
+ }
+
+ validate(flowSnapshot, "VersionedFlowSnapshot is not valid");
+
+ writeLock.lock();
+ try {
+ final VersionedFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata();
+
+ // ensure the bucket exists
+ final BucketMetadata existingBucket = metadataProvider.getBucketById(snapshotMetadata.getBucketIdentifier());
+ if (existingBucket == null) {
+ throw new ResourceNotFoundException("Bucket does not exist for identifier: " + snapshotMetadata.getBucketIdentifier());
+ }
+
+ // ensure the flow exists
+ final FlowMetadata existingFlowMetadata = metadataProvider.getFlowById(snapshotMetadata.getFlowIdentifier());
+ if (existingFlowMetadata == null) {
+ throw new ResourceNotFoundException("VersionedFlow does not exist for identifier: " + snapshotMetadata.getFlowIdentifier());
+ }
+
+ final VersionedFlow existingFlow = DataModelMapper.map(existingFlowMetadata);
+
+ // if we already have snapshots we need to verify the new one has the correct version
+ if (existingFlow.getSnapshotMetadata() != null && existingFlow.getSnapshotMetadata().size() > 0) {
+ final VersionedFlowSnapshotMetadata lastSnapshot = existingFlow.getSnapshotMetadata().last();
+
+ if (snapshotMetadata.getVersion() <= lastSnapshot.getVersion()) {
+ throw new IllegalStateException("A VersionedFlowSnapshot with the same version already exists: " + snapshotMetadata.getVersion());
+ }
+
+ if (snapshotMetadata.getVersion() > (lastSnapshot.getVersion() + 1)) {
+ throw new IllegalStateException("Version must be a one-up number, last version was "
+ + lastSnapshot.getVersion() + " and version for this snapshot was "
+ + snapshotMetadata.getVersion());
+ }
+ } else if (snapshotMetadata.getVersion() != 1) {
+ throw new IllegalStateException("Version of first snapshot must be 1");
+ }
+
+ // serialize the snapshot
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ snapshotSerializer.serialize(flowSnapshot, out);
+
+ // save the serialized snapshot to the persistence provider
+ final Bucket bucket = DataModelMapper.map(existingBucket);
+ final FlowSnapshotContext context = new StandardFlowSnapshotContext.Builder(bucket, snapshotMetadata).build();
+ flowPersistenceProvider.saveSnapshot(context, out.toByteArray());
+
+ // create snapshot in the metadata provider
+ metadataProvider.createFlowSnapshot(DataModelMapper.map(snapshotMetadata));
+ return flowSnapshot;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public VersionedFlowSnapshot getFlowSnapshot(final String flowIdentifier, final Integer version) {
+ if (StringUtils.isBlank(flowIdentifier)) {
+ throw new IllegalArgumentException("Flow Identifier cannot be null or blank");
+ }
+
+ if (version == null) {
+ throw new IllegalArgumentException("Version cannot be null or blank");
+ }
+
+ readLock.lock();
+ try {
+ // ensure the snapshot exists
+ final FlowSnapshotMetadata snapshotMetadata = metadataProvider.getFlowSnapshot(flowIdentifier, version);
+ if (snapshotMetadata == null) {
+ throw new ResourceNotFoundException("VersionedFlowSnapshot does not exist for flow " + flowIdentifier + " and version " + version);
+ }
+
+ // get the serialized bytes of the snapshot
+ final byte[] serializedSnapshot = flowPersistenceProvider.getSnapshot(
+ snapshotMetadata.getBucketIdentifier(),
+ snapshotMetadata.getFlowIdentifier(),
+ snapshotMetadata.getVersion()
+ );
+
+ if (serializedSnapshot == null || serializedSnapshot.length == 0) {
+ throw new IllegalStateException("No serialized content found for snapshot with flow identifier "
+ + flowIdentifier + " and version " + version);
+ }
+
+ final InputStream input = new ByteArrayInputStream(serializedSnapshot);
+ return snapshotSerializer.deserialize(input);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public VersionedFlowSnapshotMetadata deleteFlowSnapshot(final String flowIdentifier, final Integer version) {
+ if (StringUtils.isBlank(flowIdentifier)) {
+ throw new IllegalArgumentException("Flow Identifier cannot be null or blank");
+ }
+
+ if (version == null) {
+ throw new IllegalArgumentException("Version cannot be null or blank");
+ }
+
+ writeLock.lock();
+ try {
+ // ensure the snapshot exists
+ final FlowSnapshotMetadata snapshotMetadata = metadataProvider.getFlowSnapshot(flowIdentifier, version);
+ if (snapshotMetadata == null) {
+ throw new ResourceNotFoundException("VersionedFlowSnapshot does not exist for flow "
+ + flowIdentifier + " and version " + version);
+ }
+
+ // delete the content of the snapshot
+ flowPersistenceProvider.deleteSnapshot(
+ snapshotMetadata.getBucketIdentifier(),
+ snapshotMetadata.getFlowIdentifier(),
+ snapshotMetadata.getVersion());
+
+ // delete the snapshot itself
+ metadataProvider.deleteFlowSnapshot(flowIdentifier, version);
+ return DataModelMapper.map(snapshotMetadata);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java
index 781f348..176a9a5 100644
--- a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java
+++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java
@@ -43,7 +43,12 @@ public class MockMetadataProvider implements MetadataProvider {
}
@Override
- public BucketMetadata getBucket(String bucketIdentifier) {
+ public BucketMetadata getBucketById(String bucketIdentifier) {
+ return null;
+ }
+
+ @Override
+ public BucketMetadata getBucketByName(String name) {
return null;
}
@@ -68,7 +73,12 @@ public class MockMetadataProvider implements MetadataProvider {
}
@Override
- public FlowMetadata getFlow(String flowIdentifier) {
+ public FlowMetadata getFlowById(String flowIdentifier) {
+ return null;
+ }
+
+ @Override
+ public FlowMetadata getFlowByName(String name) {
return null;
}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowSnapshotSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowSnapshotSerializer.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowSnapshotSerializer.java
new file mode 100644
index 0000000..14c78b2
--- /dev/null
+++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowSnapshotSerializer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.serialization;
+
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+public class TestFlowSnapshotSerializer {
+
+ @Test
+ public void testSerializeDeserializeFlowSnapshot() throws SerializationException {
+ final Serializer serializer = new FlowSnapshotSerializer();
+
+ final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
+ snapshotMetadata.setFlowIdentifier("flow1");
+ snapshotMetadata.setFlowName("First Flow");
+ snapshotMetadata.setVersion(1);
+ snapshotMetadata.setComments("This is the first flow");
+ snapshotMetadata.setTimestamp(System.currentTimeMillis());
+
+ final VersionedProcessGroup processGroup = new VersionedProcessGroup();
+ processGroup.setIdentifier("pg1");
+ processGroup.setName("My Process Group");
+
+ final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
+ snapshot.setSnapshotMetadata(snapshotMetadata);
+ snapshot.setFlowContents(processGroup);
+
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ serializer.serialize(snapshot, out);
+
+ final ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+ final VersionedFlowSnapshot deserializedSnapshot = serializer.deserialize(in);
+ final VersionedFlowSnapshotMetadata deserializedMetadata = deserializedSnapshot.getSnapshotMetadata();
+ final VersionedProcessGroup deserializedProcessGroup = deserializedSnapshot.getFlowContents();
+
+ Assert.assertEquals(snapshotMetadata.getFlowIdentifier(), deserializedMetadata.getFlowIdentifier());
+ Assert.assertEquals(snapshotMetadata.getFlowName(), deserializedMetadata.getFlowName());
+ Assert.assertEquals(snapshotMetadata.getVersion(), deserializedMetadata.getVersion());
+ Assert.assertEquals(snapshotMetadata.getComments(), deserializedMetadata.getComments());
+ Assert.assertEquals(snapshotMetadata.getTimestamp(), deserializedMetadata.getTimestamp());
+
+ Assert.assertEquals(processGroup.getIdentifier(), deserializedProcessGroup.getIdentifier());
+ Assert.assertEquals(processGroup.getName(), deserializedProcessGroup.getName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBFlowSnapshotSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBFlowSnapshotSerializer.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBFlowSnapshotSerializer.java
new file mode 100644
index 0000000..90accf6
--- /dev/null
+++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBFlowSnapshotSerializer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.serialization.jaxb;
+
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.serialization.SerializationException;
+import org.apache.nifi.registry.serialization.Serializer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
+
+public class TestJAXBFlowSnapshotSerializer {
+
+ @Test
+ public void testSerializeDeserializeFlowSnapshot() throws SerializationException {
+ final Serializer serializer = new JAXBFlowSnapshotSerializer();
+
+ final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
+ snapshotMetadata.setFlowIdentifier("flow1");
+ snapshotMetadata.setFlowName("First Flow");
+ snapshotMetadata.setVersion(1);
+ snapshotMetadata.setComments("This is the first flow");
+ snapshotMetadata.setTimestamp(System.currentTimeMillis());
+
+ final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
+ snapshot.setSnapshotMetadata(snapshotMetadata);
+
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ serializer.serialize(snapshot, out);
+
+ final String snapshotStr = new String(out.toByteArray(), StandardCharsets.UTF_8);
+ //System.out.println(snapshotStr);
+
+ final ByteArrayInputStream in = new ByteArrayInputStream(snapshotStr.getBytes(StandardCharsets.UTF_8));
+ final VersionedFlowSnapshot deserializedSnapshot = serializer.deserialize(in);
+ final VersionedFlowSnapshotMetadata deserializedMetadata = deserializedSnapshot.getSnapshotMetadata();
+
+ Assert.assertEquals(snapshotMetadata.getFlowIdentifier(), deserializedMetadata.getFlowIdentifier());
+ Assert.assertEquals(snapshotMetadata.getFlowName(), deserializedMetadata.getFlowName());
+ Assert.assertEquals(snapshotMetadata.getVersion(), deserializedMetadata.getVersion());
+ Assert.assertEquals(snapshotMetadata.getComments(), deserializedMetadata.getComments());
+ Assert.assertEquals(snapshotMetadata.getTimestamp(), deserializedMetadata.getTimestamp());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestDataModelMapper.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestDataModelMapper.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestDataModelMapper.java
new file mode 100644
index 0000000..1a6f3c9
--- /dev/null
+++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestDataModelMapper.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.service;
+
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.metadata.BucketMetadata;
+import org.apache.nifi.registry.metadata.FlowMetadata;
+import org.apache.nifi.registry.metadata.FlowSnapshotMetadata;
+import org.apache.nifi.registry.metadata.StandardBucketMetadata;
+import org.apache.nifi.registry.metadata.StandardFlowMetadata;
+import org.apache.nifi.registry.metadata.StandardFlowSnapshotMetadata;
+import org.junit.Test;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class TestDataModelMapper {
+
+ @Test
+ public void testMapBucketToBucketMetadata() {
+ // create a bucket
+ final Bucket bucket = new Bucket();
+ bucket.setIdentifier("bucket1");
+ bucket.setName("Bucket 1");
+ bucket.setDescription("This is bucket 1.");
+ bucket.setCreatedTimestamp(System.currentTimeMillis());
+
+ // create a flow
+ final VersionedFlow versionedFlow = new VersionedFlow();
+ versionedFlow.setIdentifier("flow1");
+ versionedFlow.setName("Flow 1");
+ versionedFlow.setDescription("This is flow 1");
+ versionedFlow.setBucketIdentifier(bucket.getIdentifier());
+ versionedFlow.setCreatedTimestamp(System.currentTimeMillis());
+ versionedFlow.setModifiedTimestamp(System.currentTimeMillis());
+
+ // create a snapshot for the flow
+ final VersionedFlowSnapshotMetadata versionedFlowSnapshotMetadata = new VersionedFlowSnapshotMetadata();
+ versionedFlowSnapshotMetadata.setBucketIdentifier(bucket.getIdentifier());
+ versionedFlowSnapshotMetadata.setFlowIdentifier(versionedFlow.getIdentifier());
+ versionedFlowSnapshotMetadata.setFlowName(versionedFlow.getName());
+ versionedFlowSnapshotMetadata.setVersion(1);
+ versionedFlowSnapshotMetadata.setTimestamp(System.currentTimeMillis());
+ versionedFlowSnapshotMetadata.setComments("This is snapshot 1 of flow 1");
+
+ // add the snapshot to the flow
+ final SortedSet versionedFlowSnapshotMetadataSet = new TreeSet<>();
+ versionedFlowSnapshotMetadataSet.add(versionedFlowSnapshotMetadata);
+ versionedFlow.setSnapshotMetadata(versionedFlowSnapshotMetadataSet);
+
+ // add the flow to the bucket
+ final Set versionedFlows = new LinkedHashSet<>();
+ versionedFlows.add(versionedFlow);
+ bucket.setVersionedFlows(versionedFlows);
+
+ // test the mapping from bucket to bucket metadata
+
+ final BucketMetadata bucketMetadata = DataModelMapper.map(bucket);
+ assertEquals(bucket.getIdentifier(), bucketMetadata.getIdentifier());
+ assertEquals(bucket.getName(), bucketMetadata.getName());
+ assertEquals(bucket.getDescription(), bucketMetadata.getDescription());
+ assertEquals(bucket.getCreatedTimestamp(), bucketMetadata.getCreatedTimestamp());
+
+ assertNotNull(bucketMetadata.getFlowMetadata());
+ assertEquals(1, bucketMetadata.getFlowMetadata().size());
+
+ final FlowMetadata flowMetadata = bucketMetadata.getFlowMetadata().iterator().next();
+ assertNotNull(flowMetadata);
+ assertEquals(versionedFlow.getIdentifier(), flowMetadata.getIdentifier());
+ assertEquals(versionedFlow.getName(), flowMetadata.getName());
+ assertEquals(versionedFlow.getDescription(), flowMetadata.getDescription());
+ assertEquals(versionedFlow.getBucketIdentifier(), flowMetadata.getBucketIdentifier());
+ assertEquals(versionedFlow.getCreatedTimestamp(), flowMetadata.getCreatedTimestamp());
+ assertEquals(versionedFlow.getModifiedTimestamp(), flowMetadata.getModifiedTimestamp());
+
+ assertNotNull(flowMetadata.getSnapshotMetadata());
+ assertEquals(1, flowMetadata.getSnapshotMetadata().size());
+
+ final FlowSnapshotMetadata flowSnapshotMetadata = flowMetadata.getSnapshotMetadata().iterator().next();
+ assertNotNull(flowSnapshotMetadata);
+ assertEquals(versionedFlowSnapshotMetadata.getFlowIdentifier(), flowSnapshotMetadata.getFlowIdentifier());
+ assertEquals(versionedFlowSnapshotMetadata.getFlowName(), flowSnapshotMetadata.getFlowName());
+ assertEquals(versionedFlowSnapshotMetadata.getBucketIdentifier(), flowSnapshotMetadata.getBucketIdentifier());
+ assertEquals(versionedFlowSnapshotMetadata.getVersion(), flowSnapshotMetadata.getVersion());
+ assertEquals(versionedFlowSnapshotMetadata.getComments(), flowSnapshotMetadata.getComments());
+ assertEquals(versionedFlowSnapshotMetadata.getTimestamp(), flowSnapshotMetadata.getCreatedTimestamp());
+ }
+
+ @Test
+ public void testMapBucketMetadataToBucket() {
+ // create snapshot metadata
+ final FlowSnapshotMetadata snapshotMetadata = new StandardFlowSnapshotMetadata.Builder()
+ .flowIdentifier("flow1")
+ .flowName("Flow 1")
+ .bucketIdentifier("bucket1")
+ .version(1)
+ .comments("This is snapshot 1 of flow 1.")
+ .created(System.currentTimeMillis())
+ .build();
+
+ // create flow metadata
+ final FlowMetadata flowMetadata = new StandardFlowMetadata.Builder()
+ .identifier("flow1")
+ .name("Flow 1")
+ .bucketIdentifier("bucket1")
+ .description("This flow 1.")
+ .created(System.currentTimeMillis())
+ .modified(System.currentTimeMillis())
+ .addSnapshot(snapshotMetadata)
+ .build();
+
+ // create bucket metadata
+ final BucketMetadata bucketMetadata = new StandardBucketMetadata.Builder()
+ .identifier("bucket1")
+ .name("Bucket 1")
+ .description("This is bucket 1.")
+ .created(System.currentTimeMillis())
+ .addFlow(flowMetadata)
+ .build();
+
+ // test the mapping from bucket metadata to bucket
+
+ final Bucket bucket = DataModelMapper.map(bucketMetadata);
+ assertEquals(bucketMetadata.getIdentifier(), bucket.getIdentifier());
+ assertEquals(bucketMetadata.getName(), bucket.getName());
+ assertEquals(bucketMetadata.getDescription(), bucket.getDescription());
+ assertEquals(bucketMetadata.getCreatedTimestamp(), bucket.getCreatedTimestamp());
+
+ assertNotNull(bucket.getVersionedFlows());
+ assertEquals(1, bucket.getVersionedFlows().size());
+
+ final VersionedFlow versionedFlow = bucket.getVersionedFlows().iterator().next();
+ assertNotNull(versionedFlow);
+ assertEquals(flowMetadata.getIdentifier(), versionedFlow.getIdentifier());
+ assertEquals(flowMetadata.getName(), versionedFlow.getName());
+ assertEquals(flowMetadata.getBucketIdentifier(), versionedFlow.getBucketIdentifier());
+ assertEquals(flowMetadata.getDescription(), versionedFlow.getDescription());
+ assertEquals(flowMetadata.getCreatedTimestamp(), versionedFlow.getCreatedTimestamp());
+ assertEquals(flowMetadata.getModifiedTimestamp(), versionedFlow.getModifiedTimestamp());
+
+ assertNotNull(versionedFlow.getSnapshotMetadata());
+ assertEquals(1, versionedFlow.getSnapshotMetadata().size());
+
+ final VersionedFlowSnapshotMetadata versionedFlowSnapshotMetadata = versionedFlow.getSnapshotMetadata().first();
+ assertEquals(snapshotMetadata.getFlowIdentifier(), versionedFlowSnapshotMetadata.getFlowIdentifier());
+ assertEquals(snapshotMetadata.getFlowName(), versionedFlowSnapshotMetadata.getFlowName());
+ assertEquals(snapshotMetadata.getBucketIdentifier(), versionedFlowSnapshotMetadata.getBucketIdentifier());
+ assertEquals(snapshotMetadata.getVersion(), versionedFlowSnapshotMetadata.getVersion());
+ assertEquals(snapshotMetadata.getComments(), versionedFlowSnapshotMetadata.getComments());
+ assertEquals(snapshotMetadata.getCreatedTimestamp(), versionedFlowSnapshotMetadata.getTimestamp());
+ }
+
+}