flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject flink git commit: [FLINK-1371] [runtime] Fix KryoSerializer to not swallow EOFExceptions
Date Thu, 08 Jan 2015 11:49:58 GMT
Repository: flink
Updated Branches:
  refs/heads/master d9cb5b710 -> 19066b520


[FLINK-1371] [runtime] Fix KryoSerializer to not swallow EOFExceptions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/19066b52
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/19066b52
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/19066b52

Branch: refs/heads/master
Commit: 19066b520435528e104a69ccf372f56811123ee3
Parents: d9cb5b7
Author: Stephan Ewen <sewen@apache.org>
Authored: Thu Jan 8 12:38:51 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Jan 8 12:38:51 2015 +0100

----------------------------------------------------------------------
 .../java/typeutils/runtime/KryoSerializer.java  |  25 +-
 .../typeutils/runtime/WritableSerializer.java   |   1 +
 .../runtime/KryoGenericTypeSerializerTest.java  |  47 ++-
 .../runtime/KryoVersusAvroMinibenchmark.java    | 260 +---------------
 .../runtime/TestDataOutputSerializer.java       | 301 +++++++++++++++++++
 5 files changed, 373 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/19066b52/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
index 9e302bf..b2c55fb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
@@ -30,6 +30,7 @@ import org.apache.flink.core.memory.DataOutputView;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
 
 public class KryoSerializer<T> extends TypeSerializer<T> {
@@ -114,8 +115,19 @@ public class KryoSerializer<T> extends TypeSerializer<T>
{
 			previousOut = target;
 		}
 		
-		kryo.writeClassAndObject(output, record);
-		output.flush();
+		try {
+			kryo.writeClassAndObject(output, record);
+			output.flush();
+		}
+		catch (KryoException ke) {
+			Throwable cause = ke.getCause();
+			if (cause instanceof EOFException) {
+				throw (EOFException) cause;
+			}
+			else {
+				throw ke;
+			}
+		}
 	}
 
 	@SuppressWarnings("unchecked")
@@ -173,4 +185,13 @@ public class KryoSerializer<T> extends TypeSerializer<T>
{
 			this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
 		}
 	}
+	
+	// --------------------------------------------------------------------------------------------
+	// for testing
+	// --------------------------------------------------------------------------------------------
+	
+	Kryo getKryo() {
+		checkKryoInitialized();
+		return this.kryo;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/19066b52/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
index e838d27..c89733e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -43,6 +43,7 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T>
{
 		this.typeClass = typeClass;
 	}
 	
+	@SuppressWarnings("unchecked")
 	@Override
 	public T createInstance() {
 		if(typeClass == NullWritable.class) {

http://git-wip-us.apache.org/repos/asf/flink/blob/19066b52/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
index 3c22b15..d0fc6ed 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
@@ -18,17 +18,20 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
+import static org.junit.Assert.*;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.junit.Test;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.LinkedList;
+import java.util.Random;
 
+@SuppressWarnings("unchecked")
 public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest {
-
+	
 	@Test
 	public void testJavaList(){
 		Collection<Integer> a = new ArrayList<Integer>();
@@ -67,4 +70,44 @@ public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializer
 	protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
 		return new KryoSerializer<T>(type);
 	}
+	
+	/**
+	 * Make sure that the kryo serializer forwards EOF exceptions properly
+	 */
+	@Test
+	public void testForwardEOFException() {
+		try {
+			// construct a long string
+			String str;
+			{
+				char[] charData = new char[40000];
+				Random rnd = new Random();
+				
+				for (int i = 0; i < charData.length; i++) {
+					charData[i] = (char) rnd.nextInt(10000);
+				}
+				
+				str = new String(charData);
+			}
+			
+			// construct a memory target that is too small for the string
+			TestDataOutputSerializer target = new TestDataOutputSerializer(10000, 30000);
+			KryoSerializer<String> serializer = new KryoSerializer<String>(String.class);
+			
+			try {
+				serializer.serialize(str, target);
+				fail("should throw a java.io.EOFException");
+			}
+			catch (java.io.EOFException e) {
+				// that is how we like it
+			}
+			catch (Exception e) {
+				fail("throws wrong exception: should throw a java.io.EOFException, has thrown a " + e.getClass().getName());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/19066b52/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java
index 8111dc6..4c6b39f 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java
@@ -29,7 +29,6 @@ import java.util.Random;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemoryUtils;
 
 public class KryoVersusAvroMinibenchmark {
@@ -61,7 +60,7 @@ public class KryoVersusAvroMinibenchmark {
 			
 			System.out.println("Avro serializer");
 			{
-				final DataOutputSerializer outView = new DataOutputSerializer(100000000);
+				final TestDataOutputSerializer outView = new TestDataOutputSerializer(100000000);
 				final AvroSerializer<MyType> serializer = new AvroSerializer<MyType>(MyType.class);
 				
 				long start = System.nanoTime();
@@ -84,8 +83,9 @@ public class KryoVersusAvroMinibenchmark {
 			
 			System.out.println("Kryo serializer");
 			{
-				final DataOutputSerializer outView = new DataOutputSerializer(100000000);
+				final TestDataOutputSerializer outView = new TestDataOutputSerializer(100000000);
 				final KryoSerializer<MyType> serializer = new KryoSerializer<MyType>(MyType.class);
+				serializer.getKryo().register(Tuple2.class);
 				
 				long start = System.nanoTime();
 				
@@ -181,260 +181,6 @@ public class KryoVersusAvroMinibenchmark {
 	// ============================================================================================
 	// ============================================================================================
 	
-	public static final class DataOutputSerializer implements DataOutputView {
-		
-		private byte[] buffer;
-		
-		private int position;
-
-		private ByteBuffer wrapper;
-		
-		public DataOutputSerializer(int startSize) {
-			if (startSize < 1) {
-				throw new IllegalArgumentException();
-			}
-
-			this.buffer = new byte[startSize];
-			this.wrapper = ByteBuffer.wrap(buffer);
-		}
-		
-		public ByteBuffer wrapAsByteBuffer() {
-			this.wrapper.position(0);
-			this.wrapper.limit(this.position);
-			return this.wrapper;
-		}
-
-		public void clear() {
-			this.position = 0;
-		}
-
-		public int length() {
-			return this.position;
-		}
-
-		@Override
-		public String toString() {
-			return String.format("[pos=%d cap=%d]", this.position, this.buffer.length);
-		}
-
-		// ----------------------------------------------------------------------------------------
-		//                               Data Output
-		// ----------------------------------------------------------------------------------------
-		
-		@Override
-		public void write(int b) throws IOException {
-			if (this.position >= this.buffer.length) {
-				resize(1);
-			}
-			this.buffer[this.position++] = (byte) (b & 0xff);
-		}
-
-		@Override
-		public void write(byte[] b) throws IOException {
-			write(b, 0, b.length);
-		}
-
-		@Override
-		public void write(byte[] b, int off, int len) throws IOException {
-			if (len < 0 || off > b.length - len) {
-				throw new ArrayIndexOutOfBoundsException();
-			}
-			if (this.position > this.buffer.length - len) {
-				resize(len);
-			}
-			System.arraycopy(b, off, this.buffer, this.position, len);
-			this.position += len;
-		}
-
-		@Override
-		public void writeBoolean(boolean v) throws IOException {
-			write(v ? 1 : 0);
-		}
-
-		@Override
-		public void writeByte(int v) throws IOException {
-			write(v);
-		}
-
-		@Override
-		public void writeBytes(String s) throws IOException {
-			final int sLen = s.length();
-			if (this.position >= this.buffer.length - sLen) {
-				resize(sLen);
-			}
-			
-			for (int i = 0; i < sLen; i++) {
-				writeByte(s.charAt(i));
-			}
-			this.position += sLen;
-		}
-
-		@Override
-		public void writeChar(int v) throws IOException {
-			if (this.position >= this.buffer.length - 1) {
-				resize(2);
-			}
-			this.buffer[this.position++] = (byte) (v >> 8);
-			this.buffer[this.position++] = (byte) v;
-		}
-
-		@Override
-		public void writeChars(String s) throws IOException {
-			final int sLen = s.length();
-			if (this.position >= this.buffer.length - 2*sLen) {
-				resize(2*sLen);
-			} 
-			for (int i = 0; i < sLen; i++) {
-				writeChar(s.charAt(i));
-			}
-		}
-
-		@Override
-		public void writeDouble(double v) throws IOException {
-			writeLong(Double.doubleToLongBits(v));
-		}
-
-		@Override
-		public void writeFloat(float v) throws IOException {
-			writeInt(Float.floatToIntBits(v));
-		}
-
-		@SuppressWarnings("restriction")
-		@Override
-		public void writeInt(int v) throws IOException {
-			if (this.position >= this.buffer.length - 3) {
-				resize(4);
-			}
-			if (LITTLE_ENDIAN) {
-				v = Integer.reverseBytes(v);
-			}			
-			UNSAFE.putInt(this.buffer, BASE_OFFSET + this.position, v);
-			this.position += 4;
-		}
-
-		@SuppressWarnings("restriction")
-		@Override
-		public void writeLong(long v) throws IOException {
-			if (this.position >= this.buffer.length - 7) {
-				resize(8);
-			}
-			if (LITTLE_ENDIAN) {
-				v = Long.reverseBytes(v);
-			}
-			UNSAFE.putLong(this.buffer, BASE_OFFSET + this.position, v);
-			this.position += 8;
-		}
-
-		@Override
-		public void writeShort(int v) throws IOException {
-			if (this.position >= this.buffer.length - 1) {
-				resize(2);
-			}
-			this.buffer[this.position++] = (byte) ((v >>> 8) & 0xff);
-			this.buffer[this.position++] = (byte) ((v >>> 0) & 0xff);
-		}
-
-		@Override
-		public void writeUTF(String str) throws IOException {
-			int strlen = str.length();
-			int utflen = 0;
-			int c;
-
-			/* use charAt instead of copying String to char array */
-			for (int i = 0; i < strlen; i++) {
-				c = str.charAt(i);
-				if ((c >= 0x0001) && (c <= 0x007F)) {
-					utflen++;
-				} else if (c > 0x07FF) {
-					utflen += 3;
-				} else {
-					utflen += 2;
-				}
-			}
-
-			if (utflen > 65535) {
-				throw new UTFDataFormatException("Encoded string is too long: " + utflen);
-			}
-			else if (this.position > this.buffer.length - utflen - 2) {
-				resize(utflen + 2);
-			}
-			
-			byte[] bytearr = this.buffer;
-			int count = this.position;
-
-			bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
-			bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
-
-			int i = 0;
-			for (i = 0; i < strlen; i++) {
-				c = str.charAt(i);
-				if (!((c >= 0x0001) && (c <= 0x007F))) {
-					break;
-				}
-				bytearr[count++] = (byte) c;
-			}
-
-			for (; i < strlen; i++) {
-				c = str.charAt(i);
-				if ((c >= 0x0001) && (c <= 0x007F)) {
-					bytearr[count++] = (byte) c;
-
-				} else if (c > 0x07FF) {
-					bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
-					bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
-					bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
-				} else {
-					bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
-					bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
-				}
-			}
-
-			this.position = count;
-		}
-		
-		
-		private void resize(int minCapacityAdd) throws IOException {
-			try {
-				final int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd);
-				final byte[] nb = new byte[newLen];
-				System.arraycopy(this.buffer, 0, nb, 0, this.position);
-				this.buffer = nb;
-				this.wrapper = ByteBuffer.wrap(this.buffer);
-			}
-			catch (NegativeArraySizeException nasex) {
-				throw new IOException("Serialization failed because the record length would exceed 2GB
(max addressable array size in Java).");
-			}
-		}
-		
-		@SuppressWarnings("restriction")
-		private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
-		
-		@SuppressWarnings("restriction")
-		private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
-		
-		private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
-
-		@Override
-		public void skipBytesToWrite(int numBytes) throws IOException {
-			if(buffer.length - this.position < numBytes){
-				throw new EOFException("Could not skip " + numBytes + " bytes.");
-			}
-
-			this.position += numBytes;
-		}
-
-		@Override
-		public void write(DataInputView source, int numBytes) throws IOException {
-			if(buffer.length - this.position < numBytes){
-				throw new EOFException("Could not write " + numBytes + " bytes. Buffer overflow.");
-			}
-
-			source.read(this.buffer, this.position, numBytes);
-			this.position += numBytes;
-		}
-		
-	}
-	
 	public static final class DataInputDeserializer implements DataInputView {
 		
 		private byte[] buffer;

http://git-wip-us.apache.org/repos/asf/flink/blob/19066b52/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
new file mode 100644
index 0000000..58a2aeb
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemoryUtils;
+
+public final class TestDataOutputSerializer implements DataOutputView {
+	
+	private byte[] buffer;
+	
+	private int position;
+
+	private ByteBuffer wrapper;
+	
+	private final int maxSize;
+	
+
+	public TestDataOutputSerializer(int startSize) {
+		this(startSize, Integer.MAX_VALUE);
+	}
+	
+	public TestDataOutputSerializer(int startSize, int maxSize) {
+		if (startSize < 1 || startSize > maxSize) {
+			throw new IllegalArgumentException();
+		}
+
+		this.buffer = new byte[startSize];
+		this.wrapper = ByteBuffer.wrap(buffer);
+		this.maxSize = maxSize;
+	}
+	
+	public ByteBuffer wrapAsByteBuffer() {
+		this.wrapper.position(0);
+		this.wrapper.limit(this.position);
+		return this.wrapper;
+	}
+
+	public void clear() {
+		this.position = 0;
+	}
+
+	public int length() {
+		return this.position;
+	}
+
+	@Override
+	public String toString() {
+		return String.format("[pos=%d cap=%d]", this.position, this.buffer.length);
+	}
+
+	// ----------------------------------------------------------------------------------------
+	//                               Data Output
+	// ----------------------------------------------------------------------------------------
+	
+	@Override
+	public void write(int b) throws IOException {
+		if (this.position >= this.buffer.length) {
+			resize(1);
+		}
+		this.buffer[this.position++] = (byte) (b & 0xff);
+	}
+
+	@Override
+	public void write(byte[] b) throws IOException {
+		write(b, 0, b.length);
+	}
+
+	@Override
+	public void write(byte[] b, int off, int len) throws IOException {
+		if (len < 0 || off > b.length - len) {
+			throw new ArrayIndexOutOfBoundsException();
+		}
+		if (this.position > this.buffer.length - len) {
+			resize(len);
+		}
+		System.arraycopy(b, off, this.buffer, this.position, len);
+		this.position += len;
+	}
+
+	@Override
+	public void writeBoolean(boolean v) throws IOException {
+		write(v ? 1 : 0);
+	}
+
+	@Override
+	public void writeByte(int v) throws IOException {
+		write(v);
+	}
+
+	@Override
+	public void writeBytes(String s) throws IOException {
+		final int sLen = s.length();
+		if (this.position >= this.buffer.length - sLen) {
+			resize(sLen);
+		}
+		
+		for (int i = 0; i < sLen; i++) {
+			writeByte(s.charAt(i));
+		}
+		this.position += sLen;
+	}
+
+	@Override
+	public void writeChar(int v) throws IOException {
+		if (this.position >= this.buffer.length - 1) {
+			resize(2);
+		}
+		this.buffer[this.position++] = (byte) (v >> 8);
+		this.buffer[this.position++] = (byte) v;
+	}
+
+	@Override
+	public void writeChars(String s) throws IOException {
+		final int sLen = s.length();
+		if (this.position >= this.buffer.length - 2*sLen) {
+			resize(2*sLen);
+		} 
+		for (int i = 0; i < sLen; i++) {
+			writeChar(s.charAt(i));
+		}
+	}
+
+	@Override
+	public void writeDouble(double v) throws IOException {
+		writeLong(Double.doubleToLongBits(v));
+	}
+
+	@Override
+	public void writeFloat(float v) throws IOException {
+		writeInt(Float.floatToIntBits(v));
+	}
+
+	@SuppressWarnings("restriction")
+	@Override
+	public void writeInt(int v) throws IOException {
+		if (this.position >= this.buffer.length - 3) {
+			resize(4);
+		}
+		if (LITTLE_ENDIAN) {
+			v = Integer.reverseBytes(v);
+		}			
+		UNSAFE.putInt(this.buffer, BASE_OFFSET + this.position, v);
+		this.position += 4;
+	}
+
+	@SuppressWarnings("restriction")
+	@Override
+	public void writeLong(long v) throws IOException {
+		if (this.position >= this.buffer.length - 7) {
+			resize(8);
+		}
+		if (LITTLE_ENDIAN) {
+			v = Long.reverseBytes(v);
+		}
+		UNSAFE.putLong(this.buffer, BASE_OFFSET + this.position, v);
+		this.position += 8;
+	}
+
+	@Override
+	public void writeShort(int v) throws IOException {
+		if (this.position >= this.buffer.length - 1) {
+			resize(2);
+		}
+		this.buffer[this.position++] = (byte) ((v >>> 8) & 0xff);
+		this.buffer[this.position++] = (byte) ((v >>> 0) & 0xff);
+	}
+
+	@Override
+	public void writeUTF(String str) throws IOException {
+		int strlen = str.length();
+		int utflen = 0;
+		int c;
+
+		/* use charAt instead of copying String to char array */
+		for (int i = 0; i < strlen; i++) {
+			c = str.charAt(i);
+			if ((c >= 0x0001) && (c <= 0x007F)) {
+				utflen++;
+			} else if (c > 0x07FF) {
+				utflen += 3;
+			} else {
+				utflen += 2;
+			}
+		}
+
+		if (utflen > 65535) {
+			throw new UTFDataFormatException("Encoded string is too long: " + utflen);
+		}
+		else if (this.position > this.buffer.length - utflen - 2) {
+			resize(utflen + 2);
+		}
+		
+		byte[] bytearr = this.buffer;
+		int count = this.position;
+
+		bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
+		bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
+
+		int i = 0;
+		for (i = 0; i < strlen; i++) {
+			c = str.charAt(i);
+			if (!((c >= 0x0001) && (c <= 0x007F))) {
+				break;
+			}
+			bytearr[count++] = (byte) c;
+		}
+
+		for (; i < strlen; i++) {
+			c = str.charAt(i);
+			if ((c >= 0x0001) && (c <= 0x007F)) {
+				bytearr[count++] = (byte) c;
+
+			} else if (c > 0x07FF) {
+				bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+				bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+				bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+			} else {
+				bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+				bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+			}
+		}
+
+		this.position = count;
+	}
+	
+	
+	private void resize(int minCapacityAdd) throws IOException {
+		try {
+			int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd);
+			
+			if (newLen > maxSize) {
+				
+				if (this.buffer.length + minCapacityAdd > maxSize) {
+					throw new EOFException("Exceeded maximum capacity");
+				}
+				
+				newLen = maxSize;
+			}
+			
+			final byte[] nb = new byte[newLen];
+			System.arraycopy(this.buffer, 0, nb, 0, this.position);
+			this.buffer = nb;
+			this.wrapper = ByteBuffer.wrap(this.buffer);
+		}
+		catch (NegativeArraySizeException nasex) {
+			throw new IOException("Serialization failed because the record length would exceed 2GB
(max addressable array size in Java).");
+		}
+	}
+	
+	@SuppressWarnings("restriction")
+	private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
+	
+	@SuppressWarnings("restriction")
+	private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+	
+	private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
+
+	@Override
+	public void skipBytesToWrite(int numBytes) throws IOException {
+		if(buffer.length - this.position < numBytes){
+			throw new EOFException("Could not skip " + numBytes + " bytes.");
+		}
+
+		this.position += numBytes;
+	}
+
+	@Override
+	public void write(DataInputView source, int numBytes) throws IOException {
+		if(buffer.length - this.position < numBytes){
+			throw new EOFException("Could not write " + numBytes + " bytes. Buffer overflow.");
+		}
+
+		source.read(this.buffer, this.position, numBytes);
+		this.position += numBytes;
+	}
+	
+}
\ No newline at end of file


Mime
View raw message