phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject phoenix git commit: PHOENIX-2098 Pig Udf that given a count Reserve chunks of numbers for a sequence (Siddhi Mehta)
Date Fri, 10 Jul 2015 23:37:37 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 e461de558 -> bf6e12ba6


PHOENIX-2098 Pig Udf that given a count Reserve chunks of numbers for a sequence (Siddhi Mehta)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/bf6e12ba
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/bf6e12ba
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/bf6e12ba

Branch: refs/heads/4.x-HBase-0.98
Commit: bf6e12ba6001e160d12699b262556f3322fbc217
Parents: e461de5
Author: James Taylor <jtaylor@salesforce.com>
Authored: Fri Jul 10 16:32:56 2015 -0700
Committer: James Taylor <jtaylor@salesforce.com>
Committed: Fri Jul 10 16:36:03 2015 -0700

----------------------------------------------------------------------
 .../phoenix/pig/udf/ReserveNSequenceTestIT.java | 262 +++++++++++++++++++
 .../phoenix/pig/udf/ReserveNSequence.java       |  88 +++++++
 2 files changed, 350 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/bf6e12ba/phoenix-pig/src/it/java/org/apache/phoenix/pig/udf/ReserveNSequenceTestIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/udf/ReserveNSequenceTestIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/udf/ReserveNSequenceTestIT.java
new file mode 100644
index 0000000..2cbb6cc
--- /dev/null
+++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/udf/ReserveNSequenceTestIT.java
@@ -0,0 +1,262 @@
+/**
+ * 
+ */
+package org.apache.phoenix.pig.udf;
+
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.apache.phoenix.util.TestUtil.LOCALHOST;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.UDFContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Test class to run all the Pig Sequence UDF integration tests against a virtual map reduce
cluster.
+ */
+public class ReserveNSequenceTestIT extends BaseHBaseManagedTimeIT {
+
+    private static final String CREATE_SEQUENCE_SYNTAX = "CREATE SEQUENCE %s START WITH %s
INCREMENT BY %s MINVALUE %s MAXVALUE %s CACHE %s";
+    private static final String SEQUENCE_NAME = "my_schema.my_sequence";
+    private static final long MAX_VALUE = 10;
+
+    private static TupleFactory TF;
+    private static Connection conn;
+    private static String zkQuorum;
+    private static Configuration conf;
+    private static UDFContext udfContext;
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        conf = getTestClusterConfig();
+        zkQuorum = LOCALHOST + JDBC_PROTOCOL_SEPARATOR + getZKClientPort(getTestClusterConfig());
+        conf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
+        // Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        conn = DriverManager.getConnection(getUrl());
+        // Pig variables
+        TF = TupleFactory.getInstance();
+    }
+
+    @Before
+    public void setUp() throws SQLException {
+        createSequence();
+        createUdfContext();
+    }
+
+    @Test
+    public void testReserve() throws Exception {
+        doTest(new UDFTestProperties(1));
+    }
+
+    @Test
+    public void testReserveN() throws Exception {
+        doTest(new UDFTestProperties(5));
+    }
+
+    @Test
+    public void testReserveNwithPreviousAllocations() throws Exception {
+        UDFTestProperties props = new UDFTestProperties(5);
+        props.setCurrentValue(4);
+        doTest(props);
+    }
+
+    @Test
+    public void testReserveWithZero() throws Exception {
+        UDFTestProperties props = new UDFTestProperties(0);
+        props.setExceptionExpected(true);
+        props.setExceptionClass(IllegalArgumentException.class);
+        props.setErrorMessage(ReserveNSequence.INVALID_NUMBER_MESSAGE);
+        doTest(props);
+    }
+
+    @Test
+    public void testReserveWithNegativeNumber() throws Exception {
+        UDFTestProperties props = new UDFTestProperties(-1);
+        props.setExceptionExpected(true);
+        props.setExceptionClass(IllegalArgumentException.class);
+        props.setErrorMessage(ReserveNSequence.INVALID_NUMBER_MESSAGE);
+        doTest(props);
+    }
+
+    @Test
+    public void testReserveMaxLimit() throws Exception {
+        UDFTestProperties props = new UDFTestProperties(MAX_VALUE);
+        props.setExceptionExpected(true);
+        props.setExceptionClass(IOException.class);
+        props.setErrorMessage("Reached MAXVALUE of sequence");
+        doTest(props);
+    }
+
+    @Test
+    public void testNoSequenceName() throws Exception {
+        UDFTestProperties props = new UDFTestProperties(1);
+        props.setExceptionExpected(true);
+        props.setSequenceName(null);
+        props.setExceptionClass(NullPointerException.class);
+        props.setErrorMessage(ReserveNSequence.EMPTY_SEQUENCE_NAME_MESSAGE);
+        doTest(props);
+    }
+
+    @Test
+    public void testSequenceNotExisting() throws Exception {
+        UDFTestProperties props = new UDFTestProperties(1);
+        props.setExceptionExpected(true);
+        props.setSequenceName("foo.bar");
+        props.setExceptionClass(IOException.class);
+        props.setErrorMessage("Sequence undefined");
+        doTest(props);
+    }
+
+    private void doTest(UDFTestProperties props) throws Exception {
+        setCurrentValue(props.getCurrentValue());
+        Tuple tuple = TF.newTuple(3);
+        tuple.set(0, props.getNumToReserve());
+        tuple.set(1, props.getSequenceName());
+        tuple.set(2, zkQuorum);
+        Long result = null;
+        try {
+            ReserveNSequence udf = new ReserveNSequence();
+            result = udf.exec(tuple);
+            validateReservedSequence(props.getCurrentValue(), props.getNumToReserve(), result);
+        } catch (Exception e) {
+            if (props.isExceptionExpected()) {
+                assertEquals(props.getExceptionClass(), e.getClass());
+                e.getMessage().contains(props.getErrorMessage());
+            } else {
+                throw e;
+            }
+        }
+
+    }
+
+    private void createUdfContext() {
+        conf.set(ReserveNSequence.SEQUENCE_NAME_CONF_KEY, SEQUENCE_NAME);
+        udfContext = UDFContext.getUDFContext();
+        udfContext.addJobConf(conf);
+    }
+
+    private void validateReservedSequence(Long currentValue, long count, Long result) throws
SQLException {
+        Long startIndex = currentValue + 1;
+        assertEquals("Start index is incorrect", startIndex, result);
+        final long newNextSequenceValue = getNextSequenceValue();
+        assertEquals(startIndex + count, newNextSequenceValue);
+    }
+
+    private void createSequence() throws SQLException {
+        conn.createStatement().execute(String.format(CREATE_SEQUENCE_SYNTAX, SEQUENCE_NAME,
1, 1, 1, MAX_VALUE, 1));
+        conn.commit();
+    }
+
+    private void setCurrentValue(long currentValue) throws SQLException {
+        for (int i = 1; i <= currentValue; i++) {
+            getNextSequenceValue();
+        }
+    }
+
+    private long getNextSequenceValue() throws SQLException {
+        String ddl = new StringBuilder().append("SELECT NEXT VALUE FOR ").append(SEQUENCE_NAME).toString();
+        ResultSet rs = conn.createStatement().executeQuery(ddl);
+        assertTrue(rs.next());
+        conn.commit();
+        return rs.getLong(1);
+    }
+
+    private void dropSequence() throws Exception {
+        String ddl = new StringBuilder().append("DROP SEQUENCE ").append(SEQUENCE_NAME).toString();
+        conn.createStatement().execute(ddl);
+        conn.commit();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        udfContext.reset();
+        dropSequence();
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        conn.close();
+    }
+
+    /**
+     * Static class to define properties for the test
+     */
+    private static class UDFTestProperties {
+        private final Long numToReserve;
+        private Long currentValue = 1L;
+        private String sequenceName = SEQUENCE_NAME;
+        private boolean exceptionExpected = false;
+        private Class exceptionClass = null;
+        private String errorMessage = null;
+
+        public UDFTestProperties(long numToReserve) {
+            this.numToReserve = numToReserve;
+        }
+
+        public Long getCurrentValue() {
+            return currentValue;
+        }
+
+        public void setCurrentValue(long currentValue) {
+            this.currentValue = currentValue;
+        }
+
+        public String getSequenceName() {
+            return sequenceName;
+        }
+
+        public void setSequenceName(String sequenceName) {
+            this.sequenceName = sequenceName;
+        }
+
+        public boolean isExceptionExpected() {
+            return exceptionExpected;
+        }
+
+        public void setExceptionExpected(boolean shouldThrowException) {
+            this.exceptionExpected = shouldThrowException;
+        }
+
+        public String getErrorMessage() {
+            return errorMessage;
+        }
+
+        public void setErrorMessage(String errorMessage) {
+            this.errorMessage = errorMessage;
+        }
+
+        public Long getNumToReserve() {
+            return numToReserve;
+        }
+
+        public Class getExceptionClass() {
+            return exceptionClass;
+        }
+
+        public void setExceptionClass(Class exceptionClass) {
+            this.exceptionClass = exceptionClass;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bf6e12ba/phoenix-pig/src/main/java/org/apache/phoenix/pig/udf/ReserveNSequence.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/udf/ReserveNSequence.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/udf/ReserveNSequence.java
new file mode 100644
index 0000000..308f170
--- /dev/null
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/udf/ReserveNSequence.java
@@ -0,0 +1,88 @@
+/**
+ * 
+ */
+package org.apache.phoenix.pig.udf;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * UDF to Reserve a chunk of numbers for a given sequence
+ * 
+ * @note The way this UDF is invoked we open a new connection for every tuple row. The UDF
will not perform well on
+ *       large datasets as it involves creating a new connection for every tuple row
+ */
+public class ReserveNSequence extends EvalFunc<Long> {
+
+    public static final String INVALID_TUPLE_MESSAGE = "Tuple should have correct fields(NumtoReserve,SequenceName,zkquorum.";
+    public static final String EMPTY_SEQUENCE_NAME_MESSAGE = "Sequence name should be not
null";
+    public static final String EMPTY_ZK_MESSAGE = "ZKQuorum should be not null";
+    public static final String INVALID_NUMBER_MESSAGE = "Number of Sequences to Reserve should
be greater than 0";
+    public static final String SEQUENCE_NAME_CONF_KEY = "phoenix.sequence.name";
+
+    /**
+     * Reserve N next sequences for a sequence name. N is the first field in the tuple. Sequence
name is the second
+     * field in the tuple zkquorum is the third field in the tuple
+     */
+    @Override
+    public Long exec(Tuple input) throws IOException {
+        Preconditions.checkArgument(input != null && input.size() == 3, INVALID_TUPLE_MESSAGE);
+        Long numToReserve = (Long)(input.get(0));
+        Preconditions.checkArgument(numToReserve > 0, INVALID_NUMBER_MESSAGE);
+        String sequenceName = (String)input.get(1);
+        Preconditions.checkNotNull(sequenceName, EMPTY_SEQUENCE_NAME_MESSAGE);
+        String zkquorum = (String)input.get(2);
+        Preconditions.checkNotNull(zkquorum, EMPTY_ZK_MESSAGE);
+        UDFContext context = UDFContext.getUDFContext();
+        Configuration configuration = context.getJobConf();
+        configuration.set(HConstants.ZOOKEEPER_QUORUM, zkquorum);
+        Connection connection = null;
+        ResultSet rs = null;
+        try {
+            connection = ConnectionUtil.getOutputConnection(configuration);
+            String sql = getNextNSequenceSelectStatement(Long.valueOf(numToReserve), sequenceName);
+            rs = connection.createStatement().executeQuery(sql);
+            Preconditions.checkArgument(rs.next());
+            Long startIndex = rs.getLong(1);
+            rs.close();
+            connection.commit();
+            return startIndex;
+        } catch (SQLException e) {
+            throw new IOException("Caught exception while processing row." + e.getMessage(),
e);
+        } finally {
+            try {
+                if (connection != null) {
+                    connection.close();
+                }
+            } catch (SQLException e) {
+                throw new IOException("Caught exception while closing connection", e);
+            }
+        }
+    }
+
+    private String getNextNSequenceSelectStatement(Long numToReserve, String sequenceName)
{
+        return new StringBuilder().append("SELECT NEXT " + numToReserve + " VALUES" + " FOR
").append(sequenceName)
+                .toString();
+    }
+
+}


Mime
View raw message