Low Level Task API
Introduction
Samza’s powerful Low Level Task API lets you write your application in terms of processing logic for each incoming message. When using the Low Level Task API, you implement a TaskApplication. The processing logic is defined as either a StreamTask or an AsyncStreamTask.
Code Examples
The Hello Samza Wikipedia applications demonstrate how to use Samza’s Low Level Task API. These applications consume various events from Wikipedia, transform them, and calculate several statistics about them.
The WikipediaFeedTaskApplication demonstrates how to consume multiple Wikipedia event streams and merge them into an Apache Kafka topic.
The WikipediaParserTaskApplication demonstrates how to project the incoming events from the Apache Kafka topic to a custom JSON data type.
The WikipediaStatsTaskApplication demonstrates how to calculate and emit periodic statistics about the incoming events while using a local KV store for durability.
Key Concepts
TaskApplication
A TaskApplication describes the inputs, outputs, state, configuration and the processing logic for an application written using Samza’s Low Level Task API.
A typical TaskApplication implementation consists of the following stages:
- Configuring the inputs, outputs and state (tables) using the appropriate SystemDescriptors, InputDescriptors, OutputDescriptors and TableDescriptors.
- Adding the descriptors above to the provided TaskApplicationDescriptor.
- Defining the processing logic in a StreamTask or an AsyncStreamTask implementation, and adding its corresponding StreamTaskFactory or AsyncStreamTaskFactory to the TaskApplicationDescriptor.
The following example TaskApplication removes page views with “bad URLs” from the input stream:
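The three stages can be sketched as follows. This is a minimal illustration, not the Samza implementation: so that it compiles without the Samza jars, `Descriptor`, `TaskApplicationDescriptor`, `TaskApplication`, `StreamTask` and `StreamTaskFactory` are simplified local stand-ins that only record what is registered, and the stream and table names (`pageViewEvent`, `goodPageViewEvent`, `badUrls`) are hypothetical. In a real application the descriptors would come from system-specific SystemDescriptor, InputDescriptor, OutputDescriptor and TableDescriptor classes.

```java
import java.util.ArrayList;
import java.util.List;

final class PageViewFilterAppSketch {
  // ---- Local stand-ins (NOT the real Samza types); they only record registrations. ----
  interface StreamTask { }
  interface StreamTaskFactory { StreamTask createInstance(); }

  // Stands in for InputDescriptor, OutputDescriptor and TableDescriptor alike.
  static final class Descriptor {
    final String id;
    Descriptor(String id) { this.id = id; }
  }

  static final class TaskApplicationDescriptor {
    final List<String> inputs = new ArrayList<>();
    final List<String> outputs = new ArrayList<>();
    final List<String> tables = new ArrayList<>();
    StreamTaskFactory taskFactory;

    TaskApplicationDescriptor withInputStream(Descriptor d) { inputs.add(d.id); return this; }
    TaskApplicationDescriptor withOutputStream(Descriptor d) { outputs.add(d.id); return this; }
    TaskApplicationDescriptor withTable(Descriptor d) { tables.add(d.id); return this; }
    TaskApplicationDescriptor withTaskFactory(StreamTaskFactory f) { taskFactory = f; return this; }
  }

  interface TaskApplication { void describe(TaskApplicationDescriptor appDescriptor); }

  // Placeholder for the task that would drop page views whose URL is in the "badUrls" table.
  static final class PageViewFilterTask implements StreamTask { }

  static final class PageViewFilter implements TaskApplication {
    @Override
    public void describe(TaskApplicationDescriptor appDescriptor) {
      // Stage 1: configure the inputs, outputs and state (hypothetical names).
      Descriptor pageViews = new Descriptor("pageViewEvent");
      Descriptor goodPageViews = new Descriptor("goodPageViewEvent");
      Descriptor badUrls = new Descriptor("badUrls");

      // Stage 2: add the descriptors to the provided application descriptor.
      appDescriptor
          .withInputStream(pageViews)
          .withOutputStream(goodPageViews)
          .withTable(badUrls);

      // Stage 3: register the factory that creates the processing logic.
      appDescriptor.withTaskFactory(PageViewFilterTask::new);
    }
  }
}
```

The point of the sketch is the shape of `describe()`: it only declares what the application consumes, produces and stores, and hands over a factory. No messages are processed inside it.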
TaskFactory
Your TaskFactory will be used to create instances of your Task in each of Samza’s processors. If you’re implementing a StreamTask, you can provide a StreamTaskFactory. Similarly, if you’re implementing an AsyncStreamTask, you can provide an AsyncStreamTaskFactory. For example:
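A factory can be as small as the sketch below. `StreamTask` and `StreamTaskFactory` here are local stand-ins that mirror the shape of the Samza interfaces so the example compiles on its own, and `PageViewFilterTask` is a hypothetical task class.

```java
final class TaskFactorySketch {
  // ---- Local stand-ins mirroring the shape of Samza's interfaces (NOT the real types). ----
  interface StreamTask { }
  interface StreamTaskFactory { StreamTask createInstance(); }

  // Hypothetical task; its processing logic is omitted here.
  static final class PageViewFilterTask implements StreamTask { }

  static final class PageViewFilterTaskFactory implements StreamTaskFactory {
    @Override
    public StreamTask createInstance() {
      // Return a new object on every call so task instances never share mutable state.
      return new PageViewFilterTask();
    }
  }
}
```

Because the factory interface has a single method, a constructor reference such as `PageViewFilterTask::new` would serve equally well in this sketch.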
Task Interfaces
Your processing logic can be implemented in a StreamTask or an AsyncStreamTask.
StreamTask
You can implement a StreamTask for synchronous message processing. Samza delivers messages to the task one at a time, and considers each message to be processed when the process method call returns. For example:
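A minimal sketch of a synchronous task that forwards only page views whose URL is not on a block list. The envelope, collector, coordinator and `StreamTask` types are local stand-ins shaped like their Samza counterparts so the code runs without the Samza jars. As a simplifying assumption, the stand-in collector accepts the raw message instead of an OutgoingMessageEnvelope, and the block list is passed in through the constructor.

```java
import java.util.Set;

final class StreamTaskSketch {
  // ---- Local stand-ins (NOT the real Samza types). ----
  static final class IncomingMessageEnvelope {
    private final Object key;
    private final Object message;
    IncomingMessageEnvelope(Object key, Object message) { this.key = key; this.message = message; }
    Object getKey() { return key; }
    Object getMessage() { return message; }
  }

  // Simplified: the real collector takes an OutgoingMessageEnvelope.
  interface MessageCollector { void send(Object message); }
  interface TaskCoordinator { }

  interface StreamTask {
    void process(IncomingMessageEnvelope envelope, MessageCollector collector,
        TaskCoordinator coordinator) throws Exception;
  }

  static final class BadUrlFilterTask implements StreamTask {
    private final Set<String> badUrls;

    BadUrlFilterTask(Set<String> badUrls) { this.badUrls = badUrls; }

    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
        TaskCoordinator coordinator) {
      // The cast must match the Serde configured for the input stream.
      String url = (String) envelope.getMessage();
      if (!badUrls.contains(url)) {
        collector.send(url);
      }
      // Returning from process() is what marks this message as processed.
    }
  }
}
```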
Note that synchronous message processing does not imply sequential execution. Multiple instances of your Task class implementation may still run concurrently within a container.
AsyncStreamTask
You can implement an AsyncStreamTask for asynchronous message processing. This can be useful when you need to perform long-running I/O operations to process a message, e.g., making an HTTP request. For example:
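The sketch below shows the general pattern: start the non-blocking call, return immediately, and signal the outcome through the callback. The envelope, collector, coordinator, `TaskCallback` and `AsyncStreamTask` types are local stand-ins shaped like the Samza interfaces, and `remoteLookup` is a hypothetical non-blocking client modelled as a function that returns a `CompletableFuture`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class AsyncStreamTaskSketch {
  // ---- Local stand-ins (NOT the real Samza types). ----
  static final class IncomingMessageEnvelope {
    private final Object key;
    private final Object message;
    IncomingMessageEnvelope(Object key, Object message) { this.key = key; this.message = message; }
    Object getKey() { return key; }
    Object getMessage() { return message; }
  }

  // Simplified: the real collector takes an OutgoingMessageEnvelope.
  interface MessageCollector { void send(Object message); }
  interface TaskCoordinator { }
  interface TaskCallback {
    void complete();
    void failure(Throwable t);
  }

  interface AsyncStreamTask {
    void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
        TaskCoordinator coordinator, TaskCallback callback);
  }

  static final class EnrichTask implements AsyncStreamTask {
    // Hypothetical non-blocking client, e.g. an asynchronous HTTP call.
    private final Function<String, CompletableFuture<String>> remoteLookup;

    EnrichTask(Function<String, CompletableFuture<String>> remoteLookup) {
      this.remoteLookup = remoteLookup;
    }

    @Override
    public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
        TaskCoordinator coordinator, TaskCallback callback) {
      String request = (String) envelope.getMessage();
      remoteLookup.apply(request).whenComplete((response, error) -> {
        if (error != null) {
          callback.failure(error);   // report the failure for this message
        } else {
          collector.send(response);  // emit the result with this message's collector
          callback.complete();       // only now does the message count as processed
        }
      });
      // processAsync() returns here without waiting for the lookup to finish.
    }
  }
}
```

Every code path must end in exactly one of `complete()` or `failure()`. A path that invokes neither leaves the message pending.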
Samza delivers the incoming message and a TaskCallback with the processAsync() method call, and considers each message to be processed when its corresponding callback.complete() or callback.failure() has been invoked. If callback.failure() is invoked, or neither callback.complete() nor callback.failure() is invoked within task.callback.timeout.ms milliseconds, Samza will shut down the running Container.
If configured, Samza will keep up to task.max.concurrency messages processing asynchronously at a time within each Task Instance. Note that while message delivery (i.e., processAsync invocation) is guaranteed to be in-order within a stream partition, message processing may complete out of order when task.max.concurrency is greater than 1.
For more details on asynchronous and concurrent processing, see the Samza Async API and Multithreading User Guide.
Additional Task Interfaces
There are a few other interfaces you can implement in your StreamTask or AsyncStreamTask that provide additional functionality.
InitableTask
You can implement the InitableTask interface to access the Context. Context provides access to any runtime objects you need in your task, whether they’re provided by the framework, or your own.
ClosableTask
You can implement the ClosableTask interface to clean up any runtime state during shutdown. This interface is deprecated. It’s recommended to use the ApplicationContainerContext and ApplicationTaskContext APIs to manage the lifecycle of any runtime objects.
WindowableTask
You can implement the WindowableTask interface to implement processing logic that is invoked periodically by the framework.
EndOfStreamListenerTask
You can implement the EndOfStreamListenerTask interface to implement processing logic that is invoked when a Task Instance has reached the end of all input SystemStreamPartitions it’s consuming. This is typically relevant when running Samza as a batch job.
Common Operations
Receiving Messages from Input Streams
Samza calls your Task instance’s process or processAsync method with each incoming message on your input streams. The IncomingMessageEnvelope can be used to obtain the following information: the de-serialized key, the de-serialized message, and the SystemStreamPartition that the message came from.
The key and message objects need to be cast to the correct type in your Task implementation based on the Serde provided for the InputDescriptor for the input stream.
The SystemStreamPartition object tells you where the message came from. It consists of three parts:
1. The system: the name of the system the message came from, as specified for the SystemDescriptor in your TaskApplication. You can have multiple systems for input and/or output, each with a different name.
2. The stream name: the name of the stream (e.g., topic, queue) within the input system. This is the physical name of the stream, as specified for the InputDescriptor in your TaskApplication.
3. The partition: a stream is normally split into several partitions, and each partition is assigned to one task instance by Samza.
If you have several input streams for your TaskApplication, you can use the SystemStreamPartition to determine what kind of message you’ve received.
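A minimal sketch of routing by input stream. `SystemStreamPartition` and `IncomingMessageEnvelope` are local stand-ins for the Samza classes (the stand-in models the partition as a plain `int`), the stream names and message types are hypothetical, and `process` is reduced to the envelope argument for brevity.

```java
import java.util.ArrayList;
import java.util.List;

final class InputRoutingSketch {
  // ---- Local stand-ins (NOT the real Samza types). ----
  static final class SystemStreamPartition {
    private final String system;
    private final String stream;
    private final int partition;
    SystemStreamPartition(String system, String stream, int partition) {
      this.system = system;
      this.stream = stream;
      this.partition = partition;
    }
    String getSystem() { return system; }
    String getStream() { return stream; }
    int getPartition() { return partition; }
  }

  static final class IncomingMessageEnvelope {
    private final SystemStreamPartition ssp;
    private final Object key;
    private final Object message;
    IncomingMessageEnvelope(SystemStreamPartition ssp, Object key, Object message) {
      this.ssp = ssp;
      this.key = key;
      this.message = message;
    }
    SystemStreamPartition getSystemStreamPartition() { return ssp; }
    Object getKey() { return key; }
    Object getMessage() { return message; }
  }

  static final class MultiInputTask {
    final List<String> pageViewUrls = new ArrayList<>();
    final List<Integer> adClickIds = new ArrayList<>();

    // Would be the body of process() in a real task.
    void process(IncomingMessageEnvelope envelope) {
      String stream = envelope.getSystemStreamPartition().getStream();
      if ("pageViewEvent".equals(stream)) {
        // Each cast must match the Serde configured for that input stream.
        pageViewUrls.add((String) envelope.getMessage());
      } else if ("adClickEvent".equals(stream)) {
        adClickIds.add((Integer) envelope.getMessage());
      } else {
        throw new IllegalArgumentException("Unexpected input stream: " + stream);
      }
    }
  }
}
```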
Sending Messages to Output Streams
To send a message to a stream, you first create an OutgoingMessageEnvelope. At a minimum, you need to provide the message you want to send, and the system and stream to send it to. Optionally you can specify the partitioning key and other parameters. See the javadoc for details.
You can then send the OutgoingMessageEnvelope using the MessageCollector provided with the process() or processAsync() call. You must use the MessageCollector delivered for the message you’re currently processing. Holding on to a MessageCollector and reusing it later will cause your messages to not be sent correctly.
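A minimal sketch of building and sending an envelope. `SystemStream`, `OutgoingMessageEnvelope` and `MessageCollector` are local stand-ins shaped like the Samza classes, the `kafka` system and `goodPageViewEvent` stream names are hypothetical, and `process` takes the key and message directly to keep the example short.

```java
final class SendingSketch {
  // ---- Local stand-ins (NOT the real Samza types). ----
  static final class SystemStream {
    private final String system;
    private final String stream;
    SystemStream(String system, String stream) { this.system = system; this.stream = stream; }
    String getSystem() { return system; }
    String getStream() { return stream; }
  }

  static final class OutgoingMessageEnvelope {
    private final SystemStream systemStream;
    private final Object key;
    private final Object message;
    OutgoingMessageEnvelope(SystemStream systemStream, Object key, Object message) {
      this.systemStream = systemStream;
      this.key = key;
      this.message = message;
    }
    SystemStream getSystemStream() { return systemStream; }
    Object getKey() { return key; }
    Object getMessage() { return message; }
  }

  interface MessageCollector { void send(OutgoingMessageEnvelope envelope); }

  static final class ForwardingTask {
    // The destination never changes, so it is created once and reused.
    private static final SystemStream OUTPUT = new SystemStream("kafka", "goodPageViewEvent");

    // Would be the body of process(); the collector is the one passed in for THIS message.
    void process(Object key, Object message, MessageCollector collector) {
      collector.send(new OutgoingMessageEnvelope(OUTPUT, key, message));
    }
  }
}
```

Note that the collector is a method parameter and is never stored in a field, which is one way to respect the rule about not reusing a MessageCollector across messages.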
Accessing Tables
A Table is an abstraction for data sources that support random access by key. It is an evolution of the older KeyValueStore API. It offers support for both local and remote data sources and composition through hybrid tables. For remote data sources, a RemoteTable provides optimized access with caching, rate-limiting, and retry support. Depending on the implementation, a Table can be a ReadableTable or a ReadWriteTable.
In the Low Level API, you can obtain and use a Table as follows:
- Use the appropriate TableDescriptor to specify the table properties.
- Register the TableDescriptor with the TaskApplicationDescriptor.
- Obtain a Table reference within the task implementation using TaskContext.getTable(). TaskContext is available via Context.getTaskContext(), which in turn is available by implementing InitableTask.init().
For example:
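The lookup step can be sketched as below. `Context`, `TaskContext`, `ReadWriteTable` and `InitableTask` are simplified local stand-ins for the Samza types, `InMemoryTable` is a hypothetical map-backed table used only to make the example runnable, and the table id `viewCounts` is an assumption. In a real application the table would be declared with a TableDescriptor and registered on the TaskApplicationDescriptor.

```java
import java.util.HashMap;
import java.util.Map;

final class TableAccessSketch {
  // ---- Local stand-ins (NOT the real Samza types). ----
  interface ReadWriteTable<K, V> {
    V get(K key);
    void put(K key, V value);
  }

  // Hypothetical map-backed table, only here so the sketch can run.
  static final class InMemoryTable<K, V> implements ReadWriteTable<K, V> {
    private final Map<K, V> data = new HashMap<>();
    @Override public V get(K key) { return data.get(key); }
    @Override public void put(K key, V value) { data.put(key, value); }
  }

  static final class TaskContext {
    private final Map<String, ReadWriteTable<?, ?>> tables = new HashMap<>();
    void register(String tableId, ReadWriteTable<?, ?> table) { tables.put(tableId, table); }

    @SuppressWarnings("unchecked")
    <K, V> ReadWriteTable<K, V> getTable(String tableId) {
      return (ReadWriteTable<K, V>) tables.get(tableId);
    }
  }

  static final class Context {
    private final TaskContext taskContext;
    Context(TaskContext taskContext) { this.taskContext = taskContext; }
    TaskContext getTaskContext() { return taskContext; }
  }

  interface InitableTask { void init(Context context) throws Exception; }

  static final class PageViewCounterTask implements InitableTask {
    private ReadWriteTable<String, Integer> viewCounts;

    @Override
    public void init(Context context) {
      // Look the table up once, by the id it was registered under.
      viewCounts = context.getTaskContext().getTable("viewCounts");
    }

    // Would be called from process() for every page view.
    void countView(String url) {
      Integer current = viewCounts.get(url);
      viewCounts.put(url, current == null ? 1 : current + 1);
    }
  }
}
```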
Side Inputs for Local Tables
For populating a local Table with secondary data sources, we can use side inputs to specify the source stream. Additionally, the table descriptor takes a SideInputsProcessor that will be applied before writing the entries to the table. The TableDescriptor that is registered with the TaskApplicationDescriptor can be used to specify side input properties.
The following code snippet shows a sample TableDescriptor for a local table that is backed by side inputs.
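A minimal sketch of that idea, under clearly stated assumptions: `LocalTableDescriptor` and `SideInputsProcessor` are simplified local stand-ins that mirror the builder-style `withSideInputs` and `withSideInputsProcessor` calls, the stand-in processor receives a plain key and message rather than an envelope and a store, and the table id `memberProfile` and stream `memberProfileUpdates` are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class SideInputTableSketch {
  // ---- Local stand-ins (NOT the real Samza types). ----
  // Simplified: turns one side-input message into the entries to write to the table.
  interface SideInputsProcessor {
    Collection<Map.Entry<String, String>> process(String key, String message);
  }

  static final class LocalTableDescriptor {
    final String tableId;
    List<String> sideInputs = Collections.emptyList();
    SideInputsProcessor sideInputsProcessor;

    LocalTableDescriptor(String tableId) { this.tableId = tableId; }

    LocalTableDescriptor withSideInputs(List<String> streams) {
      this.sideInputs = new ArrayList<>(streams);
      return this;
    }

    LocalTableDescriptor withSideInputsProcessor(SideInputsProcessor processor) {
      this.sideInputsProcessor = processor;
      return this;
    }
  }

  // Describes a local table that is populated from the "memberProfileUpdates" stream.
  static LocalTableDescriptor memberProfileTable() {
    return new LocalTableDescriptor("memberProfile")
        .withSideInputs(Collections.singletonList("memberProfileUpdates"))
        .withSideInputsProcessor((key, message) -> {
          // Applied to each side-input message before the entry is written: trim the value.
          Map.Entry<String, String> entry = new SimpleEntry<>(key, message.trim());
          return Collections.singletonList(entry);
        });
  }
}
```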
Legacy Applications
For legacy Low Level API applications, you can continue specifying your system, stream and store properties along with your task.class in configuration. An incomplete example of configuration for a legacy task application looks like this (see the configuration documentation for more detail):
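The fragment below is an illustrative, deliberately incomplete sketch. The job name, task class, stream name and store name are hypothetical placeholders, and a working job needs further properties that are described in the configuration documentation.

```properties
# Hypothetical job name and task class; replace with your own.
job.name=page-view-filter
task.class=com.example.samza.PageViewFilterTask

# Define a system named "kafka" and consume one of its streams.
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
task.inputs=kafka.pageViewEvent

# Register a serde named "json" and use it for messages on the input stream.
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
systems.kafka.streams.pageViewEvent.samza.msg.serde=json

# Define a local key-value store named "badUrls".
stores.badUrls.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
```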