Samza provides an integration framework that lets you test applications by quickly running them against a few messages and asserting on the expected results. This removes the need to set up dependencies such as Kafka, YARN, and ZooKeeper to test your Samza applications.
The integration framework can test applications written with the new StreamDSL (StreamApplication) and Task (TaskApplication) APIs, and it also supports testing legacy low-level (StreamTask and AsyncStreamTask) Samza jobs.
Some Prerequisite Information
Your Samza job will be executed in single-container mode, and the framework will set all the configs required to run your job (more on configs later).
Your Samza job will read from a special kind of bounded stream, introduced in the next section, that contains a finite number of messages to make testing feasible.
Introduction to In Memory System and Streams
Samza 1.0 adds support for streams that are maintained in memory by an in-memory system.
These in-memory streams are described by InMemoryInputDescriptor and InMemoryOutputDescriptor, and the corresponding system is described by InMemorySystemDescriptor.
These streams behave like Kafka streams, but their lifecycle is maintained in memory: they are initialized with your job, are available throughout its run, and are destroyed after the test ends.
Introduction to TestRunner API
Samza 1.0 introduces a new TestRunner API to set up a test for a Samza job, add configs, configure input and output streams, and run the job in testing mode.
TestRunner also provides utilities to consume the contents of a stream once the test has run successfully.
TestRunner does basic config setup for you by default, and you can override these default configs if required.
TestRunner supports both stateless and stateful job testing. TestRunner works with InMemoryTables and RocksDB Tables.
How to Write a Test
The running example is a StreamApplication that validates page views and decorates them with the viewer’s profile information.
There are four simple steps to write a test for your stream processing logic and assert on the output.
Step 1: Construct an InMemorySystem
The example job uses a Kafka system called “test”, so we configure an equivalent in-memory system with the same name as the one used in the job.
Step 2: Initialize your input and output streams
The TestRunner API uses a special kind of input and output stream, called in-memory streams, which are easy to define and write assertions on.
Data in these streams is maintained in memory, so they always use a NoOpSerde<>.
You need to configure every stream that your job reads from or writes to.
You can obtain handles to these streams from the system initialized in the previous step.
There are two choices when configuring a stream type:
Input Stream, described by InMemoryInputDescriptor. These streams need to be initialized with messages (data), since your job reads from them.
Output Stream, described by InMemoryOutputDescriptor. These streams need to be initialized with a partition count and start out empty, since your job writes to them.
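To make the difference between the two stream types concrete, here is a toy model in plain Java. It is only a sketch of the idea: BoundedStream and its methods are hypothetical names invented for illustration and are not Samza classes, and placing all input messages in a single partition is an assumption of the sketch rather than a statement about how Samza distributes data.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class InMemoryStreamSketch {

    /** Hypothetical stand-in for a bounded, partitioned in-memory stream (not a Samza class). */
    public static final class BoundedStream<T> {
        private final List<List<T>> partitions = new ArrayList<>();

        private BoundedStream(int partitionCount) {
            if (partitionCount < 1) {
                throw new IllegalArgumentException("partitionCount must be at least 1");
            }
            for (int i = 0; i < partitionCount; i++) {
                partitions.add(new ArrayList<>());
            }
        }

        /** Input stream: created already holding the finite set of messages the job will read. */
        public static <M> BoundedStream<M> input(List<M> messages) {
            BoundedStream<M> stream = new BoundedStream<>(1); // single partition: an assumption of this sketch
            stream.partitions.get(0).addAll(messages);
            return stream;
        }

        /** Output stream: created with a partition count only, and empty until the job writes to it. */
        public static <M> BoundedStream<M> output(int partitionCount) {
            return new BoundedStream<>(partitionCount);
        }

        public void append(int partition, T message) {
            partitions.get(partition).add(message);
        }

        /** Returns a read-only snapshot of one partition. */
        public List<T> read(int partition) {
            return Collections.unmodifiableList(new ArrayList<>(partitions.get(partition)));
        }

        public int partitionCount() {
            return partitions.size();
        }
    }
}
```

In the real framework the descriptors play this role, and the messages or the partition count are supplied when the streams are registered with TestRunner in the next step.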
Step 3: Create a TestRunner
Initialize a TestRunner of your Samza job
Configure TestRunner with input streams and the mock data for them
Configure TestRunner with output streams with a partition count
Add any configs if necessary
Run the test runner
Step 4: Assert on the output stream
You have the following choices for asserting on the results of your tests:
You can use the StreamAssert utilities on your in-memory streams, which consume all partitions of the stream.
You can define custom assertions by using TestRunner.consumeStream() to assert on any partition of the stream.
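As a sketch of what a custom assertion could look like, the helper below checks one partition of an already consumed stream. It assumes that TestRunner.consumeStream() hands back a map from partition id to the list of messages in that partition; the map in main is a hand-built stand-in for that result, and PartitionAssertions with its two methods is a hypothetical helper, not part of Samza.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionAssertions {

    /** Fails unless the given partition holds exactly the expected messages, in order. */
    public static <T> void assertPartitionEquals(
            Map<Integer, List<T>> consumed, int partition, List<T> expected) {
        List<T> actual = consumed.get(partition);
        if (actual == null) {
            throw new AssertionError("No messages consumed from partition " + partition);
        }
        if (!actual.equals(expected)) {
            throw new AssertionError(
                "Partition " + partition + ": expected " + expected + " but got " + actual);
        }
    }

    /** Counts the messages across every partition of the consumed stream. */
    public static <T> int totalMessages(Map<Integer, List<T>> consumed) {
        int total = 0;
        for (List<T> messages : consumed.values()) {
            total += messages.size();
        }
        return total;
    }

    public static void main(String[] args) {
        // Stand-in for the per-partition map that consuming an output stream is assumed to return.
        Map<Integer, List<String>> consumed = new HashMap<>();
        consumed.put(0, Arrays.asList("view-1", "view-2"));
        consumed.put(1, Arrays.asList("view-3"));

        assertPartitionEquals(consumed, 0, Arrays.asList("view-1", "view-2"));
        if (totalMessages(consumed) != 3) {
            throw new AssertionError("Expected 3 messages in total");
        }
    }
}
```

Asserting per partition is useful when ordering only matters within a partition, while the total count gives a cheap sanity check across the whole stream.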
Complete Glance at the code
Example for Low Level API:
For a Low Level Task API
Follow a similar approach for the legacy low-level API: just provide the class name (the class implementing StreamTask or AsyncStreamTask) to TestRunner.
No additional configs or changes to the TestRunner APIs are required for testing Samza jobs that use the StreamApplication or TaskApplication APIs.
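The legacy task API supports only RocksDbTable and requires the store’s configs to be added to TestRunner.
For example, a job using a RocksDbTable named “my-store” with String key and message serdes needs the store’s factory, a String serde registration, and the store’s key and message serde settings.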
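The configs for such a store can be sketched as a plain map. The keys and class names below follow Samza's standard store and serde configuration as far as I know, but treat them as assumptions and check them against your Samza version; LegacyStoreConfigs and rocksDbStringStore are hypothetical names used only for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LegacyStoreConfigs {

    /** Builds the configs for a RocksDB-backed store whose keys and messages are Strings. */
    public static Map<String, String> rocksDbStringStore(String storeName) {
        Map<String, String> configs = new LinkedHashMap<>();
        // Register a serde under the name "string" so the store can refer to it.
        configs.put("serializers.registry.string.class",
            "org.apache.samza.serializers.StringSerdeFactory");
        // Back the store with RocksDB.
        configs.put("stores." + storeName + ".factory",
            "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");
        // Use the registered "string" serde for both keys and messages.
        configs.put("stores." + storeName + ".key.serde", "string");
        configs.put("stores." + storeName + ".msg.serde", "string");
        return configs;
    }

    public static void main(String[] args) {
        // Print the configs for the store named in the text above.
        rocksDbStringStore("my-store")
            .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting map would then be passed to TestRunner through its config-adding method before the test is run.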