Class OpennmsKafkaProducer
- java.lang.Object
-
- org.opennms.features.kafka.producer.OpennmsKafkaProducer
-
- All Implemented Interfaces:
AlarmFeedbackListener
,AlarmLifecycleListener
,EventListener
,ThreadAwareEventListener
,OnmsTopologyConsumer
public class OpennmsKafkaProducer extends java.lang.Object implements AlarmLifecycleListener, EventListener, AlarmFeedbackListener, OnmsTopologyConsumer, ThreadAwareEventListener
-
-
Field Summary
Fields Modifier and Type Field Description static java.lang.String
KAFKA_CLIENT_PID
-
Constructor Summary
Constructors Constructor Description OpennmsKafkaProducer(ProtobufMapper protobufMapper, NodeCache nodeCache, org.osgi.service.cm.ConfigurationAdmin configAdmin, EventSubscriptionService eventSubscriptionService, OnmsTopologyDao topologyDao, int nodeAsyncUpdateThreads)
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description void
consume(OnmsTopologyMessage message)
void
destroy()
AlarmCallbackStateTracker
getAlarmCallbackStateTracker()
java.util.concurrent.CountDownLatch
getAlarmFeedbackForwardedLatch()
java.util.concurrent.CountDownLatch
getAlarmForwardedLatch()
java.lang.String
getEncoding()
java.util.concurrent.CountDownLatch
getEventForwardedLatch()
java.util.concurrent.CountDownLatch
getForwardedTopologyEdgeMessage()
java.util.concurrent.CountDownLatch
getForwardedTopologyVertexMessage()
java.lang.String
getName()
Return the id of the listenerjava.util.concurrent.CountDownLatch
getNodeForwardedLatch()
int
getNumEventListenerThreads()
int
getNumThreads()
java.util.Set<OnmsTopologyProtocol>
getProtocols()
void
handleAlarmFeedback(java.util.List<AlarmFeedback> alarmFeedback)
Handle the newly generated collection of alarm feedback.void
handleAlarmSnapshot(java.util.List<OnmsAlarm> alarms)
Called periodically with a complete set of alarms as present in the database at the given timestamp.void
handleDeletedAlarm(int alarmId, java.lang.String reductionKey)
Called when an alarm has been deleted.void
handleNewOrUpdatedAlarm(OnmsAlarm alarm)
Called when an alarm has been created or updated.void
init()
boolean
isForwardingAlarms()
void
onEvent(IEvent event)
Process a sent event.void
postHandleAlarmSnapshot()
Called afterAlarmLifecycleListener.handleAlarmSnapshot(java.util.List<org.opennms.netmgt.model.OnmsAlarm>)
has been called on all the listeners, and after the session & transaction used to perform the snapshot has been closed.void
preHandleAlarmSnapshot()
Called before the transaction is opened and the alarms are read for subsequent calls toAlarmLifecycleListener.handleAlarmSnapshot(java.util.List<org.opennms.netmgt.model.OnmsAlarm>)
.void
setAlarmFeedbackTopic(java.lang.String alarmFeedbackTopic)
void
setAlarmFilter(java.lang.String alarmFilter)
void
setAlarmTopic(java.lang.String alarmTopic)
OpennmsKafkaProducer
setDataSync(KafkaAlarmDataSync dataSync)
void
setEncoding(java.lang.String encoding)
void
setEventFilter(java.lang.String eventFilter)
void
setEventTopic(java.lang.String eventTopic)
void
setKafkaSendQueueCapacity(int kafkaSendQueueCapacity)
void
setNodeTopic(java.lang.String nodeTopic)
void
setNumEventListenerThreads(int numEventListenerThreads)
void
setSuppressIncrementalAlarms(boolean suppressIncrementalAlarms)
void
setTopologyEdgeTopic(java.lang.String topologyEdgeTopic)
void
setTopologyVertexTopic(java.lang.String topologyVertexTopic)
boolean
shouldForwardAlarm(OnmsAlarm alarm)
-
-
-
Field Detail
-
KAFKA_CLIENT_PID
public static final java.lang.String KAFKA_CLIENT_PID
- See Also:
- Constant Field Values
-
-
Constructor Detail
-
OpennmsKafkaProducer
public OpennmsKafkaProducer(ProtobufMapper protobufMapper, NodeCache nodeCache, org.osgi.service.cm.ConfigurationAdmin configAdmin, EventSubscriptionService eventSubscriptionService, OnmsTopologyDao topologyDao, int nodeAsyncUpdateThreads)
-
-
Method Detail
-
init
public void init() throws java.io.IOException
- Throws:
java.io.IOException
-
destroy
public void destroy()
-
shouldForwardAlarm
public boolean shouldForwardAlarm(OnmsAlarm alarm)
-
handleAlarmSnapshot
public void handleAlarmSnapshot(java.util.List<OnmsAlarm> alarms)
Description copied from interface:AlarmLifecycleListener
Called periodically with a complete set of alarms as present in the database at the given timestamp. This should be used to synchronize any state to ensure it matches what is currently in the database. Note that it is possible that the *current* state of alarms is different from the state at the time at which the snapshot was taken. Implementations should take this in consideration when performing any state synchronization. This method will be called while the related session & transaction that created the alarm are still open. All of the listeners are invoked serially, so the implementors should avoid blocking when possible.- Specified by:
handleAlarmSnapshot
in interfaceAlarmLifecycleListener
- Parameters:
alarms
- canonical set of alarms in the database
-
preHandleAlarmSnapshot
public void preHandleAlarmSnapshot()
Description copied from interface:AlarmLifecycleListener
Called before the transaction is opened and the alarms are read for subsequent calls toAlarmLifecycleListener.handleAlarmSnapshot(java.util.List<org.opennms.netmgt.model.OnmsAlarm>)
. This can be used to trigger any necessary state tracking to accurately handle the snapshot results.- Specified by:
preHandleAlarmSnapshot
in interfaceAlarmLifecycleListener
-
postHandleAlarmSnapshot
public void postHandleAlarmSnapshot()
Description copied from interface:AlarmLifecycleListener
Called afterAlarmLifecycleListener.handleAlarmSnapshot(java.util.List<org.opennms.netmgt.model.OnmsAlarm>)
has been called on all the listeners, and after the session & transaction used to perform the snapshot has been closed. This can be used to trigger any necessary post-processing of the results once the related session has been closed. This function may be called immediately after a call toAlarmLifecycleListener.preHandleAlarmSnapshot()
if an error occurred while preparing the snapshot i.e. when opening the transaction.- Specified by:
postHandleAlarmSnapshot
in interfaceAlarmLifecycleListener
-
handleNewOrUpdatedAlarm
public void handleNewOrUpdatedAlarm(OnmsAlarm alarm)
Description copied from interface:AlarmLifecycleListener
Called when an alarm has been created or updated. This method will be called while the related session & transaction that created the alarm are still open. All of the listeners are invoked serially, so the implementors should avoid blocking when possible.- Specified by:
handleNewOrUpdatedAlarm
in interfaceAlarmLifecycleListener
- Parameters:
alarm
- a newly created or updated alarm
-
handleDeletedAlarm
public void handleDeletedAlarm(int alarmId, java.lang.String reductionKey)
Description copied from interface:AlarmLifecycleListener
Called when an alarm has been deleted. This method will be called while the related session & transaction that created the alarm are still open. All of the listeners are invoked serially, so the implementors should avoid blocking when possible.- Specified by:
handleDeletedAlarm
in interfaceAlarmLifecycleListener
- Parameters:
alarmId
- id of the alarm that was deletedreductionKey
- reduction key of the alarm that was deleted
-
getName
public java.lang.String getName()
Description copied from interface:EventListener
Return the id of the listener- Specified by:
getName
in interfaceEventListener
- Specified by:
getName
in interfaceOnmsTopologyConsumer
- Returns:
- a
String
object.
-
onEvent
public void onEvent(IEvent event)
Description copied from interface:EventListener
Process a sent event.- Specified by:
onEvent
in interfaceEventListener
- Parameters:
event
- aIEvent
object.
-
getProtocols
public java.util.Set<OnmsTopologyProtocol> getProtocols()
- Specified by:
getProtocols
in interfaceOnmsTopologyConsumer
-
consume
public void consume(OnmsTopologyMessage message)
- Specified by:
consume
in interfaceOnmsTopologyConsumer
-
setTopologyVertexTopic
public void setTopologyVertexTopic(java.lang.String topologyVertexTopic)
-
setTopologyEdgeTopic
public void setTopologyEdgeTopic(java.lang.String topologyEdgeTopic)
-
setEventTopic
public void setEventTopic(java.lang.String eventTopic)
-
setAlarmTopic
public void setAlarmTopic(java.lang.String alarmTopic)
-
setNodeTopic
public void setNodeTopic(java.lang.String nodeTopic)
-
setAlarmFeedbackTopic
public void setAlarmFeedbackTopic(java.lang.String alarmFeedbackTopic)
-
setEventFilter
public void setEventFilter(java.lang.String eventFilter)
-
setAlarmFilter
public void setAlarmFilter(java.lang.String alarmFilter)
-
setDataSync
public OpennmsKafkaProducer setDataSync(KafkaAlarmDataSync dataSync)
-
handleAlarmFeedback
public void handleAlarmFeedback(java.util.List<AlarmFeedback> alarmFeedback)
Description copied from interface:AlarmFeedbackListener
Handle the newly generated collection of alarm feedback.- Specified by:
handleAlarmFeedback
in interfaceAlarmFeedbackListener
- Parameters:
alarmFeedback
- the collection of alarm feedback
-
isForwardingAlarms
public boolean isForwardingAlarms()
-
getEventForwardedLatch
public java.util.concurrent.CountDownLatch getEventForwardedLatch()
-
getAlarmForwardedLatch
public java.util.concurrent.CountDownLatch getAlarmForwardedLatch()
-
getNodeForwardedLatch
public java.util.concurrent.CountDownLatch getNodeForwardedLatch()
-
getAlarmFeedbackForwardedLatch
public java.util.concurrent.CountDownLatch getAlarmFeedbackForwardedLatch()
-
setSuppressIncrementalAlarms
public void setSuppressIncrementalAlarms(boolean suppressIncrementalAlarms)
-
getAlarmCallbackStateTracker
public AlarmCallbackStateTracker getAlarmCallbackStateTracker()
-
setKafkaSendQueueCapacity
public void setKafkaSendQueueCapacity(int kafkaSendQueueCapacity)
-
getNumThreads
public int getNumThreads()
- Specified by:
getNumThreads
in interfaceThreadAwareEventListener
-
getForwardedTopologyVertexMessage
public java.util.concurrent.CountDownLatch getForwardedTopologyVertexMessage()
-
getForwardedTopologyEdgeMessage
public java.util.concurrent.CountDownLatch getForwardedTopologyEdgeMessage()
-
getEncoding
public java.lang.String getEncoding()
-
setEncoding
public void setEncoding(java.lang.String encoding)
-
getNumEventListenerThreads
public int getNumEventListenerThreads()
-
setNumEventListenerThreads
public void setNumEventListenerThreads(int numEventListenerThreads)
-
-