Connecting to Kinesis
You can configure your Samza jobs to process data from AWS Kinesis, Amazon’s data streaming service. A Kinesis data stream is similar to a Kafka topic and can have multiple partitions, which Kinesis calls shards. Each message consumed from a Kinesis data stream is an instance of the Kinesis Record class from the AWS SDK.
Consuming from Kinesis:
To configure Samza to consume from Kinesis streams:
# define a Kinesis system factory with your identifier, e.g. kinesis-system
systems.kinesis-system.samza.factory=org.apache.samza.system.kinesis.KinesisSystemFactory
# the Kinesis system consumer works only with AllSspToSingleTaskGrouperFactory
job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory
# define your streams
task.inputs=kinesis-system.input0
# define required properties for your streams
systems.kinesis-system.streams.input0.aws.region=YOUR-STREAM-REGION
systems.kinesis-system.streams.input0.aws.accessKey=YOUR-ACCESS-KEY
sensitive.systems.kinesis-system.streams.input0.aws.secretKey=YOUR-SECRET-KEY
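The region and credentials required to access the Kinesis data stream must be provided, namely the aws.region, aws.accessKey, and aws.secretKey fields shown above. The secret key is set under the sensitive. prefix so that Samza masks its value when it prints the job's configuration.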
AWS Client Configs:
You can configure any AWS client config with the prefix
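The general form is sketched below. The prefix itself is not spelled out on this page; the sketch assumes it is systems.<system-name>.aws.clientConfig., as defined in Samza's KinesisConfig class, and that each parameter name is a setter of the AWS SDK's ClientConfiguration class with the "set" prefix removed. Unlike the region and credentials, which are set per stream, these client settings are assumed to apply to the whole system. Verify both assumptions against the Samza version you run.

```properties
# Template (assumed prefix: systems.<system-name>.aws.clientConfig.)
# Replace kinesis-system with your system name, and CONFIG-PARAM / CONFIG-VALUE
# with an AWS ClientConfiguration setting and its value.
systems.kinesis-system.aws.clientConfig.CONFIG-PARAM=CONFIG-VALUE
```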
As an example, to set a proxy host and proxy port for the AWS Client:
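A sketch of the proxy settings, assuming the client-config prefix systems.<system-name>.aws.clientConfig. from Samza's KinesisConfig class and the system name kinesis-system used earlier on this page. ProxyHost and ProxyPort are assumed to map to the setProxyHost and setProxyPort methods of the AWS SDK's ClientConfiguration; the values are placeholders.

```properties
# Assumed prefix: systems.<system-name>.aws.clientConfig.
# ProxyHost/ProxyPort are assumed to map to ClientConfiguration.setProxyHost/setProxyPort.
systems.kinesis-system.aws.clientConfig.ProxyHost=YOUR-PROXY-HOST
# must be an integer port number
systems.kinesis-system.aws.clientConfig.ProxyPort=YOUR-PROXY-PORT
```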
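Similarly, you can set any Kinesis Client Library (KCL) config for a stream by configuring it under the prefix systems.<system-name>.streams.<stream-name>.aws.kcl. (for example, systems.kinesis-system.streams.input0.aws.kcl. for the stream defined above).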
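The general form is sketched below, using the system and stream names from the earlier example. Parameter names are assumed to follow the with... methods of the KCL's KinesisClientLibConfiguration class with the "with" prefix removed; the TableName and InitialPositionInStream keys used for resetting checkpoints fit that pattern (withTableName, withInitialPositionInStream). MaxRecords (withMaxRecords) is shown only as an illustration; confirm the setting exists in the KCL version bundled with your Samza release.

```properties
# Template: CONFIG-PARAM is a KinesisClientLibConfiguration setting
# (assumed: the with... method name without the "with" prefix).
systems.kinesis-system.streams.input0.aws.kcl.CONFIG-PARAM=CONFIG-VALUE
# Illustrative only: cap the number of records fetched per GetRecords call
# (assumed to map to KinesisClientLibConfiguration.withMaxRecords).
systems.kinesis-system.streams.input0.aws.kcl.MaxRecords=1000
```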
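The KCL stores a stream's checkpoints in a DynamoDB table, so you can reset the checkpoint by pointing the stream at a new table name. InitialPositionInStream applies only when no checkpoint exists, so it then controls where consumption starts. As an example, to reset the checkpoint and set the starting position for a stream: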
systems.kinesis-system.streams.input0.aws.kcl.TableName=my-app-table-name
# set the starting position to either TRIM_HORIZON (oldest) or LATEST (latest)
systems.kinesis-system.streams.input0.aws.kcl.InitialPositionInStream=my-start-position
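For instance, a sketch that replays input0 from the oldest record still retained in the stream. It assumes my-app-replay-table is a hypothetical DynamoDB table name that no earlier run has used, so the KCL finds no checkpoint and falls back to the initial position.

```properties
# Hypothetical new table name: it holds no checkpoints yet, so the KCL
# starts from InitialPositionInStream instead of a stored checkpoint.
systems.kinesis-system.streams.input0.aws.kcl.TableName=my-app-replay-table
# TRIM_HORIZON = oldest record still retained in the stream
systems.kinesis-system.streams.input0.aws.kcl.InitialPositionInStream=TRIM_HORIZON
```

The KCL creates the table on startup if it does not exist, so the AWS credentials in use also need permission to create and write to it.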
The following limitations apply for Samza jobs consuming from Kinesis streams using the Samza consumer:
* Stateful processing (e.g. windows or joins) is not supported on Kinesis streams. However, you can accomplish this by chaining two Samza jobs: the first job reads from Kinesis and sends to Kafka, and the second job processes the data from Kafka.
* Kinesis streams cannot be configured as bootstrap or broadcast streams.
* Kinesis streams must be used with the AllSspToSingleTaskGrouperFactory. No other grouper is supported.
* A Samza job that consumes from Kinesis cannot consume from any other input source. However, you can send your results to any destination (e.g. Kafka, EventHubs) and have another Samza job consume them.
Producing to Kinesis:
The KinesisSystemProducer for Samza is not yet implemented.
How do you configure a Samza job to consume from a Kinesis data stream?
Update properties file
Update the following properties in the kinesis-hello-samza.properties file:
task.inputs=kinesis.<kinesis-stream>
systems.kinesis.streams.<kinesis-stream>.aws.region=<kinesis-stream-region>
systems.kinesis.streams.<kinesis-stream>.aws.accessKey=<your-access-key>
sensitive.systems.kinesis.streams.<kinesis-stream>.aws.secretKey=<your-secret-key>
Now you are ready to run your Samza application on YARN as described here. Check the log file for the messages read from your Kinesis stream.