High Level Streams API
Table Of Contents
- Code Examples
- Key Concepts
- Operator IDs
- Data Serialization
- Application Serialization
Samza’s flexible High Level Streams API lets you describe your complex stream processing pipeline in the form of a Directed Acyclic Graph (DAG) of operations on MessageStreams. It provides a rich set of built-in operators that simplify common stream processing operations such as filtering, projection, repartitioning, stream-stream and stream-table joins, and windowing.
Code Examples
The Samza Cookbook contains various recipes using the Samza High Level Streams API. These include:
- The Filter example demonstrates how to perform stateless operations on a stream.
- The Join example demonstrates how you can join a Kafka stream of page-views with a stream of ad-clicks.
- The Stream-Table Join example demonstrates how to use the Samza Table API. It joins a Kafka stream with a remote dataset accessed through a REST service.
Key Concepts
A StreamApplication describes the inputs, outputs, state, configuration and the processing logic for an application written using Samza’s High Level Streams API.
A typical StreamApplication implementation consists of the following stages:
- Configuring the inputs, outputs and state (tables) using the appropriate SystemDescriptors, InputDescriptors, OutputDescriptors and TableDescriptors.
- Obtaining the corresponding MessageStreams, OutputStreams and Tables from the provided StreamApplicationDescriptor.
- Defining the processing logic using operators and functions on the streams and tables thus obtained.
The following example StreamApplication removes page views older than 1 hour from the input stream:
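As a minimal sketch of the processing logic only, the plain-Java model here shows the predicate that such an application could pass to the filter operator. It deliberately uses no Samza descriptors or operators; PageView, RecentPageViews, and the creationTimeMs field are hypothetical names introduced for illustration, and a finite list stands in for the unbounded input stream.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical message type; the real event class is not shown in this document.
final class PageView {
  final String pageId;
  final long creationTimeMs;

  PageView(String pageId, long creationTimeMs) {
    this.pageId = pageId;
    this.creationTimeMs = creationTimeMs;
  }
}

final class RecentPageViews {
  static final long MAX_AGE_MS = Duration.ofHours(1).toMillis();

  // Predicate a FilterFunction could apply: keep views created within the last hour.
  static boolean isRecent(PageView view, long nowMs) {
    return view.creationTimeMs > nowMs - MAX_AGE_MS;
  }

  // Applies the predicate to a finite batch that stands in for the stream.
  static List<PageView> keepRecent(List<PageView> views, long nowMs) {
    List<PageView> recent = new ArrayList<>();
    for (PageView view : views) {
      if (isRecent(view, nowMs)) {
        recent.add(view);
      }
    }
    return recent;
  }
}
```

Passing the current time as a parameter, rather than reading the clock inside the predicate, keeps the sketch deterministic and easy to test.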
A MessageStream, as the name implies, represents a stream of messages. A StreamApplication is described as a Directed Acyclic Graph (DAG) of transformations on MessageStreams. You can get a MessageStream in two ways:
- Calling StreamApplicationDescriptor#getInputStream() with an InputDescriptor obtained from a SystemDescriptor.
- Transforming an existing MessageStream using operators such as map, filter, window, and join.
A Table is an abstraction for data sources that support random access by key. It is an evolution of the older KeyValueStore API. It offers support for both local and remote data sources and composition through hybrid tables. For remote data sources, a RemoteTable provides optimized access with caching, rate-limiting, and retry support. Depending on the implementation, a Table can be a ReadableTable or a ReadWriteTable.
In the High Level Streams API, you can obtain and use a Table as follows:
- Use the appropriate TableDescriptor to specify the table properties.
- Register the TableDescriptor with the StreamApplicationDescriptor. This returns a Table reference, which can be used to populate the table using the Send To Table operator, or to join a stream with the table using the Stream-Table Join operator.
- Alternatively, you can obtain a Table reference within an operator’s InitableFunction using the provided TaskContext.
The High Level Streams API provides common operations like map, flatMap, filter, merge, broadcast, joins, and windows on MessageStreams. Most of these operators accept their corresponding Functions as an argument.
Map: Applies the provided 1:1 MapFunction to each element in the MessageStream and returns the transformed MessageStream. The MapFunction takes in a single message and returns a single message (potentially of a different type).
FlatMap: Applies the provided 1:n FlatMapFunction to each element in the MessageStream and returns the transformed MessageStream. The FlatMapFunction takes in a single message and returns zero or more messages.
Filter: Applies the provided FilterFunction to the MessageStream and returns the filtered MessageStream. The FilterFunction is a predicate that specifies whether a message should be retained in the filtered stream. Messages for which the FilterFunction returns false are filtered out.
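The 1:1, 1:n, and predicate shapes of these three stateless operators can be illustrated with the JDK's java.util.stream API, which is used here purely as an analogy. This is not Samza code: StatelessOperatorSketch and its method names are hypothetical, and finite lists stand in for MessageStreams.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class StatelessOperatorSketch {
  // 1:1, like a MapFunction: each word becomes its length (a different type).
  static List<Integer> lengths(List<String> words) {
    return words.stream().map(String::length).collect(Collectors.toList());
  }

  // 1:n, like a FlatMapFunction: each line becomes zero or more words.
  static List<String> words(List<String> lines) {
    return lines.stream()
        .flatMap(line -> Arrays.stream(line.split(" ")).filter(w -> !w.isEmpty()))
        .collect(Collectors.toList());
  }

  // Predicate, like a FilterFunction: messages for which it returns false are dropped.
  static List<String> nonEmpty(List<String> messages) {
    return messages.stream().filter(m -> !m.isEmpty()).collect(Collectors.toList());
  }
}
```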
PartitionBy: Re-partitions this MessageStream using the key returned by the provided keyExtractor and returns the transformed MessageStream. Messages are sent through an intermediate stream during repartitioning.
The operator ID should be unique for each operator within the application and is used to identify the streams and stores created by the operator.
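To see why repartitioning by key co-locates related messages, consider this plain-Java sketch. It assumes a hash-modulo partitioner, which is an illustration only: the actual partition assignment is decided by the system backing the intermediate stream. RepartitionSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class RepartitionSketch {
  // Assumed partitioner: hash of the extracted key modulo the partition count.
  static int partitionFor(Object key, int numPartitions) {
    return Math.floorMod(key.hashCode(), numPartitions);
  }

  // Routes each message to the partition chosen by its extracted key.
  static <M> List<List<M>> repartition(
      List<M> messages, Function<M, ?> keyExtractor, int numPartitions) {
    List<List<M>> partitions = new ArrayList<>();
    for (int i = 0; i < numPartitions; i++) {
      partitions.add(new ArrayList<>());
    }
    for (M message : messages) {
      Object key = keyExtractor.apply(message);
      partitions.get(partitionFor(key, numPartitions)).add(message);
    }
    return partitions;
  }
}
```

Because the partition depends only on the key, all messages with the same key land in the same partition and keep their relative order there.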
Merge: Merges the MessageStream with all the provided MessageStreams and returns the merged stream.
The merge transform preserves the order of messages within each MessageStream. If message m1 appears before m2 in any provided stream, then m1 will also appear before m2 in the merged stream.
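The ordering guarantee constrains only messages from the same input stream; it says nothing about how messages from different streams interleave. The sketch here assumes a round-robin interleaving purely for illustration and checks that each input remains a subsequence of the merged result. MergeOrderSketch and its methods are hypothetical names, and lists stand in for streams.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class MergeOrderSketch {
  // Round-robin interleaving; the interleaving policy is an assumption.
  static <T> List<T> merge(List<List<T>> streams) {
    List<Iterator<T>> iterators = new ArrayList<>();
    for (List<T> stream : streams) {
      iterators.add(stream.iterator());
    }
    List<T> merged = new ArrayList<>();
    boolean progressed = true;
    while (progressed) {
      progressed = false;
      for (Iterator<T> it : iterators) {
        if (it.hasNext()) {
          merged.add(it.next());
          progressed = true;
        }
      }
    }
    return merged;
  }

  // True if the messages of 'stream' appear in 'merged' in the same relative order.
  static <T> boolean preservesOrder(List<T> stream, List<T> merged) {
    int next = 0;
    for (T message : merged) {
      if (next < stream.size() && stream.get(next).equals(message)) {
        next++;
      }
    }
    return next == stream.size();
  }
}
```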
Broadcast: Broadcasts the contents of the MessageStream to every instance of downstream operators via an intermediate stream.
SendTo (Stream): Sends all messages in this MessageStream to the provided OutputStream. You can specify the key and the value to be used for the outgoing messages.
SendTo (Table): Sends all messages in this MessageStream to the provided Table. The expected message type is KV.
Sink: Allows sending messages from this MessageStream to an output system using the provided SinkFunction.
This offers more control than SendTo (Stream) since the SinkFunction has access to the MessageCollector and the TaskCoordinator. For example, you can choose to manually commit offsets, or shut down the job using the TaskCoordinator APIs. This operator can also be used to send messages to non-Samza systems (e.g. remote databases, REST services, etc.).
The Stream-Stream Join operator joins messages from two MessageStreams using the provided pairwise JoinFunction. Messages are joined when the key extracted from a message from the first stream matches the key extracted from a message in the second stream. Messages in each stream are retained for the provided ttl duration and join results are emitted as matches are found. For each key, the join retains only the latest message from each input stream.
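The retention behavior can be modeled in plain Java as two keyed maps, one per input. This is a simplified sketch under stated assumptions, not the operator's implementation: it keeps the latest message per key on each side, checks the ttl lazily at lookup time, and treats the ttl boundary as inclusive. TtlJoinSketch and the "first|second" output format are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class TtlJoinSketch {
  private static final class Timestamped {
    final String value;
    final long timeMs;

    Timestamped(String value, long timeMs) {
      this.value = value;
      this.timeMs = timeMs;
    }
  }

  private final long ttlMs;
  private final Map<String, Timestamped> first = new HashMap<>();
  private final Map<String, Timestamped> second = new HashMap<>();

  TtlJoinSketch(long ttlMs) {
    this.ttlMs = ttlMs;
  }

  // A message arrives on the first stream; returns the join result or null if no match.
  String onFirst(String key, String value, long nowMs) {
    first.put(key, new Timestamped(value, nowMs)); // overwrites any older message for the key
    Timestamped other = second.get(key);
    return isLive(other, nowMs) ? value + "|" + other.value : null;
  }

  // A message arrives on the second stream; returns the join result or null if no match.
  String onSecond(String key, String value, long nowMs) {
    second.put(key, new Timestamped(value, nowMs));
    Timestamped other = first.get(key);
    return isLive(other, nowMs) ? other.value + "|" + value : null;
  }

  // A retained message matches only while it is within the ttl (assumed inclusive).
  private boolean isLive(Timestamped retained, long nowMs) {
    return retained != null && nowMs - retained.timeMs <= ttlMs;
  }
}
```

A larger ttl tolerates more skew between the two streams at the cost of retaining more state.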
The Stream-Table Join operator joins messages from a MessageStream with messages in a Table using the provided StreamTableJoinFunction. Messages are joined when the key extracted from a message in the stream matches the key for a record in the table. The join function is invoked with both the message and the record. If a record is not found in the table, a null value is provided in its place. In that case, the join function can return null to get inner join semantics, or return an output message to get left outer join semantics. For join correctness, it is important to ensure the input stream and table are partitioned using the same key (e.g., using the partitionBy operator) as this impacts the physical placement of data.
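The difference between the inner and left outer variants comes down to what the join function does with a null record. The plain-Java sketch here models the table as a Map and the join function as a BiFunction; StreamTableJoinSketch, INNER, LEFT_OUTER, and the "unknown" placeholder are hypothetical and not part of the Samza API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

final class StreamTableJoinSketch {
  // Inner join: returning null on a missing record drops the message.
  static final BiFunction<String, String, String> INNER =
      (memberId, name) -> name == null ? null : memberId + "=" + name;

  // Left outer join: a missing record still produces an output message.
  static final BiFunction<String, String, String> LEFT_OUTER =
      (memberId, name) -> memberId + "=" + (name == null ? "unknown" : name);

  // Looks up each message key in the table and invokes the join function with both.
  static List<String> join(
      List<String> memberIds,
      Map<String, String> table,
      BiFunction<String, String, String> joinFn) {
    List<String> output = new ArrayList<>();
    for (String memberId : memberIds) {
      String record = table.get(memberId); // null when the key is absent
      String joined = joinFn.apply(memberId, record);
      if (joined != null) {
        output.add(joined);
      }
    }
    return output;
  }
}
```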
Windows, Triggers, and WindowPanes: The window operator groups incoming messages in the MessageStream into finite windows. Each emitted result contains one or more messages in the window and is called a WindowPane.
A window can have one or more associated triggers which determine when results from the window are emitted. Triggers can be either early triggers that allow emitting results speculatively before all data for the window has arrived, or late triggers that allow handling late messages for the window.
Aggregator Function: By default, the emitted WindowPane will contain all the messages for the window. Instead of retaining all messages, you typically define a more compact data structure for the WindowPane and update it incrementally as new messages arrive, e.g. for keeping a count of messages in the window. To do this, you can provide an aggregating FoldLeftFunction which is invoked for each incoming message added to the window and defines how to update the WindowPane for that message.
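As a sketch of incremental aggregation, the fold here replaces "retain every message" with a single running count. It is a plain-Java model: FoldLeftSketch is a hypothetical name, a BiFunction stands in for the FoldLeftFunction, and a list stands in for the messages assigned to one window.

```java
import java.util.List;
import java.util.function.BiFunction;

final class FoldLeftSketch {
  // Folds each message into the pane value, starting from an initial value.
  static <M, WV> WV fold(List<M> messages, WV initial, BiFunction<M, WV, WV> foldFn) {
    WV pane = initial;
    for (M message : messages) {
      pane = foldFn.apply(message, pane);
    }
    return pane;
  }

  // The pane is a single long, updated once per message, instead of a list of messages.
  static long count(List<String> windowMessages) {
    return fold(windowMessages, 0L, (message, count) -> count + 1);
  }
}
```

The state kept per window is constant in size regardless of how many messages arrive, which is the main reason to prefer a fold over retaining all messages.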
Accumulation Mode: A window’s accumulation mode determines how results emitted from a window relate to previously emitted results for the same window. This is particularly useful when the window is configured with early or late triggers. The accumulation mode can either be discarding or accumulating.
A discarding window clears all state for the window at every emission. Each emission will only correspond to new messages that arrived since the previous emission for the window.
An accumulating window retains window results from previous emissions. Each emission will contain all messages that arrived since the beginning of the window.
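The two modes differ only in whether window state is cleared when a trigger fires. A minimal plain-Java model, with a hypothetical AccumulationModeSketch class and an explicit emit() call standing in for a trigger firing, might look like this:

```java
import java.util.ArrayList;
import java.util.List;

final class AccumulationModeSketch {
  enum Mode { DISCARDING, ACCUMULATING }

  private final Mode mode;
  private final List<String> pane = new ArrayList<>();

  AccumulationModeSketch(Mode mode) {
    this.mode = mode;
  }

  // A new message is assigned to the window.
  void add(String message) {
    pane.add(message);
  }

  // Stands in for a trigger firing: returns the contents of the emitted pane.
  List<String> emit() {
    List<String> emitted = new ArrayList<>(pane);
    if (mode == Mode.DISCARDING) {
      pane.clear(); // discarding: the next emission covers only newer messages
    }
    return emitted;
  }
}
```

With messages a and b followed by an emission, then c followed by another, a discarding window emits [a, b] and then [c], while an accumulating window emits [a, b] and then [a, b, c].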
The Samza High Level Streams API currently supports tumbling and session windows.
Tumbling Window: A tumbling window defines a series of contiguous, fixed size time intervals in the stream.
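Assigning a message to a tumbling window reduces to rounding its timestamp down to a multiple of the window size. The sketch here assumes windows are aligned to timestamp 0, which is an illustrative choice rather than documented behavior; TumblingWindowSketch and its methods are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TumblingWindowSketch {
  // Start of the fixed-size interval containing the timestamp (aligned to 0 by assumption).
  static long windowStart(long timestampMs, long sizeMs) {
    return timestampMs - Math.floorMod(timestampMs, sizeMs);
  }

  // Counts messages per window, keyed by window start time.
  static Map<Long, Integer> countPerWindow(List<Long> timestampsMs, long sizeMs) {
    Map<Long, Integer> counts = new TreeMap<>();
    for (long timestampMs : timestampsMs) {
      counts.merge(windowStart(timestampMs, sizeMs), 1, Integer::sum);
    }
    return counts;
  }
}
```

Since the intervals are contiguous and non-overlapping, every message belongs to exactly one window.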
Session Window: A session window groups a MessageStream into sessions. A session captures a period of activity over a MessageStream and is defined by a gap. A session is closed and results are emitted if no new messages arrive for the window for the gap duration.
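Session boundaries depend on the data rather than on the clock: a new session starts whenever the gap since the previous message reaches the configured duration. The sketch here splits an already-sorted list of timestamps into sessions. SessionWindowSketch is a hypothetical name, and treating a message that arrives exactly one gap later as the start of a new session is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

final class SessionWindowSketch {
  // Splits sorted timestamps into sessions separated by at least gapMs of inactivity.
  static List<List<Long>> sessions(List<Long> sortedTimestampsMs, long gapMs) {
    List<List<Long>> sessions = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    for (long timestampMs : sortedTimestampsMs) {
      boolean gapElapsed =
          !current.isEmpty() && timestampMs - current.get(current.size() - 1) >= gapMs;
      if (gapElapsed) {
        sessions.add(current); // close the session and emit it
        current = new ArrayList<>();
      }
      current.add(timestampMs);
    }
    if (!current.isEmpty()) {
      sessions.add(current);
    }
    return sessions;
  }
}
```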
Operator IDs
Each operator in the StreamApplication is associated with a globally unique identifier. By default, each operator is assigned an ID by the framework based on its position in the operator DAG for the application. Some operators that create and use external resources require you to provide an explicit ID for them. Examples of such operators are partitionBy and broadcast with their intermediate streams, and window and join with their local stores and changelogs. It’s strongly recommended to provide meaningful IDs for such operators.
These IDs help you manage the underlying resources when you make changes to the application logic that change the position of the operator within the DAG and:
- You wish to retain the previous state for the operator, since the changes to the DAG don’t affect the operator semantics. For example, you added a map operator before a partitionBy operator to log the incoming message. In this case, you can retain the previous operator ID.
- You wish to discard the previous state for the operator, since the changes to the DAG change the operator semantics. For example, you added a filter operator before a partitionBy operator that discards some of the messages. In this case, you should change the operator ID. Note that by doing so you will lose any previously checkpointed messages that haven’t been completely processed by the downstream operators yet.
An operator ID is of the format: jobName-jobId-opCode-opId
- jobName is the name of your application, as specified using the configuration “app.name”
- jobId is the id of your application, as specified using the configuration “app.id”
- opCode is a pre-defined identifier for the type of the operator, e.g. map/filter/join
- opId is either auto-generated by the framework based on the position of the operator within the DAG, or can be provided by you for operators that manage external resources.
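The format above amounts to joining the four parts with hyphens. The helper here is a hypothetical illustration of that composition only; the framework may apply further normalization that this sketch does not model, and the example values are made up.

```java
final class OperatorIdSketch {
  // Composes an ID of the form jobName-jobId-opCode-opId.
  static String operatorId(String jobName, String jobId, String opCode, String opId) {
    return String.join("-", jobName, jobId, opCode, opId);
  }
}
```

For example, a job named pageviews with id 1 and a join operator given the explicit opId view-click would yield pageviews-1-join-view-click under this scheme. Because the explicit opId does not depend on the operator's position, the ID stays stable when unrelated operators are added to the DAG.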
Data Serialization
Producing data to and consuming data from streams and tables requires serializing and de-serializing it. In addition, some stateful operators like joins and windows store data locally for durability across restarts. Such operations require you to provide a Serde implementation when using them. This also helps Samza infer the type of the data in your application, thus allowing the operator transforms to be checked for type safety at compile time. Samza provides the following Serde implementations that you can use out of the box:
- Common Types: Serdes for common Java data types, such as ByteBuffer, Double, Long, Integer, Byte, String.
- SerializableSerde: A Serde for Java classes that implement the java.io.Serializable interface.
- JsonSerdeV2: A Jackson-based, type-safe JSON Serde that allows serializing from and deserializing to a POJO.
- KVSerde: A pair of Serdes, the first for the keys and the second for the values in the incoming/outgoing message, a table record, or a KV object.
- NoOpSerde: A marker serde that indicates that the framework should not attempt any serialization/deserialization of the data. This is useful in some cases where the SystemProducer or SystemConsumer handles serialization and deserialization of the data itself.
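Conceptually, a serde is a pair of inverse conversions between a typed object and bytes. The sketch here defines a hypothetical SimpleSerde interface with that shape and two small implementations. These are illustrative stand-ins written against the JDK only, not the Samza Serde classes listed above, and the UTF-8 and big-endian encodings are assumptions.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical two-method contract: object to bytes, and bytes back to object.
interface SimpleSerde<T> {
  byte[] toBytes(T object);

  T fromBytes(byte[] bytes);
}

// Encodes strings as UTF-8.
final class Utf8StringSerde implements SimpleSerde<String> {
  @Override
  public byte[] toBytes(String object) {
    return object.getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public String fromBytes(byte[] bytes) {
    return new String(bytes, StandardCharsets.UTF_8);
  }
}

// Encodes longs as 8 big-endian bytes (ByteBuffer's default byte order).
final class BigEndianLongSerde implements SimpleSerde<Long> {
  @Override
  public byte[] toBytes(Long object) {
    return ByteBuffer.allocate(Long.BYTES).putLong(object).array();
  }

  @Override
  public Long fromBytes(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong();
  }
}
```

The generic type parameter is what lets the compiler check that a stream of one type is never handed a serde for another.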
Application Serialization
Samza uses Java Serialization to distribute an application’s processing logic to the processors. For this to work, all application logic, including any Function implementations passed to operators, needs to be serializable. If you need to use any non-serializable objects at runtime, you can use the ApplicationContainerContext and ApplicationTaskContext APIs to manage their lifecycle.
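The practical consequence is that a function is only as serializable as everything it captures. The sketch here demonstrates this with standard Java Serialization and a hypothetical SerializableFunction interface: a lambda that captures a String survives a serialization round trip, while one that captures a plain Object does not. FunctionSerializationSketch and its methods are illustrative names, and the in-memory round trip merely stands in for distributing logic to processors.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

// Hypothetical function type: a Function that is also Serializable.
interface SerializableFunction<T, R> extends Function<T, R>, Serializable { }

final class FunctionSerializationSketch {
  // Serializes and deserializes the object in memory; fails if any part is not serializable.
  static <T> T roundTrip(T object) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(object);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        @SuppressWarnings("unchecked")
        T copy = (T) in.readObject();
        return copy;
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("not serializable: " + e, e);
    }
  }

  static boolean canSerialize(Object object) {
    try {
      roundTrip(object);
      return true;
    } catch (IllegalStateException e) {
      return false;
    }
  }

  // Captures only a String, which is serializable.
  static SerializableFunction<String, String> prefixer(String prefix) {
    return message -> prefix + message;
  }

  // Captures a plain Object, which is not serializable.
  static SerializableFunction<String, String> capturesNonSerializable() {
    Object resource = new Object();
    return message -> message + resource.hashCode();
  }
}
```

This is why non-serializable resources such as clients or connections are better created at runtime on the processor than captured in a function.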