CircularObjectBuffer.java Source Code

  • CircularBuffer Documentation and Examples
  • CircularObjectBuffer Javadoc
    /*
     * Circular Object Buffer
     * Copyright (C) 2002-2010 Stephen Ostermiller
     * http://ostermiller.org/contact.pl?regarding=Java+Utilities
     *
     * This program is free software; you can redistribute it and/or modify
     * it under the terms of the GNU General Public License as published by
     * the Free Software Foundation; either version 2 of the License, or
     * (at your option) any later version.
     *
     * This program is distributed in the hope that it will be useful,
     * but WITHOUT ANY WARRANTY; without even the implied warranty of
     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     * GNU General Public License for more details.
     *
     * See LICENSE.txt for details.
     */
    package com.Ostermiller.util;
    
    /**
     * Implements the Circular Buffer producer/consumer model for Objects.
     * More information about this class is available from <a target="_top" href=
     * "http://ostermiller.org/utils/CircularObjectBuffer.html">ostermiller.org</a>.
     * <p>
     * This class is thread safe.
     *
     * @see CircularCharBuffer
     * @see CircularByteBuffer
     *
     * @author Stephen Ostermiller http://ostermiller.org/contact.pl?regarding=Java+Utilities
     * @param <ElementType> Type of object allowed in this circular buffer
     * @since ostermillerutils 1.00.00
     */
    public class CircularObjectBuffer <ElementType> {
    
    	/**
    	 * The default size for a circular object buffer.
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	private final static int DEFAULT_SIZE = 1024;
    
    	/**
    	 * Longest time, in milliseconds, that a blocked reader or writer waits
    	 * before re-checking the buffer state.
    	 * <p>
    	 * Blocked callers are woken early by notifyAll() whenever a method of this
    	 * class changes the buffer.  The timeout is only a fallback for state
    	 * changes that happen without a notification (for example a subclass
    	 * assigning the protected fields directly).
    	 */
    	private final static long WAIT_MILLIS = 100;
    
    	/**
    	 * A buffer that will grow as things are added.
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	public final static int INFINITE_SIZE = -1;
    
    	/**
    	 * The circular buffer.
    	 * <p>
    	 * The actual capacity of the buffer is one less than the actual length
    	 * of the buffer so that an empty and a full buffer can be
    	 * distinguished.  An empty buffer will have the readPosition and the
    	 * writePosition equal to each other.  A full buffer will have
    	 * the writePosition one less than the readPosition.
    	 * <p>
    	 * There are two important indexes into the buffer:
    	 * The readPosition, and the writePosition. The Objects
    	 * available to be read go from the readPosition to the writePosition,
    	 * wrapping around the end of the buffer.  The space available for writing
    	 * goes from the write position to one less than the readPosition,
    	 * wrapping around the end of the buffer.
    	 * <p>
    	 * The methods of this class set a slot back to null once its Object has
    	 * been read, skipped or cleared, so that the buffer does not keep
    	 * consumed Objects reachable.
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	protected ElementType[] buffer;
    	/**
    	 * Index of the first Object available to be read.
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	protected volatile int readPosition = 0;
    	/**
    	 * Index of the first Object available to be written.
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	protected volatile int writePosition = 0;
    	/**
    	 * If this buffer is infinite (should resize itself when full)
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	protected volatile boolean infinite = false;
    	/**
    	 * True if a write to a full buffer should block until the buffer
    	 * has room, false if the write method should throw a
    	 * BufferOverflowException
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	protected boolean blockingWrite = true;
    
    	/**
    	 * True when no more input is coming into this buffer.  At that
    	 * point reading from the buffer may return null if the buffer
    	 * is empty, otherwise a read will block until an Object is available.
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	protected boolean inputDone = false;
    
    	/**
    	 * Make this buffer ready for reuse.  The contents of the buffer
    	 * are discarded (every slot is set to null so the old Objects are
    	 * no longer referenced), both positions are reset to zero, and the
    	 * "done" flag is lowered so that the buffer accepts writes again.
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	public void clear(){
    		synchronized (this){
    			// Drop the references so discarded Objects can be collected.
    			java.util.Arrays.fill(buffer, null);
    			readPosition = 0;
    			writePosition = 0;
    			inputDone = false;
    			// A writer blocked on a full buffer can proceed now.
    			notifyAll();
    		}
    	}
    
    	/**
    	 * Get number of Objects that are available to be read.
    	 * <p>
    	 * Note that the number of Objects available plus
    	 * the number of Objects free may not add up to the
    	 * capacity of this buffer, as the buffer may reserve some
    	 * space for other purposes.
    	 *
    	 * @return the number of Objects currently available for reading
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	public int getAvailable(){
    		synchronized (this){
    			return available();
    		}
    	}
    
    	/**
    	 * Get the number of Objects this buffer has free for
    	 * writing.
    	 * <p>
    	 * Note that the number of Objects available plus
    	 * the number of Objects free may not add up to the
    	 * capacity of this buffer, as the buffer may reserve some
    	 * space for other purposes.
    	 *
    	 * @return the available space in Objects of this buffer
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	public int getSpaceLeft(){
    		synchronized (this){
    			return spaceLeft();
    		}
    	}
    
    	/**
    	 * Get the capacity of this buffer.
    	 * <p>
    	 * Note that the number of Objects available plus
    	 * the number of Objects free may not add up to the
    	 * capacity of this buffer, as the buffer may reserve some
    	 * space for other purposes.
    	 *
    	 * @return the length of the backing array; one slot of it is always
    	 *     kept free, so at most one Object fewer can be stored
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	public int getSize(){
    		synchronized (this){
    			return buffer.length;
    		}
    	}
    
    	/**
    	 * Allocate a backing array.  Generic arrays cannot be created directly,
    	 * so an Object[] is created and cast.
    	 *
    	 * @param size length of the array to create
    	 * @return a new array of the given length
    	 */
    	@SuppressWarnings("unchecked") private ElementType[] createArray(int size){
    		return (ElementType[]) new Object[size];
    	}
    
    	/**
    	 * Double the size of the buffer, moving the readable Objects to the
    	 * start of the new array.  Must be called with the lock on this held.
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	private void resize(){
    		ElementType[] newBuffer = createArray(buffer.length * 2);
    		int available = available();
    		if (readPosition <= writePosition){
    			// The readable Objects are in one contiguous piece.
    			int length = writePosition - readPosition;
    			System.arraycopy(buffer, readPosition, newBuffer, 0, length);
    		} else {
    			// The readable Objects wrap around the end of the array:
    			// copy the tail first, then the part at the beginning.
    			int length1 = buffer.length - readPosition;
    			System.arraycopy(buffer, readPosition, newBuffer, 0, length1);
    			int length2 = writePosition;
    			System.arraycopy(buffer, 0, newBuffer, length1, length2);
    		}
    		buffer = newBuffer;
    		readPosition = 0;
    		writePosition = available;
    	}
    
    	/**
    	 * Space available in the buffer which can be written.
    	 * Must be called with the lock on this held.
    	 *
    	 * @return number of Objects that can be written without overwriting
    	 *     unread Objects
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	private int spaceLeft(){
    		if (writePosition < readPosition){
    			// any space between the first write and
    			// the read except one Object is available.
    			// In this case it is all in one piece.
    			return (readPosition - writePosition - 1);
    		}
    		// space at the beginning and end.
    		return ((buffer.length - 1) - (writePosition - readPosition));
    	}
    
    	/**
    	 * Objects available for reading.
    	 * Must be called with the lock on this held.
    	 *
    	 * @return number of Objects between the readPosition and the writePosition
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	private int available(){
    		if (readPosition <= writePosition){
    			// any space between the first read and
    			// the first write is available.  In this case it
    			// is all in one piece.
    			return (writePosition - readPosition);
    		}
    		// space at the beginning and end.
    		return (buffer.length - (readPosition - writePosition));
    	}
    
    	/**
    	 * Drop the given number of Objects from the front of the readable
    	 * region: the slots are set to null and the readPosition is advanced,
    	 * wrapping around the end of the array when needed.
    	 * Must be called with the lock on this held.
    	 *
    	 * @param count number of Objects to drop; must be between zero and
    	 *     the number of Objects available
    	 */
    	private void release(int count){
    		int firstLen = Math.min(count, buffer.length - readPosition);
    		int secondLen = count - firstLen;
    		java.util.Arrays.fill(buffer, readPosition, readPosition + firstLen, null);
    		if (secondLen > 0){
    			java.util.Arrays.fill(buffer, 0, secondLen, null);
    			readPosition = secondLen;
    		} else {
    			readPosition += count;
    			if (readPosition == buffer.length){
    				readPosition = 0;
    			}
    		}
    	}
    
    	/**
    	 * Create a new buffer with a default capacity.
    	 * Writing to a full buffer will block until space
    	 * is available rather than throw an exception.
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	public CircularObjectBuffer(){
    		this (DEFAULT_SIZE, true);
    	}
    
    	/**
    	 * Create a new buffer with given capacity.
    	 * Writing to a full buffer will block until space
    	 * is available rather than throw an exception.
    	 * <p>
    	 * Note that the buffer may reserve some Objects for
    	 * special purposes and capacity number of Objects may
    	 * not be able to be written to the buffer.
    	 * <p>
    	 * Note that if the buffer is of INFINITE_SIZE it will
    	 * neither block nor throw exceptions, but rather grow
    	 * without bound.
    	 *
    	 * @param size desired capacity of the buffer in Objects or CircularObjectBuffer.INFINITE_SIZE.
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	public CircularObjectBuffer(int size){
    		this (size, true);
    	}
    
    	/**
    	 * Create a new buffer with a default capacity and
    	 * given blocking behavior.
    	 *
    	 * @param blockingWrite true writing to a full buffer should block
    	 *        until space is available, false if an exception should
    	 *        be thrown instead.
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	public CircularObjectBuffer(boolean blockingWrite){
    		this (DEFAULT_SIZE, blockingWrite);
    	}
    
    	/**
    	 * Create a new buffer with the given capacity and
    	 * blocking behavior.
    	 * <p>
    	 * Note that the buffer may reserve some Objects for
    	 * special purposes and capacity number of Objects may
    	 * not be able to be written to the buffer.  The backing array is
    	 * created with exactly the given length and one slot is always kept
    	 * free, so a buffer of size n holds at most n - 1 Objects.
    	 * <p>
    	 * Note that if the buffer is of INFINITE_SIZE it will
    	 * neither block nor throw exceptions, but rather grow
    	 * without bound.
    	 *
    	 * @param size desired capacity of the buffer in Objects or CircularObjectBuffer.INFINITE_SIZE.
    	 * @param blockingWrite true writing to a full buffer should block
    	 *        until space is available, false if an exception should
    	 *        be thrown instead.
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	public CircularObjectBuffer(int size, boolean blockingWrite){
    		if (size == INFINITE_SIZE){
    			// An infinite buffer starts at the default size and grows on demand.
    			buffer = createArray(DEFAULT_SIZE);
    			infinite = true;
    		} else {
    			buffer = createArray(size);
    			infinite = false;
    		}
    		this.blockingWrite = blockingWrite;
    	}
    
    
    	/**
    	 * Get a single Object from this buffer.  This method should be called
    	 * by the consumer.
    	 * This method will block until an Object is available or no more
    	 * objects are available.
    	 * <p>
    	 * Note that a null return is ambiguous if the producer writes null
    	 * elements: it is returned both for a stored null and for the end of input.
    	 *
    	 * @return The Object read, or null if there are no more objects
    	 * @throws InterruptedException if the thread is interrupted while waiting.
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	public ElementType read() throws InterruptedException {
    		synchronized (this){
    			while (true){
    				if (available() > 0){
    					ElementType result = buffer[readPosition];
    					// Do not keep the consumed Object reachable from the buffer.
    					buffer[readPosition] = null;
    					readPosition++;
    					if (readPosition == buffer.length){
    						readPosition = 0;
    					}
    					// Space was freed: wake any blocked writer.
    					notifyAll();
    					return result;
    				}
    				if (inputDone){
    					return null;
    				}
    				// Releases the lock while waiting for a producer.
    				wait(WAIT_MILLIS);
    			}
    		}
    	}
    
    	/**
    	 * Get Objects into an array from this buffer.  This method should
    	 * be called by the consumer.
    	 * This method will block until some input is available,
    	 * or there is no more input.
    	 *
    	 * @param buf Destination buffer.
    	 * @return The number of Objects read, or -1 if there will
    	 *     be no more objects available.
    	 * @throws InterruptedException if the thread is interrupted while waiting.
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	public int read(ElementType[] buf) throws InterruptedException {
    		return read(buf, 0, buf.length);
    	}
    
    	/**
    	 * Get Objects into a portion of an array from this buffer.  This
    	 * method should be called by the consumer.
    	 * This method will block until some input is available
    	 * or there is no more input.  It returns as soon as at least one
    	 * Object is available and may therefore read fewer than len Objects.
    	 *
    	 * @param buf Destination buffer.
    	 * @param off Offset at which to start storing Objects.
    	 * @param len Maximum number of Objects to read.
    	 * @return The number of Objects read, or -1 if there will
    	 *     be no more objects available.
    	 * @throws InterruptedException if the thread is interrupted while waiting.
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	public int read(ElementType[] buf, int off, int len) throws InterruptedException {
    		synchronized (this){
    			while (true){
    				int available = available();
    				if (available > 0){
    					int length = Math.min(len, available);
    					int firstLen = Math.min(length, buffer.length - readPosition);
    					int secondLen = length - firstLen;
    					// Copy everything first; the buffer state is only changed
    					// once both copies have succeeded.
    					System.arraycopy(buffer, readPosition, buf, off, firstLen);
    					if (secondLen > 0){
    						System.arraycopy(buffer, 0, buf, off + firstLen, secondLen);
    					}
    					release(length);
    					// Space was freed: wake any blocked writer.
    					notifyAll();
    					return length;
    				}
    				if (inputDone){
    					return -1;
    				}
    				// Releases the lock while waiting for a producer.
    				wait(WAIT_MILLIS);
    			}
    		}
    	}
    
    
    	/**
    	 * Skip Objects.  This method should be used by the consumer
    	 * when it does not care to examine some number of Objects.
    	 * This method will block until some Objects are available,
    	 * or there will be no more Objects available.  It may skip fewer
    	 * than n Objects: at most the Objects available at the time are skipped.
    	 *
    	 * @param n The number of Objects to skip
    	 * @return The number of Objects actually skipped
    	 * @throws IllegalArgumentException if n is negative.
    	 * @throws InterruptedException if the thread is interrupted while waiting.
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	public long skip(long n) throws InterruptedException, IllegalArgumentException {
    		if (n < 0){
    			throw new IllegalArgumentException("Cannot skip a negative number of Objects: " + n);
    		}
    		synchronized (this){
    			while (true){
    				int available = available();
    				if (available > 0){
    					// Clamp as a long before narrowing so that values above
    					// Integer.MAX_VALUE are not truncated.
    					int length = (int) Math.min(n, (long) available);
    					release(length);
    					// Space was freed: wake any blocked writer.
    					notifyAll();
    					return length;
    				}
    				if (inputDone){
    					return 0;
    				}
    				// Releases the lock while waiting for a producer.
    				wait(WAIT_MILLIS);
    			}
    		}
    	}
    
    	/**
    	 * This method should be used by the producer to signal to the consumer
    	 * that the producer is done producing objects and that the consumer
    	 * should stop asking for objects once it has used up buffered objects.
    	 * <p>
    	 * Once the producer has signaled that it is done, further write() invocations
    	 * will cause an IllegalStateException to be thrown. Calling done() multiple times,
    	 * however, has no effect.
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	public void done(){
    		synchronized (this){
    			inputDone = true;
    			// Let blocked readers and writers notice the end of input.
    			notifyAll();
    		}
    	}
    
    	/**
    	 * Fill this buffer with array of Objects.  This method should be called
    	 * by the producer.
    	 * If the buffer allows blocking writes, this method will block until
    	 * all the data has been written rather than throw a BufferOverflowException.
    	 *
    	 * @param buf Array of Objects to be written
    	 * @throws BufferOverflowException if buffer does not allow blocking writes
    	 *   and the buffer is full.  If the exception is thrown, no data
    	 *   will have been written since the buffer was set to be non-blocking.
    	 * @throws IllegalStateException if done() has been called.
    	 * @throws InterruptedException if the write is interrupted.
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	public void write(ElementType[] buf) throws BufferOverflowException, IllegalStateException, InterruptedException {
    		write(buf, 0, buf.length);
    	}
    
    	/**
    	 * Fill this buffer with a portion of an array of Objects.
    	 * This method should be called by the producer.
    	 * If the buffer allows blocking writes, this method will block until
    	 * all the data has been written rather than throw a BufferOverflowException.
    	 * A blocking write may store the data in several pieces as space
    	 * becomes available.
    	 *
    	 * @param buf Array of Objects
    	 * @param off Offset from which to start writing Objects
    	 * @param len - Number of Objects to write
    	 * @throws BufferOverflowException if buffer does not allow blocking writes
    	 *   and the buffer is full.  If the exception is thrown, no data
    	 *   will have been written since the buffer was set to be non-blocking.
    	 * @throws IllegalStateException if done() has been called.
    	 * @throws InterruptedException if the write is interrupted.
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	public void write(ElementType[] buf, int off, int len) throws BufferOverflowException, IllegalStateException, InterruptedException {
    		synchronized (this){
    			while (len > 0){
    				if (inputDone) throw new IllegalStateException("CircularObjectBuffer.done() has been called, CircularObjectBuffer.write() failed.");
    				int spaceLeft = spaceLeft();
    				while (infinite && spaceLeft < len){
    					resize();
    					spaceLeft = spaceLeft();
    				}
    				if (!blockingWrite && spaceLeft < len) throw new BufferOverflowException("CircularObjectBuffer is full; cannot write " + len + " Objects");
    				int realLen = Math.min(len, spaceLeft);
    				if (realLen > 0){
    					// realLen never exceeds the free space, so whatever does not
    					// fit before the end of the array fits at its beginning.
    					int firstLen = Math.min(realLen, buffer.length - writePosition);
    					int secondLen = realLen - firstLen;
    					System.arraycopy(buf, off, buffer, writePosition, firstLen);
    					if (secondLen > 0){
    						System.arraycopy(buf, off + firstLen, buffer, 0, secondLen);
    						writePosition = secondLen;
    					} else {
    						writePosition += realLen;
    						if (writePosition == buffer.length){
    							writePosition = 0;
    						}
    					}
    					off += realLen;
    					len -= realLen;
    					// Data was added: wake any blocked reader.
    					notifyAll();
    				}
    				if (len > 0){
    					// Buffer is full; release the lock while waiting for a consumer.
    					wait(WAIT_MILLIS);
    				}
    			}
    		}
    	}
    
    	/**
    	 * Add a single Object to this buffer.  This method should be
    	 * called by the producer.
    	 * If the buffer allows blocking writes, this method will block until
    	 * the Object has been written rather than throw a BufferOverflowException.
    	 *
    	 * @param o Object to be written.
    	 * @throws BufferOverflowException if buffer does not allow blocking writes
    	 *   and the buffer is full.  If the exception is thrown, no data
    	 *   will have been written since the buffer was set to be non-blocking.
    	 * @throws IllegalStateException if done() has been called.
    	 * @throws InterruptedException if the write is interrupted.
    	 *
    	 * @since ostermillerutils 1.00.00
    	 */
    	public void write(ElementType o) throws BufferOverflowException, IllegalStateException, InterruptedException {
    		synchronized (this){
    			while (true){
    				if (inputDone) throw new IllegalStateException("CircularObjectBuffer.done() has been called, CircularObjectBuffer.write() failed.");
    				int spaceLeft = spaceLeft();
    				while (infinite && spaceLeft < 1){
    					resize();
    					spaceLeft = spaceLeft();
    				}
    				if (!blockingWrite && spaceLeft < 1) throw new BufferOverflowException("CircularObjectBuffer is full; cannot write 1 Object");
    				if (spaceLeft > 0){
    					buffer[writePosition] = o;
    					writePosition++;
    					if (writePosition == buffer.length){
    						writePosition = 0;
    					}
    					// Data was added: wake any blocked reader.
    					notifyAll();
    					return;
    				}
    				// Buffer is full; release the lock while waiting for a consumer.
    				wait(WAIT_MILLIS);
    			}
    		}
    	}
    }