Event Hubs Connector


EventHubs I/O: QuickStart

The Samza EventHubs connector provides access to Azure EventHubs, Microsoft’s data streaming service on Azure. An eventhub is similar to a Kafka topic and can have multiple partitions with producers and consumers. Each message produced to or consumed from an event hub is an instance of EventData.

The hello-samza project includes an example of reading from and writing to EventHubs.

Concepts

EventHubsSystemDescriptor

Samza refers to any IO source (e.g., Kafka) it interacts with as a system, whose properties are set using a corresponding SystemDescriptor. The EventHubsSystemDescriptor allows you to configure various properties for the EventHubsClient used by Samza.

    EventHubsSystemDescriptor eventHubsSystemDescriptor = new EventHubsSystemDescriptor("eventhubs").withNumClientThreads(5);

EventHubsInputDescriptor

The EventHubsInputDescriptor allows you to specify the properties of each EventHubs stream your application should read from. For each of your input streams, you should create a corresponding instance of EventHubsInputDescriptor by providing a stream ID, the EventHubs namespace, the event hub name, and a serde.

    EventHubsInputDescriptor<KV<String, String>> inputDescriptor = 
        systemDescriptor.getInputDescriptor(streamId, "eventhubs-namespace", "eventhubs-name", new StringSerde())
          .withSasKeyName("secretkey")
          .withSasKey("sasToken-123")
          .withConsumerGroup("$notdefault");

By default, messages are sent and received as byte arrays. Samza then de-serializes them to typed objects using your provided Serde. For example, the descriptor above uses a StringSerde to de-serialize messages.
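To make the byte-array-to-object step concrete, here is a minimal sketch of a string serde. It declares a local `SimpleSerde` interface modeled on the `fromBytes`/`toBytes` shape of Samza's `Serde`, so it compiles without Samza on the classpath. `SimpleSerde`, `Utf8StringSerde` and `SerdeDemo` are hypothetical names, and the UTF-8 charset is an assumption of this sketch, not a statement about how Samza's `StringSerde` is configured.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical interface modeled on the fromBytes/toBytes shape of Samza's Serde.
// Declared locally so this sketch compiles without Samza on the classpath.
interface SimpleSerde<T> {
  T fromBytes(byte[] bytes);

  byte[] toBytes(T object);
}

// Hypothetical string serde that assumes UTF-8 encoding.
class Utf8StringSerde implements SimpleSerde<String> {
  @Override
  public String fromBytes(byte[] bytes) {
    return new String(bytes, StandardCharsets.UTF_8);
  }

  @Override
  public byte[] toBytes(String object) {
    return object.getBytes(StandardCharsets.UTF_8);
  }
}

class SerdeDemo {
  public static void main(String[] args) {
    SimpleSerde<String> serde = new Utf8StringSerde();
    // Raw bytes, as they would arrive in a message payload.
    byte[] payload = serde.toBytes("hello eventhubs");
    // De-serialize the payload back into a typed value.
    String value = serde.fromBytes(payload);
    System.out.println(value); // prints "hello eventhubs"
  }
}
```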

EventHubsOutputDescriptor

Similarly, the EventHubsOutputDescriptor allows you to specify the output streams for your application. For each output stream you write to in EventHubs, you should create an instance of EventHubsOutputDescriptor.

    EventHubsOutputDescriptor<KV<String, String>> outputDescriptor =
        systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_OUTPUT_ENTITY, new StringSerde())
            .withSasKeyName(..)
            .withSasKey(..);

Security Model

Each EventHubs stream is scoped to a container called a namespace, which uniquely identifies an EventHubs instance in a region. The EventHubs security model is based on Shared Access Signatures (SAS), so you must also provide your SAS keys and tokens to access the stream. You can generate your SAS tokens using the
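As background, a SAS token is a string signed with HMAC-SHA256 using a shared access key. The sketch below builds one following the format we understand Azure's SAS documentation to describe: the signature covers the URL-encoded resource URI, a newline and an expiry timestamp, and the token carries the `sr`, `sig`, `se` and `skn` fields. `buildSasToken` is a hypothetical helper, not a Samza or Azure SDK API, and the namespace URI, key name and key in `main` are placeholders; check the format against Azure's documentation before relying on it.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

class SasTokenSketch {
  // URL-encodes a token field with UTF-8.
  private static String urlEncode(String value) {
    try {
      return URLEncoder.encode(value, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      throw new IllegalStateException("UTF-8 is always supported", e);
    }
  }

  // Hypothetical helper: builds a SAS token for resourceUri that expires at
  // expiryEpochSeconds (seconds since the Unix epoch).
  static String buildSasToken(String resourceUri, String keyName, String key, long expiryEpochSeconds) {
    String encodedUri = urlEncode(resourceUri);
    String expiry = Long.toString(expiryEpochSeconds);
    // String to sign: URL-encoded resource URI, a newline, then the expiry.
    String stringToSign = encodedUri + "\n" + expiry;
    try {
      Mac hmac = Mac.getInstance("HmacSHA256");
      hmac.init(new SecretKeySpec(key.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
      byte[] rawSignature = hmac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8));
      String signature = Base64.getEncoder().encodeToString(rawSignature);
      return "SharedAccessSignature sr=" + encodedUri
          + "&sig=" + urlEncode(signature)
          + "&se=" + expiry
          + "&skn=" + keyName;
    } catch (GeneralSecurityException e) {
      throw new IllegalStateException("HMAC-SHA256 is unavailable", e);
    }
  }

  public static void main(String[] args) {
    // Placeholder values: substitute your own namespace, event hub, key name and key.
    long expiry = System.currentTimeMillis() / 1000L + 3600; // one hour from now
    String token = buildSasToken(
        "https://my-namespace.servicebus.windows.net/my-eventhub", "my-key-name", "my-key", expiry);
    System.out.println(token);
  }
}
```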

Data Model

Each event produced and consumed from an EventHubs stream is an instance of EventData, which wraps a byte-array payload. When producing to EventHubs, Samza serializes your object into an EventData payload before sending it over the wire. Likewise, when consuming messages from EventHubs, messages are de-serialized into typed objects using the provided Serde.
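The following sketch models that round trip with the standard library only. `EventDataSketch` is a hypothetical stand-in for `EventData` (an opaque byte-array body plus a partition key), not the Azure SDK class. Surfacing the partition key as the message key follows this guide's code walkthrough, which notes that the EventHubs message key is set to the partitionKey; the UTF-8 payload encoding is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Hypothetical stand-in for EventData: an opaque byte-array body plus an
// optional partition key. This is not the Azure SDK class.
final class EventDataSketch {
  final byte[] body;
  final String partitionKey;

  EventDataSketch(byte[] body, String partitionKey) {
    this.body = body;
    this.partitionKey = partitionKey;
  }
}

class DataModelDemo {
  // Producer side: serialize the value into the payload and carry the key as the partition key.
  static EventDataSketch toEvent(String key, String value) {
    return new EventDataSketch(value.getBytes(StandardCharsets.UTF_8), key);
  }

  // Consumer side: de-serialize the payload into a typed key/value pair,
  // surfacing the partition key as the message key.
  static Map.Entry<String, String> fromEvent(EventDataSketch event) {
    return new SimpleEntry<>(event.partitionKey, new String(event.body, StandardCharsets.UTF_8));
  }

  public static void main(String[] args) {
    Map.Entry<String, String> kv = fromEvent(toEvent("user-42", "clicked"));
    System.out.println(kv.getKey() + " -> " + kv.getValue()); // prints "user-42 -> clicked"
  }
}
```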

Configuration

Producer partitioning

You can use #withPartitioningMethod to control how outgoing messages are partitioned. The following partitioning schemes are supported:

  1. EVENT_HUB_HASHING: By default, Samza computes the partition for an outgoing message based on the hash of its partition-key. This ensures that events with the same key are sent to the same partition. If this option is chosen, the partition key should be a string. If the partition key is not set, the key in the message is used for partitioning.

  2. PARTITION_KEY_AS_PARTITION: In this method, each message is sent to the partition specified by its partition key. This requires the partition key to be an integer. If the key is greater than or equal to the number of partitions, it is taken modulo the number of partitions. Similar to EVENT_HUB_HASHING, the key in the message is used if the partition key is not specified.

  3. ROUND_ROBIN: In this method, outgoing messages are distributed across all partitions in round-robin order. The key and the partition key in the message are ignored.

    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor("eventhubs")
        .withPartitioningMethod(PartitioningMethod.EVENT_HUB_HASHING);
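The three schemes can be illustrated with a small partition selector. This is a sketch under assumptions, not Samza's implementation: in particular, `String.hashCode()` stands in for whatever hash Event Hubs actually applies under EVENT_HUB_HASHING, so the concrete partition numbers it produces will not match a real event hub. `PartitioningSketch` and its methods are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical partition selector illustrating the three schemes.
// String.hashCode() is only a stand-in for the hash Event Hubs applies.
class PartitioningSketch {
  private final int numPartitions;
  private final AtomicLong roundRobinCounter = new AtomicLong();

  PartitioningSketch(int numPartitions) {
    this.numPartitions = numPartitions;
  }

  // Use the partition key if set, otherwise fall back to the message key.
  static String effectiveKey(String partitionKey, String messageKey) {
    return partitionKey != null ? partitionKey : messageKey;
  }

  // EVENT_HUB_HASHING: equal string keys always map to the same partition.
  int byHash(String key) {
    return Math.floorMod(key.hashCode(), numPartitions);
  }

  // PARTITION_KEY_AS_PARTITION: the key must parse as an integer; wrap it with a modulo.
  int byKeyAsPartition(String key) {
    return Math.floorMod(Integer.parseInt(key), numPartitions);
  }

  // ROUND_ROBIN: keys are ignored; partitions are used in turn.
  int roundRobin() {
    return (int) (roundRobinCounter.getAndIncrement() % numPartitions);
  }

  public static void main(String[] args) {
    PartitioningSketch p = new PartitioningSketch(4);
    System.out.println(p.byHash(effectiveKey(null, "user-42")) == p.byHash("user-42")); // prints "true"
    System.out.println(p.byKeyAsPartition("10")); // prints "2"
    System.out.println(p.roundRobin() + " " + p.roundRobin() + " " + p.roundRobin()); // prints "0 1 2"
  }
}
```

`effectiveKey` captures the fallback from partition key to message key described above, and `Math.floorMod` keeps the result non-negative even when a hash code is negative.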

Consumer groups

Event Hubs supports the notion of consumer groups, which enable multiple applications to have their own view of the event stream. Within a group, each partition is exclusively consumed by one consumer. Each event hub stream has a predefined consumer group named $Default. You can define your own consumer group for your job using withConsumerGroup.

    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor("eventhubs");
    EventHubsInputDescriptor<KV<String, String>> inputDescriptor =
        systemDescriptor.getInputDescriptor(INPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_INPUT_ENTITY, new StringSerde())
            .withConsumerGroup("my-group");
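To picture how a group divides the work, the sketch below assigns each partition to exactly one consumer of a group with a simple modulo rule, and shows two groups each covering the full set of partitions independently. The modulo rule and the `ConsumerGroupSketch.assign` helper are hypothetical; neither Samza nor Event Hubs is claimed to assign partitions this way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical assignment: within one consumer group each partition is owned
// by exactly one consumer; every group independently covers all partitions.
class ConsumerGroupSketch {
  // Returns consumer index -> partitions it owns, using a simple modulo rule.
  static Map<Integer, List<Integer>> assign(int numPartitions, int numConsumers) {
    Map<Integer, List<Integer>> owned = new TreeMap<>();
    for (int c = 0; c < numConsumers; c++) {
      owned.put(c, new ArrayList<>());
    }
    for (int p = 0; p < numPartitions; p++) {
      owned.get(p % numConsumers).add(p);
    }
    return owned;
  }

  public static void main(String[] args) {
    // Two groups reading the same 4-partition event hub, each with its own consumers.
    System.out.println("$Default: " + assign(4, 2)); // prints "$Default: {0=[0, 2], 1=[1, 3]}"
    System.out.println("my-group: " + assign(4, 4)); // prints "my-group: {0=[0], 1=[1], 2=[2], 3=[3]}"
  }
}
```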

Consumer buffer size

When the consumer reads messages from EventHubs, it appends them to a shared producer-consumer queue corresponding to their partition. withReceiveQueueSize sets the size of this per-partition queue. A higher value typically achieves higher throughput at the expense of increased on-heap memory.

    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor("eventhubs")
        .withReceiveQueueSize(10);
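The per-partition queue behaves like a bounded producer-consumer buffer. The sketch below uses `java.util.concurrent.ArrayBlockingQueue` with a capacity of 10 to mirror `withReceiveQueueSize(10)`: one thread plays the receiver and the calling thread plays the processor. The blocking back-pressure from `put()` is a property of this sketch, not a claim about how Samza's consumer handles a full queue, and `ReceiveQueueSketch` is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ReceiveQueueSketch {
  // Runs one receiver thread and one processing loop over a bounded queue and
  // returns the number of events processed.
  static int run(int receiveQueueSize, int numEvents) {
    BlockingQueue<String> partitionQueue = new ArrayBlockingQueue<>(receiveQueueSize);
    Thread receiver = new Thread(() -> {
      try {
        for (int i = 0; i < numEvents; i++) {
          partitionQueue.put("event-" + i); // blocks while the queue is full
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    receiver.start();

    int processed = 0;
    try {
      while (processed < numEvents) {
        String event = partitionQueue.take(); // blocks until an event arrives
        if (!event.equals("event-" + processed)) {
          throw new IllegalStateException("out of order: " + event);
        }
        processed++;
      }
      receiver.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return processed;
  }

  public static void main(String[] args) {
    // A queue holding at most 10 events for this partition.
    System.out.println(run(10, 25)); // prints 25
  }
}
```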

Code walkthrough

In this section, we will walk through a simple pipeline that reads from one EventHubs stream and copies each message to another output stream.

1    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor("eventhubs").withNumClientThreads(5);

2    EventHubsInputDescriptor<KV<String, String>> inputDescriptor =
        systemDescriptor.getInputDescriptor(INPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_INPUT_ENTITY, new StringSerde())
            .withSasKeyName(..)
            .withSasKey(..);

3    EventHubsOutputDescriptor<KV<String, String>> outputDescriptor =
        systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_OUTPUT_ENTITY, new StringSerde())
            .withSasKeyName(..)
            .withSasKey(..);

4    MessageStream<KV<String, String>> eventhubInput = appDescriptor.getInputStream(inputDescriptor);
5    OutputStream<KV<String, String>> eventhubOutput = appDescriptor.getOutputStream(outputDescriptor);

    // Define the execution flow with the High Level Streams API
6    eventhubInput
7        .map((message) -> {
8          System.out.println("Received Key: " + message.getKey());
9          System.out.println("Received Message: " + message.getValue());
10          return message;
11        })
12        .sendTo(eventhubOutput);

- Line 1 instantiates an EventHubsSystemDescriptor that configures an EventHubsClient with 5 threads. To consume from other input sources like Kafka, you can define their corresponding descriptors.

- Line 2 creates an EventHubsInputDescriptor with a String serde for its values. Recall that Samza follows a KV data model for input messages. In the case of EventHubs, the key is a string that is set to the partitionKey in the message. Hence, no separate key serde is required.

- Line 3 creates an EventHubsOutputDescriptor to write to an EventHubs stream with the given credentials.

- Line 4 obtains a MessageStream from the input descriptor that you can later chain operations on.

- Line 5 creates an OutputStream with the previously defined EventHubsOutputDescriptor that you can send messages to.

- Lines 6-12 define a simple pipeline that logs each message's key and value and copies the message from one EventHubs stream to another.
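To see the shape of this pipeline without a running Samza job, the following sketch replays the same log-and-forward `map` over an in-memory list with `java.util.stream`. It is only an analogy: the `List` input and the `Consumer` sink stand in for the MessageStream and OutputStream, and `CopyPipelineSketch` is a hypothetical name.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// In-memory analogy of the walkthrough: a List stands in for the input
// MessageStream and a Consumer for the OutputStream. Not Samza code.
class CopyPipelineSketch {
  static void copy(List<Map.Entry<String, String>> input, Consumer<Map.Entry<String, String>> output) {
    input.stream()
        .map(message -> {
          // Same logging as the walkthrough's map step; the message is returned unchanged.
          System.out.println("Received Key: " + message.getKey());
          System.out.println("Received Message: " + message.getValue());
          return message;
        })
        .forEach(output); // plays the role of sendTo(eventhubOutput)
  }

  public static void main(String[] args) {
    List<Map.Entry<String, String>> input = new ArrayList<>();
    input.add(new SimpleEntry<>("k1", "v1"));
    input.add(new SimpleEntry<>("k2", "v2"));

    List<Map.Entry<String, String>> output = new ArrayList<>();
    copy(input, output::add);
    System.out.println(output.equals(input)); // prints "true"
  }
}
```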