Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 833DD200CE7 for ; Fri, 1 Sep 2017 17:07:29 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 81A1F16D384; Fri, 1 Sep 2017 15:07:29 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D781116D385 for ; Fri, 1 Sep 2017 17:07:27 +0200 (CEST) Received: (qmail 85516 invoked by uid 500); 1 Sep 2017 15:07:27 -0000 Mailing-List: contact commits-help@nifi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.apache.org Delivered-To: mailing list commits@nifi.apache.org Received: (qmail 85429 invoked by uid 99); 1 Sep 2017 15:07:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Sep 2017 15:07:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 07A7DF563C; Fri, 1 Sep 2017 15:07:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bbende@apache.org To: commits@nifi.apache.org Date: Fri, 01 Sep 2017 15:07:27 -0000 Message-Id: <6ab4a3cb7dcd474bb23d387df08d171a@git.apache.org> In-Reply-To: <6dacb5f0601044bcbd03809db35a32d0@git.apache.org> References: <6dacb5f0601044bcbd03809db35a32d0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/3] nifi-registry git commit: NIFIREG-10 Implementing Registry service layer and connecting to REST end-points - Adding FlowSnapshotSerializer and JAXBFlowSnapshotSerializer - Adding bean validation 2.0.0 using Hibernate validator as the implementation archived-at: Fri, 01 Sep 2017 15:07:29 -0000 NIFIREG-10 Implementing Registry service layer and connecting to REST end-points - Adding FlowSnapshotSerializer and JAXBFlowSnapshotSerializer - Adding bean validation 2.0.0 using Hibernate validator as the implementation - Adding a test that can be run manual to exercise some of the REST API This closes #7. Project: http://git-wip-us.apache.org/repos/asf/nifi-registry/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-registry/commit/a1629c86 Tree: http://git-wip-us.apache.org/repos/asf/nifi-registry/tree/a1629c86 Diff: http://git-wip-us.apache.org/repos/asf/nifi-registry/diff/a1629c86 Branch: refs/heads/master Commit: a1629c86df62930f603f264495f63a2429af7f1a Parents: 88cae4f Author: Bryan Bende Authored: Tue Aug 22 16:15:39 2017 -0400 Committer: Bryan Bende Committed: Fri Sep 1 11:06:49 2017 -0400 ---------------------------------------------------------------------- build-and-run.sh | 28 + nifi-registry-data-model/pom.xml | 5 +- .../org/apache/nifi/registry/bucket/Bucket.java | 16 +- .../apache/nifi/registry/bucket/BucketItem.java | 27 +- .../nifi/registry/flow/VersionedComponent.java | 4 + .../nifi/registry/flow/VersionedFlow.java | 12 +- .../registry/flow/VersionedFlowSnapshot.java | 10 + .../flow/VersionedFlowSnapshotMetadata.java | 13 + nifi-registry-framework/pom.xml | 19 + .../exception/ResourceNotFoundException.java | 32 + .../flow/StandardFlowSnapshotContext.java | 15 + .../provider/StandardProviderFactory.java | 2 +- .../serialization/FlowSnapshotSerializer.java | 101 ++ .../serialization/SerializationException.java | 35 + .../nifi/registry/serialization/Serializer.java | 43 + .../jaxb/JAXBFlowSnapshotSerializer.java | 30 + .../serialization/jaxb/JAXBSerializer.java | 80 ++ .../nifi/registry/service/DataModelMapper.java | 125 +++ .../nifi/registry/service/RegistryService.java | 511 ++++++++++ .../registry/provider/MockMetadataProvider.java | 14 +- .../TestFlowSnapshotSerializer.java | 66 ++ .../jaxb/TestJAXBFlowSnapshotSerializer.java | 63 ++ .../registry/service/TestDataModelMapper.java | 174 ++++ .../registry/service/TestRegistryService.java | 936 +++++++++++++++++++ .../registry/metadata/MetadataProvider.java | 21 +- .../metadata/FileSystemMetadataProvider.java | 37 +- .../nifi/registry/metadata/MetadataHolder.java | 38 +- .../TestFileSystemMetadataProvider.java | 119 ++- nifi-registry-web-api/pom.xml | 10 +- .../web/NiFiRegistryResourceConfig.java | 33 + .../registry/web/api/BucketFlowResource.java | 14 +- .../nifi/registry/web/api/BucketResource.java | 58 +- .../nifi/registry/web/api/FlowResource.java | 121 ++- .../web/mapper/IllegalStateExceptionMapper.java | 44 + .../mapper/ResourceNotFoundExceptionMapper.java | 45 + .../mapper/SerializationExceptionMapper.java | 45 + .../apache/nifi/registry/web/TestRestAPI.java | 152 +++ pom.xml | 37 + 38 files changed, 3014 insertions(+), 121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/build-and-run.sh ---------------------------------------------------------------------- diff --git a/build-and-run.sh b/build-and-run.sh new file mode 100755 index 0000000..7c7d3d6 --- /dev/null +++ b/build-and-run.sh @@ -0,0 +1,28 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +REGISTRY_SCRIPT=`find nifi-registry-assembly/target/ -name nifi-registry.sh | head -1` +REGISTRY_BIN_DIR=$(dirname "${REGISTRY_SCRIPT}") +REGISTRY_DIR=$REGISTRY_BIN_DIR/.. + +./${REGISTRY_SCRIPT} stop + +mvn clean install -Pcontrib-check + +./${REGISTRY_SCRIPT} start + +tail -n 500 -f ${REGISTRY_DIR}/logs/nifi-registry-app.log http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-data-model/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-registry-data-model/pom.xml b/nifi-registry-data-model/pom.xml index dbdda37..438fe01 100644 --- a/nifi-registry-data-model/pom.xml +++ b/nifi-registry-data-model/pom.xml @@ -25,7 +25,10 @@ io.swagger swagger-annotations - 1.5.16 + + + javax.validation + validation-api http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/Bucket.java ---------------------------------------------------------------------- diff --git a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/Bucket.java b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/Bucket.java index 2797213..9928cd8 100644 --- a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/Bucket.java +++ b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/Bucket.java @@ -20,18 +20,32 @@ import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import org.apache.nifi.registry.flow.VersionedFlow; +import javax.validation.Valid; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotBlank; +import javax.xml.bind.annotation.XmlRootElement; import java.util.Objects; import java.util.Set; +@XmlRootElement @ApiModel(value = "bucket") public class Bucket { + @NotBlank private String identifier; + + @NotBlank private String name; + + @Min(1) private long createdTimestamp; + private String description; + + @Valid private Set versionedFlows; + @ApiModelProperty("The id of the bucket. This is set by the server at creation time.") public String getIdentifier() { return identifier; @@ -68,7 +82,7 @@ public class Bucket { this.description = description; } - @ApiModelProperty("The versioned flows in the bucket.") + @ApiModelProperty(value = "The versioned flows in the bucket.", readOnly = true) public Set getVersionedFlows() { return versionedFlows; } http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/BucketItem.java ---------------------------------------------------------------------- diff --git a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/BucketItem.java b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/BucketItem.java index eec4fee..d57f07b 100644 --- a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/BucketItem.java +++ b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/BucketItem.java @@ -19,17 +19,36 @@ package org.apache.nifi.registry.bucket; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; import java.util.Objects; @ApiModel("bucketItem") -public class BucketItem { +public abstract class BucketItem { + @NotBlank private String identifier; + + @NotBlank private String name; + + @NotBlank private String bucketIdentifier; + + @Min(1) private long createdTimestamp; + + @Min(1) private long modifiedTimestamp; - private BucketItemType type; + + @NotNull + private final BucketItemType type; + + public BucketItem(final BucketItemType type) { + this.type = type; + } + @ApiModelProperty("An ID to uniquely identify this object.") public String getIdentifier() { @@ -81,10 +100,6 @@ public class BucketItem { return type; } - public void setType(BucketItemType type) { - this.type = type; - } - @Override public int hashCode() { return Objects.hashCode(this.identifier); http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedComponent.java ---------------------------------------------------------------------- diff --git a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedComponent.java b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedComponent.java index bef9557..af9b3d7 100644 --- a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedComponent.java +++ b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedComponent.java @@ -64,4 +64,8 @@ public abstract class VersionedComponent { } public abstract ComponentType getComponentType(); + + public void setComponentType(ComponentType type) { + // purposely do nothing here, this just to allow unmarshalling + } } http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlow.java ---------------------------------------------------------------------- diff --git a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlow.java b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlow.java index d1b1108..8bbb040 100644 --- a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlow.java +++ b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlow.java @@ -19,7 +19,10 @@ package org.apache.nifi.registry.flow; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import org.apache.nifi.registry.bucket.BucketItem; +import org.apache.nifi.registry.bucket.BucketItemType; +import javax.validation.Valid; +import javax.xml.bind.annotation.XmlRootElement; import java.util.SortedSet; /** @@ -31,12 +34,19 @@ import java.util.SortedSet; * * @see VersionedFlowSnapshot */ +@XmlRootElement @ApiModel(value = "versionedFlow") public class VersionedFlow extends BucketItem { private String description; + + @Valid private SortedSet snapshotMetadata; + public VersionedFlow() { + super(BucketItemType.FLOW); + } + @ApiModelProperty("A description of the flow.") public String getDescription() { return description; @@ -46,7 +56,7 @@ public class VersionedFlow extends BucketItem { this.description = description; } - @ApiModelProperty("The metadata for each snapshot of this flow.") + @ApiModelProperty(value = "The metadata for each snapshot of this flow.", readOnly = true) public SortedSet getSnapshotMetadata() { return snapshotMetadata; } http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java ---------------------------------------------------------------------- diff --git a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java index c3b368c..9f5d973 100644 --- a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java +++ b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java @@ -20,6 +20,9 @@ package org.apache.nifi.registry.flow; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; +import javax.validation.Valid; +import javax.validation.constraints.NotNull; +import javax.xml.bind.annotation.XmlRootElement; import java.util.Objects; /** @@ -30,12 +33,19 @@ import java.util.Objects; * version of the flow, the timestamp when it was saved, the contents of the flow, etc. *

*/ +@XmlRootElement @ApiModel(value = "versionedFlowSnapshot") public class VersionedFlowSnapshot { + @Valid + @NotNull private VersionedFlowSnapshotMetadata snapshotMetadata; + + @Valid + @NotNull private VersionedProcessGroup flowContents; + @ApiModelProperty("The metadata for this snapshot") public VersionedFlowSnapshotMetadata getSnapshotMetadata() { return snapshotMetadata; http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshotMetadata.java ---------------------------------------------------------------------- diff --git a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshotMetadata.java b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshotMetadata.java index 4cd3de9..e60dcfd 100644 --- a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshotMetadata.java +++ b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshotMetadata.java @@ -19,6 +19,8 @@ package org.apache.nifi.registry.flow; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotBlank; import java.util.Objects; /** @@ -28,13 +30,24 @@ import java.util.Objects; @ApiModel(value = "versionedFlowSnapshot") public class VersionedFlowSnapshotMetadata implements Comparable { + @NotBlank private String bucketIdentifier; + + @NotBlank private String flowIdentifier; + + @NotBlank private String flowName; + + @Min(1) private int version; + + @Min(1) private long timestamp; + private String comments; + @ApiModelProperty("The identifier of the bucket this snapshot belongs to.") public String getBucketIdentifier() { return bucketIdentifier; http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/pom.xml b/nifi-registry-framework/pom.xml index 4f36538..00bd2d6 100644 --- a/nifi-registry-framework/pom.xml +++ b/nifi-registry-framework/pom.xml @@ -62,6 +62,11 @@ org.apache.nifi.registry + nifi-registry-data-model + 0.0.1-SNAPSHOT + + + org.apache.nifi.registry nifi-registry-provider-api 0.0.1-SNAPSHOT @@ -71,10 +76,24 @@ 0.0.1-SNAPSHOT + org.hibernate + hibernate-validator + + + org.glassfish + javax.el + + + junit junit 4.12 test + + org.mockito + mockito-core + test + http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/ResourceNotFoundException.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/ResourceNotFoundException.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/ResourceNotFoundException.java new file mode 100644 index 0000000..a83e9e2 --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/ResourceNotFoundException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.exception; + +/** + * An exception that is thrown when an entity is not found. + */ +public class ResourceNotFoundException extends RuntimeException { + + public ResourceNotFoundException(String message) { + super(message); + } + + public ResourceNotFoundException(String message, Throwable cause) { + super(message, cause); + } + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/flow/StandardFlowSnapshotContext.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/flow/StandardFlowSnapshotContext.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/flow/StandardFlowSnapshotContext.java index 3b29e97..3527355 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/flow/StandardFlowSnapshotContext.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/flow/StandardFlowSnapshotContext.java @@ -17,6 +17,7 @@ package org.apache.nifi.registry.flow; import org.apache.commons.lang3.Validate; +import org.apache.nifi.registry.bucket.Bucket; /** * Standard implementation of FlowSnapshotContext. @@ -96,6 +97,20 @@ public class StandardFlowSnapshotContext implements FlowSnapshotContext { private String comments; private long snapshotTimestamp; + public Builder() { + + } + + public Builder(final Bucket bucket, final VersionedFlowSnapshotMetadata snapshotMetadata) { + bucketId(bucket.getIdentifier()); + bucketName(bucket.getName()); + flowId(snapshotMetadata.getFlowIdentifier()); + flowName(snapshotMetadata.getFlowName()); + version(snapshotMetadata.getVersion()); + comments(snapshotMetadata.getComments()); + snapshotTimestamp(snapshotMetadata.getTimestamp()); + } + public Builder bucketId(final String bucketId) { this.bucketId = bucketId; return this; http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java index 1a83d68..03b5a86 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java @@ -58,7 +58,7 @@ public class StandardProviderFactory implements ProviderFactory { try { return JAXBContext.newInstance(JAXB_GENERATED_PATH, StandardProviderFactory.class.getClassLoader()); } catch (JAXBException e) { - throw new RuntimeException("Unable to create JAXBContext."); + throw new RuntimeException("Unable to create JAXBContext.", e); } } http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/FlowSnapshotSerializer.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/FlowSnapshotSerializer.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/FlowSnapshotSerializer.java new file mode 100644 index 0000000..ae2482b --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/FlowSnapshotSerializer.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.serialization; + +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.serialization.jaxb.JAXBFlowSnapshotSerializer; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * A serializer for VersionedFlowSnapshots that maps a "version" of the data model to a serializer. The version + * will be written to a header at the beginning of the OutputStream, and then the object and the OutputStream will + * be passed on to the real serializer for the given version. Similarly, when deserializing, the header will first be + * read from the InputStream to determine the version, and then the InputStream will be passed to the deserializer + * for the given version. + */ +public class FlowSnapshotSerializer implements Serializer { + + static final String MAGIC_HEADER = "Flows"; + static final byte[] MAGIC_HEADER_BYTES = MAGIC_HEADER.getBytes(StandardCharsets.UTF_8); + + static final Integer CURRENT_VERSION = 1; + + private final Map> serializersByVersion; + + public FlowSnapshotSerializer() { + final Map> tempSerializers = new HashMap<>(); + tempSerializers.put(CURRENT_VERSION, new JAXBFlowSnapshotSerializer()); + this.serializersByVersion = Collections.unmodifiableMap(tempSerializers); + } + + @Override + public void serialize(final VersionedFlowSnapshot versionedFlowSnapshot, final OutputStream out) throws SerializationException { + final ByteBuffer byteBuffer = ByteBuffer.allocate(9); + byteBuffer.put(MAGIC_HEADER_BYTES); + byteBuffer.putInt(CURRENT_VERSION); + + try { + out.write(byteBuffer.array()); + } catch (final IOException e) { + throw new SerializationException("Unable to write header while serializing snapshot", e); + } + + final Serializer serializer = serializersByVersion.get(CURRENT_VERSION); + if (serializer == null) { + throw new SerializationException("No flow snapshot serializer for version " + CURRENT_VERSION); + } + + serializer.serialize(versionedFlowSnapshot, out); + } + + @Override + public VersionedFlowSnapshot deserialize(final InputStream input) throws SerializationException { + final int headerLength = 9; + final byte[] buffer = new byte[headerLength]; + + int bytesRead = -1; + try { + bytesRead = input.read(buffer, 0, headerLength); + } catch (final IOException e) { + throw new SerializationException("Unable to read header while deserializing snapshot", e); + } + + if (bytesRead < headerLength) { + throw new SerializationException("Unable to read header while deserializing snapshot, expected" + + headerLength + " bytes, but found " + bytesRead); + } + + final ByteBuffer bb = ByteBuffer.wrap(buffer); + final int version = bb.getInt(MAGIC_HEADER_BYTES.length); + + final Serializer serializer = serializersByVersion.get(Integer.valueOf(version)); + if (serializer == null) { + throw new SerializationException("No flow snapshot serializer for version " + version); + } + + return serializer.deserialize(input); + } + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/SerializationException.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/SerializationException.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/SerializationException.java new file mode 100644 index 0000000..dd05e77 --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/SerializationException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.serialization; + +/** + * An error that can occur during serialization or deserialization. + */ +public class SerializationException extends RuntimeException { + + public SerializationException(String message) { + super(message); + } + + public SerializationException(String message, Throwable cause) { + super(message, cause); + } + + public SerializationException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/Serializer.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/Serializer.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/Serializer.java new file mode 100644 index 0000000..ca424ad --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/Serializer.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.serialization; + +import java.io.InputStream; +import java.io.OutputStream; + +/** + * Serializes and de-serializes objects. + */ +public interface Serializer { + + /** + * Serializes a snapshot to the given output stream. + * + * @param t the object to serialize + * @param out the output stream to serialize to + */ + void serialize(T t, OutputStream out) throws SerializationException; + + /** + * Deserializes the given InputStream back to an object of the given type. + * + * @param input the InputStream to deserialize + * @return the deserialized object + */ + T deserialize(InputStream input) throws SerializationException; + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBFlowSnapshotSerializer.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBFlowSnapshotSerializer.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBFlowSnapshotSerializer.java new file mode 100644 index 0000000..83984b1 --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBFlowSnapshotSerializer.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.serialization.jaxb; + +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; + +/** + * A JAXB serializer for VersionedFlowSnapshots. + */ +public class JAXBFlowSnapshotSerializer extends JAXBSerializer { + + public JAXBFlowSnapshotSerializer() { + super(VersionedFlowSnapshot.class); + } + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java new file mode 100644 index 0000000..eff655d --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.serialization.jaxb; + +import org.apache.nifi.registry.serialization.SerializationException; +import org.apache.nifi.registry.serialization.Serializer; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Marshaller; +import javax.xml.bind.Unmarshaller; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * A Serializer that uses JAXB for serializing/deserializing. + */ +public class JAXBSerializer implements Serializer { + + private final JAXBContext jaxbContext; + + /** + * Load the JAXBContext. + */ + public JAXBSerializer(final Class clazz) { + try { + this.jaxbContext = JAXBContext.newInstance(clazz); + } catch (JAXBException e) { + throw new RuntimeException("Unable to create JAXBContext.", e); + } + } + + @Override + public void serialize(final T t, final OutputStream out) throws SerializationException { + if (t == null) { + throw new IllegalArgumentException("The object to serialize cannot be null"); + } + + if (out == null) { + throw new IllegalArgumentException("OutputStream cannot be null"); + } + + try { + final Marshaller marshaller = jaxbContext.createMarshaller(); + marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true); + marshaller.marshal(t, out); + } catch (JAXBException e) { + throw new SerializationException("Unable to serialize object", e); + } + } + + @Override + public T deserialize(final InputStream input) throws SerializationException { + if (input == null) { + throw new IllegalArgumentException("InputStream cannot be null"); + } + + try { + final Unmarshaller unmarshaller = jaxbContext.createUnmarshaller(); + return (T) unmarshaller.unmarshal(input); + } catch (JAXBException e) { + throw new SerializationException("Unable to deserialize object", e); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java new file mode 100644 index 0000000..c80dc21 --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.service; + +import org.apache.nifi.registry.bucket.Bucket; +import org.apache.nifi.registry.flow.VersionedFlow; +import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; +import org.apache.nifi.registry.metadata.BucketMetadata; +import org.apache.nifi.registry.metadata.FlowMetadata; +import org.apache.nifi.registry.metadata.FlowSnapshotMetadata; +import org.apache.nifi.registry.metadata.StandardBucketMetadata; +import org.apache.nifi.registry.metadata.StandardFlowMetadata; +import org.apache.nifi.registry.metadata.StandardFlowSnapshotMetadata; + +import java.util.LinkedHashSet; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +/** + * Utility for mapping between Provider API and the registry data model. + */ +public class DataModelMapper { + + public static Bucket map(final BucketMetadata bucketMetadata) { + final Bucket bucket = new Bucket(); + bucket.setIdentifier(bucketMetadata.getIdentifier()); + bucket.setName(bucketMetadata.getName()); + bucket.setDescription(bucketMetadata.getDescription()); + bucket.setCreatedTimestamp(bucketMetadata.getCreatedTimestamp()); + + if (bucketMetadata.getFlowMetadata() != null) { + final Set flows = new LinkedHashSet<>(); + bucketMetadata.getFlowMetadata().stream().forEach(f -> flows.add(map(f))); + bucket.setVersionedFlows(flows); + } + + return bucket; + } + + public static BucketMetadata map(final Bucket bucket) { + final StandardBucketMetadata.Builder builder = new StandardBucketMetadata.Builder() + .identifier(bucket.getIdentifier()) + .name(bucket.getName()) + .description(bucket.getDescription()) + .created(bucket.getCreatedTimestamp()); + + if (bucket.getVersionedFlows() != null) { + bucket.getVersionedFlows().stream().forEach(f -> builder.addFlow(map(f))); + } + + return builder.build(); + } + + public static VersionedFlow map(final FlowMetadata flowMetadata) { + final VersionedFlow versionedFlow = new VersionedFlow(); + versionedFlow.setIdentifier(flowMetadata.getIdentifier()); + versionedFlow.setName(flowMetadata.getName()); + versionedFlow.setBucketIdentifier(flowMetadata.getBucketIdentifier()); + versionedFlow.setDescription(flowMetadata.getDescription()); + versionedFlow.setCreatedTimestamp(flowMetadata.getCreatedTimestamp()); + versionedFlow.setModifiedTimestamp(flowMetadata.getModifiedTimestamp()); + + if (flowMetadata.getSnapshotMetadata() != null) { + final SortedSet snapshots = new TreeSet<>(); + flowMetadata.getSnapshotMetadata().stream().forEach(s -> snapshots.add(map(s))); + versionedFlow.setSnapshotMetadata(snapshots); + } + + return versionedFlow; + } + + public static FlowMetadata map(final VersionedFlow versionedFlow) { + final StandardFlowMetadata.Builder builder = new StandardFlowMetadata.Builder() + .identifier(versionedFlow.getIdentifier()) + .name(versionedFlow.getName()) + .bucketIdentifier(versionedFlow.getBucketIdentifier()) + .description(versionedFlow.getDescription()) + .created(versionedFlow.getCreatedTimestamp()) + .modified(versionedFlow.getModifiedTimestamp()); + + if (versionedFlow.getSnapshotMetadata() != null) { + versionedFlow.getSnapshotMetadata().stream().forEach(s -> builder.addSnapshot(map(s))); + } + + return builder.build(); + } + + public static VersionedFlowSnapshotMetadata map(final FlowSnapshotMetadata flowSnapshotMetadata) { + final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata(); + metadata.setBucketIdentifier(flowSnapshotMetadata.getBucketIdentifier()); + metadata.setFlowIdentifier(flowSnapshotMetadata.getFlowIdentifier()); + metadata.setFlowName(flowSnapshotMetadata.getFlowName()); + metadata.setComments(flowSnapshotMetadata.getComments()); + metadata.setTimestamp(flowSnapshotMetadata.getCreatedTimestamp()); + metadata.setVersion(flowSnapshotMetadata.getVersion()); + return metadata; + } + + public static FlowSnapshotMetadata map(final VersionedFlowSnapshotMetadata metadata) { + return new StandardFlowSnapshotMetadata.Builder() + .bucketIdentifier(metadata.getBucketIdentifier()) + .flowIdentifier(metadata.getFlowIdentifier()) + .flowName(metadata.getFlowName()) + .comments(metadata.getComments()) + .created(metadata.getTimestamp()) + .version(metadata.getVersion()) + .build(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java new file mode 100644 index 0000000..6f53957 --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.service; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.registry.bucket.Bucket; +import org.apache.nifi.registry.exception.ResourceNotFoundException; +import org.apache.nifi.registry.flow.FlowPersistenceProvider; +import org.apache.nifi.registry.flow.FlowSnapshotContext; +import org.apache.nifi.registry.flow.StandardFlowSnapshotContext; +import org.apache.nifi.registry.flow.VersionedFlow; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; +import org.apache.nifi.registry.metadata.BucketMetadata; +import org.apache.nifi.registry.metadata.FlowMetadata; +import org.apache.nifi.registry.metadata.FlowSnapshotMetadata; +import org.apache.nifi.registry.metadata.MetadataProvider; +import org.apache.nifi.registry.metadata.StandardBucketMetadata; +import org.apache.nifi.registry.metadata.StandardFlowMetadata; +import org.apache.nifi.registry.serialization.Serializer; + +import javax.validation.ConstraintViolation; +import javax.validation.ConstraintViolationException; +import javax.validation.Validator; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +public class RegistryService { + + private final MetadataProvider metadataProvider; + private final FlowPersistenceProvider flowPersistenceProvider; + private final Serializer snapshotSerializer; + private final Validator validator; + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final Lock readLock = lock.readLock(); + private final Lock writeLock = lock.writeLock(); + + public RegistryService(final MetadataProvider metadataProvider, + final FlowPersistenceProvider flowPersistenceProvider, + final Serializer snapshotSerializer, + final Validator validator) { + this.metadataProvider = metadataProvider; + this.flowPersistenceProvider = flowPersistenceProvider; + this.snapshotSerializer = snapshotSerializer; + this.validator = validator; + } + + private void validate(T t, String invalidMessage) { + final Set> violations = validator.validate(t); + if (violations.size() > 0) { + throw new ConstraintViolationException(invalidMessage, violations); + } + } + + // ---------------------- Bucket methods --------------------------------------------- + + public Bucket createBucket(final Bucket bucket) { + if (bucket == null) { + throw new IllegalArgumentException("Bucket cannot be null"); + } + + // set an id, the created time, and clear out the flows since its read-only + bucket.setIdentifier(UUID.randomUUID().toString()); + bucket.setCreatedTimestamp(System.currentTimeMillis()); + bucket.setVersionedFlows(null); + + validate(bucket, "Bucket is not valid"); + + writeLock.lock(); + try { + final BucketMetadata existingBucketWithSameName = metadataProvider.getBucketByName(bucket.getName()); + if (existingBucketWithSameName != null) { + throw new IllegalStateException("A bucket with the same name already exists: " + existingBucketWithSameName.getIdentifier()); + } + + final BucketMetadata createdBucket = metadataProvider.createBucket(DataModelMapper.map(bucket)); + return DataModelMapper.map(createdBucket); + } finally { + writeLock.unlock(); + } + } + + public Bucket getBucket(final String bucketIdentifier) { + if (bucketIdentifier == null) { + throw new IllegalArgumentException("Bucket Identifier cannot be null"); + } + + readLock.lock(); + try { + final BucketMetadata bucket = metadataProvider.getBucketById(bucketIdentifier); + if (bucket == null) { + throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucketIdentifier); + } + + return DataModelMapper.map(bucket); + } finally { + readLock.unlock(); + } + } + + public Set getBuckets() { + readLock.lock(); + try { + final Set buckets = metadataProvider.getBuckets(); + return buckets.stream().map(b -> DataModelMapper.map(b)).collect(Collectors.toSet()); + } finally { + readLock.unlock(); + } + } + + public Bucket updateBucket(final Bucket bucket) { + if (bucket == null) { + throw new IllegalArgumentException("Bucket cannot be null"); + } + + if (bucket.getIdentifier() == null) { + throw new IllegalArgumentException("Bucket Identifier cannot be null"); + } + + writeLock.lock(); + try { + // ensure a bucket with the given id exists + final BucketMetadata existingBucketById = metadataProvider.getBucketById(bucket.getIdentifier()); + if (existingBucketById == null) { + throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucket.getIdentifier()); + } + + // ensure a different bucket with the same name does not exist + // since we're allowing partial updates here, only check this if a non-null name is provided + if (StringUtils.isNotBlank(bucket.getName())) { + final BucketMetadata existingBucketWithSameName = metadataProvider.getBucketByName(bucket.getName()); + if (existingBucketWithSameName != null && !existingBucketWithSameName.getIdentifier().equals(existingBucketById.getIdentifier())) { + throw new IllegalStateException("A bucket with the same name already exists: " + bucket.getName()); + } + } + + final StandardBucketMetadata.Builder builder = new StandardBucketMetadata.Builder(existingBucketById); + + // transfer over the new values to the existing bucket + if (StringUtils.isNotBlank(bucket.getName())) { + builder.name(bucket.getName()); + } + + if (bucket.getDescription() != null) { + builder.description(bucket.getDescription()); + } + + // perform the actual update + final BucketMetadata updatedBucket = metadataProvider.updateBucket(builder.build()); + return DataModelMapper.map(updatedBucket); + } finally { + writeLock.unlock(); + } + } + + public Bucket deleteBucket(final String bucketIdentifier) { + if (bucketIdentifier == null) { + throw new IllegalArgumentException("Bucket Identifier cannot be null"); + } + + writeLock.lock(); + try { + // ensure the bucket exists + final BucketMetadata existingBucket = metadataProvider.getBucketById(bucketIdentifier); + if (existingBucket == null) { + throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucketIdentifier); + } + + // retrieve the versioned flows that are in this bucket + final Set bucketFlows = metadataProvider.getFlows(bucketIdentifier); + + // for each flow in the bucket, delete all snapshots from the flow persistence provider + for (final FlowMetadata bucketFlow : bucketFlows) { + flowPersistenceProvider.deleteSnapshots(bucketIdentifier, bucketFlow.getIdentifier()); + } + + // now delete the bucket from the metadata provider, which deletes all flows referencing it + metadataProvider.deleteBucket(bucketIdentifier); + + return DataModelMapper.map(existingBucket); + } finally { + writeLock.unlock(); + } + } + + // ---------------------- VersionedFlow methods --------------------------------------------- + + public VersionedFlow createFlow(final String bucketIdentifier, final VersionedFlow versionedFlow) { + if (StringUtils.isBlank(bucketIdentifier)) { + throw new IllegalArgumentException("Bucket Identifier cannot be null or blank"); + } + + if (versionedFlow == null) { + throw new IllegalArgumentException("VersionedFlow cannot be null"); + } + + if (versionedFlow.getBucketIdentifier() != null && !bucketIdentifier.equals(versionedFlow.getBucketIdentifier())) { + throw new IllegalArgumentException("Bucket identifiers must match"); + } + + if (versionedFlow.getBucketIdentifier() == null) { + versionedFlow.setBucketIdentifier(bucketIdentifier); + } + + versionedFlow.setIdentifier(UUID.randomUUID().toString()); + + final long timestamp = System.currentTimeMillis(); + versionedFlow.setCreatedTimestamp(timestamp); + versionedFlow.setModifiedTimestamp(timestamp); + + // clear out the snapshots since they are read-only + versionedFlow.setSnapshotMetadata(null); + + validate(versionedFlow, "VersionedFlow is not valid"); + + writeLock.lock(); + try { + // ensure the bucket exists + final BucketMetadata existingBucket = metadataProvider.getBucketById(bucketIdentifier); + if (existingBucket == null) { + throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucketIdentifier); + } + + final FlowMetadata existingFlowWithSameName = metadataProvider.getFlowByName(versionedFlow.getName()); + if (existingFlowWithSameName != null) { + throw new IllegalStateException("A VersionedFlow with the same name already exists: " + existingFlowWithSameName.getIdentifier()); + } + + // create the flow + final FlowMetadata createdFlow = metadataProvider.createFlow(bucketIdentifier, DataModelMapper.map(versionedFlow)); + return DataModelMapper.map(createdFlow); + } finally { + writeLock.unlock(); + } + } + + public VersionedFlow getFlow(final String flowIdentifier) { + if (StringUtils.isBlank(flowIdentifier)) { + throw new IllegalArgumentException("Flow Identifier cannot be null or blank"); + } + + readLock.lock(); + try { + final FlowMetadata flowMetadata = metadataProvider.getFlowById(flowIdentifier); + if (flowMetadata == null) { + throw new ResourceNotFoundException("VersionedFlow does not exist for identifier: " + flowIdentifier); + } + + return DataModelMapper.map(flowMetadata); + } finally { + readLock.unlock(); + } + } + + public Set getFlows() { + readLock.lock(); + try { + final Set flows = metadataProvider.getFlows(); + return flows.stream().map(f -> DataModelMapper.map(f)).collect(Collectors.toSet()); + } finally { + readLock.unlock(); + } + } + + public Set getFlows(String bucketId) { + if (StringUtils.isBlank(bucketId)) { + throw new IllegalArgumentException("Bucket Identifier cannot be null"); + } + + readLock.lock(); + try { + final BucketMetadata existingBucket = metadataProvider.getBucketById(bucketId); + if (existingBucket == null) { + throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucketId); + } + + final Set flows = metadataProvider.getFlows(bucketId); + return flows.stream().map(f -> DataModelMapper.map(f)).collect(Collectors.toSet()); + } finally { + readLock.unlock(); + } + } + + public VersionedFlow updateFlow(final VersionedFlow versionedFlow) { + if (versionedFlow == null) { + throw new IllegalArgumentException("VersionedFlow cannot be null"); + } + + if (StringUtils.isBlank(versionedFlow.getIdentifier())) { + throw new IllegalArgumentException("VersionedFlow identifier cannot be null or blank"); + } + + writeLock.lock(); + try { + // ensure a flow with the given id exists + final FlowMetadata existingFlow = metadataProvider.getFlowById(versionedFlow.getIdentifier()); + if (existingFlow == null) { + throw new ResourceNotFoundException("VersionedFlow does not exist for identifier: " + versionedFlow.getIdentifier()); + } + + // ensure a different flow with the same name does not exist + // since we're allowing partial updates here, only check this if a non-null name is provided + if (StringUtils.isNotBlank(versionedFlow.getName())) { + final FlowMetadata existingFlowWithSameName = metadataProvider.getFlowByName(versionedFlow.getName()); + if (existingFlowWithSameName != null && !existingFlowWithSameName.getIdentifier().equals(existingFlow.getIdentifier())) { + throw new IllegalStateException("A VersionedFlow with the same name already exists: " + versionedFlow.getName()); + } + } + + final StandardFlowMetadata.Builder builder = new StandardFlowMetadata.Builder(existingFlow); + + // transfer over the new values to the existing flow + if (StringUtils.isNotBlank(versionedFlow.getName())) { + builder.name(versionedFlow.getName()); + } + + if (versionedFlow.getDescription() != null) { + builder.description(versionedFlow.getDescription()); + } + + builder.modified(System.currentTimeMillis()); + + // perform the actual update + final FlowMetadata updatedFlow = metadataProvider.updateFlow(builder.build()); + return DataModelMapper.map(updatedFlow); + } finally { + writeLock.unlock(); + } + } + + public VersionedFlow deleteFlow(final String flowIdentifier) { + if (StringUtils.isBlank(flowIdentifier)) { + throw new IllegalArgumentException("Flow Identifier cannot be null or blank"); + } + + writeLock.lock(); + try { + // ensure the flow exists + final FlowMetadata existingFlow = metadataProvider.getFlowById(flowIdentifier); + if (existingFlow == null) { + throw new ResourceNotFoundException("VersionedFlow does not exist for identifier: " + flowIdentifier); + } + + // delete all snapshots from the flow persistence provider + flowPersistenceProvider.deleteSnapshots(existingFlow.getBucketIdentifier(), existingFlow.getIdentifier()); + + // now delete the flow from the metadata provider + metadataProvider.deleteFlow(flowIdentifier); + + return DataModelMapper.map(existingFlow); + } finally { + writeLock.unlock(); + } + } + + // ---------------------- VersionedFlowSnapshot methods --------------------------------------------- + + public VersionedFlowSnapshot createFlowSnapshot(final VersionedFlowSnapshot flowSnapshot) { + if (flowSnapshot == null) { + throw new IllegalArgumentException("VersionedFlowSnapshot cannot be null"); + } + + // validation will ensure that the metadata and contents are not null + if (flowSnapshot.getSnapshotMetadata() != null) { + flowSnapshot.getSnapshotMetadata().setTimestamp(System.currentTimeMillis()); + } + + validate(flowSnapshot, "VersionedFlowSnapshot is not valid"); + + writeLock.lock(); + try { + final VersionedFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata(); + + // ensure the bucket exists + final BucketMetadata existingBucket = metadataProvider.getBucketById(snapshotMetadata.getBucketIdentifier()); + if (existingBucket == null) { + throw new ResourceNotFoundException("Bucket does not exist for identifier: " + snapshotMetadata.getBucketIdentifier()); + } + + // ensure the flow exists + final FlowMetadata existingFlowMetadata = metadataProvider.getFlowById(snapshotMetadata.getFlowIdentifier()); + if (existingFlowMetadata == null) { + throw new ResourceNotFoundException("VersionedFlow does not exist for identifier: " + snapshotMetadata.getFlowIdentifier()); + } + + final VersionedFlow existingFlow = DataModelMapper.map(existingFlowMetadata); + + // if we already have snapshots we need to verify the new one has the correct version + if (existingFlow.getSnapshotMetadata() != null && existingFlow.getSnapshotMetadata().size() > 0) { + final VersionedFlowSnapshotMetadata lastSnapshot = existingFlow.getSnapshotMetadata().last(); + + if (snapshotMetadata.getVersion() <= lastSnapshot.getVersion()) { + throw new IllegalStateException("A VersionedFlowSnapshot with the same version already exists: " + snapshotMetadata.getVersion()); + } + + if (snapshotMetadata.getVersion() > (lastSnapshot.getVersion() + 1)) { + throw new IllegalStateException("Version must be a one-up number, last version was " + + lastSnapshot.getVersion() + " and version for this snapshot was " + + snapshotMetadata.getVersion()); + } + } else if (snapshotMetadata.getVersion() != 1) { + throw new IllegalStateException("Version of first snapshot must be 1"); + } + + // serialize the snapshot + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + snapshotSerializer.serialize(flowSnapshot, out); + + // save the serialized snapshot to the persistence provider + final Bucket bucket = DataModelMapper.map(existingBucket); + final FlowSnapshotContext context = new StandardFlowSnapshotContext.Builder(bucket, snapshotMetadata).build(); + flowPersistenceProvider.saveSnapshot(context, out.toByteArray()); + + // create snapshot in the metadata provider + metadataProvider.createFlowSnapshot(DataModelMapper.map(snapshotMetadata)); + return flowSnapshot; + } finally { + writeLock.unlock(); + } + } + + public VersionedFlowSnapshot getFlowSnapshot(final String flowIdentifier, final Integer version) { + if (StringUtils.isBlank(flowIdentifier)) { + throw new IllegalArgumentException("Flow Identifier cannot be null or blank"); + } + + if (version == null) { + throw new IllegalArgumentException("Version cannot be null or blank"); + } + + readLock.lock(); + try { + // ensure the snapshot exists + final FlowSnapshotMetadata snapshotMetadata = metadataProvider.getFlowSnapshot(flowIdentifier, version); + if (snapshotMetadata == null) { + throw new ResourceNotFoundException("VersionedFlowSnapshot does not exist for flow " + flowIdentifier + " and version " + version); + } + + // get the serialized bytes of the snapshot + final byte[] serializedSnapshot = flowPersistenceProvider.getSnapshot( + snapshotMetadata.getBucketIdentifier(), + snapshotMetadata.getFlowIdentifier(), + snapshotMetadata.getVersion() + ); + + if (serializedSnapshot == null || serializedSnapshot.length == 0) { + throw new IllegalStateException("No serialized content found for snapshot with flow identifier " + + flowIdentifier + " and version " + version); + } + + final InputStream input = new ByteArrayInputStream(serializedSnapshot); + return snapshotSerializer.deserialize(input); + } finally { + readLock.unlock(); + } + } + + public VersionedFlowSnapshotMetadata deleteFlowSnapshot(final String flowIdentifier, final Integer version) { + if (StringUtils.isBlank(flowIdentifier)) { + throw new IllegalArgumentException("Flow Identifier cannot be null or blank"); + } + + if (version == null) { + throw new IllegalArgumentException("Version cannot be null or blank"); + } + + writeLock.lock(); + try { + // ensure the snapshot exists + final FlowSnapshotMetadata snapshotMetadata = metadataProvider.getFlowSnapshot(flowIdentifier, version); + if (snapshotMetadata == null) { + throw new ResourceNotFoundException("VersionedFlowSnapshot does not exist for flow " + + flowIdentifier + " and version " + version); + } + + // delete the content of the snapshot + flowPersistenceProvider.deleteSnapshot( + snapshotMetadata.getBucketIdentifier(), + snapshotMetadata.getFlowIdentifier(), + snapshotMetadata.getVersion()); + + // delete the snapshot itself + metadataProvider.deleteFlowSnapshot(flowIdentifier, version); + return DataModelMapper.map(snapshotMetadata); + } finally { + writeLock.unlock(); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java index 781f348..176a9a5 100644 --- a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java @@ -43,7 +43,12 @@ public class MockMetadataProvider implements MetadataProvider { } @Override - public BucketMetadata getBucket(String bucketIdentifier) { + public BucketMetadata getBucketById(String bucketIdentifier) { + return null; + } + + @Override + public BucketMetadata getBucketByName(String name) { return null; } @@ -68,7 +73,12 @@ public class MockMetadataProvider implements MetadataProvider { } @Override - public FlowMetadata getFlow(String flowIdentifier) { + public FlowMetadata getFlowById(String flowIdentifier) { + return null; + } + + @Override + public FlowMetadata getFlowByName(String name) { return null; } http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowSnapshotSerializer.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowSnapshotSerializer.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowSnapshotSerializer.java new file mode 100644 index 0000000..14c78b2 --- /dev/null +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowSnapshotSerializer.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.serialization; + +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; +import org.apache.nifi.registry.flow.VersionedProcessGroup; +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; + +public class TestFlowSnapshotSerializer { + + @Test + public void testSerializeDeserializeFlowSnapshot() throws SerializationException { + final Serializer serializer = new FlowSnapshotSerializer(); + + final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata(); + snapshotMetadata.setFlowIdentifier("flow1"); + snapshotMetadata.setFlowName("First Flow"); + snapshotMetadata.setVersion(1); + snapshotMetadata.setComments("This is the first flow"); + snapshotMetadata.setTimestamp(System.currentTimeMillis()); + + final VersionedProcessGroup processGroup = new VersionedProcessGroup(); + processGroup.setIdentifier("pg1"); + processGroup.setName("My Process Group"); + + final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot(); + snapshot.setSnapshotMetadata(snapshotMetadata); + snapshot.setFlowContents(processGroup); + + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + serializer.serialize(snapshot, out); + + final ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); + final VersionedFlowSnapshot deserializedSnapshot = serializer.deserialize(in); + final VersionedFlowSnapshotMetadata deserializedMetadata = deserializedSnapshot.getSnapshotMetadata(); + final VersionedProcessGroup deserializedProcessGroup = deserializedSnapshot.getFlowContents(); + + Assert.assertEquals(snapshotMetadata.getFlowIdentifier(), deserializedMetadata.getFlowIdentifier()); + Assert.assertEquals(snapshotMetadata.getFlowName(), deserializedMetadata.getFlowName()); + Assert.assertEquals(snapshotMetadata.getVersion(), deserializedMetadata.getVersion()); + Assert.assertEquals(snapshotMetadata.getComments(), deserializedMetadata.getComments()); + Assert.assertEquals(snapshotMetadata.getTimestamp(), deserializedMetadata.getTimestamp()); + + Assert.assertEquals(processGroup.getIdentifier(), deserializedProcessGroup.getIdentifier()); + Assert.assertEquals(processGroup.getName(), deserializedProcessGroup.getName()); + } +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBFlowSnapshotSerializer.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBFlowSnapshotSerializer.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBFlowSnapshotSerializer.java new file mode 100644 index 0000000..90accf6 --- /dev/null +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBFlowSnapshotSerializer.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.serialization.jaxb; + +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; +import org.apache.nifi.registry.serialization.SerializationException; +import org.apache.nifi.registry.serialization.Serializer; +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.nio.charset.StandardCharsets; + +public class TestJAXBFlowSnapshotSerializer { + + @Test + public void testSerializeDeserializeFlowSnapshot() throws SerializationException { + final Serializer serializer = new JAXBFlowSnapshotSerializer(); + + final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata(); + snapshotMetadata.setFlowIdentifier("flow1"); + snapshotMetadata.setFlowName("First Flow"); + snapshotMetadata.setVersion(1); + snapshotMetadata.setComments("This is the first flow"); + snapshotMetadata.setTimestamp(System.currentTimeMillis()); + + final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot(); + snapshot.setSnapshotMetadata(snapshotMetadata); + + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + serializer.serialize(snapshot, out); + + final String snapshotStr = new String(out.toByteArray(), StandardCharsets.UTF_8); + //System.out.println(snapshotStr); + + final ByteArrayInputStream in = new ByteArrayInputStream(snapshotStr.getBytes(StandardCharsets.UTF_8)); + final VersionedFlowSnapshot deserializedSnapshot = serializer.deserialize(in); + final VersionedFlowSnapshotMetadata deserializedMetadata = deserializedSnapshot.getSnapshotMetadata(); + + Assert.assertEquals(snapshotMetadata.getFlowIdentifier(), deserializedMetadata.getFlowIdentifier()); + Assert.assertEquals(snapshotMetadata.getFlowName(), deserializedMetadata.getFlowName()); + Assert.assertEquals(snapshotMetadata.getVersion(), deserializedMetadata.getVersion()); + Assert.assertEquals(snapshotMetadata.getComments(), deserializedMetadata.getComments()); + Assert.assertEquals(snapshotMetadata.getTimestamp(), deserializedMetadata.getTimestamp()); + } + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestDataModelMapper.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestDataModelMapper.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestDataModelMapper.java new file mode 100644 index 0000000..1a6f3c9 --- /dev/null +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestDataModelMapper.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.service; + +import org.apache.nifi.registry.bucket.Bucket; +import org.apache.nifi.registry.flow.VersionedFlow; +import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; +import org.apache.nifi.registry.metadata.BucketMetadata; +import org.apache.nifi.registry.metadata.FlowMetadata; +import org.apache.nifi.registry.metadata.FlowSnapshotMetadata; +import org.apache.nifi.registry.metadata.StandardBucketMetadata; +import org.apache.nifi.registry.metadata.StandardFlowMetadata; +import org.apache.nifi.registry.metadata.StandardFlowSnapshotMetadata; +import org.junit.Test; + +import java.util.LinkedHashSet; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class TestDataModelMapper { + + @Test + public void testMapBucketToBucketMetadata() { + // create a bucket + final Bucket bucket = new Bucket(); + bucket.setIdentifier("bucket1"); + bucket.setName("Bucket 1"); + bucket.setDescription("This is bucket 1."); + bucket.setCreatedTimestamp(System.currentTimeMillis()); + + // create a flow + final VersionedFlow versionedFlow = new VersionedFlow(); + versionedFlow.setIdentifier("flow1"); + versionedFlow.setName("Flow 1"); + versionedFlow.setDescription("This is flow 1"); + versionedFlow.setBucketIdentifier(bucket.getIdentifier()); + versionedFlow.setCreatedTimestamp(System.currentTimeMillis()); + versionedFlow.setModifiedTimestamp(System.currentTimeMillis()); + + // create a snapshot for the flow + final VersionedFlowSnapshotMetadata versionedFlowSnapshotMetadata = new VersionedFlowSnapshotMetadata(); + versionedFlowSnapshotMetadata.setBucketIdentifier(bucket.getIdentifier()); + versionedFlowSnapshotMetadata.setFlowIdentifier(versionedFlow.getIdentifier()); + versionedFlowSnapshotMetadata.setFlowName(versionedFlow.getName()); + versionedFlowSnapshotMetadata.setVersion(1); + versionedFlowSnapshotMetadata.setTimestamp(System.currentTimeMillis()); + versionedFlowSnapshotMetadata.setComments("This is snapshot 1 of flow 1"); + + // add the snapshot to the flow + final SortedSet versionedFlowSnapshotMetadataSet = new TreeSet<>(); + versionedFlowSnapshotMetadataSet.add(versionedFlowSnapshotMetadata); + versionedFlow.setSnapshotMetadata(versionedFlowSnapshotMetadataSet); + + // add the flow to the bucket + final Set versionedFlows = new LinkedHashSet<>(); + versionedFlows.add(versionedFlow); + bucket.setVersionedFlows(versionedFlows); + + // test the mapping from bucket to bucket metadata + + final BucketMetadata bucketMetadata = DataModelMapper.map(bucket); + assertEquals(bucket.getIdentifier(), bucketMetadata.getIdentifier()); + assertEquals(bucket.getName(), bucketMetadata.getName()); + assertEquals(bucket.getDescription(), bucketMetadata.getDescription()); + assertEquals(bucket.getCreatedTimestamp(), bucketMetadata.getCreatedTimestamp()); + + assertNotNull(bucketMetadata.getFlowMetadata()); + assertEquals(1, bucketMetadata.getFlowMetadata().size()); + + final FlowMetadata flowMetadata = bucketMetadata.getFlowMetadata().iterator().next(); + assertNotNull(flowMetadata); + assertEquals(versionedFlow.getIdentifier(), flowMetadata.getIdentifier()); + assertEquals(versionedFlow.getName(), flowMetadata.getName()); + assertEquals(versionedFlow.getDescription(), flowMetadata.getDescription()); + assertEquals(versionedFlow.getBucketIdentifier(), flowMetadata.getBucketIdentifier()); + assertEquals(versionedFlow.getCreatedTimestamp(), flowMetadata.getCreatedTimestamp()); + assertEquals(versionedFlow.getModifiedTimestamp(), flowMetadata.getModifiedTimestamp()); + + assertNotNull(flowMetadata.getSnapshotMetadata()); + assertEquals(1, flowMetadata.getSnapshotMetadata().size()); + + final FlowSnapshotMetadata flowSnapshotMetadata = flowMetadata.getSnapshotMetadata().iterator().next(); + assertNotNull(flowSnapshotMetadata); + assertEquals(versionedFlowSnapshotMetadata.getFlowIdentifier(), flowSnapshotMetadata.getFlowIdentifier()); + assertEquals(versionedFlowSnapshotMetadata.getFlowName(), flowSnapshotMetadata.getFlowName()); + assertEquals(versionedFlowSnapshotMetadata.getBucketIdentifier(), flowSnapshotMetadata.getBucketIdentifier()); + assertEquals(versionedFlowSnapshotMetadata.getVersion(), flowSnapshotMetadata.getVersion()); + assertEquals(versionedFlowSnapshotMetadata.getComments(), flowSnapshotMetadata.getComments()); + assertEquals(versionedFlowSnapshotMetadata.getTimestamp(), flowSnapshotMetadata.getCreatedTimestamp()); + } + + @Test + public void testMapBucketMetadataToBucket() { + // create snapshot metadata + final FlowSnapshotMetadata snapshotMetadata = new StandardFlowSnapshotMetadata.Builder() + .flowIdentifier("flow1") + .flowName("Flow 1") + .bucketIdentifier("bucket1") + .version(1) + .comments("This is snapshot 1 of flow 1.") + .created(System.currentTimeMillis()) + .build(); + + // create flow metadata + final FlowMetadata flowMetadata = new StandardFlowMetadata.Builder() + .identifier("flow1") + .name("Flow 1") + .bucketIdentifier("bucket1") + .description("This flow 1.") + .created(System.currentTimeMillis()) + .modified(System.currentTimeMillis()) + .addSnapshot(snapshotMetadata) + .build(); + + // create bucket metadata + final BucketMetadata bucketMetadata = new StandardBucketMetadata.Builder() + .identifier("bucket1") + .name("Bucket 1") + .description("This is bucket 1.") + .created(System.currentTimeMillis()) + .addFlow(flowMetadata) + .build(); + + // test the mapping from bucket metadata to bucket + + final Bucket bucket = DataModelMapper.map(bucketMetadata); + assertEquals(bucketMetadata.getIdentifier(), bucket.getIdentifier()); + assertEquals(bucketMetadata.getName(), bucket.getName()); + assertEquals(bucketMetadata.getDescription(), bucket.getDescription()); + assertEquals(bucketMetadata.getCreatedTimestamp(), bucket.getCreatedTimestamp()); + + assertNotNull(bucket.getVersionedFlows()); + assertEquals(1, bucket.getVersionedFlows().size()); + + final VersionedFlow versionedFlow = bucket.getVersionedFlows().iterator().next(); + assertNotNull(versionedFlow); + assertEquals(flowMetadata.getIdentifier(), versionedFlow.getIdentifier()); + assertEquals(flowMetadata.getName(), versionedFlow.getName()); + assertEquals(flowMetadata.getBucketIdentifier(), versionedFlow.getBucketIdentifier()); + assertEquals(flowMetadata.getDescription(), versionedFlow.getDescription()); + assertEquals(flowMetadata.getCreatedTimestamp(), versionedFlow.getCreatedTimestamp()); + assertEquals(flowMetadata.getModifiedTimestamp(), versionedFlow.getModifiedTimestamp()); + + assertNotNull(versionedFlow.getSnapshotMetadata()); + assertEquals(1, versionedFlow.getSnapshotMetadata().size()); + + final VersionedFlowSnapshotMetadata versionedFlowSnapshotMetadata = versionedFlow.getSnapshotMetadata().first(); + assertEquals(snapshotMetadata.getFlowIdentifier(), versionedFlowSnapshotMetadata.getFlowIdentifier()); + assertEquals(snapshotMetadata.getFlowName(), versionedFlowSnapshotMetadata.getFlowName()); + assertEquals(snapshotMetadata.getBucketIdentifier(), versionedFlowSnapshotMetadata.getBucketIdentifier()); + assertEquals(snapshotMetadata.getVersion(), versionedFlowSnapshotMetadata.getVersion()); + assertEquals(snapshotMetadata.getComments(), versionedFlowSnapshotMetadata.getComments()); + assertEquals(snapshotMetadata.getCreatedTimestamp(), versionedFlowSnapshotMetadata.getTimestamp()); + } + +}