Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D5BDE11D70 for ; Tue, 10 Jun 2014 19:36:49 +0000 (UTC) Received: (qmail 67768 invoked by uid 500); 10 Jun 2014 19:36:49 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 67752 invoked by uid 500); 10 Jun 2014 19:36:49 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 67743 invoked by uid 99); 10 Jun 2014 19:36:49 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Jun 2014 19:36:49 +0000 X-ASF-Spam-Status: No, hits=-2000.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 10 Jun 2014 19:35:26 +0000 Received: (qmail 59677 invoked by uid 99); 10 Jun 2014 19:35:00 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Jun 2014 19:35:00 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 05E7191DAB6; Tue, 10 Jun 2014 19:34:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rmetzger@apache.org To: commits@flink.incubator.apache.org Date: Tue, 10 Jun 2014 19:35:07 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [10/34] Offer buffer-oriented API for I/O (#25) X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java index 965a5aa..06c857e 100644 --- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java @@ -13,44 +13,46 @@ package eu.stratosphere.nephele.jobmanager; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - import eu.stratosphere.configuration.ConfigConstants; import eu.stratosphere.configuration.Configuration; import eu.stratosphere.configuration.GlobalConfiguration; import eu.stratosphere.core.fs.Path; import eu.stratosphere.nephele.client.JobClient; import eu.stratosphere.nephele.client.JobExecutionException; -import eu.stratosphere.nephele.io.DistributionPattern; -import eu.stratosphere.nephele.io.channels.ChannelType; -import eu.stratosphere.nephele.io.library.FileLineReader; -import eu.stratosphere.nephele.io.library.FileLineWriter; +import eu.stratosphere.nephele.jobgraph.DistributionPattern; +import eu.stratosphere.runtime.io.channels.ChannelType; import eu.stratosphere.nephele.jobgraph.JobFileInputVertex; import eu.stratosphere.nephele.jobgraph.JobFileOutputVertex; import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException; import eu.stratosphere.nephele.jobgraph.JobTaskVertex; import eu.stratosphere.nephele.jobmanager.JobManager.ExecutionMode; +import eu.stratosphere.nephele.taskmanager.Task; import eu.stratosphere.nephele.taskmanager.TaskManager; -import eu.stratosphere.nephele.taskmanager.runtime.RuntimeTask; +import eu.stratosphere.nephele.util.FileLineReader; +import eu.stratosphere.nephele.util.FileLineWriter; import eu.stratosphere.nephele.util.JarFileCreator; import eu.stratosphere.nephele.util.ServerTestUtils; import eu.stratosphere.util.LogUtils; +import eu.stratosphere.util.StringUtils; +import org.apache.log4j.ConsoleAppender; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.PatternLayout; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; /** * This test is intended to cover the basic functionality of the {@link JobManager}. @@ -185,8 +187,8 @@ public class JobManagerITCase { // connect vertices try { i1.connectTo(t1, ChannelType.NETWORK); - t1.connectTo(t2, ChannelType.INMEMORY); - t2.connectTo(o1, ChannelType.INMEMORY); + t1.connectTo(t2, ChannelType.IN_MEMORY); + t2.connectTo(o1, ChannelType.IN_MEMORY); } catch (JobGraphDefinitionException e) { e.printStackTrace(); } @@ -286,8 +288,8 @@ public class JobManagerITCase { o1.setVertexToShareInstancesWith(i1); // connect vertices - i1.connectTo(t1, ChannelType.INMEMORY); - t1.connectTo(o1, ChannelType.INMEMORY); + i1.connectTo(t1, ChannelType.IN_MEMORY); + t1.connectTo(o1, ChannelType.IN_MEMORY); // add jar jg.addJar(new Path(new File(ServerTestUtils.getTempDir() + File.separator + exceptionClassName + ".jar") @@ -297,7 +299,7 @@ public class JobManagerITCase { jobClient = new JobClient(jg, configuration); // deactivate logging of expected test exceptions - Logger rtLogger = Logger.getLogger(RuntimeTask.class); + Logger rtLogger = Logger.getLogger(Task.class); Level rtLevel = rtLogger.getEffectiveLevel(); rtLogger.setLevel(Level.OFF); @@ -382,8 +384,8 @@ public class JobManagerITCase { o1.setVertexToShareInstancesWith(i1); // connect vertices - i1.connectTo(t1, ChannelType.INMEMORY); - t1.connectTo(o1, ChannelType.INMEMORY); + i1.connectTo(t1, ChannelType.IN_MEMORY); + t1.connectTo(o1, ChannelType.IN_MEMORY); // add jar jg.addJar(new Path(new File(ServerTestUtils.getTempDir() + File.separator + runtimeExceptionClassName @@ -492,8 +494,8 @@ public class JobManagerITCase { // connect vertices try { i1.connectTo(t1, ChannelType.NETWORK); - t1.connectTo(t2, ChannelType.INMEMORY); - t2.connectTo(o1, ChannelType.INMEMORY); + t1.connectTo(t2, ChannelType.IN_MEMORY); + t2.connectTo(o1, ChannelType.IN_MEMORY); } catch (JobGraphDefinitionException e) { e.printStackTrace(); } @@ -583,9 +585,9 @@ public class JobManagerITCase { o1.setVertexToShareInstancesWith(i1); // connect vertices - i1.connectTo(t1, ChannelType.INMEMORY, DistributionPattern.POINTWISE); + i1.connectTo(t1, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE); i1.connectTo(t1, ChannelType.NETWORK, DistributionPattern.BIPARTITE); - t1.connectTo(o1, ChannelType.INMEMORY); + t1.connectTo(o1, ChannelType.IN_MEMORY); // add jar jg.addJar(new Path(jarFile.toURI())); @@ -657,7 +659,7 @@ public class JobManagerITCase { o1.setVertexToShareInstancesWith(i1); // connect vertices - i1.connectTo(o1, ChannelType.INMEMORY); + i1.connectTo(o1, ChannelType.IN_MEMORY); // add jar jg.addJar(new Path(jarFile.toURI())); @@ -751,9 +753,9 @@ public class JobManagerITCase { u1.setVertexToShareInstancesWith(o1); // connect vertices - i1.connectTo(u1, ChannelType.INMEMORY, DistributionPattern.POINTWISE); - i2.connectTo(u1, ChannelType.INMEMORY); - u1.connectTo(o1, ChannelType.INMEMORY); + i1.connectTo(u1, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE); + i2.connectTo(u1, ChannelType.IN_MEMORY); + u1.connectTo(o1, ChannelType.IN_MEMORY); // add jar jg.addJar(new Path(jarFile.toURI())); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java index 3b95133..124a24d 100644 --- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java @@ -14,9 +14,9 @@ package eu.stratosphere.nephele.jobmanager; import eu.stratosphere.core.io.StringRecord; -import eu.stratosphere.nephele.io.MutableRecordReader; -import eu.stratosphere.nephele.io.RecordWriter; -import eu.stratosphere.nephele.io.UnionRecordReader; +import eu.stratosphere.runtime.io.api.MutableRecordReader; +import eu.stratosphere.runtime.io.api.RecordWriter; +import eu.stratosphere.runtime.io.api.UnionRecordReader; import eu.stratosphere.nephele.template.AbstractTask; /** @@ -41,13 +41,17 @@ public class UnionTask extends AbstractTask { recordReaders[1] = new MutableRecordReader(this); this.unionReader = new UnionRecordReader(recordReaders, StringRecord.class); - this.writer = new RecordWriter(this, StringRecord.class); + this.writer = new RecordWriter(this); } @Override public void invoke() throws Exception { + this.writer.initializeSerializers(); + while (this.unionReader.hasNext()) { this.writer.emit(this.unionReader.next()); } + + this.writer.flush(); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueSchedulerTest.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueSchedulerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueSchedulerTest.java index 9fc4256..f1e3191 100644 --- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueSchedulerTest.java +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueSchedulerTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.List; +import eu.stratosphere.runtime.io.api.RecordWriter; import org.junit.Test; import eu.stratosphere.core.io.StringRecord; @@ -29,9 +30,8 @@ import eu.stratosphere.nephele.executiongraph.ExecutionGraph; import eu.stratosphere.nephele.executiongraph.ExecutionVertex; import eu.stratosphere.nephele.executiongraph.GraphConversionException; import eu.stratosphere.nephele.instance.InstanceManager; -import eu.stratosphere.nephele.io.RecordReader; -import eu.stratosphere.nephele.io.RecordWriter; -import eu.stratosphere.nephele.io.channels.ChannelType; +import eu.stratosphere.runtime.io.api.RecordReader; +import eu.stratosphere.runtime.io.channels.ChannelType; import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException; import eu.stratosphere.nephele.jobgraph.JobInputVertex; @@ -57,7 +57,7 @@ public class QueueSchedulerTest { */ @Override public void registerInputOutput() { - new RecordWriter(this, StringRecord.class); + new RecordWriter(this); } /** @@ -145,7 +145,7 @@ public class QueueSchedulerTest { final TestDeploymentManager tdm = new TestDeploymentManager(); final QueueScheduler scheduler = new QueueScheduler(tdm, tim); - final ExecutionGraph executionGraph = createExecutionGraph(ChannelType.INMEMORY, tim); + final ExecutionGraph executionGraph = createExecutionGraph(ChannelType.IN_MEMORY, tim); try { try { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/managementgraph/ManagementGraphTest.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/managementgraph/ManagementGraphTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/managementgraph/ManagementGraphTest.java index 4fd1ac1..630f365 100644 --- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/managementgraph/ManagementGraphTest.java +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/managementgraph/ManagementGraphTest.java @@ -19,7 +19,7 @@ import java.util.Iterator; import org.junit.Test; -import eu.stratosphere.nephele.io.channels.ChannelType; +import eu.stratosphere.runtime.io.channels.ChannelType; import eu.stratosphere.nephele.jobgraph.JobID; import eu.stratosphere.nephele.util.ManagementTestUtils; @@ -275,7 +275,7 @@ public class ManagementGraphTest { // Group Edges new ManagementGroupEdge(groupVertex1, 0, groupVertex2, 0, ChannelType.NETWORK); new ManagementGroupEdge(groupVertex2, 0, groupVertex3, 0, ChannelType.NETWORK); - new ManagementGroupEdge(groupVertex3, 0, groupVertex4, 0, ChannelType.INMEMORY); + new ManagementGroupEdge(groupVertex3, 0, groupVertex4, 0, ChannelType.IN_MEMORY); // Edges new ManagementEdge(new ManagementEdgeID(), new ManagementEdgeID(), outputGate1_1, 0, inputGate2_1, 0, @@ -287,7 +287,7 @@ public class ManagementGraphTest { new ManagementEdge(new ManagementEdgeID(), new ManagementEdgeID(), outputGate2_2, 0, inputGate3_1, 1, ChannelType.NETWORK); new ManagementEdge(new ManagementEdgeID(), new ManagementEdgeID(), outputGate3_1, 0, inputGate4_1, 0, - ChannelType.INMEMORY); + ChannelType.IN_MEMORY); return graph; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/profiling/impl/InstanceProfilerTest.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/profiling/impl/InstanceProfilerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/profiling/impl/InstanceProfilerTest.java index dda9491..f1b83a6 100644 --- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/profiling/impl/InstanceProfilerTest.java +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/profiling/impl/InstanceProfilerTest.java @@ -74,7 +74,7 @@ public class InstanceProfilerTest { @Before public void setUp() throws Exception { initMocks(this); - when(this.infoMock.getAddress()).thenReturn(this.addressMock); + when(this.infoMock.address()).thenReturn(this.addressMock); when(this.addressMock.getHostAddress()).thenReturn("192.168.1.1"); whenNew(FileReader.class).withArguments(InstanceProfiler.PROC_STAT).thenReturn(this.cpuReaderMock); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultDeserializerTest.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultDeserializerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultDeserializerTest.java deleted file mode 100644 index 1c9efd5..0000000 --- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultDeserializerTest.java +++ /dev/null @@ -1,358 +0,0 @@ -/*********************************************************************************************************************** - * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - **********************************************************************************************************************/ - -package eu.stratosphere.nephele.taskmanager.transferenvelope; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; -import java.util.ArrayDeque; -import java.util.Queue; - -import org.junit.Test; - -import eu.stratosphere.core.memory.MemorySegment; -import eu.stratosphere.nephele.io.channels.Buffer; -import eu.stratosphere.nephele.io.channels.BufferFactory; -import eu.stratosphere.nephele.io.channels.ChannelID; -import eu.stratosphere.nephele.io.channels.MemoryBuffer; -import eu.stratosphere.nephele.jobgraph.JobID; -import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferAvailabilityListener; -import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProvider; -import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProviderBroker; -import eu.stratosphere.nephele.util.BufferPoolConnector; -import eu.stratosphere.nephele.util.InterruptibleByteChannel; -import eu.stratosphere.util.StringUtils; - -/** - * This class contains tests covering the deserialization of a byte stream to a transfer envelope. - * - */ -public class DefaultDeserializerTest { - - /** - * The size of the test byte buffers in byte. - */ - private static final int TEST_BUFFER_CAPACITY = 1024; - - /** - * The sequence number to be used during the tests. - */ - private static final int SEQUENCE_NUMBER = 0; - - /** - * The job ID to be used during the tests. - */ - private static final JobID JOB_ID = new JobID(); - - /** - * The channel ID to be used during the tests. - */ - private static final ChannelID CHANNEL_ID = new ChannelID(); - - /** - * A dummy implementation of a {@link BufferProvider} which is used in this test. - *

- * This class is not thread-safe. - * - */ - private static final class TestBufferProvider implements BufferProvider { - - /** - * Stores the available byte buffers. - */ - private final Queue bufferPool; - - /** - * Constructs a new test buffer provider. - * - * @param numberOfBuffers - * the number of byte buffers this pool has available. - */ - private TestBufferProvider(final int numberOfBuffers) { - - this.bufferPool = new ArrayDeque(numberOfBuffers); - for (int i = 0; i < numberOfBuffers; ++i) { - this.bufferPool.add(new MemorySegment(new byte[TEST_BUFFER_CAPACITY])); - } - } - - /** - * {@inheritDoc} - */ - @Override - public Buffer requestEmptyBuffer(final int minimumSizeOfBuffer) throws IOException { - - if (this.bufferPool.isEmpty()) { - return null; - } - - return BufferFactory.createFromMemory(minimumSizeOfBuffer, this.bufferPool.poll(), - new BufferPoolConnector(this.bufferPool)); - } - - /** - * {@inheritDoc} - */ - @Override - public Buffer requestEmptyBufferBlocking(final int minimumSizeOfBuffer) throws IOException, - InterruptedException { - - throw new IllegalStateException("requestEmptyBufferBlocking called"); - } - - /** - * {@inheritDoc} - */ - @Override - public int getMaximumBufferSize() { - - throw new IllegalStateException("getMaximumBufferSize called"); - } - - /** - * {@inheritDoc} - */ - @Override - public boolean isShared() { - - throw new IllegalStateException("isShared called"); - } - - /** - * {@inheritDoc} - */ - @Override - public void reportAsynchronousEvent() { - - throw new IllegalStateException("reportAsynchronousEvent called"); - } - - /** - * {@inheritDoc} - */ - @Override - public boolean registerBufferAvailabilityListener(final BufferAvailabilityListener bufferAvailabilityListener) { - - throw new IllegalStateException("registerBufferAvailabilityListener called"); - } - } - - /** - * A dummy implementation of a {@link BufferProviderBroker} which is used during this test. - *

- * This class is not thread-safe. - * - */ - private static final class TestBufferProviderBroker implements BufferProviderBroker { - - private final BufferProvider bufferProvider; - - private TestBufferProviderBroker(final BufferProvider bufferProvider) { - this.bufferProvider = bufferProvider; - } - - /** - * {@inheritDoc} - */ - @Override - public BufferProvider getBufferProvider(final JobID jobID, final ChannelID sourceChannelID) throws IOException, - InterruptedException { - - return this.bufferProvider; - } - } - - /** - * Constructs an {@link InterruptibleByteChannel} from which the deserializer to be tested can read its data. - * - * @param readInterruptPositions - * the positions after which the byte stream shall be interrupted - * @param testBufferSize - * the size of the test buffer to create - * @return an {@link InterruptibleByteChannel} holding the serialized data in memory - * @throws IOException - * thrown if an error occurs while serializing the original data - */ - private ReadableByteChannel createByteChannel(final int[] readInterruptPositions, final int testBufferSize) - throws IOException { - - final TransferEnvelope te = new TransferEnvelope(SEQUENCE_NUMBER, JOB_ID, CHANNEL_ID); - - if (testBufferSize >= 0) { - - if (testBufferSize > 100) { - throw new IllegalStateException("Test buffer size can be 100 bytes at most"); - } - - final Queue bufferPool = new ArrayDeque(); - final MemorySegment ms = new MemorySegment(new byte[TEST_BUFFER_CAPACITY]); - - final MemoryBuffer buffer = BufferFactory.createFromMemory(ms.size(), ms, new BufferPoolConnector(bufferPool)); - - final ByteBuffer srcBuffer = ByteBuffer.allocate(testBufferSize); - for (int i = 0; i < testBufferSize; ++i) { - srcBuffer.put((byte) i); - } - srcBuffer.flip(); - - buffer.write(srcBuffer); - buffer.flip(); - te.setBuffer(buffer); - } - - final DefaultSerializer ds = new DefaultSerializer(); - ds.setTransferEnvelope(te); - - final InterruptibleByteChannel ibc = new InterruptibleByteChannel(null, readInterruptPositions); - - while (ds.write(ibc)); - - ibc.switchToReadPhase(); - - return ibc; - } - - /** - * Executes the deserialization method. - * - * @param rbc - * the byte channel to read the serialized data from - * @param bpb - * the buffer provider broker to request empty buffers from - * @return the deserialized transfer envelope - * @throws IOException - * thrown if an error occurs during the deserialization process - * @throws NoBufferAvailableException - * thrown if the buffer provider broker could not provide an empty buffer - */ - private TransferEnvelope executeDeserialization(final ReadableByteChannel rbc, final BufferProviderBroker bpb) - throws IOException, NoBufferAvailableException { - - final DefaultDeserializer dd = new DefaultDeserializer(bpb); - - TransferEnvelope te = dd.getFullyDeserializedTransferEnvelope(); - while (te == null) { - - dd.read(rbc); - te = dd.getFullyDeserializedTransferEnvelope(); - } - - assertEquals(SEQUENCE_NUMBER, te.getSequenceNumber()); - assertEquals(JOB_ID, te.getJobID()); - assertEquals(CHANNEL_ID, te.getSource()); - - return te; - } - - /** - * Tests the deserialization process of a {@link TransferEnvelope} with a buffer when no interruption of the byte - * stream. - */ - @Test - public void testDeserializationWithBufferAndWithoutInterruption() { - - try { - - final ReadableByteChannel rbc = createByteChannel(null, 10); - - final TestBufferProviderBroker tbpb = new TestBufferProviderBroker(new TestBufferProvider(1)); - - final TransferEnvelope te = executeDeserialization(rbc, tbpb); - - assertNotNull(te.getBuffer()); - assertEquals(10, te.getBuffer().size()); - - } catch (IOException ioe) { - fail(StringUtils.stringifyException(ioe)); - } catch (NoBufferAvailableException nbae) { - fail(StringUtils.stringifyException(nbae)); - } - } - - /** - * Tests the deserialization process of a {@link TransferEnvelope} with a buffer and interruptions of the byte - * stream. - */ - @Test - public void testDeserializationWithBufferAndInterruptions() { - - try { - - final ReadableByteChannel rbc = createByteChannel(new int[] { 3, 7, 24, 52 }, 10); - - final TestBufferProviderBroker tbpb = new TestBufferProviderBroker(new TestBufferProvider(1)); - - final TransferEnvelope te = executeDeserialization(rbc, tbpb); - - assertNotNull(te.getBuffer()); - assertEquals(10, te.getBuffer().size()); - - } catch (IOException ioe) { - fail(StringUtils.stringifyException(ioe)); - } catch (NoBufferAvailableException nbae) { - fail(StringUtils.stringifyException(nbae)); - } - } - - /** - * Tests the deserialization process of a {@link TransferEnvelope} without a buffer and without interruptions of the - * byte stream. - */ - @Test - public void testDeserializationWithoutBufferAndInterruptions() { - - try { - - final ReadableByteChannel rbc = createByteChannel(null, -1); - - final TestBufferProviderBroker tbpb = new TestBufferProviderBroker(new TestBufferProvider(1)); - - final TransferEnvelope te = executeDeserialization(rbc, tbpb); - - assertNull(te.getBuffer()); - - } catch (IOException ioe) { - fail(StringUtils.stringifyException(ioe)); - } catch (NoBufferAvailableException nbae) { - fail(StringUtils.stringifyException(nbae)); - } - } - - /** - * Tests the deserialization process in case the buffer provide cannot deliver an empty buffer to read the byte - * stream into. - */ - @Test - public void testDeserializationWithNoBufferAvailable() { - - try { - final ReadableByteChannel rbc = createByteChannel(null, 10); - final TestBufferProviderBroker tbpb = new TestBufferProviderBroker(new TestBufferProvider(0)); - executeDeserialization(rbc, tbpb); - - } catch (IOException ioe) { - fail(StringUtils.stringifyException(ioe)); - } catch (NoBufferAvailableException nbae) { - // Expected exception was successfully caught - return; - } - - fail("Expected NoBufferAvailableException but has not been thrown"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultSerializerTest.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultSerializerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultSerializerTest.java deleted file mode 100644 index a50fbe2..0000000 --- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultSerializerTest.java +++ /dev/null @@ -1,313 +0,0 @@ -/*********************************************************************************************************************** - * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - **********************************************************************************************************************/ - -package eu.stratosphere.nephele.taskmanager.transferenvelope; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.util.ArrayDeque; -import java.util.Deque; - -import org.junit.Test; - -import eu.stratosphere.core.memory.MemorySegment; -import eu.stratosphere.nephele.io.AbstractID; -import eu.stratosphere.nephele.io.channels.BufferFactory; -import eu.stratosphere.nephele.io.channels.ChannelID; -import eu.stratosphere.nephele.io.channels.MemoryBuffer; -import eu.stratosphere.nephele.jobgraph.JobID; -import eu.stratosphere.nephele.util.BufferPoolConnector; -import eu.stratosphere.nephele.util.ServerTestUtils; - -/** - * This class contains tests covering the serialization of transfer envelopes to a byte stream. - * - */ -public class DefaultSerializerTest { - - /** - * The maximum size of the transfer envelope's buffer. - */ - private static final int BUFFER_SIZE = 4096; // 4 KB; - - /** - * An arbitrarily chosen byte used to fill the transfer envelope's buffer. - */ - private static final byte BUFFER_CONTENT = 13; - - /** - * The size of a sequence number. - */ - private static final int SIZE_OF_SEQ_NR = 4; - - /** - * The size of an ID. - */ - private static final int SIZE_OF_ID = 16; - - /** - * The size of an integer number. - */ - private static final int SIZE_OF_INTEGER = 4; - - /** - * The job ID used during the serialization process. - */ - private final JobID jobID = new JobID(); - - /** - * The target channel ID used during the serialization process. - */ - private final ChannelID sourceChannelID = new ChannelID(); - - /** - * Auxiliary class to explicitly access the internal buffer of an ID object. - * - */ - private static class SerializationTestID extends AbstractID { - - /** - * Constructs a new ID. - * - * @param content - * a byte buffer representing the ID - */ - private SerializationTestID(byte[] content) { - super(content); - } - } - - /** - * This test checks the correctness of the serialization of {@link TransferEnvelope} objects. - */ - @Test - public void testSerialization() { - - try { - - // Generate test file - final File testFile = generateDataStream(); - - // Analyze the test file - analyzeStream(testFile); - - // Delete the test file - testFile.delete(); - - } catch (IOException e) { - fail(e.getMessage()); - } - } - - /** - * Generates and serializes a series of {@link TransferEnvelope} objects to a random file. - * - * @return the file containing the serializes envelopes - * @throws IOException - * thrown if an I/O error occurs while writing the envelopes - */ - private File generateDataStream() throws IOException { - - final File outputFile = new File(ServerTestUtils.getTempDir() + File.separator - + ServerTestUtils.getRandomFilename()); - final FileOutputStream outputStream = new FileOutputStream(outputFile); - final FileChannel fileChannel = outputStream.getChannel(); - final Deque recycleQueue = new ArrayDeque(); - final DefaultSerializer serializer = new DefaultSerializer(); - final MemorySegment byteBuffer = new MemorySegment(new byte[BUFFER_SIZE]); - final ByteBuffer initBuffer = ByteBuffer.allocate(1); - - // The byte buffer is initialized from this buffer - initBuffer.put(BUFFER_CONTENT); - initBuffer.flip(); - - // Put byte buffer to recycled queue - recycleQueue.add(byteBuffer); - - for (int i = 0; i < BUFFER_SIZE; i++) { - - final MemoryBuffer buffer = BufferFactory.createFromMemory(i, recycleQueue.poll(), new BufferPoolConnector( - recycleQueue)); - - // Initialize buffer - for (int j = 0; j < i; j++) { - buffer.write(initBuffer); - initBuffer.position(0); - } - buffer.flip(); - - final TransferEnvelope transferEnvelope = new TransferEnvelope(i, this.jobID, this.sourceChannelID); - transferEnvelope.setBuffer(buffer); - - // set envelope to be serialized and write it to file channel - serializer.setTransferEnvelope(transferEnvelope); - while (serializer.write(fileChannel)); - - // Put buffer back to the recycling queue - buffer.recycleBuffer(); - } - - fileChannel.close(); - - return outputFile; - } - - /** - * Analyzes the given test file and checks whether its content matches Nephele's serialization pattern. - * - * @param testFile - * the test file to analyze - * @throws IOException - * thrown if an I/O error occurs while reading the test file - */ - private void analyzeStream(File testFile) throws IOException { - - FileInputStream fileInputStream = new FileInputStream(testFile); - - for (int i = 0; i < BUFFER_SIZE; i++) { - - readAndCheckSequenceNumber(fileInputStream, i); - readAndCheckID(fileInputStream, this.jobID); - readAndCheckID(fileInputStream, this.sourceChannelID); - readAndCheckNotificationList(fileInputStream); - readAndCheckBuffer(fileInputStream, i); - } - - fileInputStream.close(); - } - - /** - * Attempts to read a buffer of the given size from the file stream and checks the buffer's content. - * - * @param fileInputStream - * the file stream to read from - * @param expectedBufferSize - * the expected size of the buffer - * @throws IOException - * thrown if an error occurs while reading from the file stream - */ - private static void readAndCheckBuffer(FileInputStream fileInputStream, int expectedBufferSize) throws IOException { - - // Check if buffer exists - assertEquals(1L, fileInputStream.read()); - - byte[] temp = new byte[SIZE_OF_INTEGER]; - fileInputStream.read(temp); - int bufferSize = bufferToInteger(temp); - - assertEquals(expectedBufferSize, bufferSize); - - byte[] buffer = new byte[bufferSize]; - int r = fileInputStream.read(buffer); - for (int i = 0; i < buffer.length; i++) { - assertEquals(BUFFER_CONTENT, buffer[i]); - } - } - - /** - * Attempts to read an empty notification list from the given file input stream. - * - * @param fileInputStream - * the file input stream to read from - * @throws IOException - * thrown if an I/O occurs while reading data from the stream - */ - private void readAndCheckNotificationList(FileInputStream fileInputStream) throws IOException { - - if (fileInputStream.read() != 0) { - - byte[] temp = new byte[SIZE_OF_INTEGER]; - - fileInputStream.read(temp); - final int sizeOfDataBlock = bufferToInteger(temp); - - assertEquals(SIZE_OF_INTEGER, sizeOfDataBlock); - - fileInputStream.read(temp); - final int sizeOfNotificationList = bufferToInteger(temp); - - assertEquals(0, sizeOfNotificationList); - } - } - - /** - * Attempts to read an integer number from the given file input stream and compares it to - * expectedSequenceNumber. - * - * @param fileInputStream - * the file input stream to read from - * @param expectedSeqNumber - * the integer number the read number is expected to match - * @throws IOException - * thrown if an I/O occurs while reading data from the stream - */ - private void readAndCheckSequenceNumber(FileInputStream fileInputStream, int expectedSeqNumber) throws IOException { - - byte[] temp = new byte[SIZE_OF_SEQ_NR]; - fileInputStream.read(temp); - int seqNumber = bufferToInteger(temp); - - assertEquals(seqNumber, expectedSeqNumber); - } - - /** - * Attempts to read a channel ID from the given file input stream and compares it to expectedChannelID. - * - * @param fileInputStream - * the file input stream to read from - * @param expectedID - * the ID which the read ID is expected to match - * @throws IOException - * thrown if an I/O occurs while reading data from the stream - */ - private void readAndCheckID(FileInputStream fileInputStream, AbstractID expectedID) throws IOException { - - byte[] temp = new byte[SIZE_OF_INTEGER]; - fileInputStream.read(temp); - - final int sizeOfID = bufferToInteger(temp); // ID has fixed size and therefore does not announce its size - - assertEquals(sizeOfID, SIZE_OF_ID); - - byte[] id = new byte[sizeOfID]; - fileInputStream.read(id); - - final AbstractID channelID = new SerializationTestID(id); - assertEquals(expectedID, channelID); - } - - /** - * Converts the first four bytes of the provided buffer's content to an integer number. - * - * @param buffer - * the buffer to convert - * @return the integer number converted from the first four bytes of the buffer's content - */ - private static int bufferToInteger(byte[] buffer) { - - int integer = 0; - - for (int i = 0; i < SIZE_OF_INTEGER; ++i) { - integer |= (buffer[SIZE_OF_INTEGER - 1 - i] & 0xff) << (i << 3); - } - - return integer; - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/BufferPoolConnector.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/BufferPoolConnector.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/BufferPoolConnector.java deleted file mode 100644 index 3f2c79e..0000000 --- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/BufferPoolConnector.java +++ /dev/null @@ -1,53 +0,0 @@ -/*********************************************************************************************************************** - * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - **********************************************************************************************************************/ - -package eu.stratosphere.nephele.util; - -import java.util.Queue; - -import eu.stratosphere.core.memory.MemorySegment; -import eu.stratosphere.nephele.io.channels.MemoryBufferPoolConnector; - -/** - * This is a simple implementation of a {@link MemoryBufferPoolConnector} used for the server unit tests. - *

- * This class is thread-safe. - * - */ -public final class BufferPoolConnector implements MemoryBufferPoolConnector { - - /** - * Reference to the memory pool the byte buffer was originally taken from. - */ - private final Queue memoryPool; - - /** - * Constructs a new buffer pool connector - * - * @param bufferPool - * a reference to the memory pool the byte buffer was originally taken from - */ - public BufferPoolConnector(final Queue bufferPool) { - this.memoryPool = bufferPool; - } - - - @Override - public void recycle(final MemorySegment memSeg) { - - synchronized (this.memoryPool) { - this.memoryPool.add(memSeg); - this.memoryPool.notify(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/DiscardingRecycler.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/DiscardingRecycler.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/DiscardingRecycler.java new file mode 100644 index 0000000..9d4d2a5 --- /dev/null +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/DiscardingRecycler.java @@ -0,0 +1,24 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ +package eu.stratosphere.nephele.util; + +import eu.stratosphere.core.memory.MemorySegment; +import eu.stratosphere.runtime.io.BufferRecycler; + +public class DiscardingRecycler implements BufferRecycler { + + @Override + public void recycle(MemorySegment memSeg) {} +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineReader.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineReader.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineReader.java new file mode 100644 index 0000000..fcb4fa1 --- /dev/null +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineReader.java @@ -0,0 +1,80 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + +package eu.stratosphere.nephele.util; + +import java.util.Iterator; + +import eu.stratosphere.core.fs.FSDataInputStream; +import eu.stratosphere.core.fs.FileInputSplit; +import eu.stratosphere.core.fs.FileSystem; +import eu.stratosphere.core.io.StringRecord; +import eu.stratosphere.runtime.io.api.RecordWriter; +import eu.stratosphere.nephele.template.AbstractFileInputTask; +import eu.stratosphere.runtime.fs.LineReader; + +/** + * A file line reader reads the associated file input splits line by line and outputs the lines as string records. + * + */ +public class FileLineReader extends AbstractFileInputTask { + + private RecordWriter output = null; + + @Override + public void invoke() throws Exception { + + output.initializeSerializers(); + + final Iterator splitIterator = getFileInputSplits(); + + while (splitIterator.hasNext()) { + + final FileInputSplit split = splitIterator.next(); + + long start = split.getStart(); + long length = split.getLength(); + + final FileSystem fs = FileSystem.get(split.getPath().toUri()); + + final FSDataInputStream fdis = fs.open(split.getPath()); + + final LineReader lineReader = new LineReader(fdis, start, length, (1024 * 1024)); + + byte[] line = lineReader.readLine(); + + while (line != null) { + + // Create a string object from the data read + StringRecord str = new StringRecord(); + str.set(line); + + // Send out string + output.emit(str); + + line = lineReader.readLine(); + } + + // Close the stream; + lineReader.close(); + } + + this.output.flush(); + } + + @Override + public void registerInputOutput() { + output = new RecordWriter(this); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineWriter.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineWriter.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineWriter.java new file mode 100644 index 0000000..bc738df --- /dev/null +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineWriter.java @@ -0,0 +1,75 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + +package eu.stratosphere.nephele.util; + +import eu.stratosphere.core.fs.FSDataOutputStream; +import eu.stratosphere.core.fs.FileStatus; +import eu.stratosphere.core.fs.FileSystem; +import eu.stratosphere.core.fs.Path; +import eu.stratosphere.core.io.StringRecord; +import eu.stratosphere.runtime.io.api.RecordReader; +import eu.stratosphere.nephele.template.AbstractFileOutputTask; + +/** + * A file line writer reads string records its input gate and writes them to the associated output file. + * + */ +public class FileLineWriter extends AbstractFileOutputTask { + + /** + * The record reader through which incoming string records are received. + */ + private RecordReader input = null; + + + @Override + public void invoke() throws Exception { + + Path outputPath = getFileOutputPath(); + + FileSystem fs = FileSystem.get(outputPath.toUri()); + if (fs.exists(outputPath)) { + FileStatus status = fs.getFileStatus(outputPath); + + if (status.isDir()) { + outputPath = new Path(outputPath.toUri().toString() + "/file_" + getIndexInSubtaskGroup() + ".txt"); + } + } + + final FSDataOutputStream outputStream = fs.create(outputPath, true); + + while (this.input.hasNext()) { + + StringRecord record = this.input.next(); + byte[] recordByte = (record.toString() + "\r\n").getBytes(); + outputStream.write(recordByte, 0, recordByte.length); + } + + outputStream.close(); + + } + + + @Override + public void registerInputOutput() { + this.input = new RecordReader(this, StringRecord.class); + } + + + @Override + public int getMaximumNumberOfSubtasks() { + // The default implementation always returns -1 + return -1; + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/TestBufferProvider.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/TestBufferProvider.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/TestBufferProvider.java new file mode 100644 index 0000000..09b244f --- /dev/null +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/TestBufferProvider.java @@ -0,0 +1,76 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ +package eu.stratosphere.nephele.util; + +import eu.stratosphere.core.memory.MemorySegment; +import eu.stratosphere.runtime.io.Buffer; +import eu.stratosphere.runtime.io.BufferRecycler; +import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener; +import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider; + +import java.io.IOException; +import java.util.Random; + +public class TestBufferProvider implements BufferProvider { + + private final BufferRecycler recycler = new DiscardingRecycler(); + + private final Random rnd = new Random(); + + private final int sizeOfMemorySegments; + + private final float probabilityForNoneAvailable; + + + public TestBufferProvider(int sizeOfMemorySegments) { + this(sizeOfMemorySegments, -1.0f); + } + + public TestBufferProvider(int sizeOfMemorySegments, float probabilityForNoneAvailable) { + this.sizeOfMemorySegments = sizeOfMemorySegments; + this.probabilityForNoneAvailable = probabilityForNoneAvailable; + } + + @Override + public Buffer requestBuffer(int sizeOfBuffer) throws IOException { + if (rnd.nextFloat() < this.probabilityForNoneAvailable) { + return null; + } else { + MemorySegment segment = new MemorySegment(new byte[this.sizeOfMemorySegments]); + return new Buffer(segment, sizeOfBuffer, this.recycler); + } + } + + @Override + public Buffer requestBufferBlocking(int sizeOfBuffer) throws IOException, InterruptedException { + MemorySegment segment = new MemorySegment(new byte[this.sizeOfMemorySegments]); + return new Buffer(segment, sizeOfBuffer, this.recycler); + } + + @Override + public int getBufferSize() { + return Integer.MAX_VALUE; + } + + @Override + public void reportAsynchronousEvent() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean registerBufferAvailabilityListener(BufferAvailabilityListener bufferAvailabilityListener) { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSinkTaskTest.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSinkTaskTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSinkTaskTest.java index 6c66b78..bfd0d42 100644 --- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSinkTaskTest.java +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSinkTaskTest.java @@ -42,8 +42,12 @@ import eu.stratosphere.types.IntValue; import eu.stratosphere.types.Key; import eu.stratosphere.types.Record; -public class DataSinkTaskTest extends TaskTestBase -{ +public class DataSinkTaskTest extends TaskTestBase { + + private static final int MEMORY_MANAGER_SIZE = 1024 * 1024; + + private static final int NETWORK_BUFFER_SIZE = 1024; + private static final Log LOG = LogFactory.getLog(DataSinkTaskTest.class); private final String tempTestPath = Path.constructTestPath("dst_test"); @@ -61,8 +65,8 @@ public class DataSinkTaskTest extends TaskTestBase int keyCnt = 100; int valCnt = 20; - - super.initEnvironment(1024 * 1024); + + super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE); super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0); DataSinkTask testTask = new DataSinkTask(); @@ -127,8 +131,8 @@ public class DataSinkTaskTest extends TaskTestBase int keyCnt = 100; int valCnt = 20; - - super.initEnvironment(1024 * 1024); + + super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE); super.addInput(new UniformRecordGenerator(keyCnt, valCnt, 0, 0, false), 0); super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt, 0, false), 0); super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt*2, 0, false), 0); @@ -197,8 +201,8 @@ public class DataSinkTaskTest extends TaskTestBase int keyCnt = 100; int valCnt = 20; - - super.initEnvironment(1024 * 1024 * 4); + + super.initEnvironment(MEMORY_MANAGER_SIZE * 4, NETWORK_BUFFER_SIZE); super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0); DataSinkTask testTask = new DataSinkTask(); @@ -275,8 +279,8 @@ public class DataSinkTaskTest extends TaskTestBase int keyCnt = 100; int valCnt = 20; - - super.initEnvironment(1024 * 1024); + + super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE); super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0); DataSinkTask testTask = new DataSinkTask(); @@ -307,8 +311,8 @@ public class DataSinkTaskTest extends TaskTestBase int keyCnt = 100; int valCnt = 20; - - super.initEnvironment(4 * 1024 * 1024); + + super.initEnvironment(MEMORY_MANAGER_SIZE * 4, NETWORK_BUFFER_SIZE); super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0); DataSinkTask testTask = new DataSinkTask(); @@ -343,8 +347,8 @@ public class DataSinkTaskTest extends TaskTestBase @Test public void testCancelDataSinkTask() { - - super.initEnvironment(1024 * 1024); + + super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE); super.addInput(new InfiniteInputIterator(), 0); final DataSinkTask testTask = new DataSinkTask(); @@ -385,8 +389,8 @@ public class DataSinkTaskTest extends TaskTestBase @Test @SuppressWarnings("unchecked") public void testCancelSortingDataSinkTask() { - - super.initEnvironment(4 * 1024 * 1024); + + super.initEnvironment(MEMORY_MANAGER_SIZE * 4, NETWORK_BUFFER_SIZE); super.addInput(new InfiniteInputIterator(), 0); final DataSinkTask testTask = new DataSinkTask(); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSourceTaskTest.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSourceTaskTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSourceTaskTest.java index 0198db2..732cf8d 100644 --- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSourceTaskTest.java +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSourceTaskTest.java @@ -37,8 +37,12 @@ import eu.stratosphere.types.IntValue; import eu.stratosphere.types.Record; import eu.stratosphere.util.MutableObjectIterator; -public class DataSourceTaskTest extends TaskTestBase -{ +public class DataSourceTaskTest extends TaskTestBase { + + private static final int MEMORY_MANAGER_SIZE = 1024 * 1024; + + private static final int NETWORK_BUFFER_SIZE = 1024; + private List outList; private String tempTestPath = Path.constructTestPath("dst_test"); @@ -50,11 +54,9 @@ public class DataSourceTaskTest extends TaskTestBase tempTestFile.delete(); } } - @Test public void testDataSourceTask() { - int keyCnt = 100; int valCnt = 20; @@ -67,7 +69,7 @@ public class DataSourceTaskTest extends TaskTestBase Assert.fail("Unable to set-up test input file"); } - super.initEnvironment(1024 * 1024); + super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE); super.addOutput(this.outList); DataSourceTask testTask = new DataSourceTask(); @@ -110,7 +112,6 @@ public class DataSourceTaskTest extends TaskTestBase @Test public void testFailingDataSourceTask() { - int keyCnt = 20; int valCnt = 10; @@ -122,8 +123,8 @@ public class DataSourceTaskTest extends TaskTestBase } catch (IOException e1) { Assert.fail("Unable to set-up test input file"); } - - super.initEnvironment(1024 * 1024); + + super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE); super.addOutput(this.outList); DataSourceTask testTask = new DataSourceTask(); @@ -148,11 +149,10 @@ public class DataSourceTaskTest extends TaskTestBase @Test public void testCancelDataSourceTask() { - int keyCnt = 20; int valCnt = 4; - - super.initEnvironment(1024 * 1024); + + super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE); super.addOutput(new NirvanaOutputList()); try { @@ -184,7 +184,7 @@ public class DataSourceTaskTest extends TaskTestBase try { tct.join(); - taskRunner.join(); + taskRunner.join(); } catch(InterruptedException ie) { Assert.fail("Joining threads failed"); } @@ -192,12 +192,10 @@ public class DataSourceTaskTest extends TaskTestBase // assert that temp file was created File tempTestFile = new File(this.tempTestPath); Assert.assertTrue("Temp output file does not exist",tempTestFile.exists()); - } - private static class InputFilePreparator - { + private static class InputFilePreparator { public static void prepareInputFile(MutableObjectIterator inIt, String inputFilePath, boolean insertInvalidData) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/chaining/ChainTaskTest.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/chaining/ChainTaskTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/chaining/ChainTaskTest.java index 100bf7b..dda215e 100644 --- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/chaining/ChainTaskTest.java +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/chaining/ChainTaskTest.java @@ -43,7 +43,11 @@ import eu.stratosphere.util.LogUtils; public class ChainTaskTest extends TaskTestBase { - + + private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3; + + private static final int NETWORK_BUFFER_SIZE = 1024; + private final List outList = new ArrayList(); @SuppressWarnings("unchecked") @@ -65,9 +69,8 @@ public class ChainTaskTest extends TaskTestBase { final int valCnt = 20; try { - // environment - initEnvironment(3*1024*1024); + super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE); addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0); addOutput(this.outList); @@ -123,7 +126,7 @@ public class ChainTaskTest extends TaskTestBase { try { // environment - initEnvironment(3*1024*1024); + super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE); addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0); addOutput(this.outList); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java index cc00387..c66d821 100644 --- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java @@ -26,6 +26,7 @@ import org.apache.commons.lang.NotImplementedException; import org.junit.Test; import eu.stratosphere.api.common.typeutils.TypeComparator; +import eu.stratosphere.runtime.io.api.ChannelSelector; import eu.stratosphere.api.common.typeutils.base.IntSerializer; import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparatorFactory; import eu.stratosphere.api.java.typeutils.runtime.record.RecordSerializerFactory; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java index 8afc78f..0b968d8 100644 --- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java @@ -27,6 +27,8 @@ import org.junit.Test; import eu.stratosphere.api.common.distributions.DataDistribution; import eu.stratosphere.api.common.distributions.UniformIntegerDistribution; +import eu.stratosphere.runtime.io.api.ChannelSelector; +import eu.stratosphere.pact.runtime.plugable.pactrecord.RecordComparator; import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparator; import eu.stratosphere.pact.runtime.shipping.RecordOutputEmitter; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java index 8e25082..a397312 100644 --- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java @@ -13,38 +13,40 @@ package eu.stratosphere.pact.runtime.test.util; -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.FutureTask; - import eu.stratosphere.configuration.Configuration; import eu.stratosphere.core.fs.Path; import eu.stratosphere.core.io.IOReadableWritable; +import eu.stratosphere.core.memory.MemorySegment; import eu.stratosphere.nephele.execution.Environment; -import eu.stratosphere.nephele.io.ChannelSelector; -import eu.stratosphere.nephele.io.GateID; -import eu.stratosphere.nephele.io.InputChannelResult; -import eu.stratosphere.nephele.io.InputGate; -import eu.stratosphere.nephele.io.MutableRecordDeserializerFactory; -import eu.stratosphere.nephele.io.OutputGate; -import eu.stratosphere.nephele.io.RecordAvailabilityListener; -import eu.stratosphere.nephele.io.RecordDeserializerFactory; -import eu.stratosphere.nephele.io.RuntimeInputGate; -import eu.stratosphere.nephele.io.RuntimeOutputGate; -import eu.stratosphere.nephele.io.channels.ChannelID; +import eu.stratosphere.runtime.io.gates.InputChannelResult; +import eu.stratosphere.runtime.io.gates.RecordAvailabilityListener; +import eu.stratosphere.runtime.io.serialization.AdaptiveSpanningRecordDeserializer; +import eu.stratosphere.runtime.io.Buffer; +import eu.stratosphere.runtime.io.channels.ChannelID; +import eu.stratosphere.runtime.io.gates.GateID; +import eu.stratosphere.runtime.io.gates.InputGate; +import eu.stratosphere.runtime.io.gates.OutputGate; import eu.stratosphere.nephele.jobgraph.JobID; import eu.stratosphere.nephele.protocols.AccumulatorProtocol; import eu.stratosphere.nephele.services.iomanager.IOManager; import eu.stratosphere.nephele.services.memorymanager.MemoryManager; import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager; +import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener; +import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider; +import eu.stratosphere.runtime.io.network.bufferprovider.GlobalBufferPool; +import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPoolOwner; import eu.stratosphere.nephele.template.InputSplitProvider; +import eu.stratosphere.runtime.io.serialization.RecordDeserializer; +import eu.stratosphere.runtime.io.serialization.RecordDeserializer.DeserializationResult; import eu.stratosphere.types.Record; import eu.stratosphere.util.MutableObjectIterator; -public class MockEnvironment implements Environment { +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +public class MockEnvironment implements Environment, BufferProvider, LocalBufferPoolOwner { private final MemoryManager memManager; @@ -56,21 +58,24 @@ public class MockEnvironment implements Environment { private final Configuration taskConfiguration; - private final List> inputs; + private final List> inputs; - private final List> outputs; + private final List outputs; private final JobID jobID = new JobID(); - public MockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider) { + private final Buffer mockBuffer; + + public MockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) { this.jobConfiguration = new Configuration(); this.taskConfiguration = new Configuration(); - this.inputs = new LinkedList>(); - this.outputs = new LinkedList>(); + this.inputs = new LinkedList>(); + this.outputs = new LinkedList(); this.memManager = new DefaultMemoryManager(memorySize); this.ioManager = new IOManager(System.getProperty("java.io.tmpdir")); this.inputSplitProvider = inputSplitProvider; + this.mockBuffer = new Buffer(new MemorySegment(new byte[bufferSize]), bufferSize, null); } public void addInput(MutableObjectIterator inputIterator) { @@ -103,13 +108,62 @@ public class MockEnvironment implements Environment { return this.jobID; } + @Override + public Buffer requestBuffer(int minBufferSize) throws IOException { + return mockBuffer; + } + + @Override + public Buffer requestBufferBlocking(int minBufferSize) throws IOException, InterruptedException { + return mockBuffer; + } + + @Override + public int getBufferSize() { + return this.mockBuffer.size(); + } + + @Override + public boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener) { + return false; + } + + @Override + public int getNumberOfChannels() { + return 1; + } + + @Override + public void setDesignatedNumberOfBuffers(int numBuffers) { + + } + + @Override + public void clearLocalBufferPool() { + + } + + @Override + public void registerGlobalBufferPool(GlobalBufferPool globalBufferPool) { + + } + + @Override + public void logBufferUtilization() { + + } + + @Override + public void reportAsynchronousEvent() { - private static class MockInputGate extends RuntimeInputGate { + } + + private static class MockInputGate extends InputGate { private MutableObjectIterator it; public MockInputGate(int id, MutableObjectIterator it) { - super(new JobID(), new GateID(), MutableRecordDeserializerFactory.get(), id); + super(new JobID(), new GateID(), id); this.it = it; } @@ -132,18 +186,43 @@ public class MockEnvironment implements Environment { } } - private static class MockOutputGate extends RuntimeOutputGate { + private class MockOutputGate extends OutputGate { private List out; + private RecordDeserializer deserializer; + + private Record record; + public MockOutputGate(int index, List outList) { - super(new JobID(), new GateID(), Record.class, index, null, false); + super(new JobID(), new GateID(), index); this.out = outList; + this.deserializer = new AdaptiveSpanningRecordDeserializer(); + this.record = new Record(); } @Override - public void writeRecord(Record record) throws IOException, InterruptedException { - out.add(record.createCopy()); + public void sendBuffer(Buffer buffer, int targetChannel) throws IOException, InterruptedException { + + this.deserializer.setNextMemorySegment(MockEnvironment.this.mockBuffer.getMemorySegment(), MockEnvironment.this.mockBuffer.size()); + + while (this.deserializer.hasUnfinishedData()) { + DeserializationResult result = this.deserializer.getNextRecord(this.record); + + if (result.isFullRecord()) { + this.out.add(this.record.createCopy()); + } + + if (result == DeserializationResult.LAST_RECORD_FROM_BUFFER || + result == DeserializationResult.PARTIAL_RECORD) { + break; + } + } + } + + @Override + public int getNumChannels() { + return 1; } } @@ -188,11 +267,6 @@ public class MockEnvironment implements Environment { } @Override - public GateID getNextUnboundOutputGateID() { - return null; - } - - @Override public int getNumberOfOutputGates() { return this.outputs.size(); } @@ -203,16 +277,6 @@ public class MockEnvironment implements Environment { } @Override - public void registerOutputGate(final OutputGate outputGate) { - // Nothing to do here - } - - @Override - public void registerInputGate(final InputGate inputGate) { - // Nothing to do here - } - - @Override public Set getOutputChannelIDs() { throw new IllegalStateException("getOutputChannelIDs called on MockEnvironment"); } @@ -242,18 +306,14 @@ public class MockEnvironment implements Environment { throw new IllegalStateException("getInputChannelIDsOfGate called on MockEnvironment"); } - @SuppressWarnings("unchecked") @Override - public OutputGate createOutputGate(GateID gateID, Class outputClass, - ChannelSelector selector, boolean isBroadcast) + public OutputGate createAndRegisterOutputGate() { - return (OutputGate) this.outputs.remove(0); + return this.outputs.remove(0); } - @SuppressWarnings("unchecked") @Override - public InputGate createInputGate(GateID gateID, - RecordDeserializerFactory deserializerFactory) + public InputGate createAndRegisterInputGate() { return (InputGate) this.inputs.remove(0); } @@ -275,8 +335,7 @@ public class MockEnvironment implements Environment { } @Override - public Map> getCopyTask() { - return null; + public BufferProvider getOutputBufferProvider() { + return this; } - } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java index 826113c..a60b479 100644 --- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java @@ -49,10 +49,10 @@ public abstract class TaskTestBase { protected MockEnvironment mockEnv; - public void initEnvironment(long memorySize) { + public void initEnvironment(long memorySize, int bufferSize) { this.memorySize = memorySize; this.inputSplitProvider = new MockInputSplitProvider(); - this.mockEnv = new MockEnvironment(this.memorySize, this.inputSplitProvider); + this.mockEnv = new MockEnvironment(this.memorySize, this.inputSplitProvider, bufferSize); } public void addInput(MutableObjectIterator input, int groupId) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/fs/LineReaderTest.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/fs/LineReaderTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/fs/LineReaderTest.java new file mode 100644 index 0000000..af46689 --- /dev/null +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/fs/LineReaderTest.java @@ -0,0 +1,78 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + +package eu.stratosphere.runtime.fs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.PrintWriter; + +import org.junit.Test; + +import eu.stratosphere.core.fs.FSDataInputStream; +import eu.stratosphere.core.fs.Path; +import eu.stratosphere.core.fs.local.LocalFileSystem; +import eu.stratosphere.nephele.util.CommonTestUtils; + +/** + * This class tests the functionality of the LineReader class using a local filesystem. + * + */ + +public class LineReaderTest { + + /** + * This test tests the LineReader. So far only under usual conditions. + */ + @Test + public void testLineReader() { + final File testfile = new File(CommonTestUtils.getTempDir() + File.separator + + CommonTestUtils.getRandomFilename()); + final Path pathtotestfile = new Path(testfile.toURI().getPath()); + + try { + PrintWriter pw = new PrintWriter(testfile, "UTF8"); + + for (int i = 0; i < 100; i++) { + pw.append("line\n"); + } + pw.close(); + + LocalFileSystem lfs = new LocalFileSystem(); + FSDataInputStream fis = lfs.open(pathtotestfile); + + // first, we test under "usual" conditions + final LineReader lr = new LineReader(fis, 0, testfile.length(), 256); + + byte[] buffer; + int linecount = 0; + while ((buffer = lr.readLine()) != null) { + assertEquals(new String(buffer, "UTF8"), "line"); + linecount++; + } + assertEquals(linecount, 100); + + // the linereader can not handle situations with larger length than the total file... + + } catch (Exception e) { + fail(e.toString()); + e.printStackTrace(); + } finally { + testfile.delete(); + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/fs/s3/S3FileSystemTest.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/fs/s3/S3FileSystemTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/fs/s3/S3FileSystemTest.java new file mode 100644 index 0000000..30b5219 --- /dev/null +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/fs/s3/S3FileSystemTest.java @@ -0,0 +1,460 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + +package eu.stratosphere.runtime.fs.s3; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; + +import org.junit.Before; +import org.junit.Test; + +import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.configuration.GlobalConfiguration; +import eu.stratosphere.core.fs.BlockLocation; +import eu.stratosphere.core.fs.FSDataInputStream; +import eu.stratosphere.core.fs.FSDataOutputStream; +import eu.stratosphere.core.fs.FileStatus; +import eu.stratosphere.core.fs.FileSystem; +import eu.stratosphere.core.fs.Path; + +/** + * This test checks the S3 implementation of the {@link FileSystem} interface. + * + */ +public class S3FileSystemTest { + + /** + * The length of the bucket/object names used in this test. + */ + private static final int NAME_LENGTH = 32; + + /** + * The alphabet to generate the random bucket/object names from. + */ + private static final char[] ALPHABET = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', + 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9' }; + + /** + * The size of the byte buffer used during the tests in bytes. + */ + private static final int TEST_BUFFER_SIZE = 128; + + /** + * The size of the small test file in bytes. + */ + private static final int SMALL_FILE_SIZE = 512; + + /** + * The size of the large test file in bytes. + */ + private static final int LARGE_FILE_SIZE = 1024 * 1024 * 12; // 12 MB + + /** + * The modulus to be used when generating the test data. Must not be larger than 128. + */ + private static final int MODULUS = 128; + + private static final String S3_BASE_URI = "s3:///"; + + /** + * Tries to read the AWS access key and the AWS secret key from the environments variables. If accessing these keys + * fails, all tests will be skipped and marked as successful. + */ + @Before + public void initKeys() { + final String accessKey = System.getenv("AK"); + final String secretKey = System.getenv("SK"); + + if (accessKey != null || secretKey != null) { + Configuration conf = new Configuration(); + if (accessKey != null) { + conf.setString(S3FileSystem.S3_ACCESS_KEY_KEY, accessKey); + } + if (secretKey != null) { + conf.setString(S3FileSystem.S3_SECRET_KEY_KEY, secretKey); + } + GlobalConfiguration.includeConfiguration(conf); + } + } + + /** + * This test creates and deletes a bucket inside S3 and checks it is correctly displayed inside the directory + * listing. + */ + @Test + public void createAndDeleteBucketTest() { + + if (!testActivated()) { + return; + } + + final String bucketName = getRandomName(); + final Path bucketPath = new Path(S3_BASE_URI + bucketName + Path.SEPARATOR); + + try { + + final FileSystem fs = bucketPath.getFileSystem(); + + // Create directory + fs.mkdirs(bucketPath); + + // Check if directory is correctly displayed in file system hierarchy + final FileStatus[] content = fs.listStatus(new Path(S3_BASE_URI)); + boolean entryFound = false; + for (final FileStatus entry : content) { + if (bucketPath.equals(entry.getPath())) { + entryFound = true; + break; + } + } + + if (!entryFound) { + fail("Cannot find entry " + bucketName + " in directory " + S3_BASE_URI); + } + + // Check the concrete directory file status + try { + final FileStatus directoryFileStatus = fs.getFileStatus(bucketPath); + assertTrue(directoryFileStatus.isDir()); + assertEquals(0L, directoryFileStatus.getAccessTime()); + assertTrue(directoryFileStatus.getModificationTime() > 0L); + + } catch (FileNotFoundException e) { + fail(e.getMessage()); + } + + // Delete the bucket + fs.delete(bucketPath, true); + + // Make sure the bucket no longer exists + try { + fs.getFileStatus(bucketPath); + fail("Expected FileNotFoundException for " + bucketPath.toUri()); + } catch (FileNotFoundException e) { + // This is an expected exception + } + + } catch (IOException ioe) { + fail(ioe.getMessage()); + } + } + + /** + * Creates and reads the a larger test file in S3. The test file is generated according to a specific pattern. + * During the read phase the incoming data stream is also checked against this pattern. + */ + @Test + public void createAndReadLargeFileTest() { + + try { + createAndReadFileTest(LARGE_FILE_SIZE); + } catch (IOException ioe) { + fail(ioe.getMessage()); + } + } + + /** + * Creates and reads the a small test file in S3. The test file is generated according to a specific pattern. + * During the read phase the incoming data stream is also checked against this pattern. + */ + @Test + public void createAndReadSmallFileTest() { + + try { + createAndReadFileTest(SMALL_FILE_SIZE); + } catch (IOException ioe) { + fail(ioe.getMessage()); + } + } + + /** + * The tests checks the mapping of the file system directory structure to the underlying bucket/object model of + * Amazon S3. + */ + @Test + public void multiLevelDirectoryTest() { + + if (!testActivated()) { + return; + } + + final String dirName = getRandomName(); + final String subdirName = getRandomName(); + final String subsubdirName = getRandomName(); + final String fileName = getRandomName(); + final Path dir = new Path(S3_BASE_URI + dirName + Path.SEPARATOR); + final Path subdir = new Path(S3_BASE_URI + dirName + Path.SEPARATOR + subdirName + Path.SEPARATOR); + final Path subsubdir = new Path(S3_BASE_URI + dirName + Path.SEPARATOR + subdirName + Path.SEPARATOR + + subsubdirName + Path.SEPARATOR); + final Path file = new Path(S3_BASE_URI + dirName + Path.SEPARATOR + subdirName + Path.SEPARATOR + fileName); + + try { + + final FileSystem fs = dir.getFileSystem(); + + fs.mkdirs(subsubdir); + + final OutputStream os = fs.create(file, true); + generateTestData(os, SMALL_FILE_SIZE); + os.close(); + + // On this directory levels there should only be one subdirectory + FileStatus[] list = fs.listStatus(dir); + int numberOfDirs = 0; + int numberOfFiles = 0; + for (final FileStatus entry : list) { + + if (entry.isDir()) { + ++numberOfDirs; + assertEquals(subdir, entry.getPath()); + } else { + fail(entry.getPath() + " is a file which must not appear on this directory level"); + } + } + + assertEquals(1, numberOfDirs); + assertEquals(0, numberOfFiles); + + list = fs.listStatus(subdir); + numberOfDirs = 0; + + for (final FileStatus entry : list) { + if (entry.isDir()) { + assertEquals(subsubdir, entry.getPath()); + ++numberOfDirs; + } else { + assertEquals(file, entry.getPath()); + ++numberOfFiles; + } + } + + assertEquals(1, numberOfDirs); + assertEquals(1, numberOfFiles); + + fs.delete(dir, true); + + } catch (IOException ioe) { + fail(ioe.getMessage()); + } + } + + /** + * This test checks the S3 implementation of the file system method to retrieve the block locations of a file. + */ + @Test + public void blockLocationTest() { + + if (!testActivated()) { + return; + } + + final String dirName = getRandomName(); + final String fileName = getRandomName(); + final Path dir = new Path(S3_BASE_URI + dirName + Path.SEPARATOR); + final Path file = new Path(S3_BASE_URI + dirName + Path.SEPARATOR + fileName); + + try { + + final FileSystem fs = dir.getFileSystem(); + + fs.mkdirs(dir); + + final OutputStream os = fs.create(file, true); + generateTestData(os, SMALL_FILE_SIZE); + os.close(); + + final FileStatus fileStatus = fs.getFileStatus(file); + assertNotNull(fileStatus); + + BlockLocation[] blockLocations = fs.getFileBlockLocations(fileStatus, 0, SMALL_FILE_SIZE + 1); + assertNull(blockLocations); + + blockLocations = fs.getFileBlockLocations(fileStatus, 0, SMALL_FILE_SIZE); + assertEquals(1, blockLocations.length); + + final BlockLocation bl = blockLocations[0]; + assertNotNull(bl.getHosts()); + assertEquals(1, bl.getHosts().length); + assertEquals(SMALL_FILE_SIZE, bl.getLength()); + assertEquals(0, bl.getOffset()); + final URI s3Uri = fs.getUri(); + assertNotNull(s3Uri); + assertEquals(s3Uri.getHost(), bl.getHosts()[0]); + + fs.delete(dir, true); + + } catch (IOException ioe) { + fail(ioe.getMessage()); + } + } + + /** + * Creates and reads a file with the given size in S3. The test file is generated according to a specific pattern. + * During the read phase the incoming data stream is also checked against this pattern. + * + * @param fileSize + * the size of the file to be generated in bytes + * @throws IOException + * thrown if an I/O error occurs while writing or reading the test file + */ + private void createAndReadFileTest(final int fileSize) throws IOException { + + if (!testActivated()) { + return; + } + + final String bucketName = getRandomName(); + final String objectName = getRandomName(); + final Path bucketPath = new Path(S3_BASE_URI + bucketName + Path.SEPARATOR); + final Path objectPath = new Path(S3_BASE_URI + bucketName + Path.SEPARATOR + objectName); + + FileSystem fs = bucketPath.getFileSystem(); + + // Create test bucket + fs.mkdirs(bucketPath); + + // Write test file to S3 + final FSDataOutputStream outputStream = fs.create(objectPath, false); + generateTestData(outputStream, fileSize); + outputStream.close(); + + // Now read the same file back from S3 + final FSDataInputStream inputStream = fs.open(objectPath); + testReceivedData(inputStream, fileSize); + inputStream.close(); + + // Delete test bucket + fs.delete(bucketPath, true); + } + + /** + * Receives test data from the given input stream and checks the size of the data as well as the pattern inside the + * received data. + * + * @param inputStream + * the input stream to read the test data from + * @param expectedSize + * the expected size of the data to be read from the input stream in bytes + * @throws IOException + * thrown if an error occurs while reading the data + */ + private void testReceivedData(final InputStream inputStream, final int expectedSize) throws IOException { + + final byte[] testBuffer = new byte[TEST_BUFFER_SIZE]; + + int totalBytesRead = 0; + int nextExpectedNumber = 0; + while (true) { + + final int bytesRead = inputStream.read(testBuffer); + if (bytesRead < 0) { + break; + } + + totalBytesRead += bytesRead; + + for (int i = 0; i < bytesRead; ++i) { + if (testBuffer[i] != nextExpectedNumber) { + throw new IOException("Read number " + testBuffer[i] + " but expected " + nextExpectedNumber); + } + + ++nextExpectedNumber; + + if (nextExpectedNumber == MODULUS) { + nextExpectedNumber = 0; + } + } + } + + if (totalBytesRead != expectedSize) { + throw new IOException("Expected to read " + expectedSize + " bytes but only received " + totalBytesRead); + } + } + + /** + * Generates test data of the given size according to some specific pattern and writes it to the provided output + * stream. + * + * @param outputStream + * the output stream to write the data to + * @param size + * the size of the test data to be generated in bytes + * @throws IOException + * thrown if an error occurs while writing the data + */ + private void generateTestData(final OutputStream outputStream, final int size) throws IOException { + + final byte[] testBuffer = new byte[TEST_BUFFER_SIZE]; + for (int i = 0; i < testBuffer.length; ++i) { + testBuffer[i] = (byte) (i % MODULUS); + } + + int bytesWritten = 0; + while (bytesWritten < size) { + + final int diff = size - bytesWritten; + if (diff < testBuffer.length) { + outputStream.write(testBuffer, 0, diff); + bytesWritten += diff; + } else { + outputStream.write(testBuffer); + bytesWritten += testBuffer.length; + } + } + } + + /** + * Generates a random name. + * + * @return a random name + */ + private String getRandomName() { + + final StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < NAME_LENGTH; ++i) { + final char c = ALPHABET[(int) (Math.random() * (double) ALPHABET.length)]; + stringBuilder.append(c); + } + + return stringBuilder.toString(); + } + + /** + * Checks whether the AWS access key and the AWS secret keys have been successfully loaded from the configuration + * and whether the S3 tests shall be performed. + * + * @return true if the tests shall be performed, false if the tests shall be skipped + * because at least one AWS key is missing + */ + private boolean testActivated() { + + final String accessKey = GlobalConfiguration.getString(S3FileSystem.S3_ACCESS_KEY_KEY, null); + final String secretKey = GlobalConfiguration.getString(S3FileSystem.S3_SECRET_KEY_KEY, null); + + if (accessKey != null && secretKey != null) { + return true; + } + + return false; + } +}