gora-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (GORA-535) Add a data store for Apache Ignite
Date Sun, 15 Jul 2018 07:19:01 GMT

    [ https://issues.apache.org/jira/browse/GORA-535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544446#comment-16544446
] 

ASF GitHub Bot commented on GORA-535:
-------------------------------------

Github user nishadi commented on a diff in the pull request:

    https://github.com/apache/gora/pull/134#discussion_r202522015
  
    --- Diff: gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java ---
    @@ -0,0 +1,578 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.gora.ignite.store;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.sql.Connection;
    +import java.sql.DriverManager;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.Statement;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.concurrent.ConcurrentHashMap;
    +import javax.sql.rowset.CachedRowSet;
    +import javax.sql.rowset.RowSetFactory;
    +import javax.sql.rowset.RowSetProvider;
    +import org.apache.avro.Schema;
    +import org.apache.avro.specific.SpecificDatumReader;
    +import org.apache.avro.specific.SpecificDatumWriter;
    +import org.apache.avro.util.Utf8;
    +import org.apache.gora.ignite.query.IgniteQuery;
    +import org.apache.gora.ignite.query.IgniteResult;
    +import org.apache.gora.ignite.utils.IgniteSQLBuilder;
    +import org.apache.gora.persistency.Persistent;
    +import org.apache.gora.persistency.impl.PersistentBase;
    +import org.apache.gora.query.PartitionQuery;
    +import org.apache.gora.query.Query;
    +import org.apache.gora.query.Result;
    +import org.apache.gora.query.impl.PartitionQueryImpl;
    +import org.apache.gora.store.impl.DataStoreBase;
    +import org.apache.gora.util.AvroUtils;
    +import org.apache.gora.util.GoraException;
    +import org.apache.gora.util.IOUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Implementation of an Ignite data store to be used by Gora.
    + *
    + * @param <K> class to be used for the key
    + * @param <T> class to be persisted within the store
    + */
    +public class IgniteStore<K, T extends PersistentBase> extends DataStoreBase<K,
T> {
    +
    +  public static final Logger LOG = LoggerFactory.getLogger(IgniteStore.class);
    +  private static final String PARSE_MAPPING_FILE_KEY = "gora.ignite.mapping.file";
    +  private static final String DEFAULT_MAPPING_FILE = "gora-ignite-mapping.xml";
    +  private IgniteParameters igniteParameters;
    +  private IgniteMapping igniteMapping;
    +  private Connection connection;
    +
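    +  /*
    +   * Maps of datum readers and writers keyed by schema. Although this comment
    +   * originally described a thread-local map, the maps declared below are
    +   * static ConcurrentHashMaps. The concern it records is that the readers and
    +   * writers are not thread safe, at least not before Avro 1.4.0 (see
    +   * AVRO-650). When they are thread safe, it is possible to maintain a single
    +   * reader and writer pair for every schema, instead of one for every thread.
    +   */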
    +  public static final ConcurrentHashMap<Schema, SpecificDatumReader<?>> readerMap
= new ConcurrentHashMap<>();
    +
    +  public static final ConcurrentHashMap<Schema, SpecificDatumWriter<?>> writerMap
= new ConcurrentHashMap<>();
    +
    +  @Override
    +  public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties
properties) throws GoraException {
    +
    +    try {
    +      super.initialize(keyClass, persistentClass, properties);
    +      IgniteMappingBuilder builder = new IgniteMappingBuilder(this);
    +      builder.readMappingFile(getConf().get(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE));
    +      igniteMapping = builder.getIgniteMapping();
    +      igniteParameters = IgniteParameters.load(properties, conf);
    +      connection = acquiereConnection();
    +      LOG.info("Ignite store was successfully initialized");
    +    } catch (ClassNotFoundException | SQLException ex) {
    +      LOG.error("Error while initializing Ignite store", ex);
    +      throw new GoraException(ex);
    +    }
    +  }
    +
    +  /**
    +   * Loads the Ignite thin JDBC driver class and opens a connection with a URL
    +   * assembled from the loaded Ignite parameters. The host is always appended;
    +   * port, schema, user, password and additional configurations are appended
    +   * only when they are not null.
    +   *
    +   * @return the connection returned by the DriverManager
    +   * @throws ClassNotFoundException if the driver class cannot be loaded
    +   * @throws SQLException if the DriverManager cannot open the connection
    +   */
    +  private Connection acquiereConnection() throws ClassNotFoundException, SQLException {
    +    Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
    +
    +    StringBuilder jdbcUrl = new StringBuilder("jdbc:ignite:thin://");
    +    jdbcUrl.append(igniteParameters.getHost());
    +
    +    if (igniteParameters.getPort() != null) {
    +      jdbcUrl.append(":" + igniteParameters.getPort());
    +    }
    +    if (igniteParameters.getSchema() != null) {
    +      jdbcUrl.append("/" + igniteParameters.getSchema());
    +    }
    +    // NOTE(review): user and password are appended after ';' without any
    +    // "user="/"password=" key - confirm the configured values already carry
    +    // the form expected by the thin driver.
    +    if (igniteParameters.getUser() != null) {
    +      jdbcUrl.append(";" + igniteParameters.getUser());
    +    }
    +    if (igniteParameters.getPassword() != null) {
    +      jdbcUrl.append(";" + igniteParameters.getPassword());
    +    }
    +    // Additional configurations are appended verbatim, with no separator added here.
    +    if (igniteParameters.getAdditionalConfigurations() != null) {
    +      jdbcUrl.append(igniteParameters.getAdditionalConfigurations());
    +    }
    +
    +    return DriverManager.getConnection(jdbcUrl.toString());
    +  }
    +
    +  /**
    +   * Returns the table name held by the Ignite mapping of this store.
    +   *
    +   * @return the table name taken from the mapping
    +   */
    +  @Override
    +  public String getSchemaName() {
    +    final String tableName = igniteMapping.getTableName();
    +    return tableName;
    +  }
    +
    +  /**
    +   * Resolves the schema name by delegating unchanged to the parent
    +   * implementation.
    +   *
    +   * @param mappingSchemaName schema name passed through to the parent
    +   * @param persistentClass persistent class passed through to the parent
    +   * @return whatever the parent implementation returns
    +   */
    +  @Override
    +  public String getSchemaName(final String mappingSchemaName,
    +      final Class<?> persistentClass) {
    +    final String resolvedName = super.getSchemaName(mappingSchemaName, persistentClass);
    +    return resolvedName;
    +  }
    +
    +  /**
    +   * Creates the table described by the Ignite mapping, unless
    +   * {@link #schemaExists()} reports that it is already present.
    +   *
    +   * @throws GoraException if no connection has been initiated, or wrapping
    +   *           the SQLException raised while executing the statement
    +   */
    +  @Override
    +  public void createSchema() throws GoraException {
    +    if (connection == null) {
    +      throw new GoraException(
    +          "Impossible to create the schema as no connection has been initiated.");
    +    }
    +    // Only issue the CREATE statement when the table is not there yet.
    +    if (!schemaExists()) {
    +      try (Statement statement = connection.createStatement()) {
    +        statement.executeUpdate(IgniteSQLBuilder.createTable(igniteMapping));
    +        LOG.info("Table {} has been created for Ignite instance.",
    +            igniteMapping.getTableName());
    +      } catch (SQLException e) {
    +        throw new GoraException(e);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Drops the table named by the Ignite mapping.
    +   *
    +   * @throws GoraException if no connection has been initiated, or wrapping
    +   *           the SQLException raised while executing the statement
    +   */
    +  @Override
    +  public void deleteSchema() throws GoraException {
    +    if (connection == null) {
    +      throw new GoraException(
    +          "Impossible to delete the schema as no connection has been initiated.");
    +    }
    +    try (Statement statement = connection.createStatement()) {
    +      statement.executeUpdate(IgniteSQLBuilder.dropTable(igniteMapping.getTableName()));
    +      LOG.info("Table {} has been dropped from Ignite instance.",
    +          igniteMapping.getTableName());
    +    } catch (SQLException e) {
    +      throw new GoraException(e);
    +    }
    +  }
    +
    +  @Override
    +  public boolean schemaExists() throws GoraException {
    +    boolean exists = false;
    +    try (Statement stmt = connection.createStatement()) {
    +      String tableExistsSQL = IgniteSQLBuilder.tableExists(igniteMapping.getTableName());
    +      ResultSet executeQuery = stmt.executeQuery(tableExistsSQL);
    +      executeQuery.close();
    +      exists = true;
    +    } catch (SQLException ex) {
    +      /**
    +       * A 42000 SQLSTATE is reported by Ignite when a non-existent table is
    +       * queried. More details:
    +       * https://apacheignite-sql.readme.io/docs/jdbc-error-codes
    +       */
    +      if (ex.getSQLState() != null && ex.getSQLState().equals("42000")) {
    --- End diff --
    
    Shall we move the constants to a separate constant file?


> Add a data store for Apache Ignite 
> -----------------------------------
>
>                 Key: GORA-535
>                 URL: https://issues.apache.org/jira/browse/GORA-535
>             Project: Apache Gora
>          Issue Type: New Feature
>            Reporter: Nishadi Kirielle
>            Priority: Major
>              Labels: gsoc2018
>
> Currently, Gora has support for persisting objects to various database models such as
Apache HBase, Apache Cassandra and many more. [1] This project aims to extend its capability
to provide support for the Apache Ignite database.
> Apache Ignite is a distributed database, caching and processing platform.[2] 
> [1]. [http://gora.apache.org/] 
> [2] . [https://ignite.apache.org/]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message