atlas-commits mailing list archives

From shweth...@apache.org
Subject [11/12] incubator-atlas git commit: ATLAS-948 import-hive should allow an option to continue after failure (sumasai)
Date Wed, 29 Jun 2016 08:37:57 GMT
ATLAS-948 import-hive should allow an option to continue after failure (sumasai)
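
This change makes per-table import failures non-fatal by default: a table that fails to register is logged and skipped, and a new -failOnError command-line flag restores the previous fail-fast behaviour (for example, a hypothetical "import-hive.sh -failOnError", assuming the wrapper script, which this diff does not touch, forwards its arguments to HiveMetaStoreBridge.main).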


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/4165e2f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/4165e2f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/4165e2f9

Branch: refs/heads/0.7-incubating
Commit: 4165e2f9cb87b36c1076e84002ddb42ec8c4cb7f
Parents: 2ba1000
Author: Shwetha GS <sshivalingamurthy@hortonworks.com>
Authored: Wed Jun 29 12:16:39 2016 +0530
Committer: Shwetha GS <sshivalingamurthy@hortonworks.com>
Committed: Wed Jun 29 12:16:39 2016 +0530

----------------------------------------------------------------------
 .../atlas/hive/bridge/HiveMetaStoreBridge.java  | 114 ++++++++++++-------
 .../hive/bridge/HiveMetaStoreBridgeTest.java    |  82 +++++++++++--
 release-log.txt                                 |   1 +
 3 files changed, 143 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4165e2f9/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 4d009e8..0045780 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -33,6 +33,10 @@ import org.apache.atlas.typesystem.json.InstanceSerialization;
 import org.apache.atlas.typesystem.json.TypesSerialization;
 import org.apache.atlas.typesystem.persistence.Id;
 import org.apache.atlas.utils.AuthenticationUtil;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.fs.Path;
@@ -111,17 +115,16 @@ public class HiveMetaStoreBridge {
         return atlasClient;
     }
 
-    void importHiveMetadata() throws Exception {
+    void importHiveMetadata(boolean failOnError) throws Exception {
         LOG.info("Importing hive metadata");
-        importDatabases();
+        importDatabases(failOnError);
     }
 
-    private void importDatabases() throws Exception {
+    private void importDatabases(boolean failOnError) throws Exception {
         List<String> databases = hiveClient.getAllDatabases();
         for (String databaseName : databases) {
             Referenceable dbReference = registerDatabase(databaseName);
-
-            importTables(dbReference, databaseName);
+            importTables(dbReference, databaseName, failOnError);
         }
     }
 
@@ -254,52 +257,68 @@ public class HiveMetaStoreBridge {
 
     /**
      * Imports all tables for the given db
-     * @param databaseName
      * @param databaseReferenceable
+     * @param databaseName
+     * @param failOnError
      * @throws Exception
      */
-    private void importTables(Referenceable databaseReferenceable, String databaseName) throws Exception {
+    private int importTables(Referenceable databaseReferenceable, String databaseName, final boolean failOnError) throws Exception {
+        int tablesImported = 0;
         List<String> hiveTables = hiveClient.getAllTables(databaseName);
         LOG.info("Importing tables {} for db {}", hiveTables.toString(), databaseName);
         for (String tableName : hiveTables) {
-            Table table = hiveClient.getTable(databaseName, tableName);
-            Referenceable tableReferenceable = registerTable(databaseReferenceable, table);
-            if (table.getTableType() == TableType.EXTERNAL_TABLE){
-                String tableQualifiedName = getTableQualifiedName(clusterName, table);
-                Referenceable process = getProcessReference(tableQualifiedName);
-                if (process == null){
-                    LOG.info("Attempting to register create table process for {}", tableQualifiedName);
-                    Referenceable lineageProcess = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
-                    ArrayList<Referenceable> sourceList = new ArrayList<>();
-                    ArrayList<Referenceable> targetList = new ArrayList<>();
-                    String tableLocation = table.getDataLocation().toString();
-                    Referenceable path = fillHDFSDataSet(tableLocation);
-                    String query = getCreateTableString(table, tableLocation);
-                    sourceList.add(path);
-                    targetList.add(tableReferenceable);
-                    lineageProcess.set("inputs", sourceList);
-                    lineageProcess.set("outputs", targetList);
-                    lineageProcess.set("userName", table.getOwner());
-                    lineageProcess.set("startTime", new Date(System.currentTimeMillis()));
-                    lineageProcess.set("endTime", new Date(System.currentTimeMillis()));
-                    lineageProcess.set("operationType", "CREATETABLE");
-                    lineageProcess.set("queryText", query);
-                    lineageProcess.set("queryId", query);
-                    lineageProcess.set("queryPlan", "{}");
-                    lineageProcess.set("clusterName", clusterName);
-                    List<String> recentQueries = new ArrayList<>(1);
-                    recentQueries.add(query);
-                    lineageProcess.set("recentQueries", recentQueries);
-                    lineageProcess.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
-                    lineageProcess.set(AtlasClient.NAME, query);
-                    registerInstance(lineageProcess);
-
+            try {
+                Table table = hiveClient.getTable(databaseName, tableName);
+                Referenceable tableReferenceable = registerTable(databaseReferenceable, table);
+                tablesImported++;
+                if (table.getTableType() == TableType.EXTERNAL_TABLE) {
+                    String tableQualifiedName = getTableQualifiedName(clusterName, table);
+                    Referenceable process = getProcessReference(tableQualifiedName);
+                    if (process == null) {
+                        LOG.info("Attempting to register create table process for {}", tableQualifiedName);
+                        Referenceable lineageProcess = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
+                        ArrayList<Referenceable> sourceList = new ArrayList<>();
+                        ArrayList<Referenceable> targetList = new ArrayList<>();
+                        String tableLocation = table.getDataLocation().toString();
+                        Referenceable path = fillHDFSDataSet(tableLocation);
+                        String query = getCreateTableString(table, tableLocation);
+                        sourceList.add(path);
+                        targetList.add(tableReferenceable);
+                        lineageProcess.set("inputs", sourceList);
+                        lineageProcess.set("outputs", targetList);
+                        lineageProcess.set("userName", table.getOwner());
+                        lineageProcess.set("startTime", new Date(System.currentTimeMillis()));
+                        lineageProcess.set("endTime", new Date(System.currentTimeMillis()));
+                        lineageProcess.set("operationType", "CREATETABLE");
+                        lineageProcess.set("queryText", query);
+                        lineageProcess.set("queryId", query);
+                        lineageProcess.set("queryPlan", "{}");
+                        lineageProcess.set("clusterName", clusterName);
+                        List<String> recentQueries = new ArrayList<>(1);
+                        recentQueries.add(query);
+                        lineageProcess.set("recentQueries", recentQueries);
+                        lineageProcess.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
+                        lineageProcess.set(AtlasClient.NAME, query);
+                        registerInstance(lineageProcess);
+                    } else {
+                        LOG.info("Process {} is already registered", process.toString());
+                    }
                 }
-                else {
-                    LOG.info("Process {} is already registered", process.toString());
+            } catch (Exception e) {
+                LOG.error("Import failed for hive_table {} ", tableName, e);
+                if (failOnError) {
+                    throw e;
                 }
             }
         }
+
+        if ( tablesImported == hiveTables.size()) {
+            LOG.info("Successfully imported all {} tables from {} ", tablesImported, databaseName);
+        } else {
+            LOG.error("Unable to import {} tables out of {} tables from {}", tablesImported, hiveTables.size(), databaseName);
+        }
+
+        return tablesImported;
     }
 
     /**
@@ -618,7 +637,7 @@ public class HiveMetaStoreBridge {
         }
     }
 
-    public static void main(String[] argv) throws Exception {
+    public static void main(String[] args) throws Exception {
 
         Configuration atlasConf = ApplicationProperties.get();
         String atlasEndpoint = atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL);
@@ -632,8 +651,17 @@ public class HiveMetaStoreBridge {
             atlasClient = new AtlasClient(ugi, ugi.getShortUserName(), atlasEndpoint);
         }
 
+        Options options = new Options();
+        CommandLineParser parser = new BasicParser();
+        CommandLine cmd = parser.parse( options, args);
+
+        boolean failOnError = false;
+        if (cmd.hasOption("failOnError")) {
+            failOnError = true;
+        }
+
         HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasClient);
         hiveMetaStoreBridge.registerHiveDataModel();
-        hiveMetaStoreBridge.importHiveMetadata();
+        hiveMetaStoreBridge.importHiveMetadata(failOnError);
     }
 }
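
Note: the Options instance handed to BasicParser above is never populated, so an actual -failOnError argument would be rejected at parse time (commons-cli throws UnrecognizedOptionException for options it does not recognise), and cmd.hasOption("failOnError") therefore stays false. A minimal sketch of registering the flag, using the commons-cli 1.x API already imported above (this sketch is not part of the commit):

    Options options = new Options();
    // register -failOnError as a plain boolean flag (takes no argument)
    options.addOption("failOnError", false, "abort the import on the first table that fails to register");
    CommandLineParser parser = new BasicParser();
    CommandLine cmd = parser.parse(options, args);
    boolean failOnError = cmd.hasOption("failOnError");

Note also that only table-level failures are tolerated: an exception from registerDatabase (or from hiveClient.getAllDatabases) still aborts the whole import regardless of failOnError, and importDatabases discards the count that importTables now returns.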

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4165e2f9/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
index 856e5b1..9f7f6b0 100644
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
@@ -41,6 +41,8 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import scala.actors.threadpool.Arrays;
 
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
 import java.util.List;
 
 import static org.mockito.Mockito.argThat;
@@ -78,7 +80,7 @@ public class HiveMetaStoreBridgeTest {
         returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
 
         HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
-        bridge.importHiveMetadata();
+        bridge.importHiveMetadata(true);
 
         // verify update is called
         verify(atlasClient).updateEntity(eq("72e06b34-9151-4023-aa9d-b82103a50e76"),
@@ -90,7 +92,7 @@ public class HiveMetaStoreBridgeTest {
     public void testImportThatUpdatesRegisteredTable() throws Exception {
         setupDB(hiveClient, TEST_DB_NAME);
 
-        Table hiveTable = setupTable(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
+        List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
 
         returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
 
@@ -99,12 +101,12 @@ public class HiveMetaStoreBridgeTest {
                 HiveDataTypes.HIVE_TABLE.getName(), false))).thenReturn(
                 getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
         when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
-        String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTable);
+        String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTables.get(0));
         when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
                 processQualifiedName))).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
 
         HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
-        bridge.importHiveMetadata();
+        bridge.importHiveMetadata(true);
 
         // verify update is called on table
         verify(atlasClient).updateEntity(eq("82e06b34-9151-4023-aa9d-b82103a50e77"),
@@ -119,11 +121,15 @@ public class HiveMetaStoreBridgeTest {
                 getEntityReference("72e06b34-9151-4023-aa9d-b82103a50e76"));
     }
 
-    private Table setupTable(Hive hiveClient, String databaseName, String tableName) throws HiveException {
-        when(hiveClient.getAllTables(databaseName)).thenReturn(Arrays.asList(new String[]{tableName}));
-        Table testTable = createTestTable(databaseName, tableName);
-        when(hiveClient.getTable(databaseName, tableName)).thenReturn(testTable);
-        return testTable;
+    private List<Table> setupTables(Hive hiveClient, String databaseName, String... tableNames) throws HiveException {
+        List<Table> tables = new ArrayList<>();
+        when(hiveClient.getAllTables(databaseName)).thenReturn(Arrays.asList(tableNames));
+        for(String tableName : tableNames) {
+            Table testTable = createTestTable(databaseName, tableName);
+            when(hiveClient.getTable(databaseName, tableName)).thenReturn(testTable);
+            tables.add(testTable);
+        }
+        return tables;
     }
 
     private void setupDB(Hive hiveClient, String databaseName) throws HiveException {
@@ -135,7 +141,8 @@ public class HiveMetaStoreBridgeTest {
     @Test
     public void testImportWhenPartitionKeysAreNull() throws Exception {
         setupDB(hiveClient, TEST_DB_NAME);
-        Table hiveTable = setupTable(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
+        List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
+        Table hiveTable = hiveTables.get(0);
 
         returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
 
@@ -157,12 +164,65 @@ public class HiveMetaStoreBridgeTest {
 
         HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
         try {
-            bridge.importHiveMetadata();
+            bridge.importHiveMetadata(true);
         } catch (Exception e) {
             Assert.fail("Partition with null key caused import to fail with exception ", e);
         }
     }
 
+    @Test
+    public void testImportContinuesWhenTableRegistrationFails() throws Exception {
+        setupDB(hiveClient, TEST_DB_NAME);
+        final String table2Name = TEST_TABLE_NAME + "_1";
+        List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME, table2Name);
+
+        returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
+        when(hiveClient.getTable(TEST_DB_NAME, TEST_TABLE_NAME)).thenThrow(new RuntimeException("Timeout while reading data from hive metastore"));
+
+        when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME,
+            table2Name,
+            HiveDataTypes.HIVE_TABLE.getName(), false))).thenReturn(
+            getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
+        when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
+        String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTables.get(1));
+        when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
+            processQualifiedName))).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
+
+        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
+        try {
+            bridge.importHiveMetadata(false);
+        } catch (Exception e) {
+            Assert.fail("Table registration failed with exception", e);
+        }
+    }
+
+    @Test
+    public void testImportFailsWhenTableRegistrationFails() throws Exception {
+        setupDB(hiveClient, TEST_DB_NAME);
+        final String table2Name = TEST_TABLE_NAME + "_1";
+        List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME, table2Name);
+
+        returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
+        when(hiveClient.getTable(TEST_DB_NAME, TEST_TABLE_NAME)).thenThrow(new RuntimeException("Timeout while reading data from hive metastore"));
+
+        when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME,
+            table2Name,
+            HiveDataTypes.HIVE_TABLE.getName(), false))).thenReturn(
+            getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
+        when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
+        String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTables.get(1));
+        when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
+            processQualifiedName))).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
+
+        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
+        try {
+            bridge.importHiveMetadata(true);
+            Assert.fail("Table registration is supposed to fail");
+        } catch (Exception e) {
+            //Expected
+        }
+    }
+
     private JSONArray getEntityReference(String id) throws JSONException {
         return new JSONArray(String.format("[{\"$id$\":{\"id\":\"%s\"}}]", id));
     }
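
Both new tests share one arrangement: the mocked Hive client throws for the first table, so the second table is reachable only if the bridge keeps going. importHiveMetadata(false) is therefore expected to complete despite the stubbed failure, while importHiveMetadata(true) must let the RuntimeException propagate.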

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4165e2f9/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 2b1da19..b983594 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -31,6 +31,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-948 import-hive should allow an option to continue after failure (sumasai)
 ATLAS-954 Get hadoop classpath if command hadoop is in PATH (svimal2106 via sumasai)
 ATLAS-919 UI : Deleted references should be shown in red or filtered out (kevalbhatt18 via sumasai)
 ATLAS-927 aboutAtlas_tmpl.html has hard-coded project version (Kalyanikashikar via yhemanth)

