public class OpennmsGrpcServer extends AbstractMessageConsumerManager implements RpcClientFactory
RPC: RPC runs in bidirectional streaming mode. OpenNMS needs a handle to each client (Minion) in order to send RPC requests, so a Minion always sends its headers (SystemId/location) when it initializes. The server maintains a list of Minion handles and distributes RPC requests among the Minions in round-robin fashion. For a directed RPC, the server invokes the specific Minion's handle directly. For each RPC request it receives, the server creates an rpcId and tracks the state of the request in a concurrent map. The request is also added to a delay queue, which times the request out if no response arrives before its expiration time. RPC responses are received by the observers that are created at startup. Each response is handled in a separate thread, which the RPC module may use to process the response.
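The request bookkeeping described above (an rpcId per request, state in a concurrent map, and a delay queue for timeouts) can be sketched with JDK classes alone. This is a minimal illustration, not the code of OpennmsGrpcServer: the names `PendingRpcTracker`, `Pending`, `register`, `complete`, and `expireDue` are hypothetical, and a plain `String` stands in for the response message.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch; all names here are assumptions made for illustration.
class PendingRpcTracker {

    // One entry per in-flight request. The DelayQueue orders entries by expiry.
    static final class Pending implements Delayed {
        final String rpcId;
        final long expiresAtNanos;
        final CompletableFuture<String> future = new CompletableFuture<>();

        Pending(String rpcId, long ttlMillis) {
            this.rpcId = rpcId;
            this.expiresAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(ttlMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expiresAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final Map<String, Pending> pendingById = new ConcurrentHashMap<>();
    private final DelayQueue<Pending> expirations = new DelayQueue<>();

    // Creates an rpcId, records the request state, and schedules its timeout.
    Pending register(long ttlMillis) {
        Pending pending = new Pending(UUID.randomUUID().toString(), ttlMillis);
        pendingById.put(pending.rpcId, pending);
        expirations.add(pending);
        return pending;
    }

    // Called when a response arrives. Returns false if the rpcId is unknown
    // or the request has already timed out.
    boolean complete(String rpcId, String response) {
        Pending pending = pendingById.remove(rpcId);
        if (pending == null) {
            return false;
        }
        expirations.remove(pending);
        return pending.future.complete(response);
    }

    // Fails every request whose delay has elapsed and returns how many expired.
    int expireDue() {
        int expired = 0;
        Pending pending;
        while ((pending = expirations.poll()) != null) {
            if (pendingById.remove(pending.rpcId) != null) {
                pending.future.completeExceptionally(
                        new TimeoutException("no response for rpcId " + pending.rpcId));
                expired++;
            }
        }
        return expired;
    }

    int inFlight() {
        return pendingById.size();
    }
}
```

In this sketch a background thread would call `expireDue()` periodically (or block on `DelayQueue.take()`), while the response observers call `complete(rpcId, response)`. Removing the entry from the map first means a late response and a timeout cannot both complete the same request.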
Sink: Sink runs in unidirectional streaming mode. OpenNMS receives sink messages from the client and dispatches them on the consumer threads that are initialized at startup.
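The Sink flow described above, where incoming messages are handed off to a pool of consumer threads, might look roughly like the following. It is a sketch under stated assumptions: `SinkDispatcher`, `onMessage`, and the `byte[]` payload are hypothetical stand-ins, and the real class inherits its dispatch logic from AbstractMessageConsumerManager.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch; names and payload type are assumptions for illustration.
class SinkDispatcher {
    // Consumer threads are created once, when the dispatcher starts.
    private final ExecutorService consumerThreads;
    private final List<Consumer<byte[]>> consumers = new CopyOnWriteArrayList<>();

    SinkDispatcher(int numConsumerThreads) {
        this.consumerThreads = Executors.newFixedThreadPool(numConsumerThreads);
    }

    void registerConsumer(Consumer<byte[]> consumer) {
        consumers.add(consumer);
    }

    // Would be called from the stream observer for each message received.
    // The receiving thread returns immediately; the work runs on a consumer thread.
    void onMessage(byte[] payload) {
        consumerThreads.execute(() -> consumers.forEach(c -> c.accept(payload)));
    }

    // Stops accepting messages and waits for queued ones to be dispatched.
    boolean shutdown(long timeoutMillis) {
        consumerThreads.shutdown();
        try {
            return consumerThreads.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Handing each message to an executor keeps the gRPC receiving thread free, at the cost of an unbounded work queue in this simplified form.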
SINK_INITIAL_SLEEP_TIME, startupExecutor, waitForStartup
JMX_DOMAIN_RPC, LOG_PREFIX, RPC_COUNT, RPC_DURATION, RPC_FAILED, RPC_REQUEST_SIZE, RPC_RESPONSE_SIZE
LOG_PREFIX, METRIC_DISPATCH_TIME, METRIC_MESSAGE_SIZE, METRIC_MESSAGES_RECEIVED
Constructor and Description |
---|
OpennmsGrpcServer() |
Modifier and Type | Method and Description |
---|---|
<S extends RpcRequest,T extends RpcResponse> RpcClient<S,T> | getClient(RpcModule<S,T> module) |
Identity | getIdentity() |
String | getLocation() |
io.grpc.stub.StreamObserver<RpcRequestProto> | getRpcHandler(String location, String systemId) |
com.google.common.collect.Multimap<String,io.grpc.stub.StreamObserver<RpcRequestProto>> | getRpcHandlerByLocation() |
com.codahale.metrics.MetricRegistry | getSinkMetrics() |
io.opentracing.Tracer | getTracer() |
TracerRegistry | getTracerRegistry() |
void | setConfigAdmin(org.osgi.service.cm.ConfigurationAdmin configAdmin) |
void | setIdentity(Identity identity) |
void | setLocation(String location) |
void | setRpcMetrics(com.codahale.metrics.MetricRegistry metricRegistry) |
void | setSinkMetrics(com.codahale.metrics.MetricRegistry sinkMetrics) |
void | setTracerRegistry(TracerRegistry tracerRegistry) |
void | shutdown() |
void | start() |
protected void | startConsumingForModule(SinkModule<?,Message> module) |
protected void | stopConsumingForModule(SinkModule<?,Message> module) |
dispatch, getNumConsumerThreads, getStartupExecutor, registerConsumer, unregisterAllConsumers, unregisterConsumer
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
markFailed, markRpcCount, updateDuration, updateRequestSize, updateResponseSize
getDispatchTimerMetric, updateMessageSize
public void start() throws IOException
Throws: IOException
protected void startConsumingForModule(SinkModule<?,Message> module) throws Exception
Specified by: startConsumingForModule in class AbstractMessageConsumerManager
Throws: Exception
protected void stopConsumingForModule(SinkModule<?,Message> module) throws Exception
Specified by: stopConsumingForModule in class AbstractMessageConsumerManager
Throws: Exception
public <S extends RpcRequest,T extends RpcResponse> RpcClient<S,T> getClient(RpcModule<S,T> module)
Specified by: getClient in interface RpcClientFactory
public io.grpc.stub.StreamObserver<RpcRequestProto> getRpcHandler(String location, String systemId)
public String getLocation()
public void setLocation(String location)
public Identity getIdentity()
public void setIdentity(Identity identity)
public void setConfigAdmin(org.osgi.service.cm.ConfigurationAdmin configAdmin)
public void setRpcMetrics(com.codahale.metrics.MetricRegistry metricRegistry)
public com.codahale.metrics.MetricRegistry getSinkMetrics()
public void setSinkMetrics(com.codahale.metrics.MetricRegistry sinkMetrics)
public TracerRegistry getTracerRegistry()
public void setTracerRegistry(TracerRegistry tracerRegistry)
public io.opentracing.Tracer getTracer()
public void shutdown()
public com.google.common.collect.Multimap<String,io.grpc.stub.StreamObserver<RpcRequestProto>> getRpcHandlerByLocation()
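The class description says that requests are spread across Minion handles in round-robin fashion and that a directed RPC goes straight to one Minion's handle, which is presumably what getRpcHandler(location, systemId) and getRpcHandlerByLocation() support. The sketch below shows one way such a selection could work using only JDK collections. `HandlerRegistry`, `register`, and `select` are hypothetical names, and the generic type `H` stands in for the gRPC stream observer; the actual class exposes a Guava Multimap and may behave differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; H stands in for a per-Minion request stream handle.
class HandlerRegistry<H> {
    private final Map<String, List<H>> byLocation = new ConcurrentHashMap<>();
    private final Map<String, H> bySystemId = new ConcurrentHashMap<>();
    private final Map<String, AtomicInteger> cursors = new ConcurrentHashMap<>();

    // Called when a Minion announces itself with its SystemId and location.
    void register(String location, String systemId, H handler) {
        H previous = bySystemId.put(systemId, handler);
        if (previous != null) {
            // Drop the stale handle left over from an earlier connection.
            byLocation.values().forEach(handlers -> handlers.remove(previous));
        }
        byLocation.computeIfAbsent(location, k -> new CopyOnWriteArrayList<>()).add(handler);
    }

    // Directed RPC: a systemId is given, so return that Minion's handle.
    // Otherwise rotate through the handles registered at the location.
    H select(String location, String systemId) {
        if (systemId != null && !systemId.isEmpty()) {
            return bySystemId.get(systemId);
        }
        List<H> registered = byLocation.get(location);
        if (registered == null) {
            return null;
        }
        // Work on a snapshot so a concurrent removal cannot invalidate the index.
        List<H> snapshot = new ArrayList<>(registered);
        if (snapshot.isEmpty()) {
            return null;
        }
        int next = cursors.computeIfAbsent(location, k -> new AtomicInteger()).getAndIncrement();
        return snapshot.get(Math.floorMod(next, snapshot.size()));
    }
}
```

`Math.floorMod` keeps the index valid even after the counter overflows to a negative value.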
Copyright © 2020. All rights reserved.