Connecting to EventHubs
You can configure your Samza jobs to process data from Azure EventHubs, Microsoft’s data streaming service. An event hub is similar to a Kafka topic and can have multiple partitions with producers and consumers. Each message produced to or consumed from an event hub is an instance of EventData.
Consuming from EventHubs:
Samza’s EventHubSystemConsumer wraps the EventData into an EventHubIncomingMessageEnvelope. The key of the message is set to the partition key of the EventData. The message is obtained from the EventData body.
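For example, a task written against Samza’s low-level API can read the partition key and raw body directly off the envelope. The following is a minimal sketch; the class name is illustrative and it assumes no serde has been configured for the stream.
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class EventHubsConsumerTask implements StreamTask {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    // The envelope key holds the partition key of the underlying EventData
    String partitionKey = (String) envelope.getKey();
    // Without a serde configured, the message is the raw EventData body
    byte[] body = (byte[]) envelope.getMessage();
    // ... application logic goes here ...
  }
}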
To configure Samza to consume from EventHubs streams:
# define an event hub system factory with your identifier. eg: eh-system
systems.eh-system.samza.factory=org.apache.samza.system.eventhub.EventHubSystemFactory
# define your streams
systems.eh-system.stream.list=input0, output0
# define required properties for your streams
systems.eh-system.streams.input0.eventhubs.namespace=YOUR-STREAM-NAMESPACE
systems.eh-system.streams.input0.eventhubs.entitypath=YOUR-ENTITY-NAME
systems.eh-system.streams.input0.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
systems.eh-system.streams.input0.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
systems.eh-system.streams.output0.eventhubs.namespace=YOUR-STREAM-NAMESPACE
systems.eh-system.streams.output0.eventhubs.entitypath=YOUR-ENTITY-NAME
systems.eh-system.streams.output0.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
systems.eh-system.streams.output0.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
The tuple required to access the EventHubs entity per stream must be provided, namely the fields YOUR-STREAM-NAMESPACE, YOUR-ENTITY-NAME, YOUR-SAS-KEY-NAME and YOUR-SAS-KEY-TOKEN.
Producing to EventHubs:
Similarly, you can also configure your Samza job to write to EventHubs.
// Send a message to the "output0" stream on the EventHubs system defined above
OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(new SystemStream("eh-system", "output0"), key, message);
collector.send(envelope);
Each OutgoingMessageEnvelope is converted into an EventData instance whose body is set to the message in the envelope. Additionally, the key and the produce timestamp are set as properties in the EventData before sending it to EventHubs.
Size limit of partition key:
Note that EventHubs has a limit on the length of the partition key (128 characters). The EventHubSystemProducer truncates the partition key if its length exceeds this limit.
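The effect is equivalent to the sketch below; the actual producer code may differ, and the helper method name is illustrative.
// EventHubs caps partition keys at 128 characters
private static final int MAX_PARTITION_KEY_LENGTH = 128;

static String truncatePartitionKey(String key) {
  if (key == null || key.length() <= MAX_PARTITION_KEY_LENGTH) {
    return key;
  }
  // Keep only the first 128 characters; note that distinct keys sharing a
  // long prefix will map to the same truncated key (and hence partition)
  return key.substring(0, MAX_PARTITION_KEY_LENGTH);
}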
Advanced configuration:
Producer partitioning:
The partition.method property determines how outgoing messages are partitioned. Valid values for this config are EVENT_HUB_HASHING, PARTITION_KEY_AS_PARTITION or ROUND_ROBIN.
EVENT_HUB_HASHING: By default, Samza computes the partition for an outgoing message based on the hash of its partition-key. This ensures that events with the same key are sent to the same partition. If this option is chosen, the partition key should be a string. If the partition key is not set, the key in the message is used for partitioning.
PARTITION_KEY_AS_PARTITION: In this method, each message is sent to the partition specified by its partition key. This requires the partition key to be an integer. If the key is greater than the number of partitions, a modulo operation is performed on the key. Similar to EVENT_HUB_HASHING, the key in the message is used if the partition key is not specified.
ROUND_ROBIN: In this method, outgoing messages are distributed in a round-robin fashion across all partitions. The key and the partition key in the message are ignored.
systems.eh-system.partition.method = EVENT_HUB_HASHING
Consumer groups:
EventHubs supports the notion of consumer groups, which enables multiple applications to have their own view of the event stream. Each event hub stream has a pre-defined consumer group named $Default. You can define your own consumer group for your job and configure it with eventhubs.consumer.group:
systems.eh-system.streams.input0.eventhubs.consumer.group = my-group
Serde:
By default, the messages from EventHubs are sent and received as byte arrays. You can configure a serializer and deserializer for your message by setting a value for msg.serde for your stream:
streams.input0.samza.msg.serde = json
streams.output0.samza.msg.serde = json
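Note that the json serde name must also be registered with a serde factory for it to resolve; a minimal registration, assuming Samza's built-in JSON serde factory, could look like:
# register the json serde referenced above
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory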
Consumer buffer size:
When the consumer reads messages from EventHubs, it appends them to a shared producer-consumer buffer corresponding to each partition. This config determines the per-partition queue size. Setting a higher value for this config typically achieves higher throughput at the expense of increased on-heap memory.
systems.eh-system.eventhubs.receive.queue.size = 10
For the list of all configs, refer to the Samza configuration table.
Azure EventHubs Hello-Samza Example
The hello-samza project contains an example of a high-level job that consumes from and produces to EventHubs using the ZooKeeper deployment model.
Get the Code
Let’s get started by cloning the hello-samza project:
git clone https://git.apache.org/samza-hello-samza.git hello-samza
cd hello-samza
git checkout latest
The project comes with numerous examples; for this tutorial, we will pick the Azure application.
Setting up the Deployment Environment
For our Azure application, we require ZooKeeper. The hello-samza project comes with a script called “grid” to help with the environment setup:
./bin/grid standalone
This command will download, install, and start ZooKeeper and Kafka. It will also check out the latest version of Samza and build it. All package files will be put in a sub-directory called “deploy” inside hello-samza’s root folder.
If you get a complaint that JAVA_HOME is not set, then you’ll need to set it to the path where Java is installed on your system.
Configuring the Azure application
Here are the configs you must set before building the project. Configure these in the src/main/config/azure-application-local-runner.properties file.
# Add your EventHubs input stream credentials here
systems.eventhubs.streams.input-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
systems.eventhubs.streams.input-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
systems.eventhubs.streams.input-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
systems.eventhubs.streams.input-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
# Add your EventHubs output stream credentials here
systems.eventhubs.streams.output-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
systems.eventhubs.streams.output-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
systems.eventhubs.streams.output-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
systems.eventhubs.streams.output-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
Optionally, you may also use the Azure Checkpoint Manager; otherwise, comment out both of the following lines.
# Azure Table Checkpoint Manager
task.checkpoint.factory=org.apache.samza.checkpoint.azure.AzureCheckpointManagerFactory
azure.storage.connect=YOUR-STORAGE-ACCOUNT-CONNECTION-STRING
Building the Hello Samza Project
With the environment setup complete, let us move on to building the hello-samza project. Execute the following commands:
mvn clean package
mkdir -p deploy/samza
tar -xvf ./target/hello-samza-0.15.0-SNAPSHOT-dist.tar.gz -C deploy/samza
We are now all set to deploy the application locally.
Running the Azure application
In order to run the application, we will use the run-azure-application script.
./deploy/samza/bin/run-azure-application.sh
The above command executes the helper script, which invokes the AzureZKLocalApplication main class; this in turn starts the AzureApplication. The application filters out the messages consumed without keys, prints the rest, and sends them to the configured output stream.
The messages consumed should be printed in the following format:
Sending:
Received Key: <KEY>
Received Message: <VALUE>
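The core of that filter-and-forward logic could look roughly like the sketch below. It uses the low-level task API for brevity, whereas the actual AzureApplication is written against the high-level API, so the class name and exact structure here are illustrative assumptions; the system and stream names match the configs above.
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class AzureFilterTaskSketch implements StreamTask {
  // Matches the system and output stream configured earlier
  private static final SystemStream OUTPUT = new SystemStream("eventhubs", "output-stream");

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    Object key = envelope.getKey();
    if (key == null) {
      return; // drop messages consumed without keys
    }
    System.out.println("Sending:");
    System.out.println("Received Key: " + key);
    System.out.println("Received Message: " + envelope.getMessage());
    collector.send(new OutgoingMessageEnvelope(OUTPUT, key, envelope.getMessage()));
  }
}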
Shutdown
The application can be shut down by terminating the run-azure-application script. We can use the grid script to tear down the local environment (Kafka and ZooKeeper).
bin/grid stop all