Samza Async API and Multithreading User Guide

This tutorial provides examples and guidance for using the Samza asynchronous API and multithreading.

Synchronous Process with Multithreading

If your job's processing involves synchronous (blocking) IO, you can configure Samza's built-in thread pool to run your tasks in parallel. In the following example, SyncRestTask uses the Jersey client to make a REST call in each process() invocation.

public class SyncRestTask implements StreamTask, InitableTask, ClosableTask {
  private Client client;
  private WebTarget target;

  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    client = ClientBuilder.newClient();
    target = client.target("http://example.com/resource/").path("hello");
  }

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    Response response = target.request().get();
    System.out.println("Response status code " + response.getStatus() + " received.");
  }

  @Override
  public void close() throws Exception {
    client.close();
  }
}

By default, Samza runs this task sequentially in a single thread. The configuration below sets up a thread pool of size 16 to run the tasks in parallel:

# Thread pool to run synchronous tasks in parallel.
job.container.thread.pool.size=16

NOTE: The thread pool is used to run all the synchronous operations of a task, including StreamTask.process(), WindowableTask.window(), and, internally, Task.commit(). This maximizes the parallelism between tasks and reduces blocking time. When tasks run on multiple threads, Samza still guarantees in-order processing of the messages within a task by default.
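The ordering guarantee in the note above can be pictured with a small standalone sketch. It is not Samza code and does not reflect how Samza's run loop is implemented; the class name PerTaskOrderingSketch and its run() method are made up for illustration. Each simulated task chains its messages one after another, while different tasks share one thread pool and progress in parallel.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Illustration only (hypothetical class): tasks run in parallel, messages of one task run in order. */
class PerTaskOrderingSketch {

  /** Returns, for each simulated task, the messages in the order they were processed. */
  static Map<String, List<Integer>> run(int poolSize, int tasks, int messagesPerTask) {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    Map<String, List<Integer>> processed = new ConcurrentHashMap<>();
    List<CompletableFuture<Void>> tails = new ArrayList<>();
    try {
      for (int t = 0; t < tasks; t++) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        processed.put("task-" + t, seen);
        // Chain every message after the previous message of the same task.
        CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);
        for (int m = 0; m < messagesPerTask; m++) {
          final int message = m;
          tail = tail.thenRunAsync(() -> seen.add(message), pool);
        }
        tails.add(tail);
      }
      // Wait until the last message of every task has been processed.
      CompletableFuture.allOf(tails.toArray(new CompletableFuture[0])).join();
    } finally {
      pool.shutdown();
    }
    return processed;
  }
}
```

Calling PerTaskOrderingSketch.run(4, 3, 5) yields three per-task lists, each holding the messages 0 through 4 in ascending order, regardless of how the pool interleaves the tasks.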

Asynchronous Process with AsyncStreamTask API

If your job's processing is asynchronous, e.g. it makes non-blocking remote IO calls, the AsyncStreamTask interface supports it. In the following example, AsyncRestTask makes an asynchronous REST call and triggers the callback once the call completes.

public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask {
  private Client client;
  private WebTarget target;

  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    client = ClientBuilder.newClient();
    target = client.target("http://example.com/resource/").path("hello");
  }

  @Override
  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator, final TaskCallback callback) {
    target.request().async().get(new InvocationCallback<Response>() {
      @Override
      public void completed(Response response) {
        System.out.println("Response status code " + response.getStatus() + " received.");
        callback.complete();
      }

      @Override
      public void failed(Throwable throwable) {
        System.out.println("Invocation failed.");
        callback.failure(throwable);
      }
    });
  }

  @Override
  public void close() throws Exception {
    client.close();
  }
}

In the above example, processing is not complete when processAsync() returns. The TaskCallback is triggered from the Jersey client's callback thread to indicate that processing is done. To make sure the callback is triggered within a certain time interval, e.g. 5 seconds, you can configure the following property:

# Timeout for the processAsync() callback. When the timeout elapses, Samza throws a TaskCallbackTimeoutException and shuts down the container.
task.callback.timeout.ms=5000

NOTE: Samza also guarantees in-order processing of the messages within an AsyncStreamTask by default, meaning the next processAsync() of a task won’t be called until the previous processAsync() callback has been triggered.
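To make the callback timeout concrete, here is a minimal standalone sketch of a pending callback guarded by a timer. It only mimics the idea behind task.callback.timeout.ms; the class CallbackTimeoutSketch and its run() method are hypothetical, and the sketch reports a timeout as a string instead of shutting anything down.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Illustration only (hypothetical class): a callback that is failed by a timer if it fires too late. */
class CallbackTimeoutSketch {

  /** Returns "completed" if the simulated callback fires before the timeout, "timeout" otherwise. */
  static String run(long callbackDelayMs, long timeoutMs) {
    ScheduledExecutorService timer = Executors.newScheduledThreadPool(2);
    CompletableFuture<String> done = new CompletableFuture<>();
    try {
      // Simulated asynchronous work that eventually triggers the callback.
      timer.schedule(() -> { done.complete("completed"); }, callbackDelayMs, TimeUnit.MILLISECONDS);
      // Timer that fails the pending callback once the timeout elapses.
      timer.schedule(() -> { done.completeExceptionally(new TimeoutException()); },
          timeoutMs, TimeUnit.MILLISECONDS);
      // Whichever of the two fires first decides the outcome; the later one is ignored.
      return done.handle((value, error) -> error == null ? value : "timeout").join();
    } finally {
      timer.shutdownNow();
    }
  }
}
```

With a 10 ms callback delay and a 2000 ms timeout the call returns "completed"; with the values swapped so that the timeout elapses first, it returns "timeout".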

Asynchronous Process in High Level API

If your processing logic is asynchronous, e.g. it makes non-blocking remote calls, you can implement it using the AsyncFlatMapFunction. The following example illustrates an application that processes Wikipedia feed updates, invokes a remote service to standardize them, and sends the standardized events to Wikipedia.

public class WikipediaAsyncStandardizer implements StreamApplication {

  @Override
  public void describe(StreamApplicationDescriptor appDescriptor) {
    // Define a SystemDescriptor for Wikipedia data
    WikipediaSystemDescriptor wikipediaSystemDescriptor = new WikipediaSystemDescriptor("irc.wikimedia.org", 6667);
    // Define InputDescriptors for consuming wikipedia data
    WikipediaInputDescriptor wikipediaInputDescriptor = wikipediaSystemDescriptor
        .getInputDescriptor("en-wikipedia")
        .withChannel("#en.wikipedia");
    // Define OutputDescriptor for producing wikipedia data
    WikipediaOutputDescriptor wikipediaOutputDescriptor = wikipediaSystemDescriptor
        .getOutputDescriptor("en-wikipedia-standardized")
        .withChannel("#en.wikipedia.standardized");

    appDescriptor.getInputStream(wikipediaInputDescriptor)
        .filter(WikipediaFeedEvent::isUpdate)
        .flatMapAsync(new AsyncStandardizerFunction())
        .sendTo(wikipediaOutputDescriptor);
  }

  static class AsyncStandardizerFunction implements AsyncFlatMapFunction<WikipediaFeedEvent, StandardizedWikipediaFeedEvent> {
    private transient Client client;

    @Override
    public void init(Context context) {
      client = ClientBuilder.newClient(context.getJobContext().getConfig().get("standardizer.uri"));
    }

    @Override
    public CompletionStage<Collection<StandardizedWikipediaFeedEvent>> apply(WikipediaFeedEvent wikipediaFeedEvent) {
      Request<StandardizerRequest> standardizerRequest = buildStandardizedRequest(wikipediaFeedEvent);
      CompletableFuture<StandardizerResponse> standardizerResponse = client.sendRequest(standardizerRequest);

      return standardizerResponse
          .thenApply(response -> extractStandardizedWikipediaFeedEvent(response));
    }

    @Override
    public void close() {
      client.close();
    }
  }
}

In the above example, the results from the AsyncStandardizerFunction are propagated to the downstream operator once the future completes. There is an overall timeout for each message to be processed, and you can tune it using:

# Timeout for the message to be processed. When the timeout elapses, the container shuts down.
task.callback.timeout.ms
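The return type of an asynchronous flat-map function is a CompletionStage of a Collection, so a single asynchronous result has to be shaped into zero or more output elements. The following standalone sketch shows that shaping with plain java.util.concurrent types; AsyncFlatMapShapeSketch and its standardize() method are hypothetical stand-ins, and the "remote call" is simulated on the common pool.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/** Illustration only (hypothetical class): shapes one async result into a Collection of outputs. */
class AsyncFlatMapShapeSketch {

  /** Hypothetical remote call, simulated locally: trims and upper-cases the input. */
  static CompletableFuture<String> standardize(String raw) {
    return CompletableFuture.supplyAsync(() -> raw.trim().toUpperCase(Locale.ROOT));
  }

  /** Emits one element for a non-empty result and no elements for an empty one. */
  static CompletionStage<Collection<String>> apply(String raw) {
    return standardize(raw).thenApply(result ->
        result.isEmpty() ? Collections.<String>emptyList() : Collections.singletonList(result));
  }
}
```

Returning an empty collection drops the message from the output, while a collection with several elements would fan one input out to several outputs.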

If the IO library accepts callbacks instead of returning a Future, the callback can be adapted to a Future in the following way:

  public CompletionStage<Collection<StandardizedWikipediaFeedEvent>> apply(WikipediaFeedEvent wikipediaFeedEvent) {
    Request<StandardizerRequest> standardizationRequest = buildStandardizedRequest(wikipediaFeedEvent);
    CompletableFuture<Collection<StandardizedWikipediaFeedEvent>> standardizedFuture = new CompletableFuture<>();
    client.async().get(standardizationRequest, new InvocationCallback<StandardizerResponse>() {
          @Override
          public void completed(StandardizerResponse response) {
            standardizedFuture.complete(extractStandardizedWikipediaFeedEvent(response));
          }

          @Override
          public void failed(Throwable throwable) {
            standardizedFuture.completeExceptionally(throwable);
          }
        });
    // Return the future so the results are propagated once the callback fires.
    return standardizedFuture;
  }
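The same callback-to-future adaptation can be tried in isolation. The sketch below is self-contained and uses a made-up Callback interface and a made-up fetchLength() client in place of a real IO library; for simplicity the fake client invokes the callback synchronously, whereas a real client would typically invoke it later from its own thread.

```java
import java.util.concurrent.CompletableFuture;

/** Illustration only (hypothetical class): adapts a callback-style client to a CompletableFuture. */
class CallbackToFutureSketch {

  /** Hypothetical callback type, standing in for an IO library's callback interface. */
  interface Callback<T> {
    void completed(T result);
    void failed(Throwable error);
  }

  /** Hypothetical client: reports the length of the input, or fails when the input is null. */
  static void fetchLength(String input, Callback<Integer> callback) {
    if (input == null) {
      callback.failed(new IllegalArgumentException("input is null"));
    } else {
      callback.completed(input.length());
    }
  }

  /** Bridges the callback API to a future: success completes it, failure completes it exceptionally. */
  static CompletableFuture<Integer> fetchLengthAsync(String input) {
    CompletableFuture<Integer> future = new CompletableFuture<>();
    fetchLength(input, new Callback<Integer>() {
      @Override
      public void completed(Integer result) {
        future.complete(result);
      }

      @Override
      public void failed(Throwable error) {
        future.completeExceptionally(error);
      }
    });
    return future;
  }
}
```

The key points are that the future is created before the call is issued, completed from inside the callback, and returned to the caller, so a failure in the client surfaces as an exceptionally completed future rather than being lost.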

Out-of-order Process

In all the cases above, Samza processes messages in order by default. Further parallelism is supported by allowing a task to process multiple outstanding messages in parallel. The following config allows one task to process at most 4 outstanding messages at a time:

# Max number of outstanding messages being processed per task at a time, applicable to both StreamTask and AsyncStreamTask.
task.max.concurrency=4

NOTE: In the case of AsyncStreamTask, processAsync() is still invoked in the order of message arrival, but completion can happen out of order. In the case of StreamTask and High Level API applications with task.max.concurrency > 1, delivery can be out of order. This option should NOT be used when strict ordering of the output is required.
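As a rough picture of what a concurrency cap means, the following standalone sketch dispatches messages in arrival order but lets up to a fixed number of them be in flight at once, so completion order is not controlled. It is not how Samza implements task.max.concurrency; the class MaxConcurrencySketch and its run() method are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustration only (hypothetical class): caps the number of messages in flight at one time. */
class MaxConcurrencySketch {

  /** Processes all messages and returns the highest number observed in flight at once. */
  static int run(int messages, int maxConcurrency) {
    ExecutorService workers = Executors.newCachedThreadPool();
    Semaphore permits = new Semaphore(maxConcurrency);
    AtomicInteger inFlight = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(messages);
    try {
      for (int m = 0; m < messages; m++) {
        permits.acquire(); // dispatch in arrival order, blocking once the cap is reached
        workers.execute(() -> {
          try {
            peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
            Thread.sleep(5); // simulated work; messages may finish in any order
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          } finally {
            inFlight.decrementAndGet();
            permits.release(); // frees a slot for the next message
            done.countDown();
          }
        });
      }
      done.await();
      return peak.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      workers.shutdown();
    }
  }
}
```

With a cap of 4, the returned peak never exceeds 4, no matter how many messages are submitted.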

Guaranteed Semantics

In any of the scenarios, Samza guarantees the following semantics:

  • Samza is thread-safe. You can safely access your job’s state in the key-value store, write messages and checkpoint offsets in the task threads. Other data shared among tasks, such as global variables or static data, is not thread-safe if it can be accessed concurrently by multiple threads, e.g. by StreamTasks running in a configured thread pool with more than one thread. For state within a task, such as member variables, Samza guarantees the mutual exclusion of process, window and commit, so there are no concurrent modifications among these operations and any state change from one operation is fully visible to the others.
  • WindowableTask.window is called only when there are no outstanding process/processAsync invocations, and no new process/processAsync invocations can be scheduled until it completes. The Samza engine is responsible for ensuring that window is invoked in a timely manner.
  • Checkpointing is guaranteed to cover only events that are fully processed. The checkpoint is persisted in the commit() method.
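The first guarantee leaves data shared across tasks, such as static fields, to the application. One common way to handle that, shown here as a minimal standalone sketch with a hypothetical SharedCounterSketch class, is to keep such data in a thread-safe type like AtomicLong so that concurrent updates from several task threads are not lost.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Illustration only (hypothetical class): a static counter that several task threads update safely. */
class SharedCounterSketch {

  // Static data is visible to every task in the JVM, so it uses a thread-safe type.
  private static final AtomicLong PROCESSED = new AtomicLong();

  /** Stands in for the part of process() that touches the shared data. */
  static void onMessage() {
    PROCESSED.incrementAndGet();
  }

  /** Simulates several task threads and returns the final counter value. */
  static long run(int threads, int messagesPerThread) {
    PROCESSED.set(0);
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      pool.execute(() -> {
        for (int m = 0; m < messagesPerThread; m++) {
          onMessage();
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return PROCESSED.get();
  }
}
```

A plain static long incremented with ++ could lose updates under the same load; member variables of a single task do not need this treatment because of the mutual exclusion described in the first bullet.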