/*
* Copyright (C) 2004-2010 Stephen Ostermiller
* http://ostermiller.org/contact.pl?regarding=Java+Utilities
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* See LICENSE.txt for details.
*/
package com.Ostermiller.util;
import java.io.*;
import java.util.ArrayList;
/**
* An input stream which reads sequentially from multiple sources.
* More information about this class is available from <a target="_top" href=
* "http://ostermiller.org/utils/">ostermiller.org</a>.
*
* @author Stephen Ostermiller http://ostermiller.org/contact.pl?regarding=Java+Utilities
* @since ostermillerutils 1.04.00
*/
public class ConcatInputStream extends InputStream {

    /**
     * How long, in milliseconds, a reader sleeps between checks for a newly
     * added source when it has run out of data but more sources may still come.
     */
    private static final long POLL_INTERVAL_MILLIS = 100;

    /**
     * Current index to inputStreamQueue.
     * Only read and written while holding the inputStreamQueue lock.
     *
     * @since ostermillerutils 1.04.01
     */
    private int inputStreamQueueIndex = 0;

    /**
     * Queue of all inputStreams that were added, in the order in which they
     * are read. It doubles as the lock that guards the queue itself, the
     * queue index and the cached current stream.
     *
     * @since ostermillerutils 1.04.01
     */
    private final ArrayList<InputStream> inputStreamQueue = new ArrayList<InputStream>();

    /**
     * A cache of the current inputStream from the inputStreamQueue,
     * or null when the next one still has to be looked up.
     * Only read and written while holding the inputStreamQueue lock.
     *
     * @since ostermillerutils 1.04.01
     */
    private InputStream currentInputStream = null;

    /**
     * true iff the client may NOT add any more inputStreams.
     * <p>
     * Volatile because a reader polls it from a sleep loop while another
     * thread sets it; sleeping alone gives no visibility guarantee.
     *
     * @since ostermillerutils 1.04.01
     */
    private volatile boolean doneAddingInputStreams = false;

    /**
     * True iff the close() method has been called on this stream.
     * <p>
     * Volatile for the same reason as doneAddingInputStreams.
     *
     * @since ostermillerutils 1.04.00
     */
    private volatile boolean closed = false;

    /**
     * Create a new input stream that can dynamically accept new sources.
     * <p>
     * New sources should be added using the addInputStream() method.
     * When all sources have been added the lastInputStreamAdded() should
     * be called so that read methods can return -1 (end of stream).
     * <p>
     * Adding new sources may be interleaved with read calls.
     *
     * @since ostermillerutils 1.04.01
     */
    public ConcatInputStream(){
        // Empty constructor
    }

    /**
     * Create a new InputStream with one source.
     * No further sources can be added afterwards.
     *
     * @param in InputStream to use as a source.
     *
     * @throws NullPointerException if in is null
     *
     * @since ostermillerutils 1.04.00
     */
    public ConcatInputStream(InputStream in){
        addInputStream(in);
        lastInputStreamAdded();
    }

    /**
     * Create a new InputStream with two sources.
     * No further sources can be added afterwards.
     *
     * @param in1 first InputStream to use as a source.
     * @param in2 second InputStream to use as a source.
     *
     * @throws NullPointerException if either source is null.
     *
     * @since ostermillerutils 1.04.00
     */
    public ConcatInputStream(InputStream in1, InputStream in2){
        addInputStream(in1);
        addInputStream(in2);
        lastInputStreamAdded();
    }

    /**
     * Create a new InputStream with an arbitrary number of sources.
     * No further sources can be added afterwards.
     *
     * @param in InputStreams to use as sources.
     *
     * @throws NullPointerException if the input array or any element is null.
     *
     * @since ostermillerutils 1.04.00
     */
    public ConcatInputStream(InputStream[] in){
        addInputStreams(in);
        lastInputStreamAdded();
    }

    /**
     * Causes the addInputStream method to throw IllegalStateException
     * and read() methods to return -1 (end of stream)
     * when there is no more available data.
     * <p>
     * Calling this method when this class is no longer accepting
     * more inputStreams has no effect.
     *
     * @since ostermillerutils 1.04.01
     */
    public void lastInputStreamAdded(){
        // Taken under the queue lock so that the flag can never flip in the
        // middle of addInputStream()'s check-then-add sequence.
        synchronized(inputStreamQueue){
            doneAddingInputStreams = true;
        }
    }

    /**
     * Add the given inputStream to the queue of inputStreams from which to
     * concatenate data.
     *
     * @param in InputStream to add to the concatenation.
     * @throws NullPointerException if in is null.
     * @throws IllegalStateException if more inputStreams can't be added because lastInputStreamAdded() has been called, close() has been called, or a constructor with inputStream parameters was used.
     *
     * @since ostermillerutils 1.04.01
     */
    public void addInputStream(InputStream in){
        synchronized(inputStreamQueue){
            if (in == null) throw new NullPointerException();
            if (closed) throw new IllegalStateException("ConcatInputStream has been closed");
            if (doneAddingInputStreams) throw new IllegalStateException("Cannot add more inputStreams - the last inputStream has already been added.");
            inputStreamQueue.add(in);
        }
    }

    /**
     * Add the given inputStreams, in array order, to the queue of inputStreams
     * from which to concatenate data.
     * <p>
     * The elements are added one at a time, so if an exception is thrown
     * part way through, the elements before the offending one stay queued.
     *
     * @param in InputStreams to add to the concatenation.
     * @throws IllegalStateException if more inputStreams can't be added because lastInputStreamAdded() has been called, close() has been called, or a constructor with inputStream parameters was used.
     * @throws NullPointerException the array of inputStreams, or any of the contents is null.
     *
     * @since ostermillerutils 1.04.01
     */
    public void addInputStreams(InputStream[] in){
        for (InputStream element: in) {
            addInputStream(element);
        }
    }

    /**
     * Gets the current inputStream, looking at the next
     * one in the list if the current one is null.
     *
     * @return the stream to read from, or null if every queued stream
     *         has already been used up.
     *
     * @since ostermillerutils 1.04.01
     */
    private InputStream getCurrentInputStream(){
        // The size check is done inside the lock as well: the queue is a
        // plain ArrayList that another thread may be appending to.
        synchronized(inputStreamQueue){
            if (currentInputStream == null && inputStreamQueueIndex < inputStreamQueue.size()){
                // The queue index is advanced only by advanceToNextInputStream().
                // Don't do it here.
                currentInputStream = inputStreamQueue.get(inputStreamQueueIndex);
            }
            return currentInputStream;
        }
    }

    /**
     * Indicate that we are done with the current inputStream and we should
     * advance to the next inputStream.
     *
     * @since ostermillerutils 1.04.01
     */
    private void advanceToNextInputStream(){
        synchronized(inputStreamQueue){
            currentInputStream = null;
            inputStreamQueueIndex++;
        }
    }

    /**
     * Fails if close() has already been called.
     *
     * @throws IOException if this stream is closed.
     */
    private void ensureOpen() throws IOException {
        if (closed) throw new IOException("InputStream closed");
    }

    /**
     * Finds the stream the next bytes should come from, sleeping in
     * POLL_INTERVAL_MILLIS steps while the queue is used up but more
     * sources may still be added.
     *
     * @return the current source, or null when the queue is used up and
     *         no more sources can be added (end of the concatenation).
     * @throws IOException if this stream is closed, or if the thread is
     *         interrupted while waiting (an InterruptedIOException; the
     *         thread's interrupt status is left set).
     */
    private InputStream awaitCurrentInputStream() throws IOException {
        while (true){
            ensureOpen();
            // The flag must be sampled BEFORE the queue is inspected. In the
            // opposite order a source that is added (and then sealed with
            // lastInputStreamAdded()) between the two steps would be missed
            // and end of stream reported too early.
            boolean noMoreSources = doneAddingInputStreams;
            InputStream in = getCurrentInputStream();
            if (in != null || noMoreSources) return in;
            try {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException ix){
                // Keep the interrupt visible to the caller instead of eating it.
                Thread.currentThread().interrupt();
                InterruptedIOException interrupted = new InterruptedIOException("Interrupted");
                interrupted.initCause(ix);
                throw interrupted;
            }
        }
    }

    /**
     * Reads the next byte of data from the underlying streams. The value byte is
     * returned as an int in the range 0 to 255. If no byte is available because
     * the end of the stream has been reached, the value -1 is returned. This method
     * blocks until input data is available, the end of the stream is detected, or
     * an exception is thrown.
     * <p>
     * If this class is not done accepting input streams and the end of the last known
     * stream is reached, this method will block until another thread
     * adds an input stream, calls lastInputStreamAdded(), closes this stream
     * or interrupts the reading thread.
     *
     * @return the next byte of data, or -1 if the end of the stream is reached.
     *
     * @throws IOException if an I/O error occurs, this stream is closed, or the
     *         thread is interrupted while waiting for more input.
     */
    @Override public int read() throws IOException {
        while (true){
            InputStream in = awaitCurrentInputStream();
            if (in == null) return -1;
            int r = in.read();
            if (r != -1) return r;
            // This source is exhausted; try the next one.
            advanceToNextInputStream();
        }
    }

    /**
     * Reads some number of bytes from the underlying streams and stores them into
     * the buffer array b. The number of bytes actually read is returned as an
     * integer. This method blocks until input data is available, end of file is
     * detected, or an exception is thrown.
     * <p>
     * If the length of b is zero,
     * then no bytes are read and 0 is returned; otherwise, there is an attempt
     * to read at least one byte.
     * <p>
     * This method has the same effect as:<br>
     * read(b, 0, b.length)
     * <p>
     * If this class is not done accepting input streams and the end of the last known
     * stream is reached, this method will block until another thread
     * adds an input stream, calls lastInputStreamAdded(), closes this stream
     * or interrupts the reading thread.
     *
     * @param b - Destination buffer
     * @return The number of bytes read, or -1 if the end of the stream has been reached
     *
     * @throws IOException - If an I/O error occurs
     * @throws NullPointerException - If b is null.
     *
     * @since ostermillerutils 1.04.00
     */
    @Override public int read(byte[] b) throws IOException {
        return read(b, 0, b.length);
    }

    /**
     * Reads up to len bytes of data from the underlying streams into an array of bytes.
     * An attempt is made to read as many as len bytes, but a smaller number may be read.
     * The number of bytes actually read is returned as an integer. A single call
     * never mixes bytes from two different sources.
     * <p>
     * If len is zero,
     * then no bytes are read and 0 is returned; otherwise, there is an attempt
     * to read at least one byte.
     * <p>
     * This method blocks until input data is available.
     * <p>
     * If this class is not done accepting input streams and the end of the last known
     * stream is reached, this method will block until another thread
     * adds an input stream, calls lastInputStreamAdded(), closes this stream
     * or interrupts the reading thread.
     *
     * @param b Destination buffer
     * @param off Offset at which to start storing bytes
     * @param len Maximum number of bytes to read
     * @return The number of bytes read, or -1 if the end of the stream has been reached
     *
     * @throws IOException - If an I/O error occurs, this stream is closed, or the
     *         thread is interrupted while waiting for more input.
     * @throws NullPointerException - If b is null.
     * @throws IllegalArgumentException - if off or len is negative, or off + len
     *         is larger than the length of b. (Note that this class reports a bad
     *         range with IllegalArgumentException, not IndexOutOfBoundsException.)
     */
    @Override public int read(byte[] b, int off, int len) throws IOException {
        // Written as "len > b.length - off" because "off + len" can overflow
        // to a negative number and slip past the check.
        if (off < 0 || len < 0 || len > b.length - off) throw new IllegalArgumentException();
        ensureOpen();
        // Nothing was asked for: answer right away instead of blocking or
        // reporting end of stream.
        if (len == 0) return 0;
        while (true){
            InputStream in = awaitCurrentInputStream();
            if (in == null) return -1;
            int r = in.read(b, off, len);
            if (r != -1) return r;
            // This source is exhausted; try the next one.
            advanceToNextInputStream();
        }
    }

    /**
     * Skips over and discards n bytes of data from this input stream. The skip method
     * may, for a variety of reasons, end up skipping over some smaller number of bytes,
     * possibly 0. This may result from any of a number of conditions; reaching end of
     * file before n bytes have been skipped is only one possibility. The actual number
     * of bytes skipped is returned. If n is zero or negative, no bytes are skipped.
     * <p>
     * If this class is not done accepting input streams and the end of the last known
     * stream is reached, this method will block until another thread
     * adds an input stream, calls lastInputStreamAdded(), closes this stream
     * or interrupts the reading thread.
     *
     * @param n the number of bytes to skip
     * @return The number of bytes actually skipped; 0 only if n is not positive
     *         or the end of the concatenated stream has been reached.
     *
     * @throws IOException If an I/O error occurs, this stream is closed, or the
     *         thread is interrupted while waiting for more input.
     *
     * @since ostermillerutils 1.04.00
     */
    @Override public long skip(long n) throws IOException {
        ensureOpen();
        if (n <= 0) return 0;
        InputStream in = awaitCurrentInputStream();
        if (in == null) return 0;
        long skipped = in.skip(n);
        if (skipped > 0) return skipped;
        // When nothing was skipped it is a bit of a puzzle.
        // The most common cause is that the end of the underlying
        // stream was reached, in which case calling skip on it
        // will always return zero. If somebody were calling skip
        // until it skipped everything they needed, there would
        // be an infinite loop if we were to return zero here.
        // So consume a single byte with read() instead: it moves on to
        // the next source(s) by itself and only reports -1 at the real
        // end of the concatenation.
        return (read() == -1) ? 0 : 1;
    }

    /**
     * Returns the number of bytes that can be read (or skipped over) from this input
     * stream without blocking by the next caller of a method for this input stream.
     * The next caller might be the same thread or another thread.
     * <p>
     * Only the current source is consulted; sources queued behind it are not counted.
     *
     * @return what the current source reports as available, or 0 if there is
     *         no current source.
     *
     * @throws IOException If an I/O error occurs or this stream is closed.
     *
     * @since ostermillerutils 1.04.00
     */
    @Override public int available() throws IOException {
        ensureOpen();
        InputStream in = getCurrentInputStream();
        if (in == null) return 0;
        return in.available();
    }

    /**
     * Closes this input stream and every source that was ever added to it,
     * including sources that have already been read to their end.
     * <p>
     * All sources are closed even if closing one of them fails; the first
     * failure is rethrown once the others have been closed. Calling this
     * method again has no effect.
     *
     * @throws IOException the first error reported by a source while closing.
     *
     * @since ostermillerutils 1.04.00
     */
    @Override public void close() throws IOException {
        ArrayList<InputStream> toClose;
        synchronized(inputStreamQueue){
            if (closed) return;
            // Mark closed first so no source can be added behind our back,
            // then work on a snapshot so that the sources (foreign code) are
            // not closed while holding the lock.
            closed = true;
            toClose = new ArrayList<InputStream>(inputStreamQueue);
        }
        IOException firstFailure = null;
        for (InputStream element: toClose) {
            try {
                element.close();
            } catch (IOException iox){
                if (firstFailure == null) firstFailure = iox;
            }
        }
        if (firstFailure != null) throw firstFailure;
    }

    /**
     * Mark not supported
     *
     * @param readlimit ignored.
     *
     * @since ostermillerutils 1.04.00
     */
    @Override public void mark(int readlimit){
        // Mark not supported -- do nothing
    }

    /**
     * Reset not supported.
     *
     * @throws IOException because reset is not supported.
     *
     * @since ostermillerutils 1.04.00
     */
    @Override public void reset() throws IOException {
        throw new IOException("Reset not supported");
    }

    /**
     * Does not support mark.
     *
     * @return false
     *
     * @since ostermillerutils 1.04.00
     */
    @Override public boolean markSupported(){
        return false;
    }
}