Class QueueFileOffHeapDispatchQueue<T>

  • Type Parameters:
    T - the type being queued
    All Implemented Interfaces:
    DispatchQueue<T>

    public class QueueFileOffHeapDispatchQueue<T>
    extends java.lang.Object
    implements DispatchQueue<T>
    A DispatchQueue that first attempts to queue items in memory and, once the allocated in-memory queue overflows, writes items "off heap" directly to a file on disk. The in-memory queue is volatile: if the process crashes, its contents are lost. Items written to disk are durable and are reloaded after a crash.

    This queue can be configured to queue only to memory by specifying a maximum off-heap size of 0. With this configuration, enqueue(T, java.lang.String) blocks when the in-memory queue fills up rather than writing to the off-heap queue.

    Before queued items are written to disk, they are first accumulated in a batch to limit the number of discrete disk writes. The batched items are considered part of the in-memory portion of the queue and are also volatile.

    • Constructor Summary

      Constructors 
      Constructor Description
      QueueFileOffHeapDispatchQueue​(java.util.function.Function<T,​byte[]> serializer, java.util.function.Function<byte[],​T> deserializer, java.lang.String moduleName, java.nio.file.Path filePath, int inMemoryQueueSize, int batchSize, long maxFileSizeInBytes)  
    • Method Summary

      Modifier and Type Method Description
      long checkFileSize()  
      java.util.Map.Entry<java.lang.String,​T> dequeue()
      On every call to dequeue, if the off-heap queue is configured, we check the file for an entry and drain it to the in-memory queue provided there is room.
      DispatchQueue.EnqueueResult enqueue​(T message, java.lang.String key)
      When enqueueing we prefer the in-memory queue unless the file-based queue is already in use.
      int getSize()  
      boolean isFull()  
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
    • Constructor Detail

      • QueueFileOffHeapDispatchQueue

        public QueueFileOffHeapDispatchQueue​(java.util.function.Function<T,​byte[]> serializer,
                                             java.util.function.Function<byte[],​T> deserializer,
                                             java.lang.String moduleName,
                                             java.nio.file.Path filePath,
                                             int inMemoryQueueSize,
                                             int batchSize,
                                             long maxFileSizeInBytes)
                                      throws java.io.IOException
        Throws:
        java.io.IOException
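
        The serializer and deserializer arguments have the shapes Function<T, byte[]> and Function<byte[], T>. Since items written to disk are reloaded after a crash, the pair presumably needs to round-trip a value without loss. The following is a minimal sketch of such a pair for T = String, assuming UTF-8 encoding is acceptable; the class StringCodecSketch and its field names are hypothetical and not part of this API.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical helper for illustration only; not part of the documented API.
final class StringCodecSketch {
    // Same shape as the constructor's serializer parameter: Function<T, byte[]>.
    static final Function<String, byte[]> SERIALIZER =
            s -> s.getBytes(StandardCharsets.UTF_8);

    // Same shape as the constructor's deserializer parameter: Function<byte[], T>.
    static final Function<byte[], String> DESERIALIZER =
            bytes -> new String(bytes, StandardCharsets.UTF_8);

    public static void main(String[] args) {
        String original = "order-42";
        // Serialize, then deserialize, and check the value survives the round trip.
        String restored = DESERIALIZER.apply(SERIALIZER.apply(original));
        System.out.println(original.equals(restored));
    }
}
```

        A pair like this could be passed as the first two constructor arguments. Naming the charset explicitly avoids depending on the platform default, which may differ between the process that wrote the file and the one that reloads it.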
    • Method Detail

      • checkFileSize

        public long checkFileSize()
      • enqueue

        public DispatchQueue.EnqueueResult enqueue​(T message,
                                                   java.lang.String key)
                                            throws WriteFailedException
        When enqueueing we prefer the in-memory queue unless the file-based queue is already in use. If the in-memory enqueue fails because that queue is full, we enqueue via the file-based queue, provided it has been configured and is not currently full. If the file-based queue is full or not configured, we block and wait for capacity.

        We only write to the file-based queue when we have a full batch ready. The batch container is emptied after being written to disk.

        Specified by:
        enqueue in interface DispatchQueue<T>
        Throws:
        WriteFailedException
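
        The enqueue policy described above can be sketched as a toy model. This is not the class's actual implementation: the "file" is an in-memory deque, capacity is counted in items rather than bytes, a pending batch is assumed to count as the file-based queue being in use, and the sketch returns WOULD_BLOCK where the real method blocks. EnqueuePolicySketch, Route, and every field name are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of the documented enqueue policy; all names are hypothetical.
final class EnqueuePolicySketch {
    enum Route { MEMORY, BATCHED, FLUSHED, WOULD_BLOCK }

    private final Deque<String> memory = new ArrayDeque<>();
    private final List<String> batch = new ArrayList<>();
    private final Deque<String> file = new ArrayDeque<>(); // stands in for the on-disk queue
    private final int memoryCapacity;
    private final int batchSize;
    private final int fileCapacity; // in items; 0 models "off-heap queue not configured"

    EnqueuePolicySketch(int memoryCapacity, int batchSize, int fileCapacity) {
        this.memoryCapacity = memoryCapacity;
        this.batchSize = batchSize;
        this.fileCapacity = fileCapacity;
    }

    Route enqueue(String item) {
        // Assumption: a non-empty batch also counts as the file-based path being in use.
        boolean fileInUse = !file.isEmpty() || !batch.isEmpty();
        // Prefer memory unless the file-based path already holds items.
        if (!fileInUse && memory.size() < memoryCapacity) {
            memory.addLast(item);
            return Route.MEMORY;
        }
        // File-based path unavailable: not configured, or already at capacity.
        if (fileCapacity == 0 || file.size() + batch.size() >= fileCapacity) {
            return Route.WOULD_BLOCK; // the documented method blocks and waits here
        }
        batch.add(item);
        if (batch.size() < batchSize) {
            return Route.BATCHED;
        }
        // A full batch is written in one step, then the batch container is emptied.
        file.addAll(batch);
        batch.clear();
        return Route.FLUSHED;
    }

    public static void main(String[] args) {
        EnqueuePolicySketch q = new EnqueuePolicySketch(2, 2, 4);
        for (String s : new String[] {"a", "b", "c", "d", "e", "f", "g"}) {
            System.out.println(s + " -> " + q.enqueue(s));
        }
    }
}
```

        With an in-memory capacity of 2, a batch size of 2, and room for 4 items in the file, the first two items go to memory, the next four are batched and flushed in pairs, and the seventh finds no capacity.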
      • dequeue

        public java.util.Map.Entry<java.lang.String,​T> dequeue()
                                                              throws java.lang.InterruptedException
        On every call to dequeue, if the off-heap queue is configured, we check the file for an entry and drain it to the in-memory queue provided there is room. We then take exclusively from the head of the in-memory queue, which preserves ordering across the two discrete queues.

        After completely draining the queue on disk we check the existing batch for entries and drain them next.

        Specified by:
        dequeue in interface DispatchQueue<T>
        Throws:
        java.lang.InterruptedException - if interrupted while waiting
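
        The drain order described above can be illustrated with a toy model. This is a sketch under stated assumptions, not the actual implementation: the "file" and the batch are plain in-memory deques, the sketch moves as many file entries per call as fit (the real method may move only one), and it returns null on an empty queue where the real method blocks. DequeueOrderSketch and its fields are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the documented dequeue order; all names are hypothetical.
final class DequeueOrderSketch {
    final Deque<String> memory = new ArrayDeque<>();
    final Deque<String> file = new ArrayDeque<>();  // stands in for the on-disk queue
    final Deque<String> batch = new ArrayDeque<>(); // batched items not yet written to disk
    private final int memoryCapacity;

    DequeueOrderSketch(int memoryCapacity) {
        this.memoryCapacity = memoryCapacity;
    }

    String dequeue() {
        // Refill memory from the file first: file entries are older than the batch.
        while (memory.size() < memoryCapacity && !file.isEmpty()) {
            memory.addLast(file.pollFirst());
        }
        // Only after the file is completely drained does the pending batch follow.
        while (file.isEmpty() && memory.size() < memoryCapacity && !batch.isEmpty()) {
            memory.addLast(batch.pollFirst());
        }
        // Consumers take exclusively from the head of the in-memory queue.
        return memory.pollFirst(); // null when empty; the documented method blocks instead
    }

    public static void main(String[] args) {
        DequeueOrderSketch q = new DequeueOrderSketch(2);
        q.memory.add("a");
        q.memory.add("b");
        q.file.add("c");
        q.file.add("d");
        q.batch.add("e");
        for (String s = q.dequeue(); s != null; s = q.dequeue()) {
            System.out.println(s); // items come out in insertion order: a, b, c, d, e
        }
    }
}
```

        Because every item passes through the tail of the in-memory queue before it is handed out, the consumer sees a single FIFO order even though the items were stored in three places.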
      • isFull

        public boolean isFull()
        Specified by:
        isFull in interface DispatchQueue<T>
        Returns:
        true if the queue has no more capacity, false otherwise
      • getSize

        public int getSize()
        Specified by:
        getSize in interface DispatchQueue<T>
        Returns:
        the current number of queued items