Return-Path: X-Original-To: apmail-atlas-commits-archive@minotaur.apache.org Delivered-To: apmail-atlas-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A3B1218F93 for ; Tue, 16 Jun 2015 23:05:27 +0000 (UTC) Received: (qmail 32811 invoked by uid 500); 16 Jun 2015 23:05:27 -0000 Delivered-To: apmail-atlas-commits-archive@atlas.apache.org Received: (qmail 32784 invoked by uid 500); 16 Jun 2015 23:05:27 -0000 Mailing-List: contact commits-help@atlas.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@atlas.incubator.apache.org Delivered-To: mailing list commits@atlas.incubator.apache.org Received: (qmail 32775 invoked by uid 99); 16 Jun 2015 23:05:27 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Jun 2015 23:05:27 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id EDD1ECE48E for ; Tue, 16 Jun 2015 23:05:26 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.153 X-Spam-Level: * X-Spam-Status: No, score=1.153 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RP_MATCHES_RCVD=-0.648, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id b9xWh_SR-3mB for ; Tue, 16 Jun 2015 23:05:14 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 852D04E5F7 for ; Tue, 16 Jun 2015 23:04:37 +0000 (UTC) Received: (qmail 30388 invoked by uid 99); 16 Jun 2015 23:04:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Jun 2015 23:04:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B4F6EE3C48; Tue, 16 Jun 2015 23:04:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: venkatesh@apache.org To: commits@atlas.incubator.apache.org Date: Tue, 16 Jun 2015 23:05:10 -0000 Message-Id: In-Reply-To: <29f8990c10f64784b72a9cafbbc9676e@git.apache.org> References: <29f8990c10f64784b72a9cafbbc9676e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [36/58] [abbrv] incubator-atlas git commit: Refactor packages and scripts to Atlas (cherry picked from commit 414beba) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java new file mode 100644 index 0000000..3a1f323 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas; + +import com.google.inject.Inject; +import com.thinkaurelius.titan.core.TitanGraph; +import org.aopalliance.intercept.MethodInterceptor; +import org.aopalliance.intercept.MethodInvocation; +import org.apache.atlas.repository.graph.GraphProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GraphTransactionInterceptor implements MethodInterceptor { + private static final Logger LOG = LoggerFactory.getLogger(GraphTransactionInterceptor.class); + private TitanGraph titanGraph; + + @Inject + GraphProvider graphProvider; + + public Object invoke(MethodInvocation invocation) throws Throwable { + if (titanGraph == null) { + titanGraph = graphProvider.get(); + } + + try { + Object response = invocation.proceed(); + titanGraph.commit(); + LOG.debug("graph commit"); + return response; + } catch (Throwable t){ + titanGraph.rollback(); + LOG.debug("graph rollback"); + throw t; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java new file mode 100755 index 0000000..50de081 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas; + +import com.google.inject.matcher.Matchers; +import com.google.inject.throwingproviders.ThrowingProviderBinder; +import com.thinkaurelius.titan.core.TitanGraph; +import org.aopalliance.intercept.MethodInterceptor; +import org.apache.atlas.discovery.DiscoveryService; +import org.apache.atlas.discovery.HiveLineageService; +import org.apache.atlas.discovery.LineageService; +import org.apache.atlas.discovery.SearchIndexer; +import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService; +import org.apache.atlas.repository.MetadataRepository; +import org.apache.atlas.repository.graph.GraphBackedMetadataRepository; +import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; +import org.apache.atlas.repository.graph.GraphProvider; +import org.apache.atlas.repository.graph.TitanGraphProvider; +import org.apache.atlas.repository.typestore.GraphBackedTypeStore; +import org.apache.atlas.repository.typestore.ITypeStore; +import org.apache.atlas.services.DefaultMetadataService; +import org.apache.atlas.services.MetadataService; + +/** + * Guice module for Repository module. + */ +public class RepositoryMetadataModule extends com.google.inject.AbstractModule { + @Override + protected void configure() { + // special wiring for Titan Graph + ThrowingProviderBinder.create(binder()) + .bind(GraphProvider.class, TitanGraph.class) + .to(TitanGraphProvider.class) + .asEagerSingleton(); + + // allow for dynamic binding of the metadata repo & graph service + + // bind the MetadataRepositoryService interface to an implementation + bind(MetadataRepository.class).to(GraphBackedMetadataRepository.class).asEagerSingleton(); + + // bind the ITypeStore interface to an implementation + bind(ITypeStore.class).to(GraphBackedTypeStore.class).asEagerSingleton(); + + // bind the GraphService interface to an implementation + // bind(GraphService.class).to(graphServiceClass); + + // bind the MetadataService interface to an implementation + bind(MetadataService.class).to(DefaultMetadataService.class).asEagerSingleton(); + + // bind the DiscoveryService interface to an implementation + bind(DiscoveryService.class).to(GraphBackedDiscoveryService.class).asEagerSingleton(); + + bind(SearchIndexer.class).to(GraphBackedSearchIndexer.class).asEagerSingleton(); + + bind(LineageService.class).to(HiveLineageService.class).asEagerSingleton(); + + MethodInterceptor interceptor = new GraphTransactionInterceptor(); + requestInjection(interceptor); + bindInterceptor(Matchers.any(), Matchers.annotatedWith(GraphTransaction.class), interceptor); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/discovery/DiscoveryException.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/DiscoveryException.java b/repository/src/main/java/org/apache/atlas/discovery/DiscoveryException.java new file mode 100755 index 0000000..b5b4441 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/discovery/DiscoveryException.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.discovery; + +import org.apache.atlas.MetadataException; + +import java.security.PrivilegedActionException; + +public class DiscoveryException extends MetadataException { + + /** + * Constructs a new exception with the specified detail message. The + * cause is not initialized, and may subsequently be initialized by + * a call to {@link #initCause}. + * + * @param message the detail message. The detail message is saved for + * later retrieval by the {@link #getMessage()} method. + */ + public DiscoveryException(String message) { + super(message); + } + + /** + * Constructs a new exception with the specified detail message and + * cause.

Note that the detail message associated with + * {@code cause} is not automatically incorporated in + * this exception's detail message. + * + * @param message the detail message (which is saved for later retrieval + * by the {@link #getMessage()} method). + * @param cause the cause (which is saved for later retrieval by the + * {@link #getCause()} method). (A null value is + * permitted, and indicates that the cause is nonexistent or + * unknown.) + * @since 1.4 + */ + public DiscoveryException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Constructs a new exception with the specified cause and a detail + * message of (cause==null ? null : cause.toString()) (which + * typically contains the class and detail message of cause). + * This constructor is useful for exceptions that are little more than + * wrappers for other throwables (for example, {@link + * PrivilegedActionException}). + * + * @param cause the cause (which is saved for later retrieval by the + * {@link #getCause()} method). (A null value is + * permitted, and indicates that the cause is nonexistent or + * unknown.) + * @since 1.4 + */ + public DiscoveryException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/discovery/DiscoveryService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/DiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/DiscoveryService.java new file mode 100755 index 0000000..e347c2c --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/discovery/DiscoveryService.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.discovery; + +import java.util.List; +import java.util.Map; + +/** + * Metadata discovery service. + */ +public interface DiscoveryService { + + /** + * Full text search + */ + String searchByFullText(String query) throws DiscoveryException; + + /** + * Search using query DSL. + * + * @param dslQuery query in DSL format. + * @return JSON representing the type and results. + */ + String searchByDSL(String dslQuery) throws DiscoveryException; + + /** + * Assumes the User is familiar with the persistence structure of the Repository. + * The given query is run uninterpreted against the underlying Graph Store. + * The results are returned as a List of Rows. each row is a Map of Key,Value pairs. + * + * @param gremlinQuery query in gremlin dsl format + * @return List of Maps + * @throws org.apache.atlas.discovery.DiscoveryException + */ + List> searchByGremlin(String gremlinQuery) throws DiscoveryException; +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/discovery/HiveLineageService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/HiveLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/HiveLineageService.java new file mode 100644 index 0000000..e97fcb8 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/discovery/HiveLineageService.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.discovery; + +import com.thinkaurelius.titan.core.TitanGraph; +import org.apache.atlas.GraphTransaction; +import org.apache.atlas.MetadataException; +import org.apache.atlas.ParamChecker; +import org.apache.atlas.PropertiesUtil; +import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy; +import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService; +import org.apache.atlas.query.Expressions; +import org.apache.atlas.query.GremlinQueryResult; +import org.apache.atlas.query.HiveLineageQuery; +import org.apache.atlas.query.HiveWhereUsedQuery; +import org.apache.atlas.repository.EntityNotFoundException; +import org.apache.atlas.repository.MetadataRepository; +import org.apache.atlas.repository.graph.GraphProvider; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.Some; +import scala.collection.immutable.List; + +import javax.inject.Inject; +import javax.inject.Singleton; + +/** + * Hive implementation of Lineage service interface. + */ +@Singleton +public class HiveLineageService implements LineageService { + + private static final Logger LOG = LoggerFactory.getLogger(HiveLineageService.class); + + private static final Option> SELECT_ATTRIBUTES = + Some.>apply(List.fromArray(new String[]{"name"})); + + private static final String HIVE_TABLE_TYPE_NAME; + private static final String HIVE_PROCESS_TYPE_NAME; + private static final String HIVE_PROCESS_INPUT_ATTRIBUTE_NAME; + private static final String HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME; + + private static final String HIVE_TABLE_SCHEMA_QUERY; + private static final String HIVE_TABLE_EXISTS_QUERY; + + static { + // todo - externalize this using type system - dog food + try { + PropertiesConfiguration conf = PropertiesUtil.getApplicationProperties(); + HIVE_TABLE_TYPE_NAME = + conf.getString("atlas.lineage.hive.table.type.name", "DataSet"); + HIVE_PROCESS_TYPE_NAME = + conf.getString("atlas.lineage.hive.process.type.name", "Process"); + HIVE_PROCESS_INPUT_ATTRIBUTE_NAME = + conf.getString("atlas.lineage.hive.process.inputs.name", "inputs"); + HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME = + conf.getString("atlas.lineage.hive.process.outputs.name", "outputs"); + + HIVE_TABLE_SCHEMA_QUERY = conf.getString( + "atlas.lineage.hive.table.schema.query", + "hive_table where name=\"%s\", columns"); + HIVE_TABLE_EXISTS_QUERY = conf.getString( + "atlas.lineage.hive.table.exists.query", + "from hive_table where name=\"%s\""); + } catch (MetadataException e) { + throw new RuntimeException(e); + } + } + + + private final TitanGraph titanGraph; + private final DefaultGraphPersistenceStrategy graphPersistenceStrategy; + private final GraphBackedDiscoveryService discoveryService; + + @Inject + HiveLineageService(GraphProvider graphProvider, + MetadataRepository metadataRepository, + GraphBackedDiscoveryService discoveryService) throws DiscoveryException { + this.titanGraph = graphProvider.get(); + this.graphPersistenceStrategy = new DefaultGraphPersistenceStrategy(metadataRepository); + this.discoveryService = discoveryService; + } + + /** + * Return the lineage outputs for the given tableName. + * + * @param tableName tableName + * @return Lineage Outputs as JSON + */ + @Override + @GraphTransaction + public String getOutputs(String tableName) throws MetadataException { + LOG.info("Fetching lineage outputs for tableName={}", tableName); + ParamChecker.notEmpty(tableName, "table name cannot be null"); + validateTableExists(tableName); + + HiveWhereUsedQuery outputsQuery = new HiveWhereUsedQuery( + HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME, + HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, + Option.empty(), SELECT_ATTRIBUTES, true, + graphPersistenceStrategy, titanGraph); + + Expressions.Expression expression = outputsQuery.expr(); + LOG.debug("Expression is [" + expression.toString() +"]"); + try { + return discoveryService.evaluate(expression).toJson(); + } catch (Exception e) { // unable to catch ExpressionException + throw new DiscoveryException("Invalid expression [" + expression.toString() + "]", e); + } + } + + /** + * Return the lineage outputs graph for the given tableName. + * + * @param tableName tableName + * @return Outputs Graph as JSON + */ + @Override + @GraphTransaction + public String getOutputsGraph(String tableName) throws MetadataException { + LOG.info("Fetching lineage outputs graph for tableName={}", tableName); + ParamChecker.notEmpty(tableName, "table name cannot be null"); + validateTableExists(tableName); + + HiveWhereUsedQuery outputsQuery = new HiveWhereUsedQuery( + HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME, + HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, + Option.empty(), SELECT_ATTRIBUTES, true, + graphPersistenceStrategy, titanGraph); + return outputsQuery.graph().toInstanceJson(); + } + + /** + * Return the lineage inputs for the given tableName. + * + * @param tableName tableName + * @return Lineage Inputs as JSON + */ + @Override + @GraphTransaction + public String getInputs(String tableName) throws MetadataException { + LOG.info("Fetching lineage inputs for tableName={}", tableName); + ParamChecker.notEmpty(tableName, "table name cannot be null"); + validateTableExists(tableName); + + HiveLineageQuery inputsQuery = new HiveLineageQuery( + HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME, + HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, + Option.empty(), SELECT_ATTRIBUTES, true, + graphPersistenceStrategy, titanGraph); + + Expressions.Expression expression = inputsQuery.expr(); + LOG.debug("Expression is [" + expression.toString() +"]"); + try { + return discoveryService.evaluate(expression).toJson(); + } catch (Exception e) { // unable to catch ExpressionException + throw new DiscoveryException("Invalid expression [" + expression.toString() + "]", e); + } + } + + /** + * Return the lineage inputs graph for the given tableName. + * + * @param tableName tableName + * @return Inputs Graph as JSON + */ + @Override + @GraphTransaction + public String getInputsGraph(String tableName) throws MetadataException { + LOG.info("Fetching lineage inputs graph for tableName={}", tableName); + ParamChecker.notEmpty(tableName, "table name cannot be null"); + validateTableExists(tableName); + + HiveLineageQuery inputsQuery = new HiveLineageQuery( + HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME, + HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, + Option.empty(), SELECT_ATTRIBUTES, true, + graphPersistenceStrategy, titanGraph); + return inputsQuery.graph().toInstanceJson(); + } + + /** + * Return the schema for the given tableName. + * + * @param tableName tableName + * @return Schema as JSON + */ + @Override + @GraphTransaction + public String getSchema(String tableName) throws MetadataException { + LOG.info("Fetching schema for tableName={}", tableName); + ParamChecker.notEmpty(tableName, "table name cannot be null"); + validateTableExists(tableName); + + final String schemaQuery = String.format(HIVE_TABLE_SCHEMA_QUERY, tableName); + return discoveryService.searchByDSL(schemaQuery); + } + + /** + * Validate if indeed this is a table type and exists. + * + * @param tableName table name + */ + private void validateTableExists(String tableName) throws MetadataException { + final String tableExistsQuery = String.format(HIVE_TABLE_EXISTS_QUERY, tableName); + GremlinQueryResult queryResult = discoveryService.evaluate(tableExistsQuery); + if (!(queryResult.rows().length() > 0)) { + throw new EntityNotFoundException(tableName + " does not exist"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/discovery/LineageService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/LineageService.java b/repository/src/main/java/org/apache/atlas/discovery/LineageService.java new file mode 100644 index 0000000..12f1ca3 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/discovery/LineageService.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.discovery; + +import org.apache.atlas.MetadataException; + +/** + * Lineage service interface. + */ +public interface LineageService { + + /** + * Return the lineage outputs for the given tableName. + * + * @param tableName tableName + * @return Outputs as JSON + */ + String getOutputs(String tableName) throws MetadataException; + + /** + * Return the lineage outputs graph for the given tableName. + * + * @param tableName tableName + * @return Outputs Graph as JSON + */ + String getOutputsGraph(String tableName) throws MetadataException; + + /** + * Return the lineage inputs for the given tableName. + * + * @param tableName tableName + * @return Inputs as JSON + */ + String getInputs(String tableName) throws MetadataException; + + /** + * Return the lineage inputs graph for the given tableName. + * + * @param tableName tableName + * @return Inputs Graph as JSON + */ + String getInputsGraph(String tableName) throws MetadataException; + + /** + * Return the schema for the given tableName. + * + * @param tableName tableName + * @return Schema as JSON + */ + String getSchema(String tableName) throws MetadataException; +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/discovery/SearchIndexer.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/SearchIndexer.java b/repository/src/main/java/org/apache/atlas/discovery/SearchIndexer.java new file mode 100755 index 0000000..9823fbd --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/discovery/SearchIndexer.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.discovery; + +import org.apache.atlas.listener.TypesChangeListener; + + +/** + * Interface for indexing types. + */ +public interface SearchIndexer extends TypesChangeListener { + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/discovery/graph/DefaultGraphPersistenceStrategy.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/graph/DefaultGraphPersistenceStrategy.java b/repository/src/main/java/org/apache/atlas/discovery/graph/DefaultGraphPersistenceStrategy.java new file mode 100755 index 0000000..e9c12cc --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/discovery/graph/DefaultGraphPersistenceStrategy.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.discovery.graph; + +import com.thinkaurelius.titan.core.TitanVertex; +import org.apache.atlas.MetadataException; +import org.apache.atlas.query.Expressions; +import org.apache.atlas.query.GraphPersistenceStrategies; +import org.apache.atlas.query.GraphPersistenceStrategies$class; +import org.apache.atlas.query.TypeUtils; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.MetadataRepository; +import org.apache.atlas.repository.graph.GraphBackedMetadataRepository; +import org.apache.atlas.typesystem.ITypedReferenceableInstance; +import org.apache.atlas.typesystem.ITypedStruct; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.typesystem.types.AttributeInfo; +import org.apache.atlas.typesystem.types.IDataType; +import org.apache.atlas.typesystem.types.Multiplicity; +import org.apache.atlas.typesystem.types.StructType; +import org.apache.atlas.typesystem.types.TraitType; +import org.apache.atlas.typesystem.types.TypeSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.Traversable; + +import java.util.List; + +/** + * Default implementation of GraphPersistenceStrategy. + */ +public class DefaultGraphPersistenceStrategy implements GraphPersistenceStrategies { + + private static final Logger LOG = LoggerFactory + .getLogger(DefaultGraphPersistenceStrategy.class); + + private final GraphBackedMetadataRepository metadataRepository; + + public DefaultGraphPersistenceStrategy(MetadataRepository metadataRepository) { + this.metadataRepository = (GraphBackedMetadataRepository) metadataRepository; + } + + @Override + public String typeAttributeName() { + return metadataRepository.getTypeAttributeName(); + } + + @Override + public String superTypeAttributeName() { + return metadataRepository.getSuperTypeAttributeName(); + } + + @Override + public String edgeLabel(IDataType dataType, AttributeInfo aInfo) { + return metadataRepository.getEdgeLabel(dataType, aInfo); + } + + @Override + public String traitLabel(IDataType dataType, String traitName) { + return metadataRepository.getTraitLabel(dataType, traitName); + } + + @Override + public String fieldNameInVertex(IDataType dataType, AttributeInfo aInfo) { + try { + return metadataRepository.getFieldNameInVertex(dataType, aInfo); + } catch (MetadataException e) { + throw new RuntimeException(e); + } + } + + @Override + public List traitNames(TitanVertex vertex) { + return metadataRepository.getTraitNames(vertex); + } + + @Override + public String fieldPrefixInSelect() { + return "it"; + } + + @Override + public Id getIdFromVertex(String dataTypeName, TitanVertex vertex) { + return metadataRepository.getIdFromVertex(dataTypeName, vertex); + } + + @Override + public U constructInstance(IDataType dataType, Object value) { + try { + switch (dataType.getTypeCategory()) { + case PRIMITIVE: + case ENUM: + return dataType.convert(value, Multiplicity.OPTIONAL); + + case ARRAY: + // todo + break; + + case MAP: + // todo + break; + + case STRUCT: + TitanVertex structVertex = (TitanVertex) value; + StructType structType = (StructType) dataType; + ITypedStruct structInstance = structType.createInstance(); + + TypeSystem.IdType idType = TypeSystem.getInstance().getIdType(); + + if (dataType.getName().equals(idType.getName())) { + structInstance.set(idType.typeNameAttrName(), + structVertex.getProperty(typeAttributeName())); + structInstance.set(idType.idAttrName(), + structVertex.getProperty(idAttributeName())); + + } else { + metadataRepository.getGraphToInstanceMapper().mapVertexToInstance( + structVertex, structInstance, structType.fieldMapping().fields); + } + return dataType.convert(structInstance, Multiplicity.OPTIONAL); + + case TRAIT: + TitanVertex traitVertex = (TitanVertex) value; + TraitType traitType = (TraitType) dataType; + ITypedStruct traitInstance = traitType.createInstance(); + // todo - this is not right, we should load the Instance associated with this + // trait. for now just loading the trait struct. + // metadataRepository.getGraphToInstanceMapper().mapVertexToTraitInstance( + // traitVertex, dataType.getName(), , traitType, traitInstance); + metadataRepository.getGraphToInstanceMapper().mapVertexToInstance( + traitVertex, traitInstance, traitType.fieldMapping().fields); + break; + + case CLASS: + TitanVertex classVertex = (TitanVertex) value; + ITypedReferenceableInstance classInstance = + metadataRepository.getGraphToInstanceMapper().mapGraphToTypedInstance( + classVertex.getProperty(Constants.GUID_PROPERTY_KEY), + classVertex); + return dataType.convert(classInstance, Multiplicity.OPTIONAL); + + default: + throw new UnsupportedOperationException( + "Load for type " + dataType + "is not supported"); + } + } catch (MetadataException e) { + LOG.error("error while constructing an instance", e); + } + + return null; + } + + @Override + public String edgeLabel(TypeUtils.FieldInfo fInfo) { + return fInfo.reverseDataType() == null + ? edgeLabel(fInfo.dataType(), fInfo.attrInfo()) + : edgeLabel(fInfo.reverseDataType(), fInfo.attrInfo()); + } + + @Override + public String gremlinCompOp(Expressions.ComparisonExpression op) { + return GraphPersistenceStrategies$class.gremlinCompOp(this, op); + } + + @Override + public String loopObjectExpression(IDataType dataType) { + return GraphPersistenceStrategies$class.loopObjectExpression(this, dataType); + } + + @Override + public String instanceToTraitEdgeDirection() { return "out"; } + + @Override + public String traitToInstanceEdgeDirection() { return "in"; } + + @Override + public String idAttributeName() { return metadataRepository.getIdAttributeName(); } + + @Override + public scala.collection.Seq typeTestExpression(String typeName, IntSequence intSeq) { + return GraphPersistenceStrategies$class.typeTestExpression(this, typeName, intSeq); + } + + @Override + public boolean collectTypeInstancesIntoVar() { + return GraphPersistenceStrategies$class.collectTypeInstancesIntoVar(this); + } + + @Override + public boolean addGraphVertexPrefix(scala.collection.Traversable preStatements) { + return GraphPersistenceStrategies$class.addGraphVertexPrefix(this, preStatements); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/discovery/graph/GraphBackedDiscoveryService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/graph/GraphBackedDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/graph/GraphBackedDiscoveryService.java new file mode 100755 index 0000000..1314402 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/discovery/graph/GraphBackedDiscoveryService.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.discovery.graph; + +import com.thinkaurelius.titan.core.TitanGraph; +import com.thinkaurelius.titan.core.TitanIndexQuery; +import com.thinkaurelius.titan.core.TitanProperty; +import com.thinkaurelius.titan.core.TitanVertex; +import com.tinkerpop.blueprints.Vertex; +import org.apache.atlas.GraphTransaction; +import org.apache.atlas.MetadataServiceClient; +import org.apache.atlas.discovery.DiscoveryException; +import org.apache.atlas.discovery.DiscoveryService; +import org.apache.atlas.query.Expressions; +import org.apache.atlas.query.GremlinEvaluator; +import org.apache.atlas.query.GremlinQuery; +import org.apache.atlas.query.GremlinQueryResult; +import org.apache.atlas.query.GremlinTranslator; +import org.apache.atlas.query.QueryParser; +import org.apache.atlas.query.QueryProcessor; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.MetadataRepository; +import org.apache.atlas.repository.graph.GraphProvider; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.util.Either; +import scala.util.parsing.combinator.Parsers; + +import javax.inject.Inject; +import javax.inject.Singleton; +import javax.script.Bindings; +import javax.script.ScriptEngine; +import javax.script.ScriptEngineManager; +import javax.script.ScriptException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Graph backed implementation of Search. + */ +@Singleton +public class GraphBackedDiscoveryService implements DiscoveryService { + + private static final Logger LOG = LoggerFactory.getLogger(GraphBackedDiscoveryService.class); + + private final TitanGraph titanGraph; + private final DefaultGraphPersistenceStrategy graphPersistenceStrategy; + + public final static String SCORE = "score"; + + @Inject + GraphBackedDiscoveryService(GraphProvider graphProvider, + MetadataRepository metadataRepository) throws DiscoveryException { + this.titanGraph = graphProvider.get(); + this.graphPersistenceStrategy = new DefaultGraphPersistenceStrategy(metadataRepository); + } + + //Refer http://s3.thinkaurelius.com/docs/titan/0.5.4/index-backends.html for indexed query + //http://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query + // .html#query-string-syntax for query syntax + @Override + @GraphTransaction + public String searchByFullText(String query) throws DiscoveryException { + String graphQuery = String.format("v.%s:(%s)", Constants.ENTITY_TEXT_PROPERTY_KEY, query); + LOG.debug("Full text query: {}", graphQuery); + Iterator> results = + titanGraph.indexQuery(Constants.FULLTEXT_INDEX, graphQuery).vertices().iterator(); + JSONArray response = new JSONArray(); + + while (results.hasNext()) { + TitanIndexQuery.Result result = results.next(); + Vertex vertex = result.getElement(); + + JSONObject row = new JSONObject(); + String guid = vertex.getProperty(Constants.GUID_PROPERTY_KEY); + if (guid != null) { //Filter non-class entities + try { + row.put("guid", guid); + row.put(MetadataServiceClient.TYPENAME, vertex.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY)); + row.put(SCORE, result.getScore()); + } catch (JSONException e) { + LOG.error("Unable to create response", e); + throw new DiscoveryException("Unable to create response"); + } + + response.put(row); + } + } + return response.toString(); + } + + /** + * Search using query DSL. + * + * @param dslQuery query in DSL format. + * @return JSON representing the type and results. + */ + @Override + @GraphTransaction + public String searchByDSL(String dslQuery) throws DiscoveryException { + LOG.info("Executing dsl query={}", dslQuery); + GremlinQueryResult queryResult = evaluate(dslQuery); + return queryResult.toJson(); + } + + public GremlinQueryResult evaluate(String dslQuery) throws DiscoveryException { + LOG.info("Executing dsl query={}", dslQuery); + try { + QueryParser queryParser = new QueryParser(); + Either either = queryParser.apply(dslQuery); + if (either.isRight()) { + Expressions.Expression expression = either.right().get(); + return evaluate(expression); + } else { + throw new DiscoveryException("Invalid expression : " + dslQuery + ". " + either.left()); + } + } catch (Exception e) { // unable to catch ExpressionException + throw new DiscoveryException("Invalid expression : " + dslQuery, e); + } + } + + public GremlinQueryResult evaluate(Expressions.Expression expression) { + Expressions.Expression validatedExpression = QueryProcessor.validate(expression); + GremlinQuery gremlinQuery = + new GremlinTranslator(validatedExpression, graphPersistenceStrategy).translate(); + LOG.debug("Query = {}", validatedExpression); + LOG.debug("Expression Tree = {}", validatedExpression.treeString()); + LOG.debug("Gremlin Query = {}", gremlinQuery.queryStr()); + return new GremlinEvaluator(gremlinQuery, graphPersistenceStrategy, titanGraph).evaluate(); + } + + /** + * Assumes the User is familiar with the persistence structure of the Repository. + * The given query is run uninterpreted against the underlying Graph Store. + * The results are returned as a List of Rows. each row is a Map of Key,Value pairs. + * + * @param gremlinQuery query in gremlin dsl format + * @return List of Maps + * @throws org.apache.atlas.discovery.DiscoveryException + */ + @Override + @GraphTransaction + public List> searchByGremlin(String gremlinQuery) + throws DiscoveryException { + LOG.info("Executing gremlin query={}", gremlinQuery); + ScriptEngineManager manager = new ScriptEngineManager(); + ScriptEngine engine = manager.getEngineByName("gremlin-groovy"); + Bindings bindings = engine.createBindings(); + bindings.put("g", titanGraph); + + try { + Object o = engine.eval(gremlinQuery, bindings); + return extractResult(o); + } catch (ScriptException se) { + throw new DiscoveryException(se); + } + } + + private List> extractResult(Object o) throws DiscoveryException { + if (!(o instanceof List)) { + throw new DiscoveryException(String.format("Cannot process result %s", o.toString())); + } + + List l = (List) o; + List> result = new ArrayList<>(); + for (Object r : l) { + + Map oRow = new HashMap<>(); + if (r instanceof Map) { + @SuppressWarnings("unchecked") + Map iRow = (Map) r; + for (Map.Entry e : iRow.entrySet()) { + Object k = e.getKey(); + Object v = e.getValue(); + oRow.put(k.toString(), v.toString()); + } + } else if (r instanceof TitanVertex) { + Iterable ps = ((TitanVertex) r).getProperties(); + for (TitanProperty tP : ps) { + String pName = tP.getPropertyKey().getName(); + Object pValue = ((TitanVertex) r).getProperty(pName); + if (pValue != null) { + oRow.put(pName, pValue.toString()); + } + } + + } else if (r instanceof String) { + oRow.put("", r.toString()); + } else { + throw new DiscoveryException(String.format("Cannot process result %s", o.toString())); + } + + result.add(oRow); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/listener/EntityChangeListener.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/listener/EntityChangeListener.java b/repository/src/main/java/org/apache/atlas/listener/EntityChangeListener.java new file mode 100755 index 0000000..1def3f8 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/listener/EntityChangeListener.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.listener; + +import org.apache.atlas.MetadataException; +import org.apache.atlas.typesystem.ITypedReferenceableInstance; + +/** + * Entity (a Typed instance) change notification listener. + */ +public interface EntityChangeListener { + + /** + * This is upon adding a new typed instance to the repository. + * + * @param typedInstance a typed instance + * @throws org.apache.atlas.MetadataException + */ + void onEntityAdded(ITypedReferenceableInstance typedInstance) throws MetadataException; + + /** + * This is upon adding a new trait to a typed instance. + * + * @param guid globally unique identifier for the entity + * @param traitName trait name for the instance that needs to be added to entity + * @throws org.apache.atlas.MetadataException + */ + void onTraitAdded(String guid, String traitName) throws MetadataException; + + /** + * This is upon deleting a trait from a typed instance. + * + * @param guid globally unique identifier for the entity + * @param traitName trait name for the instance that needs to be deleted from entity + * @throws org.apache.atlas.MetadataException + */ + void onTraitDeleted(String guid, String traitName) throws MetadataException; +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/listener/TypesChangeListener.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/listener/TypesChangeListener.java b/repository/src/main/java/org/apache/atlas/listener/TypesChangeListener.java new file mode 100755 index 0000000..9cff874 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/listener/TypesChangeListener.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.listener; + +import org.apache.atlas.MetadataException; +import org.apache.atlas.typesystem.types.IDataType; + +/** + * Types change notification listener. + */ +public interface TypesChangeListener { + + /** + * This is upon adding a new type to Store. + * + * @param typeName type name + * @param dataType data type + * @throws MetadataException + */ + void onAdd(String typeName, IDataType dataType) throws MetadataException; + + /** + * This is upon removing an existing type from the Store. + * + * @param typeName type name + * @throws MetadataException + */ + // void onRemove(String typeName) throws MetadataException; + + // This is upon updating an existing type to the store + // void onChange() throws MetadataException; +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/repository/Constants.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/Constants.java b/repository/src/main/java/org/apache/atlas/repository/Constants.java new file mode 100755 index 0000000..9e3ec2d --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/Constants.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository; + +public final class Constants { + + /** + * Globally Unique identifier property key. + */ + + public static final String INTERNAL_PROPERTY_KEY_PREFIX = "__"; + public static final String GUID_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "guid"; + public static final String GUID_INDEX = "guid_index"; + + /** + * Entity type name property key. + */ + public static final String ENTITY_TYPE_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "typeName"; + public static final String ENTITY_TYPE_INDEX = "type_index"; + + /** + * Entity type's super types property key. + */ + public static final String SUPER_TYPES_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "superTypeNames"; + public static final String SUPER_TYPES_INDEX = "super_types_index"; + + /** + * Full-text for the entity for enabling full-text search. + */ + //weird issue in TitanDB if __ added to this property key. Not adding it for now + public static final String ENTITY_TEXT_PROPERTY_KEY = "entityText"; + + /** + * Properties for type store graph + */ + public static final String TYPE_CATEGORY_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "type.category"; + public static final String VERTEX_TYPE_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "type"; + public static final String TYPENAME_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "type.name"; + + /** + * Trait names property key and index name. + */ + public static final String TRAIT_NAMES_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "traitNames"; + public static final String TRAIT_NAMES_INDEX = "trait_names_index"; + + public static final String VERSION_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "version"; + public static final String TIMESTAMP_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "timestamp"; + + /** + * search backing index name. + */ + public static final String BACKING_INDEX = "search"; + + /** + * search backing index name for vertex keys. + */ + public static final String VERTEX_INDEX = "vertex_index"; + + /** + * search backing index name for edge labels. + */ + public static final String EDGE_INDEX = "edge_index"; + + public static final String FULLTEXT_INDEX = "fulltext_index"; + + private Constants() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/repository/DiscoverInstances.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/DiscoverInstances.java b/repository/src/main/java/org/apache/atlas/repository/DiscoverInstances.java new file mode 100755 index 0000000..f359e81 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/DiscoverInstances.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository; + +import org.apache.atlas.MetadataException; +import org.apache.atlas.typesystem.IReferenceableInstance; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.typesystem.types.DataTypes; +import org.apache.atlas.typesystem.types.ObjectGraphWalker; + +import java.util.HashMap; +import java.util.Map; + +/** + * Graph walker implementation for discovering instances. + */ +public class DiscoverInstances implements ObjectGraphWalker.NodeProcessor { + + public final Map idToNewIdMap; + public final Map idToInstanceMap; + final IRepository repository; + + public DiscoverInstances(IRepository repository) { + this.repository = repository; + idToNewIdMap = new HashMap<>(); + idToInstanceMap = new HashMap<>(); + } + + @Override + public void processNode(ObjectGraphWalker.Node nd) throws MetadataException { + + IReferenceableInstance ref = null; + Id id = null; + + if (nd.attributeName == null) { + ref = (IReferenceableInstance) nd.instance; + id = ref.getId(); + } else if (nd.aInfo.dataType().getTypeCategory() == DataTypes.TypeCategory.CLASS) { + if (nd.value != null && (nd.value instanceof Id)) { + id = (Id) nd.value; + } + } + + if (id != null) { + if (id.isUnassigned()) { + if (!idToNewIdMap.containsKey(id)) { + idToNewIdMap.put(id, repository.newId(id.className)); + } + if (ref != null && idToInstanceMap.containsKey(ref)) { + // Oops + throw new RepositoryException( + String.format("Unexpected internal error: Id %s processed again", id)); + } + if (ref != null) { + idToInstanceMap.put(id, ref); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/repository/EntityNotFoundException.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/EntityNotFoundException.java b/repository/src/main/java/org/apache/atlas/repository/EntityNotFoundException.java new file mode 100644 index 0000000..7c85d1e --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/EntityNotFoundException.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository; + +/** + * A simple wrapper for 404. + */ +public class EntityNotFoundException extends RepositoryException { + public EntityNotFoundException() { + } + + public EntityNotFoundException(String message) { + super(message); + } + + public EntityNotFoundException(String message, Throwable cause) { + super(message, cause); + } + + public EntityNotFoundException(Throwable cause) { + super(cause); + } + + public EntityNotFoundException(String message, Throwable cause, boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/repository/IRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/IRepository.java b/repository/src/main/java/org/apache/atlas/repository/IRepository.java new file mode 100755 index 0000000..20e37ee --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/IRepository.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository; + +import org.apache.atlas.typesystem.IReferenceableInstance; +import org.apache.atlas.typesystem.ITypedReferenceableInstance; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.typesystem.types.ClassType; +import org.apache.atlas.typesystem.types.HierarchicalType; +import org.apache.atlas.typesystem.types.TraitType; + +import java.util.List; + +/** + * Metadata Repository interface. + */ +public interface IRepository { + + ITypedReferenceableInstance create(IReferenceableInstance i) throws RepositoryException; + + ITypedReferenceableInstance update(ITypedReferenceableInstance i) throws RepositoryException; + + void delete(ITypedReferenceableInstance i) throws RepositoryException; + + Id newId(String typeName); + + ITypedReferenceableInstance get(Id id) throws RepositoryException; + + void defineClass(ClassType type) throws RepositoryException; + + void defineTrait(TraitType type) throws RepositoryException; + + void defineTypes(List types) throws RepositoryException; +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java b/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java new file mode 100755 index 0000000..0f2d1f7 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository; + +import org.apache.atlas.MetadataException; +import org.apache.atlas.typesystem.IReferenceableInstance; +import org.apache.atlas.typesystem.ITypedReferenceableInstance; +import org.apache.atlas.typesystem.ITypedStruct; +import org.apache.atlas.typesystem.types.AttributeInfo; +import org.apache.atlas.typesystem.types.IDataType; + +import java.util.List; + +/** + * An interface for persisting metadata into a blueprints enabled graph db. + */ +public interface MetadataRepository { + + /** + * Returns the property key used to store entity type name. + * + * @return property key used to store entity type name. + */ + String getTypeAttributeName(); + + /** + * Returns the property key used to store super type names. + * + * @return property key used to store super type names. + */ + String getSuperTypeAttributeName(); + + /** + * Return the property key used to store a given traitName in the repository. + * + * @param dataType data type + * @param traitName trait name + * @return property key used to store a given traitName + */ + String getTraitLabel(IDataType dataType, String traitName); + + /** + * Return the property key used to store a given attribute in the repository. + * + * @param dataType data type + * @param aInfo attribute info + * @return property key used to store a given attribute + */ + String getFieldNameInVertex(IDataType dataType, AttributeInfo aInfo) throws MetadataException; + + /** + * Return the edge label for a given attribute in the repository. + * + * @param dataType data type + * @param aInfo attribute info + * @return edge label for a given attribute + */ + String getEdgeLabel(IDataType dataType, AttributeInfo aInfo); + + /** + * Creates an entity definition (instance) corresponding to a given type. + * + * @param entity entity (typed instance) + * @return a globally unique identifier + * @throws RepositoryException + */ + String createEntity(IReferenceableInstance entity) throws RepositoryException; + + /** + * Fetch the complete definition of an entity given its GUID. + * + * @param guid globally unique identifier for the entity + * @return entity (typed instance) definition + * @throws RepositoryException + */ + ITypedReferenceableInstance getEntityDefinition(String guid) throws RepositoryException; + + /** + * Gets the list of entities for a given entity type. + * + * @param entityType name of a type which is unique + * @return a list of entity names for the given type + * @throws RepositoryException + */ + List getEntityList(String entityType) throws RepositoryException; + + /** + * Deletes an entity definition (instance) corresponding to a given type. + * + * @param guid globally unique identifier for the entity + * @return true if deleted else false + * @throws RepositoryException + */ + // boolean deleteEntity(String guid) throws RepositoryException; + + /** + * Updates an entity given its GUID with the attribute name and value. + * + * @param guid globally unique identifier for the entity + * @param attributeName name of the attribute + * @param attributeValue value of the attribute + * @return an entity instance with updated state + * @throws RepositoryException + */ + //ITypedReferenceableInstance updateEntity(String guid, String attributeName, + // String attributeValue) throws RepositoryException; + + + // Trait management functions + /** + * Gets the list of trait names for a given entity represented by a guid. + * + * @param guid globally unique identifier for the entity + * @return a list of trait names for the given entity guid + * @throws RepositoryException + */ + List getTraitNames(String guid) throws MetadataException; + + /** + * Adds a new trait to an existing entity represented by a guid. + * + * @param guid globally unique identifier for the entity + * @param traitInstance trait instance that needs to be added to entity + * @throws RepositoryException + */ + void addTrait(String guid, + ITypedStruct traitInstance) throws RepositoryException; + + /** + * Deletes a given trait from an existing entity represented by a guid. + * + * @param guid globally unique identifier for the entity + * @param traitNameToBeDeleted name of the trait + * @throws RepositoryException + */ + void deleteTrait(String guid, + String traitNameToBeDeleted) throws RepositoryException; + + /** + * Adds the property to the entity that corresponds to the GUID + * @param guid entity id + * @param property property name + * @param value property value + */ + void updateEntity(String guid, String property, String value) throws RepositoryException; +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/repository/RepositoryException.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/RepositoryException.java b/repository/src/main/java/org/apache/atlas/repository/RepositoryException.java new file mode 100755 index 0000000..8265a6f --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/RepositoryException.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository; + +import org.apache.atlas.MetadataException; + +/** + * Base Exception class for Repository API. + */ +public class RepositoryException extends MetadataException { + + public RepositoryException() { + } + + public RepositoryException(String message) { + super(message); + } + + public RepositoryException(String message, Throwable cause) { + super(message, cause); + } + + public RepositoryException(Throwable cause) { + super(cause); + } + + public RepositoryException(String message, Throwable cause, boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +}