atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shweth...@apache.org
Subject [1/2] incubator-atlas git commit: ATLAS-158 Provide Atlas Entity Change Notification (tbeerbower via shwethags)
Date Tue, 10 Nov 2015 15:52:43 GMT
Repository: incubator-atlas
Updated Branches:
  refs/heads/master c93e0972a -> 6f421e997


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java
----------------------------------------------------------------------
diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java b/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java
index 213e46c..6102427 100755
--- a/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java
+++ b/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java
@@ -20,9 +20,12 @@ package org.apache.atlas.typesystem;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.atlas.AtlasException;
 import org.apache.atlas.classification.InterfaceAudience;
 import org.apache.atlas.typesystem.persistence.Id;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -75,6 +78,27 @@ public class Referenceable extends Struct implements IReferenceableInstance
{
         traits = ImmutableMap.copyOf(_traits);
     }
 
+    /**
+     * Construct a Referenceable from the given ITypedReferenceableInstance.
+     *
+     * @param instance  the typed referenceable instance to copy
+     *
+     * @throws AtlasException if the referenceable can not be created
+     */
+    public Referenceable(ITypedReferenceableInstance instance) throws AtlasException {
+        this(instance.getId()._getId(), instance.getTypeName(), instance.getValuesMap(),
instance.getTraits(),
+            getTraits(instance));
+    }
+
+    /**
+     * No-arg constructor for serialization.
+     */
+    @SuppressWarnings("unused")
+    private Referenceable() {
+        this("", "", Collections.<String, Object>emptyMap(), Collections.<String>emptyList(),
+            Collections.<String, IStruct>emptyMap());
+    }
+
     @Override
     public ImmutableList<String> getTraits() {
         return traitNames;
@@ -89,4 +113,13 @@ public class Referenceable extends Struct implements IReferenceableInstance
{
     public IStruct getTrait(String typeName) {
         return traits.get(typeName);
     }
+
+    private static Map<String, IStruct> getTraits(ITypedReferenceableInstance instance)
{
+        Map<String, IStruct> traits = new HashMap<>();
+
+        for (String traitName : instance.getTraits() ) {
+            traits.put(traitName, instance.getTrait(traitName));
+        }
+        return traits;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/typesystem/src/main/java/org/apache/atlas/typesystem/Struct.java
----------------------------------------------------------------------
diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/Struct.java b/typesystem/src/main/java/org/apache/atlas/typesystem/Struct.java
index d57fbe0..d03e2c2 100755
--- a/typesystem/src/main/java/org/apache/atlas/typesystem/Struct.java
+++ b/typesystem/src/main/java/org/apache/atlas/typesystem/Struct.java
@@ -20,6 +20,7 @@ package org.apache.atlas.typesystem;
 
 import org.apache.atlas.classification.InterfaceAudience;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -41,6 +42,15 @@ public class Struct implements IStruct {
         }
     }
 
+    /**
+     * No-arg constructor for serialization.
+     */
+    @SuppressWarnings("unused")
+    private Struct() {
+        this("", Collections.<String, Object>emptyMap());
+    }
+
+
     @Override
     public String getTypeName() {
         return typeName;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/typesystem/src/main/resources/application.properties
----------------------------------------------------------------------
diff --git a/typesystem/src/main/resources/application.properties b/typesystem/src/main/resources/application.properties
index 341acec..8d66dc3 100644
--- a/typesystem/src/main/resources/application.properties
+++ b/typesystem/src/main/resources/application.properties
@@ -61,6 +61,8 @@ atlas.kafka.data=target/data/kafka
 atlas.kafka.zookeeper.session.timeout.ms=400
 atlas.kafka.zookeeper.sync.time.ms=20
 atlas.kafka.auto.commit.interval.ms=100
+atlas.kafka.hook.group.id=atlas
+atlas.kafka.entities.group.id=atlas_entities
 
 #########  Security Properties  #########
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
index fd05d28..f9c0cbb 100755
--- a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
+++ b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
@@ -33,9 +33,13 @@ import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.RepositoryMetadataModule;
+import org.apache.atlas.notification.NotificationInterface;
 import org.apache.atlas.notification.NotificationModule;
+import org.apache.atlas.notification.entity.NotificationEntityChangeListener;
 import org.apache.atlas.repository.graph.GraphProvider;
 import org.apache.atlas.service.Services;
+import org.apache.atlas.services.MetadataService;
+import org.apache.atlas.typesystem.types.TypeSystem;
 import org.apache.atlas.web.filters.AtlasAuthenticationFilter;
 import org.apache.atlas.web.filters.AuditFilter;
 import org.apache.commons.configuration.Configuration;
@@ -114,6 +118,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
         LoginProcessor loginProcessor = new LoginProcessor();
         loginProcessor.login();
 
+        initMetadataService();
         startServices();
     }
 
@@ -154,4 +159,17 @@ public class GuiceServletConfig extends GuiceServletContextListener {
         Services services = injector.getInstance(Services.class);
         services.stop();
     }
+
+    // initialize the metadata service
+    private void initMetadataService() {
+        MetadataService metadataService = injector.getInstance(MetadataService.class);
+
+        // add a listener for entity changes
+        NotificationInterface notificationInterface = injector.getInstance(NotificationInterface.class);
+
+        NotificationEntityChangeListener listener =
+            new NotificationEntityChangeListener(notificationInterface, TypeSystem.getInstance());
+
+        metadataService.registerListener(listener);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
new file mode 100644
index 0000000..204b95a
--- /dev/null
+++ b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import com.google.inject.Inject;
+import org.apache.atlas.notification.entity.EntityNotification;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.web.resources.BaseResourceIT;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.util.List;
+
+/**
+ * Entity Notification Integration Tests.
+ */
+@Guice(modules = NotificationModule.class)
+public class EntityNotificationIT extends BaseResourceIT {
+
+  @Inject
+  private NotificationInterface notificationInterface;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    super.setUp();
+    createTypeDefinitions();
+  }
+
+  @Test
+  public void testEntityNotification() throws Exception {
+
+    List<NotificationConsumer<EntityNotification>> consumers =
+        notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES,
1);
+
+    NotificationConsumer<EntityNotification> consumer =  consumers.iterator().next();
+    final EntityNotificationConsumer notificationConsumer = new EntityNotificationConsumer(consumer);
+    Thread thread = new Thread(notificationConsumer);
+    thread.start();
+
+    createEntity("Sales", "Sales Database", "John ETL", "hdfs://host:8000/apps/warehouse/sales");
+
+    waitFor(10000, new Predicate() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return notificationConsumer.entityNotification != null;
+      }
+    });
+
+    Assert.assertNotNull(notificationConsumer.entityNotification);
+    Assert.assertEquals(EntityNotification.OperationType.ENTITY_CREATE, notificationConsumer.entityNotification.getOperationType());
+    Assert.assertEquals(DATABASE_TYPE, notificationConsumer.entityNotification.getEntity().getTypeName());
+    Assert.assertEquals("Sales", notificationConsumer.entityNotification.getEntity().get("name"));
+  }
+
+  private void createEntity(String name, String description, String owner, String locationUri,
String... traitNames)
+      throws Exception {
+
+    Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);
+    referenceable.set("name", name);
+    referenceable.set("description", description);
+    referenceable.set("owner", owner);
+    referenceable.set("locationUri", locationUri);
+    referenceable.set("createTime", System.currentTimeMillis());
+
+    createInstance(referenceable);
+  }
+
+  private static class EntityNotificationConsumer implements Runnable {
+    private final NotificationConsumer<EntityNotification> consumerIterator;
+    private EntityNotification entityNotification = null;
+
+    public EntityNotificationConsumer(NotificationConsumer<EntityNotification> consumerIterator)
{
+      this.consumerIterator = consumerIterator;
+    }
+
+    @Override
+    public void run() {
+      while(consumerIterator.hasNext()) {
+        entityNotification = consumerIterator.next();
+      }
+    }
+  }
+}


Mime
View raw message