commons-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joc...@apache.org
Subject svn commit: r1750760 - in /commons/proper/io/trunk/src: changes/ main/java/org/apache/commons/io/input/ test/java/org/apache/commons/io/input/
Date Thu, 30 Jun 2016 09:04:21 GMT
Author: jochen
Date: Thu Jun 30 09:04:21 2016
New Revision: 1750760

URL: http://svn.apache.org/viewvc?rev=1750760&view=rev
Log:
Added the ObservableInputStream, and the MessageDigestCalculatingInputStream.

Added:
    commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java
  (with props)
    commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java
  (with props)
    commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java
  (with props)
    commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java
  (with props)
Modified:
    commons/proper/io/trunk/src/changes/changes.xml
    commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java

Modified: commons/proper/io/trunk/src/changes/changes.xml
URL: http://svn.apache.org/viewvc/commons/proper/io/trunk/src/changes/changes.xml?rev=1750760&r1=1750759&r2=1750760&view=diff
==============================================================================
--- commons/proper/io/trunk/src/changes/changes.xml (original)
+++ commons/proper/io/trunk/src/changes/changes.xml Thu Jun 30 09:04:21 2016
@@ -46,6 +46,11 @@ The <action> type attribute can be add,u
 
   <body>
     <!-- The release date is the date RC is cut -->
+    <release version="2.7" date="Not yet published">
+      <action dev="jochen" type="add">
+        Added the ObservableInputStream, and the MessageDigestCalculatingInputStream.
+      </action>
+    </release>
     <release version="2.6" date="2016-MM-DD" description="New features and bug fixes.">
       <action issue="IO-511" dev="britter" type="fix" due-to="Ahmet Celik">
         After a few unit tests, a few newly created directories not cleaned completely.

Added: commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java
URL: http://svn.apache.org/viewvc/commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java?rev=1750760&view=auto
==============================================================================
--- commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java
(added)
+++ commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java
Thu Jun 30 09:04:21 2016
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+
+/**
+ * An {@link ObservableInputStream}, which feeds all data that is read from the
+ * underlying stream into a {@link MessageDigest}, for example to calculate an
+ * MD5 sum while the stream is being consumed. The digest is maintained by an
+ * {@link Observer}, which this class registers on itself; as such, the class
+ * also serves as an example for using an {@link ObservableInputStream}.
+ * <em>Note</em>: Neither {@link ObservableInputStream}, nor {@link MessageDigest},
+ * are thread safe, and neither is {@link MessageDigestCalculatingInputStream}.
+ */
+public class MessageDigestCalculatingInputStream extends ObservableInputStream {
+    /**
+     * An {@link Observer}, which updates a {@link MessageDigest} with all the
+     * data, that it is being notified about.
+     */
+    public static class MessageDigestMaintainingObserver extends Observer {
+        private final MessageDigest digest;
+
+        /** Creates a new instance, which updates the given {@link MessageDigest}.
+         * @param pMd The digest, which is being updated with the observed data.
+         */
+        public MessageDigestMaintainingObserver(MessageDigest pMd) {
+            this.digest = pMd;
+        }
+
+        /** Adds the given byte to the digest. */
+        @Override
+        void data(int pByte) throws IOException {
+            // The observed value is in the range 0..255; only the low byte matters.
+            final byte value = (byte) pByte;
+            digest.update(value);
+        }
+
+        /** Adds the given section of the buffer to the digest. */
+        @Override
+        void data(byte[] pBuffer, int pOffset, int pLength) throws IOException {
+            digest.update(pBuffer, pOffset, pLength);
+        }
+    }
+
+    private final MessageDigest messageDigest;
+
+    /** Creates a new instance, which calculates a signature on the given stream,
+     * using the given {@link MessageDigest}.
+     * @param pStream The stream, on which the signature is being calculated.
+     * @param pDigest The digest, which is being updated while reading the stream.
+     */
+    public MessageDigestCalculatingInputStream(InputStream pStream, MessageDigest pDigest) {
+        super(pStream);
+        this.messageDigest = pDigest;
+        final Observer digestObserver = new MessageDigestMaintainingObserver(pDigest);
+        add(digestObserver);
+    }
+
+    /** Creates a new instance, which calculates a signature on the given stream,
+     * using a {@link MessageDigest} with the given algorithm.
+     * @param pStream The stream, on which the signature is being calculated.
+     * @param pAlgorithm The name of the digest algorithm, as understood by
+     *   {@link MessageDigest#getInstance(String)}.
+     * @throws NoSuchAlgorithmException No digest is available for the given algorithm.
+     */
+    public MessageDigestCalculatingInputStream(InputStream pStream, String pAlgorithm) throws NoSuchAlgorithmException {
+        this(pStream, MessageDigest.getInstance(pAlgorithm));
+    }
+
+    /** Creates a new instance, which calculates a signature on the given stream,
+     * using a {@link MessageDigest} with the "MD5" algorithm.
+     * @param pStream The stream, on which the signature is being calculated.
+     * @throws NoSuchAlgorithmException No "MD5" digest is available.
+     */
+    public MessageDigestCalculatingInputStream(InputStream pStream) throws NoSuchAlgorithmException {
+        this(pStream, "MD5");
+    }
+
+    /** Returns the {@link MessageDigest}, which is being used for generating the
+     * checksum.
+     * <em>Note</em>: The checksum will only reflect the data, which has been read so far.
+     * This is probably not what you expect. Make sure that the complete data has been
+     * read, if that is what you want. The easiest way to do so is by invoking
+     * {@link #consume()}.
+     * @return The digest, that has been passed to, or created by the constructor.
+     */
+    public MessageDigest getMessageDigest() {
+        return this.messageDigest;
+    }
+}

Propchange: commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java
URL: http://svn.apache.org/viewvc/commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java?rev=1750760&view=auto
==============================================================================
--- commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java
(added)
+++ commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java
Thu Jun 30 09:04:21 2016
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * The {@link ObservableInputStream} allows, that an InputStream may be consumed
+ * by other receivers, apart from the thread, which is reading it.
+ * The other consumers are implemented as instances of {@link Observer}. A
+ * typical application may be the generation of a {@link MessageDigest} on the
+ * fly.
+ * {@code Note}: The {@link ObservableInputStream} is <em>not</em> thread safe,
+ * as instances of InputStream usually aren't.
+ * If you must access the stream from multiple threads, then synchronization, locking,
+ * or a similar means must be used.
+ * @see MessageDigestCalculatingInputStream
+ */
+public class ObservableInputStream extends ProxyInputStream {
+    public static abstract class Observer {
+        /** Called to indicate, that {@link InputStream#read()} has been invoked
+         * on the {@link ObservableInputStream}, and will return a value.
+         * @param pByte The value, which is being returned. This will never be -1 (EOF),
+         *    because, in that case, {link #finished()} will be invoked instead.
+         */
+        void data(int pByte) throws IOException {}
+        /** Called to indicate, that {@link InputStream#read(byte[])}, or
+         * {@link InputStream#read(byte[], int, int)} have been called, and are about to
+         * invoke data.
+         * @param pBuffer The byte array, which has been passed to the read call, and where
+         *   data has been stored.
+         * @param pOffset The offset within the byte array, where data has been stored.
+         * @param pLength The number of bytes, which have been stored in the byte array.
+         */
+        void data(byte[] pBuffer, int pOffset, int pLength) throws IOException {}
+        /** Called to indicate, that EOF has been seen on the underlying stream.
+         * This method may be called multiple times, if the reader keeps invoking
+         * either of the read methods, and they will consequently keep returning
+         * EOF.
+         */
+        void finished() throws IOException {}
+        /** Called to indicate, that the {@link ObservableInputStream} has been closed.
+         */
+        void closed() throws IOException {}
+        /**
+         * Called to indicate, that an error occurred on the underlying stream.
+         */
+        void error(IOException pException) throws IOException { throw pException; }
+    }
+
+    private final List<Observer> observers = new ArrayList<Observer>();
+    
+    public ObservableInputStream(InputStream pProxy) {
+        super(pProxy);
+    }
+
+    public void add(Observer pObserver) {
+        observers.add(pObserver);
+    }
+
+    public void remove(Observer pObserver) {
+        observers.remove(pObserver);
+    }
+
+    public void removeAllObservers() {
+        observers.clear();
+    }
+
+    @Override
+    public int read() throws IOException {
+        int result = 0;
+        IOException ioe = null;
+        try {
+            result = super.read();
+        } catch (IOException pException) {
+            ioe = pException;
+        }
+        if (ioe != null) {
+            noteError(ioe);
+        } else if (result == -1) {
+            noteFinished();
+        } else {
+            noteDataByte(result);
+        }
+        return result;
+    }
+
+    @Override
+    public int read(byte[] pBuffer) throws IOException {
+        int result = 0;
+        IOException ioe = null;
+        try {
+            result = super.read(pBuffer);
+        } catch (IOException pException) {
+            ioe = pException;
+        }
+        if (ioe != null) {
+            noteError(ioe);
+        } else if (result == -1) {
+            noteFinished();
+        } else if (result > 0) {
+            noteDataBytes(pBuffer, 0, result);
+        }
+        return result;
+    }
+
+    @Override
+    public int read(byte[] pBuffer, int pOffset, int pLength) throws IOException {
+        int result = 0;
+        IOException ioe = null;
+        try {
+            result = super.read(pBuffer, pOffset, pLength);
+        } catch (IOException pException) {
+            ioe = pException;
+        }
+        if (ioe != null) {
+            noteError(ioe);
+        } else if (result == -1) {
+            noteFinished();
+        } else if (result > 0) {
+            noteDataBytes(pBuffer, pOffset, result);
+        }
+        return result;
+    }
+
+    /** Notifies the observers by invoking {@link Observer#data(byte[],int,int)}
+     * with the given arguments.
+     * @param pBuffer Passed to the observers.
+     * @param pOffset Passed to the observers.
+     * @param pLength Passed to the observers.
+     * @throws IOException Some observer has thrown an exception, which is being
+     *   passed down.
+     */
+    protected void noteDataBytes(byte[] pBuffer, int pOffset, int pLength) throws IOException
{
+        for (Observer observer : getObservers()) {
+            observer.data(pBuffer, pOffset, pLength);
+        }
+    }
+
+    /** Notifies the observers by invoking {@link Observer#finished()}.
+     * @throws IOException Some observer has thrown an exception, which is being
+     *   passed down.
+     */
+    protected void noteFinished() throws IOException {
+        for (Observer observer : getObservers()) {
+            observer.finished();
+        }
+    }
+
+    /** Notifies the observers by invoking {@link Observer#data(int)}
+     * with the given arguments.
+     * @param pDataByte Passed to the observers.
+     * @throws IOException Some observer has thrown an exception, which is being
+     *   passed down.
+     */
+    protected void noteDataByte(int pDataByte) throws IOException {
+        for (Observer observer : getObservers()) {
+            observer.data(pDataByte);
+        }
+    }
+
+    /** Notifies the observers by invoking {@link Observer#error(IOException)}
+     * with the given argument.
+     * @param pException Passed to the observers.
+     * @throws IOException Some observer has thrown an exception, which is being
+     *   passed down. This may be the same exception, which has been passed as an
+     *   argument.
+     */
+    protected void noteError(IOException pException) throws IOException {
+        for (Observer observer : getObservers()) {
+            observer.error(pException);
+        }
+    }
+
+    /** Notifies the observers by invoking {@link Observer#finished()}.
+     * @throws IOException Some observer has thrown an exception, which is being
+     *   passed down.
+     */
+    protected void noteClosed() throws IOException {
+        for (Observer observer : getObservers()) {
+            observer.closed();
+        }
+    }
+
+    protected List<Observer> getObservers() {
+        return observers;
+    }
+
+    @Override
+    public void close() throws IOException {
+        IOException ioe = null;
+        try {
+            super.close();
+        } catch (IOException e) {
+            ioe = e;
+        }
+        if (ioe == null) {
+            noteClosed();
+        } else {
+            noteError(ioe);
+        }
+    }
+
+    /** Reads all data from the underlying {@link InputStream}, while notifying the
+     * observers.
+     * @throws IOException The underlying {@link InputStream}, or either of the
+     *   observers has thrown an exception.
+     */
+    public void consume() throws IOException {
+        final byte[] buffer = new byte[8192];
+        for (;;) {
+            final int res = read(buffer);
+            if (res == -1) {
+                return;
+            }
+        }
+    }
+    
+}

Propchange: commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java
URL: http://svn.apache.org/viewvc/commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java?rev=1750760&r1=1750759&r2=1750760&view=diff
==============================================================================
--- commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java
(original)
+++ commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java
Thu Jun 30 09:04:21 2016
@@ -35,6 +35,7 @@ import java.io.OutputStream;
  *
  * @version $Id$
  * @since 1.4
+ * @see ObservableInputStream 
  */
 public class TeeInputStream extends ProxyInputStream {
 

Added: commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java
URL: http://svn.apache.org/viewvc/commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java?rev=1750760&view=auto
==============================================================================
--- commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java
(added)
+++ commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java
Thu Jun 30 09:04:21 2016
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.input;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.security.MessageDigest;
+import java.util.Random;
+
+import org.junit.Test;
+
+public class MessageDigestCalculatingInputStreamTest {
+    public static byte[] generateRandomByteStream(int pSize) {
+        final byte[] buffer = new byte[pSize];
+        final Random rnd = new Random();
+        rnd.nextBytes(buffer);
+        return buffer;
+    }
+
+    @Test
+    public void test() throws Exception {
+        for (int i = 256;  i < 8192;  i = i*2) {
+            final byte[] buffer = generateRandomByteStream(i);
+            final MessageDigest md5Sum = MessageDigest.getInstance("MD5");
+            final byte[] expect = md5Sum.digest(buffer);
+            final MessageDigestCalculatingInputStream md5InputStream = new MessageDigestCalculatingInputStream(new
ByteArrayInputStream(buffer));
+            md5InputStream.consume();
+            final byte[] got = md5InputStream.getMessageDigest().digest();
+            assertArrayEquals(expect, got);
+        }
+    }
+
+}

Propchange: commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java
URL: http://svn.apache.org/viewvc/commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java?rev=1750760&view=auto
==============================================================================
--- commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java
(added)
+++ commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java
Thu Jun 30 09:04:21 2016
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.input;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import org.apache.commons.io.input.ObservableInputStream;
+import org.apache.commons.io.input.ObservableInputStream.Observer;
+import org.junit.Test;
+
+/**
+ * Tests, that an {@link ObservableInputStream} notifies its observers about the
+ * data being read, about EOF, and about the stream being closed.
+ */
+public class ObservableInputStreamTest {
+    /** Observer, which remembers the last byte, that has been reported through
+     * {@link Observer#data(int)}, and whether finished, or closed have been reported.
+     */
+    private static class LastByteKeepingObserver extends Observer {
+        private int lastByteSeen = -1;
+        private boolean finished;
+        private boolean closed;
+
+        @Override
+        void data(int pByte) throws IOException {
+            super.data(pByte);
+            lastByteSeen = pByte;
+        }
+
+        @Override
+        void finished() throws IOException {
+            super.finished();
+            finished = true;
+        }
+
+        @Override
+        void closed() throws IOException {
+            super.closed();
+            closed = true;
+        }
+    }
+    /** Observer, which remembers the arguments of the last invocation of
+     * {@link Observer#data(byte[],int,int)}.
+     */
+    private static class LastBytesKeepingObserver extends Observer {
+        private byte[] buffer = null;
+        private int offset = -1;
+        private int length = -1;
+
+        @Override
+        void data(byte[] pBuffer, int pOffset, int pLength) throws IOException {
+            super.data(pBuffer, pOffset, pLength);
+            buffer = pBuffer;
+            offset = pOffset;
+            length = pLength;
+        }
+    }
+
+    /** Tests, that {@link Observer#data(int)} is called.
+     */
+    @Test
+    public void testDataByteCalled() throws Exception {
+        final byte[] buffer = MessageDigestCalculatingInputStreamTest.generateRandomByteStream(4096);
+        final ObservableInputStream ois = new ObservableInputStream(new ByteArrayInputStream(buffer));
+        final LastByteKeepingObserver lko = new LastByteKeepingObserver();
+        assertEquals(-1, lko.lastByteSeen);
+        // The observer is not registered yet, so this read (of buffer[0]) must go unnoticed.
+        ois.read();
+        assertEquals(-1, lko.lastByteSeen);
+        assertFalse(lko.finished);
+        assertFalse(lko.closed);
+        ois.add(lko);
+        // Index 0 has been consumed above, so the observed reads start at index 1.
+        for (int i = 1;  i < buffer.length;  i++) {
+            final int result = ois.read();
+            // Expected value first, so that a failure message reads correctly.
+            assertEquals(buffer[i], (byte) result);
+            assertEquals(result, lko.lastByteSeen);
+            assertFalse(lko.finished);
+            assertFalse(lko.closed);
+        }
+        final int result = ois.read();
+        assertEquals(-1, result);
+        assertTrue(lko.finished);
+        assertFalse(lko.closed);
+        ois.close();
+        assertTrue(lko.finished);
+        assertTrue(lko.closed);
+    }
+
+    /** Tests, that {@link Observer#data(byte[],int,int)} is called.
+     */
+    @Test
+    public void testDataBytesCalled() throws Exception {
+        final byte[] buffer = MessageDigestCalculatingInputStreamTest.generateRandomByteStream(4096);
+        ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
+        final ObservableInputStream ois = new ObservableInputStream(bais);
+        final LastBytesKeepingObserver lko = new LastBytesKeepingObserver();
+        final byte[] readBuffer = new byte[23];
+        assertNull(lko.buffer);
+        // The observer is not registered yet, so this read must go unnoticed.
+        ois.read(readBuffer);
+        assertNull(lko.buffer);
+        ois.add(lko);
+        for (;;) {
+            if (bais.available() >= 2048) {
+                // First half of the data: exercise read(byte[]).
+                final int result = ois.read(readBuffer);
+                if (result == -1) {
+                    ois.close();
+                    break;
+                } else {
+                    // The observer must be handed the very array, that was passed to read.
+                    assertSame(readBuffer, lko.buffer);
+                    assertEquals(0, lko.offset);
+                    assertEquals(readBuffer.length, lko.length);
+                }
+            } else {
+                // Remaining data: exercise read(byte[], int, int) with a non-zero offset.
+                final int res = Math.min(11, bais.available());
+                final int result = ois.read(readBuffer, 1, 11);
+                if (result == -1) {
+                    ois.close();
+                    break;
+                } else {
+                    assertSame(readBuffer, lko.buffer);
+                    assertEquals(1, lko.offset);
+                    assertEquals(res, lko.length);
+                }
+            }
+        }
+    }
+
+}

Propchange: commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message