Checkpointing

Samza provides fault-tolerant processing of streams: Samza guarantees that messages won’t be lost, even if your job crashes, if a machine dies, if there is a network fault, or something else goes wrong. In order to provide this guarantee, Samza expects the input system to meet the following requirements:

  • The stream may be sharded into one or more partitions. Each partition is independent from the others, and is replicated across multiple machines (the stream continues to be available, even if a machine fails).
  • Each partition consists of a sequence of messages in a fixed order. Each message has an offset, which indicates its position in that sequence. Messages are always consumed sequentially within each partition.
  • A Samza job can start consuming the sequence of messages from any starting offset.

Kafka meets these requirements, but they can also be implemented with other message broker systems.

As described in the section on SamzaContainer, each task instance of your job consumes one partition of an input stream. Each task has a current offset for each input stream: the offset of the next message to be read from that stream partition. Every time a message is read from the stream, the current offset moves forwards.

If a Samza container fails, it needs to be restarted (potentially on another machine) and resume processing where the failed container left off. In order to enable this, a container periodically checkpoints the current offset for each task instance.

Illustration of checkpointing

When a Samza container starts up, it looks for the most recent checkpoint and starts consuming messages from the checkpointed offsets. If the previous container failed unexpectedly, the most recent checkpoint may be slightly behind the current offsets (i.e. the job may have consumed some more messages since the last checkpoint was written), but we can’t know for sure. In that case, the job may process a few messages again.

This guarantee is called at-least-once processing: Samza ensures that your job doesn’t miss any messages, even if containers need to be restarted. However, it is possible for your job to see the same message more than once when a container is restarted. We are planning to address this in a future version of Samza, but for now it is just something to be aware of: for example, if you are counting page views, a forcefully killed container could cause events to be slightly over-counted. You can reduce duplication by checkpointing more frequently, at a slight performance cost.

For checkpoints to be effective, they need to be written somewhere where they will survive faults. Samza allows you to write checkpoints to the file system (using FileSystemCheckpointManager), but that doesn’t help if the machine fails and the container needs to be restarted on another machine. The most common configuration is to use Kafka for checkpointing. You can enable this with the following job configuration:

# The name of your job determines the name under which checkpoints will be stored
job.name=example-job

# Define a system called "kafka" for consuming and producing to a Kafka cluster
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory

# Declare that we want our job's checkpoints to be written to Kafka
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka

# By default, a checkpoint is written every 60 seconds. You can change this if you like.
task.commit.ms=60000

In this configuration, Samza writes checkpoints to a separate Kafka topic called __samza_checkpoint_<job-name>_<job-id> (in the example configuration above, the topic would be called __samza_checkpoint_example-job_1). Once per minute, Samza automatically sends a message to this topic, in which the current offsets of the input streams are encoded. When a Samza container starts up, it looks for the most recent offset message in this topic, and loads that checkpoint.

Sometimes it can be useful to use checkpoints only for some input streams, but not for others. In this case, you can tell Samza to ignore any checkpointed offsets for a particular stream name:

# Ignore any checkpoints for the topic "my-special-topic"
systems.kafka.streams.my-special-topic.samza.reset.offset=true

# Always start consuming "my-special-topic" at the oldest available offset
systems.kafka.streams.my-special-topic.samza.offset.default=oldest

The following table explains the meaning of these configuration parameters:

Parameter name Value Meaning
systems.<system>.
streams.<stream>.
samza.reset.offset
false (default) When container starts up, resume processing from last checkpoint
true Ignore checkpoint (pretend that no checkpoint is present)
systems.<system>.
streams.<stream>.
samza.offset.default
upcoming (default) When container starts and there is no checkpoint (or the checkpoint is ignored), only process messages that are published after the job is started, but no old messages
oldest When container starts and there is no checkpoint (or the checkpoint is ignored), jump back to the oldest available message in the system, and consume all messages from that point onwards (most likely this means repeated processing of messages already seen previously)

Note that the example configuration above causes your tasks to start consuming from the oldest offset every time a container starts up. This is useful in case you have some in-memory state in your tasks that you need to rebuild from source data in an input stream. If you are using streams in this way, you may also find bootstrap streams useful.

Manipulating Checkpoints Manually

If you want to make a one-off change to a job’s consumer offsets, for example to force old messages to be processed again with a new version of your code, you can use CheckpointTool to inspect and manipulate the job’s checkpoint. The tool is included in Samza’s source repository.

To inspect a job’s latest checkpoint, you need to specify your job’s config file, so that the tool knows which job it is dealing with:

samza-example/target/bin/checkpoint-tool.sh \
  --config-path=file:///path/to/job/config.properties

This command prints out the latest checkpoint in a properties file format. You can save the output to a file, and edit it as you wish. For example, to jump back to the oldest possible point in time, you can set all the offsets to 0. Then you can feed that properties file back into checkpoint-tool.sh and save the modified checkpoint:

samza-example/target/bin/checkpoint-tool.sh \
  --config-path=file:///path/to/job/config.properties \
  --new-offsets=file:///path/to/new/offsets.properties

Note that Samza only reads checkpoints on container startup. In order for your checkpoint change to take effect, you need to first stop the job, then save the modified offsets, and then start the job again. If you write a checkpoint while the job is running, it will most likely have no effect.

Checkpoint Callbacks

Currently Samza takes care of checkpointing for all the systems. But there are some use-cases when we may need to inform the Consumer about each checkpoint we make. Here are few examples:

  • Samza cannot do checkpointing correctly or efficiently. One such case is when Samza is not doing the partitioning. In this case the container doesn’t know which SSPs it is responsible for, and thus cannot checkpoint them. An actual example could be a system which relies on an auto-balanced High Level Kafka Consumer for partitioning.
  • Systems in which the consumer itself needs to control the checkpointed offset. Some systems do not support seek() operation (are not replayable), but they rely on ACKs for the delivered messages. Example could be a Kinesis consumer. Kinesis library provides a checkpoint callback in the* process() *call (push system). This callback needs to be invoked after the records are processed. This can only be done by the consumer itself.
  • Systems that use checkpoint/offset information for some maintenance actions. This information may be used to implement a smart retention policy (deleting all the data after it has been consumed).

In order to use the checkpoint callback a SystemConsumer needs to implement the CheckpointListener interface:

public interface CheckpointListener {
  void onCheckpoint(Map<SystemStreamPartition, String> offsets);
}

For the SystemConsumers which implement this interface Samza will invoke onCheckpoint() callback every time OffsetManager checkpoints. Checkpoints are done per task, and ‘offsets’ are all the offsets Samza checkpoints for a task, and these are the offsets which will be passed to the consumer on restart. Note that the callback will happen after the checkpoint and is not atomic.

State Management »