@InterfaceStability.Unstable public interface StreamApplication
The following example removes page views older than 1 hour from the input stream:
public class PageViewCounter implements StreamApplication {
  public void init(StreamGraph graph, Config config) {
    MessageStream<PageViewEvent> pageViewEvents =
        graph.getInputStream("pageViewEvents", (k, m) -> (PageViewEvent) m);
    OutputStream<String, PageViewEvent, PageViewEvent> recentPageViewEvents =
        graph.getOutputStream("recentPageViewEvents", m -> m.memberId, m -> m);

    // Keep only page views created within the last hour.
    pageViewEvents
        .filter(m -> m.getCreationTime() > System.currentTimeMillis() - Duration.ofHours(1).toMillis())
        .sendTo(recentPageViewEvents);
  }
}
The example above can be run using an ApplicationRunner:
public static void main(String[] args) {
  CommandLine cmdLine = new CommandLine();
  Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
  PageViewCounter app = new PageViewCounter();
  LocalApplicationRunner runner = new LocalApplicationRunner(config);
  runner.run(app);
  runner.waitForFinish();
}
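The filter in PageViewCounter reads System.currentTimeMillis() inside the lambda, which makes the one-hour cutoff awkward to check in isolation. The following is a minimal sketch, independent of Samza, that takes the current time as a parameter. The class name RecentEventFilter and the method isRecent are hypothetical helpers introduced here for illustration and are not part of the Samza API.

```java
import java.time.Duration;

// Hypothetical helper, not part of Samza: isolates the cutoff arithmetic
// used by the filter lambda in the PageViewCounter example.
public class RecentEventFilter {
    private final long maxAgeMillis;

    public RecentEventFilter(Duration maxAge) {
        this.maxAgeMillis = maxAge.toMillis();
    }

    // Same comparison as the example's lambda: creationTime > now - maxAge.
    // An event that is exactly maxAge old is treated as too old.
    public boolean isRecent(long creationTimeMillis, long nowMillis) {
        return creationTimeMillis > nowMillis - maxAgeMillis;
    }

    public static void main(String[] args) {
        RecentEventFilter filter = new RecentEventFilter(Duration.ofHours(1));
        long now = System.currentTimeMillis();
        // An event created 30 minutes ago is kept.
        System.out.println(filter.isRecent(now - Duration.ofMinutes(30).toMillis(), now)); // true
        // An event created 2 hours ago is dropped.
        System.out.println(filter.isRecent(now - Duration.ofHours(2).toMillis(), now));    // false
    }
}
```

Under these assumptions, the example's filter could delegate to such a helper, for instance `.filter(m -> recent.isRecent(m.getCreationTime(), System.currentTimeMillis()))`, which keeps the clock read in one place.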
Implementation Notes: Currently StreamApplications are wrapped in a StreamTask during execution. A new StreamApplication instance will be created and initialized when planning the execution, as well as for each StreamTask instance used for processing incoming messages. Execution is synchronous and thread-safe within each StreamTask.
Functions implemented for transforms in StreamApplications (for example, MapFunction and FilterFunction) are initable and closable. They are initialized before messages are delivered to them and closed after their execution when the StreamTask instance is closed. See InitableFunction and ClosableFunction.
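The lifecycle described above (initialize, deliver messages, close) can be sketched without Samza on the classpath. The interfaces Initable, Closable, and Filter below are simplified stand-ins written for this illustration; they are not Samza's InitableFunction, ClosableFunction, or FilterFunction, and their method signatures are assumptions. The runTask method only mimics the ordering a task would follow.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FunctionLifecycleSketch {
    // Simplified stand-ins for illustration only; NOT the Samza interfaces.
    interface Initable { default void init() {} }
    interface Closable { default void close() {} }
    interface Filter<M> extends Initable, Closable { boolean apply(M message); }

    // A filter that sets up its state in init() and records each lifecycle call.
    static class MinLengthFilter implements Filter<String> {
        final List<String> events = new ArrayList<>();
        private int minLength;

        @Override public void init() { minLength = 3; events.add("init"); }

        @Override public boolean apply(String message) {
            events.add("apply:" + message);
            return message.length() >= minLength;
        }

        @Override public void close() { events.add("close"); }
    }

    // Mimics the ordering described above: init first, then messages, then close.
    static <M> List<M> runTask(Filter<M> fn, List<M> messages) {
        List<M> kept = new ArrayList<>();
        fn.init();
        try {
            for (M m : messages) {
                if (fn.apply(m)) {
                    kept.add(m);
                }
            }
        } finally {
            fn.close(); // closed even if a message fails
        }
        return kept;
    }

    public static void main(String[] args) {
        MinLengthFilter fn = new MinLengthFilter();
        List<String> kept = runTask(fn, Arrays.asList("a", "abcd"));
        System.out.println(kept);      // [abcd]
        System.out.println(fn.events); // [init, apply:a, apply:abcd, close]
    }
}
```

The point of the sketch is the ordering guarantee: state created in init is available to every apply call, and close runs once after the last message.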
Modifier and Type | Method and Description |
---|---|
void | init(StreamGraph graph, Config config) Describes and initializes the transforms for processing message streams and generating results. |
void init(StreamGraph graph, Config config)
The StreamGraph provides access to input and output streams. Input MessageStreams can be transformed into other MessageStreams or sent to an OutputStream using the MessageStream operators.
Most operators accept custom functions that perform the transformations. These functions are InitableFunctions and are provided the Config and TaskContext during their own initialization. The config and the context can be used, for example, to create custom metrics or access durable state stores.
A shared context between InitableFunctions for different operators within a task instance can be set up by providing a ContextManager using StreamGraph.withContextManager(org.apache.samza.operators.ContextManager).
graph - the StreamGraph to get input/output streams from
config - the configuration for the application