Package org.opennms.core.ipc.sink.api
Interface AggregationPolicy<S,T,U>
Type Parameters:
S - type of message that will be sent by the producers
T - type of message that will be received by the consumers
U - intermediary accumulator type used to aggregate the messages
All Known Implementing Classes:
ArrayListAggregationPolicy, IdentityAggregationPolicy, MappingAggregationPolicy
public interface AggregationPolicy<S,T,U>
Defines how messages will be aggregated. When a SinkModule defines an AggregationPolicy, messages will be aggregated by accumulators, which are keyed based on the object returned by key(Object). The aggregation function aggregate(Object, Object) is called to create accumulators and add messages to existing accumulators. The completion size and completion interval options determine the conditions under which the buckets will be dispatched.
Author:
jwhite
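
As an illustration of the contract described above, the following is a minimal sketch of a policy that batches String messages by their first character. The interface is redeclared locally, mirroring the signatures listed on this page, only so that the example compiles without the OpenNMS jars. PrefixBatchingPolicy, PrefixBatchingDemo, and the chosen completion values are hypothetical and are not part of the library.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Local mirror of the interface documented on this page (an assumption made
// so the sketch is self-contained; the real type lives in
// org.opennms.core.ipc.sink.api).
interface AggregationPolicy<S, T, U> {
    int getCompletionSize();
    int getCompletionIntervalMs();
    Object key(S message);
    U aggregate(U accumulator, S newMessage);
    T build(U accumulator);
}

// Hypothetical policy: S = String, U = List<String>, T = List<String>.
class PrefixBatchingPolicy implements AggregationPolicy<String, List<String>, List<String>> {
    @Override
    public int getCompletionSize() {
        return 3; // illustrative value
    }

    @Override
    public int getCompletionIntervalMs() {
        return 1000; // illustrative value
    }

    @Override
    public Object key(String message) {
        // Messages sharing a first character share an accumulator; never returns null.
        return message.isEmpty() ? "" : message.substring(0, 1);
    }

    @Override
    public List<String> aggregate(List<String> accumulator, String newMessage) {
        // A null accumulator means that a new one should be created.
        List<String> result = (accumulator == null) ? new ArrayList<>() : accumulator;
        result.add(newMessage);
        return result;
    }

    @Override
    public List<String> build(List<String> accumulator) {
        // The message handed to consumers is a read-only view of the batch.
        return Collections.unmodifiableList(accumulator);
    }
}

class PrefixBatchingDemo {
    public static void main(String[] args) {
        PrefixBatchingPolicy policy = new PrefixBatchingPolicy();
        List<String> accumulator = policy.aggregate(null, "alpha");
        accumulator = policy.aggregate(accumulator, "apple");
        System.out.println(policy.key("alpha") + " -> " + policy.build(accumulator));
    }
}
```

Using a mutable list as the accumulator and an unmodifiable view as the built message is one possible design; an implementation could just as well build a different type T from U.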
Method Summary
Modifier and Type   Method                                   Description
U                   aggregate(U accumulator, S newMessage)   Aggregate the given message into an existing accumulator, or create a new accumulator if no accumulator exists.
T                   build(U accumulator)                     Build the resulting message from the accumulator.
int                 getCompletionIntervalMs()                Maximum number of milliseconds for which buckets should continue accumulating messages after creation.
int                 getCompletionSize()                      Maximum number of messages to be added to a bucket before dispatching.
java.lang.Object    key(S message)                           Calculate a key for the given message.
Method Detail
getCompletionSize
int getCompletionSize()
Maximum number of messages to be added to a bucket before dispatching. If this value is <= 1, the buckets should be dispatched immediately after adding a single element.
Returns:
maximum number of messages per bucket
getCompletionIntervalMs
int getCompletionIntervalMs()
Maximum number of milliseconds for which buckets should continue accumulating messages after creation. If a bucket has existed for longer than this interval, it will be dispatched regardless of its current size. Values <= 0 will disable periodic flushing, and buckets will only be dispatched once they have reached the maximum size.
Returns:
number of milliseconds to keep a bucket before dispatching
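
The two completion options can be read together as a single dispatch condition. The sketch below encodes that reading as a pure function; CompletionCheck and isComplete are hypothetical names, and the code reflects only the rules stated on this page, not how the OpenNMS dispatcher itself is written.

```java
// Hypothetical helper that interprets the completion size and completion
// interval as documented above.
class CompletionCheck {

    /**
     * @param size                 number of messages currently in the bucket
     * @param ageMs                milliseconds elapsed since the bucket was created
     * @param completionSize       value returned by getCompletionSize()
     * @param completionIntervalMs value returned by getCompletionIntervalMs()
     * @return true if the bucket should be dispatched
     */
    static boolean isComplete(int size, long ageMs, int completionSize, int completionIntervalMs) {
        // A completion size <= 1 means "dispatch after a single element".
        boolean sizeReached = size >= Math.max(1, completionSize);
        // An interval <= 0 disables time-based flushing entirely.
        boolean intervalElapsed = completionIntervalMs > 0 && ageMs > completionIntervalMs;
        return sizeReached || intervalElapsed;
    }

    public static void main(String[] args) {
        System.out.println(isComplete(1, 0, 1, 0));        // size <= 1: dispatch immediately
        System.out.println(isComplete(2, 5000, 10, 0));    // interval disabled, size not reached
        System.out.println(isComplete(2, 1500, 10, 1000)); // interval exceeded
    }
}
```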
key
java.lang.Object key(S message)
Calculate a key for the given message. Objects with the same key will be aggregated together. Returned values should be non-null.
Parameters:
message - the message
Returns:
the message's key
aggregate
U aggregate(U accumulator, S newMessage)
Aggregate the given message into an existing accumulator, or create a new accumulator if no accumulator exists.
Parameters:
accumulator - the existing accumulator, or null if a new accumulator should be created
newMessage - the message to aggregate
Returns:
the new or updated accumulator
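
The null-accumulator convention pairs naturally with java.util.Map.compute, which passes null as the current value when a key is absent. The following sketch shows how a caller might maintain one accumulator per key under that assumption. BucketStore and BucketStoreDemo are hypothetical, and the two function arguments merely stand in for a policy's key(S) and aggregate(U, S) methods; this is not the dispatcher used by OpenNMS.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical per-key accumulator store.
class BucketStore<S, U> {
    private final Map<Object, U> buckets = new LinkedHashMap<>();
    private final Function<S, Object> keyFn;        // stands in for key(S)
    private final BiFunction<U, S, U> aggregateFn;  // stands in for aggregate(U, S)

    BucketStore(Function<S, Object> keyFn, BiFunction<U, S, U> aggregateFn) {
        this.keyFn = keyFn;
        this.aggregateFn = aggregateFn;
    }

    void add(S message) {
        Object key = Objects.requireNonNull(keyFn.apply(message), "key must be non-null");
        // For an absent key, compute() supplies a null accumulator, which the
        // aggregation function treats as "create a new accumulator".
        buckets.compute(key, (k, accumulator) -> aggregateFn.apply(accumulator, message));
    }

    Map<Object, U> snapshot() {
        return new LinkedHashMap<>(buckets);
    }
}

class BucketStoreDemo {
    // Same shape as aggregate(U, S): create on null, otherwise append.
    static List<String> append(List<String> accumulator, String message) {
        List<String> result = (accumulator == null) ? new ArrayList<>() : accumulator;
        result.add(message);
        return result;
    }

    public static void main(String[] args) {
        // Messages are keyed by their length purely for illustration.
        BucketStore<String, List<String>> store =
                new BucketStore<String, List<String>>(m -> m.length(), BucketStoreDemo::append);
        store.add("ab");
        store.add("cd");
        store.add("xyz");
        System.out.println(store.snapshot());
    }
}
```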