Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8404A11AD6 for ; Tue, 8 Jul 2014 12:18:49 +0000 (UTC) Received: (qmail 66769 invoked by uid 500); 8 Jul 2014 12:18:49 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 66747 invoked by uid 500); 8 Jul 2014 12:18:49 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 66734 invoked by uid 99); 8 Jul 2014 12:18:49 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Jul 2014 12:18:49 +0000 X-ASF-Spam-Status: No, hits=-2000.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 08 Jul 2014 12:18:44 +0000 Received: (qmail 66470 invoked by uid 99); 8 Jul 2014 12:18:23 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Jul 2014 12:18:23 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 927C694632C; Tue, 8 Jul 2014 12:18:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.incubator.apache.org Date: Tue, 08 Jul 2014 12:18:25 -0000 Message-Id: <1cb3aa1fcfdb423fb1d366506b4de55e@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/5] [FLINK-994] Replaced DataInput and DataOutput with DataInputView and DataOutputView X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-core/src/main/java/eu/stratosphere/types/Record.java ---------------------------------------------------------------------- diff --git a/stratosphere-core/src/main/java/eu/stratosphere/types/Record.java b/stratosphere-core/src/main/java/eu/stratosphere/types/Record.java index 73671fa..78e90f0 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/types/Record.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/types/Record.java @@ -1070,7 +1070,7 @@ public final class Record implements Value, CopyableValue { // -------------------------------------------------------------------------------------------- @Override - public void write(DataOutput out) throws IOException { + public void write(DataOutputView out) throws IOException { // make sure everything is in a valid binary representation updateBinaryRepresenation(); @@ -1080,7 +1080,7 @@ public final class Record implements Value, CopyableValue { } @Override - public void read(DataInput in) throws IOException { + public void read(DataInputView in) throws IOException { final int len = readVarLengthInt(in); this.binaryLen = len; @@ -1187,7 +1187,7 @@ public final class Record implements Value, CopyableValue { } /** - * Writes this record to the given output view. This method is similar to {@link #write(DataOutput)}, but + * Writes this record to the given output view. This method is similar to {@link eu.stratosphere.core.io.IOReadableWritable#write(eu.stratosphere.core.memory.DataOutputView)}, but * it returns the number of bytes written. * * @param target The view to write the record to. @@ -1262,12 +1262,12 @@ public final class Record implements Value, CopyableValue { private static final long serialVersionUID = 1L; @Override - public void write(DataOutput out) throws IOException { + public void write(DataOutputView out) throws IOException { throw new UnsupportedOperationException(); } @Override - public void read(DataInput in) throws IOException { + public void read(DataInputView in) throws IOException { throw new UnsupportedOperationException(); } }; @@ -1275,7 +1275,7 @@ public final class Record implements Value, CopyableValue { /** * Internal interface class to provide serialization for the data types. */ - private static final class InternalDeSerializer implements DataInput, DataOutput, Serializable { + private static final class InternalDeSerializer implements DataInputView, DataOutputView, Serializable { private static final long serialVersionUID = 1L; @@ -1510,6 +1510,49 @@ public final class Record implements Value, CopyableValue { return n; } } + + @Override + public void skipBytesToRead(int numBytes) throws IOException { + if(this.end - this.position < numBytes) { + throw new EOFException("Could not skip " + numBytes + "."); + } + skipBytes(numBytes); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if(b == null){ + throw new NullPointerException("Byte array b cannot be null."); + } + + if(off < 0){ + throw new IndexOutOfBoundsException("Offset cannot be negative."); + } + + if(len < 0){ + throw new IndexOutOfBoundsException("Length cannot be negative."); + } + + if(b.length - off < len){ + throw new IndexOutOfBoundsException("Byte array does not provide enough space to store requested data" + + "."); + } + + if(this.position >= this.end){ + return -1; + }else{ + int toRead = Math.min(this.end-this.position, len); + System.arraycopy(this.memory,this.position,b,off,toRead); + this.position += toRead; + + return toRead; + } + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } // ---------------------------------------------------------------------------------------- // Data Output @@ -1736,5 +1779,24 @@ public final class Record implements Value, CopyableValue { private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN); + + @Override + public void skipBytesToWrite(int numBytes) throws IOException { + int skippedBytes = skipBytes(numBytes); + + if(skippedBytes != numBytes){ + throw new EOFException("Could not skip " + numBytes + " bytes."); + } + } + + @Override + public void write(DataInputView source, int numBytes) throws IOException { + if(numBytes > this.end - this.position){ + throw new IOException("Could not write " + numBytes + " bytes since the buffer is full."); + } + + source.read(this.memory,this.position, numBytes); + this.position += numBytes; + } } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-core/src/main/java/eu/stratosphere/types/ShortValue.java ---------------------------------------------------------------------- diff --git a/stratosphere-core/src/main/java/eu/stratosphere/types/ShortValue.java b/stratosphere-core/src/main/java/eu/stratosphere/types/ShortValue.java index 52d6fd2..d242986 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/types/ShortValue.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/types/ShortValue.java @@ -13,8 +13,6 @@ package eu.stratosphere.types; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import eu.stratosphere.core.memory.DataInputView; @@ -80,12 +78,12 @@ public class ShortValue implements NormalizableKey, ResettableValue< // -------------------------------------------------------------------------------------------- @Override - public void read(DataInput in) throws IOException { + public void read(DataInputView in) throws IOException { this.value = in.readShort(); } @Override - public void write(DataOutput out) throws IOException { + public void write(DataOutputView out) throws IOException { out.writeShort(this.value); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-core/src/main/java/eu/stratosphere/types/StringValue.java ---------------------------------------------------------------------- diff --git a/stratosphere-core/src/main/java/eu/stratosphere/types/StringValue.java b/stratosphere-core/src/main/java/eu/stratosphere/types/StringValue.java index cc970e8..1a133f6 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/types/StringValue.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/types/StringValue.java @@ -470,7 +470,7 @@ public class StringValue implements NormalizableKey, CharSequence, // -------------------------------------------------------------------------------------------- @Override - public void read(final DataInput in) throws IOException { + public void read(final DataInputView in) throws IOException { int len = in.readUnsignedByte(); if (len >= HIGH_BIT) { @@ -508,7 +508,7 @@ public class StringValue implements NormalizableKey, CharSequence, } @Override - public void write(final DataOutput out) throws IOException { + public void write(final DataOutputView out) throws IOException { int len = this.len; // write the length, variable-length encoded http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-core/src/test/java/eu/stratosphere/api/common/io/BinaryInputFormatTest.java ---------------------------------------------------------------------- diff --git a/stratosphere-core/src/test/java/eu/stratosphere/api/common/io/BinaryInputFormatTest.java b/stratosphere-core/src/test/java/eu/stratosphere/api/common/io/BinaryInputFormatTest.java index 7d10a9f..e820013 100644 --- a/stratosphere-core/src/test/java/eu/stratosphere/api/common/io/BinaryInputFormatTest.java +++ b/stratosphere-core/src/test/java/eu/stratosphere/api/common/io/BinaryInputFormatTest.java @@ -13,11 +13,11 @@ package eu.stratosphere.api.common.io; -import java.io.DataInput; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import eu.stratosphere.core.memory.DataInputView; import org.apache.log4j.Level; import org.junit.Assert; import org.junit.BeforeClass; @@ -35,7 +35,7 @@ public class BinaryInputFormatTest { private static final long serialVersionUID = 1L; @Override - protected Record deserialize(Record record, DataInput dataInput) { + protected Record deserialize(Record record, DataInputView dataInput) { return record; } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-core/src/test/java/eu/stratosphere/api/common/io/SequentialFormatTest.java ---------------------------------------------------------------------- diff --git a/stratosphere-core/src/test/java/eu/stratosphere/api/common/io/SequentialFormatTest.java b/stratosphere-core/src/test/java/eu/stratosphere/api/common/io/SequentialFormatTest.java index 420fdf6..164899c 100644 --- a/stratosphere-core/src/test/java/eu/stratosphere/api/common/io/SequentialFormatTest.java +++ b/stratosphere-core/src/test/java/eu/stratosphere/api/common/io/SequentialFormatTest.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.Comparator; import java.util.List; +import eu.stratosphere.core.memory.OutputViewDataOutputStreamWrapper; import org.apache.log4j.Level; import org.junit.After; import org.junit.Assert; @@ -92,7 +93,7 @@ public class SequentialFormatTest { ByteCounter byteCounter = new ByteCounter(); DataOutputStream out = new DataOutputStream(byteCounter); for (int fileCount = 0; fileCount < this.getNumberOfTuplesPerFile(fileIndex); fileCount++, recordIndex++) { - this.getRecord(recordIndex).write(out); + this.getRecord(recordIndex).write(new OutputViewDataOutputStreamWrapper(out)); } this.rawDataSizes[fileIndex] = byteCounter.getLength(); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-core/src/test/java/eu/stratosphere/api/distributions/SimpleDataDistributionTest.java ---------------------------------------------------------------------- diff --git a/stratosphere-core/src/test/java/eu/stratosphere/api/distributions/SimpleDataDistributionTest.java b/stratosphere-core/src/test/java/eu/stratosphere/api/distributions/SimpleDataDistributionTest.java index c87d2e1..eb83f8c 100644 --- a/stratosphere-core/src/test/java/eu/stratosphere/api/distributions/SimpleDataDistributionTest.java +++ b/stratosphere-core/src/test/java/eu/stratosphere/api/distributions/SimpleDataDistributionTest.java @@ -19,6 +19,8 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import eu.stratosphere.core.memory.InputViewDataInputStreamWrapper; +import eu.stratosphere.core.memory.OutputViewDataOutputStreamWrapper; import junit.framework.Assert; import org.junit.Test; @@ -107,7 +109,7 @@ public class SimpleDataDistributionTest { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final DataOutputStream dos = new DataOutputStream(baos); try { - ddWrite.write(dos); + ddWrite.write(new OutputViewDataOutputStreamWrapper(dos)); } catch (IOException e) { Assert.fail("Error serializing the DataDistribution: " + e.getMessage()); } @@ -120,7 +122,7 @@ public class SimpleDataDistributionTest { SimpleDistribution ddRead = new SimpleDistribution(); try { - ddRead.read(in); + ddRead.read(new InputViewDataInputStreamWrapper(in)); } catch (Exception ex) { Assert.fail("The deserialization of the encoded data distribution caused an error"); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-core/src/test/java/eu/stratosphere/core/testutils/CommonTestUtils.java ---------------------------------------------------------------------- diff --git a/stratosphere-core/src/test/java/eu/stratosphere/core/testutils/CommonTestUtils.java b/stratosphere-core/src/test/java/eu/stratosphere/core/testutils/CommonTestUtils.java index dcfdb80..fcdddcb 100644 --- a/stratosphere-core/src/test/java/eu/stratosphere/core/testutils/CommonTestUtils.java +++ b/stratosphere-core/src/test/java/eu/stratosphere/core/testutils/CommonTestUtils.java @@ -25,6 +25,8 @@ import java.io.IOException; import eu.stratosphere.configuration.ConfigConstants; import eu.stratosphere.configuration.GlobalConfiguration; import eu.stratosphere.core.io.IOReadableWritable; +import eu.stratosphere.core.memory.InputViewDataInputStreamWrapper; +import eu.stratosphere.core.memory.OutputViewDataOutputStreamWrapper; /** * This class contains auxiliary methods for unit tests in the Nephele common module. @@ -95,7 +97,7 @@ public class CommonTestUtils { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final DataOutputStream dos = new DataOutputStream(baos); - original.write(dos); + original.write(new OutputViewDataOutputStreamWrapper(dos)); final String className = original.getClass().getName(); if (className == null) { @@ -130,7 +132,7 @@ public class CommonTestUtils { final ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); final DataInputStream dis = new DataInputStream(bais); - copy.read(dis); + copy.read(new InputViewDataInputStreamWrapper(dis)); return copy; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-core/src/test/java/eu/stratosphere/types/CollectionsDataTypeTest.java ---------------------------------------------------------------------- diff --git a/stratosphere-core/src/test/java/eu/stratosphere/types/CollectionsDataTypeTest.java b/stratosphere-core/src/test/java/eu/stratosphere/types/CollectionsDataTypeTest.java index 9e81d74..163473e 100644 --- a/stratosphere-core/src/test/java/eu/stratosphere/types/CollectionsDataTypeTest.java +++ b/stratosphere-core/src/test/java/eu/stratosphere/types/CollectionsDataTypeTest.java @@ -22,6 +22,8 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Map.Entry; +import eu.stratosphere.core.memory.InputViewDataInputStreamWrapper; +import eu.stratosphere.core.memory.OutputViewDataOutputStreamWrapper; import junit.framework.Assert; import org.junit.Before; @@ -58,8 +60,8 @@ public class CollectionsDataTypeTest { try { NfIntStringPair mPairActual = new NfIntStringPair(); - pair1.write(out); - mPairActual.read(in); + pair1.write(new OutputViewDataOutputStreamWrapper(out)); + mPairActual.read(new InputViewDataInputStreamWrapper(in)); Assert.assertEquals(pair1, mPairActual); } catch (IOException e) { @@ -182,8 +184,8 @@ public class CollectionsDataTypeTest { // now test data transfer NfIntStringMap nMap = new NfIntStringMap(); try { - map0.write(out); - nMap.read(in); + map0.write(new OutputViewDataOutputStreamWrapper(out)); + nMap.read(new InputViewDataInputStreamWrapper(in)); } catch (Exception e) { Assert.assertTrue(false); } @@ -210,8 +212,8 @@ public class CollectionsDataTypeTest { // test data transfer NfStringList mList2 = new NfStringList(); try { - list.write(out); - mList2.read(in); + list.write(new OutputViewDataOutputStreamWrapper(out)); + mList2.read(new InputViewDataInputStreamWrapper(in)); } catch (Exception e) { Assert.assertTrue(false); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-core/src/test/java/eu/stratosphere/types/PrimitiveDataTypeTest.java ---------------------------------------------------------------------- diff --git a/stratosphere-core/src/test/java/eu/stratosphere/types/PrimitiveDataTypeTest.java b/stratosphere-core/src/test/java/eu/stratosphere/types/PrimitiveDataTypeTest.java index 7e893ed..723ea18 100644 --- a/stratosphere-core/src/test/java/eu/stratosphere/types/PrimitiveDataTypeTest.java +++ b/stratosphere-core/src/test/java/eu/stratosphere/types/PrimitiveDataTypeTest.java @@ -19,6 +19,8 @@ import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; +import eu.stratosphere.core.memory.InputViewDataInputStreamWrapper; +import eu.stratosphere.core.memory.OutputViewDataOutputStreamWrapper; import junit.framework.Assert; import org.junit.Before; @@ -56,15 +58,15 @@ public class PrimitiveDataTypeTest { Assert.assertEquals(int0.compareTo(int3), -1); // test stream output and retrieval try { - int0.write(mOut); - int2.write(mOut); - int3.write(mOut); + int0.write(new OutputViewDataOutputStreamWrapper(mOut)); + int2.write(new OutputViewDataOutputStreamWrapper(mOut)); + int3.write(new OutputViewDataOutputStreamWrapper(mOut)); IntValue int1n = new IntValue(); IntValue int2n = new IntValue(); IntValue int3n = new IntValue(); - int1n.read(mIn); - int2n.read(mIn); - int3n.read(mIn); + int1n.read(new InputViewDataInputStreamWrapper(mIn)); + int2n.read(new InputViewDataInputStreamWrapper(mIn)); + int3n.read(new InputViewDataInputStreamWrapper(mIn)); Assert.assertEquals(int0.compareTo(int1n), 0); Assert.assertEquals(int0.getValue(), int1n.getValue()); Assert.assertEquals(int2.compareTo(int2n), 0); @@ -92,15 +94,15 @@ public class PrimitiveDataTypeTest { Assert.assertEquals(double0.compareTo(double3), -1); // test stream output and retrieval try { - double0.write(mOut); - double2.write(mOut); - double3.write(mOut); + double0.write(new OutputViewDataOutputStreamWrapper(mOut)); + double2.write(new OutputViewDataOutputStreamWrapper(mOut)); + double3.write(new OutputViewDataOutputStreamWrapper(mOut)); DoubleValue double1n = new DoubleValue(); DoubleValue double2n = new DoubleValue(); DoubleValue double3n = new DoubleValue(); - double1n.read(mIn); - double2n.read(mIn); - double3n.read(mIn); + double1n.read(new InputViewDataInputStreamWrapper(mIn)); + double2n.read(new InputViewDataInputStreamWrapper(mIn)); + double3n.read(new InputViewDataInputStreamWrapper(mIn)); Assert.assertEquals(double0.compareTo(double1n), 0); Assert.assertEquals(double0.getValue(), double1n.getValue()); Assert.assertEquals(double2.compareTo(double2n), 0); @@ -156,21 +158,21 @@ public class PrimitiveDataTypeTest { // test stream out/input try { - string0.write(mOut); - string4.write(mOut); - string2.write(mOut); - string3.write(mOut); - string7.write(mOut); + string0.write(new OutputViewDataOutputStreamWrapper(mOut)); + string4.write(new OutputViewDataOutputStreamWrapper(mOut)); + string2.write(new OutputViewDataOutputStreamWrapper(mOut)); + string3.write(new OutputViewDataOutputStreamWrapper(mOut)); + string7.write(new OutputViewDataOutputStreamWrapper(mOut)); StringValue string1n = new StringValue(); StringValue string2n = new StringValue(); StringValue string3n = new StringValue(); StringValue string4n = new StringValue(); StringValue string7n = new StringValue(); - string1n.read(mIn); - string4n.read(mIn); - string2n.read(mIn); - string3n.read(mIn); - string7n.read(mIn); + string1n.read(new InputViewDataInputStreamWrapper(mIn)); + string4n.read(new InputViewDataInputStreamWrapper(mIn)); + string2n.read(new InputViewDataInputStreamWrapper(mIn)); + string3n.read(new InputViewDataInputStreamWrapper(mIn)); + string7n.read(new InputViewDataInputStreamWrapper(mIn)); Assert.assertEquals(string0.compareTo(string1n), 0); Assert.assertEquals(string0.toString(), string1n.toString()); Assert.assertEquals(string4.compareTo(string4n), 0); @@ -209,12 +211,12 @@ public class PrimitiveDataTypeTest { try { // write it multiple times for (int i = 0; i < numWrites; i++) { - pn.write(mOut); + pn.write(new OutputViewDataOutputStreamWrapper(mOut)); } // read it multiple times for (int i = 0; i < numWrites; i++) { - pn.read(mIn); + pn.read(new InputViewDataInputStreamWrapper(mIn)); } Assert.assertEquals("Reading PactNull does not consume the same data as was written.", mIn.available(), 0); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-core/src/test/java/eu/stratosphere/types/RecordTest.java ---------------------------------------------------------------------- diff --git a/stratosphere-core/src/test/java/eu/stratosphere/types/RecordTest.java b/stratosphere-core/src/test/java/eu/stratosphere/types/RecordTest.java index 2f14257..89b3ce6 100644 --- a/stratosphere-core/src/test/java/eu/stratosphere/types/RecordTest.java +++ b/stratosphere-core/src/test/java/eu/stratosphere/types/RecordTest.java @@ -26,6 +26,8 @@ import java.io.PipedOutputStream; import java.util.Arrays; import java.util.Random; +import eu.stratosphere.core.memory.InputViewDataInputStreamWrapper; +import eu.stratosphere.core.memory.OutputViewDataOutputStreamWrapper; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -59,15 +61,15 @@ public class RecordTest { try { // test deserialize into self Record empty = new Record(); - empty.write(this.out); - empty.read(this.in); + empty.write(new OutputViewDataOutputStreamWrapper(this.out)); + empty.read(new InputViewDataInputStreamWrapper(this.in)); Assert.assertTrue("Deserialized Empty record is not another empty record.", empty.getNumFields() == 0); // test deserialize into new empty = new Record(); - empty.write(this.out); + empty.write(new OutputViewDataOutputStreamWrapper(this.out)); empty = new Record(); - empty.read(this.in); + empty.read(new InputViewDataInputStreamWrapper(this.in)); Assert.assertTrue("Deserialized Empty record is not another empty record.", empty.getNumFields() == 0); } catch (Throwable t) { @@ -382,18 +384,18 @@ public class RecordTest { try { // serialize and deserialize to remove all buffered info - r.write(out); + r.write(new OutputViewDataOutputStreamWrapper(out)); r = new Record(); - r.read(in); + r.read(new InputViewDataInputStreamWrapper(in)); r.setField(1, new IntValue(10)); r.setField(4, new StringValue("Some long value")); r.setField(5, new StringValue("An even longer value")); r.setField(10, new IntValue(10)); - r.write(out); + r.write(new OutputViewDataOutputStreamWrapper(out)); r = new Record(); - r.read(in); + r.read(new InputViewDataInputStreamWrapper(in)); assertTrue(r.getField(0, IntValue.class).getValue() == 0); assertTrue(r.getField(1, IntValue.class).getValue() == 10); @@ -427,8 +429,8 @@ public class RecordTest { Record record2 = new Record(); try { // De/Serialize the record - record1.write(this.out); - record2.read(this.in); + record1.write(new OutputViewDataOutputStreamWrapper(this.out)); + record2.read(new InputViewDataInputStreamWrapper(this.in)); assertTrue(record1.getNumFields() == record2.getNumFields()); @@ -456,20 +458,20 @@ public class RecordTest { try { Record record = new Record(new IntValue(42)); - record.write(out); + record.write(new OutputViewDataOutputStreamWrapper(out)); Assert.assertEquals(42, record.getField(0, IntValue.class).getValue()); record.setField(0, new IntValue(23)); - record.write(out); + record.write(new OutputViewDataOutputStreamWrapper(out)); Assert.assertEquals(23, record.getField(0, IntValue.class).getValue()); record.clear(); Assert.assertEquals(0, record.getNumFields()); Record record2 = new Record(new IntValue(42)); - record2.read(in); + record2.read(new InputViewDataInputStreamWrapper(in)); Assert.assertEquals(42, record2.getField(0, IntValue.class).getValue()); - record2.read(in); + record2.read(new InputViewDataInputStreamWrapper(in)); Assert.assertEquals(23, record2.getField(0, IntValue.class).getValue()); } catch (Throwable t) { Assert.fail("Test failed due to an exception: " + t.getMessage()); @@ -541,7 +543,8 @@ public class RecordTest { } } - static final void blackboxTestRecordWithValues(Value[] values, Random rnd, DataInput reader, DataOutput writer) + static final void blackboxTestRecordWithValues(Value[] values, Random rnd, DataInputStream reader, + DataOutputStream writer) throws Exception { final int[] permutation1 = createPermutation(rnd, values.length); @@ -586,9 +589,9 @@ public class RecordTest { final int pos = permutation1[i]; rec.setField(pos, values[pos]); } - rec.write(writer); + rec.write(new OutputViewDataOutputStreamWrapper(writer)); rec = new Record(); - rec.read(reader); + rec.read(new InputViewDataInputStreamWrapper(reader)); testAllRetrievalMethods(rec, permutation2, values); // test adding and retrieving with full stream serialization and deserialization into the same record @@ -597,8 +600,8 @@ public class RecordTest { final int pos = permutation1[i]; rec.setField(pos, values[pos]); } - rec.write(writer); - rec.read(reader); + rec.write(new OutputViewDataOutputStreamWrapper(writer)); + rec.read(new InputViewDataInputStreamWrapper(reader)); testAllRetrievalMethods(rec, permutation2, values); // test adding and retrieving with partial stream serialization and deserialization into a new record @@ -606,18 +609,18 @@ public class RecordTest { updatePos = rnd.nextInt(values.length + 1); for (int i = 0; i < values.length; i++) { if (i == updatePos) { - rec.write(writer); + rec.write(new OutputViewDataOutputStreamWrapper(writer)); rec = new Record(); - rec.read(reader); + rec.read(new InputViewDataInputStreamWrapper(reader)); } final int pos = permutation1[i]; rec.setField(pos, values[pos]); } if (updatePos == values.length) { - rec.write(writer); + rec.write(new OutputViewDataOutputStreamWrapper(writer)); rec = new Record(); - rec.read(reader); + rec.read(new InputViewDataInputStreamWrapper(reader)); } testAllRetrievalMethods(rec, permutation2, values); @@ -626,16 +629,16 @@ public class RecordTest { updatePos = rnd.nextInt(values.length + 1); for (int i = 0; i < values.length; i++) { if (i == updatePos) { - rec.write(writer); - rec.read(reader); + rec.write(new OutputViewDataOutputStreamWrapper(writer)); + rec.read(new InputViewDataInputStreamWrapper(reader)); } final int pos = permutation1[i]; rec.setField(pos, values[pos]); } if (updatePos == values.length) { - rec.write(writer); - rec.read(reader); + rec.write(new OutputViewDataOutputStreamWrapper(writer)); + rec.read(new InputViewDataInputStreamWrapper(reader)); } testAllRetrievalMethods(rec, permutation2, values); @@ -644,17 +647,17 @@ public class RecordTest { updatePos = rnd.nextInt(values.length + 1); for (int i = 0; i < values.length; i++) { if (i == updatePos) { - rec.write(writer); + rec.write(new OutputViewDataOutputStreamWrapper(writer)); rec = new Record(); - rec.read(reader); + rec.read(new InputViewDataInputStreamWrapper(reader)); } final int pos = permutation1[i]; rec.setField(pos, values[pos]); } - rec.write(writer); + rec.write(new OutputViewDataOutputStreamWrapper(writer)); rec = new Record(); - rec.read(reader); + rec.read(new InputViewDataInputStreamWrapper(reader)); testAllRetrievalMethods(rec, permutation2, values); // test adding and retrieving with partial stream serialization and deserialization into the same record @@ -662,15 +665,15 @@ public class RecordTest { updatePos = rnd.nextInt(values.length + 1); for (int i = 0; i < values.length; i++) { if (i == updatePos) { - rec.write(writer); - rec.read(reader); + rec.write(new OutputViewDataOutputStreamWrapper(writer)); + rec.read(new InputViewDataInputStreamWrapper(reader)); } final int pos = permutation1[i]; rec.setField(pos, values[pos]); } - rec.write(writer); - rec.read(reader); + rec.write(new OutputViewDataOutputStreamWrapper(writer)); + rec.read(new InputViewDataInputStreamWrapper(reader)); testAllRetrievalMethods(rec, permutation2, values); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java ---------------------------------------------------------------------- diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java index 82f2755..db75ecd 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java @@ -27,8 +27,8 @@ import eu.stratosphere.api.common.io.GenericInputFormat; import eu.stratosphere.api.common.io.NonParallelInput; import eu.stratosphere.api.common.typeutils.TypeSerializer; import eu.stratosphere.core.io.GenericInputSplit; -import eu.stratosphere.core.memory.InputViewDataInputWrapper; -import eu.stratosphere.core.memory.OutputViewDataOutputWrapper; +import eu.stratosphere.core.memory.InputViewObjectInputStreamWrapper; +import eu.stratosphere.core.memory.OutputViewObjectOutputStreamWrapper; /** * An input format that returns objects from a collection. @@ -78,11 +78,8 @@ public class CollectionInputFormat extends GenericInputFormat implements N out.defaultWriteObject(); out.writeInt(dataSet.size()); - OutputViewDataOutputWrapper outWrapper = new OutputViewDataOutputWrapper(); - outWrapper.setDelegate(out); - for (T element : dataSet){ - serializer.serialize(element, outWrapper); + serializer.serialize(element, new OutputViewObjectOutputStreamWrapper(out)); } } @@ -92,12 +89,10 @@ public class CollectionInputFormat extends GenericInputFormat implements N int collectionLength = in.readInt(); List list = new ArrayList(collectionLength); - InputViewDataInputWrapper inWrapper = new InputViewDataInputWrapper(); - inWrapper.setDelegate(in); for (int i = 0; i < collectionLength; i++){ T element = serializer.createInstance(); - element = serializer.deserialize(element, inWrapper); + element = serializer.deserialize(element, new InputViewObjectInputStreamWrapper(in)); list.add(element); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/io/ExternalProcessInputSplit.java ---------------------------------------------------------------------- diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/io/ExternalProcessInputSplit.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/io/ExternalProcessInputSplit.java index f07ddd5..e6677a4 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/io/ExternalProcessInputSplit.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/io/ExternalProcessInputSplit.java @@ -13,12 +13,12 @@ package eu.stratosphere.api.java.record.io; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import eu.stratosphere.core.io.GenericInputSplit; import eu.stratosphere.core.io.StringRecord; +import eu.stratosphere.core.memory.DataInputView; +import eu.stratosphere.core.memory.DataOutputView; /** * The ExternalProcessInputSplit contains all informations for {@link InputFormat} that read their data from external processes. @@ -58,13 +58,13 @@ public class ExternalProcessInputSplit extends GenericInputSplit { @Override - public void read(DataInput in) throws IOException { + public void read(DataInputView in) throws IOException { super.read(in); this.extProcessCommand = StringRecord.readString(in); } @Override - public void write(DataOutput out) throws IOException { + public void write(DataOutputView out) throws IOException { super.write(out); StringRecord.writeString(out, this.extProcessCommand); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java index 648c8dc..fa9b740 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java @@ -13,12 +13,12 @@ package eu.stratosphere.nephele; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; import eu.stratosphere.core.io.IOReadableWritable; +import eu.stratosphere.core.memory.DataInputView; +import eu.stratosphere.core.memory.DataOutputView; import eu.stratosphere.util.StringUtils; import io.netty.buffer.ByteBuf; @@ -152,13 +152,13 @@ public class AbstractID implements IOReadableWritable { } @Override - public void read(DataInput in) throws IOException { + public void read(DataInputView in) throws IOException { this.lowerPart = in.readLong(); this.upperPart = in.readLong(); } @Override - public void write(DataOutput out) throws IOException { + public void write(DataOutputView out) throws IOException { out.writeLong(this.lowerPart); out.writeLong(this.upperPart); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/AbstractJobResult.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/AbstractJobResult.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/AbstractJobResult.java index 13b337d..931f1a2 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/AbstractJobResult.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/AbstractJobResult.java @@ -13,12 +13,12 @@ package eu.stratosphere.nephele.client; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import eu.stratosphere.core.io.IOReadableWritable; import eu.stratosphere.core.io.StringRecord; +import eu.stratosphere.core.memory.DataInputView; +import eu.stratosphere.core.memory.DataOutputView; import eu.stratosphere.nephele.util.EnumUtils; /** @@ -78,7 +78,7 @@ public abstract class AbstractJobResult implements IOReadableWritable { @Override - public void read(final DataInput in) throws IOException { + public void read(final DataInputView in) throws IOException { // Read the return code this.returnCode = EnumUtils.readEnum(in, ReturnCode.class); @@ -89,7 +89,7 @@ public abstract class AbstractJobResult implements IOReadableWritable { @Override - public void write(final DataOutput out) throws IOException { + public void write(final DataOutputView out) throws IOException { // Write the return code EnumUtils.writeEnum(out, this.returnCode); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/JobProgressResult.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/JobProgressResult.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/JobProgressResult.java index 5f4e34b..d6d05ba 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/JobProgressResult.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/JobProgressResult.java @@ -13,11 +13,11 @@ package eu.stratosphere.nephele.client; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.util.Iterator; +import eu.stratosphere.core.memory.DataInputView; +import eu.stratosphere.core.memory.DataOutputView; import eu.stratosphere.nephele.event.job.AbstractEvent; import eu.stratosphere.nephele.util.SerializableArrayList; @@ -62,7 +62,7 @@ public class JobProgressResult extends AbstractJobResult { @Override - public void read(final DataInput in) throws IOException { + public void read(final DataInputView in) throws IOException { super.read(in); this.events.read(in); @@ -70,7 +70,7 @@ public class JobProgressResult extends AbstractJobResult { @Override - public void write(final DataOutput out) throws IOException { + public void write(final DataOutputView out) throws IOException { super.write(out); this.events.write(out); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/JobSubmissionResult.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/JobSubmissionResult.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/JobSubmissionResult.java index 9913be5..9443b0b 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/JobSubmissionResult.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/JobSubmissionResult.java @@ -13,8 +13,9 @@ package eu.stratosphere.nephele.client; -import java.io.DataInput; -import java.io.DataOutput; +import eu.stratosphere.core.memory.DataInputView; +import eu.stratosphere.core.memory.DataOutputView; + import java.io.IOException; /** @@ -49,13 +50,13 @@ public class JobSubmissionResult extends AbstractJobResult { @Override - public void read(final DataInput in) throws IOException { + public void read(final DataInputView in) throws IOException { super.read(in); } @Override - public void write(final DataOutput out) throws IOException { + public void write(final DataOutputView out) throws IOException { super.write(out); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptor.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptor.java index 4068e5b..daacd13 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptor.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptor.java @@ -13,11 +13,11 @@ package eu.stratosphere.nephele.deployment; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import eu.stratosphere.core.io.IOReadableWritable; +import eu.stratosphere.core.memory.DataInputView; +import eu.stratosphere.core.memory.DataOutputView; import eu.stratosphere.runtime.io.channels.ChannelID; /** @@ -72,7 +72,7 @@ public final class ChannelDeploymentDescriptor implements IOReadableWritable { @Override - public void write(final DataOutput out) throws IOException { + public void write(final DataOutputView out) throws IOException { this.outputChannelID.write(out); this.inputChannelID.write(out); @@ -80,7 +80,7 @@ public final class ChannelDeploymentDescriptor implements IOReadableWritable { @Override - public void read(final DataInput in) throws IOException { + public void read(final DataInputView in) throws IOException { this.outputChannelID.read(in); this.inputChannelID.read(in); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptor.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptor.java index 02d6578..7ad26dd 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptor.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptor.java @@ -13,14 +13,14 @@ package eu.stratosphere.nephele.deployment; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import eu.stratosphere.core.io.IOReadableWritable; +import eu.stratosphere.core.memory.DataInputView; +import eu.stratosphere.core.memory.DataOutputView; import eu.stratosphere.runtime.io.gates.GateID; import eu.stratosphere.runtime.io.channels.ChannelType; import eu.stratosphere.nephele.util.EnumUtils; @@ -93,7 +93,7 @@ public final class GateDeploymentDescriptor implements IOReadableWritable { @Override - public void write(final DataOutput out) throws IOException { + public void write(final DataOutputView out) throws IOException { this.gateID.write(out); EnumUtils.writeEnum(out, channelType); @@ -106,7 +106,7 @@ public final class GateDeploymentDescriptor implements IOReadableWritable { @Override - public void read(final DataInput in) throws IOException { + public void read(final DataInputView in) throws IOException { this.gateID.read(in); this.channelType = EnumUtils.readEnum(in, ChannelType.class); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptor.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptor.java index 06fabde..4282600 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptor.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptor.java @@ -13,13 +13,13 @@ package eu.stratosphere.nephele.deployment; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import eu.stratosphere.configuration.Configuration; import eu.stratosphere.core.io.IOReadableWritable; import eu.stratosphere.core.io.StringRecord; +import eu.stratosphere.core.memory.DataInputView; +import eu.stratosphere.core.memory.DataOutputView; import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager; import eu.stratosphere.nephele.executiongraph.ExecutionVertexID; import eu.stratosphere.nephele.jobgraph.JobID; @@ -189,7 +189,7 @@ public final class TaskDeploymentDescriptor implements IOReadableWritable { @Override - public void write(final DataOutput out) throws IOException { + public void write(final DataOutputView out) throws IOException { this.jobID.write(out); this.vertexID.write(out); @@ -222,7 +222,7 @@ public final class TaskDeploymentDescriptor implements IOReadableWritable { @SuppressWarnings("unchecked") @Override - public void read(final DataInput in) throws IOException { + public void read(final DataInputView in) throws IOException { this.jobID.read(in); this.vertexID.read(in); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/AbstractEvent.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/AbstractEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/AbstractEvent.java index 80e65ff..cffec9d 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/AbstractEvent.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/AbstractEvent.java @@ -13,14 +13,14 @@ package eu.stratosphere.nephele.event.job; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.atomic.AtomicLong; import eu.stratosphere.core.io.IOReadableWritable; +import eu.stratosphere.core.memory.DataInputView; +import eu.stratosphere.core.memory.DataOutputView; /** * An abstract event is transmitted from the job manager to the @@ -74,7 +74,7 @@ public abstract class AbstractEvent implements IOReadableWritable { @Override - public void read(final DataInput in) throws IOException { + public void read(final DataInputView in) throws IOException { // Read the timestamp this.timestamp = in.readLong(); @@ -83,7 +83,7 @@ public abstract class AbstractEvent implements IOReadableWritable { @Override - public void write(final DataOutput out) throws IOException { + public void write(final DataOutputView out) throws IOException { // Write the timestamp out.writeLong(this.timestamp); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/ExecutionStateChangeEvent.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/ExecutionStateChangeEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/ExecutionStateChangeEvent.java index a2cac79..cf451ab 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/ExecutionStateChangeEvent.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/ExecutionStateChangeEvent.java @@ -13,10 +13,10 @@ package eu.stratosphere.nephele.event.job; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; +import eu.stratosphere.core.memory.DataInputView; +import eu.stratosphere.core.memory.DataOutputView; import eu.stratosphere.nephele.execution.ExecutionState; import eu.stratosphere.nephele.managementgraph.ManagementVertexID; import eu.stratosphere.nephele.util.EnumUtils; @@ -86,7 +86,7 @@ public final class ExecutionStateChangeEvent extends AbstractEvent implements Ma @Override - public void read(final DataInput in) throws IOException { + public void read(final DataInputView in) throws IOException { super.read(in); @@ -96,7 +96,7 @@ public final class ExecutionStateChangeEvent extends AbstractEvent implements Ma @Override - public void write(final DataOutput out) throws IOException { + public void write(final DataOutputView out) throws IOException { super.write(out); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/JobEvent.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/JobEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/JobEvent.java index 420784a..b231978 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/JobEvent.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/JobEvent.java @@ -13,11 +13,11 @@ package eu.stratosphere.nephele.event.job; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import eu.stratosphere.core.io.StringRecord; +import eu.stratosphere.core.memory.DataInputView; +import eu.stratosphere.core.memory.DataOutputView; import eu.stratosphere.nephele.jobgraph.JobStatus; import eu.stratosphere.nephele.util.EnumUtils; @@ -68,7 +68,7 @@ public class JobEvent extends AbstractEvent { @Override - public void read(final DataInput in) throws IOException { + public void read(final DataInputView in) throws IOException { super.read(in); // Read job status @@ -80,7 +80,7 @@ public class JobEvent extends AbstractEvent { @Override - public void write(final DataOutput out) throws IOException { + public void write(final DataOutputView out) throws IOException { super.write(out); // Write job status http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/RecentJobEvent.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/RecentJobEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/RecentJobEvent.java index a477c7a..914adc3 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/RecentJobEvent.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/RecentJobEvent.java @@ -13,11 +13,11 @@ package eu.stratosphere.nephele.event.job; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import eu.stratosphere.core.io.StringRecord; +import eu.stratosphere.core.memory.DataInputView; +import eu.stratosphere.core.memory.DataOutputView; import eu.stratosphere.nephele.jobgraph.JobID; import eu.stratosphere.nephele.jobgraph.JobStatus; import eu.stratosphere.nephele.util.EnumUtils; @@ -139,7 +139,7 @@ public final class RecentJobEvent extends AbstractEvent implements ManagementEve @Override - public void read(final DataInput in) throws IOException { + public void read(final DataInputView in) throws IOException { super.read(in); // Read the job ID @@ -161,7 +161,7 @@ public final class RecentJobEvent extends AbstractEvent implements ManagementEve @Override - public void write(final DataOutput out) throws IOException { + public void write(final DataOutputView out) throws IOException { super.write(out); // Write the job ID http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexAssignmentEvent.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexAssignmentEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexAssignmentEvent.java index c86c12b..4b980ed 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexAssignmentEvent.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexAssignmentEvent.java @@ -13,11 +13,11 @@ package eu.stratosphere.nephele.event.job; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import eu.stratosphere.core.io.StringRecord; +import eu.stratosphere.core.memory.DataInputView; +import eu.stratosphere.core.memory.DataOutputView; import eu.stratosphere.nephele.managementgraph.ManagementVertexID; /** @@ -83,7 +83,7 @@ public final class VertexAssignmentEvent extends AbstractEvent implements Manage } @Override - public void read(final DataInput in) throws IOException { + public void read(final DataInputView in) throws IOException { super.read(in); @@ -93,7 +93,7 @@ public final class VertexAssignmentEvent extends AbstractEvent implements Manage @Override - public void write(final DataOutput out) throws IOException { + public void write(final DataOutputView out) throws IOException { super.write(out); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexEvent.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexEvent.java index 940f997..03f1525 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexEvent.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexEvent.java @@ -13,11 +13,11 @@ package eu.stratosphere.nephele.event.job; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import eu.stratosphere.core.io.StringRecord; +import eu.stratosphere.core.memory.DataInputView; +import eu.stratosphere.core.memory.DataOutputView; import eu.stratosphere.nephele.execution.ExecutionState; import eu.stratosphere.nephele.jobgraph.JobVertexID; import eu.stratosphere.nephele.util.EnumUtils; @@ -108,7 +108,7 @@ public class VertexEvent extends AbstractEvent { @Override - public void read(final DataInput in) throws IOException { + public void read(final DataInputView in) throws IOException { super.read(in); @@ -122,7 +122,7 @@ public class VertexEvent extends AbstractEvent { @Override - public void write(final DataOutput out) throws IOException { + public void write(final DataOutputView out) throws IOException { super.write(out); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/task/IntegerTaskEvent.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/task/IntegerTaskEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/task/IntegerTaskEvent.java index 04e7b18..43e42e9 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/task/IntegerTaskEvent.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/task/IntegerTaskEvent.java @@ -30,8 +30,9 @@ package eu.stratosphere.nephele.event.task; -import java.io.DataInput; -import java.io.DataOutput; +import eu.stratosphere.core.memory.DataInputView; +import eu.stratosphere.core.memory.DataOutputView; + import java.io.IOException; /** @@ -74,13 +75,13 @@ public class IntegerTaskEvent extends AbstractTaskEvent { @Override - public void write(final DataOutput out) throws IOException { + public void write(final DataOutputView out) throws IOException { out.writeInt(this.value); } @Override - public void read(final DataInput in) throws IOException { + public void read(final DataInputView in) throws IOException { this.value = in.readInt(); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/task/StringTaskEvent.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/task/StringTaskEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/task/StringTaskEvent.java index 7079767..68e1ef5 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/task/StringTaskEvent.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/task/StringTaskEvent.java @@ -29,11 +29,11 @@ */ package eu.stratosphere.nephele.event.task; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import eu.stratosphere.core.io.StringRecord; +import eu.stratosphere.core.memory.DataInputView; +import eu.stratosphere.core.memory.DataOutputView; /** * This class provides a simple implementation of an event that holds a string value. @@ -73,14 +73,14 @@ public class StringTaskEvent extends AbstractTaskEvent { @Override - public void write(final DataOutput out) throws IOException { + public void write(final DataOutputView out) throws IOException { StringRecord.writeString(out, this.message); } @Override - public void read(final DataInput in) throws IOException { + public void read(final DataInputView in) throws IOException { this.message = StringRecord.readString(in); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheProfileRequest.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheProfileRequest.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheProfileRequest.java index dc50012..82e8686 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheProfileRequest.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheProfileRequest.java @@ -13,12 +13,12 @@ package eu.stratosphere.nephele.execution.librarycache; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import eu.stratosphere.core.io.IOReadableWritable; import eu.stratosphere.core.io.StringRecord; +import eu.stratosphere.core.memory.DataInputView; +import eu.stratosphere.core.memory.DataOutputView; /** * A library cache profile request includes a set of library names and issues a task manager to report which of these @@ -54,7 +54,7 @@ public class LibraryCacheProfileRequest implements IOReadableWritable { @Override - public void read(final DataInput in) throws IOException { + public void read(final DataInputView in) throws IOException { // Read required jar files this.requiredLibraries = new String[in.readInt()]; @@ -66,7 +66,7 @@ public class LibraryCacheProfileRequest implements IOReadableWritable { @Override - public void write(final DataOutput out) throws IOException { + public void write(final DataOutputView out) throws IOException { if (this.requiredLibraries == null) { throw new IOException("requiredLibraries is null"); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheProfileResponse.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheProfileResponse.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheProfileResponse.java index 3a2c92c..7c1ec26 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheProfileResponse.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheProfileResponse.java @@ -13,12 +13,12 @@ package eu.stratosphere.nephele.execution.librarycache; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import eu.stratosphere.core.io.IOReadableWritable; import eu.stratosphere.core.io.StringRecord; +import eu.stratosphere.core.memory.DataInputView; +import eu.stratosphere.core.memory.DataOutputView; /** * A library cache profile response is the response to a library cache profile request. It contains the set of @@ -92,7 +92,7 @@ public class LibraryCacheProfileResponse implements IOReadableWritable { @Override - public void read(final DataInput in) throws IOException { + public void read(final DataInputView in) throws IOException { // Read the names of the required jar files this.requiredLibraries = new String[in.readInt()]; @@ -110,7 +110,7 @@ public class LibraryCacheProfileResponse implements IOReadableWritable { @Override - public void write(final DataOutput out) throws IOException { + public void write(final DataOutputView out) throws IOException { if (this.requiredLibraries == null) { throw new IOException("requiredLibraries is null"); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheUpdate.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheUpdate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheUpdate.java index 05473c7..6f7c404 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheUpdate.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheUpdate.java @@ -13,11 +13,11 @@ package eu.stratosphere.nephele.execution.librarycache; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import eu.stratosphere.core.io.IOReadableWritable; +import eu.stratosphere.core.memory.DataInputView; +import eu.stratosphere.core.memory.DataOutputView; /** * This class is used to encapsulate the transmission of a library file in a Nephele RPC call. @@ -48,14 +48,14 @@ public class LibraryCacheUpdate implements IOReadableWritable { @Override - public void read(final DataInput in) throws IOException { + public void read(final DataInputView in) throws IOException { LibraryCacheManager.readLibraryFromStream(in); } @Override - public void write(final DataOutput out) throws IOException { + public void write(final DataOutputView out) throws IOException { if (this.libraryFileName == null) { throw new IOException("libraryFileName is null"); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/HardwareDescription.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/HardwareDescription.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/HardwareDescription.java index 83d650a..2ca6068 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/HardwareDescription.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/HardwareDescription.java @@ -13,11 +13,11 @@ package eu.stratosphere.nephele.instance; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import eu.stratosphere.core.io.IOReadableWritable; +import eu.stratosphere.core.memory.DataInputView; +import eu.stratosphere.core.memory.DataOutputView; /** * A hardware description reflects the hardware environment which is actually present on the task manager's compute @@ -68,7 +68,7 @@ public final class HardwareDescription implements IOReadableWritable { } @Override - public void write(final DataOutput out) throws IOException { + public void write(final DataOutputView out) throws IOException { out.writeInt(this.numberOfCPUCores); out.writeLong(this.sizeOfPhysicalMemory); @@ -76,7 +76,7 @@ public final class HardwareDescription implements IOReadableWritable { } @Override - public void read(final DataInput in) throws IOException { + public void read(final DataInputView in) throws IOException { this.numberOfCPUCores = in.readInt(); this.sizeOfPhysicalMemory = in.readLong(); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceConnectionInfo.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceConnectionInfo.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceConnectionInfo.java index 98c3810..b47f649 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceConnectionInfo.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceConnectionInfo.java @@ -13,14 +13,14 @@ package eu.stratosphere.nephele.instance; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import eu.stratosphere.core.io.IOReadableWritable; import eu.stratosphere.core.io.StringRecord; +import eu.stratosphere.core.memory.DataInputView; +import eu.stratosphere.core.memory.DataOutputView; import eu.stratosphere.util.StringUtils; /** @@ -206,7 +206,7 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable