Return-Path: Delivered-To: apmail-avro-commits-archive@www.apache.org Received: (qmail 42085 invoked from network); 30 Aug 2010 16:52:28 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 30 Aug 2010 16:52:28 -0000 Received: (qmail 55748 invoked by uid 500); 30 Aug 2010 16:52:28 -0000 Delivered-To: apmail-avro-commits-archive@avro.apache.org Received: (qmail 55706 invoked by uid 500); 30 Aug 2010 16:52:28 -0000 Mailing-List: contact commits-help@avro.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@avro.apache.org Delivered-To: mailing list commits@avro.apache.org Received: (qmail 55698 invoked by uid 99); 30 Aug 2010 16:52:28 -0000 Received: from Unknown (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 30 Aug 2010 16:52:27 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 30 Aug 2010 16:52:02 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 88F3A23889EB; Mon, 30 Aug 2010 16:50:41 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r990860 [1/3] - in /avro/trunk: ./ lang/php/ lang/php/examples/ lang/php/lib/ lang/php/lib/avro/ lang/php/test/ Date: Mon, 30 Aug 2010 16:50:41 -0000 To: commits@avro.apache.org From: cutting@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100830165041.88F3A23889EB@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cutting Date: Mon Aug 30 16:50:40 2010 New Revision: 990860 URL: http://svn.apache.org/viewvc?rev=990860&view=rev Log: AVRO-627. Add PHP implementation. Contributed by Michael Glaesemann. 
Added: avro/trunk/lang/php/ (with props) avro/trunk/lang/php/README.txt avro/trunk/lang/php/build.sh (with props) avro/trunk/lang/php/examples/ avro/trunk/lang/php/examples/write_read.php avro/trunk/lang/php/lib/ avro/trunk/lang/php/lib/avro/ avro/trunk/lang/php/lib/avro.php avro/trunk/lang/php/lib/avro/data_file.php avro/trunk/lang/php/lib/avro/datum.php avro/trunk/lang/php/lib/avro/debug.php avro/trunk/lang/php/lib/avro/gmp.php avro/trunk/lang/php/lib/avro/io.php avro/trunk/lang/php/lib/avro/schema.php avro/trunk/lang/php/lib/avro/util.php avro/trunk/lang/php/test/ (with props) avro/trunk/lang/php/test/AllTests.php avro/trunk/lang/php/test/DataFileTest.php avro/trunk/lang/php/test/DatumIOTest.php avro/trunk/lang/php/test/FloatIntEncodingTest.php avro/trunk/lang/php/test/IODatumReaderTest.php avro/trunk/lang/php/test/InterOpTest.php avro/trunk/lang/php/test/LongEncodingTest.php avro/trunk/lang/php/test/NameTest.php avro/trunk/lang/php/test/SchemaTest.php avro/trunk/lang/php/test/StringIOTest.php avro/trunk/lang/php/test/generate_interop_data.php avro/trunk/lang/php/test/test_helper.php Modified: avro/trunk/BUILD.txt avro/trunk/CHANGES.txt avro/trunk/build.sh Modified: avro/trunk/BUILD.txt URL: http://svn.apache.org/viewvc/avro/trunk/BUILD.txt?rev=990860&r1=990859&r2=990860&view=diff ============================================================================== --- avro/trunk/BUILD.txt (original) +++ avro/trunk/BUILD.txt Mon Aug 30 16:50:40 2010 @@ -5,6 +5,7 @@ REQUIREMENTS The following packages must be installed before Avro can be built: - Java: JDK 1.6 and 1.5 (for Forrest docs) + - PHP: php5, phpunit, php5-gmp - Python: 2.5 or greater, python-setuptools for dist target - C: gcc, autoconf, automake, libtool, asciidoc, source-highlight - C++: g++, flex, bison, libboost-dev Modified: avro/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=990860&r1=990859&r2=990860&view=diff 
============================================================================== --- avro/trunk/CHANGES.txt (original) +++ avro/trunk/CHANGES.txt Mon Aug 30 16:50:40 2010 @@ -36,6 +36,8 @@ Avro 1.4.0 (31 August 2010) NEW FEATURES + AVRO-627. Add PHP implementation. (Michael Glaesemann) + AVRO-613. Create basic frontend to view trace results. (Patrick Wendell via philz) Modified: avro/trunk/build.sh URL: http://svn.apache.org/viewvc/avro/trunk/build.sh?rev=990860&r1=990859&r2=990860&view=diff ============================================================================== --- avro/trunk/build.sh (original) +++ avro/trunk/build.sh Mon Aug 30 16:50:40 2010 @@ -45,6 +45,7 @@ case "$target" in (cd lang/c; ./build.sh test) (cd lang/c++; ./build.sh test) (cd lang/ruby; rake test) + (cd lang/php; ./build.sh test) # create interop test data (cd lang/java; ant interop-data-generate) @@ -52,6 +53,7 @@ case "$target" in (cd lang/c; ./build.sh interop-data-generate) #(cd lang/c++; make interop-data-generate) (cd lang/ruby; rake generate_interop) + (cd lang/php; ./build.sh interop-data-generate) # run interop data tests (cd lang/java; ant interop-data-test) @@ -59,6 +61,7 @@ case "$target" in (cd lang/c; ./build.sh interop-data-test) #(cd lang/c++; make interop-data-test) (cd lang/ruby; rake interop) + (cd lang/php; ./build.sh test-interop) # run interop rpc tests /bin/bash share/test/interop/bin/test_rpc_interop.sh @@ -86,6 +89,8 @@ case "$target" in (cd lang/ruby; rake dist) + (cd lang/php; ./build.sh dist) + # build docs (cd doc; ant) (cd build; tar czf ../dist/avro-doc-$VERSION.tar.gz avro-doc-$VERSION) @@ -127,6 +132,7 @@ case "$target" in (cd lang/ruby; rake clean) + (cd lang/php; ./build.sh clean) ;; *) Propchange: avro/trunk/lang/php/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Mon Aug 30 16:50:40 2010 @@ -0,0 +1 @@ +pkg Added: avro/trunk/lang/php/README.txt URL: 
http://svn.apache.org/viewvc/avro/trunk/lang/php/README.txt?rev=990860&view=auto ============================================================================== --- avro/trunk/lang/php/README.txt (added) +++ avro/trunk/lang/php/README.txt Mon Aug 30 16:50:40 2010 @@ -0,0 +1,24 @@ +What the Avro PHP library is +============================ + +A library for using [Avro](http://avro.apache.org/) with PHP. + +Getting started +=============== + +Download the avro-php distribution, untar it, and put it in your include path: + + tar xjf avro-php.tar.bz2 # avro-php.tar.bz2 is likely avro-php-1.4.0.tar.bz2 + cp avro-php /path/to/where/you/want/it + +Require the avro.php file in your source, and you should be good to go: + + 1392, 'member_name' => 'Jose'); +$maria = array('member_id' => 1642, 'member_name' => 'Maria'); +$data = array($jose, $maria); + +$file_name = 'data.avr'; +// Open $file_name for writing, using the given writer's schema +$data_writer = AvroDataIO::open_file($file_name, 'w', $writers_schema_json); + +// Write each datum to the file +foreach ($data as $datum) + $data_writer->append($datum); +// Tidy up +$data_writer->close(); + +// Open $file_name (by default for reading) using the writer's schema +// included in the file +$data_reader = AvroDataIO::open_file($file_name); +echo "from file:\n"; +// Read each datum +foreach ($data_reader->data() as $datum) + echo var_export($datum, true) . "\n"; +$data_reader->close(); + +// Create a data string +// Create a string io object. 
+$io = new AvroStringIO(); +// Create a datum writer object +$writers_schema = AvroSchema::parse($writers_schema_json); +$writer = new AvroIODatumWriter($writers_schema); +$data_writer = new AvroDataIOWriter($io, $writer, $writers_schema); +foreach ($data as $datum) + $data_writer->append($datum); +$data_writer->close(); + +$binary_string = $io->string(); + +// Load the string data string +$read_io = new AvroStringIO($binary_string); +$data_reader = new AvroDataIOReader($read_io, new AvroIODatumReader()); +echo "from binary string:\n"; +foreach ($data_reader->data() as $datum) + echo var_export($datum, true) . "\n"; + +/** Output +from file: +array ( + 'member_id' => 1392, + 'member_name' => 'Jose', +) +array ( + 'member_id' => 1642, + 'member_name' => 'Maria', +) +from binary string: +array ( + 'member_id' => 1392, + 'member_name' => 'Jose', +) +array ( + 'member_id' => 1642, + 'member_name' => 'Maria', +) +*/ Added: avro/trunk/lang/php/lib/avro.php URL: http://svn.apache.org/viewvc/avro/trunk/lang/php/lib/avro.php?rev=990860&view=auto ============================================================================== --- avro/trunk/lang/php/lib/avro.php (added) +++ avro/trunk/lang/php/lib/avro.php Mon Aug 30 16:50:40 2010 @@ -0,0 +1,194 @@ +io = $io; + $this->decoder = new AvroIOBinaryDecoder($this->io); + $this->datum_reader = $datum_reader; + $this->read_header(); + + $codec = AvroUtil::array_value($this->metadata, + AvroDataIO::METADATA_CODEC_ATTR); + if ($codec && !AvroDataIO::is_valid_codec($codec)) + throw new AvroDataIOException(sprintf('Uknown codec: %s', $codec)); + + $this->block_count = 0; + // FIXME: Seems unsanitary to set writers_schema here. + // Can't constructor take it as an argument? + $this->datum_reader->set_writers_schema( + AvroSchema::parse($this->metadata[AvroDataIO::METADATA_SCHEMA_ATTR])); + } + + /** + * Reads header of object container + * @throws AvroDataIOException if the file is not an Avro data file. 
+ */ + private function read_header() + { + $this->seek(0, AvroIO::SEEK_SET); + + $magic = $this->read(AvroDataIO::magic_size()); + + if (strlen($magic) < AvroDataIO::magic_size()) + throw new AvroDataIOException( + 'Not an Avro data file: shorter than the Avro magic block'); + + if (AvroDataIO::magic() != $magic) + throw new AvroDataIOException( + sprintf('Not an Avro data file: %s does not match %s', + $magic, AvroDataIO::magic())); + + $this->metadata = $this->datum_reader->read_data(AvroDataIO::metadata_schema(), + AvroDataIO::metadata_schema(), + $this->decoder); + $this->sync_marker = $this->read(AvroDataIO::SYNC_SIZE); + } + + /** + * @internal Would be nice to implement data() as an iterator, I think + * @returns array of data from object container. + */ + public function data() + { + $data = array(); + while (true) + { + if (0 == $this->block_count) + { + if ($this->is_eof()) + break; + + if ($this->skip_sync()) + if ($this->is_eof()) + break; + + $this->read_block_header(); + } + $data []= $this->datum_reader->read($this->decoder); + $this->block_count -= 1; + } + return $data; + } + + /** + * Closes this writer (and its AvroIO object.) 
+ * @uses AvroIO::close() + */ + public function close() { return $this->io->close(); } + + /** + * @uses AvroIO::seek() + */ + private function seek($offset, $whence) + { + return $this->io->seek($offset, $whence); + } + + /** + * @uses AvroIO::read() + */ + private function read($len) { return $this->io->read($len); } + + /** + * @uses AvroIO::is_eof() + */ + private function is_eof() { return $this->io->is_eof(); } + + private function skip_sync() + { + $proposed_sync_marker = $this->read(AvroDataIO::SYNC_SIZE); + if ($proposed_sync_marker != $this->sync_marker) + { + $this->seek(-AvroDataIO::SYNC_SIZE, AvroIO::SEEK_CUR); + return false; + } + return true; + } + + /** + * Reads the block header (which includes the count of items in the block + * and the length in bytes of the block) + * @returns int length in bytes of the block. + */ + private function read_block_header() + { + $this->block_count = $this->decoder->read_long(); + return $this->decoder->read_long(); + } + +} + +/** + * Writes Avro data to an AvroIO source using an AvroSchema + * @package Avro + */ +class AvroDataIOWriter +{ + /** + * @returns string a new, unique sync marker. 
+ */ + private static function generate_sync_marker() + { + // From http://php.net/manual/en/function.mt-rand.php comments + return pack('S8', + mt_rand(0, 0xffff), mt_rand(0, 0xffff), + mt_rand(0, 0xffff), + mt_rand(0, 0xffff) | 0x4000, + mt_rand(0, 0xffff) | 0x8000, + mt_rand(0, 0xffff), mt_rand(0, 0xffff), mt_rand(0, 0xffff)); + } + + /** + * @var AvroIO object container where data is written + */ + private $io; + + /** + * @var AvroIOBinaryEncoder encoder for object container + */ + private $encoder; + + /** + * @var AvroDatumWriter + */ + private $datum_writer; + + /** + * @var AvroStringIO buffer for writing + */ + private $buffer; + + /** + * @var AvroIOBinaryEncoder encoder for buffer + */ + private $buffer_encoder; // AvroIOBinaryEncoder + + /** + * @var int count of items written to block + */ + private $block_count; + + /** + * @var array map of object container metadata + */ + private $metadata; + + /** + * @param AvroIO $io + * @param AvroIODatumWriter $datum_writer + * @param AvroSchema $writers_schema + */ + public function __construct($io, $datum_writer, $writers_schema=null) + { + if (!($io instanceof AvroIO)) + throw new AvroDataIOException('io must be instance of AvroIO'); + + $this->io = $io; + $this->encoder = new AvroIOBinaryEncoder($this->io); + $this->datum_writer = $datum_writer; + $this->buffer = new AvroStringIO(); + $this->buffer_encoder = new AvroIOBinaryEncoder($this->buffer); + $this->block_count = 0; + $this->metadata = array(); + + if ($writers_schema) + { + $this->sync_marker = self::generate_sync_marker(); + $this->metadata[AvroDataIO::METADATA_CODEC_ATTR] = AvroDataIO::NULL_CODEC; + $this->metadata[AvroDataIO::METADATA_SCHEMA_ATTR] = strval($writers_schema); + $this->write_header(); + } + else + { + $dfr = new AvroDataIOReader($this->io, new AvroIODatumReader()); + $this->sync_marker = $dfr->sync_marker; + $this->metadata[AvroDataIO::METADATA_CODEC_ATTR] = $dfr->metadata[AvroDataIO::METADATA_CODEC_ATTR]; + + $schema_from_file = 
$dfr->metadata[AvroDataIO::METADATA_SCHEMA_ATTR]; + $this->metadata[AvroDataIO::METADATA_SCHEMA_ATTR] = $schema_from_file; + $this->datum_writer->writers_schema = AvroSchema::parse($schema_from_file); + $this->seek(0, SEEK_END); + } + } + + /** + * @param mixed $datum + */ + public function append($datum) + { + $this->datum_writer->write($datum, $this->buffer_encoder); + $this->block_count++; + + if ($this->buffer->length() >= AvroDataIO::SYNC_INTERVAL) + $this->write_block(); + } + + /** + * Flushes buffer to AvroIO object container and closes it. + * @return mixed value of $io->close() + * @see AvroIO::close() + */ + public function close() + { + $this->flush(); + return $this->io->close(); + } + + /** + * Flushes biffer to AvroIO object container. + * @returns mixed value of $io->flush() + * @see AvroIO::flush() + */ + private function flush() + { + $this->write_block(); + return $this->io->flush(); + } + + /** + * Writes a block of data to the AvroIO object container. + * @throws AvroDataIOException if the codec provided by the encoder + * is not supported + * @internal Should the codec check happen in the constructor? + * Why wait until we're writing data? 
+ */ + private function write_block() + { + if ($this->block_count > 0) + { + $this->encoder->write_long($this->block_count); + $to_write = strval($this->buffer); + $this->encoder->write_long(strlen($to_write)); + + if (AvroDataIO::is_valid_codec( + $this->metadata[AvroDataIO::METADATA_CODEC_ATTR])) + $this->write($to_write); + else + throw new AvroDataIOException( + sprintf('codec %s is not supported', + $this->metadata[AvroDataIO::METADATA_CODEC_ATTR])); + + $this->write($this->sync_marker); + $this->buffer->truncate(); + $this->block_count = 0; + } + } + + /** + * Writes the header of the AvroIO object container + */ + private function write_header() + { + $this->write(AvroDataIO::magic()); + $this->datum_writer->write_data(AvroDataIO::metadata_schema(), + $this->metadata, $this->encoder); + $this->write($this->sync_marker); + } + + /** + * @param string $bytes + * @uses AvroIO::write() + */ + private function write($bytes) { return $this->io->write($bytes); } + + /** + * @param int $offset + * @param int $whence + * @uses AvroIO::seek() + */ + private function seek($offset, $whence) + { + return $this->io->seek($offset, $whence); + } +} Added: avro/trunk/lang/php/lib/avro/datum.php URL: http://svn.apache.org/viewvc/avro/trunk/lang/php/lib/avro/datum.php?rev=990860&view=auto ============================================================================== --- avro/trunk/lang/php/lib/avro/datum.php (added) +++ avro/trunk/lang/php/lib/avro/datum.php Mon Aug 30 16:50:40 2010 @@ -0,0 +1,984 @@ +writers_schema = $writers_schema; + } + + /** + * @param AvroSchema $writers_schema + * @param $datum + * @param AvroIOBinaryEncoder $encoder + * @returns mixed + * + * @throws AvrioIOTypeException if $datum is invalid for $writers_schema + */ + function write_data($writers_schema, $datum, $encoder) + { + if (!AvroSchema::is_valid_datum($writers_schema, $datum)) + throw new AvroIOTypeException($writers_schema, $datum); + + switch ($writers_schema->type()) + { + case 
AvroSchema::NULL_TYPE: + return $encoder->write_null($datum); + case AvroSchema::BOOLEAN_TYPE: + return $encoder->write_boolean($datum); + case AvroSchema::INT_TYPE: + return $encoder->write_int($datum); + case AvroSchema::LONG_TYPE: + return $encoder->write_long($datum); + case AvroSchema::FLOAT_TYPE: + return $encoder->write_float($datum); + case AvroSchema::DOUBLE_TYPE: + return $encoder->write_double($datum); + case AvroSchema::STRING_TYPE: + return $encoder->write_string($datum); + case AvroSchema::BYTES_TYPE: + return $encoder->write_bytes($datum); + case AvroSchema::ARRAY_SCHEMA: + return $this->write_array($writers_schema, $datum, $encoder); + case AvroSchema::MAP_SCHEMA: + return $this->write_map($writers_schema, $datum, $encoder); + case AvroSchema::FIXED_SCHEMA: + return $this->write_fixed($writers_schema, $datum, $encoder); + case AvroSchema::ENUM_SCHEMA: + return $this->write_enum($writers_schema, $datum, $encoder); + case AvroSchema::RECORD_SCHEMA: + case AvroSchema::ERROR_SCHEMA: + case AvroSchema::REQUEST_SCHEMA: + return $this->write_record($writers_schema, $datum, $encoder); + case AvroSchema::UNION_SCHEMA: + return $this->write_union($writers_schema, $datum, $encoder); + default: + throw new AvroException(sprintf('Uknown type: %s', + $writers_schema->type)); + } + } + + /** + * @param $datum + * @param AvroIOBinaryEncoder $encoder + */ + function write($datum, $encoder) + { + $this->write_data($this->writers_schema, $datum, $encoder); + } + + /**#@+ + * @param AvroSchema $writers_schema + * @param null|boolean|int|float|string|array $datum item to be written + * @param AvroIOBinaryEncoder $encoder + */ + private function write_array($writers_schema, $datum, $encoder) + { + $datum_count = count($datum); + if (0 < $datum_count) + { + $encoder->write_long($datum_count); + $items = $writers_schema->items(); + foreach ($datum as $item) + $this->write_data($items, $item, $encoder); + } + return $encoder->write_long(0); + } + + private function 
write_map($writers_schema, $datum, $encoder) + { + $datum_count = count($datum); + if ($datum_count > 0) + { + $encoder->write_long($datum_count); + foreach ($datum as $k => $v) + { + $encoder->write_string($k); + $this->write_data($writers_schema->values(), $v, $encoder); + } + } + $encoder->write_long(0); + } + + private function write_union($writers_schema, $datum, $encoder) + { + $datum_schema_index = -1; + $datum_schema = null; + foreach ($writers_schema->schemas() as $index => $schema) + if (AvroSchema::is_valid_datum($schema, $datum)) + { + $datum_schema_index = $index; + $datum_schema = $schema; + break; + } + + if (is_null($datum_schema)) + throw new AvroIOTypeException($writers_schema, $datum); + + $encoder->write_long($datum_schema_index); + $this->write_data($datum_schema, $datum, $encoder); + } + + private function write_enum($writers_schema, $datum, $encoder) + { + $datum_index = $writers_schema->symbol_index($datum); + return $encoder->write_int($datum_index); + } + + private function write_fixed($writers_schema, $datum, $encoder) + { + /** + * NOTE Unused $writers_schema parameter included for consistency + * with other write_* methods. + */ + return $encoder->write($datum); + } + + private function write_record($writers_schema, $datum, $encoder) + { + foreach ($writers_schema->fields() as $field) + $this->write_data($field->type(), $datum[$field->name()], $encoder); + } + + /**#@-*/ +} + +/** + * Encodes and writes Avro data to an AvroIO object using + * Avro binary encoding. + * + * @package Avro + */ +class AvroIOBinaryEncoder +{ + /** + * Performs encoding of the given float value to a binary string + * + * XXX: This is not endian-aware! The {@link Avro::check_platform()} + * called in {@link AvroIOBinaryEncoder::__construct()} should ensure the + * library is only used on little-endian platforms, which ensure the little-endian + * encoding required by the Avro spec. 
+ * + * @param float $float + * @returns string bytes + * @see Avro::check_platform() + */ + static function float_to_int_bits($float) + { + return pack('f', (float) $float); + } + + /** + * Performs encoding of the given double value to a binary string + * + * XXX: This is not endian-aware! See comments in + * {@link AvroIOBinaryEncoder::float_to_int_bits()} for details. + * + * @param double $double + * @returns string bytes + */ + static function double_to_long_bits($double) + { + return pack('d', (double) $double); + } + + /** + * @param int|string $n + * @returns string long $n encoded as bytes + * @internal This relies on 64-bit PHP. + */ + static public function encode_long($n) + { + $n = (int) $n; + $n = ($n << 1) ^ ($n >> 63); + $str = ''; + while (0 != ($n & ~0x7F)) + { + $str .= chr(($n & 0x7F) | 0x80); + $n >>= 7; + } + $str .= chr($n); + return $str; + } + + /** + * @var AvroIO + */ + private $io; + + /** + * @param AvroIO $io object to which data is to be written. + * + */ + function __construct($io) + { + Avro::check_platform(); + $this->io = $io; + } + + /** + * @param null $datum actual value is ignored + */ + function write_null($datum) { return null; } + + /** + * @param boolean $datum + */ + function write_boolean($datum) + { + $byte = $datum ? 
chr(1) : chr(0); + $this->write($byte); + } + + /** + * @param int $datum + */ + function write_int($datum) { $this->write_long($datum); } + + /** + * @param int $n + */ + function write_long($n) + { + if (Avro::uses_gmp()) + $this->write(AvroGMP::encode_long($n)); + else + $this->write(self::encode_long($n)); + } + + /** + * @param float $datum + * @uses self::float_to_int_bits() + */ + public function write_float($datum) + { + $this->write(self::float_to_int_bits($datum)); + } + + /** + * @param float $datum + * @uses self::double_to_long_bits() + */ + public function write_double($datum) + { + $this->write(self::double_to_long_bits($datum)); + } + + /** + * @param string $str + * @uses self::write_bytes() + */ + function write_string($str) { $this->write_bytes($str); } + + /** + * @param string $bytes + */ + function write_bytes($bytes) + { + $this->write_long(strlen($bytes)); + $this->write($bytes); + } + + /** + * @param string $datum + */ + function write($datum) { $this->io->write($datum); } +} + +/** + * Handles schema-specifc reading of data from the decoder. + * + * Also handles schema resolution between the reader and writer + * schemas (if a writer's schema is provided). + * + * @package Avro + */ +class AvroIODatumReader +{ + /** + * + * @param AvroSchema $writers_schema + * @param AvroSchema $readers_schema + * @returns boolean true if the schemas are consistent with + * each other and false otherwise. 
+ */ + static function schemas_match($writers_schema, $readers_schema) + { + $writers_schema_type = $writers_schema->type; + $readers_schema_type = $readers_schema->type; + + if (AvroSchema::UNION_SCHEMA == $writers_schema_type + || AvroSchema::UNION_SCHEMA == $readers_schema_type) + return true; + + if ($writers_schema_type == $readers_schema_type) + { + if (AvroSchema::is_primitive_type($writers_schema_type)) + return true; + + switch ($readers_schema_type) + { + case AvroSchema::MAP_SCHEMA: + return self::attributes_match($writers_schema->values(), + $readers_schema->values(), + array(AvroSchema::TYPE_ATTR)); + case AvroSchema::ARRAY_SCHEMA: + return self::attributes_match($writers_schema->items(), + $readers_schema->items(), + array(AvroSchema::TYPE_ATTR)); + case AvroSchema::ENUM_SCHEMA: + return self::attributes_match($writers_schema, $readers_schema, + array(AvroSchema::FULLNAME_ATTR)); + case AvroSchema::FIXED_SCHEMA: + return self::attributes_match($writers_schema, $readers_schema, + array(AvroSchema::FULLNAME_ATTR, + AvroSchema::SIZE_ATTR)); + case AvroSchema::RECORD_SCHEMA: + case AvroSchema::ERROR_SCHEMA: + return self::attributes_match($writers_schema, $readers_schema, + array(AvroSchema::FULLNAME_ATTR)); + case AvroSchema::REQUEST_SCHEMA: + // XXX: This seems wrong + return true; + // XXX: no default + } + + if (AvroSchema::INT_TYPE == $writers_schema_type + && in_array($readers_schema_type, array(AvroSchema::LONG_TYPE, + AvroSchema::FLOAT_TYPE, + AvroSchema::DOUBLE_TYPE))) + return true; + + if (AvroSchema::LONG_TYPE == $writers_schema_type + && in_array($readers_schema_type, array(AvroSchema::FLOAT_TYPE, + AvroSchema::DOUBLE_TYPE))) + return true; + + if (AvroSchema::FLOAT_TYPE == $writers_schema_type + && AvroSchema::DOUBLE_TYPE == $readers_schema_type) + return true; + + return false; + } + + } + + /** + * Checks equivalence of the given attributes of the two given schemas. 
+ * + * @param AvroSchema $schema_one + * @param AvroSchema $schema_two + * @param string[] $attribute_names array of string attribute names to compare + * + * @returns boolean true if the attributes match and false otherwise. + */ + static function attributes_match($schema_one, $schema_two, $attribute_names) + { + foreach ($attribute_names as $attribute_name) + if ($schema_one->attribute($attribute_name) + != $schema_two->attribute($attribute_name)) + return false; + return true; + } + + /** + * @var AvroSchema + */ + private $writers_schema; + + /** + * @var AvroSchema + */ + private $readers_schema; + + /** + * @param AvroSchema $writers_schema + * @param AvroSchema $readers_schema + */ + function __construct($writers_schema=null, $readers_schema=null) + { + $this->writers_schema = $writers_schema; + $this->readers_schema = $readers_schema; + } + + /** + * @param AvroSchema $readers_schema + */ + public function set_writers_schema($readers_schema) + { + $this->writers_schema = $readers_schema; + } + + /** + * @param AvroIOBinaryDecoder $decoder + * @returns string + */ + public function read($decoder) + { + if (is_null($this->readers_schema)) + $this->readers_schema = $this->writers_schema; + return $this->read_data($this->writers_schema, $this->readers_schema, + $decoder); + } + + /**#@+ + * @param AvroSchema $writers_schema + * @param AvroSchema $readers_schema + * @param AvroIOBinaryDecoder $decoder + */ + /** + * @returns mixed + */ + public function read_data($writers_schema, $readers_schema, $decoder) + { + if (!self::schemas_match($writers_schema, $readers_schema)) + throw new AvroIOSchemaMatchException($writers_schema, $readers_schema); + + // Schema resolution: reader's schema is a union, writer's schema is not + if (AvroSchema::UNION_SCHEMA == $readers_schema->type() + && AvroSchema::UNION_SCHEMA != $writers_schema->type()) + { + foreach ($readers_schema->schemas() as $schema) + if (self::schemas_match($writers_schema, $schema)) + return 
$this->read_data($writers_schema, $schema, $decoder); + throw new AvroIOSchemaMatchException($writers_schema, $readers_schema); + } + + switch ($writers_schema->type()) + { + case AvroSchema::NULL_TYPE: + return $decoder->read_null(); + case AvroSchema::BOOLEAN_TYPE: + return $decoder->read_boolean(); + case AvroSchema::INT_TYPE: + return $decoder->read_int(); + case AvroSchema::LONG_TYPE: + return $decoder->read_long(); + case AvroSchema::FLOAT_TYPE: + return $decoder->read_float(); + case AvroSchema::DOUBLE_TYPE: + return $decoder->read_double(); + case AvroSchema::STRING_TYPE: + return $decoder->read_string(); + case AvroSchema::BYTES_TYPE: + return $decoder->read_bytes(); + case AvroSchema::ARRAY_SCHEMA: + return $this->read_array($writers_schema, $readers_schema, $decoder); + case AvroSchema::MAP_SCHEMA: + return $this->read_map($writers_schema, $readers_schema, $decoder); + case AvroSchema::UNION_SCHEMA: + return $this->read_union($writers_schema, $readers_schema, $decoder); + case AvroSchema::ENUM_SCHEMA: + return $this->read_enum($writers_schema, $readers_schema, $decoder); + case AvroSchema::FIXED_SCHEMA: + return $this->read_fixed($writers_schema, $readers_schema, $decoder); + case AvroSchema::RECORD_SCHEMA: + case AvroSchema::ERROR_SCHEMA: + case AvroSchema::REQUEST_SCHEMA: + return $this->read_record($writers_schema, $readers_schema, $decoder); + default: + throw new AvroException(sprintf("Cannot read unknown schema type: %s", + $writers_schema->type())); + } + } + + /** + * @returns array + */ + public function read_array($writers_schema, $readers_schema, $decoder) + { + $items = array(); + $block_count = $decoder->read_long(); + while (0 != $block_count) + { + if ($block_count < 0) + { + $block_count = -$block_count; + $block_size = $decoder->read_long(); // Read (and ignore) block size + } + for ($i = 0; $i < $block_count; $i++) + $items []= $this->read_data($writers_schema->items(), + $readers_schema->items(), + $decoder); + $block_count = 
$decoder->read_long(); + } + return $items; + } + + /** + * @returns array + */ + public function read_map($writers_schema, $readers_schema, $decoder) + { + $items = array(); + $pair_count = $decoder->read_long(); + while (0 != $pair_count) + { + if ($pair_count < 0) + { + $pair_count = -$pair_count; + // Note: we're not doing anything with block_size other than skipping it + $block_size = $decoder->read_long(); + } + + for ($i = 0; $i < $pair_count; $i++) + { + $key = $decoder->read_string(); + $items[$key] = $this->read_data($writers_schema->values(), + $readers_schema->values(), + $decoder); + } + $pair_count = $decoder->read_long(); + } + return $items; + } + + /** + * @returns mixed + */ + public function read_union($writers_schema, $readers_schema, $decoder) + { + $schema_index = $decoder->read_long(); + $selected_writers_schema = $writers_schema->schema_by_index($schema_index); + return $this->read_data($selected_writers_schema, $readers_schema, $decoder); + } + + /** + * @returns string + */ + public function read_enum($writers_schema, $readers_schema, $decoder) + { + $symbol_index = $decoder->read_int(); + $symbol = $writers_schema->symbol_by_index($symbol_index); + if (!$readers_schema->has_symbol($symbol)) + null; // FIXME: unset wrt schema resolution + return $symbol; + } + + /** + * @returns string + */ + public function read_fixed($writers_schema, $readers_schema, $decoder) + { + return $decoder->read($writers_schema->size()); + } + + /** + * @returns array + */ + public function read_record($writers_schema, $readers_schema, $decoder) + { + $readers_fields = $readers_schema->fields_hash(); + $record = array(); + foreach ($writers_schema->fields() as $writers_field) + { + $type = $writers_field->type(); + if (isset($readers_fields[$writers_field->name()])) + $record[$writers_field->name()] + = $this->read_data($type, + $readers_fields[$writers_field->name()]->type(), + $decoder); + else + $this->skip_data($type, $decoder); + } + // Fill in default 
values
+    if (count($readers_fields) > count($record))
+    {
+      $writers_fields = $writers_schema->fields_hash();
+      foreach ($readers_fields as $field_name => $field)
+      {
+        if (!isset($writers_fields[$field_name]))
+        {
+          if ($field->has_default_value())
+            $record[$field->name()]
+              = $this->read_default_value($field->type(),
+                                          $field->default_value());
+          else
+            null; // FIXME: unset
+        }
+      }
+    }
+
+    return $record;
+  }
+  /**#@-*/
+
+  /**
+   * Converts a schema-declared default value into the PHP value a reader
+   * would produce for $field_schema.
+   *
+   * Array items, map values and record fields are converted recursively.
+   * A union default is interpreted against the union's first branch
+   * (schema_by_index(0)).
+   *
+   * @param AvroSchema $field_schema
+   * @param null|boolean|int|float|string|array $default_value
+   * @returns null|boolean|int|float|string|array
+   *
+   * @throws AvroException if $field_schema type is unknown.
+   */
+  public function read_default_value($field_schema, $default_value)
+  {
+    switch($field_schema->type())
+    {
+      case AvroSchema::NULL_TYPE:
+        return null;
+      case AvroSchema::BOOLEAN_TYPE:
+        return $default_value;
+      case AvroSchema::INT_TYPE:
+      case AvroSchema::LONG_TYPE:
+        return (int) $default_value;
+      case AvroSchema::FLOAT_TYPE:
+      case AvroSchema::DOUBLE_TYPE:
+        return (float) $default_value;
+      case AvroSchema::STRING_TYPE:
+      case AvroSchema::BYTES_TYPE:
+        return $default_value;
+      case AvroSchema::ARRAY_SCHEMA:
+        $array = array();
+        foreach ($default_value as $json_val)
+        {
+          $val = $this->read_default_value($field_schema->items(), $json_val);
+          $array []= $val;
+        }
+        return $array;
+      case AvroSchema::MAP_SCHEMA:
+        $map = array();
+        foreach ($default_value as $key => $json_val)
+          $map[$key] = $this->read_default_value($field_schema->values(),
+                                                 $json_val);
+        return $map;
+      case AvroSchema::UNION_SCHEMA:
+        return $this->read_default_value($field_schema->schema_by_index(0),
+                                         $default_value);
+      case AvroSchema::ENUM_SCHEMA:
+      case AvroSchema::FIXED_SCHEMA:
+        return $default_value;
+      case AvroSchema::RECORD_SCHEMA:
+        $record = array();
+        foreach ($field_schema->fields() as $field)
+        {
+          $field_name = $field->name();
+          // Use the record default's own entry whenever it is present, even
+          // when it is falsy (0, false, '', null). Fall back to the field's
+          // default only when the entry is missing. A truthiness test would
+          // drop explicit falsy values and raise an undefined-index notice
+          // for missing keys.
+          if (is_array($default_value)
+              && array_key_exists($field_name, $default_value))
+            $json_val = $default_value[$field_name];
+          else
+            $json_val = $field->default_value();
+
+          $record[$field_name] = $this->read_default_value($field->type(),
+                                                           $json_val);
+        }
+        return $record;
+      default:
+        throw new AvroException(sprintf('Unknown type: %s', $field_schema->type()));
+    }
+  }
+
+  /**
+   * Advances $decoder past one datum written with $writers_schema
+   * without materializing it.
+   *
+   * Complex types are skipped here, recursively, using only the decoder's
+   * primitive read/skip operations.
+   *
+   * @param AvroSchema $writers_schema
+   * @param AvroIOBinaryDecoder $decoder
+   * @throws AvroException if the schema type is unknown.
+   */
+  private function skip_data($writers_schema, $decoder)
+  {
+    switch ($writers_schema->type())
+    {
+      case AvroSchema::NULL_TYPE:
+        return $decoder->skip_null();
+      case AvroSchema::BOOLEAN_TYPE:
+        return $decoder->skip_boolean();
+      case AvroSchema::INT_TYPE:
+        return $decoder->skip_int();
+      case AvroSchema::LONG_TYPE:
+        // int and long share the same zig-zag varint encoding, so the
+        // public skip_int() skips a long as well.
+        return $decoder->skip_int();
+      case AvroSchema::FLOAT_TYPE:
+        return $decoder->skip_float();
+      case AvroSchema::DOUBLE_TYPE:
+        return $decoder->skip_double();
+      case AvroSchema::STRING_TYPE:
+        return $decoder->skip_string();
+      case AvroSchema::BYTES_TYPE:
+        return $decoder->skip_bytes();
+      case AvroSchema::ARRAY_SCHEMA:
+        return $this->skip_blocks($writers_schema->items(), $decoder, false);
+      case AvroSchema::MAP_SCHEMA:
+        return $this->skip_blocks($writers_schema->values(), $decoder, true);
+      case AvroSchema::UNION_SCHEMA:
+        // A union is a long branch index followed by that branch's datum.
+        // read_long() may return a string under GMP, hence the cast.
+        $index = (int) $decoder->read_long();
+        return $this->skip_data($writers_schema->schema_by_index($index),
+                                $decoder);
+      case AvroSchema::ENUM_SCHEMA:
+        // An enum is written as its symbol index.
+        return $decoder->skip_int();
+      case AvroSchema::FIXED_SCHEMA:
+        return $decoder->skip($writers_schema->size());
+      case AvroSchema::RECORD_SCHEMA:
+      case AvroSchema::ERROR_SCHEMA:
+      case AvroSchema::REQUEST_SCHEMA:
+        // A record is its fields written back to back in schema order.
+        foreach ($writers_schema->fields() as $field)
+          $this->skip_data($field->type(), $decoder);
+        return null;
+      default:
+        throw new AvroException(sprintf('Uknown schema type: %s',
+                                        $writers_schema->type()));
+    }
+  }
+
+  /**
+   * Skips the block-encoded items of an array or map.
+   *
+   * Each block starts with a long item count, and a zero count ends the
+   * sequence. A negative count is followed by the block's size in bytes,
+   * which lets the whole block be skipped with a single seek.
+   *
+   * @param AvroSchema $item_schema schema of array items or map values
+   * @param AvroIOBinaryDecoder $decoder
+   * @param boolean $has_keys true for maps, whose values are each preceded
+   *                          by a string key
+   */
+  private function skip_blocks($item_schema, $decoder, $has_keys)
+  {
+    $block_count = $decoder->read_long();
+    while (0 != $block_count)
+    {
+      if (0 > $block_count)
+        $decoder->skip((int) $decoder->read_long());
+      else
+        for ($i = 0; $i < $block_count; $i++)
+        {
+          if ($has_keys)
+            $decoder->skip_string();
+          $this->skip_data($item_schema, $decoder);
+        }
+      $block_count = $decoder->read_long();
+    }
+  }
+}
+
+/**
+ * Decodes and reads Avro data from an AvroIO object encoded using
+ * Avro binary encoding.
+ *
+ * @package Avro
+ */
+class AvroIOBinaryDecoder
+{
+
+  /**
+   * Decodes a zig-zag encoded variable-length long from byte values.
+   *
+   * Each byte contributes its low 7 bits, least-significant group first.
+   * A set high bit (0x80) means another byte follows. The accumulated
+   * value is then zig-zag decoded: (n >> 1) ^ -(n & 1).
+   *
+   * @param int[] array of byte ascii values
+   * @returns long decoded value
+   * @internal Requires 64-bit platform
+   */
+  public static function decode_long_from_array($bytes)
+  {
+    $b = array_shift($bytes);
+    $n = $b & 0x7f;
+    $shift = 7;
+    while (0 != ($b & 0x80))
+    {
+      $b = array_shift($bytes);
+      $n |= (($b & 0x7f) << $shift);
+      $shift += 7;
+    }
+    return (($n >> 1) ^ -($n & 1));
+  }
+
+  /**
+   * Performs decoding of the binary string to a float value.
+   *
+   * XXX: This is not endian-aware! See comments in
+   * {@link AvroIOBinaryEncoder::float_to_int_bits()} for details.
+   *
+   * @param string $bits
+   * @returns float
+   */
+  static public function int_bits_to_float($bits)
+  {
+    $float = unpack('f', $bits);
+    return (float) $float[1];
+  }
+
+  /**
+   * Performs decoding of the binary string to a double value.
+   *
+   * XXX: This is not endian-aware! See comments in
+   * {@link AvroIOBinaryEncoder::float_to_int_bits()} for details.
+   *
+   * @param string $bits
+   * @returns float
+   */
+  static public function long_bits_to_double($bits)
+  {
+    $double = unpack('d', $bits);
+    return (double) $double[1];
+  }
+
+  /**
+   * @var AvroIO
+   */
+  private $io;
+
+  /**
+   * @param AvroIO $io object from which to read.
+   */
+  public function __construct($io)
+  {
+    Avro::check_platform();
+    $this->io = $io;
+  }
+
+  /**
+   * @returns string the next byte from $this->io (a one-character string).
+   * Any exception thrown by the underlying AvroIO::read() propagates.
+   * NOTE(review): AvroStringIO::read() returns '' at the end of the buffer
+   * instead of throwing.
+   */
+  private function next_byte() { return $this->read(1); }
+
+  /**
+   * @returns null
+   */
+  public function read_null() { return null; }
+
+  /**
+   * @returns boolean true iff the next byte has value 1
+   */
+  public function read_boolean()
+  {
+    return (boolean) (1 == ord($this->next_byte()));
+  }
+
+  /**
+   * Ints share the long encoding; the decoded long is cast to int.
+   * @returns int
+   */
+  public function read_int() { return (int) $this->read_long(); }
+
+  /**
+   * Reads a zig-zag varint long.
+   *
+   * Collects bytes up to and including the first one without the 0x80
+   * continuation bit. They are decoded with AvroGMP when Avro::uses_gmp()
+   * is true, and natively otherwise.
+   *
+   * @returns long (a decimal string when decoded via AvroGMP)
+   */
+  public function read_long()
+  {
+    $byte = ord($this->next_byte());
+    $bytes = array($byte);
+    while (0 != ($byte & 0x80))
+    {
+      $byte = ord($this->next_byte());
+      $bytes []= $byte;
+    }
+
+    if (Avro::uses_gmp())
+      return AvroGMP::decode_long_from_array($bytes);
+
+    return self::decode_long_from_array($bytes);
+  }
+
+  /**
+   * @returns float
+   */
+  public function read_float()
+  {
+    return self::int_bits_to_float($this->read(4));
+  }
+
+  /**
+   * @returns double
+   */
+  public function read_double()
+  {
+    return self::long_bits_to_double($this->read(8));
+  }
+
+  /**
+   * A string is encoded as a long followed by that many bytes
+   * of UTF-8 encoded character data.
+   * @returns string
+   */
+  public function read_string() { return $this->read_bytes(); }
+
+  /**
+   * Bytes are encoded as a long length followed by that many bytes.
+   * @returns string
+   */
+  public function read_bytes() { return $this->read($this->read_long()); }
+
+  /**
+   * @param int $len count of bytes to read
+   * @returns string
+   */
+  public function read($len) { return $this->io->read($len); }
+
+  public function skip_null() { return null; }
+
+  public function skip_boolean() { return $this->skip(1); }
+
+  public function skip_int() { return $this->skip_long(); }
+
+  /**
+   * Skips a zig-zag varint long by consuming bytes up to and including the
+   * first one without the 0x80 continuation bit.
+   *
+   * Public so that schema-driven skipping code outside this class can call
+   * it directly.
+   */
+  public function skip_long()
+  {
+    // next_byte() returns a one-character string, so convert it with ord()
+    // before testing the continuation bit. At end of input ord('') is 0,
+    // which ends the loop.
+    $b = ord($this->next_byte());
+    while (0 != ($b & 0x80))
+      $b = ord($this->next_byte());
+  }
+
+  public function skip_float() { return $this->skip(4); }
+
+  public function skip_double() { return $this->skip(8); }
+
+  /**
+   * Skips a length-prefixed byte sequence.
+   */
+  public function skip_bytes()
+  {
+    // read_long() returns a string when AvroGMP is in use, and seek offsets
+    // must be integers (AvroStringIO::seek() rejects anything else).
+    return $this->skip((int) $this->read_long());
+  }
+
+  public function skip_string() { return $this->skip_bytes(); }
+
+  /**
+   * @param int $len count of bytes to skip
+   * @uses AvroIO::seek()
+   */
+  public function skip($len) { $this->seek($len, AvroIO::SEEK_CUR); }
+
+  /**
+   * @returns int position of pointer in AvroIO instance
+   * @uses AvroIO::tell()
+   */
+  private function tell() { return $this->io->tell(); }
+
+  /**
+   * @param int $offset
+   * @param int $whence
+   * @returns boolean true upon success
+   * @uses AvroIO::seek()
+   */
+  private function seek($offset, $whence)
+  {
+    return $this->io->seek($offset, $whence);
+  }
+}
+
Added: avro/trunk/lang/php/lib/avro/debug.php
URL: http://svn.apache.org/viewvc/avro/trunk/lang/php/lib/avro/debug.php?rev=990860&view=auto
==============================================================================
--- avro/trunk/lang/php/lib/avro/debug.php (added)
+++ avro/trunk/lang/php/lib/avro/debug.php Mon Aug 30 16:50:40 2010
@@ -0,0 +1,194 @@
+= $debug_level);
+  }
+
+  /**
+   * @param string $format format string for the given arguments. Passed as is
+   *                       to vprintf.
+   * @param array  $args   array of arguments to pass to vsprinf.
+   * @param int    $debug_level debug level at which to print this statement
+   * @returns boolean true
+   */
+  static function debug($format, $args, $debug_level=self::DEBUG1)
+  {
+    // Prints only when self::is_debug($debug_level) holds; always true.
+    if (self::is_debug($debug_level))
+      vprintf($format . "\n", $args);
+    return true;
+  }
+
+  /**
+   * @param string $str
+   * @returns string[] array of hex representation of each byte of $str
+   */
+  static function hex_array($str) { return self::bytes_array($str); }
+
+  /**
+   * @param string $str
+   * @param string $joiner string used to join
+   * @returns string hex-represented bytes of each byte of $str
+   *                 joined by $joiner
+   */
+  static function hex_string($str, $joiner=' ')
+  {
+    return join($joiner, self::hex_array($str));
+  }
+
+  /**
+   * Formats every byte of $str with sprintf($format, <byte value>).
+   *
+   * @param string $str
+   * @param string $format format to represent bytes
+   * @returns string[] array of each byte of $str formatted using $format;
+   *                   empty when $str is ''
+   */
+  static function bytes_array($str, $format='x%02x')
+  {
+    $x = array();
+    // Before PHP 8.2, str_split('') returns array(''), which would be
+    // reported as a single 0x00 byte. An empty string has no bytes.
+    if ('' === (string) $str)
+      return $x;
+    foreach (str_split($str) as $b)
+      $x []= sprintf($format, ord($b));
+    return $x;
+  }
+
+  /**
+   * @param string $str
+   * @returns string[] array of bytes of $str represented in decimal format ('%3d')
+   */
+  static function dec_array($str) { return self::bytes_array($str, '%3d'); }
+
+  /**
+   * @param string $str
+   * @param string $joiner string to join bytes of $str
+   * @returns string of bytes of $str represented in decimal format
+   * @uses dec_array()
+   */
+  static function dec_string($str, $joiner = ' ')
+  {
+    return join($joiner, self::dec_array($str));
+  }
+
+  /**
+   * Renders each byte of $str as a short token.
+   *
+   * Printable ASCII bytes (32-126) are rendered as a space followed by the
+   * character itself, whatever the format. Other bytes depend on $format:
+   *   - ctrl: bytes 0-31 as their ASCII control names (for example, the
+   *           null byte is 'NUL'), left-padded with spaces to 3 characters;
+   *           127 as 'DEL'; bytes above 127 in zero-padded decimal ('%03d').
+   *   - hex:  non-printable bytes in hexadecimal ('x%02X').
+   *   - dec:  non-printable bytes in zero-padded decimal ('%03d').
+   *
+   * @param string $str
+   * @param string $format one of 'ctrl', 'hex', or 'dec'
+   * @returns string[] array of bytes represented in the given format;
+   *                   empty when $str is ''
+   * @throws AvroException if $format is not one of the recognized values
+   */
+  static function ascii_array($str, $format='ctrl')
+  {
+    // Strict comparison, so that non-string values such as 0 are rejected
+    // rather than loosely matching 'ctrl' (0 == 'ctrl' before PHP 8).
+    if (!in_array($format, array('ctrl', 'hex', 'dec'), true))
+      throw new AvroException('Unrecognized format specifier');
+
+    $ctrl_chars = array('NUL', 'SOH', 'STX', 'ETX', 'EOT', 'ENQ', 'ACK', 'BEL',
+                        'BS',  'HT',  'LF',  'VT',  'FF',  'CR',  'SO',  'SI',
+                        'DLE', 'DC1', 'DC2', 'DC3', 'DC4', 'NAK', 'SYN', 'ETB',
+                        'CAN', 'EM',  'SUB', 'ESC', 'FS',  'GS',  'RS',  'US');
+    $x = array();
+    // Before PHP 8.2, str_split('') yields array(''), which would render as
+    // a phantom NUL byte. An empty string has no bytes.
+    if ('' === (string) $str)
+      return $x;
+    foreach (str_split($str) as $b)
+    {
+      $db = ord($b);
+      if ($db < 32)
+      {
+        switch ($format)
+        {
+          case 'ctrl':
+            $x []= str_pad($ctrl_chars[$db], 3, ' ', STR_PAD_LEFT);
+            break;
+          case 'hex':
+            $x []= sprintf("x%02X", $db);
+            break;
+          case 'dec':
+            $x []= str_pad($db, 3, '0', STR_PAD_LEFT);
+            break;
+        }
+      }
+      else if ($db < 127)
+        $x []= "  $b";
+      else if ($db == 127)
+      {
+        switch ($format)
+        {
+          case 'ctrl':
+            $x []= 'DEL';
+            break;
+          case 'hex':
+            $x []= sprintf("x%02X", $db);
+            break;
+          case 'dec':
+            $x []= str_pad($db, 3, '0', STR_PAD_LEFT);
+            break;
+        }
+      }
+      else
+        if ('hex' == $format)
+          $x []= sprintf("x%02X", $db);
+        else
+          $x []= str_pad($db, 3, '0', STR_PAD_LEFT);
+    }
+    return $x;
+  }
+
+  /**
+   * @param string $str
+   * @param string $format one of 'ctrl', 'hex', or 'dec'.
+   *                       See {@link self::ascii_array()} for more description
+   * @param string $joiner
+   * @returns string of bytes joined by $joiner
+   * @throws AvroException if $format is not recognized (raised by ascii_array())
+   * @uses ascii_array()
+   */
+  static function ascii_string($str, $format='ctrl', $joiner = ' ')
+  {
+    return join($joiner, self::ascii_array($str, $format));
+  }
+}
Added: avro/trunk/lang/php/lib/avro/gmp.php
URL: http://svn.apache.org/viewvc/avro/trunk/lang/php/lib/avro/gmp.php?rev=990860&view=auto
==============================================================================
--- avro/trunk/lang/php/lib/avro/gmp.php (added)
+++ avro/trunk/lang/php/lib/avro/gmp.php Mon Aug 30 16:50:40 2010
@@ -0,0 +1,222 @@
+ gmp_sign($g))
+      $g = self::gmp_twos_complement($g);
+
+    $m = gmp_mul($g, gmp_pow(self::gmp_2(), $shift));
+    $m = gmp_and($m, self::gmp_0xfs());
+    if (gmp_testbit($m, 63))
+      $m = gmp_neg(gmp_add(gmp_and(gmp_com($m), self::gmp_0xfs()),
+                           self::gmp_1()));
+    return $m;
+  }
+
+  /**
+   * Arithmetic right shift
+   *
+   * Non-negative values are divided by 2^$shift with gmp_div(). Negative
+   * values go through four steps:
+   *   1. Mask with self::gmp_0xfs() (presumably the 64-bit all-ones mask;
+   *      TODO confirm, its definition is not visible here).
+   *   2. Divide by 2^$shift.
+   *   3. Set the vacated high bits to propagate the sign.
+   *   4. Convert the resulting two's-complement bit pattern back to a
+   *      negative GMP number.
+   *
+   * @param resource|int|string $g
+   * @param int $shift number of bits to shift right
+   * @returns resource $g shifted right $shift bits
+   */
+  static function shift_right($g, $shift)
+  {
+    if (0 == $shift)
+      return $g;
+
+    if (0 <= gmp_sign($g))
+      $m = gmp_div($g, gmp_pow(self::gmp_2(), $shift));
+    else // negative
+    {
+      $g = gmp_and($g, self::gmp_0xfs());
+      $m = gmp_div($g, gmp_pow(self::gmp_2(), $shift));
+      $m = gmp_and($m, self::gmp_0xfs());
+      // Sign-extend by setting bits 63 down to (63 - $shift).
+      // NOTE(review): this sets $shift + 1 bits. The lowest of them should
+      // already hold the shifted sign bit, so the extra iteration looks
+      // harmless.
+      for ($i = 63; $i >= (63 - $shift); $i--)
+        gmp_setbit($m, $i);
+
+      // Reinterpret the 64-bit pattern as negative: -((~m & mask) + 1).
+      $m = gmp_neg(gmp_add(gmp_and(gmp_com($m), self::gmp_0xfs()),
+                           self::gmp_1()));
+    }
+
+    return $m;
+  }
+
+  /**
+   * Zig-zag encodes $n as (n << 1) ^ (n >> 63), then writes the result as a
+   * varint: 7 bits per byte, least-significant group first, with the 0x80
+   * continuation bit set on every byte except the last.
+   *
+   * @param int|str $n integer (or string representation of integer) to encode
+   * @return string $bytes of the long $n encoded per the Avro spec
+   */
+  static function encode_long($n)
+  {
+    $g = gmp_init($n);
+    $g = gmp_xor(self::shift_left($g, 1),
+                 self::shift_right($g, 63));
+    $bytes = '';
+    // Loop while bits above the low 7 remain set. This assumes
+    // gmp_n0x7f() is the complement of 0x7f; TODO confirm.
+    while (0 != gmp_cmp(self::gmp_0(), gmp_and($g, self::gmp_n0x7f())))
+    {
+      $bytes .= chr(gmp_intval(gmp_and($g, self::gmp_0x7f())) | 0x80);
+      $g = self::shift_right($g, 7);
+    }
+    // The final byte carries the remaining low bits without the 0x80 flag.
+    $bytes .= chr(gmp_intval($g));
+    return $bytes;
+  }
+
+  /**
+   * Decodes a zig-zag varint from byte values using GMP arithmetic: each
+   * byte's low 7 bits are OR-ed in at increasing 7-bit offsets while the
+   * 0x80 continuation bit is set, then (n >> 1) ^ -(n & 1) is applied.
+   *
+   * @param int[] $bytes array of ascii codes of bytes to decode
+   * @return string representation (gmp_strval) of the decoded long.
+   */
+  static function decode_long_from_array($bytes)
+  {
+    $b = array_shift($bytes);
+    $g = gmp_init($b & 0x7f);
+    $shift = 7;
+    while (0 != ($b & 0x80))
+    {
+      $b = array_shift($bytes);
+      $g = gmp_or($g, self::shift_left(($b & 0x7f), $shift));
+      $shift += 7;
+    }
+    $val = gmp_xor(self::shift_right($g, 1), gmp_neg(gmp_and($g, 1)));
+    return gmp_strval($val);
+  }
+
+}
Added: avro/trunk/lang/php/lib/avro/io.php
URL: http://svn.apache.org/viewvc/avro/trunk/lang/php/lib/avro/io.php?rev=990860&view=auto
==============================================================================
--- avro/trunk/lang/php/lib/avro/io.php (added)
+++ avro/trunk/lang/php/lib/avro/io.php Mon Aug 30 16:50:40 2010
@@ -0,0 +1,493 @@
+not like eof in C or feof in PHP:
+   * it returns TRUE if the *next* read would be end of file,
+   * rather than if the *most recent* read read end of file.
+   * @returns boolean true if at the end of file, and false otherwise
+   */
+  public function is_eof()
+  {
+    throw new AvroNotImplementedException('Not implemented');
+  }
+
+  /**
+   * Closes this AvroIO instance.
+   * This base implementation only throws AvroNotImplementedException;
+   * AvroStringIO and AvroFile provide their own close().
+   */
+  public function close()
+  {
+    throw new AvroNotImplementedException('Not implemented');
+  }
+
+}
+
+/**
+ * AvroIO wrapper for string access
+ * @package Avro
+ */
+class AvroStringIO extends AvroIO
+{
+  /**
+   * @var array array of individual bytes (one-character strings)
+   */
+  private $buffer;
+  /**
+   * @var int current position in string (index into $buffer)
+   */
+  private $current_index;
+  /**
+   * @var boolean whether or not the string is closed.
+   */
+  private $is_closed;
+
+  /**
+   * @param string $str initial value of AvroStringIO buffer. Regardless
+   *                    of the initial value, the pointer is set to the
+   *                    beginning of the buffer.
+   * @throws AvroIOException if a non-string value is passed as $str
+   */
+  public function __construct($str = '')
+  {
+    $this->is_closed = false;
+    $this->buffer = array();
+    $this->current_index = 0;
+
+    if (is_string($str))
+      $this->buffer = self::split_bytes($str);
+    else
+      throw new AvroIOException(
+        sprintf('constructor argument must be a string: %s', gettype($str)));
+  }
+
+  /**
+   * Append bytes to this buffer.
+   * (Nothing more is needed to support Avro.)
+   *
+   * Bytes always go to the end of the buffer, whatever the current
+   * position is. The position then advances by the number of bytes
+   * written (see append_str()).
+   *
+   * @param str $arg bytes to write
+   * @returns int count of bytes written.
+   * @throws AvroIOException if the buffer is closed or if $arg is not a
+   *                         string value.
+   */
+  public function write($arg)
+  {
+    $this->check_closed();
+    if (is_string($arg))
+      return $this->append_str($arg);
+    throw new AvroIOException(
+      sprintf('write argument must be a string: (%s) %s',
+              gettype($arg), var_export($arg, true)));
+  }
+
+  /**
+   * Reads up to $len bytes starting at the current position. When fewer
+   * than $len bytes remain, the available bytes are returned and the
+   * position moves to the end of the buffer.
+   *
+   * @param int $len count of bytes to read
+   * @returns string bytes read from buffer
+   * @throws AvroIOException if the buffer is closed.
+   * @todo test for fencepost errors wrt updating current_index
+   */
+  public function read($len)
+  {
+    $this->check_closed();
+    $read = array_slice($this->buffer, $this->current_index, $len);
+    if (count($read) < $len)
+      $this->current_index = $this->length();
+    else
+      $this->current_index += $len;
+    return join($read);
+  }
+
+  /**
+   * Moves the current position. Seeking before the start of the buffer is
+   * rejected; seeking past its end is allowed.
+   *
+   * @param int $offset
+   * @param int $whence one of self::SEEK_SET, self::SEEK_CUR, self::SEEK_END
+   * @returns boolean true if successful
+   * @throws AvroIOException if $offset is not an integer, if the target
+   *                         position is before the beginning, or if
+   *                         $whence is invalid.
+   */
+  public function seek($offset, $whence=self::SEEK_SET)
+  {
+    if (!is_int($offset))
+      throw new AvroIOException('Seek offset must be an integer.');
+    // Prevent seeking before BOF
+    switch ($whence)
+    {
+      case self::SEEK_SET:
+        if (0 > $offset)
+          throw new AvroIOException('Cannot seek before beginning of file.');
+        $this->current_index = $offset;
+        break;
+      case self::SEEK_CUR:
+        // The bound check must use the relative $offset, not $whence.
+        // Otherwise a negative offset could move the position before the
+        // start of the buffer.
+        if (0 > $this->current_index + $offset)
+          throw new AvroIOException('Cannot seek before beginning of file.');
+        $this->current_index += $offset;
+        break;
+      case self::SEEK_END:
+        if (0 > $this->length() + $offset)
+          throw new AvroIOException('Cannot seek before beginning of file.');
+        $this->current_index = $this->length() + $offset;
+        break;
+      default:
+        throw new AvroIOException(sprintf('Invalid seek whence %d', $whence));
+    }
+
+    return true;
+  }
+
+  /**
+   * @returns int
+   * @see AvroIO::tell()
+   */
+  public function tell() { return $this->current_index; }
+
+  /**
+   * @returns boolean true when the position is at or beyond the end of the
+   *                  buffer
+   * @see AvroIO::is_eof()
+   */
+  public function is_eof()
+  {
+    return ($this->current_index >= $this->length());
+  }
+
+  /**
+   * No-op provided for compatibility with AvroIO interface.
+   * @returns boolean true
+   */
+  public function flush() { return true; }
+
+  /**
+   * Marks this buffer as closed.
+   * @returns boolean true
+   * @throws AvroIOException if the buffer is already closed.
+   */
+  public function close()
+  {
+    $this->check_closed();
+    $this->is_closed = true;
+    return true;
+  }
+
+  /**
+   * @throws AvroIOException if the buffer is closed.
+   */
+  private function check_closed()
+  {
+    if ($this->is_closed())
+      throw new AvroIOException('Buffer is closed');
+  }
+
+  /**
+   * Appends bytes to the end of this buffer and advances the current
+   * position by the number of bytes appended.
+   * @param string $str
+   * @returns integer count of bytes written (0 for '').
+   * @throws AvroIOException if the buffer is closed.
+   */
+  private function append_str($str)
+  {
+    $this->check_closed();
+    $ary = self::split_bytes($str);
+    $len = count($ary);
+    $this->buffer = array_merge($this->buffer, $ary);
+    $this->current_index += $len;
+    return $len;
+  }
+
+  /**
+   * Splits $str into an array of one-character (single-byte) strings.
+   *
+   * Unlike a bare str_split(), an empty string yields an empty array.
+   * Before PHP 8.2, str_split('') returns array(''), which would count as
+   * one byte in the buffer.
+   *
+   * @param string $str
+   * @returns string[]
+   */
+  private static function split_bytes($str)
+  {
+    return ('' === $str) ? array() : str_split($str);
+  }
+
+  /**
+   * Truncates the buffer to 0 bytes and returns the pointer
+   * to the beginning of the buffer.
+   * @returns boolean true
+   * @throws AvroIOException if the buffer is closed.
+   */
+  public function truncate()
+  {
+    $this->check_closed();
+    $this->buffer = array();
+    $this->current_index = 0;
+    return true;
+  }
+
+  /**
+   * @returns int count of bytes in the buffer
+   * @internal Could probably memoize length for performance, but
+   *           no need do this yet.
+   */
+  public function length() { return count($this->buffer); }
+
+  /**
+   * @returns string the whole buffer contents
+   */
+  public function __toString() { return join($this->buffer); }
+
+
+  /**
+   * @returns string
+   * @uses self::__toString()
+   */
+  public function string() { return $this->__toString(); }
+
+  /**
+   * @returns boolean true if this buffer is closed and false
+   *                  otherwise.
+   */
+  public function is_closed() { return $this->is_closed; }
+}
+
+/**
+ * AvroIO wrapper for PHP file access functions
+ * @package Avro
+ */
+class AvroFile extends AvroIO
+{
+  /**
+   * @var string fopen read mode value. Used internally.
+   */
+  const FOPEN_READ_MODE = 'rb';
+
+  /**
+   * @var string fopen write mode value. Used internally.
+   */
+  const FOPEN_WRITE_MODE = 'wb';
+
+  /**
+   * @var string
+   */
+  private $file_path;
+
+  /**
+   * @var resource file handle for AvroFile instance
+   */
+  private $file_handle;
+
+  public function __construct($file_path, $mode = self::READ_MODE)
+  {
+    /**
+     * XXX: should we check for file existence (in case of reading)
+     * or anything else about the provided file_path argument?
+     */
+    $this->file_path = $file_path;
+    switch ($mode)
+    {
+      case self::WRITE_MODE:
+        $this->file_handle = fopen($this->file_path, self::FOPEN_WRITE_MODE);
+        if (false == $this->file_handle)
+          throw new AvroIOException('Could not open file for writing');
+        break;
+      case self::READ_MODE:
+        $this->file_handle = fopen($this->file_path, self::FOPEN_READ_MODE);
+        if (false == $this->file_handle)
+          throw new AvroIOException('Could not open file for reading');
+        break;
+      default:
+        throw new AvroIOException(
+          sprintf("Only modes '%s' and '%s' allowed. You provided '%s'.",
+                  self::READ_MODE, self::WRITE_MODE, $mode));
+    }
+  }
+
+  /**
+   * Writes $str to the file at the current position.
+   *
+   * @param string $str bytes to write
+   * @returns int number of bytes fwrite() reports as written
+   * @throws AvroIOException when fwrite() fails
+   */
+  public function write($str)
+  {
+    $written = fwrite($this->file_handle, $str);
+    if ($written !== false)
+    {
+      return $written;
+    }
+    throw new AvroIOException(sprintf('Could not write to file'));
+  }
+
+  /**
+   * Reads up to $len bytes from the current position.
+   *
+   * @param int $len number of bytes wanted; 0 returns '' without touching
+   *                 the file
+   * @returns string the bytes fread() returned
+   * @throws AvroIOException for a negative $len or when fread() fails
+   */
+  public function read($len)
+  {
+    if ($len < 0)
+    {
+      throw new AvroIOException(
+        sprintf("Invalid length value passed to read: %d", $len));
+    }
+    if ($len == 0)
+    {
+      return '';
+    }
+
+    $chunk = fread($this->file_handle, $len);
+    if ($chunk !== false)
+    {
+      return $chunk;
+    }
+    throw new AvroIOException('Could not read from file');
+  }
+
+  /**
+   * @returns int the file pointer's current offset
+   * @throws AvroIOException when ftell() fails
+   */
+  public function tell()
+  {
+    $offset = ftell($this->file_handle);
+    if ($offset !== false)
+    {
+      return $offset;
+    }
+    throw new AvroIOException('Could not execute tell on reader');
+  }
+
+  /**
+   * Repositions the file pointer.
+   *
+   * Only an fseek() result of -1 counts as failure, so seeking past the
+   * end of the file is not reported as an error.
+   *
+   * @param int $offset
+   * @param int $whence SEEK_SET, SEEK_CUR or SEEK_END
+   * @returns boolean true on success
+   * @throws AvroIOException when fseek() reports failure
+   * @see AvroIO::seek()
+   */
+  public function seek($offset, $whence = SEEK_SET)
+  {
+    if (fseek($this->file_handle, $offset, $whence) === -1)
+    {
+      throw new AvroIOException(
+        sprintf("Could not execute seek (offset = %d, whence = %d)",
+                $offset, $whence));
+    }
+    return true;
+  }
+
+  /**
+   * Closes the underlying file handle.
+   *
+   * @returns boolean the (true) result of fclose()
+   * @throws AvroIOException when fclose() fails
+   */
+  public function close()
+  {
+    $closed = fclose($this->file_handle);
+    if ($closed === false)
+    {
+      throw new AvroIOException('Error closing file.');
+    }
+    return $closed;
+  }
+
+  /**
+   * Reports whether the *next* read would hit end of file.
+   *
+   * feof() only becomes true after a read has run into the end, so this
+   * reads one probe byte. If that did not reach the end, it steps back one
+   * byte.
+   *
+   * @returns boolean true at end of file, false otherwise
+   * @see AvroIO::is_eof() as behavior differs from feof()
+   */
+  public function is_eof()
+  {
+    $this->read(1);
+    $at_end = feof($this->file_handle);
+    if (!$at_end)
+    {
+      $this->seek(-1, self::SEEK_CUR);
+    }
+    return $at_end;
+  }
+
+  /**
+   * Flushes buffered output to the file.
+   *
+   * @returns boolean true on success
+   * @throws AvroIOException when fflush() fails
+   */
+  public function flush()
+  {
+    if (fflush($this->file_handle) === false)
+    {
+      throw new AvroIOException('Could not flush file.');
+    }
+    return true;
+  }
+
+}