cassandra-commits mailing list archives

From alek...@apache.org
Subject [4/5] cassandra git commit: Remove Pig support
Date Fri, 16 Oct 2015 12:28:28 GMT
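
For context before the diffs: the loader removed below was driven from Pig through cql:// location URIs, as the deleted tests at the end of this message show. A minimal sketch of that usage, assuming an embedded local PigServer and a running Cassandra with the cql3ks.cqltable schema from the deleted CqlRecordReaderTest (the URI parameters shown are a subset of the options parsed by setLocationFromUri below):

    import java.util.Iterator;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class CqlNativeStorageUsageSketch
    {
        public static void main(String[] args) throws Exception
        {
            // Hypothetical local run; the deleted PigTestBase started embedded
            // Cassandra and Hadoop before registering queries like this one.
            PigServer pig = new PigServer("local");
            pig.registerQuery(
                "rows = LOAD 'cql://cql3ks/cqltable?init_address=127.0.0.1" +
                "&native_port=9042" +
                "&partitioner=org.apache.cassandra.dht.Murmur3Partitioner'" +
                " USING org.apache.cassandra.hadoop.pig.CqlNativeStorage();");
            Iterator<Tuple> rows = pig.openIterator("rows");
            while (rows.hasNext())
                System.out.println(rows.next());
        }
    }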
http://git-wip-us.apache.org/repos/asf/cassandra/blob/56cfc6ea/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
deleted file mode 100644
index ba0a37d..0000000
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ /dev/null
@@ -1,1082 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop.pig;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.datastax.driver.core.ColumnMetadata;
-import com.datastax.driver.core.Metadata;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.TableMetadata;
-import com.datastax.driver.core.exceptions.NoHostAvailableException;
-
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.exceptions.AuthenticationException;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.hadoop.HadoopCompat;
-import org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat;
-import org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter;
-import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
-import org.apache.cassandra.hadoop.cql3.CqlRecordReader;
-import org.apache.cassandra.serializers.CollectionSerializer;
-import org.apache.cassandra.utils.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.pig.*;
-import org.apache.pig.Expression.OpType;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.data.*;
-import org.apache.pig.impl.util.UDFContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.external.biz.base64Coder.Base64Coder;
-
-@Deprecated
-public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
-{
-    protected String DEFAULT_INPUT_FORMAT;
-    protected String DEFAULT_OUTPUT_FORMAT;
-
-    protected String username;
-    protected String password;
-    protected String keyspace;
-    protected String column_family;
-    protected String loadSignature;
-    protected String storeSignature;
-
-    protected Configuration conf;
-    protected String inputFormatClass;
-    protected String outputFormatClass;
-    protected int splitSize = 64 * 1024;
-    protected String partitionerClass;
-    protected boolean usePartitionFilter = false;
-    protected String initHostAddress;
-    protected String rpcPort;
-    protected int nativeProtocolVersion = 1;
-
-    private static final Logger logger = LoggerFactory.getLogger(CqlNativeStorage.class);
-    private static String BULK_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat";
-    private int pageSize = 1000;
-    private String columns;
-    private String outputQuery;
-    private String whereClause;
-
-    private RecordReader<Long, Row> reader;
-    private RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
-    private String nativePort;
-    private String nativeCoreConnections;
-    private String nativeMaxConnections;
-    private String nativeMaxSimultReqs;
-    private String nativeConnectionTimeout;
-    private String nativeReadConnectionTimeout;
-    private String nativeReceiveBufferSize;
-    private String nativeSendBufferSize;
-    private String nativeSolinger;
-    private String nativeTcpNodelay;
-    private String nativeReuseAddress;
-    private String nativeKeepAlive;
-    private String nativeAuthProvider;
-    private String nativeSSLTruststorePath;
-    private String nativeSSLKeystorePath;
-    private String nativeSSLTruststorePassword;
-    private String nativeSSLKeystorePassword;
-    private String nativeSSLCipherSuites;
-    private String inputCql;
-
-    private boolean bulkOutputFormat = false;
-    private String bulkCfSchema;
-    private String bulkInsertStatement;
-    private String bulkOutputLocation;
-    private int bulkBuffSize = -1;
-    private int bulkStreamThrottle = -1;
-    private int bulkMaxFailedHosts = -1;
-    private boolean bulkDeleteSourceOnSuccess = true;
-    private String bulkTableAlias;
-
-    public CqlNativeStorage()
-    {
-        this(1000);
-    }
-
-    /** @param pageSize limit on the number of CQL rows to fetch per page */
-    public CqlNativeStorage(int pageSize)
-    {
-        super();
-        this.pageSize = pageSize;
-        DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlInputFormat";
-        DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlOutputFormat";
-    }
-
-    public void prepareToRead(RecordReader reader, PigSplit split)
-    {
-        this.reader = reader;
-        if (reader instanceof CqlRecordReader) {
-            nativeProtocolVersion = ((CqlRecordReader) reader).getNativeProtocolVersion();
-        }
-    }
-
-    public void prepareToWrite(RecordWriter writer)
-    {
-        this.writer = writer;
-    }
-
-    /** get next row */
-    public Tuple getNext() throws IOException
-    {
-        try
-        {
-            // load the next pair
-            if (!reader.nextKeyValue())
-                return null;
-
-            TableInfo tableMetadata = getCfInfo(loadSignature);
-            Row row = reader.getCurrentValue();
-            Tuple tuple = TupleFactory.getInstance().newTuple(tableMetadata.getColumns().size());
-            Iterator<ColumnInfo> itera = tableMetadata.getColumns().iterator();
-            int i = 0;
-            while (itera.hasNext())
-            {
-                ColumnInfo cdef = itera.next();
-                ByteBuffer columnValue = row.getBytesUnsafe(cdef.getName());
-                if (columnValue != null)
-                {
-                    AbstractType<?> validator = getValidatorMap(tableMetadata).get(ByteBufferUtil.bytes(cdef.getName()));
-                    setTupleValue(tuple, i, cqlColumnToObj(ByteBufferUtil.bytes(cdef.getName()), columnValue,
-                                                           tableMetadata), validator);
-                }
-                else
-                    tuple.set(i, null);
-                i++;
-            }
-            return tuple;
-        }
-        catch (InterruptedException e)
-        {
-            throw new IOException(e.getMessage());
-        }
-    }
-
-    /** convert a cql column to an object */
-    private Object cqlColumnToObj(ByteBuffer name, ByteBuffer columnValue, TableInfo cfDef) throws IOException
-    {
-        // standard
-        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
-        return StorageHelper.cassandraToObj(validators.get(name), columnValue, nativeProtocolVersion);
-    }
-
-    /** set the value to the position of the tuple */
-    private void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
-    {
-        if (validator instanceof CollectionType)
-            setCollectionTupleValues(tuple, position, value, validator);
-        else
-           StorageHelper.setTupleValue(tuple, position, value);
-    }
-
-    /** set the values of set/list at and after the position of the tuple */
-    private void setCollectionTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
-    {
-        if (validator instanceof MapType)
-        {
-            setMapTupleValues(tuple, position, value, validator);
-            return;
-        }
-        AbstractType elementValidator;
-        if (validator instanceof SetType)
-            elementValidator = ((SetType<?>) validator).getElementsType();
-        else if (validator instanceof ListType)
-            elementValidator = ((ListType<?>) validator).getElementsType();
-        else
-            return;
-
-        int i = 0;
-        Tuple innerTuple = TupleFactory.getInstance().newTuple(((Collection<?>) value).size());
-        for (Object entry : (Collection<?>) value)
-        {
-            setTupleValue(innerTuple, i, cassandraToPigData(entry, elementValidator), elementValidator);
-            i++;
-        }
-        tuple.set(position, innerTuple);
-    }
-
-    /** set the entries of the map at the position of the tuple */
-    private void setMapTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
-    {
-        AbstractType<?> keyValidator = ((MapType<?, ?>) validator).getKeysType();
-        AbstractType<?> valueValidator = ((MapType<?, ?>) validator).getValuesType();
-
-        int i = 0;
-        Tuple innerTuple = TupleFactory.getInstance().newTuple(((Map<?,?>) value).size());
-        for(Map.Entry<?,?> entry :  ((Map<Object, Object>)value).entrySet())
-        {
-            Tuple mapEntryTuple = TupleFactory.getInstance().newTuple(2);
-            setTupleValue(mapEntryTuple, 0, cassandraToPigData(entry.getKey(), keyValidator), keyValidator);
-            setTupleValue(mapEntryTuple, 1, cassandraToPigData(entry.getValue(), valueValidator), valueValidator);
-            innerTuple.set(i, mapEntryTuple);
-            i++;
-        }
-        tuple.set(position, innerTuple);
-    }
-
-    private Object cassandraToPigData(Object obj, AbstractType validator)
-    {
-        if (validator instanceof DecimalType || validator instanceof InetAddressType)
-            return validator.getString(validator.decompose(obj));
-        return obj;
-    }
-
-    /** get the columnfamily definition for the signature */
-    protected TableInfo getCfInfo(String signature) throws IOException
-    {
-        UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CqlNativeStorage.class);
-        TableInfo cfInfo;
-        try
-        {
-            cfInfo = cfdefFromString(property.getProperty(signature));
-        }
-        catch (ClassNotFoundException e)
-        {
-            throw new IOException(e);
-        }
-        return cfInfo;
-    }
-
-    /** return the TableMetadata for the column family */
-    protected TableMetadata getCfInfo(Session client)
-            throws NoHostAvailableException,
-            AuthenticationException,
-            IllegalStateException
-    {
-        // get CF meta data
-        return client.getCluster().getMetadata().getKeyspace(Metadata.quote(keyspace)).getTable(Metadata.quote(column_family));
-    }
-
-    /** convert key tuple to key map */
-    private Map<String, ByteBuffer> tupleToKeyMap(Tuple t) throws IOException
-    {
-        Map<String, ByteBuffer> keys = new HashMap<String, ByteBuffer>();
-        for (int i = 0; i < t.size(); i++)
-        {
-            if (t.getType(i) != DataType.TUPLE)
-                throw new IOException("keys was not a tuple");
-            Tuple inner = (Tuple) t.get(i);
-            if (inner.size() != 2)
-                throw new IOException("Keys were not in name and value pairs");
-            Object name = inner.get(0);
-            if (name == null)
-                throw new IOException("Key name was empty");
-            keys.put(name.toString(), objToBB(inner.get(1)));
-        }
-        return keys;
-    }
-
-    /** convert object to ByteBuffer */
-    protected ByteBuffer objToBB(Object o)
-    {
-        if (o == null)
-            return nullToBB();
-        if (o instanceof java.lang.String)
-            return ByteBuffer.wrap(new DataByteArray((String)o).get());
-        if (o instanceof Integer)
-            return Int32Type.instance.decompose((Integer)o);
-        if (o instanceof Long)
-            return LongType.instance.decompose((Long)o);
-        if (o instanceof Float)
-            return FloatType.instance.decompose((Float)o);
-        if (o instanceof Double)
-            return DoubleType.instance.decompose((Double)o);
-        if (o instanceof UUID)
-            return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
-        if(o instanceof Tuple) {
-            List<Object> objects = ((Tuple)o).getAll();
-            //collections
-            if (objects.size() > 0 && objects.get(0) instanceof String)
-            {
-                String collectionType = (String) objects.get(0);
-                if ("set".equalsIgnoreCase(collectionType) ||
-                        "list".equalsIgnoreCase(collectionType))
-                    return objToListOrSetBB(objects.subList(1, objects.size()));
-                else if ("map".equalsIgnoreCase(collectionType))
-                    return objToMapBB(objects.subList(1, objects.size()));
-
-            }
-            return objToCompositeBB(objects);
-        }
-
-        return ByteBuffer.wrap(((DataByteArray) o).get());
-    }
-
-    private ByteBuffer objToListOrSetBB(List<Object> objects)
-    {
-        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
-        for(Object sub : objects)
-        {
-            ByteBuffer buffer = objToBB(sub);
-            serialized.add(buffer);
-        }
-        return CollectionSerializer.pack(serialized, objects.size(), 3);
-    }
-
-    private ByteBuffer objToMapBB(List<Object> objects)
-    {
-        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2);
-        for(Object sub : objects)
-        {
-            List<Object> keyValue = ((Tuple)sub).getAll();
-            for (Object entry: keyValue)
-            {
-                ByteBuffer buffer = objToBB(entry);
-                serialized.add(buffer);
-            }
-        }
-        return CollectionSerializer.pack(serialized, objects.size(), 3);
-    }
-
-    private ByteBuffer objToCompositeBB(List<Object> objects)
-    {
-        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
-        int totalLength = 0;
-        for(Object sub : objects)
-        {
-            ByteBuffer buffer = objToBB(sub);
-            serialized.add(buffer);
-            totalLength += 2 + buffer.remaining() + 1;
-        }
-        ByteBuffer out = ByteBuffer.allocate(totalLength);
-        for (ByteBuffer bb : serialized)
-        {
-            int length = bb.remaining();
-            out.put((byte) ((length >> 8) & 0xFF));
-            out.put((byte) (length & 0xFF));
-            out.put(bb);
-            out.put((byte) 0);
-        }
-        out.flip();
-        return out;
-    }
-
-    /** send a CQL query request using data from the tuple */
-    private void cqlQueryFromTuple(Map<String, ByteBuffer> key, Tuple t, int offset) throws IOException
-    {
-        for (int i = offset; i < t.size(); i++)
-        {
-            if (t.getType(i) != DataType.TUPLE)
-                throw new IOException("Output type was not a tuple");
-
-            Tuple inner = (Tuple) t.get(i);
-            if (inner.size() > 0)
-            {
-                List<ByteBuffer> bindedVariables = bindedVariablesFromTuple(inner);
-                if (bindedVariables.size() <= 0)
-                    throw new IOException("Missing binded variables");
-                sendCqlQuery(key, bindedVariables);
-            }
-        }
-    }
-
-    /** compose a list of bound variables */
-    private List<ByteBuffer> bindedVariablesFromTuple(Tuple t) throws IOException
-    {
-        List<ByteBuffer> variables = new ArrayList<ByteBuffer>();
-        for (int i = 0; i < t.size(); i++)
-            variables.add(objToBB(t.get(i)));
-        return variables;
-    }
-
-    /** write the data through the writer by executing a CQL query */
-    private void sendCqlQuery(Map<String, ByteBuffer> key, List<ByteBuffer> bindedVariables) throws IOException
-    {
-        try
-        {
-            writer.write(key, bindedVariables);
-        }
-        catch (InterruptedException e)
-        {
-            throw new IOException(e);
-        }
-    }
-
-    /** get the validators */
-    protected Map<ByteBuffer, AbstractType> getValidatorMap(TableInfo cfDef) throws IOException
-    {
-        Map<ByteBuffer, AbstractType> validators = new HashMap<>();
-        for (ColumnInfo cd : cfDef.getColumns())
-        {
-            if (cd.getTypeName() != null)
-            {
-                try
-                {
-                    AbstractType validator = TypeParser.parseCqlName(cd.getTypeName());
-                    if (validator instanceof CounterColumnType)
-                        validator = LongType.instance;
-                    validators.put(ByteBufferUtil.bytes(cd.getName()), validator);
-                }
-                catch (ConfigurationException | SyntaxException e)
-                {
-                    throw new IOException(e);
-                }
-            }
-        }
-        return validators;
-    }
-
-    /** schema: (value, value, value), with the key columns first. */
-    public ResourceSchema getSchema(String location, Job job) throws IOException
-    {
-        setLocation(location, job);
-        TableInfo cfInfo = getCfInfo(loadSignature);
-        // top-level schema, no type
-        ResourceSchema schema = new ResourceSchema();
-
-        // get default validators
-        Map<ByteBuffer, AbstractType> validators = getValidatorMap(cfInfo);
-
-        // will contain all fields for this schema
-        List<ResourceFieldSchema> allSchemaFields = new ArrayList<ResourceFieldSchema>();
-
-        for (ColumnInfo cdef : cfInfo.getColumns())
-        {
-            ResourceFieldSchema valSchema = new ResourceFieldSchema();
-            AbstractType<?> validator = validators.get(ByteBufferUtil.bytes(cdef.getName()));
-            valSchema.setName(cdef.getName());
-            valSchema.setType(StorageHelper.getPigType(validator));
-            allSchemaFields.add(valSchema);
-        }
-
-        // top level schema contains everything
-        schema.setFields(allSchemaFields.toArray(new ResourceFieldSchema[allSchemaFields.size()]));
-        return schema;
-    }
-
-    public void setPartitionFilter(Expression partitionFilter) throws IOException
-    {
-        UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CqlNativeStorage.class);
-        property.setProperty(StorageHelper.PARTITION_FILTER_SIGNATURE, partitionFilterToWhereClauseString(partitionFilter));
-    }
-
-    /**
-     * Return CQL where clauses for the corresponding partition filter. Make sure the data format matches.
-     * Only the following Pig data types are supported: int, long, float, double, boolean and chararray.
-     */
-    private String partitionFilterToWhereClauseString(Expression expression) throws IOException
-    {
-        Expression.BinaryExpression be = (Expression.BinaryExpression) expression;
-        OpType op = expression.getOpType();
-        String opString = op.toString();
-        switch (op)
-        {
-            case OP_EQ:
-                opString = " = ";
-            case OP_GE:
-            case OP_GT:
-            case OP_LE:
-            case OP_LT:
-                String name = be.getLhs().toString();
-                String value = be.getRhs().toString();
-                return String.format("%s %s %s", name, opString, value);
-            case OP_AND:
-                return String.format("%s AND %s", partitionFilterToWhereClauseString(be.getLhs()), partitionFilterToWhereClauseString(be.getRhs()));
-            default:
-                throw new IOException("Unsupported expression type: " + opString);
-        }
-    }
-
-    /** retrieve where clause for partition filter */
-    private String getWhereClauseForPartitionFilter()
-    {
-        UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CqlNativeStorage.class);
-        return property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE);
-    }
-
-    /**
-     *  output: (((name, value), (name, value)), (value ... value), (value ... value))
-     *  bulk output: ((value ... value), (value ... value))
-     */
-    public void putNext(Tuple t) throws IOException
-    {
-        if (t.size() < 1)
-        {
-            // simply nothing here, we can't even delete without a key
-            logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
-            return;
-        }
-
-        if (t.getType(0) != DataType.TUPLE)
-            throw new IOException("First argument in output must be a tuple");
-
-        if (!bulkOutputFormat && t.getType(1) != DataType.TUPLE)
-            throw new IOException("Second argument in output must be a tuple");
-
-        if (bulkOutputFormat)
-        {
-            cqlQueryFromTuple(null, t, 0);
-        }
-        else
-        {
-            Map<String, ByteBuffer> key = tupleToKeyMap((Tuple)t.get(0));
-            cqlQueryFromTuple(key, t, 1);
-        }
-    }
-
-    /** set read configuration settings */
-    public void setLocation(String location, Job job) throws IOException
-    {
-        conf = job.getConfiguration();
-        setLocationFromUri(location);
-
-        if (username != null && password != null)
-        {
-            ConfigHelper.setInputKeyspaceUserNameAndPassword(conf, username, password);
-            CqlConfigHelper.setUserNameAndPassword(conf, username, password);
-        }
-        if (splitSize > 0)
-            ConfigHelper.setInputSplitSize(conf, splitSize);
-        if (partitionerClass!= null)
-            ConfigHelper.setInputPartitioner(conf, partitionerClass);
-        if (initHostAddress != null)
-            ConfigHelper.setInputInitialAddress(conf, initHostAddress);
-        if (rpcPort != null)
-            ConfigHelper.setInputRpcPort(conf, rpcPort);
-        if (nativePort != null)
-            CqlConfigHelper.setInputNativePort(conf, nativePort);
-        if (nativeCoreConnections != null)
-            CqlConfigHelper.setInputCoreConnections(conf, nativeCoreConnections);
-        if (nativeMaxConnections != null)
-            CqlConfigHelper.setInputMaxConnections(conf, nativeMaxConnections);
-        if (nativeMaxSimultReqs != null)
-            CqlConfigHelper.setInputMaxSimultReqPerConnections(conf, nativeMaxSimultReqs);
-        if (nativeConnectionTimeout != null)
-            CqlConfigHelper.setInputNativeConnectionTimeout(conf, nativeConnectionTimeout);
-        if (nativeReadConnectionTimeout != null)
-            CqlConfigHelper.setInputNativeReadConnectionTimeout(conf, nativeReadConnectionTimeout);
-        if (nativeReceiveBufferSize != null)
-            CqlConfigHelper.setInputNativeReceiveBufferSize(conf, nativeReceiveBufferSize);
-        if (nativeSendBufferSize != null)
-            CqlConfigHelper.setInputNativeSendBufferSize(conf, nativeSendBufferSize);
-        if (nativeSolinger != null)
-            CqlConfigHelper.setInputNativeSolinger(conf, nativeSolinger);
-        if (nativeTcpNodelay != null)
-            CqlConfigHelper.setInputNativeTcpNodelay(conf, nativeTcpNodelay);
-        if (nativeReuseAddress != null)
-            CqlConfigHelper.setInputNativeReuseAddress(conf, nativeReuseAddress);
-        if (nativeKeepAlive != null)
-            CqlConfigHelper.setInputNativeKeepAlive(conf, nativeKeepAlive);
-        if (nativeAuthProvider != null)
-            CqlConfigHelper.setInputNativeAuthProvider(conf, nativeAuthProvider);
-        if (nativeSSLTruststorePath != null)
-            CqlConfigHelper.setInputNativeSSLTruststorePath(conf, nativeSSLTruststorePath);
-        if (nativeSSLKeystorePath != null)
-            CqlConfigHelper.setInputNativeSSLKeystorePath(conf, nativeSSLKeystorePath);
-        if (nativeSSLTruststorePassword != null)
-            CqlConfigHelper.setInputNativeSSLTruststorePassword(conf, nativeSSLTruststorePassword);
-        if (nativeSSLKeystorePassword != null)
-            CqlConfigHelper.setInputNativeSSLKeystorePassword(conf, nativeSSLKeystorePassword);
-        if (nativeSSLCipherSuites != null)
-            CqlConfigHelper.setInputNativeSSLCipherSuites(conf, nativeSSLCipherSuites);
-
-        ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
-        setConnectionInformation();
-
-        CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(pageSize));
-        if (inputCql != null)
-            CqlConfigHelper.setInputCql(conf, inputCql);
-        if (columns != null)
-            CqlConfigHelper.setInputColumns(conf, columns);
-        if (whereClause != null)
-            CqlConfigHelper.setInputWhereClauses(conf, whereClause);
-
-        String whereClauseForPartitionFilter = getWhereClauseForPartitionFilter();
-        String wc = whereClause != null && !whereClause.trim().isEmpty()
-                               ? whereClauseForPartitionFilter == null ? whereClause: String.format("%s AND %s", whereClause.trim(), whereClauseForPartitionFilter)
-                               : whereClauseForPartitionFilter;
-
-        if (wc != null)
-        {
-            logger.trace("where clause: {}", wc);
-            CqlConfigHelper.setInputWhereClauses(conf, wc);
-        }
-        if (System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE) != null)
-        {
-            try
-            {
-                ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE)));
-            }
-            catch (NumberFormatException e)
-            {
-                throw new IOException("PIG_INPUT_SPLIT_SIZE is not a number", e);
-            }
-        }
-
-        if (ConfigHelper.getInputInitialAddress(conf) == null)
-            throw new IOException("PIG_INPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
-        if (ConfigHelper.getInputPartitioner(conf) == null)
-            throw new IOException("PIG_INPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
-        if (loadSignature == null)
-            loadSignature = location;
-
-        initSchema(loadSignature);
-    }
-
-    /** set store configuration settings */
-    public void setStoreLocation(String location, Job job) throws IOException
-    {
-        conf = HadoopCompat.getConfiguration(job);
-        setLocationFromUri(location);
-
-        if (username != null && password != null)
-            ConfigHelper.setOutputKeyspaceUserNameAndPassword(conf, username, password);
-        if (splitSize > 0)
-            ConfigHelper.setInputSplitSize(conf, splitSize);
-        if (partitionerClass!= null)
-            ConfigHelper.setOutputPartitioner(conf, partitionerClass);
-        if (rpcPort != null)
-        {
-            ConfigHelper.setOutputRpcPort(conf, rpcPort);
-            ConfigHelper.setInputRpcPort(conf, rpcPort);
-        }
-        if (initHostAddress != null)
-        {
-            ConfigHelper.setOutputInitialAddress(conf, initHostAddress);
-            ConfigHelper.setInputInitialAddress(conf, initHostAddress);
-        }
-
-        ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
-        CqlConfigHelper.setOutputCql(conf, outputQuery);
-
-        if (bulkOutputFormat)
-        {
-            DEFAULT_OUTPUT_FORMAT = BULK_OUTPUT_FORMAT;
-            if (bulkCfSchema != null)
-                CqlBulkOutputFormat.setTableSchema(conf, column_family, bulkCfSchema);
-            else
-                throw new IOException("bulk_cf_schema is missing in input url parameter");
-            if (bulkInsertStatement != null)
-                CqlBulkOutputFormat.setTableInsertStatement(conf, column_family, bulkInsertStatement);
-            else
-                throw new IOException("bulk_insert_statement is missing in input url parameter");
-            if (bulkTableAlias != null)
-                CqlBulkOutputFormat.setTableAlias(conf, bulkTableAlias, column_family); 
-            CqlBulkOutputFormat.setDeleteSourceOnSuccess(conf, bulkDeleteSourceOnSuccess);
-            if (bulkOutputLocation != null)
-                conf.set(CqlBulkRecordWriter.OUTPUT_LOCATION, bulkOutputLocation);
-            if (bulkBuffSize > 0)
-                conf.set(CqlBulkRecordWriter.BUFFER_SIZE_IN_MB, String.valueOf(bulkBuffSize));
-            if (bulkStreamThrottle > 0)
-                conf.set(CqlBulkRecordWriter.STREAM_THROTTLE_MBITS, String.valueOf(bulkStreamThrottle));
-            if (bulkMaxFailedHosts > 0)
-                conf.set(CqlBulkRecordWriter.MAX_FAILED_HOSTS, String.valueOf(bulkMaxFailedHosts));
-            if (partitionerClass!= null)
-                ConfigHelper.setInputPartitioner(conf, partitionerClass);
-        }
-
-        setConnectionInformation();
-
-        if (ConfigHelper.getOutputRpcPort(conf) == 0)
-            throw new IOException("PIG_OUTPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
-        if (ConfigHelper.getOutputInitialAddress(conf) == null)
-            throw new IOException("PIG_OUTPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
-        if (ConfigHelper.getOutputPartitioner(conf) == null)
-            throw new IOException("PIG_OUTPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
-
-        initSchema(storeSignature);
-    }
-
-    /** get the column family schema from Cassandra */
-    protected void initSchema(String signature) throws IOException
-    {
-        Properties properties = UDFContext.getUDFContext().getUDFProperties(CqlNativeStorage.class);
-
-        // Only get the schema if we haven't already gotten it
-        if (!properties.containsKey(signature))
-        {
-            try (Session client = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf), conf).connect())
-            {
-                client.execute("USE " + keyspace);
-
-                // compose the CfDef for the column family
-                TableMetadata cfInfo = getCfInfo(client);
-
-                if (cfInfo != null)
-                {
-                    properties.setProperty(signature, cfdefToString(cfInfo));
-                }
-                else
-                    throw new IOException(String.format("Table '%s' not found in keyspace '%s'",
-                            column_family,
-                            keyspace));
-            }
-            catch (Exception e)
-            {
-                throw new IOException(e);
-            }
-        }
-    }
-
-
-    /** convert CfDef to string */
-    protected static String cfdefToString(TableMetadata cfDef) throws IOException
-    {
-        TableInfo tableInfo = new TableInfo(cfDef);
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try (ObjectOutputStream oos = new ObjectOutputStream( baos ))
-        {
-            oos.writeObject(tableInfo);
-        }
-
-        return new String( Base64Coder.encode(baos.toByteArray()) );
-    }
-
-    /** convert string back to CfDef */
-    protected static TableInfo cfdefFromString(String st) throws IOException, ClassNotFoundException
-    {
-        byte [] data = Base64Coder.decode( st );
-        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data)))
-        {
-            Object o = ois.readObject();
-            return (TableInfo)o;
-        }
-    }
-
-    /** parse the location query string into a parameter map */
-    public static Map<String, String> getQueryMap(String query) throws UnsupportedEncodingException
-    {
-        String[] params = query.split("&");
-        Map<String, String> map = new HashMap<String, String>(params.length);
-        for (String param : params)
-        {
-            String[] keyValue = param.split("=");
-            map.put(keyValue[0], URLDecoder.decode(keyValue[1], "UTF-8"));
-        }
-        return map;
-    }
-
-    private void setLocationFromUri(String location) throws IOException
-    {
-        try
-        {
-            if (!location.startsWith("cql://"))
-                throw new Exception("Bad scheme: " + location);
-
-            String[] urlParts = location.split("\\?");
-            if (urlParts.length > 1)
-            {
-                Map<String, String> urlQuery = getQueryMap(urlParts[1]);
-
-                // number of CQL rows per page
-                if (urlQuery.containsKey("page_size"))
-                    pageSize = Integer.parseInt(urlQuery.get("page_size"));
-
-                // output prepared statement
-                if (urlQuery.containsKey("output_query"))
-                    outputQuery = urlQuery.get("output_query");
-
-                if (urlQuery.containsKey("bulk_output_format"))
-                    bulkOutputFormat = Boolean.valueOf(urlQuery.get("bulk_output_format"));
-                if (urlQuery.containsKey("bulk_cf_schema"))
-                    bulkCfSchema = urlQuery.get("bulk_cf_schema");
-                if (urlQuery.containsKey("bulk_insert_statement"))
-                    bulkInsertStatement = urlQuery.get("bulk_insert_statement");
-                if (urlQuery.containsKey("bulk_output_location"))
-                    bulkOutputLocation = urlQuery.get("bulk_output_location");
-                if (urlQuery.containsKey("bulk_buff_size"))
-                    bulkBuffSize = Integer.valueOf(urlQuery.get("bulk_buff_size"));
-                if (urlQuery.containsKey("bulk_stream_throttle"))
-                    bulkStreamThrottle = Integer.valueOf(urlQuery.get("bulk_stream_throttle"));
-                if (urlQuery.containsKey("bulk_max_failed_hosts"))
-                    bulkMaxFailedHosts = Integer.valueOf(urlQuery.get("bulk_max_failed_hosts"));
-                if (urlQuery.containsKey("bulk_delete_source"))
-                    bulkDeleteSourceOnSuccess = Boolean.parseBoolean(urlQuery.get("bulk_delete_source"));
-                if (urlQuery.containsKey("bulk_table_alias"))
-                    bulkTableAlias = urlQuery.get("bulk_table_alias");
-
-                // split size
-                if (urlQuery.containsKey("split_size"))
-                    splitSize = Integer.parseInt(urlQuery.get("split_size"));
-                if (urlQuery.containsKey("partitioner"))
-                    partitionerClass = urlQuery.get("partitioner");
-                if (urlQuery.containsKey("use_secondary"))
-                    usePartitionFilter = Boolean.parseBoolean(urlQuery.get("use_secondary"));
-                if (urlQuery.containsKey("init_address"))
-                    initHostAddress = urlQuery.get("init_address");
-
-                if (urlQuery.containsKey("native_port"))
-                    nativePort = urlQuery.get("native_port");
-                if (urlQuery.containsKey("core_conns"))
-                    nativeCoreConnections = urlQuery.get("core_conns");
-                if (urlQuery.containsKey("max_conns"))
-                    nativeMaxConnections = urlQuery.get("max_conns");
-                if (urlQuery.containsKey("max_simult_reqs"))
-                    nativeMaxSimultReqs = urlQuery.get("max_simult_reqs");
-                if (urlQuery.containsKey("native_timeout"))
-                    nativeConnectionTimeout = urlQuery.get("native_timeout");
-                if (urlQuery.containsKey("native_read_timeout"))
-                    nativeReadConnectionTimeout = urlQuery.get("native_read_timeout");
-                if (urlQuery.containsKey("rec_buff_size"))
-                    nativeReceiveBufferSize = urlQuery.get("rec_buff_size");
-                if (urlQuery.containsKey("send_buff_size"))
-                    nativeSendBufferSize = urlQuery.get("send_buff_size");
-                if (urlQuery.containsKey("solinger"))
-                    nativeSolinger = urlQuery.get("solinger");
-                if (urlQuery.containsKey("tcp_nodelay"))
-                    nativeTcpNodelay = urlQuery.get("tcp_nodelay");
-                if (urlQuery.containsKey("reuse_address"))
-                    nativeReuseAddress = urlQuery.get("reuse_address");
-                if (urlQuery.containsKey("keep_alive"))
-                    nativeKeepAlive = urlQuery.get("keep_alive");
-                if (urlQuery.containsKey("auth_provider"))
-                    nativeAuthProvider = urlQuery.get("auth_provider");
-                if (urlQuery.containsKey("trust_store_path"))
-                    nativeSSLTruststorePath = urlQuery.get("trust_store_path");
-                if (urlQuery.containsKey("key_store_path"))
-                    nativeSSLKeystorePath = urlQuery.get("key_store_path");
-                if (urlQuery.containsKey("trust_store_password"))
-                    nativeSSLTruststorePassword = urlQuery.get("trust_store_password");
-                if (urlQuery.containsKey("key_store_password"))
-                    nativeSSLKeystorePassword = urlQuery.get("key_store_password");
-                if (urlQuery.containsKey("cipher_suites"))
-                    nativeSSLCipherSuites = urlQuery.get("cipher_suites");
-                if (urlQuery.containsKey("input_cql"))
-                    inputCql = urlQuery.get("input_cql");
-                if (urlQuery.containsKey("columns"))
-                    columns = urlQuery.get("columns");
-                if (urlQuery.containsKey("where_clause"))
-                    whereClause = urlQuery.get("where_clause");
-                if (urlQuery.containsKey("rpc_port"))
-                    rpcPort = urlQuery.get("rpc_port");
-            }
-            String[] parts = urlParts[0].split("/+");
-            String[] credentialsAndKeyspace = parts[1].split("@");
-            if (credentialsAndKeyspace.length > 1)
-            {
-                String[] credentials = credentialsAndKeyspace[0].split(":");
-                username = credentials[0];
-                password = credentials[1];
-                keyspace = credentialsAndKeyspace[1];
-            }
-            else
-            {
-                keyspace = parts[1];
-            }
-            column_family = parts[2];
-        }
-        catch (Exception e)
-        {
-            throw new IOException("Expected 'cql://[username:password@]<keyspace>/<columnfamily>" +
-                    "[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>]" +
-                    "[&where_clause=<clause>][&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]" +
-                    "[&init_address=<host>][&native_port=<native_port>][&core_conns=<core_conns>]" +
-                    "[&max_conns=<max_conns>][&min_simult_reqs=<min_simult_reqs>][&max_simult_reqs=<max_simult_reqs>]" +
-                    "[&native_timeout=<native_timeout>][&native_read_timeout=<native_read_timeout>][&rec_buff_size=<rec_buff_size>]" +
-                    "[&send_buff_size=<send_buff_size>][&solinger=<solinger>][&tcp_nodelay=<tcp_nodelay>][&reuse_address=<reuse_address>]" +
-                    "[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]" +
-                    "[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]" +
-                    "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]" +
-                    "[columns=<columns>][where_clause=<where_clause>]" +
-                    "[&bulk_cf_schema=bulk_cf_schema][&bulk_insert_statement=bulk_insert_statement][&bulk_table_alias=<bulk_table_alias>]" +
-                    "[&bulk_output_location=<bulk_output_location>][&bulk_buff_size=<bulk_buff_size>][&bulk_delete_source=<bulk_delete_source>]" +
-                    "[&bulk_stream_throttle=<bulk_stream_throttle>][&bulk_max_failed_hosts=<bulk_max_failed_hosts>]]': " +  e.getMessage());
-         }
-    }
-
-    public ByteBuffer nullToBB()
-    {
-        return ByteBuffer.wrap(new byte[0]);
-    }
-
-    /** output format */
-    public OutputFormat getOutputFormat() throws IOException
-    {
-        try
-        {
-            return FBUtilities.construct(outputFormatClass, "outputformat");
-        }
-        catch (ConfigurationException e)
-        {
-            throw new IOException(e);
-        }
-    }
-
-    public void cleanupOnFailure(String failure, Job job)
-    {
-    }
-
-    public void cleanupOnSuccess(String location, Job job) throws IOException {
-    }
-
-    /** return partition keys */
-    public String[] getPartitionKeys(String location, Job job) throws IOException
-    {
-        if (!usePartitionFilter)
-            return null;
-        TableInfo tableMetadata = getCfInfo(loadSignature);
-        String[] partitionKeys = new String[tableMetadata.getPartitionKey().size()];
-        for (int i = 0; i < tableMetadata.getPartitionKey().size(); i++)
-        {
-            partitionKeys[i] = tableMetadata.getPartitionKey().get(i).getName();
-        }
-        return partitionKeys;
-    }
-
-    public void checkSchema(ResourceSchema schema) throws IOException
-    {
-        // we don't care about types, they all get cast to ByteBuffers
-    }
-
-    public ResourceStatistics getStatistics(String location, Job job)
-    {
-        return null;
-    }
-
-    @Override
-    public InputFormat getInputFormat() throws IOException
-    {
-        try
-        {
-            return FBUtilities.construct(inputFormatClass, "inputformat");
-        }
-        catch (ConfigurationException e)
-        {
-            throw new IOException(e);
-        }
-    }
-
-    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
-    {
-        return relativeToAbsolutePath(location, curDir);
-    }
-
-    @Override
-    public String relativeToAbsolutePath(String location, Path curDir) throws IOException
-    {
-        return location;
-    }
-
-    @Override
-    public void setUDFContextSignature(String signature)
-    {
-        this.loadSignature = signature;
-    }
-
-    /** StoreFunc methods */
-    public void setStoreFuncUDFContextSignature(String signature)
-    {
-        this.storeSignature = signature;
-    }
-
-    /** set Hadoop/Cassandra connection settings */
-    protected void setConnectionInformation() throws IOException
-    {
-        StorageHelper.setConnectionInformation(conf);
-        if (System.getenv(StorageHelper.PIG_INPUT_FORMAT) != null)
-            inputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_INPUT_FORMAT));
-        else
-            inputFormatClass = DEFAULT_INPUT_FORMAT;
-        if (System.getenv(StorageHelper.PIG_OUTPUT_FORMAT) != null)
-            outputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_OUTPUT_FORMAT));
-        else
-            outputFormatClass = DEFAULT_OUTPUT_FORMAT;
-    }
-
-    /** get the fully qualified class name */
-    protected String getFullyQualifiedClassName(String classname)
-    {
-        return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." + classname;
-    }
-}
-
-class TableInfo implements Serializable
-{
-    private final List<ColumnInfo> columns;
-    private final List<ColumnInfo> partitionKey;
-    private final String name;
-
-    public TableInfo(TableMetadata tableMetadata)
-    {
-        List<ColumnMetadata> cmColumns = tableMetadata.getColumns();
-        columns = new ArrayList<>(cmColumns.size());
-        for (ColumnMetadata cm : cmColumns)
-        {
-            columns.add(new ColumnInfo(this, cm));
-        }
-        List<ColumnMetadata> cmPartitionKey = tableMetadata.getPartitionKey();
-        partitionKey = new ArrayList<>(cmPartitionKey.size());
-        for (ColumnMetadata cm : cmPartitionKey)
-        {
-            partitionKey.add(new ColumnInfo(this, cm));
-        }
-        name = tableMetadata.getName();
-    }
-
-    public List<ColumnInfo> getPartitionKey()
-    {
-        return partitionKey;
-    }
-
-    public List<ColumnInfo> getColumns()
-    {
-        return columns;
-    }
-
-    public String getName()
-    {
-        return name;
-    }
-}
-
-class ColumnInfo implements Serializable
-{
-    private final TableInfo table;
-    private final String name;
-    private final String typeName;
-
-    public ColumnInfo(TableInfo tableInfo, ColumnMetadata columnMetadata)
-    {
-        table = tableInfo;
-        name = columnMetadata.getName();
-        typeName = columnMetadata.getType().toString();
-    }
-
-    public String getName()
-    {
-        return name;
-    }
-
-    public String getTypeName()
-    {
-        return typeName;
-    }
-}
\ No newline at end of file

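The objToCompositeBB method above implements the legacy CompositeType wire format: each component is encoded as a two-byte big-endian length, the raw payload, and a single end-of-component byte (0). A self-contained sketch of just that encoding, for reference (class and method names are illustrative only):

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;

    public class CompositeEncodingSketch
    {
        // Pack components in the format written by objToCompositeBB:
        // [2-byte length][payload][0x00] per component.
        static ByteBuffer encode(List<byte[]> components)
        {
            int total = 0;
            for (byte[] c : components)
                total += 2 + c.length + 1;
            ByteBuffer out = ByteBuffer.allocate(total);
            for (byte[] c : components)
            {
                out.put((byte) ((c.length >> 8) & 0xFF)); // length, high byte
                out.put((byte) (c.length & 0xFF));        // length, low byte
                out.put(c);                               // component payload
                out.put((byte) 0);                        // end-of-component marker
            }
            out.flip();
            return out;
        }

        public static void main(String[] args)
        {
            ByteBuffer bb = encode(Arrays.asList(new byte[]{ 'a' }, new byte[]{ 'b', 'c' }));
            System.out.println(bb.remaining()); // 4 + 5 = 9 bytes total
        }
    }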
http://git-wip-us.apache.org/repos/asf/cassandra/blob/56cfc6ea/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java b/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java
deleted file mode 100644
index d700cb7..0000000
--- a/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java
+++ /dev/null
@@ -1,122 +0,0 @@
-package org.apache.cassandra.hadoop.pig;
-
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.util.Date;
-import java.util.UUID;
-
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.serializers.CollectionSerializer;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.UUIDGen;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-
-@Deprecated
-public class StorageHelper
-{
-    // system environment variables that can be set to configure connection info:
-    // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper
-    public final static String PIG_INPUT_RPC_PORT = "PIG_INPUT_RPC_PORT";
-    public final static String PIG_INPUT_INITIAL_ADDRESS = "PIG_INPUT_INITIAL_ADDRESS";
-    public final static String PIG_INPUT_PARTITIONER = "PIG_INPUT_PARTITIONER";
-    public final static String PIG_OUTPUT_RPC_PORT = "PIG_OUTPUT_RPC_PORT";
-    public final static String PIG_OUTPUT_INITIAL_ADDRESS = "PIG_OUTPUT_INITIAL_ADDRESS";
-    public final static String PIG_OUTPUT_PARTITIONER = "PIG_OUTPUT_PARTITIONER";
-    public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
-    public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
-    public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
-    public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT";
-    public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
-    public final static String PIG_INPUT_SPLIT_SIZE = "PIG_INPUT_SPLIT_SIZE";
-
-
-    public final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter";
-
-    protected static void setConnectionInformation(Configuration conf)
-    {
-        if (System.getenv(PIG_RPC_PORT) != null)
-        {
-            ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_RPC_PORT));
-            ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_RPC_PORT));
-        }
-
-        if (System.getenv(PIG_INPUT_RPC_PORT) != null)
-            ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_INPUT_RPC_PORT));
-        if (System.getenv(PIG_OUTPUT_RPC_PORT) != null)
-            ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_OUTPUT_RPC_PORT));
-
-        if (System.getenv(PIG_INITIAL_ADDRESS) != null)
-        {
-            ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
-            ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
-        }
-        if (System.getenv(PIG_INPUT_INITIAL_ADDRESS) != null)
-            ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INPUT_INITIAL_ADDRESS));
-        if (System.getenv(PIG_OUTPUT_INITIAL_ADDRESS) != null)
-            ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_OUTPUT_INITIAL_ADDRESS));
-
-        if (System.getenv(PIG_PARTITIONER) != null)
-        {
-            ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_PARTITIONER));
-            ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_PARTITIONER));
-        }
-        if(System.getenv(PIG_INPUT_PARTITIONER) != null)
-            ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_INPUT_PARTITIONER));
-        if(System.getenv(PIG_OUTPUT_PARTITIONER) != null)
-            ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_OUTPUT_PARTITIONER));
-    }
-
-    protected static Object cassandraToObj(AbstractType validator, ByteBuffer value, int nativeProtocolVersion)
-    {
-        if (validator instanceof DecimalType || validator instanceof InetAddressType)
-            return validator.getString(value);
-
-        if (validator instanceof CollectionType)
-        {
-            // For CollectionType, the compose() method assumes the v3 protocol format for collections, which
-            // is not correct here since we query using the CQL-over-thrift interface, which uses the pre-v3 format
-            return ((CollectionSerializer)validator.getSerializer()).deserializeForNativeProtocol(value, nativeProtocolVersion);
-        }
-
-        return validator.compose(value);
-    }
-
-    /** set the value to the position of the tuple */
-    protected static void setTupleValue(Tuple pair, int position, Object value) throws ExecException
-    {
-        if (value instanceof BigInteger)
-            pair.set(position, ((BigInteger) value).intValue());
-        else if (value instanceof ByteBuffer)
-            pair.set(position, new DataByteArray(ByteBufferUtil.getArray((ByteBuffer) value)));
-        else if (value instanceof UUID)
-            pair.set(position, new DataByteArray(UUIDGen.decompose((java.util.UUID) value)));
-        else if (value instanceof Date)
-            pair.set(position, TimestampType.instance.decompose((Date) value).getLong());
-        else
-            pair.set(position, value);
-    }
-
-    /** get the Pig type for the Cassandra data type */
-    protected static byte getPigType(AbstractType type)
-    {
-        if (type instanceof LongType || type instanceof DateType || type instanceof TimestampType) // DateType is bad and it should feel bad
-            return DataType.LONG;
-        else if (type instanceof IntegerType || type instanceof Int32Type) // IntegerType will overflow at 2**31, but is kept for compatibility until pig has a BigInteger
-            return DataType.INTEGER;
-        else if (type instanceof AsciiType || type instanceof UTF8Type || type instanceof DecimalType || type instanceof InetAddressType)
-            return DataType.CHARARRAY;
-        else if (type instanceof FloatType)
-            return DataType.FLOAT;
-        else if (type instanceof DoubleType)
-            return DataType.DOUBLE;
-        else if (type instanceof AbstractCompositeType || type instanceof CollectionType)
-            return DataType.TUPLE;
-
-        return DataType.BYTEARRAY;
-    }
-}

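A note on the precedence implemented in setConnectionInformation above: the generic variables (PIG_RPC_PORT, PIG_INITIAL_ADDRESS, PIG_PARTITIONER) are applied to both the input and output sides first, then the side-specific variables override them. A minimal generic sketch of that layered-override pattern (the class and helper names are hypothetical, for illustration only):

    import java.util.HashMap;
    import java.util.Map;

    public class LayeredEnvSketch
    {
        // Resolve a setting the way setConnectionInformation does: start from
        // the generic key, then let the side-specific key override it if set.
        static String resolve(Map<String, String> env, String generic, String specific)
        {
            String value = env.get(generic);
            if (env.get(specific) != null)
                value = env.get(specific);
            return value;
        }

        public static void main(String[] args)
        {
            Map<String, String> env = new HashMap<>();
            env.put("PIG_RPC_PORT", "9160");
            env.put("PIG_INPUT_RPC_PORT", "9170");
            // The input side is overridden by the specific variable; the
            // output side falls back to the generic one.
            System.out.println(resolve(env, "PIG_RPC_PORT", "PIG_INPUT_RPC_PORT"));  // 9170
            System.out.println(resolve(env, "PIG_RPC_PORT", "PIG_OUTPUT_RPC_PORT")); // 9160
        }
    }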
http://git-wip-us.apache.org/repos/asf/cassandra/blob/56cfc6ea/test/conf/cassandra_pig.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra_pig.yaml b/test/conf/cassandra_pig.yaml
deleted file mode 100644
index ce71410..0000000
--- a/test/conf/cassandra_pig.yaml
+++ /dev/null
@@ -1,42 +0,0 @@
-#
-# Warning!
-# Consider the effects on 'o.a.c.i.s.LegacySSTableTest' before changing schemas in this file.
-#
-cluster_name: Test Cluster
-memtable_allocation_type: heap_buffers
-commitlog_sync: batch
-commitlog_sync_batch_window_in_ms: 1.0
-commitlog_segment_size_in_mb: 5
-commitlog_directory: build/test/cassandra/commitlog
-hints_directory: build/test/cassandra/hints
-partitioner: org.apache.cassandra.dht.Murmur3Partitioner
-listen_address: 127.0.0.1
-storage_port: 7010
-rpc_port: 9170
-start_native_transport: true
-native_transport_port: 9042
-column_index_size_in_kb: 4
-saved_caches_directory: build/test/cassandra/saved_caches
-data_file_directories:
-    - build/test/cassandra/data
-disk_access_mode: mmap
-seed_provider:
-    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
-      parameters:
-          - seeds: "127.0.0.1"
-endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
-dynamic_snitch: true
-request_scheduler: org.apache.cassandra.scheduler.RoundRobinScheduler
-request_scheduler_id: keyspace
-server_encryption_options:
-    internode_encryption: none
-    keystore: conf/.keystore
-    keystore_password: cassandra
-    truststore: conf/.truststore
-    truststore_password: cassandra
-incremental_backups: true
-concurrent_compactors: 4
-compaction_throughput_mb_per_sec: 0
-row_cache_class_name: org.apache.cassandra.cache.OHCProvider
-row_cache_size_in_mb: 16
-enable_user_defined_functions: true

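The deleted cassandra_pig.yaml was the embedded-server configuration for the Pig tests. A sketch of how a harness would point the embedded server at such a file, assuming the standard cassandra.config system property mechanism (this is not the deleted harness itself):

    public class EmbeddedConfigSketch
    {
        public static void main(String[] args)
        {
            // Cassandra reads the cassandra.config system property as a URL
            // to the yaml file; set it before the server classes initialize.
            System.setProperty("cassandra.config", "file:test/conf/cassandra_pig.yaml");
        }
    }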
http://git-wip-us.apache.org/repos/asf/cassandra/blob/56cfc6ea/test/pig/org/apache/cassandra/pig/CqlRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/CqlRecordReaderTest.java b/test/pig/org/apache/cassandra/pig/CqlRecordReaderTest.java
deleted file mode 100644
index b2a74b6..0000000
--- a/test/pig/org/apache/cassandra/pig/CqlRecordReaderTest.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.pig;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.pig.data.Tuple;
-import org.apache.thrift.TException;
-
-public class CqlRecordReaderTest extends PigTestBase
-{
-    private static String[] statements = {
-        "DROP KEYSPACE IF EXISTS cql3ks",
-        "CREATE KEYSPACE cql3ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};",
-        "USE cql3ks;",
-
-        "CREATE TABLE cqltable (" +
-        "pk1 int," +
-        "pk2 int," +
-        "pk3 int," +
-        "ck1 int," +
-        "ck2 int," +
-        "data text," +
-        "primary key((pk1,pk2,pk3),ck1,ck2));",
-        "INSERT INTO cqltable(pk1, pk2, pk3, ck1, ck2, data) VALUES (11, 12, 13, 14, 15, 'value1');",
-
-        "CREATE TABLE \"MixedCaseCqlTable\" (" +
-        "pk1 int," +
-        "\"PK2\" int," +
-        "pk3 int," +
-        "\"CK1\" int," +
-        "ck2 int," +
-        "data text," +
-        "primary key((pk1,\"PK2\",pk3),\"CK1\",ck2));",
-        "INSERT INTO \"MixedCaseCqlTable\"(pk1, \"PK2\", pk3, \"CK1\", ck2, data) VALUES (11, 12, 13, 14, 15, 'value1');",
-    };
-
-    @BeforeClass
-    public static void setup() throws IOException, ConfigurationException, TException
-    {
-        startCassandra();
-        executeCQLStatements(statements);
-        startHadoopCluster();
-    }
-
-    @Test
-    public void defaultCqlQueryTest() throws Exception
-    {
-        String initialQuery = "rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + nativeParameters + "' USING CqlNativeStorage();";
-        pig.registerQuery(initialQuery);
-        Iterator<Tuple> it = pig.openIterator("rows");
-        if (it.hasNext()) {
-            Tuple t = it.next();
-            Assert.assertEquals(t.get(0), 11);
-            Assert.assertEquals(t.get(1), 12);
-            Assert.assertEquals(t.get(2), 13);
-            Assert.assertEquals(t.get(3), 14);
-            Assert.assertEquals(t.get(4), 15);
-            Assert.assertEquals(t.get(5), "value1");
-        }
-        else
-        {
-            Assert.fail("Failed to get data for query " + initialQuery);
-        }
-    }
-
-    @Test
-    public void defaultMixedCaseCqlQueryTest() throws Exception
-    {
-        String initialQuery = "rows = LOAD 'cql://cql3ks/MixedCaseCqlTable?" + defaultParameters + nativeParameters + "' USING CqlNativeStorage();";
-        pig.registerQuery(initialQuery);
-        Iterator<Tuple> it = pig.openIterator("rows");
-        if (it.hasNext()) {
-            Tuple t = it.next();
-            Assert.assertEquals(t.get(0), 11);
-            Assert.assertEquals(t.get(1), 12);
-            Assert.assertEquals(t.get(2), 13);
-            Assert.assertEquals(t.get(3), 14);
-            Assert.assertEquals(t.get(4), 15);
-            Assert.assertEquals(t.get(5), "value1");
-        }
-        else
-        {
-            Assert.fail("Failed to get data for query " + initialQuery);
-        }
-    }
-
-    @Test
-    public void selectColumnsTest() throws Exception
-    {
-        String initialQuery = "rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + nativeParameters + "&columns=ck1%2Cck2%2Cdata' USING CqlNativeStorage();";
-        pig.registerQuery(initialQuery);
-        Iterator<Tuple> it = pig.openIterator("rows");
-        if (it.hasNext()) {
-            Tuple t = it.next();
-            Assert.assertEquals(t.get(0), 11);
-            Assert.assertEquals(t.get(1), 12);
-            Assert.assertEquals(t.get(2), 13);
-            Assert.assertEquals(t.get(3), 14);
-            Assert.assertEquals(t.get(4), 15);
-            Assert.assertEquals(t.get(5), "value1");
-        }
-        else
-        {
-            Assert.fail("Failed to get data for query " + initialQuery);
-        }
-    }
-
-    @Test
-    public void whereClauseTest() throws Exception
-    {
-        String initialQuery = "rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + nativeParameters + "&where_clause=ck1%3d14' USING CqlNativeStorage();";
-        pig.registerQuery(initialQuery);
-        Iterator<Tuple> it = pig.openIterator("rows");
-        if (it.hasNext()) {
-            Tuple t = it.next();
-            Assert.assertEquals(t.get(0), 11);
-            Assert.assertEquals(t.get(1), 12);
-            Assert.assertEquals(t.get(2), 13);
-            Assert.assertEquals(t.get(3), 14);
-            Assert.assertEquals(t.get(4), 15);
-            Assert.assertEquals(t.get(5), "value1");
-        }
-        else
-        {
-            Assert.fail("Failed to get data for query " + initialQuery);
-        }
-    }
-}

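The LOAD locations in the tests above pass their options as URL query parameters, so values such as the column list and the where clause are percent-encoded: ck1%2Cck2%2Cdata decodes to ck1,ck2,data, and ck1%3d14 decodes to ck1=14. A small round-trip check with the standard java.net codecs, included only as a sketch of the encoding the tests rely on:

    import java.net.URLDecoder;
    import java.net.URLEncoder;
    import java.nio.charset.StandardCharsets;

    public final class PigUrlParamCheck
    {
        public static void main(String[] args) throws Exception
        {
            // The encoded values used by selectColumnsTest and whereClauseTest above.
            String columns = URLDecoder.decode("ck1%2Cck2%2Cdata", StandardCharsets.UTF_8.name());
            String where   = URLDecoder.decode("ck1%3d14", StandardCharsets.UTF_8.name());
            System.out.println(columns); // ck1,ck2,data
            System.out.println(where);   // ck1=14
            // And the reverse direction, as a query value would be built
            // (the encoder emits uppercase hex: ck1%3D14).
            System.out.println(URLEncoder.encode("ck1=14", StandardCharsets.UTF_8.name()));
        }
    }
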
http://git-wip-us.apache.org/repos/asf/cassandra/blob/56cfc6ea/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java b/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java
deleted file mode 100644
index ca01901..0000000
--- a/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java
+++ /dev/null
@@ -1,479 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.pig;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.cassandra.db.marshal.TimeUUIDType;
-import org.apache.cassandra.db.marshal.UUIDType;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.utils.Hex;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.Tuple;
-import org.apache.thrift.TException;
-
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class CqlTableDataTypeTest extends PigTestBase
-{
-    //ASCII    (AsciiType.instance),
-    //BIGINT   (LongType.instance),
-    //BLOB     (BytesType.instance),
-    //BOOLEAN  (BooleanType.instance),
-    //COUNTER  (CounterColumnType.instance),
-    //DECIMAL  (DecimalType.instance),
-    //DOUBLE   (DoubleType.instance),
-    //FLOAT    (FloatType.instance),
-    //INET     (InetAddressType.instance),
-    //INT      (Int32Type.instance),
-    //TEXT     (UTF8Type.instance),
-    //TIMESTAMP(DateType.instance),
-    //UUID     (UUIDType.instance),
-    //VARCHAR  (UTF8Type.instance),
-    //VARINT   (IntegerType.instance),
-    //TIMEUUID (TimeUUIDType.instance);
-    //SET
-    //LIST
-    //MAP
-    //Create table to test the above data types
-    private static String[] statements = {
-            "DROP KEYSPACE IF EXISTS cql3ks",
-            "CREATE KEYSPACE cql3ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1}",
-            "USE cql3ks;",
-
-            "CREATE TABLE cqltable (" +
-            "key int primary key," +
-            "col_ascii ascii," +
-            "col_bigint bigint," +
-            "col_blob blob," +
-            "col_boolean boolean," +
-            "col_decimal decimal," +
-            "col_double double," +
-            "col_float float," +
-            "col_inet inet," +
-            "col_int int," +
-            "col_text text," +
-            "col_timestamp timestamp," +
-            "col_uuid uuid," +
-            "col_varchar varchar," +
-            "col_varint varint," +
-            "col_timeuuid timeuuid);",
-
-            "CREATE TABLE settable (" +
-            "key int primary key," +
-            "col_set_ascii set<ascii>," +
-            "col_set_bigint set<bigint>," +
-            "col_set_blob set<blob>," +
-            "col_set_boolean set<boolean>," +
-            "col_set_decimal set<decimal>," +
-            "col_set_double set<double>," +
-            "col_set_float set<float>," +
-            "col_set_inet set<inet>," +
-            "col_set_int set<int>," +
-            "col_set_text set<text>," +
-            "col_set_timestamp set<timestamp>," +
-            "col_set_uuid set<uuid>," +
-            "col_set_varchar set<varchar>," +
-            "col_set_varint set<varint>," +
-            "col_set_timeuuid set<timeuuid>);",
-
-            "CREATE TABLE listtable (" +
-            "key int primary key," +
-            "col_list_ascii list<ascii>," +
-            "col_list_bigint list<bigint>," +
-            "col_list_blob list<blob>," +
-            "col_list_boolean list<boolean>," +
-            "col_list_decimal list<decimal>," +
-            "col_list_double list<double>," +
-            "col_list_float list<float>," +
-            "col_list_inet list<inet>," +
-            "col_list_int list<int>," +
-            "col_list_text list<text>," +
-            "col_list_timestamp list<timestamp>," +
-            "col_list_uuid list<uuid>," +
-            "col_list_varchar list<varchar>," +
-            "col_list_varint list<varint>," +
-            "col_list_timeuuid list<timeuuid>);",
-
-            "CREATE TABLE maptable (" +
-            "key int primary key," +
-            "col_map_ascii map<ascii, ascii>," +
-            "col_map_bigint map<bigint, bigint>," +
-            "col_map_blob map<blob, blob>," +
-            "col_map_boolean map<boolean, boolean>," +
-            "col_map_decimal map<decimal, decimal>," +
-            "col_map_double map<double, double>," +
-            "col_map_float map<float, float>," +
-            "col_map_inet map<inet, inet>," +
-            "col_map_int map<int, int>," +
-            "col_map_text map<text, text>," +
-            "col_map_timestamp map<timestamp, timestamp>," +
-            "col_map_uuid map<uuid, uuid>," +
-            "col_map_varchar map<varchar, varchar>," +
-            "col_map_varint map<varint, varint>," +
-            "col_map_timeuuid map<timeuuid, timeuuid>);",
-        
-            "INSERT INTO cqltable(key, col_ascii) VALUES (1, 'ascii');",
-            "INSERT INTO cqltable(key, col_bigint) VALUES (1, 12345678);",
-            "INSERT INTO cqltable(key, col_blob) VALUES (1, 0x23446c6c6f);",
-            "INSERT INTO cqltable(key, col_boolean) VALUES (1, false);",
-            "INSERT INTO cqltable(key, col_decimal) VALUES (1, 23.4567);",
-            "INSERT INTO cqltable(key, col_double) VALUES (1, 12345678.12345678);",
-            "INSERT INTO cqltable(key, col_float) VALUES (1, 123.12);",
-            "INSERT INTO cqltable(key, col_inet) VALUES (1, '127.0.0.1');",
-            "INSERT INTO cqltable(key, col_int) VALUES (1, 123);",
-            "INSERT INTO cqltable(key, col_text) VALUES (1, 'text');",
-            "INSERT INTO cqltable(key, col_timestamp) VALUES (1, '2011-02-03T04:05:00+0000');",
-            "INSERT INTO cqltable(key, col_timeuuid) VALUES (1, maxTimeuuid('2013-01-01 00:05+0000'));",
-            "INSERT INTO cqltable(key, col_uuid) VALUES (1, 550e8400-e29b-41d4-a716-446655440000);",
-            "INSERT INTO cqltable(key, col_varchar) VALUES (1, 'varchar');",
-            "INSERT INTO cqltable(key, col_varint) VALUES (1, 123);",
-
-            "INSERT INTO settable(key, col_set_ascii) VALUES (1, {'ascii1', 'ascii2'});",
-            "INSERT INTO settable(key, col_set_bigint) VALUES (1, {12345678, 12345679});",
-            "INSERT INTO settable(key, col_set_blob) VALUES (1, {0x68656c6c6f, 0x68656c6c6e});",
-            "INSERT INTO settable(key, col_set_boolean) VALUES (1, {false, true});",
-            "INSERT INTO settable(key, col_set_decimal) VALUES (1, {23.4567, 23.4568});",
-            "INSERT INTO settable(key, col_set_double) VALUES (1, {12345678.12345678, 12345678.12345679});",
-            "INSERT INTO settable(key, col_set_float) VALUES (1, {123.12, 123.13});",
-            "INSERT INTO settable(key, col_set_inet) VALUES (1, {'127.0.0.1', '127.0.0.2'});",
-            "INSERT INTO settable(key, col_set_int) VALUES (1, {123, 124});",
-            "INSERT INTO settable(key, col_set_text) VALUES (1, {'text1', 'text2'});",
-            "INSERT INTO settable(key, col_set_timestamp) VALUES (1, {'2011-02-03T04:05:00+0000', '2011-02-04T04:05:00+0000'});",
-            "INSERT INTO settable(key, col_set_timeuuid) VALUES (1, {e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f, e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77});",      
-            "INSERT INTO settable(key, col_set_uuid) VALUES (1, {550e8400-e29b-41d4-a716-446655440000, 550e8400-e29b-41d4-a716-446655440001});",
-            "INSERT INTO settable(key, col_set_varchar) VALUES (1, {'varchar1', 'varchar2'});",
-            "INSERT INTO settable(key, col_set_varint) VALUES (1, {123, 124});",
-
-            "INSERT INTO listtable(key, col_list_ascii) VALUES (1, ['ascii2', 'ascii1']);",
-            "INSERT INTO listtable(key, col_list_bigint) VALUES (1, [12345679, 12345678]);",
-            "INSERT INTO listtable(key, col_list_blob) VALUES (1, [0x68656c6c6e, 0x68656c6c6f]);",
-            "INSERT INTO listtable(key, col_list_boolean) VALUES (1, [true, false]);",
-            "INSERT INTO listtable(key, col_list_decimal) VALUES (1, [23.4568, 23.4567]);",
-            "INSERT INTO listtable(key, col_list_double) VALUES (1, [12345678.12345679, 12345678.12345678]);",
-            "INSERT INTO listtable(key, col_list_float) VALUES (1, [123.13, 123.12]);",
-            "INSERT INTO listtable(key, col_list_inet) VALUES (1, ['127.0.0.2', '127.0.0.1']);",
-            "INSERT INTO listtable(key, col_list_int) VALUES (1, [124, 123]);",
-            "INSERT INTO listtable(key, col_list_text) VALUES (1, ['text2', 'text1']);",
-            "INSERT INTO listtable(key, col_list_timestamp) VALUES (1, ['2011-02-04T04:05:00+0000', '2011-02-03T04:05:00+0000']);",
-            "INSERT INTO listtable(key, col_list_timeuuid) VALUES (1, [e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77, e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f]);",
-            "INSERT INTO listtable(key, col_list_uuid) VALUES (1, [550e8400-e29b-41d4-a716-446655440001, 550e8400-e29b-41d4-a716-446655440000]);",
-            "INSERT INTO listtable(key, col_list_varchar) VALUES (1, ['varchar2', 'varchar1']);",
-            "INSERT INTO listtable(key, col_list_varint) VALUES (1, [124, 123]);",
-
-            "INSERT INTO maptable(key, col_map_ascii) VALUES (1, {'ascii1' : 'ascii2'});",
-            "INSERT INTO maptable(key, col_map_bigint) VALUES (1, {12345678 : 12345679});",
-            "INSERT INTO maptable(key, col_map_blob) VALUES (1, {0x68656c6c6f : 0x68656c6c6e});",
-            "INSERT INTO maptable(key, col_map_boolean) VALUES (1, {false : true});",
-            "INSERT INTO maptable(key, col_map_decimal) VALUES (1, {23.4567 : 23.4568});",
-            "INSERT INTO maptable(key, col_map_double) VALUES (1, {12345678.12345678 : 12345678.12345679});",
-            "INSERT INTO maptable(key, col_map_float) VALUES (1, {123.12 : 123.13});",
-            "INSERT INTO maptable(key, col_map_inet) VALUES (1, {'127.0.0.1' : '127.0.0.2'});",
-            "INSERT INTO maptable(key, col_map_int) VALUES (1, {123 : 124});",
-            "INSERT INTO maptable(key, col_map_text) VALUES (1, {'text1' : 'text2'});",
-            "INSERT INTO maptable(key, col_map_timestamp) VALUES (1, {'2011-02-03T04:05:00+0000' : '2011-02-04T04:05:00+0000'});",
-            "INSERT INTO maptable(key, col_map_timeuuid) VALUES (1, {e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f : e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77});",      
-            "INSERT INTO maptable(key, col_map_uuid) VALUES (1, {550e8400-e29b-41d4-a716-446655440000 : 550e8400-e29b-41d4-a716-446655440001});",
-            "INSERT INTO maptable(key, col_map_varchar) VALUES (1, {'varchar1' : 'varchar2'});",
-            "INSERT INTO maptable(key, col_map_varint) VALUES (1, {123 : 124});",
-
-            "CREATE TABLE countertable (key int primary key, col_counter counter);",            
-            "UPDATE countertable SET col_counter = col_counter + 3 WHERE key = 1;",
-    };
-
-    @BeforeClass
-    public static void setup() throws IOException, ConfigurationException, TException
-    {
-        startCassandra();
-        executeCQLStatements(statements);
-        startHadoopCluster();
-    }
-
-    @Test
-    public void testCqlNativeStorageRegularType() throws IOException
-    {
-        //input_cql=select * from cqltable where token(key) > ? and token(key) <= ?
-        cqlTableTest("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20cqltable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlNativeStorage();");
-
-        //input_cql=select * from countertable where token(key) > ? and token(key) <= ?
-        counterTableTest("cc_rows = LOAD 'cql://cql3ks/countertable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20countertable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlNativeStorage();");
-    }
-
-    private void cqlTableTest(String initialQuery) throws IOException
-    {
-        pig.registerQuery(initialQuery);
-        Iterator<Tuple> it = pig.openIterator("rows");
-        //{key: int, 
-        //col_ascii: chararray, 
-        //col_bigint: long, 
-        //col_blob: bytearray, 
-        //col_boolean: bytearray,
-        //col_decimal: chararray, 
-        //col_double: double, 
-        //col_float: float, 
-        //col_inet: chararray, 
-        //col_int: int,
-        //col_text: chararray, 
-        //col_timestamp: long, 
-        //col_timeuuid: bytearray, 
-        //col_uuid: chararray,
-        //col_varchar: chararray, 
-        //col_varint: int}
-        if (it.hasNext()) {
-            Tuple t = it.next();
-            Assert.assertEquals(t.get(0), 1);
-            Assert.assertEquals(t.get(1), "ascii");
-            Assert.assertEquals(t.get(2), 12345678L);
-            Assert.assertEquals(t.get(3), new DataByteArray(Hex.hexToBytes("23446c6c6f")));
-            Assert.assertEquals(t.get(4), false);
-            Assert.assertEquals(t.get(5), "23.4567");
-            Assert.assertEquals(t.get(6), 12345678.12345678d);
-            Assert.assertEquals(t.get(7), 123.12f);
-            Assert.assertEquals(t.get(8), "127.0.0.1");
-            Assert.assertEquals(t.get(9), 123);
-            Assert.assertEquals(t.get(10), "text");
-            Assert.assertEquals(t.get(11), 1296705900000L);
-            Assert.assertEquals(t.get(12), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())));
-            Assert.assertEquals(t.get(13), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())));
-            Assert.assertEquals(t.get(14), "varchar");
-            Assert.assertEquals(t.get(15), 123);
-        }
-        else
-        {
-            Assert.fail("Failed to get data for query " + initialQuery);
-        }
-    }
-
-    private void counterTableTest(String initialQuery) throws IOException
-    {
-        pig.registerQuery(initialQuery);
-        Iterator<Tuple>  it = pig.openIterator("cc_rows");
-        if (it.hasNext()) {
-            Tuple t = it.next();
-            Assert.assertEquals(t.get(0), 1);
-            Assert.assertEquals(t.get(1), 3L);
-        }
-        else
-        {
-            Assert.fail("Failed to get data for query " + initialQuery);
-        }
-    }
-
-    @Test
-    public void testCqlNativeStorageSetType() throws IOException
-    {
-        //input_cql=select * from settable where token(key) > ? and token(key) <= ?
-        settableTest("set_rows = LOAD 'cql://cql3ks/settable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20settable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlNativeStorage();");
-    }
-
-    private void settableTest(String initialQuery) throws IOException
-    {
-        pig.registerQuery(initialQuery);
-        Iterator<Tuple> it = pig.openIterator("set_rows");
-        if (it.hasNext()) {
-            Tuple t = it.next();
-            Assert.assertEquals(t.get(0), 1);
-            Tuple innerTuple = (Tuple) t.get(1);
-            Assert.assertEquals(innerTuple.get(0), "ascii1");
-            Assert.assertEquals(innerTuple.get(1), "ascii2");
-            innerTuple = (Tuple) t.get(2);
-            Assert.assertEquals(innerTuple.get(0), 12345678L);
-            Assert.assertEquals(innerTuple.get(1), 12345679L);
-            innerTuple = (Tuple) t.get(3);
-            Assert.assertEquals(innerTuple.get(0), new DataByteArray(Hex.hexToBytes("68656c6c6e")));
-            Assert.assertEquals(innerTuple.get(1), new DataByteArray(Hex.hexToBytes("68656c6c6f")));
-            innerTuple = (Tuple) t.get(4);
-            Assert.assertEquals(innerTuple.get(0), false);
-            Assert.assertEquals(innerTuple.get(1), true);
-            innerTuple = (Tuple) t.get(5);
-            Assert.assertEquals(innerTuple.get(0), "23.4567");
-            Assert.assertEquals(innerTuple.get(1), "23.4568");
-            innerTuple = (Tuple) t.get(6);
-            Assert.assertEquals(innerTuple.get(0), 12345678.12345678d);
-            Assert.assertEquals(innerTuple.get(1), 12345678.12345679d);
-            innerTuple = (Tuple) t.get(7);
-            Assert.assertEquals(innerTuple.get(0), 123.12f);
-            Assert.assertEquals(innerTuple.get(1), 123.13f);
-            innerTuple = (Tuple) t.get(8);
-            Assert.assertEquals(innerTuple.get(0), "127.0.0.1");
-            Assert.assertEquals(innerTuple.get(1), "127.0.0.2");
-            innerTuple = (Tuple) t.get(9);
-            Assert.assertEquals(innerTuple.get(0), 123);
-            Assert.assertEquals(innerTuple.get(1), 124);
-            innerTuple = (Tuple) t.get(10);
-            Assert.assertEquals(innerTuple.get(0), "text1");
-            Assert.assertEquals(innerTuple.get(1), "text2");
-            innerTuple = (Tuple) t.get(11);
-            Assert.assertEquals(innerTuple.get(0), 1296705900000L);
-            Assert.assertEquals(innerTuple.get(1), 1296792300000L);
-            innerTuple = (Tuple) t.get(12);
-            Assert.assertEquals(innerTuple.get(0), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77").array())));
-            Assert.assertEquals(innerTuple.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())));
-            innerTuple = (Tuple) t.get(13);
-            Assert.assertEquals(innerTuple.get(0), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())));
-            Assert.assertEquals(innerTuple.get(1), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440001").array())));
-            innerTuple = (Tuple) t.get(14);
-            Assert.assertEquals(innerTuple.get(0), "varchar1");
-            Assert.assertEquals(innerTuple.get(1), "varchar2");  
-            innerTuple = (Tuple) t.get(15);
-            Assert.assertEquals(innerTuple.get(0), 123);
-            Assert.assertEquals(innerTuple.get(1), 124);
-        }
-        else
-        {
-            Assert.fail("Failed to get data for query " + initialQuery);
-        }
-    }
-
-    @Test
-    public void testCqlNativeStorageListType() throws IOException
-    {
-        //input_cql=select * from listtable where token(key) > ? and token(key) <= ?
-        listtableTest("list_rows = LOAD 'cql://cql3ks/listtable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20listtable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlNativeStorage();");
-    }
-
-    private void listtableTest(String initialQuery) throws IOException
-    {
-        pig.registerQuery(initialQuery);
-        Iterator<Tuple> it = pig.openIterator("list_rows");
-        if (it.hasNext()) {
-            Tuple t = it.next();
-            Assert.assertEquals(t.get(0), 1);
-            Tuple innerTuple = (Tuple) t.get(1);
-            Assert.assertEquals(innerTuple.get(1), "ascii1");
-            Assert.assertEquals(innerTuple.get(0), "ascii2");
-            innerTuple = (Tuple) t.get(2);
-            Assert.assertEquals(innerTuple.get(1), 12345678L);
-            Assert.assertEquals(innerTuple.get(0), 12345679L);
-            innerTuple = (Tuple) t.get(3);
-            Assert.assertEquals(innerTuple.get(1), new DataByteArray(Hex.hexToBytes("68656c6c6f")));
-            Assert.assertEquals(innerTuple.get(0), new DataByteArray(Hex.hexToBytes("68656c6c6e")));
-            innerTuple = (Tuple) t.get(4);
-            Assert.assertEquals(innerTuple.get(1), false);
-            Assert.assertEquals(innerTuple.get(0), true);
-            innerTuple = (Tuple) t.get(5);
-            Assert.assertEquals(innerTuple.get(1), "23.4567");
-            Assert.assertEquals(innerTuple.get(0), "23.4568");
-            innerTuple = (Tuple) t.get(6);
-            Assert.assertEquals(innerTuple.get(1), 12345678.12345678d);
-            Assert.assertEquals(innerTuple.get(0), 12345678.12345679d);
-            innerTuple = (Tuple) t.get(7);
-            Assert.assertEquals(innerTuple.get(1), 123.12f);
-            Assert.assertEquals(innerTuple.get(0), 123.13f);
-            innerTuple = (Tuple) t.get(8);
-            Assert.assertEquals(innerTuple.get(1), "127.0.0.1");
-            Assert.assertEquals(innerTuple.get(0), "127.0.0.2");
-            innerTuple = (Tuple) t.get(9);
-            Assert.assertEquals(innerTuple.get(1), 123);
-            Assert.assertEquals(innerTuple.get(0), 124);
-            innerTuple = (Tuple) t.get(10);
-            Assert.assertEquals(innerTuple.get(1), "text1");
-            Assert.assertEquals(innerTuple.get(0), "text2");
-            innerTuple = (Tuple) t.get(11);
-            Assert.assertEquals(innerTuple.get(1), 1296705900000L);
-            Assert.assertEquals(innerTuple.get(0), 1296792300000L);
-            innerTuple = (Tuple) t.get(12);
-            Assert.assertEquals(innerTuple.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())));
-            Assert.assertEquals(innerTuple.get(0), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77").array())));
-            innerTuple = (Tuple) t.get(13);
-            Assert.assertEquals(innerTuple.get(1), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())));
-            Assert.assertEquals(innerTuple.get(0), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440001").array())));
-            innerTuple = (Tuple) t.get(14);
-            Assert.assertEquals(innerTuple.get(1), "varchar1");
-            Assert.assertEquals(innerTuple.get(0), "varchar2");  
-            innerTuple = (Tuple) t.get(15);
-            Assert.assertEquals(innerTuple.get(1), 123);
-            Assert.assertEquals(innerTuple.get(0), 124);
-        }
-        else
-        {
-            Assert.fail("Failed to get data for query " + initialQuery);
-        }
-    }
-
-    @Test
-    public void testCqlNativeStorageMapType() throws IOException
-    {
-        //input_cql=select * from maptable where token(key) > ? and token(key) <= ?
-        maptableTest("map_rows = LOAD 'cql://cql3ks/maptable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20maptable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlNativeStorage();");
-    }
-
-    private void maptableTest(String initialQuery) throws IOException
-    {
-        pig.registerQuery(initialQuery);
-        Iterator<Tuple> it = pig.openIterator("map_rows");
-        if (it.hasNext()) {
-            Tuple t = it.next();
-            Assert.assertEquals(t.get(0), 1);
-            Tuple innerTuple = (Tuple) ((Tuple) t.get(1)).get(0);
-            Assert.assertEquals(innerTuple.get(0), "ascii1");
-            Assert.assertEquals(innerTuple.get(1), "ascii2");
-            innerTuple = (Tuple) ((Tuple) t.get(2)).get(0);
-            Assert.assertEquals(innerTuple.get(0), 12345678L);
-            Assert.assertEquals(innerTuple.get(1), 12345679L);
-            innerTuple = (Tuple) ((Tuple) t.get(3)).get(0);
-            Assert.assertEquals(innerTuple.get(0), new DataByteArray(Hex.hexToBytes("68656c6c6f")));
-            Assert.assertEquals(innerTuple.get(1), new DataByteArray(Hex.hexToBytes("68656c6c6e")));
-            innerTuple = (Tuple) ((Tuple) t.get(4)).get(0);
-            Assert.assertEquals(innerTuple.get(0), false);
-            Assert.assertEquals(innerTuple.get(1), true);
-            innerTuple = (Tuple) ((Tuple) t.get(5)).get(0);
-            Assert.assertEquals(innerTuple.get(0), "23.4567");
-            Assert.assertEquals(innerTuple.get(1), "23.4568");
-            innerTuple = (Tuple) ((Tuple) t.get(6)).get(0);
-            Assert.assertEquals(innerTuple.get(0), 12345678.12345678d);
-            Assert.assertEquals(innerTuple.get(1), 12345678.12345679d);
-            innerTuple = (Tuple) ((Tuple) t.get(7)).get(0);
-            Assert.assertEquals(innerTuple.get(0), 123.12f);
-            Assert.assertEquals(innerTuple.get(1), 123.13f);
-            innerTuple = (Tuple) ((Tuple) t.get(8)).get(0);
-            Assert.assertEquals(innerTuple.get(0), "127.0.0.1");
-            Assert.assertEquals(innerTuple.get(1), "127.0.0.2");
-            innerTuple = (Tuple) ((Tuple) t.get(9)).get(0);
-            Assert.assertEquals(innerTuple.get(0), 123);
-            Assert.assertEquals(innerTuple.get(1), 124);
-            innerTuple = (Tuple) ((Tuple) t.get(10)).get(0);
-            Assert.assertEquals(innerTuple.get(0), "text1");
-            Assert.assertEquals(innerTuple.get(1), "text2");
-            innerTuple = (Tuple) ((Tuple) t.get(11)).get(0);
-            Assert.assertEquals(innerTuple.get(0), 1296705900000L);
-            Assert.assertEquals(innerTuple.get(1), 1296792300000L);
-            innerTuple = (Tuple) ((Tuple) t.get(12)).get(0);
-            Assert.assertEquals(innerTuple.get(0), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())));
-            Assert.assertEquals(innerTuple.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77").array())));
-            innerTuple = (Tuple) ((Tuple) t.get(13)).get(0);
-            Assert.assertEquals(innerTuple.get(0), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())));
-            Assert.assertEquals(innerTuple.get(1), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440001").array())));
-            innerTuple = (Tuple) ((Tuple) t.get(14)).get(0);
-            Assert.assertEquals(innerTuple.get(0), "varchar1");
-            Assert.assertEquals(innerTuple.get(1), "varchar2");  
-            innerTuple = (Tuple) ((Tuple) t.get(15)).get(0);
-            Assert.assertEquals(innerTuple.get(0), 123);
-            Assert.assertEquals(innerTuple.get(1), 124);
-        }
-        else
-        {
-            Assert.fail("Failed to get data for query " + initialQuery);
-        }
-    }
-
-}

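The long constants asserted for the timestamp columns above (1296705900000L and 1296792300000L) are the epoch-millisecond values of the inserted literals '2011-02-03T04:05:00+0000' and '2011-02-04T04:05:00+0000'. A quick sanity check with java.time, offered only as a worked verification of those constants:

    import java.time.Instant;

    public final class TimestampConstantsCheck
    {
        public static void main(String[] args)
        {
            long first  = Instant.parse("2011-02-03T04:05:00Z").toEpochMilli();
            long second = Instant.parse("2011-02-04T04:05:00Z").toEpochMilli();
            assert first  == 1296705900000L;
            assert second == 1296792300000L; // exactly one day (86400000 ms) later
            System.out.println(first + " " + second);
        }
    }
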
