public abstract class BlockingEnvelopeMap extends java.lang.Object implements SystemConsumer
BlockingEnvelopeMap is a helper class for SystemConsumer implementations. Samza's poll() requirements (honoring the per-partition message limits and the timeout without blocking the SamzaContainer) make implementing SystemConsumers somewhat tricky, and BlockingEnvelopeMap is provided to help developers write them. The intended audience is not those writing Samza jobs, but rather those extending Samza to consume from new types of stream providers and other systems.
SystemConsumers that extend BlockingEnvelopeMap need to add messages using put (or putAll), and update the per-partition noMoreMessages flag using setIsAtHead. The noMoreMessages flag is used to determine whether a SystemStreamPartition is "caught up" (has read all currently available messages from the underlying system). For example, with a Kafka system, noMoreMessages would be set to true when the last message offset returned is equal to the offset high watermark for a given topic/partition.
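To make the put/setIsAtHead division of labor concrete, here is a minimal Java sketch. It does not use Samza: EnvelopeBufferModel is a hypothetical stand-in for the protected helpers (String partitions and payloads replace SystemStreamPartition and IncomingMessageEnvelope), and ToyConsumer.onFetch and its highWatermark argument are illustrative names. The caught-up rule follows the Kafka example in the paragraph above, and additionally treats an empty fetch as "at head".

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified stand-in for the protected helpers that
// BlockingEnvelopeMap exposes to subclasses (not Samza's code).
abstract class EnvelopeBufferModel {
    private final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>();
    private final Map<String, Boolean> atHead = new ConcurrentHashMap<>();

    void register(String partition) {
        queues.putIfAbsent(partition, new LinkedBlockingQueue<>());
        atHead.putIfAbsent(partition, false);
    }

    protected void put(String partition, String message) throws InterruptedException {
        queues.get(partition).put(message); // would block only if the queue were bounded and full
    }

    protected void setIsAtHead(String partition, boolean isAtHead) {
        atHead.put(partition, isAtHead);
    }

    protected boolean isAtHead(String partition) {
        return atHead.getOrDefault(partition, false);
    }

    int getNumMessagesInQueue(String partition) {
        return queues.get(partition).size();
    }
}

// Hypothetical consumer for a Kafka-like system where each fetch returns a
// contiguous run of messages starting at firstOffset.
class ToyConsumer extends EnvelopeBufferModel {
    void onFetch(String partition, List<String> payloads, long firstOffset, long highWatermark)
            throws InterruptedException {
        for (String payload : payloads) {
            put(partition, payload);
        }
        long lastOffset = firstOffset + payloads.size() - 1;
        // Caught up if the fetch came back empty, or (per the Kafka example)
        // if the last returned offset equals the high watermark.
        setIsAtHead(partition, payloads.isEmpty() || lastOffset == highWatermark);
    }
}
```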
Modifier and Type | Class and Description |
---|---|
class | BlockingEnvelopeMap.BlockingEnvelopeMapMetrics |
class | BlockingEnvelopeMap.BufferGauge |
Field and Description |
---|
BLOCK_ON_OUTSTANDING_MESSAGES |
Constructor and Description |
---|
BlockingEnvelopeMap() |
BlockingEnvelopeMap(Clock clock) |
BlockingEnvelopeMap(MetricsRegistry metricsRegistry) |
BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock) |
BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, java.lang.String metricsGroupName) |
Modifier and Type | Method and Description |
---|---|
int | getNumMessagesInQueue(SystemStreamPartition systemStreamPartition) |
protected boolean | isAtHead(SystemStreamPartition systemStreamPartition) |
protected java.util.concurrent.BlockingQueue<IncomingMessageEnvelope> | newBlockingQueue() |
java.util.List<IncomingMessageEnvelope> | poll(java.util.Map<SystemStreamPartition,java.lang.Integer> systemStreamPartitionAndMaxPerStream, long timeout): Poll the SystemConsumer to get any available messages from the underlying system. |
protected void | put(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) |
protected void | putAll(SystemStreamPartition systemStreamPartition, java.util.List<IncomingMessageEnvelope> envelopes) |
void | register(SystemStreamPartition systemStreamPartition, java.lang.String offset): Register a SystemStreamPartition to this SystemConsumer. |
protected java.lang.Boolean | setIsAtHead(SystemStreamPartition systemStreamPartition, boolean isAtHead) |
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface SystemConsumer: start, stop
public BlockingEnvelopeMap()
public BlockingEnvelopeMap(Clock clock)
public BlockingEnvelopeMap(MetricsRegistry metricsRegistry)
public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock)
public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, java.lang.String metricsGroupName)
public void register(SystemStreamPartition systemStreamPartition, java.lang.String offset)
Description copied from interface: SystemConsumer
Register a SystemStreamPartition to this SystemConsumer.
Specified by:
register in interface SystemConsumer
Parameters:
systemStreamPartition - The SystemStreamPartition object representing the Samza SystemStreamPartition to receive messages from.
offset - String representing the offset of the point in the stream to start reading messages from. This is an inclusive parameter; if "7" were specified, the first message for the system/stream/partition to be consumed and returned would be a message whose offset is "7".
protected java.util.concurrent.BlockingQueue<IncomingMessageEnvelope> newBlockingQueue()
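Because newBlockingQueue() is protected, it appears to be a hook that subclasses can override to choose the queue used for each partition's buffer. The sketch below models that idea without Samza; QueueFactoryModel, BoundedBufferConsumer and CAPACITY are hypothetical, and the unbounded default is an assumption of the model rather than a statement about BlockingEnvelopeMap's actual default. With a bounded ArrayBlockingQueue, a blocking put() would wait for poll() to drain the buffer, which is one way to cap memory per partition.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical base class modeling a newBlockingQueue() factory hook.
class QueueFactoryModel {
    // Assumed default for this model: an unbounded queue per partition.
    protected BlockingQueue<String> newBlockingQueue() {
        return new LinkedBlockingQueue<>();
    }
}

// Hypothetical subclass that caps each partition's buffer, so a blocking
// put() waits for the consumer to drain messages instead of growing forever.
class BoundedBufferConsumer extends QueueFactoryModel {
    static final int CAPACITY = 2; // hypothetical per-partition limit

    @Override
    protected BlockingQueue<String> newBlockingQueue() {
        return new ArrayBlockingQueue<>(CAPACITY);
    }
}
```

A fetcher that must not stall indefinitely could use offer() (optionally with a timeout) on such a queue instead of put(), and retry later when it returns false.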
public java.util.List<IncomingMessageEnvelope> poll(java.util.Map<SystemStreamPartition,java.lang.Integer> systemStreamPartitionAndMaxPerStream, long timeout) throws java.lang.InterruptedException
Description copied from interface: SystemConsumer
Poll the SystemConsumer to get any available messages from the underlying system.
If the underlying implementation does not adhere to the timeout parameter, the SamzaContainer's performance will suffer drastically. Specifically, if poll blocks when it is not supposed to, it blocks the entire main thread in SamzaContainer, and no messages are processed while it is blocked.
Specified by:
poll in interface SystemConsumer
Parameters:
systemStreamPartitionAndMaxPerStream - A map from SystemStreamPartition to the maximum number of messages to return for that SystemStreamPartition. Polling with {stream1: 100, stream2: 1000} tells the SystemConsumer that it can return between 0 and 100 messages (inclusive) for stream1, and between 0 and 1000 messages (inclusive) for stream2. If the SystemConsumer has messages available for other registered SystemStreamPartitions that are not in the map for a given poll invocation, it must not return them. It is illegal to pass in SystemStreamPartitions that have not first been registered with the SystemConsumer.
timeout - If timeout < 0, poll blocks (waiting for messages) unless all SystemStreamPartitions are at "head" (the underlying system has been checked and returned an empty set); if they are all at head, an empty list is returned. If timeout >= 0, poll returns any messages that are currently available for any of the specified SystemStreamPartitions. If no new messages are available, it waits up to timeout milliseconds for messages from any SystemStreamPartition to become available, and returns an empty list if the timeout expires with no new messages.
Throws:
java.lang.InterruptedException
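The poll() contract above (per-partition limits, timeout >= 0 versus timeout < 0, and the empty result when every requested partition is at head) can be modeled in plain Java. This is a simplified sketch under stated assumptions, not BlockingEnvelopeMap's implementation: PollModel, its String-typed partitions and the single monitor used to wake pollers are all hypothetical. When timeout >= 0 the model never waits past its deadline, which is the property that keeps the SamzaContainer's main thread from stalling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified model of the poll() contract; not Samza's code.
class PollModel {
    private final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>();
    private final Map<String, Boolean> atHead = new ConcurrentHashMap<>();
    private final Object signal = new Object(); // wakes pollers when state changes

    void register(String ssp) {
        queues.putIfAbsent(ssp, new LinkedBlockingQueue<>());
        atHead.putIfAbsent(ssp, false);
    }

    // Called by fetcher threads: enqueue, then wake any waiting poller.
    void put(String ssp, String message) {
        queues.get(ssp).add(message);
        synchronized (signal) {
            signal.notifyAll();
        }
    }

    void setIsAtHead(String ssp, boolean isAtHead) {
        atHead.put(ssp, isAtHead);
        synchronized (signal) {
            signal.notifyAll();
        }
    }

    List<String> poll(Map<String, Integer> maxPerSsp, long timeout) throws InterruptedException {
        for (String ssp : maxPerSsp.keySet()) {
            if (!queues.containsKey(ssp)) {
                throw new IllegalArgumentException("Not registered: " + ssp);
            }
        }
        long deadline = System.currentTimeMillis() + Math.max(timeout, 0);
        synchronized (signal) {
            while (true) {
                List<String> out = drain(maxPerSsp);
                if (!out.isEmpty()) {
                    return out;
                }
                if (timeout < 0) {
                    // Block for new messages unless every requested partition is at head.
                    if (allAtHead(maxPerSsp.keySet())) {
                        return out;
                    }
                    signal.wait();
                } else {
                    long remaining = deadline - System.currentTimeMillis();
                    if (remaining <= 0) {
                        return out; // timeout hit with no new messages
                    }
                    signal.wait(remaining);
                }
            }
        }
    }

    // Takes at most the requested number of messages from each partition.
    private List<String> drain(Map<String, Integer> maxPerSsp) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : maxPerSsp.entrySet()) {
            queues.get(e.getKey()).drainTo(out, e.getValue());
        }
        return out;
    }

    private boolean allAtHead(Iterable<String> ssps) {
        for (String ssp : ssps) {
            if (!atHead.getOrDefault(ssp, false)) {
                return false;
            }
        }
        return true;
    }
}
```

In this model, fetcher threads call put() and setIsAtHead(), while the container thread calls poll(). Each check-then-wait happens while holding the monitor, and writers notify under the same monitor, so a message that arrives between the check and the wait still wakes the poller.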
protected void put(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) throws java.lang.InterruptedException
Throws: java.lang.InterruptedException
protected void putAll(SystemStreamPartition systemStreamPartition, java.util.List<IncomingMessageEnvelope> envelopes) throws java.lang.InterruptedException
Throws: java.lang.InterruptedException
public int getNumMessagesInQueue(SystemStreamPartition systemStreamPartition)
protected java.lang.Boolean setIsAtHead(SystemStreamPartition systemStreamPartition, boolean isAtHead)
protected boolean isAtHead(SystemStreamPartition systemStreamPartition)
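One plausible way to combine putAll, getNumMessagesInQueue and setIsAtHead is a fetch loop that pauses while a partition's buffer is already full enough. The following sketch is hypothetical throughout: BufferModel stands in for the protected helpers, ThrottledFetcher, fetchThreshold and maxBatch are invented names, and an in-memory source list replaces the external system. Unlike the real putAll, which declares InterruptedException, this model's putAll never blocks because its queues are unbounded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for BlockingEnvelopeMap's buffer helpers (not Samza's code).
class BufferModel {
    private final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>();
    private final Map<String, Boolean> atHead = new ConcurrentHashMap<>();

    void register(String ssp) {
        queues.putIfAbsent(ssp, new LinkedBlockingQueue<>());
        atHead.putIfAbsent(ssp, false);
    }

    protected void putAll(String ssp, List<String> batch) {
        queues.get(ssp).addAll(batch);
    }

    protected void setIsAtHead(String ssp, boolean isAtHead) {
        atHead.put(ssp, isAtHead);
    }

    protected boolean isAtHead(String ssp) {
        return atHead.getOrDefault(ssp, false);
    }

    int getNumMessagesInQueue(String ssp) {
        return queues.get(ssp).size();
    }

    // Stand-in for poll(): removes up to max buffered messages.
    List<String> drain(String ssp, int max) {
        List<String> out = new ArrayList<>();
        queues.get(ssp).drainTo(out, max);
        return out;
    }
}

// Hypothetical fetcher that skips fetching while the buffer is at or above a threshold.
class ThrottledFetcher extends BufferModel {
    private final List<String> source; // stand-in for the external system
    private final int fetchThreshold;  // hypothetical tuning parameter
    private int nextIndex = 0;

    ThrottledFetcher(List<String> source, int fetchThreshold) {
        this.source = source;
        this.fetchThreshold = fetchThreshold;
    }

    // One iteration of a fetch loop; returns how many messages were buffered.
    int fetchOnce(String ssp, int maxBatch) {
        if (getNumMessagesInQueue(ssp) >= fetchThreshold) {
            return 0; // enough buffered; let the consumer drain first
        }
        int end = Math.min(source.size(), nextIndex + maxBatch);
        List<String> batch = new ArrayList<>(source.subList(nextIndex, end));
        nextIndex = end;
        putAll(ssp, batch);
        setIsAtHead(ssp, nextIndex == source.size()); // read everything available
        return batch.size();
    }
}
```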