phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ndimi...@apache.org
Subject [1/3] phoenix git commit: PHOENIX-2481 JSON bulkload tool
Date Mon, 07 Dec 2015 23:02:05 GMT
Repository: phoenix
Updated Branches:
  refs/heads/master 8ce3b580f -> 578979a14


http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapperTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapperTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapperTest.java
new file mode 100644
index 0000000..3455616
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapperTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PIntegerArray;
+import org.apache.phoenix.schema.types.PUnsignedInt;
+import org.apache.phoenix.util.ColumnInfo;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import static org.junit.Assert.assertEquals;
+
+public class FormatToKeyValueMapperTest {
+
+    @Test
+    public void testBuildColumnInfoList() {
+        List<ColumnInfo> columnInfoList = ImmutableList.of(
+                new ColumnInfo("idCol", PInteger.INSTANCE.getSqlType()),
+                new ColumnInfo("unsignedIntCol", PUnsignedInt.INSTANCE.getSqlType()),
+                new ColumnInfo("stringArrayCol", PIntegerArray.INSTANCE.getSqlType()));
+
+        Configuration conf = new Configuration();
+        FormatToKeyValueMapper.configureColumnInfoList(conf, columnInfoList);
+        List<ColumnInfo> fromConfig = FormatToKeyValueMapper.buildColumnInfoList(conf);
+
+        assertEquals(columnInfoList, fromConfig);
+    }
+
+    @Test
+    public void testBuildColumnInfoList_ContainingNulls() {
+        // A null value in the column info list means "skip that column in the input"
+        List<ColumnInfo> columnInfoListWithNull = Lists.newArrayList(
+                new ColumnInfo("idCol", PInteger.INSTANCE.getSqlType()),
+                null,
+                new ColumnInfo("unsignedIntCol", PUnsignedInt.INSTANCE.getSqlType()),
+                new ColumnInfo("stringArrayCol", PIntegerArray.INSTANCE.getSqlType()));
+
+        Configuration conf = new Configuration();
+        FormatToKeyValueMapper.configureColumnInfoList(conf, columnInfoListWithNull);
+        List<ColumnInfo> fromConfig = FormatToKeyValueMapper.buildColumnInfoList(conf);
+
+        assertEquals(columnInfoListWithNull, fromConfig);
+    }
+
+    @Test
+    public void testLoadPreUpdateProcessor() {
+        Configuration conf = new Configuration();
+        conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, MockUpsertProcessor.class,
+                ImportPreUpsertKeyValueProcessor.class);
+
+        ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
+        assertEquals(MockUpsertProcessor.class, processor.getClass());
+    }
+
+    @Test
+    public void testLoadPreUpdateProcessor_NotConfigured() {
+
+        Configuration conf = new Configuration();
+        ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
+
+        assertEquals(FormatToKeyValueMapper.DefaultImportPreUpsertKeyValueProcessor.class,
+                processor.getClass());
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void testLoadPreUpdateProcessor_ClassNotFound() {
+        Configuration conf = new Configuration();
+        conf.set(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, "MyUndefinedClass");
+
+        PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
+    }
+
+    static class MockUpsertProcessor implements ImportPreUpsertKeyValueProcessor {
+        @Override
+        public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues)
{
+            throw new UnsupportedOperationException("Not yet implemented");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java
new file mode 100644
index 0000000..b614312
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.schema.types.PArrayDataType;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PIntegerArray;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionlessQueryTest
{
+
+    protected Connection conn;
+    protected List<ColumnInfo> columnInfoList;
+    protected PreparedStatement preparedStatement;
+    protected UpsertExecutor.UpsertListener<R> upsertListener;
+
+    protected abstract UpsertExecutor<R, F> getUpsertExecutor();
+    protected abstract R createRecord(Object... columnValues) throws IOException;
+
+    @Before
+    public void setUp() throws SQLException {
+        columnInfoList = ImmutableList.of(
+                new ColumnInfo("ID", Types.BIGINT),
+                new ColumnInfo("NAME", Types.VARCHAR),
+                new ColumnInfo("AGE", Types.INTEGER),
+                new ColumnInfo("VALUES", PIntegerArray.INSTANCE.getSqlType()));
+
+        preparedStatement = mock(PreparedStatement.class);
+        upsertListener = mock(UpsertExecutor.UpsertListener.class);
+        conn = DriverManager.getConnection(getUrl());
+    }
+
+    @After
+    public void tearDown() throws SQLException {
+        conn.close();
+    }
+
+    @Test
+    public void testExecute() throws Exception {
+        getUpsertExecutor().execute(createRecord(123L, "NameValue", 42, Arrays.asList(1,
2, 3)));
+
+        verify(upsertListener).upsertDone(1L);
+        verifyNoMoreInteractions(upsertListener);
+
+        verify(preparedStatement).setObject(1, Long.valueOf(123L));
+        verify(preparedStatement).setObject(2, "NameValue");
+        verify(preparedStatement).setObject(3, Integer.valueOf(42));
+        verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE,
new Object[]{1,2,3}));
+        verify(preparedStatement).execute();
+        verifyNoMoreInteractions(preparedStatement);
+    }
+
+    @Test
+    public void testExecute_TooFewFields() throws Exception {
+        R recordWithTooFewFields = createRecord(123L, "NameValue");
+        getUpsertExecutor().execute(recordWithTooFewFields);
+
+        verify(upsertListener).errorOnRecord(eq(recordWithTooFewFields), any(Throwable.class));
+        verifyNoMoreInteractions(upsertListener);
+    }
+
+    @Test
+    public void testExecute_TooManyFields() throws Exception {
+        R recordWithTooManyFields = createRecord(123L, "NameValue", 42, Arrays.asList(1,
2, 3), "Garbage");
+        getUpsertExecutor().execute(recordWithTooManyFields);
+
+        verify(upsertListener).upsertDone(1L);
+        verifyNoMoreInteractions(upsertListener);
+
+        verify(preparedStatement).setObject(1, Long.valueOf(123L));
+        verify(preparedStatement).setObject(2, "NameValue");
+        verify(preparedStatement).setObject(3, Integer.valueOf(42));
+        verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE,
new Object[]{1,2,3}));
+        verify(preparedStatement).execute();
+        verifyNoMoreInteractions(preparedStatement);
+    }
+
+    @Test
+    public void testExecute_NullField() throws Exception {
+        getUpsertExecutor().execute(createRecord(123L, "NameValue", null, Arrays.asList(1,
2, 3)));
+
+        verify(upsertListener).upsertDone(1L);
+        verifyNoMoreInteractions(upsertListener);
+
+        verify(preparedStatement).setObject(1, Long.valueOf(123L));
+        verify(preparedStatement).setObject(2, "NameValue");
+        verify(preparedStatement).setNull(3, columnInfoList.get(2).getSqlType());
+        verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE,
new Object[]{1,2,3}));
+        verify(preparedStatement).execute();
+        verifyNoMoreInteractions(preparedStatement);
+    }
+
+    @Test
+    public void testExecute_InvalidType() throws Exception {
+        R recordWithInvalidType = createRecord(123L, "NameValue", "ThisIsNotANumber", Arrays.asList(1,
2, 3));
+        getUpsertExecutor().execute(recordWithInvalidType);
+
+        verify(upsertListener).errorOnRecord(eq(recordWithInvalidType), any(Throwable.class));
+        verifyNoMoreInteractions(upsertListener);
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java
index 6efe246..7a09bee 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java
@@ -17,130 +17,50 @@
  */
 package org.apache.phoenix.util.csv;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVParser;
-import org.apache.commons.csv.CSVRecord;
-import org.apache.phoenix.query.BaseConnectionlessQueryTest;
-import org.apache.phoenix.schema.types.PInteger;
-import org.apache.phoenix.schema.types.PIntegerArray;
-import org.apache.phoenix.schema.types.PArrayDataType;
-import org.apache.phoenix.util.ColumnInfo;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
 import java.sql.SQLException;
-import java.sql.Types;
 import java.util.List;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-
-public class CsvUpsertExecutorTest extends BaseConnectionlessQueryTest {
-
-    private Connection conn;
-    private List<ColumnInfo> columnInfoList;
-    private PreparedStatement preparedStatement;
-    private CsvUpsertExecutor.UpsertListener upsertListener;
-
-    private CsvUpsertExecutor upsertExecutor;
-
-    @Before
-    public void setUp() throws SQLException {
-        columnInfoList = ImmutableList.of(
-                new ColumnInfo("ID", Types.BIGINT),
-                new ColumnInfo("NAME", Types.VARCHAR),
-                new ColumnInfo("AGE", Types.INTEGER),
-                new ColumnInfo("VALUES", PIntegerArray.INSTANCE.getSqlType()));
-
-        preparedStatement = mock(PreparedStatement.class);
-        upsertListener = mock(CsvUpsertExecutor.UpsertListener.class);
-        conn = DriverManager.getConnection(getUrl());
-        upsertExecutor = new CsvUpsertExecutor(conn, columnInfoList, preparedStatement, upsertListener,
":");
-    }
-
-    @After
-    public void tearDown() throws SQLException {
-        conn.close();
-    }
-
-    @Test
-    public void testExecute() throws Exception {
-        upsertExecutor.execute(createCsvRecord("123,NameValue,42,1:2:3"));
-
-        verify(upsertListener).upsertDone(1L);
-        verifyNoMoreInteractions(upsertListener);
-
-        verify(preparedStatement).setObject(1, Long.valueOf(123L));
-        verify(preparedStatement).setObject(2, "NameValue");
-        verify(preparedStatement).setObject(3, Integer.valueOf(42));
-        verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE,
new Object[]{1,2,3}));
-        verify(preparedStatement).execute();
-        verifyNoMoreInteractions(preparedStatement);
-    }
-
-    @Test
-    public void testExecute_TooFewFields() throws Exception {
-        CSVRecord csvRecordWithTooFewFields = createCsvRecord("123,NameValue");
-        upsertExecutor.execute(csvRecordWithTooFewFields);
-
-        verify(upsertListener).errorOnRecord(eq(csvRecordWithTooFewFields), any(Throwable.class));
-        verifyNoMoreInteractions(upsertListener);
-    }
-
-    @Test
-    public void testExecute_TooManyFields() throws Exception {
-        CSVRecord csvRecordWithTooManyFields = createCsvRecord("123,NameValue,42,1:2:3,Garbage");
-        upsertExecutor.execute(csvRecordWithTooManyFields);
-
-        verify(upsertListener).upsertDone(1L);
-        verifyNoMoreInteractions(upsertListener);
-
-        verify(preparedStatement).setObject(1, Long.valueOf(123L));
-        verify(preparedStatement).setObject(2, "NameValue");
-        verify(preparedStatement).setObject(3, Integer.valueOf(42));
-        verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE,
new Object[]{1,2,3}));
-        verify(preparedStatement).execute();
-        verifyNoMoreInteractions(preparedStatement);
-    }
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.phoenix.util.AbstractUpsertExecutorTest;
+import org.apache.phoenix.util.UpsertExecutor;
+import org.junit.Before;
 
-    @Test
-    public void testExecute_NullField() throws Exception {
-        upsertExecutor.execute(createCsvRecord("123,NameValue,,1:2:3"));
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
 
-        verify(upsertListener).upsertDone(1L);
-        verifyNoMoreInteractions(upsertListener);
+public class CsvUpsertExecutorTest extends AbstractUpsertExecutorTest<CSVRecord, String>
{
 
-        verify(preparedStatement).setObject(1, Long.valueOf(123L));
-        verify(preparedStatement).setObject(2, "NameValue");
-        verify(preparedStatement).setNull(3, columnInfoList.get(2).getSqlType());
-        verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE,
new Object[]{1,2,3}));
-        verify(preparedStatement).execute();
-        verifyNoMoreInteractions(preparedStatement);
-    }
+    private static final String ARRAY_SEP = ":";
 
-    @Test
-    public void testExecute_InvalidType() throws Exception {
-        CSVRecord csvRecordWithInvalidType = createCsvRecord("123,NameValue,ThisIsNotANumber,1:2:3");
-        upsertExecutor.execute(csvRecordWithInvalidType);
+    private UpsertExecutor<CSVRecord, String> upsertExecutor;
 
-        verify(upsertListener).errorOnRecord(eq(csvRecordWithInvalidType), any(Throwable.class));
-        verifyNoMoreInteractions(upsertListener);
+    @Override
+    public UpsertExecutor<CSVRecord, String> getUpsertExecutor() {
+        return upsertExecutor;
     }
 
-    private CSVRecord createCsvRecord(String...columnValues) throws IOException {
+    @Override
+    public CSVRecord createRecord(Object... columnValues) throws IOException {
+        for (int i = 0; i < columnValues.length; i++) {
+            if (columnValues[i] == null) {
+                // Joiner.join throws on nulls, replace with empty string.
+                columnValues[i] = "";
+            }
+            if (columnValues[i] instanceof List) {
+                columnValues[i] = Joiner.on(ARRAY_SEP).join((List<?>) columnValues[i]);
+            }
+        }
         String inputRecord = Joiner.on(',').join(columnValues);
         return Iterables.getFirst(CSVParser.parse(inputRecord, CSVFormat.DEFAULT), null);
     }
+
+    @Before
+    public void setUp() throws SQLException {
+        super.setUp();
+        upsertExecutor = new CsvUpsertExecutor(conn, columnInfoList, preparedStatement,
+                upsertListener, ARRAY_SEP);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/test/java/org/apache/phoenix/util/json/JsonUpsertExecutorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/json/JsonUpsertExecutorTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/util/json/JsonUpsertExecutorTest.java
new file mode 100644
index 0000000..c042dd4
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/json/JsonUpsertExecutorTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util.json;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.phoenix.util.AbstractUpsertExecutorTest;
+import org.apache.phoenix.util.UpsertExecutor;
+import org.junit.Before;
+
+public class JsonUpsertExecutorTest extends AbstractUpsertExecutorTest<Map<?, ?>,
Object> {
+
+    private UpsertExecutor<Map<?, ?>, Object> upsertExecutor;
+
+    @Override
+    protected UpsertExecutor<Map<?, ?>, Object> getUpsertExecutor() {
+        return upsertExecutor;
+    }
+
+    @Override
+    protected Map<?, ?> createRecord(Object... columnValues) throws IOException {
+        Map ret = new HashMap(columnValues.length);
+        int min = Math.min(columnInfoList.size(), columnValues.length);
+        for (int i = 0; i < min; i++) {
+            ret.put(columnInfoList.get(i).getColumnName().replace("\"", "").toLowerCase(),
columnValues[i]);
+        }
+        return ret;
+    }
+
+    @Before
+    public void setUp() throws SQLException {
+        super.setUp();
+        upsertExecutor = new JsonUpsertExecutor(conn, columnInfoList, preparedStatement,
upsertListener);
+    }
+}


Mime
View raw message