hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lil...@apache.org
Subject incubator-hawq git commit: HAWQ-774. Add snappy compression support to row oriented storage
Date Tue, 07 Jun 2016 03:00:08 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master c589334d6 -> 355c43704


HAWQ-774. Add snappy compression support to row oriented storage


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/355c4370
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/355c4370
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/355c4370

Branch: refs/heads/master
Commit: 355c43704cf8fd74ed46da86f7e45ac9a0d235c0
Parents: c589334
Author: Paul Guo <paulguo@gmail.com>
Authored: Mon Jun 6 10:16:15 2016 +0800
Committer: Paul Guo <paulguo@gmail.com>
Committed: Mon Jun 6 18:18:06 2016 +0800

----------------------------------------------------------------------
 src/backend/access/common/reloptions.c |   3 +-
 src/backend/catalog/pg_compression.c   | 124 +++++++++++++++++++++++++++-
 src/include/catalog/pg_appendonly.h    |   2 +-
 src/include/catalog/pg_compression.h   |   2 +
 src/include/catalog/pg_proc.h          |  20 +++++
 src/include/catalog/pg_proc.sql        |  10 +++
 src/include/utils/builtins.h           |   6 ++
 7 files changed, 161 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/355c4370/src/backend/access/common/reloptions.c
----------------------------------------------------------------------
diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
index 38b404a..8822d01 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -543,8 +543,7 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
 						 errOmitLocation(true)));
 		}
 
-		if (!(columnstore == RELSTORAGE_PARQUET) && ((strcmp(compresstype, "snappy") ==
0)
-				|| (strcmp(compresstype, "gzip") == 0)))
+		if (!(columnstore == RELSTORAGE_PARQUET) && (strcmp(compresstype, "gzip") == 0))
 		{
 			ereport(ERROR,
 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/355c4370/src/backend/catalog/pg_compression.c
----------------------------------------------------------------------
diff --git a/src/backend/catalog/pg_compression.c b/src/backend/catalog/pg_compression.c
index 9d0ff4a..8eb51d5 100644
--- a/src/backend/catalog/pg_compression.c
+++ b/src/backend/catalog/pg_compression.c
@@ -46,6 +46,8 @@
 #include "utils/relcache.h"
 #include "utils/syscache.h"
 
+#include "snappy-c.h"
+
 /* names we expect to see in ENCODING clauses */
 char *storage_directive_names[] = {"compresstype", "compresslevel",
 								   "blocksize", NULL};
@@ -124,7 +126,7 @@ GetCompressionImplementation(char *comptype)
 						comptype)));
 
 	funcs = palloc0(sizeof(PGFunction) * NUM_COMPRESS_FUNCS);
-	
+
 	ctup = (Form_pg_compression)GETSTRUCT(tuple);
 
 	Insist(OidIsValid(ctup->compconstructor));
@@ -350,6 +352,121 @@ zlib_validator(PG_FUNCTION_ARGS)
 }
 
 Datum
+snappy_constructor(PG_FUNCTION_ARGS)
+{
+	TupleDesc			td = PG_GETARG_POINTER(0);
+	StorageAttributes	*sa = PG_GETARG_POINTER(1);
+	CompressionState	*cs	= palloc0(sizeof(CompressionState));
+
+	cs->opaque = NULL;
+	cs->desired_sz = snappy_max_compressed_length;
+
+	Insist(PointerIsValid(td));
+	Insist(PointerIsValid(sa->comptype));
+
+	PG_RETURN_POINTER(cs);
+}
+
+/*
+ * snappy_destructor
+ *		Release the private state attached to a snappy CompressionState.
+ *
+ * Argument 0 is the CompressionState.  Only cs->opaque is freed (when it is
+ * non-NULL); the CompressionState itself is not freed here.
+ */
+Datum
+snappy_destructor(PG_FUNCTION_ARGS)
+{
+	CompressionState	*state = PG_GETARG_POINTER(0);
+
+	/* Nothing to release when no private state was attached. */
+	if (state->opaque == NULL)
+		PG_RETURN_VOID();
+
+	Insist(PointerIsValid(state->opaque));
+	pfree(state->opaque);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * elog_snappy_error
+ *		Raise an ERROR describing a failed snappy library call.
+ *
+ * retval	 - the non-SNAPPY_OK status that was returned
+ * func_name - name of the snappy function that failed (read only)
+ * src_sz, dst_sz, dst_used - buffer sizes, included in the message
+ *			   for diagnosis
+ *
+ * Every branch reports through elog(ERROR, ...).
+ */
+static void
+elog_snappy_error(snappy_status retval, const char *func_name,
+				  int src_sz, int dst_sz, int dst_used)
+{
+	switch (retval)
+	{
+		case SNAPPY_INVALID_INPUT:
+			elog(ERROR, "invalid input for %s(): "
+				 "src_sz=%d dst_sz=%d dst_used=%d",
+				 func_name, src_sz, dst_sz, dst_used);
+			break;
+		case SNAPPY_BUFFER_TOO_SMALL:
+			elog(ERROR, "buffer is too small in %s(): "
+				 "src_sz=%d dst_sz=%d dst_used=%d",
+				 func_name, src_sz, dst_sz, dst_used);
+			break;
+		default:
+			elog(ERROR, "unknown failure (return value %d) for %s(): "
+				 "src_sz=%d dst_sz=%d dst_used=%d", retval, func_name,
+				 src_sz, dst_sz, dst_used);
+			break;
+	}
+}
+
+/*
+ * snappy_compress_internal
+ *		Compress src into dst with snappy.
+ *
+ * Arguments (see gp_snappy_compress): src, src_sz (int4), dst, dst_sz (int4),
+ * dst_used (output), and a trailing internal argument that is not read here.
+ *
+ * dst must hold at least snappy_max_compressed_length(src_sz) bytes.  On
+ * success *dst_used receives the compressed length; on failure an ERROR is
+ * raised via elog_snappy_error().
+ */
+Datum
+snappy_compress_internal(PG_FUNCTION_ARGS)
+{
+	const char		*src = PG_GETARG_POINTER(0);
+	int32			src_sz = PG_GETARG_INT32(1);
+	char			*dst = PG_GETARG_POINTER(2);
+	int32			dst_sz = PG_GETARG_INT32(3);
+	/*
+	 * int32 *, consistent with snappy_decompress_internal: storing a size_t
+	 * through this pointer would write 8 bytes into what is presumably a
+	 * caller's 4-byte int32 (NOTE(review): confirm against the caller).
+	 */
+	int32			*dst_used = PG_GETARG_POINTER(4);
+	size_t			compressed_length;
+	snappy_status	retval;
+
+	/* Check the signed values so a negative int4 cannot wrap to a huge size_t. */
+	Insist(src_sz >= 0 && dst_sz >= 0);
+
+	compressed_length = snappy_max_compressed_length((size_t) src_sz);
+	Insist((size_t) dst_sz >= compressed_length);
+
+	/* compressed_length is in/out: buffer capacity in, bytes written out. */
+	retval = snappy_compress(src, (size_t) src_sz, dst, &compressed_length);
+	/* Fits in int32: it never exceeds the capacity, which is <= dst_sz. */
+	*dst_used = (int32) compressed_length;
+
+	if (retval != SNAPPY_OK)
+		elog_snappy_error(retval, "snappy_compress", src_sz, dst_sz, *dst_used);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * snappy_decompress_internal
+ *		Decompress the snappy-compressed src into dst.
+ *
+ * Arguments (see gp_snappy_decompress): src, src_sz (int4), dst, dst_sz (int4),
+ * dst_used (output, int32 *), and a trailing internal argument that is not
+ * read here.
+ *
+ * Both sizes must be positive, and dst must be large enough for the
+ * uncompressed length recorded in src.  On success *dst_used receives the
+ * uncompressed length; on failure an ERROR is raised via elog_snappy_error().
+ */
+Datum
+snappy_decompress_internal(PG_FUNCTION_ARGS)
+{
+	const char		*src	= PG_GETARG_POINTER(0);
+	int32			src_sz = PG_GETARG_INT32(1);
+	char			*dst	= PG_GETARG_POINTER(2);
+	int32			dst_sz = PG_GETARG_INT32(3);
+	int32			*dst_used = PG_GETARG_POINTER(4);
+	size_t			uncompressed_length;
+	snappy_status	retval;
+
+	/* Check the signed values so a negative int4 cannot wrap to a huge size_t. */
+	Insist(src_sz > 0 && dst_sz > 0);
+
+	retval = snappy_uncompressed_length(src, (size_t) src_sz,
+										&uncompressed_length);
+	/* *dst_used has not been written yet, so report 0 rather than read it. */
+	if (retval != SNAPPY_OK)
+		elog_snappy_error(retval, "snappy_uncompressed_length",
+						  src_sz, dst_sz, 0);
+
+	Insist((size_t) dst_sz >= uncompressed_length);
+
+	retval = snappy_uncompress(src, (size_t) src_sz, dst,
+							   &uncompressed_length);
+	/* Fits in int32: checked above to be <= dst_sz. */
+	*dst_used = (int32) uncompressed_length;
+
+	if (retval != SNAPPY_OK)
+		elog_snappy_error(retval, "snappy_uncompress",
+						  src_sz, dst_sz, *dst_used);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * snappy_validator
+ *		Validator hook for snappy storage options.
+ *
+ * No snappy-specific checks are performed: every input is accepted.
+ */
+Datum
+snappy_validator(PG_FUNCTION_ARGS)
+{
+	/* Intentionally empty: nothing to validate for snappy. */
+	PG_RETURN_VOID();
+}
+
+Datum
 rle_type_constructor(PG_FUNCTION_ARGS)
 {
 	elog(ERROR, "rle_type block compression not supported");
@@ -437,10 +554,11 @@ compresstype_is_valid(char *comptype)
 					 cql("SELECT COUNT(*) FROM pg_compression "
 						 " WHERE compname = :1 ",
 						 NameGetDatum(&compname))));
-		
+
+	/* FIXME: This is a hack. Should register gzip handlers into pg_compression table. */
 	if(!found)
 	{
-		if((strcmp(comptype, "snappy") == 0) || strcmp(comptype, "gzip") == 0)
+		if(strcmp(comptype, "gzip") == 0)
 			found = true;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/355c4370/src/include/catalog/pg_appendonly.h
----------------------------------------------------------------------
diff --git a/src/include/catalog/pg_appendonly.h b/src/include/catalog/pg_appendonly.h
index c78789b..5f0ef04 100644
--- a/src/include/catalog/pg_appendonly.h
+++ b/src/include/catalog/pg_appendonly.h
@@ -75,7 +75,7 @@ CATALOG(pg_appendonly,6105) BKI_WITHOUT_OIDS
 	int2			majorversion;		/* major version indicating what's stored in this table  */
 	int2			minorversion;		/* minor version indicating what's stored in this table  */
 	bool			checksum;			/* true if checksum is stored with data and checked */
-	text			compresstype;		/* the compressor used (zlib, or quicklz) */
+	text			compresstype;		/* the compressor used (zlib, or or snappy, quicklz) */
     bool            columnstore;        /* true if co or parquet table, false if ao table*/
     Oid             segrelid;           /* OID of aoseg table; 0 if none */
     Oid             segidxid;           /* if aoseg table, OID of segno index */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/355c4370/src/include/catalog/pg_compression.h
----------------------------------------------------------------------
diff --git a/src/include/catalog/pg_compression.h b/src/include/catalog/pg_compression.h
index 80ef711..8746db9 100644
--- a/src/include/catalog/pg_compression.h
+++ b/src/include/catalog/pg_compression.h
@@ -117,6 +117,8 @@ typedef FormData_pg_compression *Form_pg_compression;
 /* TIDYCAT_END_CODEGEN */
 
 /* Initial contents */
+/* snappy: handlers are the gp_snappy_* functions registered in pg_proc.h */
+DATA(insert OID = 3069 ( snappy gp_snappy_constructor gp_snappy_destructor gp_snappy_compress gp_snappy_decompress gp_snappy_validator PGUID ));
+
 DATA(insert OID = 3060 ( zlib gp_zlib_constructor gp_zlib_destructor gp_zlib_compress gp_zlib_decompress gp_zlib_validator PGUID ));
 
 DATA(insert OID = 3061 ( quicklz gp_quicklz_constructor gp_quicklz_destructor gp_quicklz_compress gp_quicklz_decompress gp_quicklz_validator PGUID ));

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/355c4370/src/include/catalog/pg_proc.h
----------------------------------------------------------------------
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index e818909..4441126 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -10312,6 +10312,26 @@ DESCR("anytable type serialization input function");
 DATA(insert OID = 3055 ( anytable_out  PGNSP PGUID 12 f f t f i 1 2275 f "3053" _null_ _null_ _null_ anytable_out - _null_ n ));
 DESCR("anytable type serialization output function");
 
+/* gp_snappy_constructor(internal, internal, bool) => internal */
+DATA(insert OID = 5080 ( gp_snappy_constructor  PGNSP PGUID 12 f f f f v 3 2281 f "2281 2281 16" _null_ _null_ _null_ snappy_constructor - _null_ n ));
+DESCR("snappy constructor");
+
+/* gp_snappy_destructor(internal) => void */
+DATA(insert OID = 5081 ( gp_snappy_destructor  PGNSP PGUID 12 f f f f v 1 2278 f "2281" _null_ _null_ _null_ snappy_destructor - _null_ n ));
+DESCR("snappy destructor");
+
+/* gp_snappy_compress(internal, int4, internal, int4, internal, internal) => void */
+DATA(insert OID = 5082 ( gp_snappy_compress  PGNSP PGUID 12 f f f f i 6 2278 f "2281 23 2281 23 2281 2281" _null_ _null_ _null_ snappy_compress_internal - _null_ n ));
+DESCR("snappy compressor");
+
+/* gp_snappy_decompress(internal, int4, internal, int4, internal, internal) => void */
+DATA(insert OID = 5083 ( gp_snappy_decompress  PGNSP PGUID 12 f f f f i 6 2278 f "2281 23 2281 23 2281 2281" _null_ _null_ _null_ snappy_decompress_internal - _null_ n ));
+DESCR("snappy decompressor");
+
+/* gp_snappy_validator(internal) => void */
+DATA(insert OID = 9926 ( gp_snappy_validator  PGNSP PGUID 12 f f f f i 1 2278 f "2281" _null_ _null_ _null_ snappy_validator - _null_ n ));
+DESCR("snappy compression validator");
+
 /* gp_quicklz_constructor(internal, internal, bool) => internal */
 DATA(insert OID = 5076 ( gp_quicklz_constructor  PGNSP PGUID 12 f f f f v 3 2281 f "2281 2281 16" _null_ _null_ _null_ quicklz_constructor - _null_ n ));
 DESCR("quicklz constructor");

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/355c4370/src/include/catalog/pg_proc.sql
----------------------------------------------------------------------
diff --git a/src/include/catalog/pg_proc.sql b/src/include/catalog/pg_proc.sql
index 987b802..50985d4 100644
--- a/src/include/catalog/pg_proc.sql
+++ b/src/include/catalog/pg_proc.sql
@@ -5450,6 +5450,16 @@
 
  CREATE FUNCTION anytable_out(anytable) RETURNS cstring LANGUAGE internal IMMUTABLE STRICT AS 'anytable_out' WITH (OID=3055, DESCRIPTION="anytable type serialization output function");
 
+ CREATE FUNCTION gp_snappy_constructor(internal, internal, bool) RETURNS internal LANGUAGE internal VOLATILE AS 'snappy_constructor' WITH (OID=5080, DESCRIPTION="snappy constructor");
+
+ CREATE FUNCTION gp_snappy_destructor(internal) RETURNS void LANGUAGE internal VOLATILE AS 'snappy_destructor' WITH(OID=5081, DESCRIPTION="snappy destructor");
+
+ CREATE FUNCTION gp_snappy_compress(internal, int4, internal, int4, internal, internal) RETURNS void LANGUAGE internal IMMUTABLE AS 'snappy_compress_internal' WITH(OID=5082, DESCRIPTION="snappy compressor");
+
+ CREATE FUNCTION gp_snappy_decompress(internal, int4, internal, int4, internal, internal) RETURNS void LANGUAGE internal IMMUTABLE AS 'snappy_decompress_internal' WITH(OID=5083, DESCRIPTION="snappy decompressor");
+
+ CREATE FUNCTION gp_snappy_validator(internal) RETURNS void LANGUAGE internal IMMUTABLE AS 'snappy_validator' WITH(OID=9926, DESCRIPTION="snappy compression validator");
+
  CREATE FUNCTION gp_quicklz_constructor(internal, internal, bool) RETURNS internal LANGUAGE internal VOLATILE AS 'quicklz_constructor' WITH (OID=5076, DESCRIPTION="quicklz constructor");
 
  CREATE FUNCTION gp_quicklz_destructor(internal) RETURNS void LANGUAGE internal VOLATILE AS 'quicklz_destructor' WITH(OID=5077, DESCRIPTION="quicklz destructor");

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/355c4370/src/include/utils/builtins.h
----------------------------------------------------------------------
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index af381c4..4dca6de 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -1164,6 +1164,12 @@ extern Datum quicklz_compress(PG_FUNCTION_ARGS);
 extern Datum quicklz_decompress(PG_FUNCTION_ARGS);
 extern Datum quicklz_validator(PG_FUNCTION_ARGS);
 
+/* snappy compression handlers, defined in catalog/pg_compression.c */
+extern Datum snappy_constructor(PG_FUNCTION_ARGS);
+extern Datum snappy_destructor(PG_FUNCTION_ARGS);
+extern Datum snappy_compress_internal(PG_FUNCTION_ARGS);
+extern Datum snappy_decompress_internal(PG_FUNCTION_ARGS);
+extern Datum snappy_validator(PG_FUNCTION_ARGS);
+
 extern Datum zlib_constructor(PG_FUNCTION_ARGS);
 extern Datum zlib_destructor(PG_FUNCTION_ARGS);
 extern Datum zlib_compress(PG_FUNCTION_ARGS);


Mime
View raw message