This tutorial provides examples and guide to use Samza asynchronous API and multithreading.
Synchronous Process with Multithreading
If your job process involves synchronous IO, or blocking IO, you can simply configure the Samza build-in thread pool to run your tasks in parallel. In the following example, SyncRestTask uses Jersey client to makes rest calls in each process().
By default Samza will run this task sequentially in a single thread. In below we configure the thread pool of size 16 to run the tasks in parallel:
NOTE: The thread pool will be used to run all the synchronous operations of a task, including StreamTask.process(), WindowableTask.window(), and internally Task.commit(). This is for maximizing the parallelism between tasks as well as reducing the blocking time. When running tasks in multithreading, Samza still guarantees the in-order processing of the messages within a task by default.
Asynchronous Process with AsyncStreamTask API
If your job process is asynchronous, e.g. making non-blocking remote IO calls, AsyncStreamTask interface provides the support for it. In the following example AsyncRestTask makes asynchronous rest call and triggers callback once it’s complete.
In the above example, the process is not complete when processAsync() returns. In the callback thread from Jersey client, we trigger TaskCallback to indicate the process is done. In order to make sure the callback will be triggered within certain time interval, e.g. 5 seconds, you can config the following property:
NOTE: Samza also guarantees the in-order process of the messages within an AsyncStreamTask by default, meaning the next processAsync() of a task won’t be called until the previous processAsync() callback has been triggered.
Asynchronous Process in High Level API
If your processing logic is asynchronous, e.g. it makes non-blocking remote calls, you can implement it using the AsyncFlatMapFunction. The following example illustrates an application that processes Wikipedia feed updates and invokes a remote service to standardize the updates and sends the standardized events to Wikipedia.
In the above example, the results from the AsyncStandardizerFunction are propagated to downstream operator once the future is complete. There is an overall timeout for each to message to be processed and you can tune it using:
If IO library accepts callbacks instead of returning a Future, the callback can be adapted to a Future in the following way:
Out-of-order Process
In all cases above, Samza supports in-order process by default. Further parallelism is also supported by allowing a task to process multiple outstanding messages in parallel. The following config allows one task to process at most 4 outstanding messages in parallel at a time:
NOTE: In case of AsyncStreamTask, processAsync() is still invoked in the order of the message arrivals, but the completion can happen out of order. In case of StreamTask and High level API applications with task.max.concurrency > 1, delivery can be out-of-order. This option should NOT be used when strict ordering of the output is required.
Guaranteed Semantics
In any of the scenarios, Samza guarantees the following semantics:
Samza is thead-safe. You can safely access your job’s state in key-value store, write messages and checkpoint offset in the task threads. If you have other data shared among tasks, such as global variables or static data, it is not thread safe if the data can be accessed concurrently by multiple threads, e.g. StreamTask running in the configured thread pool with more than one threads. For states within a task, such as member variables, Samza guarantees the mutual exclusiveness of process, window and commit so there will be no concurrent modifications among these operations and any state change from one operation will be fully visible to the others.
WindowableTask.window is called when no outstanding process/processAsync and no new process/processAsync invocations can be scheduled until it completes. The Samza engine is responsible for ensuring that window is invoked in a timely manner.
Checkpointing is guaranteed to only cover events that are fully processed. It is persisted in commit() method.