bahir-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetzger <...@git.apache.org>
Subject [GitHub] bahir-flink pull request #17: [BAHIR-99] Kudu connector to read/write from/t...
Date Sun, 23 Jul 2017 15:10:23 GMT
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/17#discussion_r128921944
  
    --- Diff: flink-connector-kudu/src/main/java/es/accenture/flink/Utils/Utils.java ---
    @@ -0,0 +1,447 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package es.accenture.flink.Utils;
    +
    +import es.accenture.flink.Sink.KuduOutputFormat;
    +import es.accenture.flink.Utils.Exceptions.KuduClientException;
    +import es.accenture.flink.Utils.Exceptions.KuduTableException;
    +import org.apache.kudu.ColumnSchema;
    +import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
    +import org.apache.kudu.Schema;
    +import org.apache.kudu.Type;
    +import org.apache.kudu.client.*;
    +import org.apache.log4j.Logger;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +public class Utils {
    +
    +    // Kudu handles used by the operations of this helper.
    +    // Client built in the constructor from the host string passed to Utils(String).
    +    private KuduClient client;
    +    // Session opened from the client in the constructor. NOTE(review): none of the methods
    +    // visible in this hunk use it -- confirm it is needed elsewhere in the class.
    +    private KuduSession session;
    +
    +    // Log4j logger shared by all instances of this class.
    +    private final static Logger logger = Logger.getLogger(Utils.class);
    +
    +    /**
    +     * Creates a Kudu client for the given master address(es) and opens a session on it so
    +     * that operations can be performed later.
    +     *
    +     * @param host Kudu master address(es) handed to {@code KuduClient.KuduClientBuilder};
    +     *             must be non-null and non-blank
    +     * @throws KuduClientException if {@code host} is null or blank
    +     */
    +    public Utils(String host) throws KuduClientException {
    +        // KuduClientBuilder.build() never returns null, so a null check on its result can
    +        // never fire. Validate the argument itself, before the builder ever sees it.
    +        if (host == null || host.trim().isEmpty()) {
    +            throw new KuduClientException("ERROR: param \"host\" not valid, can't establish connection");
    +        }
    +        this.client = new KuduClient.KuduClientBuilder(host).build();
    +        this.session = this.client.newSession();
    +    }
    +
    +    /**
    +     * Return an instance of the table indicated in the settings
    +     *
    +     * In case that the table exists, return an instance of the table
    +     * In case that the table doesn't exist, create a new table with the data provided
and return an instance
    +     * In both cases,takes into account the way of the table to perfom some operations
or others
    +     *
    +     *     If the mode is CREATE:
    +     *
    +     *         If the table exists: return error (Can not create table that already exists)
    +     *         If the table doesn't exist and  the list of column names has not been
provided: return error
    +     *         If the table doesn't exist and  the list of column names has been provided:
create a new table with data provided and return an instance
    +     *
    +     *    If the mode is APPEND:
    +     *
    +     *        If the table exists: return the instance in the table
    +     *        If the table doesn't exist: return error
    +     *
    +     *    If the mode is OVERRIDE:
    +     *
    +     *        If the table exist: delete all rows of this table and return an instance
of it
    +     *        If the table doesn't exist: return error
    +     *
    +     *
    +     * @param tableName             Table name to use
    +     * @param tableMode             Operations mode for operate with the table (CREATE,
APPEND, OVERRIDE)
    +     * @return                      Instance of the table indicated
    +     * @throws KuduTableException   In case of can't access to a table o can't create
it (wrong params or not existing table)
    +     * @throws KuduException        In case of error of Kudu
    +     */
    +    public KuduTable useTable(String tableName, Integer tableMode) throws KuduTableException,
KuduException {
    +        KuduTable table;
    +
    +        if (tableMode == KuduOutputFormat.CREATE) {
    +            logger.error("Bad call method, use useTable(String tableName, String [] fieldsNames,
RowSerializable row) instead");
    +            table = null;
    +        }else if (tableMode == KuduOutputFormat.APPEND) {
    +            logger.info("Modo APPEND");
    +            try {
    +                if (client.tableExists(tableName)) {
    +                    //logger.info("SUCCESS: There is the table with the name \"" + tableName
+ "\"");
    +                    table = client.openTable(tableName);
    +                } else {
    +                    logger.error("ERROR: The table doesn't exist");
    +                    throw new KuduTableException("ERROR: The table doesn't exist, so
can't do APPEND operation");
    +                }
    +            } catch (Exception e) {
    +                throw new KuduTableException("ERROR: param \"host\" not valid, can't
establish connection");
    +            }
    +        }else if (tableMode == KuduOutputFormat.OVERRIDE) {
    +            logger.info("Modo OVERRIDE");
    +            try {
    +                if (client.tableExists(tableName)) {
    +                    logger.info("SUCCESS: There is the table with the name \"" + tableName
+ "\". Emptying the table");
    +                    clearTable(tableName);
    +                    table = client.openTable(tableName);
    +                } else {
    +                    logger.error("ERROR: The table doesn't exist");
    +                    throw new KuduTableException("ERROR: The table doesn't exist, so
can't do OVERRIDE operation");
    +                }
    +            } catch (Exception e) {
    +                throw new KuduTableException("ERROR: param \"host\" not valid, can't
establish connection");
    +            }
    +        }else {
    +            throw new KuduTableException("ERROR: Incorrect parameters, please check the
constructor method. Incorrect \"tableMode\" parameter.");
    +        }
    +        return table;
    +    }
    +
    +    /**
    +     * Returns an instance of the table requested in parameters
    +     * If the table exists, returns an instance of the table
    +     * If the table doesn't exist, creates a new table with the data provided and returns
an instance
    +     *
    +     * @param tableName     Table name to use
    +     * @param fieldsNames   List of names of columns of the table (to create table)
    +     * @param row           List of values to insert a row in the table (to know the
types of columns)
    +     * @return              Instance of the table indicated
    +     * @throws IllegalArgumentException In case of wrong parameters
    +     * @throws KuduException    In case of exception caused by Kudu
    +     */
    +    public KuduTable useTable(String tableName, String [] fieldsNames, RowSerializable
row) throws IllegalArgumentException, KuduException {
    +        KuduTable table;
    +
    +        if (client.tableExists(tableName)){
    +            logger.info("The table exists");
    +            table = client.openTable(tableName);
    +        } else {
    +            if (tableName == null || tableName.equals("")) {
    +                throw new IllegalArgumentException("ERROR: Incorrect parameters, please
check the constructor method. Incorrect \"tableName\" parameter.");
    +
    +            } else if (fieldsNames == null || fieldsNames[0].isEmpty()) {
    +                throw new IllegalArgumentException("ERROR: Incorrect parameters, please
check the constructor method. Missing \"fields\" parameter.");
    +
    +            } else if (row == null){
    +                throw new IllegalArgumentException("ERROR: Incorrect parameters, please
check the constructor method. Incorrect \"row\" parameter.");
    +
    +            } else {
    +                logger.info("The table doesn't exist");
    +                table = createTable(tableName, fieldsNames, row);
    +            }
    +        }
    +        return table;
    +    }
    +    /**
    +     * Create a new Kudu table and return the instance of this table
    +     *
    +     * @param tableName     name of the table to create
    +     * @param fieldsNames   list name columns of the table
    +     * @param row           list of values to insert a row in the table( to know the
types of columns)
    +     * @return              instance of the table indicated
    +     * @throws KuduException In case of exception caused by Kudu
    +     */
    +    public KuduTable createTable (String tableName, String [] fieldsNames, RowSerializable
row) throws KuduException {
    +
    +        if(client.tableExists(tableName))
    +            return client.openTable(tableName);
    +
    +
    +        List<ColumnSchema> columns = new ArrayList<ColumnSchema>();
    +        List<String> rangeKeys = new ArrayList<String>(); // Primary key
    +        rangeKeys.add(fieldsNames[0]);
    +
    +        logger.info("Creating the table \"" + tableName + "\"...");
    +        for (int i = 0; i < fieldsNames.length; i++){
    +            ColumnSchema col;
    +            String colName = fieldsNames[i];
    +            Type colType = getRowsPositionType(i, row);
    +
    +            if (colName.equals(fieldsNames[0])) {
    +                col = new ColumnSchemaBuilder(colName, colType).key(true).build();
    +                columns.add(0, col);//To create the table, the key must be the first
in the column list otherwise it will give a failure
    +            } else {
    +                col = new ColumnSchemaBuilder(colName, colType).build();
    +                columns.add(col);
    +            }
    +        }
    +        Schema schema = new Schema(columns);
    +
    +        if(!client.tableExists(tableName))
    +            client.createTable(tableName, schema, new CreateTableOptions().setRangePartitionColumns(rangeKeys).addHashPartitions(rangeKeys,
4));
    +        //logger.info("SUCCESS: The table has been created successfully");
    +
    +
    +        return client.openTable(tableName);
    +    }
    +    /**
    +     * Deletes the indicated table if it exists. Failures are logged, not thrown.
    +     *
    +     * @param tableName name of the table to delete
    +     */
    +    public void deleteTable (String tableName){
    +        logger.info("Deleting the table \"" + tableName + "\"...");
    +        try {
    +            if (client.tableExists(tableName)) {
    +                client.deleteTable(tableName);
    +                logger.info("SUCCESS: Table deleted successfully");
    +            } else {
    +                logger.info("The table \"" + tableName + "\" doesn't exist, so can't be deleted.");
    +            }
    +        } catch (KuduException e) {
    +            // A missing table is handled above; reaching here means a Kudu call itself failed.
    +            logger.error("ERROR: The table \"" + tableName + "\" could not be deleted.", e);
    +        }
    +    }
    +
    +    /**
    +     * Maps the Java class of the value at {@code pos} in {@code row} to a Kudu column type.
    +     *
    +     * String maps to STRING, Integer to INT32 and Boolean to BOOL; any other class yields
    +     * {@code null}. A null value at {@code pos} raises a NullPointerException.
    +     *
    +     * @param pos   position of the value inside the row
    +     * @param row   sample row whose values are inspected
    +     * @return      the matching Kudu {@link Type}, or {@code null} when unsupported
    +     */
    +    public Type getRowsPositionType (int pos, RowSerializable row){
    +        final Class<?> valueClass = row.productElement(pos).getClass();
    +        if (valueClass == String.class) {
    +            return Type.STRING;
    +        }
    +        if (valueClass == Integer.class) {
    +            return Type.INT32;
    +        }
    +        if (valueClass == Boolean.class) {
    +            return Type.BOOL;
    +        }
    +        return null;
    +    }
    +
    +    /**
    +     * Reads every row of the indicated table.
    +     *
    +     * Only STRING, INT32 and BOOL columns are copied. Values of other column types are skipped,
    +     * and the following values shift left into their slots, leaving trailing fields unset.
    +     *
    +     * @param tableName Table name to read
    +     * @return          List with one RowSerializable per row of the table
    +     * @throws KuduException In case of exception caused by Kudu
    +     */
    +    public List<RowSerializable> readTable (String tableName) throws KuduException {
    +        KuduTable table = client.openTable(tableName);
    +        // Column names, in the order their values are copied into each RowSerializable.
    +        String[] columnsNames = getNamesOfColumns(table);
    +        List<RowSerializable> rowsList = new ArrayList<>();
    +
    +        KuduScanner scanner = client.newScannerBuilder(table).build();
    +        try {
    +            while (scanner.hasMoreRows()) {
    +                for (RowResult row : scanner.nextRows()) {
    +                    rowsList.add(toRowSerializable(row, columnsNames));
    +                }
    +            }
    +        } finally {
    +            // Release the scanner even if reading fails part way through.
    +            scanner.close();
    +        }
    +        return rowsList;
    +    }
    +
    +    /** Copies the STRING, INT32 and BOOL columns of {@code row} into a new RowSerializable. */
    +    private RowSerializable toRowSerializable(RowResult row, String[] columnsNames) {
    +        RowSerializable rowToInsert = new RowSerializable(columnsNames.length);
    +        int pos = 0; // next free field; advanced only for supported column types
    +        for (String col : columnsNames) {
    +            switch (row.getColumnType(col).getName()) {
    +                case "string":
    +                    rowToInsert.setField(pos++, row.getString(col));
    +                    break;
    +                case "int32":
    +                    rowToInsert.setField(pos++, row.getInt(col));
    +                    break;
    +                case "bool":
    +                    rowToInsert.setField(pos++, row.getBoolean(col));
    +                    break;
    +                default:
    +                    // Other Kudu column types are not supported by this reader and are skipped.
    +                    break;
    +            }
    +        }
    +        return rowToInsert;
    +    }
    +
    +
    +
    +    /**
    +     * Return a list with all rows of the indicated table
    +     *
    +     * @param tableName Table name to read
    +     * @throws KuduException In case of exception caused by Kudu
    +     */
    +    public void readTablePrint (String tableName) throws KuduException {
    +        KuduTable table = client.openTable(tableName);
    +        KuduScanner scanner = client.newScannerBuilder(table).build();
    +        int cont = 0;
    +        try {
    +            while (scanner.hasMoreRows()) {
    +                RowResultIterator results = scanner.nextRows();
    +                while (results.hasNext()) {
    +                    RowResult result = results.next();
    +                    System.out.println(result.rowToString());
    +                    cont++;
    +                }
    +            }
    +            System.out.println("Number of rows: " + cont);
    +        } catch (Exception e) {
    --- End diff --
    
    The exception handling needs improvement


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message