Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 056B7200B29 for ; Wed, 15 Jun 2016 16:59:40 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 040CC160A60; Wed, 15 Jun 2016 14:59:40 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 26C82160A5D for ; Wed, 15 Jun 2016 16:59:37 +0200 (CEST) Received: (qmail 35560 invoked by uid 500); 15 Jun 2016 14:59:36 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 34663 invoked by uid 99); 15 Jun 2016 14:59:35 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 15 Jun 2016 14:59:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CE8D8E0459; Wed, 15 Jun 2016 14:59:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: carl@apache.org To: commits@cassandra.apache.org Date: Wed, 15 Jun 2016 14:59:38 -0000 Message-Id: <5e300667347e4e1caee465466ecd882a@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [05/20] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2 archived-at: Wed, 15 Jun 2016 14:59:40 -0000 http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java index 71fab61,0000000..8c6cc90 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java +++ b/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java @@@ -1,231 -1,0 +1,251 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.io; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import static org.junit.Assert.*; +import org.apache.cassandra.io.util.ChannelProxy; +import org.apache.cassandra.io.util.FileMark; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.io.util.SequentialWriter; + +public class RandomAccessReaderTest +{ + @Test + public void testReadFully() throws IOException + { + final File f = File.createTempFile("testReadFully", "1"); + final String expected = "The quick brown fox jumps over the lazy dog"; + + SequentialWriter writer = SequentialWriter.open(f); + writer.write(expected.getBytes()); + writer.finish(); + + assert f.exists(); + + ChannelProxy channel = new ChannelProxy(f); + RandomAccessReader reader = RandomAccessReader.open(channel); + assertEquals(f.getAbsolutePath(), reader.getPath()); + assertEquals(expected.length(), reader.length()); + + byte[] b = new byte[expected.length()]; + reader.readFully(b); + assertEquals(expected, new String(b)); + + assertTrue(reader.isEOF()); + assertEquals(0, reader.bytesRemaining()); + + reader.close(); + channel.close(); + } + + @Test + public void testReadBytes() throws IOException + { + File f = File.createTempFile("testReadBytes", "1"); + final String expected = "The quick brown fox jumps over the lazy dog"; + + SequentialWriter writer = SequentialWriter.open(f); + writer.write(expected.getBytes()); + writer.finish(); + + assert f.exists(); + + ChannelProxy channel = new ChannelProxy(f); + RandomAccessReader reader = RandomAccessReader.open(channel); + assertEquals(f.getAbsolutePath(), reader.getPath()); + assertEquals(expected.length(), reader.length()); + + ByteBuffer b = reader.readBytes(expected.length()); + assertEquals(expected, new String(b.array(), Charset.forName("UTF-8"))); + + assertTrue(reader.isEOF()); + assertEquals(0, reader.bytesRemaining()); + + reader.close(); + channel.close(); + } + + @Test + public void testReset() throws IOException + { + File f = File.createTempFile("testMark", "1"); + final String expected = "The quick brown fox jumps over the lazy dog"; + final int numIterations = 10; + + SequentialWriter writer = SequentialWriter.open(f); + for (int i = 0; i < numIterations; i++) + writer.write(expected.getBytes()); + writer.finish(); + + assert f.exists(); + + ChannelProxy channel = new ChannelProxy(f); + RandomAccessReader reader = RandomAccessReader.open(channel); + assertEquals(expected.length() * numIterations, reader.length()); + + ByteBuffer b = reader.readBytes(expected.length()); + assertEquals(expected, new String(b.array(), Charset.forName("UTF-8"))); + + assertFalse(reader.isEOF()); + assertEquals((numIterations - 1) * expected.length(), reader.bytesRemaining()); + + FileMark mark = reader.mark(); + assertEquals(0, reader.bytesPastMark()); + assertEquals(0, reader.bytesPastMark(mark)); + + for (int i = 0; i < (numIterations - 1); i++) + { + b = reader.readBytes(expected.length()); + assertEquals(expected, new String(b.array(), Charset.forName("UTF-8"))); + } + assertTrue(reader.isEOF()); + assertEquals(expected.length() * (numIterations -1), reader.bytesPastMark()); + assertEquals(expected.length() * (numIterations - 1), reader.bytesPastMark(mark)); + + reader.reset(mark); + assertEquals(0, reader.bytesPastMark()); + assertEquals(0, reader.bytesPastMark(mark)); + assertFalse(reader.isEOF()); + for (int i = 0; i < (numIterations - 1); i++) + { + b = reader.readBytes(expected.length()); + assertEquals(expected, new String(b.array(), Charset.forName("UTF-8"))); + } + + reader.reset(); + assertEquals(0, reader.bytesPastMark()); + assertEquals(0, reader.bytesPastMark(mark)); + assertFalse(reader.isEOF()); + for (int i = 0; i < (numIterations - 1); i++) + { + b = reader.readBytes(expected.length()); + assertEquals(expected, new String(b.array(), Charset.forName("UTF-8"))); + } + + assertTrue(reader.isEOF()); + reader.close(); + channel.close(); + } + + @Test + public void testSeekSingleThread() throws IOException, InterruptedException + { + testSeek(1); + } + + @Test + public void testSeekMultipleThreads() throws IOException, InterruptedException + { + testSeek(10); + } + + private void testSeek(int numThreads) throws IOException, InterruptedException + { + final File f = File.createTempFile("testMark", "1"); + final String[] expected = new String[10]; + int len = 0; + for (int i = 0; i < expected.length; i++) + { + expected[i] = UUID.randomUUID().toString(); + len += expected[i].length(); + } + final int totalLength = len; + + SequentialWriter writer = SequentialWriter.open(f); + for (int i = 0; i < expected.length; i++) + writer.write(expected[i].getBytes()); + writer.finish(); + + assert f.exists(); + + final ChannelProxy channel = new ChannelProxy(f); + + final Runnable worker = new Runnable() { + + @Override + public void run() + { + try + { + RandomAccessReader reader = RandomAccessReader.open(channel); + assertEquals(totalLength, reader.length()); + + ByteBuffer b = reader.readBytes(expected[0].length()); + assertEquals(expected[0], new String(b.array(), Charset.forName("UTF-8"))); + + assertFalse(reader.isEOF()); + assertEquals(totalLength - expected[0].length(), reader.bytesRemaining()); + + long filePointer = reader.getFilePointer(); + + for (int i = 1; i < expected.length; i++) + { + b = reader.readBytes(expected[i].length()); + assertEquals(expected[i], new String(b.array(), Charset.forName("UTF-8"))); + } + assertTrue(reader.isEOF()); + + reader.seek(filePointer); + assertFalse(reader.isEOF()); + for (int i = 1; i < expected.length; i++) + { + b = reader.readBytes(expected[i].length()); + assertEquals(expected[i], new String(b.array(), Charset.forName("UTF-8"))); + } + + assertTrue(reader.isEOF()); + reader.close(); + } + catch (Exception ex) + { + ex.printStackTrace(); + fail(ex.getMessage()); + } + } + }; + + if(numThreads == 1) + { + worker.run(); + return; + } + + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + for (int i = 0; i < numThreads; i++) + executor.submit(worker); + + executor.shutdown(); + executor.awaitTermination(1, TimeUnit.MINUTES); + + channel.close(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java index ffe9cb9,0000000..0c58e41 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java +++ b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java @@@ -1,489 -1,0 +1,509 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.io.util; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.UTFDataFormatException; +import java.lang.reflect.Field; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.Random; + +import org.junit.Test; + +import static org.junit.Assert.*; + +public class BufferedDataOutputStreamTest +{ + + @Test(expected = BufferOverflowException.class) + public void testDataOutputBufferFixedByes() throws Exception + { + try (DataOutputBufferFixed dob = new DataOutputBufferFixed()) + { + try + { + for (int ii = 0; ii < 128; ii++) + dob.write(0); + } + catch (BufferOverflowException e) + { + fail("Should not throw BufferOverflowException yet"); + } + dob.write(0); + } + } + + @Test(expected = BufferOverflowException.class) + public void testDataOutputBufferFixedByteBuffer() throws Exception + { + try (DataOutputBufferFixed dob = new DataOutputBufferFixed()) + { + try + { + dob.write(ByteBuffer.allocateDirect(128)); + } + catch (BufferOverflowException e) + { + fail("Should not throw BufferOverflowException yet"); + } + dob.write(ByteBuffer.allocateDirect(1)); + } + } + + WritableByteChannel adapter = new WritableByteChannel() + { + + @Override + public boolean isOpen() {return true;} + + @Override + public void close() throws IOException {} + + @Override + public int write(ByteBuffer src) throws IOException + { + int retval = src.remaining(); + while (src.hasRemaining()) + generated.write(src.get()); + return retval; + } + + }; + + BufferedDataOutputStreamPlus fakeStream = new BufferedDataOutputStreamPlus(adapter, 8); + + @SuppressWarnings("resource") + @Test(expected = NullPointerException.class) + public void testNullChannel() + { + new BufferedDataOutputStreamPlus((WritableByteChannel)null, 8); + } + + @SuppressWarnings("resource") + @Test(expected = IllegalArgumentException.class) + public void testTooSmallBuffer() + { + new BufferedDataOutputStreamPlus(adapter, 7); + } + + @Test(expected = NullPointerException.class) + public void testNullBuffer() throws Exception + { + byte type[] = null; + fakeStream.write(type, 0, 1); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testNegativeOffset() throws Exception + { + byte type[] = new byte[10]; + fakeStream.write(type, -1, 1); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testNegativeLength() throws Exception + { + byte type[] = new byte[10]; + fakeStream.write(type, 0, -1); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testTooBigLength() throws Exception + { + byte type[] = new byte[10]; + fakeStream.write(type, 0, 11); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testTooBigLengthWithOffset() throws Exception + { + byte type[] = new byte[10]; + fakeStream.write(type, 8, 3); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testTooBigOffset() throws Exception + { + byte type[] = new byte[10]; + fakeStream.write(type, 11, 1); + } + + static final Random r; + + static Field baos_bytes; + static { + long seed = System.nanoTime(); + //seed = 210187780999648L; + System.out.println("Seed " + seed); + r = new Random(seed); + try + { + baos_bytes = ByteArrayOutputStream.class.getDeclaredField("buf"); + baos_bytes.setAccessible(true); + } + catch (Throwable t) + { + throw new RuntimeException(t); + } + } + + private ByteArrayOutputStream generated; + private BufferedDataOutputStreamPlus ndosp; + + private ByteArrayOutputStream canonical; + private DataOutputStreamPlus dosp; + + void setUp() + { + + generated = new ByteArrayOutputStream(); + canonical = new ByteArrayOutputStream(); + dosp = new WrappedDataOutputStreamPlus(canonical); + ndosp = new BufferedDataOutputStreamPlus(adapter, 4096); + } + + @Test + public void testFuzz() throws Exception + { + for (int ii = 0; ii < 30; ii++) + fuzzOnce(); + } + + String simple = "foobar42"; + public static final String twoByte = "\u0180"; + public static final String threeByte = "\u34A8"; + public static final String fourByte = "\uD841\uDF79"; + + @SuppressWarnings("unused") + private void fuzzOnce() throws Exception + { + setUp(); + int iteration = 0; + int bytesChecked = 0; + int action = 0; + while (generated.size() < 1024 * 1024 * 8) + { + action = r.nextInt(19); + + //System.out.println("Action " + action + " iteration " + iteration); + iteration++; + + switch (action) + { + case 0: + { + generated.flush(); + dosp.flush(); + break; + } + case 1: + { + int val = r.nextInt(); + dosp.write(val); + ndosp.write(val); + break; + } + case 2: + { + byte randomBytes[] = new byte[r.nextInt(4096 * 2 + 1)]; + r.nextBytes(randomBytes); + dosp.write(randomBytes); + ndosp.write(randomBytes); + break; + } + case 3: + { + byte randomBytes[] = new byte[r.nextInt(4096 * 2 + 1)]; + r.nextBytes(randomBytes); + int offset = randomBytes.length == 0 ? 0 : r.nextInt(randomBytes.length); + int length = randomBytes.length == 0 ? 0 : r.nextInt(randomBytes.length - offset); + dosp.write(randomBytes, offset, length); + ndosp.write(randomBytes, offset, length); + break; + } + case 4: + { + boolean val = r.nextInt(2) == 0; + dosp.writeBoolean(val); + ndosp.writeBoolean(val); + break; + } + case 5: + { + int val = r.nextInt(); + dosp.writeByte(val); + ndosp.writeByte(val); + break; + } + case 6: + { + int val = r.nextInt(); + dosp.writeShort(val); + ndosp.writeShort(val); + break; + } + case 7: + { + int val = r.nextInt(); + dosp.writeChar(val); + ndosp.writeChar(val); + break; + } + case 8: + { + int val = r.nextInt(); + dosp.writeInt(val); + ndosp.writeInt(val); + break; + } + case 9: + { + int val = r.nextInt(); + dosp.writeLong(val); + ndosp.writeLong(val); + break; + } + case 10: + { + float val = r.nextFloat(); + dosp.writeFloat(val); + ndosp.writeFloat(val); + break; + } + case 11: + { + double val = r.nextDouble(); + dosp.writeDouble(val); + ndosp.writeDouble(val); + break; + } + case 12: + { + dosp.writeBytes(simple); + ndosp.writeBytes(simple); + break; + } + case 13: + { + dosp.writeChars(twoByte); + ndosp.writeChars(twoByte); + break; + } + case 14: + { + StringBuilder sb = new StringBuilder(); + int length = r.nextInt(500); + //Some times do big strings + if (r.nextDouble() > .95) + length += 4000; + sb.append(simple + twoByte + threeByte + fourByte); + for (int ii = 0; ii < length; ii++) + { + sb.append((char)(r.nextInt() & 0xffff)); + } + String str = sb.toString(); + writeUTFLegacy(str, dosp); + ndosp.writeUTF(str); + break; + } + case 15: + { + StringBuilder sb = new StringBuilder(); + int length = r.nextInt(500); + sb.append("the very model of a modern major general familiar with all things animal vegetable and mineral"); + for (int ii = 0; ii < length; ii++) + { + sb.append(' '); + } + String str = sb.toString(); + writeUTFLegacy(str, dosp); + ndosp.writeUTF(str); + break; + } + case 16: + { + ByteBuffer buf = ByteBuffer.allocate(r.nextInt(1024 * 8 + 1)); + r.nextBytes(buf.array()); + buf.position(buf.capacity() == 0 ? 0 : r.nextInt(buf.capacity())); + buf.limit(buf.position() + (buf.capacity() - buf.position() == 0 ? 0 : r.nextInt(buf.capacity() - buf.position()))); + ByteBuffer dup = buf.duplicate(); + ndosp.write(buf.duplicate()); + assertEquals(dup.position(), buf.position()); + assertEquals(dup.limit(), buf.limit()); + dosp.write(buf.duplicate()); + break; + } + case 17: + { + ByteBuffer buf = ByteBuffer.allocateDirect(r.nextInt(1024 * 8 + 1)); + while (buf.hasRemaining()) + buf.put((byte)r.nextInt()); + buf.position(buf.capacity() == 0 ? 0 : r.nextInt(buf.capacity())); + buf.limit(buf.position() + (buf.capacity() - buf.position() == 0 ? 0 : r.nextInt(buf.capacity() - buf.position()))); + ByteBuffer dup = buf.duplicate(); + ndosp.write(buf.duplicate()); + assertEquals(dup.position(), buf.position()); + assertEquals(dup.limit(), buf.limit()); + dosp.write(buf.duplicate()); + break; + } + case 18: + { + try (Memory buf = Memory.allocate(r.nextInt(1024 * 8 - 1) + 1);) + { + for (int ii = 0; ii < buf.size(); ii++) + buf.setByte(ii, (byte)r.nextInt()); + long offset = buf.size() == 0 ? 0 : r.nextInt((int)buf.size()); + long length = (buf.size() - offset == 0 ? 0 : r.nextInt((int)(buf.size() - offset))); + ndosp.write(buf, offset, length); + dosp.write(buf, offset, length); + } + break; + } + default: + fail("Shouldn't reach here"); + } + //bytesChecked = assertSameOutput(bytesChecked, action, iteration); + } + + assertSameOutput(0, -1, iteration); + } + + public static void writeUTFLegacy(String str, OutputStream out) throws IOException + { + int utfCount = 0, length = str.length(); + for (int i = 0; i < length; i++) + { + int charValue = str.charAt(i); + if (charValue > 0 && charValue <= 127) + { + utfCount++; + } + else if (charValue <= 2047) + { + utfCount += 2; + } + else + { + utfCount += 3; + } + } + if (utfCount > 65535) + { + throw new UTFDataFormatException(); //$NON-NLS-1$ + } + byte utfBytes[] = new byte[utfCount + 2]; + int utfIndex = 2; + for (int i = 0; i < length; i++) + { + int charValue = str.charAt(i); + if (charValue > 0 && charValue <= 127) + { + utfBytes[utfIndex++] = (byte) charValue; + } + else if (charValue <= 2047) + { + utfBytes[utfIndex++] = (byte) (0xc0 | (0x1f & (charValue >> 6))); + utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & charValue)); + } + else + { + utfBytes[utfIndex++] = (byte) (0xe0 | (0x0f & (charValue >> 12))); + utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & (charValue >> 6))); + utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & charValue)); + } + } + utfBytes[0] = (byte) (utfCount >> 8); + utfBytes[1] = (byte) utfCount; + out.write(utfBytes); + } + + private int assertSameOutput(int bytesChecked, int lastAction, int iteration) throws Exception + { + ndosp.flush(); + dosp.flush(); + + byte generatedBytes[] = (byte[])baos_bytes.get(generated); + byte canonicalBytes[] = (byte[])baos_bytes.get(canonical); + + int count = generated.size(); + if (count != canonical.size()) + System.out.println("Failed at " + bytesChecked + " last action " + lastAction + " iteration " + iteration); + assertEquals(count, canonical.size()); + for (;bytesChecked < count; bytesChecked++) + { + byte generatedByte = generatedBytes[bytesChecked]; + byte canonicalByte = canonicalBytes[bytesChecked]; + if (generatedByte != canonicalByte) + System.out.println("Failed at " + bytesChecked + " last action " + lastAction + " iteration " + iteration); + assertEquals(generatedByte, canonicalByte); + } + return count; + } + + @Test + public void testWriteUTF() throws Exception + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutput dataOut = new DataOutputStream(baos); + + StringBuilder sb = new StringBuilder(65535); + for (int ii = 0; ii < 1 << 16; ii++) + { + String s = sb.toString(); + UnbufferedDataOutputStreamPlus.writeUTF(s, dataOut); + DataInput dataIn = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); + assertEquals(s, dataIn.readUTF()); + baos.reset(); + sb.append("a"); + } + } + + @Test + public void testWriteUTFBigChar() throws Exception + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutput dataOut = new DataOutputStream(baos); + + StringBuilder sb = new StringBuilder(65535); + for (int ii = 0; ii < 1 << 15; ii++) + { + String s = sb.toString(); + UnbufferedDataOutputStreamPlus.writeUTF(s, dataOut); + DataInput dataIn = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); + assertEquals(s, dataIn.readUTF()); + baos.reset(); + if (ii == (1 << 15) - 1) + sb.append("a"); + else + sb.append(twoByte); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java index a19346b,0000000..953d882 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java +++ b/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java @@@ -1,664 -1,0 +1,684 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.io.util; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.Random; + +import org.apache.cassandra.io.util.NIODataInputStream; +import org.junit.Test; + +import com.google.common.base.Charsets; + +import static org.junit.Assert.*; + +public class NIODataInputStreamTest +{ + + Random r; + ByteBuffer corpus = ByteBuffer.allocate(1024 * 1024 * 8); + + void init() + { + long seed = System.nanoTime(); + //seed = 365238103404423L; + System.out.println("Seed " + seed); + r = new Random(seed); + r.nextBytes(corpus.array()); + } + + class FakeChannel implements ReadableByteChannel + { + + @Override + public boolean isOpen() { return true; } + + @Override + public void close() throws IOException {} + + @Override + public int read(ByteBuffer dst) throws IOException { return 0; } + + } + + class DummyChannel implements ReadableByteChannel + { + + boolean isOpen = true; + Queue slices = new ArrayDeque(); + + DummyChannel() + { + slices.clear(); + corpus.clear(); + + while (corpus.hasRemaining()) + { + int sliceSize = Math.min(corpus.remaining(), r.nextInt(8193)); + corpus.limit(corpus.position() + sliceSize); + slices.offer(corpus.slice()); + corpus.position(corpus.limit()); + corpus.limit(corpus.capacity()); + } + corpus.clear(); + } + + @Override + public boolean isOpen() + { + return isOpen(); + } + + @Override + public void close() throws IOException + { + isOpen = false; + } + + @Override + public int read(ByteBuffer dst) throws IOException + { + if (!isOpen) throw new IOException("closed"); + if (slices.isEmpty()) return -1; + + if (!slices.peek().hasRemaining()) + { + if (r.nextInt(2) == 1) + { + return 0; + } + else + { + slices.poll(); + if (slices.isEmpty()) return -1; + } + } + + ByteBuffer slice = slices.peek(); + int oldLimit = slice.limit(); + + int copied = 0; + if (slice.remaining() > dst.remaining()) + { + slice.limit(slice.position() + dst.remaining()); + copied = dst.remaining(); + } + else + { + copied = slice.remaining(); + } + + dst.put(slice); + slice.limit(oldLimit); + + + return copied; + } + + } + + NIODataInputStream fakeStream = new NIODataInputStream(new FakeChannel(), 8); + + @Test(expected = IOException.class) + public void testResetThrows() throws Exception + { + fakeStream.reset(); + } + + @Test(expected = NullPointerException.class) + public void testNullReadBuffer() throws Exception + { + fakeStream.read(null, 0, 1); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testNegativeOffsetReadBuffer() throws Exception + { + fakeStream.read(new byte[1], -1, 1); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testNegativeLengthReadBuffer() throws Exception + { + fakeStream.read(new byte[1], 0, -1); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testLengthToBigReadBuffer() throws Exception + { + fakeStream.read(new byte[1], 0, 2); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testLengthToWithOffsetBigReadBuffer() throws Exception + { + fakeStream.read(new byte[1], 1, 1); + } + + @Test(expected = UnsupportedOperationException.class) + public void testReadLine() throws Exception + { + fakeStream.readLine(); + } + + @Test + public void testMarkSupported() throws Exception + { + assertFalse(fakeStream.markSupported()); + } + + @SuppressWarnings("resource") + @Test(expected = IllegalArgumentException.class) + public void testTooSmallBufferSize() throws Exception + { + new NIODataInputStream(new FakeChannel(), 4); + } + + @SuppressWarnings("resource") + @Test(expected = NullPointerException.class) + public void testNullRBC() throws Exception + { + new NIODataInputStream(null, 8); + } + + @SuppressWarnings("resource") + @Test + public void testAvailable() throws Exception + { + init(); + DummyChannel dc = new DummyChannel(); + dc.slices.clear(); + dc.slices.offer(ByteBuffer.allocate(8190)); + NIODataInputStream is = new NIODataInputStream(dc, 4096); + assertEquals(0, is.available()); + is.read(); + assertEquals(4095, is.available()); + is.read(new byte[4095]); + assertEquals(0, is.available()); + is.read(new byte[10]); + assertEquals(8190 - 10 - 4096, is.available()); + + File f = File.createTempFile("foo", "bar"); + RandomAccessFile fos = new RandomAccessFile(f, "rw"); + fos.write(new byte[10]); + fos.seek(0); + + is = new NIODataInputStream(fos.getChannel(), 8); + + int remaining = 10; + assertEquals(10, is.available()); + + while (remaining > 0) + { + is.read(); + remaining--; + assertEquals(remaining, is.available()); + } + assertEquals(0, is.available()); + } + + @SuppressWarnings("resource") + @Test + public void testReadUTF() throws Exception + { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream daos = new DataOutputStream(baos); + + String simple = "foobar42"; + + assertEquals(2, BufferedDataOutputStreamTest.twoByte.getBytes(Charsets.UTF_8).length); + assertEquals(3, BufferedDataOutputStreamTest.threeByte.getBytes(Charsets.UTF_8).length); + assertEquals(4, BufferedDataOutputStreamTest.fourByte.getBytes(Charsets.UTF_8).length); + + daos.writeUTF(simple); + daos.writeUTF(BufferedDataOutputStreamTest.twoByte); + daos.writeUTF(BufferedDataOutputStreamTest.threeByte); + daos.writeUTF(BufferedDataOutputStreamTest.fourByte); + + NIODataInputStream is = new NIODataInputStream(new ReadableByteChannel() + { + + @Override + public boolean isOpen() {return false;} + + @Override + public void close() throws IOException {} + + @Override + public int read(ByteBuffer dst) throws IOException + { + dst.put(baos.toByteArray()); + return baos.toByteArray().length; + } + + }, 4096); + + assertEquals(simple, is.readUTF()); + assertEquals(BufferedDataOutputStreamTest.twoByte, is.readUTF()); + assertEquals(BufferedDataOutputStreamTest.threeByte, is.readUTF()); + assertEquals(BufferedDataOutputStreamTest.fourByte, is.readUTF()); + } + + @Test + public void testFuzz() throws Exception + { + for (int ii = 0; ii < 80; ii++) + fuzzOnce(); + } + + void validateAgainstCorpus(byte bytes[], int offset, int length, int position) throws Exception + { + assertEquals(corpus.position(), position); + int startPosition = corpus.position(); + for (int ii = 0; ii < length; ii++) + { + byte expected = corpus.get(); + byte actual = bytes[ii + offset]; + if (expected != actual) + fail("Mismatch compared to ByteBuffer"); + byte canonical = dis.readByte(); + if (canonical != actual) + fail("Mismatch compared to DataInputStream"); + } + assertEquals(length, corpus.position() - startPosition); + } + + DataInputStream dis; + + @SuppressWarnings({ "resource", "unused" }) + void fuzzOnce() throws Exception + { + init(); + int read = 0; + int totalRead = 0; + + DummyChannel dc = new DummyChannel(); + NIODataInputStream is = new NIODataInputStream( dc, 1024 * 4); + dis = new DataInputStream(new ByteArrayInputStream(corpus.array())); + + int iteration = 0; + while (totalRead < corpus.capacity()) + { + assertEquals(corpus.position(), totalRead); + int action = r.nextInt(16); + +// System.out.println("Action " + action + " iteration " + iteration + " remaining " + corpus.remaining()); +// if (iteration == 434756) { +// System.out.println("Here we go"); +// } + iteration++; + + switch (action) { + case 0: + { + byte bytes[] = new byte[111]; + + int expectedBytes = corpus.capacity() - totalRead; + boolean expectEOF = expectedBytes < 111; + boolean threwEOF = false; + try + { + is.readFully(bytes); + } + catch (EOFException e) + { + threwEOF = true; + } + + assertEquals(expectEOF, threwEOF); + + if (expectEOF) + return; + + validateAgainstCorpus(bytes, 0, 111, totalRead); + + totalRead += 111; + break; + } + case 1: + { + byte bytes[] = new byte[r.nextInt(1024 * 8 + 1)]; + + int offset = bytes.length == 0 ? 0 : r.nextInt(bytes.length); + int length = bytes.length == 0 ? 0 : r.nextInt(bytes.length - offset); + int expectedBytes = corpus.capacity() - totalRead; + boolean expectEOF = expectedBytes < length; + boolean threwEOF = false; + try { + is.readFully(bytes, offset, length); + } + catch (EOFException e) + { + threwEOF = true; + } + + assertEquals(expectEOF, threwEOF); + + if (expectEOF) + return; + + validateAgainstCorpus(bytes, offset, length, totalRead); + + totalRead += length; + break; + } + case 2: + { + byte bytes[] = new byte[r.nextInt(1024 * 8 + 1)]; + + int offset = bytes.length == 0 ? 0 : r.nextInt(bytes.length); + int length = bytes.length == 0 ? 0 : r.nextInt(bytes.length - offset); + int expectedBytes = corpus.capacity() - totalRead; + boolean expectEOF = expectedBytes == 0; + read = is.read(bytes, offset, length); + + assertTrue((expectEOF && read <= 0) || (!expectEOF && read >= 0)); + + if (expectEOF) + return; + + validateAgainstCorpus(bytes, offset, read, totalRead); + + totalRead += read; + break; + } + case 3: + { + byte bytes[] = new byte[111]; + + int expectedBytes = corpus.capacity() - totalRead; + boolean expectEOF = expectedBytes == 0; + read = is.read(bytes); + + assertTrue((expectEOF && read <= 0) || (!expectEOF && read >= 0)); + + if (expectEOF) + return; + + validateAgainstCorpus(bytes, 0, read, totalRead); + + totalRead += read; + break; + } + case 4: + { + boolean expected = corpus.get() != 0; + boolean canonical = dis.readBoolean(); + boolean actual = is.readBoolean(); + assertTrue(expected == canonical && canonical == actual); + totalRead++; + break; + } + case 5: + { + byte expected = corpus.get(); + byte canonical = dis.readByte(); + byte actual = is.readByte(); + assertTrue(expected == canonical && canonical == actual); + totalRead++; + break; + } + case 6: + { + int expected = corpus.get() & 0xFF; + int canonical = dis.read(); + int actual = is.read(); + assertTrue(expected == canonical && canonical == actual); + totalRead++; + break; + } + case 7: + { + int expected = corpus.get() & 0xFF; + int canonical = dis.readUnsignedByte(); + int actual = is.readUnsignedByte(); + assertTrue(expected == canonical && canonical == actual); + totalRead++; + break; + } + case 8: + { + if (corpus.remaining() < 2) + { + boolean threw = false; + try + { + is.readShort(); + } + catch (EOFException e) + { + try { dis.readShort(); } catch (EOFException e2) {} + threw = true; + } + assertTrue(threw); + assertTrue(corpus.remaining() - totalRead < 2); + totalRead = corpus.capacity(); + break; + } + short expected = corpus.getShort(); + short canonical = dis.readShort(); + short actual = is.readShort(); + assertTrue(expected == canonical && canonical == actual); + totalRead += 2; + break; + } + case 9: + { + if (corpus.remaining() < 2) + { + boolean threw = false; + try + { + is.readUnsignedShort(); + } + catch (EOFException e) + { + try { dis.readUnsignedShort(); } catch (EOFException e2) {} + threw = true; + } + assertTrue(threw); + assertTrue(corpus.remaining() - totalRead < 2); + totalRead = corpus.capacity(); + break; + } + int ch1 = corpus.get() & 0xFF; + int ch2 = corpus.get() & 0xFF; + int expected = (ch1 << 8) + (ch2 << 0); + int canonical = dis.readUnsignedShort(); + int actual = is.readUnsignedShort(); + assertTrue(expected == canonical && canonical == actual); + totalRead += 2; + break; + } + case 10: + { + if (corpus.remaining() < 2) + { + boolean threw = false; + try + { + is.readChar(); + } + catch (EOFException e) + { + try { dis.readChar(); } catch (EOFException e2) {} + threw = true; + } + assertTrue(threw); + assertTrue(corpus.remaining() - totalRead < 2); + totalRead = corpus.capacity(); + break; + } + char expected = corpus.getChar(); + char canonical = dis.readChar(); + char actual = is.readChar(); + assertTrue(expected == canonical && canonical == actual); + totalRead += 2; + break; + } + case 11: + { + if (corpus.remaining() < 4) + { + boolean threw = false; + try + { + is.readInt(); + } + catch (EOFException e) + { + try { dis.readInt(); } catch (EOFException e2) {} + threw = true; + } + assertTrue(threw); + assertTrue(corpus.remaining() - totalRead < 4); + totalRead = corpus.capacity(); + break; + } + int expected = corpus.getInt(); + int canonical = dis.readInt(); + int actual = is.readInt(); + assertTrue(expected == canonical && canonical == actual); + totalRead += 4; + break; + } + case 12: + { + if (corpus.remaining() < 4) + { + boolean threw = false; + try + { + is.readFloat(); + } + catch (EOFException e) + { + try { dis.readFloat(); } catch (EOFException e2) {} + threw = true; + } + assertTrue(threw); + assertTrue(corpus.remaining() - totalRead < 4); + totalRead = corpus.capacity(); + break; + } + float expected = corpus.getFloat(); + float canonical = dis.readFloat(); + float actual = is.readFloat(); + totalRead += 4; + + if (Float.isNaN(expected)) { + assertTrue(Float.isNaN(canonical) && Float.isNaN(actual)); + } else { + assertTrue(expected == canonical && canonical == actual); + } + break; + } + case 13: + { + if (corpus.remaining() < 8) + { + boolean threw = false; + try + { + is.readLong(); + } + catch (EOFException e) + { + try { dis.readLong(); } catch (EOFException e2) {} + threw = true; + } + assertTrue(threw); + assertTrue(corpus.remaining() - totalRead < 8); + totalRead = corpus.capacity(); + break; + } + long expected = corpus.getLong(); + long canonical = dis.readLong(); + long actual = is.readLong(); + + assertTrue(expected == canonical && canonical == actual); + totalRead += 8; + break; + } + case 14: + { + if (corpus.remaining() < 8) + { + boolean threw = false; + try + { + is.readDouble(); + } + catch (EOFException e) + { + try { dis.readDouble(); } catch (EOFException e2) {} + threw = true; + } + assertTrue(threw); + assertTrue(corpus.remaining() - totalRead < 8); + totalRead = corpus.capacity(); + break; + } + double expected = corpus.getDouble(); + double canonical = dis.readDouble(); + double actual = is.readDouble(); + totalRead += 8; + + if (Double.isNaN(expected)) { + assertTrue(Double.isNaN(canonical) && Double.isNaN(actual)); + } else { + assertTrue(expected == canonical && canonical == actual); + } + break; + } + case 15: + { + int skipBytes = r.nextInt(1024); + int actuallySkipped = Math.min(skipBytes, corpus.remaining()); + + totalRead += actuallySkipped; + corpus.position(corpus.position() + actuallySkipped); + int canonical = dis.skipBytes(actuallySkipped); + int actual = is.skipBytes(actuallySkipped); + assertEquals(actuallySkipped, canonical); + assertEquals(canonical, actual); + break; + } + default: + fail("Should never reach here"); + } + } + + assertEquals(totalRead, corpus.capacity()); + assertEquals(-1, dis.read()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java index 6d24447,0000000..7121550 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java +++ b/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java @@@ -1,78 -1,0 +1,98 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.locator; + +import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.junit.Test; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collection; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class PendingRangeMapsTest { + + private Range genRange(String left, String right) + { + return new Range(new BigIntegerToken(left), new BigIntegerToken(right)); + } + + @Test + public void testPendingEndpoints() throws UnknownHostException + { + PendingRangeMaps pendingRangeMaps = new PendingRangeMaps(); + + pendingRangeMaps.addPendingRange(genRange("5", "15"), InetAddress.getByName("127.0.0.1")); + pendingRangeMaps.addPendingRange(genRange("15", "25"), InetAddress.getByName("127.0.0.2")); + pendingRangeMaps.addPendingRange(genRange("25", "35"), InetAddress.getByName("127.0.0.3")); + pendingRangeMaps.addPendingRange(genRange("35", "45"), InetAddress.getByName("127.0.0.4")); + pendingRangeMaps.addPendingRange(genRange("45", "55"), InetAddress.getByName("127.0.0.5")); + pendingRangeMaps.addPendingRange(genRange("45", "65"), InetAddress.getByName("127.0.0.6")); + + assertEquals(0, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("0")).size()); + assertEquals(0, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("5")).size()); + assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("10")).size()); + assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("15")).size()); + assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("20")).size()); + assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("25")).size()); + assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("35")).size()); + assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("45")).size()); + assertEquals(2, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("55")).size()); + assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("65")).size()); + + Collection endpoints = pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("15")); + assertTrue(endpoints.contains(InetAddress.getByName("127.0.0.1"))); + } + + @Test + public void testWrapAroundRanges() throws UnknownHostException + { + PendingRangeMaps pendingRangeMaps = new PendingRangeMaps(); + + pendingRangeMaps.addPendingRange(genRange("5", "15"), InetAddress.getByName("127.0.0.1")); + pendingRangeMaps.addPendingRange(genRange("15", "25"), InetAddress.getByName("127.0.0.2")); + pendingRangeMaps.addPendingRange(genRange("25", "35"), InetAddress.getByName("127.0.0.3")); + pendingRangeMaps.addPendingRange(genRange("35", "45"), InetAddress.getByName("127.0.0.4")); + pendingRangeMaps.addPendingRange(genRange("45", "55"), InetAddress.getByName("127.0.0.5")); + pendingRangeMaps.addPendingRange(genRange("45", "65"), InetAddress.getByName("127.0.0.6")); + pendingRangeMaps.addPendingRange(genRange("65", "7"), InetAddress.getByName("127.0.0.7")); + + assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("0")).size()); + assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("5")).size()); + assertEquals(2, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("7")).size()); + assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("10")).size()); + assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("15")).size()); + assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("20")).size()); + assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("25")).size()); + assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("35")).size()); + assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("45")).size()); + assertEquals(2, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("55")).size()); + assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("65")).size()); + + Collection endpoints = pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("6")); + assertTrue(endpoints.contains(InetAddress.getByName("127.0.0.1"))); + assertTrue(endpoints.contains(InetAddress.getByName("127.0.0.7"))); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/test/unit/org/apache/cassandra/net/MessagingServiceTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java ----------------------------------------------------------------------