From: shivram@apache.org
To: commits@hawq.incubator.apache.org
Date: Wed, 28 Oct 2015 22:09:53 -0000
Message-Id: <74ef18761f724c06b60dd8995cab74bc@git.apache.org>
In-Reply-To: <2a46fd82f3074b87af97f1eb7c663e5e@git.apache.org>
References: <2a46fd82f3074b87af97f1eb7c663e5e@git.apache.org>
Mailing-List: contact commits-help@hawq.incubator.apache.org; run by ezmlm
Reply-To: dev@hawq.incubator.apache.org
Delivered-To: mailing list commits@hawq.incubator.apache.org
X-Mailer: ASF-Git Admin Mailer
Subject: [03/15] incubator-hawq git commit: HAWQ-45.
PXF Package Namespace change http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeInputBuilder.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeInputBuilder.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeInputBuilder.java new file mode 100644 index 0000000..126709f --- /dev/null +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeInputBuilder.java @@ -0,0 +1,51 @@ +package org.apache.hawq.pxf.service; + +import org.apache.hawq.pxf.api.OneField; +import org.apache.hawq.pxf.api.OutputFormat; +import org.apache.hawq.pxf.api.io.DataType; +import org.apache.hawq.pxf.service.io.GPDBWritable; +import org.apache.hawq.pxf.service.io.Text; +import org.apache.hawq.pxf.service.utilities.ProtocolData; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.DataInput; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + +public class BridgeInputBuilder { + private ProtocolData protocolData; + private static final Log LOG = LogFactory.getLog(BridgeInputBuilder.class); + + public BridgeInputBuilder(ProtocolData protocolData) throws Exception { + this.protocolData = protocolData; + } + + public List makeInput(DataInput inputStream) throws Exception { + if (protocolData.outputFormat() == OutputFormat.TEXT) { + Text txt = new Text(); + txt.readFields(inputStream); + return Collections.singletonList(new OneField(DataType.BYTEA.getOID(), txt.getBytes())); + } + + GPDBWritable gpdbWritable = new GPDBWritable(); + gpdbWritable.readFields(inputStream); + + if (gpdbWritable.isEmpty()) { + LOG.debug("Reached end of stream"); + return null; + } + + GPDBWritableMapper mapper = new GPDBWritableMapper(gpdbWritable); + int[] colTypes = gpdbWritable.getColType(); + List record = new LinkedList(); + for (int i = 0; i < colTypes.length; i++) { + mapper.setDataType(colTypes[i]); + record.add(new OneField(colTypes[i], mapper.getData(i))); + } + + return record; + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java new file mode 100644 index 0000000..99255fa --- /dev/null +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java @@ -0,0 +1,288 @@ +package org.apache.hawq.pxf.service; + +import org.apache.hawq.pxf.api.BadRecordException; +import org.apache.hawq.pxf.api.OneField; +import org.apache.hawq.pxf.api.OutputFormat; +import org.apache.hawq.pxf.api.io.DataType; +import org.apache.hawq.pxf.service.io.BufferWritable; +import org.apache.hawq.pxf.service.io.GPDBWritable; +import org.apache.hawq.pxf.service.io.GPDBWritable.TypeMismatchException; +import org.apache.hawq.pxf.service.io.Text; +import org.apache.hawq.pxf.service.io.Writable; +import org.apache.hawq.pxf.service.utilities.ProtocolData; +import org.apache.commons.lang.ObjectUtils; + +import java.lang.reflect.Array; +import java.util.Arrays; +import java.util.List; + +import static org.apache.hawq.pxf.api.io.DataType.TEXT; + +/** + * Class creates the output record that is piped by 
the java process to the HAWQ + * backend. Actually, the output record is serialized and the obtained byte + * string is piped to the HAWQ segment. The output record will implement + * Writable, and the mission of BridgeOutputBuilder will be to translate a list + * of {@link OneField} objects (obtained from the Resolver) into an output + * record. + */ +public class BridgeOutputBuilder { + private ProtocolData inputData; + private Writable output = null; + private GPDBWritable errorRecord = null; + private int[] schema; + private String[] colNames; + + /** + * Constructs a BridgeOutputBuilder. + * + * @param input input data, like requested output format and schema + * information + */ + public BridgeOutputBuilder(ProtocolData input) { + inputData = input; + makeErrorRecord(); + } + + /** + * We need a separate GPDBWritable record to represent the error record. + * Just setting the errorFlag on the "output" GPDBWritable variable is not + * good enough, since the GPDBWritable is built only after the first record + * is read from the file. And if we encounter an error while fetching the + * first record from the file, then the output member will be null. The + * reason we cannot count on the schema to build the GPDBWritable output + * variable before reading the first record, is because the schema does not + * account for arrays - we cannot know from the schema the length of an + * array. We find out only after fetching the first record. + */ + void makeErrorRecord() { + int[] errSchema = { TEXT.getOID() }; + + if (inputData.outputFormat() != OutputFormat.BINARY) { + return; + } + + errorRecord = new GPDBWritable(errSchema); + errorRecord.setError(true); + } + + /** + * Returns the error record. If the output format is not binary, error + * records are not supported, and the given exception will be thrown + * + * @param ex exception to be stored in record + * @return error record + * @throws Exception if the output format is not binary + */ + public Writable getErrorOutput(Exception ex) throws Exception { + if (inputData.outputFormat() == OutputFormat.BINARY) { + errorRecord.setString(0, ex.getMessage()); + return errorRecord; + } else { + throw ex; + } + } + + /** + * Translates recFields (obtained from the Resolver) into an output record. + * + * @param recFields record fields to be serialized + * @return Writable object with serialized row + * @throws BadRecordException if building the output record failed + */ + public Writable makeOutput(List recFields) + throws BadRecordException { + if (output == null && inputData.outputFormat() == OutputFormat.BINARY) { + makeGPDBWritableOutput(); + } + + fillOutputRecord(recFields); + + return output; + } + + /** + * Creates the GPDBWritable object. The object is created one time and is + * refilled from recFields for each record sent + * + * @return empty GPDBWritable object with set columns + */ + GPDBWritable makeGPDBWritableOutput() { + int num_actual_fields = inputData.getColumns(); + schema = new int[num_actual_fields]; + colNames = new String[num_actual_fields]; + + for (int i = 0; i < num_actual_fields; i++) { + schema[i] = inputData.getColumn(i).columnTypeCode(); + colNames[i] = inputData.getColumn(i).columnName(); + } + + output = new GPDBWritable(schema); + + return (GPDBWritable) output; + } + + /** + * Fills the output record based on the fields in recFields. 
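
Taken together, the methods above are driven once per record by the read bridge (ReadBridge, later in this patch): a list of resolved fields goes in, a serialized Writable comes out. A minimal caller-side sketch, assuming the enclosing method declares throws Exception and that protocolData, resolver, row and segmentStream are illustrative names:

    BridgeOutputBuilder builder = new BridgeOutputBuilder(protocolData);
    Writable record;
    try {
        // one resolved record in, one serialized Writable out
        record = builder.makeOutput(resolver.getFields(row));
    } catch (BadRecordException e) {
        // only meaningful for BINARY output; for TEXT the exception is rethrown
        record = builder.getErrorOutput(e);
    }
    record.write(segmentStream);  // DataOutput carrying the row back to the HAWQ segment
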
+ * + * @param recFields record fields + * @throws BadRecordException if building the output record failed + */ + void fillOutputRecord(List recFields) throws BadRecordException { + if (inputData.outputFormat() == OutputFormat.BINARY) { + fillGPDBWritable(recFields); + } else { + fillText(recFields); + } + } + + /** + * Fills a GPDBWritable object based on recFields. The input record + * recFields must correspond to schema. If the record has more or less + * fields than the schema we throw an exception. We require that the type of + * field[i] in recFields corresponds to the type of field[i] in the schema. + * + * @param recFields record fields + * @throws BadRecordException if building the output record failed + */ + void fillGPDBWritable(List recFields) throws BadRecordException { + int size = recFields.size(); + if (size == 0) { // size 0 means the resolver couldn't deserialize any + // of the record fields + throw new BadRecordException("No fields in record"); + } else if (size != schema.length) { + throw new BadRecordException("Record has " + size + + " fields but the schema size is " + schema.length); + } + + for (int i = 0; i < size; i++) { + OneField current = recFields.get(i); + if (!isTypeInSchema(current.type, schema[i])) { + throw new BadRecordException("For field " + colNames[i] + + " schema requires type " + + DataType.get(schema[i]).toString() + + " but input record has type " + + DataType.get(current.type).toString()); + } + + fillOneGPDBWritableField(current, i); + } + } + + /** + * Tests if data type is a string type. String type is a type that can be + * serialized as string, such as varchar, bpchar, text, numeric, timestamp, + * date. + * + * @param type data type + * @return whether data type is string type + */ + boolean isStringType(DataType type) { + return Arrays.asList(DataType.VARCHAR, DataType.BPCHAR, DataType.TEXT, + DataType.NUMERIC, DataType.TIMESTAMP, DataType.DATE).contains( + type); + } + + /** + * Tests if record field type and schema type correspond. + * + * @param recType record type code + * @param schemaType schema type code + * @return whether record type and schema type match + */ + boolean isTypeInSchema(int recType, int schemaType) { + DataType dtRec = DataType.get(recType); + DataType dtSchema = DataType.get(schemaType); + + return (dtSchema == DataType.UNSUPPORTED_TYPE || dtRec == dtSchema || (isStringType(dtRec) && isStringType(dtSchema))); + } + + /** + * Fills a Text object based on recFields. + * + * @param recFields record fields + * @throws BadRecordException if text formatted record has more than one field + */ + void fillText(List recFields) throws BadRecordException { + /* + * For the TEXT case there must be only one record in the list + */ + if (recFields.size() != 1) { + throw new BadRecordException( + "BridgeOutputBuilder must receive one field when handling the TEXT format"); + } + + OneField fld = recFields.get(0); + int type = fld.type; + Object val = fld.val; + if (DataType.get(type) == DataType.BYTEA) {// from LineBreakAccessor + output = new BufferWritable((byte[]) val); + } else { // from QuotedLineBreakAccessor + String textRec = (String) val; + output = new Text(textRec + "\n"); + } + } + + /** + * Fills one GPDBWritable field. 
+ * + * @param oneField field + * @param colIdx column index + * @throws BadRecordException if field type is not supported or doesn't match the schema + */ + void fillOneGPDBWritableField(OneField oneField, int colIdx) + throws BadRecordException { + int type = oneField.type; + Object val = oneField.val; + GPDBWritable GPDBoutput = (GPDBWritable) output; + try { + switch (DataType.get(type)) { + case INTEGER: + GPDBoutput.setInt(colIdx, (Integer) val); + break; + case FLOAT8: + GPDBoutput.setDouble(colIdx, (Double) val); + break; + case REAL: + GPDBoutput.setFloat(colIdx, (Float) val); + break; + case BIGINT: + GPDBoutput.setLong(colIdx, (Long) val); + break; + case SMALLINT: + GPDBoutput.setShort(colIdx, (Short) val); + break; + case BOOLEAN: + GPDBoutput.setBoolean(colIdx, (Boolean) val); + break; + case BYTEA: + byte[] bts = null; + if (val != null) { + int length = Array.getLength(val); + bts = new byte[length]; + for (int j = 0; j < length; j++) { + bts[j] = Array.getByte(val, j); + } + } + GPDBoutput.setBytes(colIdx, bts); + break; + case VARCHAR: + case BPCHAR: + case CHAR: + case TEXT: + case NUMERIC: + case TIMESTAMP: + case DATE: + GPDBoutput.setString(colIdx, ObjectUtils.toString(val, null)); + break; + default: + String valClassName = (val != null) ? val.getClass().getSimpleName() + : null; + throw new UnsupportedOperationException(valClassName + + " is not supported for HAWQ conversion"); + } + } catch (TypeMismatchException e) { + throw new BadRecordException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmenterFactory.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmenterFactory.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmenterFactory.java new file mode 100644 index 0000000..1ea2f86 --- /dev/null +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmenterFactory.java @@ -0,0 +1,17 @@ +package org.apache.hawq.pxf.service; + +import org.apache.hawq.pxf.api.Fragmenter; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.service.utilities.Utilities; + +/** + * Factory class for creation of {@link Fragmenter} objects. The actual {@link Fragmenter} object is "hidden" behind + * an {@link Fragmenter} abstract class which is returned by the FragmenterFactory. 
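
The factory below delegates to Utilities.createAnyInstance, whose source is not part of this excerpt; presumably it reduces to reflective construction against a single-argument InputData constructor, roughly as sketched here (an assumption about the helper, not its actual code):

    // assumed equivalent of Utilities.createAnyInstance(InputData.class, fragmenterName, inputData)
    Class<?> cls = Class.forName(fragmenterName);
    Fragmenter fragmenter =
            (Fragmenter) cls.getConstructor(InputData.class).newInstance(inputData);
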
+ */ +public class FragmenterFactory { + static public Fragmenter create(InputData inputData) throws Exception { + String fragmenterName = inputData.getFragmenter(); + + return (Fragmenter) Utilities.createAnyInstance(InputData.class, fragmenterName, inputData); + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponse.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponse.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponse.java new file mode 100644 index 0000000..47f883c --- /dev/null +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponse.java @@ -0,0 +1,63 @@ +package org.apache.hawq.pxf.service; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.StreamingOutput; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.codehaus.jackson.map.ObjectMapper; + +import org.apache.hawq.pxf.api.Fragment; + +/** + * Class for serializing fragments metadata in JSON format. + * The class implements {@link StreamingOutput} so the serialization will be + * done in a stream and not in one bulk, this in order to avoid running + * out of memory when processing a lot of fragments. + */ +public class FragmentsResponse implements StreamingOutput { + + private static Log Log = LogFactory.getLog(FragmentsResponse.class); + + private List fragments; + + /** + * Constructs fragments response out of a list of fragments + * + * @param fragments fragment list + */ + public FragmentsResponse(List fragments) { + this.fragments = fragments; + } + + /** + * Serializes a fragments list in JSON, + * To be used as the result string for HAWQ. 
+ * An example result is as follows: + * {@code {"PXFFragments":[{"replicas":["sdw1.corp.emc.com","sdw3.corp.emc.com","sdw8.corp.emc.com"],"sourceName":"text2.csv", "index":"0", "metadata":, "userData":""},{"replicas":["sdw2.corp.emc.com","sdw4.corp.emc.com","sdw5.corp.emc.com"],"sourceName":"text_data.csv","index":"0","metadata":,"userData":""}]}} + */ + @Override + public void write(OutputStream output) throws IOException, + WebApplicationException { + DataOutputStream dos = new DataOutputStream(output); + ObjectMapper mapper = new ObjectMapper(); + + dos.write("{\"PXFFragments\":[".getBytes()); + + String prefix = ""; + for (Fragment fragment : fragments) { + StringBuilder result = new StringBuilder(); + /* metaData and userData are automatically converted to Base64 */ + result.append(prefix).append(mapper.writeValueAsString(fragment)); + prefix = ","; + dos.write(result.toString().getBytes()); + } + + dos.write("]}".getBytes()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java new file mode 100644 index 0000000..0e9c47f --- /dev/null +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java @@ -0,0 +1,135 @@ +package org.apache.hawq.pxf.service; + +import org.apache.hawq.pxf.api.Fragment; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.List; + +/** + * Utility class for converting Fragments into a {@link FragmentsResponse} + * that will serialize them into JSON format. + */ +public class FragmentsResponseFormatter { + + private static Log LOG = LogFactory.getLog(FragmentsResponseFormatter.class); + + /** + * Converts Fragments list to FragmentsResponse + * after replacing host name by their respective IPs. + * + * @param fragments list of fragments + * @param data data (e.g. path) related to the fragments + * @return FragmentsResponse with given fragments + * @throws UnknownHostException if converting host names to IP fails + */ + public static FragmentsResponse formatResponse(List fragments, String data) throws UnknownHostException { + /* print the raw fragment list to log when in debug level */ + if (LOG.isDebugEnabled()) { + LOG.debug("Fragments before conversion to IP list:"); + FragmentsResponseFormatter.printList(fragments, data); + } + + /* HD-2550: convert host names to IPs */ + convertHostsToIPs(fragments); + + updateFragmentIndex(fragments); + + /* print the fragment list to log when in debug level */ + if (LOG.isDebugEnabled()) { + FragmentsResponseFormatter.printList(fragments, data); + } + + return new FragmentsResponse(fragments); + } + + /** + * Updates the fragments' indexes so that it is incremented by sourceName. + * (E.g.: {"a", 0}, {"a", 1}, {"b", 0} ... 
) + * + * @param fragments fragments to be updated + */ + private static void updateFragmentIndex(List fragments) { + + String sourceName = null; + int index = 0; + for (Fragment fragment : fragments) { + + String currentSourceName = fragment.getSourceName(); + if (!currentSourceName.equals(sourceName)) { + index = 0; + sourceName = currentSourceName; + } + fragment.setIndex(index++); + } + } + + /** + * Converts hosts to their matching IP addresses. + * + * @throws UnknownHostException if converting host name to IP fails + */ + private static void convertHostsToIPs(List fragments) throws UnknownHostException { + /* host converted to IP map. Used to limit network calls. */ + HashMap hostToIpMap = new HashMap(); + + for (Fragment fragment : fragments) { + String[] hosts = fragment.getReplicas(); + if (hosts == null) { + continue; + } + String[] ips = new String[hosts.length]; + int index = 0; + + for (String host : hosts) { + String convertedIp = hostToIpMap.get(host); + if (convertedIp == null) { + /* find host's IP, and add to map */ + InetAddress addr = InetAddress.getByName(host); + convertedIp = addr.getHostAddress(); + hostToIpMap.put(host, convertedIp); + } + + /* update IPs array */ + ips[index] = convertedIp; + ++index; + } + fragment.setReplicas(ips); + } + } + + /* + * Converts a fragments list to a readable string and prints it to the log. + * Intended for debugging purposes only. + * 'datapath' is the data path part of the original URI (e.g., table name, *.csv, etc). + */ + private static void printList(List fragments, String datapath) { + LOG.debug("List of " + + (fragments.isEmpty() ? "no" : fragments.size()) + "fragments for \"" + + datapath + "\""); + + int i = 0; + for (Fragment fragment : fragments) { + StringBuilder result = new StringBuilder(); + result.append("Fragment #").append(++i).append(": [") + .append("Source: ").append(fragment.getSourceName()) + .append(", Index: ").append(fragment.getIndex()) + .append(", Replicas:"); + for (String host : fragment.getReplicas()) { + result.append(" ").append(host); + } + + result.append(", Metadata: ").append(new String(fragment.getMetadata())); + + if (fragment.getUserData() != null) { + result.append(", User Data: ").append(new String(fragment.getUserData())); + } + result.append("] "); + LOG.debug(result); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/GPDBWritableMapper.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/GPDBWritableMapper.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/GPDBWritableMapper.java new file mode 100644 index 0000000..7615e54 --- /dev/null +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/GPDBWritableMapper.java @@ -0,0 +1,115 @@ +package org.apache.hawq.pxf.service; + +import org.apache.hawq.pxf.api.UnsupportedTypeException; +import org.apache.hawq.pxf.api.io.DataType; +import org.apache.hawq.pxf.service.io.GPDBWritable; +import org.apache.hawq.pxf.service.io.GPDBWritable.TypeMismatchException; + +/* + * Class for mapping GPDBWritable get functions to java types. 
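
BridgeInputBuilder, earlier in this patch, shows the intended use of this mapper: pick a getter per column OID, then pull each value out of the deserialized record. A condensed sketch of that pattern:

    GPDBWritableMapper mapper = new GPDBWritableMapper(gpdbWritable);
    int[] colTypes = gpdbWritable.getColType();
    for (int i = 0; i < colTypes.length; i++) {
        mapper.setDataType(colTypes[i]);   // selects the DataGetter matching the column's OID
        Object value = mapper.getData(i);  // delegates to the typed getter on the GPDBWritable
    }
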
+ */ +public class GPDBWritableMapper { + + private GPDBWritable gpdbWritable; + private int type; + private DataGetter getter = null; + + public GPDBWritableMapper(GPDBWritable gpdbWritable) { + this.gpdbWritable = gpdbWritable; + } + + public void setDataType(int type) throws UnsupportedTypeException { + this.type = type; + + switch (DataType.get(type)) { + case BOOLEAN: + getter = new BooleanDataGetter(); + break; + case BYTEA: + getter = new BytesDataGetter(); + break; + case BIGINT: + getter = new LongDataGetter(); + break; + case SMALLINT: + getter = new ShortDataGetter(); + break; + case INTEGER: + getter = new IntDataGetter(); + break; + case TEXT: + getter = new StringDataGetter(); + break; + case REAL: + getter = new FloatDataGetter(); + break; + case FLOAT8: + getter = new DoubleDataGetter(); + break; + default: + throw new UnsupportedTypeException( + "Type " + GPDBWritable.getTypeName(type) + + " is not supported by GPDBWritable"); + } + } + + public Object getData(int colIdx) throws TypeMismatchException { + return getter.getData(colIdx); + } + + private interface DataGetter { + abstract Object getData(int colIdx) throws TypeMismatchException; + } + + private class BooleanDataGetter implements DataGetter { + public Object getData(int colIdx) throws TypeMismatchException { + return gpdbWritable.getBoolean(colIdx); + } + } + + private class BytesDataGetter implements DataGetter { + public Object getData(int colIdx) throws TypeMismatchException { + return gpdbWritable.getBytes(colIdx); + } + } + + private class DoubleDataGetter implements DataGetter { + public Object getData(int colIdx) throws TypeMismatchException { + return gpdbWritable.getDouble(colIdx); + } + } + + private class FloatDataGetter implements DataGetter { + public Object getData(int colIdx) throws TypeMismatchException { + return gpdbWritable.getFloat(colIdx); + } + } + + private class IntDataGetter implements DataGetter { + public Object getData(int colIdx) throws TypeMismatchException { + return gpdbWritable.getInt(colIdx); + } + } + + private class LongDataGetter implements DataGetter { + public Object getData(int colIdx) throws TypeMismatchException { + return gpdbWritable.getLong(colIdx); + } + } + + private class ShortDataGetter implements DataGetter { + public Object getData(int colIdx) throws TypeMismatchException { + return gpdbWritable.getShort(colIdx); + } + } + + private class StringDataGetter implements DataGetter { + public Object getData(int colIdx) throws TypeMismatchException { + return gpdbWritable.getString(colIdx); + } + } + + public String toString() { + return "getter type = " + GPDBWritable.getTypeName(type); + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/MetadataFetcherFactory.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/MetadataFetcherFactory.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/MetadataFetcherFactory.java new file mode 100644 index 0000000..cfd1105 --- /dev/null +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/MetadataFetcherFactory.java @@ -0,0 +1,14 @@ +package org.apache.hawq.pxf.service; + +import org.apache.hawq.pxf.api.MetadataFetcher; + +/** + * Factory class for creation of {@link MetadataFetcher} objects. 
+ * The actual {@link MetadataFetcher} object is "hidden" behind an {@link MetadataFetcher} + * abstract class which is returned by the MetadataFetcherFactory. + */ +public class MetadataFetcherFactory { + static public MetadataFetcher create(String fetcherName) throws Exception { + return (MetadataFetcher) Class.forName(fetcherName).newInstance(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/MetadataResponseFormatter.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/MetadataResponseFormatter.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/MetadataResponseFormatter.java new file mode 100644 index 0000000..6f33f2a --- /dev/null +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/MetadataResponseFormatter.java @@ -0,0 +1,86 @@ +package org.apache.hawq.pxf.service; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion; + +import org.apache.hawq.pxf.api.Metadata; + +/** + * Utility class for converting {@link Metadata} into a JSON format. + */ +public class MetadataResponseFormatter { + + private static Log Log = LogFactory.getLog(MetadataResponseFormatter.class); + + /** + * Converts {@link Metadata} to JSON String format. + * + * @param metadata metadata to convert + * @return JSON formatted response + * @throws IOException if converting the data to JSON fails + */ + public static String formatResponseString(Metadata metadata) throws IOException { + /* print the metadata before serialization */ + Log.debug(MetadataResponseFormatter.metadataToString(metadata)); + + return MetadataResponseFormatter.metadataToJSON(metadata); + } + + /** + * Serializes a metadata in JSON, + * To be used as the result string for HAWQ. + * An example result is as follows: + * + * {"PXFMetadata":[{"table":{"dbName":"default","tableName":"t1"},"fields":[{"name":"a","type":"int"},{"name":"b","type":"float"}]}]} + */ + private static String metadataToJSON(Metadata metadata) throws IOException { + + if (metadata == null) { + throw new IllegalArgumentException("metadata object is null - cannot serialize"); + } + + if ((metadata.getFields() == null) || metadata.getFields().isEmpty()) { + throw new IllegalArgumentException("metadata contains no fields - cannot serialize"); + } + + ObjectMapper mapper = new ObjectMapper(); + mapper.setSerializationInclusion(Inclusion.NON_EMPTY); // ignore empty fields + + StringBuilder result = new StringBuilder("{\"PXFMetadata\":"); + String prefix = "["; // preparation for supporting multiple tables + result.append(prefix).append(mapper.writeValueAsString(metadata)); + return result.append("]}").toString(); + } + + /** + * Converts metadata to a readable string. + * Intended for debugging purposes only. 
+ */ + private static String metadataToString(Metadata metadata) { + StringBuilder result = new StringBuilder("Metadata for table \""); + + if (metadata == null) { + return "No metadata"; + } + + result.append(metadata.getTable()).append("\": "); + + if ((metadata.getFields() == null) || metadata.getFields().isEmpty()) { + result.append("no fields in table"); + return result.toString(); + } + + int i = 0; + for (Metadata.Field field: metadata.getFields()) { + result.append("Field #").append(++i).append(": [") + .append("Name: ").append(field.getName()) + .append(", Type: ").append(field.getType()).append("] "); + } + + return result.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java new file mode 100644 index 0000000..9497c6c --- /dev/null +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java @@ -0,0 +1,128 @@ +package org.apache.hawq.pxf.service; + +import org.apache.hawq.pxf.api.BadRecordException; +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadAccessor; +import org.apache.hawq.pxf.api.ReadResolver; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.Plugin; +import org.apache.hawq.pxf.service.io.Writable; +import org.apache.hawq.pxf.service.utilities.ProtocolData; +import org.apache.hawq.pxf.service.utilities.Utilities; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.*; +import java.nio.charset.CharacterCodingException; +import java.util.zip.ZipException; + +/* + * ReadBridge class creates appropriate accessor and resolver. + * It will then create the correct output conversion + * class (e.g. Text or GPDBWritable) and get records from accessor, + * let resolver deserialize them and reserialize them using the + * output conversion class. + * + * The class handles BadRecordException and other exception type + * and marks the record as invalid for GPDB. 
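
A sketch of how the service side presumably drives this bridge once it is constructed: open the accessor, then pump records until getNext() returns null (protData and responseStream are illustrative names, and the enclosing method is assumed to declare throws Exception):

    Bridge bridge = new ReadBridge(protData);              // protData: ProtocolData parsed from the request
    DataOutputStream out = new DataOutputStream(responseStream);
    if (bridge.beginIteration()) {                         // opens the underlying accessor
        Writable record;
        while ((record = bridge.getNext()) != null) {      // null marks end of data
            record.write(out);                             // serialized row streamed back to HAWQ
        }
    }
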
+ */ +public class ReadBridge implements Bridge { + ReadAccessor fileAccessor = null; + ReadResolver fieldsResolver = null; + BridgeOutputBuilder outputBuilder = null; + + private Log Log; + + /* + * C'tor - set the implementation of the bridge + */ + public ReadBridge(ProtocolData protData) throws Exception { + outputBuilder = new BridgeOutputBuilder(protData); + Log = LogFactory.getLog(ReadBridge.class); + fileAccessor = getFileAccessor(protData); + fieldsResolver = getFieldsResolver(protData); + } + + /* + * Accesses the underlying HDFS file + */ + @Override + public boolean beginIteration() throws Exception { + return fileAccessor.openForRead(); + } + + /* + * Fetch next object from file and turn it into a record that the GPDB backend can process + */ + @Override + public Writable getNext() throws Exception { + Writable output; + OneRow onerow = null; + try { + onerow = fileAccessor.readNextObject(); + if (onerow == null) { + fileAccessor.closeForRead(); + return null; + } + + output = outputBuilder.makeOutput(fieldsResolver.getFields(onerow)); + } catch (IOException ex) { + if (!isDataException(ex)) { + fileAccessor.closeForRead(); + throw ex; + } + output = outputBuilder.getErrorOutput(ex); + } catch (BadRecordException ex) { + String row_info = "null"; + if (onerow != null) { + row_info = onerow.toString(); + } + if (ex.getCause() != null) { + Log.debug("BadRecordException " + ex.getCause().toString() + ": " + row_info); + } else { + Log.debug(ex.toString() + ": " + row_info); + } + output = outputBuilder.getErrorOutput(ex); + } catch (Exception ex) { + fileAccessor.closeForRead(); + throw ex; + } + + return output; + } + + public static ReadAccessor getFileAccessor(InputData inputData) throws Exception { + return (ReadAccessor) Utilities.createAnyInstance(InputData.class, inputData.getAccessor(), inputData); + } + + public static ReadResolver getFieldsResolver(InputData inputData) throws Exception { + return (ReadResolver) Utilities.createAnyInstance(InputData.class, inputData.getResolver(), inputData); + } + + /* + * There are many exceptions that inherit IOException. Some of them like EOFException are generated + * due to a data problem, and not because of an IO/connection problem as the father IOException + * might lead us to believe. For example, an EOFException will be thrown while fetching a record + * from a sequence file, if there is a formatting problem in the record. Fetching record from + * the sequence-file is the responsibility of the accessor so the exception will be thrown from the + * accessor. We identify this cases by analyzing the exception type, and when we discover that the + * actual problem was a data problem, we return the errorOutput GPDBWritable. + */ + private boolean isDataException(IOException ex) { + return (ex instanceof EOFException || ex instanceof CharacterCodingException || + ex instanceof CharConversionException || ex instanceof UTFDataFormatException || + ex instanceof ZipException); + } + + @Override + public boolean setNext(DataInputStream inputStream) { + throw new UnsupportedOperationException("setNext is not implemented"); + } + + @Override + public boolean isThreadSafe() { + boolean result = ((Plugin) fileAccessor).isThreadSafe() && ((Plugin) fieldsResolver).isThreadSafe(); + Log.debug("Bridge is " + (result ? 
"" : "not ") + "thread safe"); + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java new file mode 100644 index 0000000..34ed316 --- /dev/null +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java @@ -0,0 +1,96 @@ +package org.apache.hawq.pxf.service; + +import org.apache.hawq.pxf.api.*; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.Plugin; +import org.apache.hawq.pxf.service.io.Writable; +import org.apache.hawq.pxf.service.utilities.ProtocolData; +import org.apache.hawq.pxf.service.utilities.Utilities; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.DataInputStream; +import java.util.List; + +/* + * WriteBridge class creates appropriate accessor and resolver. + * It reads data from inputStream by the resolver, + * and writes it to the Hadoop storage with the accessor. + */ +public class WriteBridge implements Bridge { + private static final Log LOG = LogFactory.getLog(WriteBridge.class); + WriteAccessor fileAccessor = null; + WriteResolver fieldsResolver = null; + BridgeInputBuilder inputBuilder; + + /* + * C'tor - set the implementation of the bridge + */ + public WriteBridge(ProtocolData protocolData) throws Exception { + + inputBuilder = new BridgeInputBuilder(protocolData); + /* plugins accept InputData paramaters */ + fileAccessor = getFileAccessor(protocolData); + fieldsResolver = getFieldsResolver(protocolData); + + } + + /* + * Accesses the underlying HDFS file + */ + public boolean beginIteration() throws Exception { + return fileAccessor.openForWrite(); + } + + /* + * Read data from stream, convert it using WriteResolver into OneRow object, and + * pass to WriteAccessor to write into file. 
+ */ + @Override + public boolean setNext(DataInputStream inputStream) throws Exception { + + List record = inputBuilder.makeInput(inputStream); + if (record == null) { + close(); + return false; + } + + OneRow onerow = fieldsResolver.setFields(record); + if (onerow == null) { + close(); + return false; + } + if (!fileAccessor.writeNextObject(onerow)) { + close(); + throw new BadRecordException(); + } + return true; + } + + private void close() throws Exception { + try { + fileAccessor.closeForWrite(); + } catch (Exception e) { + LOG.error("Failed to close bridge resources: " + e.getMessage()); + throw e; + } + } + + private static WriteAccessor getFileAccessor(InputData inputData) throws Exception { + return (WriteAccessor) Utilities.createAnyInstance(InputData.class, inputData.getAccessor(), inputData); + } + + private static WriteResolver getFieldsResolver(InputData inputData) throws Exception { + return (WriteResolver) Utilities.createAnyInstance(InputData.class, inputData.getResolver(), inputData); + } + + @Override + public Writable getNext() { + throw new UnsupportedOperationException("getNext is not implemented"); + } + + @Override + public boolean isThreadSafe() { + return ((Plugin) fileAccessor).isThreadSafe() && ((Plugin) fieldsResolver).isThreadSafe(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/BufferWritable.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/BufferWritable.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/BufferWritable.java new file mode 100644 index 0000000..e74c88b --- /dev/null +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/BufferWritable.java @@ -0,0 +1,57 @@ +package org.apache.hawq.pxf.service.io; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.UnsupportedOperationException; + +/** + * A serializable object for transporting a byte array through the Bridge + * framework + */ +public class BufferWritable implements Writable { + + byte[] buf = null; + + /** + * Constructs a BufferWritable. Copies the buffer reference and not the + * actual bytes. This class is used when we intend to transport a buffer + * through the Bridge framework without copying the data each time the + * buffer is passed between the Bridge objects. + * + * @param inBuf buffer + */ + public BufferWritable(byte[] inBuf) { + buf = inBuf; + } + + /** + * Serializes the fields of this object to out. + * + * @param out DataOutput to serialize this object into. + * @throws IOException if the buffer was not set + */ + @Override + public void write(DataOutput out) throws IOException { + if (buf == null) + throw new IOException("BufferWritable was not set"); + out.write(buf); + } + + /** + * Deserializes the fields of this object from in. + *
+ * For efficiency, implementations should attempt to re-use storage in the + * existing object where possible. + *
+ * + * @param in DataInput to deserialize this object from + * @throws UnsupportedOperationException this function is not supported + */ + @Override + public void readFields(DataInput in) { + throw new UnsupportedOperationException( + "BufferWritable.readFields() is not implemented"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/GPDBWritable.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/GPDBWritable.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/GPDBWritable.java new file mode 100644 index 0000000..1cce070 --- /dev/null +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/GPDBWritable.java @@ -0,0 +1,873 @@ +package org.apache.hawq.pxf.service.io; + +import org.apache.hawq.pxf.api.io.DataType; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.*; +import java.util.Arrays; + +import static org.apache.hawq.pxf.api.io.DataType.*; + + +/** + * This class represents a GPDB record in the form of + * a Java object. + */ +public class GPDBWritable implements Writable { + /* + * GPDBWritable is using the following serialization form: + * Total Length | Version | Error Flag | # of columns | Col type |...| Col type | Null Bit array | Col val... + * 4 byte | 2 byte | 1 byte | 2 byte | 1 byte |...| 1 byte | ceil(# of columns/8) byte | Fixed or Var length + * + * For fixed length type, we know the length. + * In the col val, we align pad according to the alignment requirement of the type. + * For var length type, the alignment is always 4 byte. + * For var length type, col val is <4 byte length> + */ + + private static Log Log = LogFactory.getLog(GPDBWritable.class); + private static final int EOF = -1; + + /* + * Enum of the Database type + */ + private enum DBType { + BIGINT(8, 8), + BOOLEAN(1, 1), + FLOAT8(8, 8), + INTEGER(4, 4), + REAL(4, 4), + SMALLINT(2, 2), + BYTEA(4, -1), + TEXT(4, -1); + + private final int typelength; // -1 means var length + private final int alignment; + + DBType(int align, int len) { + this.typelength = len; + this.alignment = align; + } + + public int getTypeLength() { + return typelength; + } + + public boolean isVarLength() { + return typelength == -1; + } + + // return the alignment requirement of the type + public int getAlignment() { + return alignment; + } + } + + /* + * Constants + */ + private static final int PREV_VERSION = 1; + private static final int VERSION = 2; /* for backward compatibility */ + private static final String CHARSET = "UTF-8"; + + /* + * Local variables + */ + protected int[] colType; + protected Object[] colValue; + protected int alignmentOfEightBytes = 8; + protected byte errorFlag = 0; + protected int pktlen = EOF; + + public int[] getColType() { + return colType; + } + + /** + * An exception class for column type definition and + * set/get value mismatch. + */ + public class TypeMismatchException extends IOException { + public TypeMismatchException(String msg) { + super(msg); + } + } + + /** + * Empty Constructor + */ + public GPDBWritable() { + initializeEightByteAlignment(); + } + + /** + * Constructor to build a db record. 
colType defines the schema + * + * @param columnType the table column types + */ + public GPDBWritable(int[] columnType) { + initializeEightByteAlignment(); + colType = columnType; + colValue = new Object[columnType.length]; + } + + /** + * Constructor to build a db record from a serialized form. + * + * @param data a record in the serialized form + * @throws IOException if the data is malformatted. + */ + public GPDBWritable(byte[] data) throws IOException { + initializeEightByteAlignment(); + ByteArrayInputStream bis = new ByteArrayInputStream(data); + DataInputStream dis = new DataInputStream(bis); + + readFields(dis); + } + + /* + * Read first 4 bytes, and verify it's a valid packet length. + * Upon error returns EOF. + */ + private int readPktLen(DataInput in) throws IOException { + pktlen = EOF; + + try { + pktlen = in.readInt(); + } catch (EOFException e) { + Log.debug("Reached end of stream (EOFException)"); + return EOF; + } + if (pktlen == EOF) { + Log.debug("Reached end of stream (returned -1)"); + } + + return pktlen; + } + + @Override + public void readFields(DataInput in) throws IOException { + /* + * extract pkt len. + * + * GPSQL-1107: + * The DataInput might already be empty (EOF), but we can't check it beforehand. + * If that's the case, pktlen is updated to -1, to mark that the object is still empty. + * (can be checked with isEmpty()). + */ + pktlen = readPktLen(in); + if (isEmpty()) { + return; + } + + /* extract the version and col cnt */ + int version = in.readShort(); + int curOffset = 4 + 2; + int colCnt; + + /* !!! Check VERSION !!! */ + if (version != GPDBWritable.VERSION && version != GPDBWritable.PREV_VERSION) { + throw new IOException("Current GPDBWritable version(" + + GPDBWritable.VERSION + ") does not match input version(" + + version + ")"); + } + + if (version == GPDBWritable.VERSION) { + errorFlag = in.readByte(); + curOffset += 1; + } + + colCnt = in.readShort(); + curOffset += 2; + + /* Extract Column Type */ + colType = new int[colCnt]; + DBType[] coldbtype = new DBType[colCnt]; + for (int i = 0; i < colCnt; i++) { + int enumType = (in.readByte()); + curOffset += 1; + if (enumType == DBType.BIGINT.ordinal()) { + colType[i] = BIGINT.getOID(); + coldbtype[i] = DBType.BIGINT; + } else if (enumType == DBType.BOOLEAN.ordinal()) { + colType[i] = BOOLEAN.getOID(); + coldbtype[i] = DBType.BOOLEAN; + } else if (enumType == DBType.FLOAT8.ordinal()) { + colType[i] = FLOAT8.getOID(); + coldbtype[i] = DBType.FLOAT8; + } else if (enumType == DBType.INTEGER.ordinal()) { + colType[i] = INTEGER.getOID(); + coldbtype[i] = DBType.INTEGER; + } else if (enumType == DBType.REAL.ordinal()) { + colType[i] = REAL.getOID(); + coldbtype[i] = DBType.REAL; + } else if (enumType == DBType.SMALLINT.ordinal()) { + colType[i] = SMALLINT.getOID(); + coldbtype[i] = DBType.SMALLINT; + } else if (enumType == DBType.BYTEA.ordinal()) { + colType[i] = BYTEA.getOID(); + coldbtype[i] = DBType.BYTEA; + } else if (enumType == DBType.TEXT.ordinal()) { + colType[i] = TEXT.getOID(); + coldbtype[i] = DBType.TEXT; + } else { + throw new IOException("Unknown GPDBWritable.DBType ordinal value"); + } + } + + /* Extract null bit array */ + byte[] nullbytes = new byte[getNullByteArraySize(colCnt)]; + in.readFully(nullbytes); + curOffset += nullbytes.length; + boolean[] colIsNull = byteArrayToBooleanArray(nullbytes, colCnt); + + /* extract column value */ + colValue = new Object[colCnt]; + for (int i = 0; i < colCnt; i++) { + if (!colIsNull[i]) { + /* Skip the alignment padding */ + int skipbytes = 
roundUpAlignment(curOffset, coldbtype[i].getAlignment()) - curOffset; + for (int j = 0; j < skipbytes; j++) { + in.readByte(); + } + curOffset += skipbytes; + + /* For fixed length type, increment the offset according to type type length here. + * For var length type (BYTEA, TEXT), we'll read 4 byte length header and the + * actual payload. + */ + int varcollen = -1; + if (coldbtype[i].isVarLength()) { + varcollen = in.readInt(); + curOffset += 4 + varcollen; + } else { + curOffset += coldbtype[i].getTypeLength(); + } + + switch (DataType.get(colType[i])) { + case BIGINT: { + colValue[i] = in.readLong(); + break; + } + case BOOLEAN: { + colValue[i] = in.readBoolean(); + break; + } + case FLOAT8: { + colValue[i] = in.readDouble(); + break; + } + case INTEGER: { + colValue[i] = in.readInt(); + break; + } + case REAL: { + colValue[i] = in.readFloat(); + break; + } + case SMALLINT: { + colValue[i] = in.readShort(); + break; + } + + /* For BYTEA column, it has a 4 byte var length header. */ + case BYTEA: { + colValue[i] = new byte[varcollen]; + in.readFully((byte[]) colValue[i]); + break; + } + /* For text formatted column, it has a 4 byte var length header + * and it's always null terminated string. + * So, we can remove the last "\0" when constructing the string. + */ + case TEXT: { + byte[] data = new byte[varcollen]; + in.readFully(data, 0, varcollen); + colValue[i] = new String(data, 0, varcollen - 1, CHARSET); + break; + } + + default: + throw new IOException("Unknown GPDBWritable ColType"); + } + } + } + + /* Skip the ending alignment padding */ + int skipbytes = roundUpAlignment(curOffset, 8) - curOffset; + for (int j = 0; j < skipbytes; j++) { + in.readByte(); + } + curOffset += skipbytes; + + if (errorFlag != 0) { + throw new IOException("Received error value " + errorFlag + " from format"); + } + } + + @Override + public void write(DataOutput out) throws IOException { + int numCol = colType.length; + boolean[] nullBits = new boolean[numCol]; + int[] colLength = new int[numCol]; + byte[] enumType = new byte[numCol]; + int[] padLength = new int[numCol]; + byte[] padbytes = new byte[8]; + + /** + * Compute the total payload and header length + * header = total length (4 byte), Version (2 byte), Error (1 byte), #col (2 byte) + * col type array = #col * 1 byte + * null bit array = ceil(#col/8) + */ + int datlen = 4 + 2 + 1 + 2; + datlen += numCol; + datlen += getNullByteArraySize(numCol); + + for (int i = 0; i < numCol; i++) { + /* Get the enum type */ + DBType coldbtype; + switch (DataType.get(colType[i])) { + case BIGINT: + coldbtype = DBType.BIGINT; + break; + case BOOLEAN: + coldbtype = DBType.BOOLEAN; + break; + case FLOAT8: + coldbtype = DBType.FLOAT8; + break; + case INTEGER: + coldbtype = DBType.INTEGER; + break; + case REAL: + coldbtype = DBType.REAL; + break; + case SMALLINT: + coldbtype = DBType.SMALLINT; + break; + case BYTEA: + coldbtype = DBType.BYTEA; + break; + default: + coldbtype = DBType.TEXT; + } + enumType[i] = (byte) (coldbtype.ordinal()); + + /* Get the actual value, and set the null bit */ + if (colValue[i] == null) { + nullBits[i] = true; + colLength[i] = 0; + } else { + nullBits[i] = false; + + /* + * For fixed length type, we get the fixed length. + * For var len binary format, the length is in the col value. + * For text format, we must convert encoding first. 
+ */ + if (!coldbtype.isVarLength()) { + colLength[i] = coldbtype.getTypeLength(); + } else if (!isTextForm(colType[i])) { + colLength[i] = ((byte[]) colValue[i]).length; + } else { + colLength[i] = ((String) colValue[i]).getBytes(CHARSET).length; + } + + /* calculate and add the type alignment padding */ + padLength[i] = roundUpAlignment(datlen, coldbtype.getAlignment()) - datlen; + datlen += padLength[i]; + + /* for variable length type, we add a 4 byte length header */ + if (coldbtype.isVarLength()) { + datlen += 4; + } + } + datlen += colLength[i]; + } + + /* + * Add the final alignment padding for the next record + */ + int endpadding = roundUpAlignment(datlen, 8) - datlen; + datlen += endpadding; + + /* Construct the packet header */ + out.writeInt(datlen); + out.writeShort(VERSION); + out.writeByte(errorFlag); + out.writeShort(numCol); + + /* Write col type */ + for (int i = 0; i < numCol; i++) { + out.writeByte(enumType[i]); + } + + /* Nullness */ + byte[] nullBytes = boolArrayToByteArray(nullBits); + out.write(nullBytes); + + /* Column Value */ + for (int i = 0; i < numCol; i++) { + if (!nullBits[i]) { + /* Pad the alignment byte first */ + if (padLength[i] > 0) { + out.write(padbytes, 0, padLength[i]); + } + + /* Now, write the actual column value */ + switch (DataType.get(colType[i])) { + case BIGINT: + out.writeLong(((Long) colValue[i])); + break; + case BOOLEAN: + out.writeBoolean(((Boolean) colValue[i])); + break; + case FLOAT8: + out.writeDouble(((Double) colValue[i])); + break; + case INTEGER: + out.writeInt(((Integer) colValue[i])); + break; + case REAL: + out.writeFloat(((Float) colValue[i])); + break; + case SMALLINT: + out.writeShort(((Short) colValue[i])); + break; + + /* For BYTEA format, add 4byte length header at the beginning */ + case BYTEA: + out.writeInt(colLength[i]); + out.write((byte[]) colValue[i]); + break; + + /* For text format, add 4byte length header. string is already '\0' terminated */ + default: { + out.writeInt(colLength[i]); + byte[] data = ((String) colValue[i]).getBytes(CHARSET); + out.write(data); + break; + } + } + } + } + + /* End padding */ + out.write(padbytes, 0, endpadding); + } + + /** + * Private helper to convert boolean array to byte array + */ + private static byte[] boolArrayToByteArray(boolean[] data) { + int len = data.length; + byte[] byts = new byte[getNullByteArraySize(len)]; + + for (int i = 0, j = 0, k = 7; i < data.length; i++) { + byts[j] |= (data[i] ? 1 : 0) << k--; + if (k < 0) { + j++; + k = 7; + } + } + return byts; + } + + /** + * Private helper to determine the size of the null byte array + */ + private static int getNullByteArraySize(int colCnt) { + return (colCnt / 8) + (colCnt % 8 != 0 ? 1 : 0); + } + + /** + * Private helper to convert byte array to boolean array + */ + private static boolean[] byteArrayToBooleanArray(byte[] data, int colCnt) { + boolean[] bools = new boolean[colCnt]; + for (int i = 0, j = 0, k = 7; i < bools.length; i++) { + bools[i] = ((data[j] >> k--) & 0x01) == 1; + if (k < 0) { + j++; + k = 7; + } + } + return bools; + } + + /** + * Private helper to round up alignment for the given length + */ + private int roundUpAlignment(int len, int align) { + int commonAlignment = align; + if (commonAlignment == 8) { + commonAlignment = alignmentOfEightBytes; + } + return (((len) + ((commonAlignment) - 1)) & ~((commonAlignment) - 1)); + } + + /** + * Getter/Setter methods to get/set the column value + */ + + /** + * Sets the column value of the record. 
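
Before the setters, a concrete check on the layout arithmetic implemented above, for a hypothetical two-column record (BIGINT, TEXT) and the default 8-byte alignment:

    header          : 4 (total length) + 2 (version) + 1 (error) + 2 (#cols)  = 9 bytes
    col type array  : 2 columns * 1 byte                                      -> offset 11
    null bit array  : getNullByteArraySize(2) = 2/8 + 1 = 1 byte              -> offset 12
    BIGINT value    : roundUpAlignment(12, 8) = 16, so 4 pad bytes + 8 bytes  -> offset 24
    TEXT value      : no padding (offset already 4-aligned), 4-byte length
                      header + n bytes of '\0'-terminated payload             -> offset 28 + n
    end padding     : roundUpAlignment(28 + n, 8) - (28 + n) trailing zero bytes
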
+ * + * @param colIdx the column index + * @param val the value + * @throws TypeMismatchException the column type does not match + */ + public void setLong(int colIdx, Long val) + throws TypeMismatchException { + checkType(BIGINT, colIdx, true); + colValue[colIdx] = val; + } + + /** + * Sets the column value of the record. + * + * @param colIdx the column index + * @param val the value + * @throws TypeMismatchException the column type does not match + */ + public void setBoolean(int colIdx, Boolean val) + throws TypeMismatchException { + checkType(BOOLEAN, colIdx, true); + colValue[colIdx] = val; + } + + /** + * Sets the column value of the record. + * + * @param colIdx the column index + * @param val the value + * @throws TypeMismatchException the column type does not match + */ + public void setBytes(int colIdx, byte[] val) + throws TypeMismatchException { + checkType(BYTEA, colIdx, true); + colValue[colIdx] = val; + } + + /** + * Sets the column value of the record. + * + * @param colIdx the column index + * @param val the value + * @throws TypeMismatchException the column type does not match + */ + public void setString(int colIdx, String val) + throws TypeMismatchException { + checkType(TEXT, colIdx, true); + if (val != null) { + colValue[colIdx] = val + "\0"; + } else { + colValue[colIdx] = val; + } + } + + /** + * Sets the column value of the record. + * + * @param colIdx the column index + * @param val the value + * @throws TypeMismatchException the column type does not match + */ + public void setFloat(int colIdx, Float val) + throws TypeMismatchException { + checkType(REAL, colIdx, true); + colValue[colIdx] = val; + } + + /** + * Sets the column value of the record. + * + * @param colIdx the column index + * @param val the value + * @throws TypeMismatchException the column type does not match + */ + public void setDouble(int colIdx, Double val) + throws TypeMismatchException { + checkType(FLOAT8, colIdx, true); + colValue[colIdx] = val; + } + + /** + * Sets the column value of the record. + * + * @param colIdx the column index + * @param val the value + * @throws TypeMismatchException the column type does not match + */ + public void setInt(int colIdx, Integer val) + throws TypeMismatchException { + checkType(INTEGER, colIdx, true); + colValue[colIdx] = val; + } + + /** + * Sets the column value of the record. + * + * @param colIdx the column index + * @param val the value + * @throws TypeMismatchException the column type does not match + */ + public void setShort(int colIdx, Short val) + throws TypeMismatchException { + checkType(SMALLINT, colIdx, true); + colValue[colIdx] = val; + } + + /** + * Gets the column value of the record. + * + * @param colIdx the column index + * @return column value + * @throws TypeMismatchException the column type does not match + */ + public Long getLong(int colIdx) + throws TypeMismatchException { + checkType(BIGINT, colIdx, false); + return (Long) colValue[colIdx]; + } + + /** + * Gets the column value of the record. + * + * @param colIdx the column index + * @return column value + * @throws TypeMismatchException the column type does not match + */ + public Boolean getBoolean(int colIdx) + throws TypeMismatchException { + checkType(BOOLEAN, colIdx, false); + return (Boolean) colValue[colIdx]; + } + + /** + * Gets the column value of the record. 
+ * + * @param colIdx the column index + * @return column value + * @throws TypeMismatchException the column type does not match + */ + public byte[] getBytes(int colIdx) + throws TypeMismatchException { + checkType(BYTEA, colIdx, false); + return (byte[]) colValue[colIdx]; + } + + /** + * Gets the column value of the record. + * + * @param colIdx the column index + * @return column value + * @throws TypeMismatchException the column type does not match + */ + public String getString(int colIdx) + throws TypeMismatchException { + checkType(TEXT, colIdx, false); + return (String) colValue[colIdx]; + } + + /** + * Gets the column value of the record. + * + * @param colIdx the column index + * @return column value + * @throws TypeMismatchException the column type does not match + */ + public Float getFloat(int colIdx) + throws TypeMismatchException { + checkType(REAL, colIdx, false); + return (Float) colValue[colIdx]; + } + + /** + * Gets the column value of the record. + * + * @param colIdx the column index + * @return column value + * @throws TypeMismatchException the column type does not match + */ + public Double getDouble(int colIdx) + throws TypeMismatchException { + checkType(FLOAT8, colIdx, false); + return (Double) colValue[colIdx]; + } + + /** + * Gets the column value of the record. + * + * @param colIdx the column index + * @return column value + * @throws TypeMismatchException the column type does not match + */ + public Integer getInt(int colIdx) + throws TypeMismatchException { + checkType(INTEGER, colIdx, false); + return (Integer) colValue[colIdx]; + } + + /** + * Gets the column value of the record. + * + * @param colIdx the column index + * @return column value + * @throws TypeMismatchException the column type does not match + */ + public Short getShort(int colIdx) + throws TypeMismatchException { + checkType(SMALLINT, colIdx, false); + return (Short) colValue[colIdx]; + } + + /** + * Sets the error field. + * + * @param errorVal the error value + */ + public void setError(boolean errorVal) { + errorFlag = errorVal ? (byte) 1 : (byte) 0; + } + + /** + * Returns a string representation of the object. + */ + @Override + public String toString() { + if (colType == null) { + return null; + } + StringBuilder result = new StringBuilder(); + for (int i = 0; i < colType.length; i++) { + result.append("Column ").append(i).append(":"); + if (colValue[i] != null) { + result.append(colType[i] == BYTEA.getOID() + ? byteArrayInString((byte[]) colValue[i]) + : colValue[i]); + } + result.append("\n"); + } + return result.toString(); + } + + /** + * Helper printing function + */ + private static String byteArrayInString(byte[] data) { + StringBuilder result = new StringBuilder(); + for (Byte b : data) { + result.append(b.intValue()).append(" "); + } + return result.toString(); + } + + /** + * Private Helper to check the type mismatch + * If the expected type is stored as string, then it must be set + * via setString. + * Otherwise, the type must match. 
+ */ + private void checkType(DataType inTyp, int idx, boolean isSet) + throws TypeMismatchException { + if (idx < 0 || idx >= colType.length) { + throw new TypeMismatchException("Column index is out of range"); + } + + int exTyp = colType[idx]; + + if (isTextForm(exTyp)) { + if (inTyp != TEXT) { + throw new TypeMismatchException(formErrorMsg(inTyp.getOID(), TEXT.getOID(), isSet)); + } + } else if (inTyp != DataType.get(exTyp)) { + throw new TypeMismatchException(formErrorMsg(inTyp.getOID(), exTyp, isSet)); + } + } + + private String formErrorMsg(int inTyp, int colTyp, boolean isSet) { + return isSet + ? "Cannot set " + getTypeName(inTyp) + " to a " + getTypeName(colTyp) + " column" + : "Cannot get " + getTypeName(inTyp) + " from a " + getTypeName(colTyp) + " column"; + } + + /** + * Private helper routine to tell whether a type is stored in text form or not + * + * @param type the type OID that we want to check + * @return true if the type is stored in text form + */ + private boolean isTextForm(int type) { + return !Arrays.asList(BIGINT, BOOLEAN, BYTEA, FLOAT8, INTEGER, REAL, SMALLINT).contains(DataType.get(type)); + } + + /** + * Helper to get the type name. + * If a given OID is not in the commonly used list, TEXT is + * returned for it (the name is only used in error messages). + * + * @param oid type OID + * @return type name + */ + public static String getTypeName(int oid) { + switch (DataType.get(oid)) { + case BOOLEAN: + return "BOOLEAN"; + case BYTEA: + return "BYTEA"; + case CHAR: + return "CHAR"; + case BIGINT: + return "BIGINT"; + case SMALLINT: + return "SMALLINT"; + case INTEGER: + return "INTEGER"; + case TEXT: + return "TEXT"; + case REAL: + return "REAL"; + case FLOAT8: + return "FLOAT8"; + case BPCHAR: + return "BPCHAR"; + case VARCHAR: + return "VARCHAR"; + case DATE: + return "DATE"; + case TIME: + return "TIME"; + case TIMESTAMP: + return "TIMESTAMP"; + case NUMERIC: + return "NUMERIC"; + default: + return "TEXT"; + } + } + + /* + * Get the alignment from the command line to match the alignment + * the C code uses (see gphdfs/src/protocol_formatter/common.c). + */ + private void initializeEightByteAlignment() { + String alignment = System.getProperty("greenplum.alignment"); + if (alignment == null) { + return; + } + alignmentOfEightBytes = Integer.parseInt(alignment); + } + + /** + * Returns whether the writable object is empty, + * based on the packet length as read from the stream. + * A length of -1 means nothing was read (EOF). + * + * @return whether the writable object is empty + */ + public boolean isEmpty() { + return pktlen == EOF; + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/Text.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/Text.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/Text.java new file mode 100644 index 0000000..cdea1de --- /dev/null +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/Text.java @@ -0,0 +1,379 @@ +package org.apache.hawq.pxf.service.io; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.*; +import java.util.Arrays; + +/** + * This class stores text using the standard UTF-8 encoding. It provides methods to + * serialize and deserialize text.
Helpers for reading integers in the + * zero-compressed (variable-length) format are also provided. + */ +public class Text implements Writable { + + // scratch buffer used when reading records in readFields() + private byte[] buf; + private static final Log LOG = LogFactory.getLog(Text.class); + int curLoc; + private static final char LINE_DELIMITER = '\n'; + private static final int BUF_SIZE = 1024; + private static final int EOF = -1; + + private static final byte[] EMPTY_BYTES = new byte[0]; + private static ThreadLocal<CharsetEncoder> ENCODER_FACTORY = new ThreadLocal<CharsetEncoder>() { + @Override + protected CharsetEncoder initialValue() { + return Charset.forName("UTF-8").newEncoder().onMalformedInput( + CodingErrorAction.REPORT).onUnmappableCharacter( + CodingErrorAction.REPORT); + } + }; + private static ThreadLocal<CharsetDecoder> DECODER_FACTORY = new ThreadLocal<CharsetDecoder>() { + @Override + protected CharsetDecoder initialValue() { + return Charset.forName("UTF-8").newDecoder().onMalformedInput( + CodingErrorAction.REPORT).onUnmappableCharacter( + CodingErrorAction.REPORT); + } + }; + private byte[] bytes; + private int length; + + public Text() { + bytes = EMPTY_BYTES; + buf = new byte[BUF_SIZE]; + } + + /** + * Construct from a string. + * + * @param string input string + */ + public Text(String string) { + set(string); + } + + /** + * Construct from another text. + * + * @param utf8 text to copy + */ + public Text(Text utf8) { + set(utf8); + } + + /** + * Construct from a byte array. + * + * @param utf8 input byte array + */ + public Text(byte[] utf8) { + set(utf8); + } + + /** + * Returns whether the first byte of a zero-compressed (vint) encoding denotes a negative value. + * + * @param value the first byte of the encoded integer + * @return true if the encoded value is negative + */ + public static boolean isNegativeVInt(byte value) { + return value < -120 || (value >= -112 && value < 0); + } + + /** + * Reads a long stored in the zero-compressed format from the given stream. + * + * @param stream input stream + * @return the decoded long value + * @throws IOException if an I/O error occurs + */ + public static long readVLong(DataInput stream) throws IOException { + byte firstByte = stream.readByte(); + int len = decodeVIntSize(firstByte); + if (len == 1) { + return firstByte; + } + long i = 0; + for (int idx = 0; idx < len - 1; idx++) { + byte b = stream.readByte(); + i = i << 8; + i = i | (b & 0xFF); + } + return (isNegativeVInt(firstByte) ? (i ^ -1L) : i); + } + + /** + * Parses the first byte of a zero-compressed integer and returns the total number of bytes in the encoding. + * + * @param value the first byte of the encoded integer + * @return number of bytes used by the encoding + */ + public static int decodeVIntSize(byte value) { + if (value >= -112) { + return 1; + } else if (value < -120) { + return -119 - value; + } + return -111 - value; + } + + /** + * Converts the provided byte array to a String using the UTF-8 encoding, replacing malformed + * input with the substitution character. + * + * @param utf8 UTF-8 encoded byte array + * @param start start point + * @param length length of array + * @return decoded string + * @throws CharacterCodingException if the conversion failed + */ + public static String decode(byte[] utf8, int start, int length) + throws CharacterCodingException { + return decode(ByteBuffer.wrap(utf8, start, length), true); + } + + /** + * Converts the provided byte array to a String using the UTF-8 encoding. If + * replace is true, then malformed input is replaced with the + * substitution character, which is U+FFFD. Otherwise the method throws a + * MalformedInputException.
+ * + * @param utf8 UTF-8 encoded byte array + * @param start start point + * @param length length of array + * @param replace whether to replace malformed input with the substitution + * character + * @return decoded string + * @throws MalformedInputException if a malformed input is used + * @throws CharacterCodingException if the conversion failed + */ + public static String decode(byte[] utf8, int start, int length, + boolean replace) + throws CharacterCodingException { + return decode(ByteBuffer.wrap(utf8, start, length), replace); + } + + private static String decode(ByteBuffer utf8, boolean replace) + throws CharacterCodingException { + CharsetDecoder decoder = DECODER_FACTORY.get(); + if (replace) { + decoder.onMalformedInput(java.nio.charset.CodingErrorAction.REPLACE); + decoder.onUnmappableCharacter(CodingErrorAction.REPLACE); + } + String str = decoder.decode(utf8).toString(); + // set decoder back to its default value: REPORT + if (replace) { + decoder.onMalformedInput(CodingErrorAction.REPORT); + decoder.onUnmappableCharacter(CodingErrorAction.REPORT); + } + return str; + } + + /** + * Converts the provided String to bytes using the UTF-8 encoding. If the + * input is malformed, invalid characters are replaced by a default value. + * + * @param string string to encode + * @return ByteBuffer: bytes are stored at ByteBuffer.array() and the length is + * ByteBuffer.limit() + * @throws CharacterCodingException if the conversion failed + */ + public static ByteBuffer encode(String string) + throws CharacterCodingException { + return encode(string, true); + } + + /** + * Converts the provided String to bytes using the UTF-8 encoding. If + * replace is true, then malformed input is replaced with the + * substitution character, which is U+FFFD. Otherwise the method throws a + * MalformedInputException. + * + * @param string string to encode + * @param replace whether to replace malformed input with the substitution character + * @return ByteBuffer: bytes are stored at ByteBuffer.array() and the length is + * ByteBuffer.limit() + * @throws MalformedInputException if a malformed input is used + * @throws CharacterCodingException if the conversion failed + */ + public static ByteBuffer encode(String string, boolean replace) + throws CharacterCodingException { + CharsetEncoder encoder = ENCODER_FACTORY.get(); + if (replace) { + encoder.onMalformedInput(CodingErrorAction.REPLACE); + encoder.onUnmappableCharacter(CodingErrorAction.REPLACE); + } + ByteBuffer bytes = encoder.encode(CharBuffer.wrap(string.toCharArray())); + if (replace) { + encoder.onMalformedInput(CodingErrorAction.REPORT); + encoder.onUnmappableCharacter(CodingErrorAction.REPORT); + } + return bytes; + } + + /** + * Returns the raw bytes; however, only data up to {@link #getLength()} is + * valid. + * + * @return raw bytes of the byte array + */ + public byte[] getBytes() { + return bytes; + } + + /** + * Returns the number of bytes in the byte array. + * + * @return number of bytes in the byte array + */ + public int getLength() { + return length; + } + + /** + * Sets to contain the contents of a string. + * + * @param string input string + */ + public void set(String string) { + try { + ByteBuffer bb = encode(string, true); + bytes = bb.array(); + length = bb.limit(); + } catch (CharacterCodingException e) { + throw new RuntimeException("Should not have happened " + + e.toString()); + } + } + + /** + * Sets to a UTF-8 byte array.
+ * + * @param utf8 input UTF-8 byte array + */ + public void set(byte[] utf8) { + set(utf8, 0, utf8.length); + } + + /** + * Copies a text. + * + * @param other text object to copy. + */ + public void set(Text other) { + set(other.getBytes(), 0, other.getLength()); + } + + /** + * Sets the Text to range of bytes. + * + * @param utf8 the data to copy from + * @param start the first position of the new string + * @param len the number of bytes of the new string + */ + public void set(byte[] utf8, int start, int len) { + setCapacity(len, false); + System.arraycopy(utf8, start, bytes, 0, len); + this.length = len; + } + + /** + * Appends a range of bytes to the end of the given text. + * + * @param utf8 the data to copy from + * @param start the first position to append from utf8 + * @param len the number of bytes to append + */ + public void append(byte[] utf8, int start, int len) { + setCapacity(length + len, true); + System.arraycopy(utf8, start, bytes, length, len); + length += len; + } + + /** + * Clears the string to empty. + */ + public void clear() { + length = 0; + } + + /* + * Sets the capacity of this Text object to at least + * len bytes. If the current buffer is longer, then the + * capacity and existing content of the buffer are unchanged. If + * len is larger than the current capacity, the Text object's + * capacity is increased to match. + * + * @param len the number of bytes we need + * + * @param keepData should the old data be kept + */ + private void setCapacity(int len, boolean keepData) { + if (bytes == null || bytes.length < len) { + byte[] newBytes = new byte[len]; + if (bytes != null && keepData) { + System.arraycopy(bytes, 0, newBytes, 0, length); + } + bytes = newBytes; + } + } + + /** + * Convert text back to string + * + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + try { + return decode(bytes, 0, length); + } catch (CharacterCodingException e) { + throw new RuntimeException("Should not have happened " + + e.toString()); + } + } + + @Override + public void write(DataOutput out) throws IOException { + byte[] bytes = getBytes(); + out.write(bytes, 0, getLength()); + } + + /** + * deserialize + */ + @Override + public void readFields(DataInput inputStream) throws IOException { + + byte c; + curLoc = 0; + clear(); + while ((c = (byte) ((DataInputStream) inputStream).read()) != EOF) { + buf[curLoc] = c; + curLoc++; + + if (c == LINE_DELIMITER) { + LOG.trace("read one line, size " + curLoc); + break; + } + + if (isBufferFull()) { + flushBuffer(); + } + } + + if (!isBufferEmpty()) { + // the buffer doesn't end with a line break. + if (c == EOF) { + LOG.warn("Stream ended without line break"); + } + flushBuffer(); + } + } + + private boolean isBufferEmpty() { + return (curLoc == 0); + } + + private boolean isBufferFull() { + return (curLoc == BUF_SIZE); + } + + private void flushBuffer() { + append(buf, 0, curLoc); + curLoc = 0; + } + + /** + * Returns true iff o is a Text with the same contents. 
+ */ + @Override + public boolean equals(Object o) { + return (o instanceof Text && Arrays.equals(bytes, ((Text) o).bytes)); + } + + @Override + public int hashCode() { + return Arrays.hashCode(bytes); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/Writable.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/Writable.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/Writable.java new file mode 100644 index 0000000..8741085 --- /dev/null +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/Writable.java @@ -0,0 +1,30 @@ +package org.apache.hawq.pxf.service.io; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * A serializable object which implements a simple, efficient serialization + * protocol, based on {@link DataInput} and {@link DataOutput}. + */ +public interface Writable { + + /** + * Serialize the fields of this object to out. + * + * @param out DataOutput to serialize this object into. + * @throws IOException if I/O error occurs + */ + void write(DataOutput out) throws IOException; + + /** + * Deserialize the fields of this object from in.
+ * + * For efficiency, implementations should attempt to re-use storage in the + * existing object where possible.
+ * + * @param in DataInput to deserialize this object from. + * @throws IOException if I/O error occurs + */ + void readFields(DataInput in) throws IOException; +}
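As a usage note (not part of this commit): a minimal round-trip sketch for the Text writable added above could look like the following. The package and class names below are hypothetical and chosen only for illustration; the example assumes the org.apache.hawq.pxf.service.io.Text API exactly as shown in this patch, and passes a DataInputStream because readFields() casts its DataInput argument to one.

package org.apache.hawq.pxf.examples;          // hypothetical package, not in this commit

import org.apache.hawq.pxf.service.io.Text;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class TextRoundTrip {
    public static void main(String[] args) throws IOException {
        // Text.write() emits the raw UTF-8 bytes only, while readFields()
        // consumes bytes up to the '\n' line delimiter, so the sample record
        // carries an explicit trailing newline.
        Text original = new Text("hello pxf\n");

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            original.write(out);                // serialize: raw bytes, no length prefix
        }

        Text restored = new Text();
        try (DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            restored.readFields(in);            // deserialize: reads up to the newline
        }

        // toString() decodes the stored bytes back into a UTF-8 string.
        System.out.println(restored.toString());
    }
}

GPDBWritable implements the same Writable contract, with the packet header, null bitmap, and alignment padding handled as in the serialization code earlier in this patch.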