Class QueueFileOffHeapDispatchQueue<T>
- java.lang.Object
-
- org.opennms.core.ipc.sink.offheap.QueueFileOffHeapDispatchQueue<T>
-
- Type Parameters:
T
- the type being queued
- All Implemented Interfaces:
DispatchQueue<T>
public class QueueFileOffHeapDispatchQueue<T> extends java.lang.Object implements DispatchQueue<T>
ADispatchQueue
that first attempts to queue items in memory and upon overflowing the allocated in-memory queue writes items "off heap" directly to disk via a file. The in-memory queue is volatile and if the process crashes its contents are lost. The contents written to disk are durable and in the event of a crash will be reloaded.This queue can be configured to only queue to memory by specifying the maximum off-heap size of 0. Using this configuration causes
enqueue(T, java.lang.String)
to block when the in-memory queue fills up rather than writing to the off-heap queue.Before queued items are written to disk they are first accumulated in a batch to limit the number of discrete writes we make to disk. The batched items are considered part of the in-memory portion of the queue and are also volatile.
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.opennms.core.ipc.sink.api.DispatchQueue
DispatchQueue.EnqueueResult
-
-
Constructor Summary
Constructors Constructor Description QueueFileOffHeapDispatchQueue(java.util.function.Function<T,byte[]> serializer, java.util.function.Function<byte[],T> deserializer, java.lang.String moduleName, java.nio.file.Path filePath, int inMemoryQueueSize, int batchSize, long maxFileSizeInBytes)
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description long
checkFileSize()
java.util.Map.Entry<java.lang.String,T>
dequeue()
On every call to dequeue, if the off-heap queue is configured, we check the file for an entry and drain it to the in-memory queue provided there is room.DispatchQueue.EnqueueResult
enqueue(T message, java.lang.String key)
When enqueueing we prefer the in-memory queue unless the file based queue is already utilized.int
getSize()
boolean
isFull()
-
-
-
Constructor Detail
-
QueueFileOffHeapDispatchQueue
public QueueFileOffHeapDispatchQueue(java.util.function.Function<T,byte[]> serializer, java.util.function.Function<byte[],T> deserializer, java.lang.String moduleName, java.nio.file.Path filePath, int inMemoryQueueSize, int batchSize, long maxFileSizeInBytes) throws java.io.IOException
- Throws:
java.io.IOException
-
-
Method Detail
-
checkFileSize
public long checkFileSize()
-
enqueue
public DispatchQueue.EnqueueResult enqueue(T message, java.lang.String key) throws WriteFailedException
When enqueueing we prefer the in-memory queue unless the file based queue is already utilized. If that fails (because it is full) we then enqueue via the file based queue provided it is not currently full and has been configured. If the file based queue is full or not configured we block and wait for capacity.We only write to the file based queue when we have a full batch ready. The batch container is then emptied after being written to disk.
- Specified by:
enqueue
in interfaceDispatchQueue<T>
- Throws:
WriteFailedException
-
dequeue
public java.util.Map.Entry<java.lang.String,T> dequeue() throws java.lang.InterruptedException
On every call to dequeue, if the off-heap queue is configured, we check the file for an entry and drain it to the in-memory queue provided there is room. We then take exclusively from the head of the in-memory queue which ensures ordering with respect to the two discrete queues.After completely draining the queue on disk we check the existing batch for entries and drain them next.
- Specified by:
dequeue
in interfaceDispatchQueue<T>
- Throws:
java.lang.InterruptedException
- if interrupted while waiting
-
isFull
public boolean isFull()
- Specified by:
isFull
in interfaceDispatchQueue<T>
- Returns:
- true if the queue has no more capacity, false otherwise
-
getSize
public int getSize()
- Specified by:
getSize
in interfaceDispatchQueue<T>
- Returns:
- the current number of queued items
-
-