atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From suma...@apache.org
Subject [17/26] incubator-atlas git commit: ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown (yhmenath via sumasai)
Date Fri, 18 Dec 2015 10:44:17 GMT
ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown
(yhmenath via sumasai)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/afb9e618
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/afb9e618
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/afb9e618

Branch: refs/heads/branch-0.6-incubating
Commit: afb9e618a290fcc79f59600418cbf68f1356fd30
Parents: 6dfbda1
Author: Suma Shivaprasad <sumasai.shivaprasad@gmail.com>
Authored: Mon Dec 14 16:29:47 2015 +0530
Committer: Suma Shivaprasad <sumasai.shivaprasad@gmail.com>
Committed: Mon Dec 14 16:34:26 2015 +0530

----------------------------------------------------------------------
 client/pom.xml                                  |  6 ++
 .../main/java/org/apache/atlas/AtlasClient.java | 22 +++++-
 .../java/org/apache/atlas/AtlasClientTest.java  | 67 ++++++++++++++++
 .../notification/NotificationHookConsumer.java  | 42 ++++++++++
 .../NotificationHookConsumerTest.java           | 82 ++++++++++++++++++++
 pom.xml                                         |  2 +-
 release-log.txt                                 |  1 +
 .../atlas/web/listeners/GuiceServletConfig.java |  2 +-
 8 files changed, 221 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/afb9e618/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index 279d894..d41b5bf 100755
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -67,5 +67,11 @@
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/afb9e618/client/src/main/java/org/apache/atlas/AtlasClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java
index becc4db..b108b25 100755
--- a/client/src/main/java/org/apache/atlas/AtlasClient.java
+++ b/client/src/main/java/org/apache/atlas/AtlasClient.java
@@ -19,6 +19,7 @@
 package org.apache.atlas;
 
 import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
 import com.sun.jersey.api.client.config.DefaultClientConfig;
@@ -67,6 +68,7 @@ public class AtlasClient {
     public static final String DATATYPE = "dataType";
 
     public static final String BASE_URI = "api/atlas/";
+    public static final String ADMIN_VERSION = "admin/version";
     public static final String TYPES = "types";
     public static final String URI_ENTITY = "entities";
     public static final String URI_SEARCH = "discovery/search";
@@ -126,11 +128,29 @@ public class AtlasClient {
         service = client.resource(UriBuilder.fromUri(baseUrl).build());
     }
 
+    // for testing
+    AtlasClient(WebResource service) {
+        this.service = service;
+    }
+
     protected Configuration getClientProperties() throws AtlasException {
         return ApplicationProperties.get();
     }
 
-    enum API {
+    public boolean isServerReady() throws AtlasServiceException {
+        WebResource resource = getResource(API.VERSION);
+        try {
+            callAPIWithResource(API.VERSION, resource);
+            return true;
+        } catch (ClientHandlerException che) {
+            return false;
+        }
+    }
+
+    public enum API {
+
+        //Admin operations
+        VERSION(BASE_URI + ADMIN_VERSION, HttpMethod.GET, Response.Status.OK),
 
         //Type operations
         CREATE_TYPE(BASE_URI + TYPES, HttpMethod.POST, Response.Status.CREATED),

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/afb9e618/client/src/test/java/org/apache/atlas/AtlasClientTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/atlas/AtlasClientTest.java b/client/src/test/java/org/apache/atlas/AtlasClientTest.java
new file mode 100644
index 0000000..1e7eed1
--- /dev/null
+++ b/client/src/test/java/org/apache/atlas/AtlasClientTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas;
+
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.Response;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class AtlasClientTest {
+
+    @Test
+    public void shouldVerifyServerIsReady() throws AtlasServiceException {
+        WebResource webResource = mock(WebResource.class);
+        AtlasClient atlasClient = new AtlasClient(webResource);
+
+        WebResource.Builder builder = setupBuilder(webResource);
+        ClientResponse response = mock(ClientResponse.class);
+        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
+        when(response.getEntity(String.class)).thenReturn("{\"Version\":\"version-rrelease\",\"Name\":\"apache-atlas\"," +
+                "\"Description\":\"Metadata Management and Data Governance Platform over Hadoop\"}");
+        when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenReturn(response);
+
+        assertTrue(atlasClient.isServerReady());
+    }
+
+    private WebResource.Builder setupBuilder(WebResource webResource) {
+        WebResource adminVersionResource = mock(WebResource.class);
+        when(webResource.path(AtlasClient.API.VERSION.getPath())).thenReturn(adminVersionResource);
+        WebResource.Builder builder = mock(WebResource.Builder.class);
+        when(adminVersionResource.accept(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder);
+        when(builder.type(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder);
+        return builder;
+    }
+
+    @Test
+    public void shouldReturnFalseIfServerIsNotReady() throws AtlasServiceException {
+        WebResource webResource = mock(WebResource.class);
+        AtlasClient atlasClient = new AtlasClient(webResource);
+        WebResource.Builder builder = setupBuilder(webResource);
+        when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenThrow(
+                new ClientHandlerException());
+        assertFalse(atlasClient.isServerReady());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/afb9e618/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index ffeb406..1bee26f 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -22,6 +22,7 @@ import com.google.inject.Singleton;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.service.Service;
 import org.apache.commons.configuration.Configuration;
 import org.codehaus.jettison.json.JSONArray;
@@ -42,6 +43,7 @@ public class NotificationHookConsumer implements Service {
 
     public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
     public static final String ATLAS_ENDPOINT_PROPERTY = "atlas.rest.address";
+    public static final int SERVER_READY_WAIT_TIME_MS = 1000;
 
     @Inject
     private NotificationInterface notificationInterface;
@@ -77,15 +79,32 @@ public class NotificationHookConsumer implements Service {
         }
     }
 
+    static class Timer {
+        public void sleep(int interval) throws InterruptedException {
+            Thread.sleep(interval);
+        }
+    }
+
     class HookConsumer implements Runnable {
         private final NotificationConsumer<JSONArray> consumer;
+        private final AtlasClient client;
 
         public HookConsumer(NotificationConsumer<JSONArray> consumer) {
+            this(atlasClient, consumer);
+        }
+
+        public HookConsumer(AtlasClient client, NotificationConsumer<JSONArray> consumer) {
+            this.client = client;
             this.consumer = consumer;
         }
 
         @Override
         public void run() {
+
+            if (!serverAvailable(new NotificationHookConsumer.Timer())) {
+                return;
+            }
+
             while(consumer.hasNext()) {
                 JSONArray entityJson = consumer.next();
                 LOG.info("Processing message {}", entityJson);
@@ -98,5 +117,28 @@ public class NotificationHookConsumer implements Service {
                 }
             }
         }
+
+        boolean serverAvailable(Timer timer) {
+            try {
+                while (!client.isServerReady()) {
+                    try {
+                        LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...",
+                                SERVER_READY_WAIT_TIME_MS);
+                        timer.sleep(SERVER_READY_WAIT_TIME_MS);
+                    } catch (InterruptedException e) {
+                        LOG.info("Interrupted while waiting for Atlas Server to become ready, " +
+                                "exiting consumer thread.", e);
+                        return false;
+                    }
+                }
+            } catch (AtlasServiceException e) {
+                LOG.info(
+                        "Handled AtlasServiceException while waiting for Atlas Server to become ready, " +
+                                "exiting consumer thread.", e);
+                return false;
+            }
+            LOG.info("Atlas Server is ready, can start reading Kafka events.");
+            return true;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/afb9e618/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
new file mode 100644
index 0000000..e4d7f8c
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasServiceException;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+
+public class NotificationHookConsumerTest {
+
+    @Test
+    public void testConsumerCanProceedIfServerIsReady() throws InterruptedException, AtlasServiceException {
+        AtlasClient atlasClient = mock(AtlasClient.class);
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
+        NotificationHookConsumer.HookConsumer hookConsumer =
+                notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
+        NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+        when(atlasClient.isServerReady()).thenReturn(true);
+
+        assertTrue(hookConsumer.serverAvailable(timer));
+
+        verifyZeroInteractions(timer);
+    }
+
+    @Test
+    public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws AtlasServiceException, InterruptedException {
+        AtlasClient atlasClient = mock(AtlasClient.class);
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
+        NotificationHookConsumer.HookConsumer hookConsumer =
+                notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
+        NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+        when(atlasClient.isServerReady()).thenReturn(false, false, false, true);
+
+        assertTrue(hookConsumer.serverAvailable(timer));
+
+        verify(timer, times(3)).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
+    }
+
+    @Test
+    public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException {
+        AtlasClient atlasClient = mock(AtlasClient.class);
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
+        NotificationHookConsumer.HookConsumer hookConsumer =
+                notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
+        NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+        doThrow(new InterruptedException()).when(timer).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
+        when(atlasClient.isServerReady()).thenReturn(false);
+
+        assertFalse(hookConsumer.serverAvailable(timer));
+    }
+
+    @Test
+    public void testConsumerProceedsWithFalseOnAtlasServiceException() throws AtlasServiceException {
+        AtlasClient atlasClient = mock(AtlasClient.class);
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
+        NotificationHookConsumer.HookConsumer hookConsumer =
+                notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
+        NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+        when(atlasClient.isServerReady()).thenThrow(new AtlasServiceException(AtlasClient.API.VERSION, new Exception()));
+
+        assertFalse(hookConsumer.serverAvailable(timer));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/afb9e618/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 929d255..d9c3df1 100755
--- a/pom.xml
+++ b/pom.xml
@@ -323,7 +323,7 @@
         <node.version>v0.10.30</node.version>
         <slf4j.version>1.7.7</slf4j.version>
         <jetty.version>9.2.12.v20150709</jetty.version>
-        <jersey.version>1.10</jersey.version>
+        <jersey.version>1.19</jersey.version>
         <jackson.version>1.8.3</jackson.version>
         <tinkerpop.version>2.6.0</tinkerpop.version>
         <titan.version>0.5.4</titan.version>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/afb9e618/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 07a3e9b..ffd69ea 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -14,6 +14,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
 ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags)
 
 ALL CHANGES:
+ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown (yhemanth via sumasai)
 ATLAS-382 Fixed Hive Bridge doc for ATLAS cluster name (sumasai)
 ATLAS-244 UI: Add Tag Tab (darshankumar89 via sumasai)
 ATLAS-376 UI: Use the Schema API of the backend to populate details for Schema tab (darshankumar89 via sumasai)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/afb9e618/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
index f0d80cb..c1f6a9b 100755
--- a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
+++ b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
@@ -124,7 +124,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
     }
 
     protected void startServices() {
-        LOG.debug("Starting services");
+        LOG.info("Starting services");
         Services services = injector.getInstance(Services.class);
         services.start();
     }


Mime
View raw message