When writing a stream processor for Samza, you must implement either the StreamTask or the AsyncStreamTask interface. Implement StreamTask for synchronous processing, where the message processing is complete by the time the process method returns. An example of a StreamTask is a computation that does not involve remote calls:
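A minimal sketch of such a synchronous task, assuming the standard StreamTask interface; the class name and the counting logic are illustrative, not from the Samza documentation:

```java
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

// Hypothetical task: counts the messages seen by this task instance.
public class MessageCountTask implements StreamTask {
  private int count = 0;

  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    // Purely local computation: no remote calls, so processing is
    // complete by the time process() returns.
    count += 1;
  }
}
```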
The AsyncStreamTask interface, on the other hand, supports asynchronous processing, where the message processing may not be complete after the processAsync method returns. Various concurrency libraries, such as Java NIO, ParSeq and Akka, can be used here to make asynchronous calls, and completion is marked by invoking the TaskCallback. Based on the callback status, Samza will either continue to process the next message or shut down the container. An example of an AsyncStreamTask is a computation that makes remote calls but doesn't block on their completion:
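A sketch of such an asynchronous task; the remote call is stubbed out with a hypothetical callRemoteService method, and CompletableFuture stands in for whichever async library you use:

```java
import java.util.concurrent.CompletableFuture;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.AsyncStreamTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCallback;
import org.apache.samza.task.TaskCoordinator;

public class AsyncRemoteCallTask implements AsyncStreamTask {
  @Override
  public void processAsync(IncomingMessageEnvelope envelope,
                           MessageCollector collector,
                           TaskCoordinator coordinator,
                           TaskCallback callback) {
    // Kick off the remote call without blocking; processAsync returns
    // before the call has finished.
    CompletableFuture
        .runAsync(() -> callRemoteService(envelope.getMessage()))
        .whenComplete((ignored, error) -> {
          // Mark completion so Samza can move on to the next message,
          // or fail so it can shut down the container.
          if (error == null) {
            callback.complete();
          } else {
            callback.failure(error);
          }
        });
  }

  private void callRemoteService(Object message) {
    // Placeholder for the actual remote call.
  }
}
```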
When you run your job, Samza will create several instances of your class (potentially on multiple machines). These task instances process the messages in the input streams.
In your job’s configuration you can tell Samza which streams you want to consume. An incomplete example could look like this (see the configuration documentation for more detail):
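A minimal sketch of such a configuration, using the "kafka" system and "PageViewEvent" stream discussed later in this section; the factory class names follow Samza's standard factories, and the task class is hypothetical:

```properties
# The class that Samza should instantiate as the task (name is hypothetical)
task.class=samza.example.MyTaskClass

# Define a system called "kafka"
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory

# Consume the PageViewEvent topic from the "kafka" system
task.inputs=kafka.PageViewEvent

# Register a JSON serde and use it to deserialize messages on that stream
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
systems.kafka.streams.PageViewEvent.samza.msg.serde=json
```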
For each message that Samza receives from the task’s input streams, the process method is called. The envelope contains three things of importance: the message, the key, and the stream that the message came from.
The key and value are declared as Object, and need to be cast to the correct type. If you don’t configure a serializer/deserializer, they are typically Java byte arrays. A deserializer can convert these bytes into any other type, for example the JSON deserializer mentioned above parses the byte array into java.util.Map, java.util.List and String objects.
The getSystemStreamPartition() method returns a SystemStreamPartition object, which tells you where the message came from. It consists of three parts:
- The system: the name of the system from which the message came, as defined in your job configuration. You can have multiple systems for input and/or output, each with a different name.
- The stream name: the name of the stream (topic, queue) within the source system. This is also defined in the job configuration.
- The partition: a stream is normally split into several partitions, and each partition is assigned to one StreamTask instance by Samza.
The API looks like this:
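Based on the Samza javadoc, the SystemStreamPartition class exposes one accessor per part; method bodies are elided here:

```java
/** A triple of system name, stream name and partition. */
public class SystemStreamPartition extends SystemStream {
  public String getSystem() { ... }
  public String getStream() { ... }
  public Partition getPartition() { ... }
}
```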
In the example job configuration above, the system name is “kafka” and the stream name is “PageViewEvent”. (The name “kafka” isn’t special — you can give your system any name you want.) If you have several input streams feeding into your StreamTask, you can use the SystemStreamPartition to determine what kind of message you’ve received.
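For instance, a task consuming two streams could branch on the stream name like this; "AdClickEvent" is an invented second stream name for illustration:

```java
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class MultiStreamTask implements StreamTask {
  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    // Look at which input stream this message arrived on.
    String stream = envelope.getSystemStreamPartition().getStream();
    if ("PageViewEvent".equals(stream)) {
      // handle a page view message
    } else if ("AdClickEvent".equals(stream)) {
      // handle an ad click message
    }
  }
}
```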
What about sending messages? If you take a look at the process() method in StreamTask, you’ll see that you get a MessageCollector.
To send a message, you create an OutgoingMessageEnvelope object and pass it to the message collector. At a minimum, the envelope specifies the message you want to send, and the system and stream name to send it to. Optionally you can specify the partitioning key and other parameters. See the javadoc for details.
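A sketch of sending from within process(); the "OutputTopic" stream name is illustrative, and the keyed constructor shown is the variant that also specifies a partitioning key:

```java
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class ForwardingTask implements StreamTask {
  // Destination: stream "OutputTopic" in the "kafka" system (name is illustrative).
  private static final SystemStream OUTPUT =
      new SystemStream("kafka", "OutputTopic");

  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    // Minimal form would be new OutgoingMessageEnvelope(OUTPUT, message);
    // here we also pass the incoming key so it is used for partitioning.
    collector.send(new OutgoingMessageEnvelope(
        OUTPUT, envelope.getKey(), envelope.getMessage()));
  }
}
```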
NOTE: Please only use the MessageCollector object within the process() method. If you hold on to a MessageCollector instance and use it again later, your messages may not be sent correctly.
For example, here’s a simple task that splits each input message into words, and emits each word as a separate message:
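A reconstruction of such a task; the output stream name "words" and the choice of 1 as the value are assumptions:

```java
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class SplitStringIntoWords implements StreamTask {
  // Send outgoing messages to a stream called "words" in the "kafka"
  // system (the output stream name is illustrative).
  private static final SystemStream OUTPUT_STREAM =
      new SystemStream("kafka", "words");

  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    String message = (String) envelope.getMessage();
    for (String word : message.split(" ")) {
      // Use the word as the key, and 1 as the value.
      collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, word, 1));
    }
  }
}
```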