Samza Configuration Reference

The following table lists all the standard properties that can be included in a Samza job configuration file.

Words such as system-name, stream-name, store-name, serde-name and rewriter-name are placeholders for your own variable names.

Name | Default | Description
Samza job configuration
job.factory.class Required: The job factory to use for running this job. The value is a fully-qualified Java classname, which must implement StreamJobFactory. Samza ships with three implementations:
org.apache.samza.job.local.ThreadJobFactory
Runs your job on your local machine using threads. This is intended only for development, not for production deployments.
org.apache.samza.job.local.ProcessJobFactory
Runs your job on your local machine as a subprocess. An optional command builder property can also be specified (see task.command.class for details). This is intended only for development, not for production deployments.
org.apache.samza.job.yarn.YarnJobFactory
Runs your job on a YARN grid. See below for YARN-specific configuration.
job.name Required: The name of your job. This name appears on the Samza dashboard, and it is used to distinguish this job's checkpoints from those of other jobs.
job.id 1 If you run several instances of your job at the same time, you need to give each execution a different job.id. This is important, since otherwise the jobs will overwrite each other's checkpoints, and perhaps interfere with each other in other ways.
job.coordinator.system Required: The system-name to use for creating and maintaining the Coordinator Stream.
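As a minimal sketch of the job-level properties described so far, a job configuration file might begin as follows. The job name page-view-counter and the system name my-kafka are hypothetical values chosen for illustration.

```properties
# Run the job on a YARN grid (ThreadJobFactory or ProcessJobFactory are for local development)
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
# Hypothetical job name
job.name=page-view-counter
# Give each concurrently running instance of the job its own id
job.id=1
# Hypothetical system name; it must be defined under systems.my-kafka.*
job.coordinator.system=my-kafka
```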
job.coordinator.replication.factor 3 If you are using Kafka for the coordinator stream, this is the number of Kafka nodes to which you want the coordinator topic replicated for durability.
job.coordinator.segment.bytes 26214400 If you are using a Kafka system for the coordinator stream, this is the segment size to be used for the coordinator topic's log segments. Keeping this number small is useful because it increases the frequency with which Kafka garbage-collects old messages.
job.coordinator.monitor-partition-change false If you are using Kafka for the coordinator stream, this configuration enables the Job Coordinator to detect changes in the partition count of Kafka input topics. On detection, it updates a Gauge metric named in the format system-name.stream-name.partitionCount, which indicates the difference in the partition count from the initial state. Note that this feature currently works only for Kafka-based systems.
job.coordinator.monitor-partition-change.frequency.ms 300000 The interval, in milliseconds, at which the input streams are checked for a change in partition count. Because partition increases are uncommon, this check can run infrequently.
job.config.rewriter.rewriter-name.class You can optionally define configuration rewriters, which have the opportunity to dynamically modify the job configuration before the job is started. For example, this can be useful for pulling configuration from an external configuration management system, or for determining the set of input streams dynamically at runtime. The value of this property is a fully-qualified Java classname which must implement ConfigRewriter. Samza ships with these rewriters by default:
org.apache.samza.config.RegExTopicGenerator
When consuming from Kafka, this allows you to consume all Kafka topics that match some regular expression (rather than having to list each topic explicitly). This rewriter has additional configuration.
org.apache.samza.config.EnvironmentConfigRewriter
This rewriter takes environment variables that are prefixed with SAMZA_ and adds them to the configuration, overriding previous values where they exist. The keys are lowercased and underscores are converted to dots.
job.config.rewriters If you have defined configuration rewriters, you need to list them here, in the order in which they should be applied. The value of this property is a comma-separated list of rewriter-name tokens.
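A possible configuration that registers one rewriter is sketched below. The rewriter name env is an arbitrary, hypothetical token. The comment about the resulting key assumes that the SAMZA_ prefix is stripped before the key is lowercased, which the description above does not state explicitly.

```properties
# List the rewriter-name tokens in the order they should be applied
job.config.rewriters=env
# Bind the token "env" to the rewriter class
job.config.rewriter.env.class=org.apache.samza.config.EnvironmentConfigRewriter
# Assumption: with this rewriter, an environment variable SAMZA_TASK_WINDOW_MS=5000
# would be applied as task.window.ms=5000
```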
job.systemstreampartition.grouper.factory org.apache.samza.container.grouper.stream.GroupByPartitionFactory A factory class that is used to determine how input SystemStreamPartitions are grouped together for processing in individual StreamTask instances. The factory must implement the SystemStreamPartitionGrouperFactory interface. Once this configuration is set, it can't be changed, since doing so could violate state semantics, and lead to a loss of data.
org.apache.samza.container.grouper.stream.GroupByPartitionFactory
Groups input stream partitions according to their partition number. This grouping leads to a single StreamTask processing all messages for a single partition (e.g. partition 0) across all input streams that have a partition 0. Therefore, the default is that you get one StreamTask for all input partitions with the same partition number. Using this strategy, if two input streams have a partition 0, then messages from both partitions will be routed to a single StreamTask. This partitioning strategy is useful for joining and aggregating streams.
org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory
Assigns each SystemStreamPartition to its own unique StreamTask. The GroupBySystemStreamPartitionFactory is useful in cases where you want increased parallelism (more containers), and don't care about co-locating partitions for grouping or joins, since it allows for a greater number of StreamTasks to be divided up amongst Samza containers.
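For illustration, a job that does not need co-located partitions could select the second grouper as in this sketch. As noted above, this choice cannot be changed once it has been set for a job.

```properties
# One StreamTask per SystemStreamPartition instead of one per partition number
job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory
```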
job.systemstreampartition.matcher.class If you want to enable static partition assignment, then this is a required configuration. The value of this property is a fully-qualified Java class name that implements the interface org.apache.samza.system.SystemStreamPartitionMatcher. Samza ships with two matcher classes:
org.apache.samza.system.RangeSystemStreamPartitionMatcher
This class uses a comma-separated list of ranges to determine which partitions match and are thus statically assigned to the job. For example, "2,3,1-2" statically assigns partitions 1, 2, and 3 for all the specified systems and streams (topics in the case of Kafka) to the job. For config validation, each element in the comma-separated list must conform to one of the following regexes:
  • "(\\d+)" or
  • "(\\d+-\\d+)"
The JobConfig.SSP_MATCHER_CLASS_RANGE constant holds the canonical name of this class.
org.apache.samza.system.RegexSystemStreamPartitionMatcher
This class uses a standard Java-supported regex to determine which partitions match and are thus statically assigned to the job. For example, "[1-2]" statically assigns partitions 1 and 2 for all the specified systems and streams (topics in the case of Kafka) to the job. The JobConfig.SSP_MATCHER_CLASS_REGEX constant holds the canonical name of this class.
job.systemstreampartition.matcher.config.range If job.systemstreampartition.matcher.class is specified and its value is org.apache.samza.system.RangeSystemStreamPartitionMatcher, then this property is required. Specify a comma-separated list of ranges to determine which partitions match and are thus statically assigned to the job. For example, "2,3,11-20" statically assigns partitions 2, 3, and 11 to 20 for all the specified systems and streams (topics in the case of Kafka) to the job. A single value like "19" is valid as well; it statically assigns partition 19. For config validation, each element in the comma-separated list must conform to one of the following regexes:
  • "(\\d+)" or
  • "(\\d+-\\d+)"
job.systemstreampartition.matcher.config.regex If job.systemstreampartition.matcher.class is specified and its value is org.apache.samza.system.RegexSystemStreamPartitionMatcher, then this property is required. The value should be a valid Java-supported regex. For example, "[1-2]" statically assigns partitions 1 and 2 for all the specified systems and streams (topics in the case of Kafka) to the job.
job.systemstreampartition.matcher.config.job.factory.regex This configuration can be used to specify the Java-supported regex to match the StreamJobFactory for which static partition assignment should be enabled. This configuration allows the partition assignment feature to be used with custom StreamJobFactory implementations as well.

This config defaults to the following value: "org\\.apache\\.samza\\.job\\.local(.*ProcessJobFactory|.*ThreadJobFactory)", which enables static partition assignment when job.factory.class is set to org.apache.samza.job.local.ProcessJobFactory or org.apache.samza.job.local.ThreadJobFactory.
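The matcher properties above combine as in the following sketch, which statically assigns partitions 2, 3 and 11 to 20 to a locally run job. The choice of ThreadJobFactory is an assumption made so that the default job factory regex applies.

```properties
job.factory.class=org.apache.samza.job.local.ThreadJobFactory
job.systemstreampartition.matcher.class=org.apache.samza.system.RangeSystemStreamPartitionMatcher
# Partitions 2, 3 and 11 through 20 of every input stream
job.systemstreampartition.matcher.config.range=2,3,11-20
```

With the regex matcher, the last two properties would instead name org.apache.samza.system.RegexSystemStreamPartitionMatcher and set job.systemstreampartition.matcher.config.regex, for example to [1-2].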

job.checkpoint.validation.enabled true This setting controls whether the job should fail (true) or just warn (false) if validation of the checkpoint partition number fails.
CAUTION: this configuration needs to be used with care. It should only be used as a workaround after the checkpoint has been auto-created with the wrong number of partitions by mistake.
job.security.manager.factory This is the factory class used to create the proper SecurityManager to handle security for Samza containers when running in a secure environment, such as YARN with Kerberos enabled. Samza ships with one security manager by default:
org.apache.samza.job.yarn.SamzaYarnSecurityManagerFactory
Allows Samza containers to run properly in a Kerberos-enabled YARN cluster. Each Samza container, once started, will create a SamzaContainerSecurityManager. The SamzaContainerSecurityManager runs on its own thread and updates the user's delegation tokens at the interval specified by yarn.token.renewal.interval.seconds. See Yarn Security for details.
job.container.count 1 The number of YARN containers to request for running your job. This is the main parameter for controlling the scale (allocated computing resources) of your job: to increase the parallelism of processing, you need to increase the number of containers. The minimum is one container, and the maximum number of containers is the number of task instances (usually the number of input stream partitions). Task instances are evenly distributed across the number of containers that you specify.
job.container.single.thread.mode false If set to true, Samza falls back to the legacy single-threaded event loop. The default is false, which enables multithreaded execution.
job.container.thread.pool.size If configured, the container thread pool will be used to run synchronous operations of each task in parallel. The operations include StreamTask.process(), WindowableTask.window(), and internally Task.commit(). Note that the thread pool is not applicable to AsyncStreamTask.processAsync(). The size should always be greater than zero. If not configured, all task operations will run in a single thread.
job.host-affinity.enabled false This property indicates whether host-affinity is enabled or not. Host-affinity refers to the ability of Samza to request and allocate a container on the same host every time the job is deployed. When host-affinity is enabled, Samza makes a best-effort attempt to honor the host-affinity constraint. The property cluster-manager.container.request.timeout.ms determines how long to wait before de-prioritizing the host-affinity constraint and assigning the container to any available resource. Note: This feature is tested to work with the FairScheduler in YARN when continuous-scheduling is enabled.
job.changelog.system This property specifies a default system for changelog, which will be used with the stream specified in stores.store-name.changelog config. You can override this system by specifying both the system and the stream in stores.store-name.changelog.
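The following sketch shows how the default changelog system might interact with a per-store setting. The store names, stream names and system names are hypothetical, and the system-name.stream-name form of the override is assumed from the description above.

```properties
# Default system for all changelog streams
job.changelog.system=my-kafka
# Uses the default system: only the stream is given
stores.my-store.changelog=my-store-changelog
# Overrides the default by giving both system and stream
stores.other-store.changelog=other-kafka.other-store-changelog
```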
Task configuration
task.class Required: The fully-qualified name of the Java class which processes incoming messages from input streams. The class must implement StreamTask or AsyncStreamTask, and may optionally implement InitableTask, ClosableTask and/or WindowableTask. The class will be instantiated several times, once for every input stream partition.
task.inputs Required: A comma-separated list of streams that are consumed by this job. Each stream is given in the format system-name.stream-name. For example, if you have one input system called my-kafka, and want to consume two Kafka topics called PageViewEvent and UserActivityEvent, then you would set task.inputs=my-kafka.PageViewEvent, my-kafka.UserActivityEvent.
task.window.ms -1 If task.class implements WindowableTask, it can receive a windowing callback in regular intervals. This property specifies the time between window() calls, in milliseconds. If the number is negative (the default), window() is never called. Note that Samza is single-threaded, so a window() call will never occur concurrently with the processing of a message. If a message is being processed at the time when a window() call is due, the window() call occurs after the processing of the current message has completed.
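The three task properties above might be combined as follows. The class name com.example.PageViewCounterTask is hypothetical; the input streams reuse the example from the task.inputs description.

```properties
# Hypothetical class implementing StreamTask and WindowableTask
task.class=com.example.PageViewCounterTask
task.inputs=my-kafka.PageViewEvent,my-kafka.UserActivityEvent
# Call window() every 10 seconds
task.window.ms=10000
```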
task.checkpoint.factory To enable checkpointing, you must set this property to the fully-qualified name of a Java class that implements CheckpointManagerFactory. This is not required, but recommended for most jobs. If you don't configure checkpointing, and a job or container restarts, it does not remember which messages it has already processed. Without checkpointing, consumer behavior is determined by the ...samza.offset.default setting, which by default skips any messages that were published while the container was restarting. Checkpointing allows a job to start up where it previously left off. Samza ships with two checkpoint managers by default:
org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory
Writes checkpoints to files on the local filesystem. You can configure the file path with the task.checkpoint.path property. This is a simple option if your job always runs on the same machine. On a multi-machine cluster, this would require a network filesystem mount.
org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
Writes checkpoints to a dedicated topic on a Kafka cluster. This is the recommended option if you are already using Kafka for input or output streams. Use the task.checkpoint.system property to configure which Kafka cluster to use for checkpoints.
task.commit.ms 60000 If task.checkpoint.factory is configured, this property determines how often a checkpoint is written. The value is the time between checkpoints, in milliseconds. The frequency of checkpointing affects failure recovery: if a container fails unexpectedly (e.g. due to crash or machine failure) and is restarted, it resumes processing at the last checkpoint. Any messages processed since the last checkpoint on the failed container are processed again. Checkpointing more frequently reduces the number of messages that may be processed twice, but also uses more resources.
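A minimal sketch of Kafka-based checkpointing, assuming a system named my-kafka (hypothetical) is configured elsewhere in the file:

```properties
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
# Kafka system that holds the checkpoint topic
task.checkpoint.system=my-kafka
# Write a checkpoint every 30 seconds instead of the default 60
task.commit.ms=30000
```

With this setting, a container that fails would reprocess at most the messages it handled in the 30 seconds or so before the failure.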
task.command.class org.apache.samza.job.ShellCommandBuilder The fully-qualified name of the Java class which determines the command line and environment variables for a container. It must be a subclass of CommandBuilder. This defaults to task.command.class=org.apache.samza.job.ShellCommandBuilder.
task.opts Any JVM options to include in the command line when executing Samza containers. For example, this can be used to set the JVM heap size, to tune the garbage collector, or to enable remote debugging. This cannot be used when running with ThreadJobFactory. Anything you put in task.opts gets forwarded directly to the command line as part of the JVM invocation.
Example: task.opts=-XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC
task.java.home The JAVA_HOME path for Samza containers. By setting this property, you can use a Java version that is different from your cluster's Java version. Remember to set yarn.am.java.home as well.
Example: task.java.home=/usr/java/jdk1.8.0_05
task.execute bin/run-container.sh The command that starts a Samza container. The script must be included in the job package. There is usually no need to customize this.
task.chooser.class org.apache.samza.system.chooser.RoundRobinChooserFactory This property can be optionally set to override the default message chooser, which determines the order in which messages from multiple input streams are processed. The value of this property is the fully-qualified name of a Java class that implements MessageChooserFactory.
task.drop.deserialization.errors This property defines how the system handles deserialization failures. If set to true, the system will skip the failing messages and keep running. If set to false, the system will throw exceptions and fail the container. Default is false.
task.drop.serialization.errors This property defines how the system handles serialization failures. If set to true, the system will drop the failing messages and keep running. If set to false, the system will throw exceptions and fail the container. Default is false.
task.log4j.system Specify the system name for the StreamAppender. If this property is not specified in the config, Samza throws an exception. (See Stream Log4j Appender)
Example: task.log4j.system=kafka
task.log4j.location.info.enabled false Defines whether or not to include log4j's LocationInfo data in Log4j StreamAppender messages. LocationInfo includes information such as the file, class, and line that wrote a log message. This setting is only active if the Log4j stream appender is being used. (See Stream Log4j Appender)
Example: task.log4j.location.info.enabled=true
task.poll.interval.ms Samza's container polls for more messages under two conditions. The first condition arises when there are simply no remaining buffered messages to process for any input SystemStreamPartition. The second condition arises when some input SystemStreamPartitions have empty buffers, but some do not. In the latter case, a polling interval is defined to determine how often to refresh the empty SystemStreamPartition buffers. By default, this interval is 50ms, which means that any empty SystemStreamPartition buffer will be refreshed at least every 50ms. A higher value here means that empty SystemStreamPartitions will be refreshed less often, which means more latency is introduced, but less CPU and network will be used. Decreasing this value means that empty SystemStreamPartitions are refreshed more frequently, thereby introducing less latency, but increasing CPU and network utilization.
task.ignored.exceptions This property specifies which exceptions should be ignored if thrown in a task's process or window methods. The exceptions to be ignored should be a comma-separated list of fully-qualified class names of the exceptions or * to ignore all exceptions.
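As an illustration of task.drop.deserialization.errors and task.ignored.exceptions, the following sketch skips messages that cannot be deserialized and ignores one exception type thrown from process or window. The choice of exception class is only an example.

```properties
# Skip messages that fail deserialization instead of failing the container
task.drop.deserialization.errors=true
# Comma-separated list of fully-qualified exception class names to ignore
task.ignored.exceptions=java.lang.IllegalArgumentException
```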
task.shutdown.ms 5000 This property controls how long the Samza container will wait for an orderly shutdown of task instances.
task.name.grouper.factory org.apache.samza.container.grouper.task.GroupByContainerCountFactory The fully-qualified name of the factory class that builds the TaskNameGrouper. If the property is not present, the default is task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerCountFactory.
You can specify a custom implementation of TaskNameGrouperFactory that implements your own logic for grouping tasks.
task.broadcast.inputs This property specifies the partitions that all tasks should consume. The SystemStreamPartitions you put here will be sent to all the tasks.
Format: system-name.stream-name#partitionId or system-name.stream-name#[startingPartitionId-endingPartitionId]
Example: task.broadcast.inputs=mySystem.broadcastStream#[0-2], mySystem.broadcastStream#0
task.max.concurrency 1 The maximum number of outstanding messages being processed per task at a time. It applies to both StreamTask and AsyncStreamTask. The values can be:
1
Each task processes one message at a time. The next message waits until processing of the current message completes. This ensures strict in-order processing.
>1
Multiple outstanding messages are allowed to be processed per task at a time. The completion can be out of order. This option increases the parallelism within a task, but may result in out-of-order processing.
task.callback.timeout.ms This property is for AsyncStreamTask only. It defines the maximum time allowed between a processAsync() call and the firing of its callback. When the timeout occurs, a TaskCallbackTimeoutException is thrown and the container shuts down. Default is no timeout.
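For an AsyncStreamTask, the two properties above could be set together as in this sketch. The values are arbitrary examples.

```properties
# Allow up to 4 messages in flight per task; completion may be out of order
task.max.concurrency=4
# Fail the container if a callback has not fired within 10 seconds
task.callback.timeout.ms=10000
```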
Systems (input and output streams)
systems.system-name.samza.factory Required: The fully-qualified name of a Java class which provides a system. A system can provide input streams which you can consume in your Samza job, or output streams to which you can write, or both. The requirements on a system are very flexible: it may connect to a message broker, or read and write files, or use a database, or anything else. The class must implement SystemFactory. Samza ships with the following implementations:
org.apache.samza.system.kafka.KafkaSystemFactory
Connects to a cluster of Kafka brokers, allows Kafka topics to be consumed as streams in Samza, allows messages to be published to Kafka topics, and allows Kafka to be used for checkpointing (see task.checkpoint.factory). See also configuration of a Kafka system.
org.apache.samza.system.filereader.FileReaderSystemFactory
Reads data from a file on the local filesystem (the stream name is the path of the file to read). The file is read as ASCII, and treated as a stream of messages separated by newline (\n) characters. A task can consume each line of the file as a java.lang.String object. This system does not provide output streams.
systems.system-name.samza.key.serde
systems.system-name.streams.stream-name.samza.key.serde
The serde which will be used to deserialize the key of messages on input streams, and to serialize the key of messages on output streams. This property can be defined either for an individual stream, or for all streams within a system (if both are defined, the stream-level definition takes precedence). The value of this property must be a serde-name that is registered with serializers.registry.*.class. If this property is not set, messages are passed unmodified between the input stream consumer, the task and the output stream producer.
systems.system-name.samza.msg.serde
systems.system-name.streams.stream-name.samza.msg.serde
The serde which will be used to deserialize the value of messages on input streams, and to serialize the value of messages on output streams. This property can be defined either for an individual stream, or for all streams within a system (if both are defined, the stream-level definition takes precedence). The value of this property must be a serde-name that is registered with serializers.registry.*.class. If this property is not set, messages are passed unmodified between the input stream consumer, the task and the output stream producer.
systems.system-name.samza.offset.default
systems.system-name.streams.stream-name.samza.offset.default
upcoming If a container starts up without a checkpoint, this property determines where in the input stream we should start consuming. The value must be an OffsetType, one of the following:
upcoming
Start processing messages that are published after the job starts. Any messages published while the job was not running are not processed.
oldest
Start processing at the oldest available message in the system, and reprocess the entire available message history.
This property can be defined either for an individual stream, or for all streams within a system (if both are defined, the stream-level definition takes precedence).
systems.system-name.streams.stream-name.samza.reset.offset false If set to true, when a Samza container starts up, it ignores any checkpointed offset for this particular input stream. Its behavior is thus determined by the samza.offset.default setting. Note that the reset takes effect every time a container is started, which may be every time you restart your job, or more frequently if a container fails and is restarted by the framework.
systems.system-name.streams.stream-name.samza.priority -1 If one or more streams have a priority set (any positive integer), they will be processed with higher priority than the other streams. You can set several streams to the same priority, or define multiple priority levels by assigning a higher number to the higher-priority streams. If a higher-priority stream has any messages available, they will always be processed first; messages from lower-priority streams are only processed when there are no new messages on higher-priority inputs.
systems.system-name.streams.stream-name.samza.bootstrap false If set to true, this stream will be processed as a bootstrap stream. This means that every time a Samza container starts up, this stream will be fully consumed before messages from any other stream are processed.
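The stream-level properties above could be applied as in the following sketch. The system name my-kafka and the stream names are hypothetical.

```properties
# System-wide default: start at the oldest message when there is no checkpoint
systems.my-kafka.samza.offset.default=oldest
# Stream-level definition takes precedence over the system-wide value
systems.my-kafka.streams.PageViewEvent.samza.offset.default=upcoming
# Fully consume this stream on every container start before other streams
systems.my-kafka.streams.UserProfiles.samza.bootstrap=true
# Process this stream ahead of streams with a lower priority or none
systems.my-kafka.streams.Alerts.samza.priority=10
```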
task.consumer.batch.size 1 If set to a positive integer, the task will try to consume batches with the given number of messages from each input stream, rather than consuming round-robin from all the input streams on each individual message. Setting this property can improve performance in some cases.
Serializers/Deserializers (Serdes)
serializers.registry.serde-name.class Use this property to register a serializer/deserializer, which defines a way of encoding application objects as an array of bytes (used for messages in streams, and for data in persistent storage). You can give a serde any serde-name you want, and reference that name in properties like systems.*.samza.key.serde, systems.*.samza.msg.serde, stores.*.key.serde and stores.*.msg.serde. The value of this property is the fully-qualified name of a Java class that implements SerdeFactory. Samza ships with several serdes:
org.apache.samza.serializers.ByteSerdeFactory
A no-op serde which passes through the undecoded byte array.
org.apache.samza.serializers.IntegerSerdeFactory
Encodes java.lang.Integer objects as binary (4 bytes fixed-length big-endian encoding).
org.apache.samza.serializers.StringSerdeFactory
Encodes java.lang.String objects as UTF-8.
org.apache.samza.serializers.JsonSerdeFactory
Encodes nested structures of java.util.Map, java.util.List etc. as JSON.
org.apache.samza.serializers.LongSerdeFactory
Encodes java.lang.Long as binary (8 bytes fixed-length big-endian encoding).
org.apache.samza.serializers.DoubleSerdeFactory
Encodes java.lang.Double as binary (8 bytes double-precision floating point).
org.apache.samza.serializers.MetricsSnapshotSerdeFactory
Encodes org.apache.samza.metrics.reporter.MetricsSnapshot objects (which are used for reporting metrics) as JSON.
org.apache.samza.serializers.KafkaSerdeFactory
Adapter which allows existing kafka.serializer.Encoder and kafka.serializer.Decoder implementations to be used as Samza serdes. Set serializers.registry.serde-name.encoder and serializers.registry.serde-name.decoder to the appropriate class names.
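A sketch of registering serdes and referring to them by name. The serde names json and string, the system name my-kafka and the stream name RawText are hypothetical.

```properties
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
# Default serdes for every stream in the system
systems.my-kafka.samza.key.serde=string
systems.my-kafka.samza.msg.serde=json
# Stream-level definition takes precedence for this one stream
systems.my-kafka.streams.RawText.samza.msg.serde=string
```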
Using the filesystem for checkpoints
(This section applies if you have set task.checkpoint.factory = org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory)
task.checkpoint.path Required if you are using the filesystem for checkpoints. Set this to the path on your local filesystem where checkpoint files should be stored.
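For example, filesystem checkpointing might be configured as follows. The directory is a hypothetical path.

```properties
task.checkpoint.factory=org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory
# Hypothetical local directory for checkpoint files
task.checkpoint.path=/var/lib/samza/checkpoints
```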
Using Elasticsearch for output streams
(This section applies if you have set systems.*.samza.factory = org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory)
systems.system-name.client.factory Required: The Elasticsearch client factory used for connecting to the Elasticsearch cluster. Samza ships with the following implementations:
org.apache.samza.system.elasticsearch.client.TransportClientFactory
Creates a TransportClient that connects to the cluster remotely without joining it. This requires the transport host and port properties to be set.
org.apache.samza.system.elasticsearch.client.NodeClientFactory
Creates a Node client that connects to the cluster by joining it. By default this uses zen discovery to find the cluster but other methods can be configured.
systems.system-name.index.request.factory org.apache.samza.system.elasticsearch.indexrequest.DefaultIndexRequestFactory The index request factory that converts the Samza OutgoingMessageEnvelope into the IndexRequest to be sent to Elasticsearch. The default IndexRequestFactory behaves as follows:
Stream name
The stream name is of the format {index-name}/{type-name}, which maps onto the Elasticsearch index and type.
Message id
If the message has a key this is set as the document id, otherwise Elasticsearch will generate one for each document.
Partition id
If the partition key is set then this is used as the Elasticsearch routing key.
Message
The message must be either a byte[] which is passed directly on to Elasticsearch, or a Map which is passed on to the Elasticsearch client which serialises it into a JSON String. Samza serdes are not currently supported.
systems.system-name.client.transport.host Required for TransportClientFactory: The hostname that the TransportClientFactory connects to.
systems.system-name.client.transport.port Required for TransportClientFactory: The port that the TransportClientFactory connects to.
systems.system-name.client.elasticsearch.* Any Elasticsearch client settings can be used here. They will all be passed to both the transport and node clients. Some of the common settings you will want to provide are:
systems.system-name.client.elasticsearch.cluster.name
The name of the Elasticsearch cluster the client is connecting to.
systems.system-name.client.elasticsearch.client.transport.sniff
If set to true, the transport client will discover all cluster nodes and keep its list of them up to date. This is used for load balancing and fail-over on retries.
systems.system-name.bulk.flush.max.actions 1000 The maximum number of messages to be buffered before flushing.
systems.system-name.bulk.flush.max.size.mb 5 The maximum aggregate size of messages in the buffer before flushing.
systems.system-name.bulk.flush.interval.ms never How often buffered messages should be flushed.
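Putting the Elasticsearch properties together, an output system might be sketched as below. The system name es, the host name, the cluster name and the flush values are hypothetical, and 9300 is assumed to be the transport port of the target cluster.

```properties
systems.es.samza.factory=org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory
systems.es.client.factory=org.apache.samza.system.elasticsearch.client.TransportClientFactory
# Both transport properties are required for TransportClientFactory
systems.es.client.transport.host=es1.example.com
systems.es.client.transport.port=9300
# Passed through to the Elasticsearch client
systems.es.client.elasticsearch.cluster.name=my-cluster
# Flush once 500 messages are buffered
systems.es.bulk.flush.max.actions=500
# Also flush buffered messages every 5 seconds
systems.es.bulk.flush.interval.ms=5000
```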
Using Kafka for input streams, output streams and checkpoints
(This section applies if you have set systems.*.samza.factory = org.apache.samza.system.kafka.KafkaSystemFactory)
systems.system-name.consumer.zookeeper.connect The hostname and port of one or more Zookeeper nodes where information about the Kafka cluster can be found. This is given as a comma-separated list of hostname:port pairs, such as zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181. If the cluster information is at some sub-path of the Zookeeper namespace, you need to include the path at the end of the list of hostnames, for example: zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181/clusters/my-kafka
systems.system-name.consumer.auto.offset.reset largest This setting determines what happens if a consumer attempts to read an offset that is outside of the current valid range. This could happen if the topic does not exist, or if a checkpoint is older than the maximum message history retained by the brokers. This property is not to be confused with systems.*.samza.offset.default, which determines what happens if there is no checkpoint. The following are valid values for auto.offset.reset:
smallest
Start consuming at the smallest (oldest) offset available on the broker (process as much message history as available).
largest
Start consuming at the largest (newest) offset available on the broker (skip any messages published while the job was not running).
anything else
Throw an exception and refuse to start up the job.
systems.system-name.consumer.* Any Kafka consumer configuration can be included here. For example, to change the socket timeout, you can set systems.system-name.consumer.socket.timeout.ms. (There is no need to configure group.id or client.id, as they are automatically configured by Samza. Also, there is no need to set auto.commit.enable because Samza has its own checkpointing mechanism.)
systems.system-name.producer.bootstrap.servers Note: This property was previously named "producer.metadata.broker.list", which has been deprecated with this version.
A list of network endpoints where the Kafka brokers are running. This is given as a comma-separated list of hostname:port pairs, for example kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092. It's not necessary to list every single Kafka node in the cluster: Samza uses this property in order to discover which topics and partitions are hosted on which broker. This property is needed even if you are only consuming from Kafka, and not writing to it, because Samza uses it to discover metadata about streams being consumed.
systems.system-name.
producer.*
Any Kafka producer configuration can be included here. For example, to change the request timeout, you can set systems.system-name.producer.timeout.ms. (There is no need to configure client.id as it is automatically configured by Samza.)
systems.system-name.
samza.fetch.threshold
50000 When consuming streams from Kafka, a Samza container maintains an in-memory buffer for incoming messages in order to increase throughput (the stream task can continue processing buffered messages while new messages are fetched from Kafka). This parameter determines the number of messages we aim to buffer across all stream partitions consumed by a container. For example, if a container consumes 50 partitions, it will try to buffer 1000 messages per partition by default. When the number of buffered messages falls below that threshold, Samza fetches more messages from the Kafka broker to replenish the buffer. Increasing this parameter can increase a job's processing throughput, but also increases the amount of memory used.
systems.system-name.
samza.fetch.threshold.bytes
-1 When consuming streams from Kafka, a Samza container maintains an in-memory buffer for incoming messages in order to increase throughput (the stream task can continue processing buffered messages while new messages are fetched from Kafka). This parameter determines the total size, in bytes, of the messages we aim to buffer across all stream partitions consumed by a container. It defines how many bytes to use for the buffered prefetch messages for the job as a whole; the byte threshold for a single system/stream/partition is computed from this value. Because whole messages are fetched, this limit is a soft one: the actual usage can be the byte limit plus the size of the largest message in the partition for a given stream. If the value of this property is > 0, it takes precedence over systems.system-name.samza.fetch.threshold.
For example, if fetchThresholdBytes is set to 100000 bytes, and there are 50 SystemStreamPartitions registered, then the per-partition threshold is (100000 / 2) / 50 = 1000 bytes. As this is a soft limit, the actual usage can be 1000 bytes plus the size of the largest message. As soon as the number of bytes buffered for a SystemStreamPartition drops below 1000, a fetch request is executed to get more data for it. Increasing this parameter decreases the latency between when a queue is drained of messages and when new messages are enqueued, but also increases memory usage, since more messages are held in memory. The default value is -1, which means this property is not used.
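The two threshold examples above (50000 messages over 50 partitions, and 100000 bytes over 50 partitions) can be reproduced with a short Python sketch. The function name is hypothetical, and the use of integer division is an assumption; the formulas themselves are the ones quoted in the descriptions.

```python
def per_partition_threshold(num_partitions, fetch_threshold=50000, fetch_threshold_bytes=-1):
    """Return (unit, per-partition threshold) following the documented rules.

    Hypothetical helper; integer division is an assumption of this sketch.
    """
    if fetch_threshold_bytes > 0:
        # A positive byte threshold takes precedence over the message-count threshold.
        # The division by 2 follows the formula given in the description.
        return "bytes", (fetch_threshold_bytes // 2) // num_partitions
    return "messages", fetch_threshold // num_partitions


print(per_partition_threshold(50))                                # ('messages', 1000)
print(per_partition_threshold(50, fetch_threshold_bytes=100000))  # ('bytes', 1000)
```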
task.checkpoint.system This property is required if you are using Kafka for checkpoints (task.checkpoint.factory = org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory). You must set it to the system-name of a Kafka system. The stream name (topic name) within that system is automatically determined from the job name and ID: __samza_checkpoint_${job.name}_${job.id} (with underscores in the job name and ID replaced by hyphens).
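The naming rule for the checkpoint topic can be sketched in Python as follows. The function name and the job name `page_view_counter` are hypothetical; the sketch only applies the template and the underscore-to-hyphen replacement stated above.

```python
def checkpoint_topic(job_name, job_id="1"):
    """Derive the checkpoint topic name from job.name and job.id (illustrative only)."""
    # Underscores in the job name and ID are replaced by hyphens.
    name = str(job_name).replace("_", "-")
    jid = str(job_id).replace("_", "-")
    return f"__samza_checkpoint_{name}_{jid}"


print(checkpoint_topic("page_view_counter", "1"))  # __samza_checkpoint_page-view-counter_1
```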
task.checkpoint.
replication.factor
3 If you are using Kafka for checkpoints, this is the number of Kafka nodes to which you want the checkpoint topic replicated for durability.
task.checkpoint.
segment.bytes
26214400 If you are using Kafka for checkpoints, this is the segment size to be used for the checkpoint topic's log segments. Keeping this number small is useful because it increases the frequency that Kafka will garbage collect old checkpoints.
stores.store-name.changelog.
replication.factor
2 The property defines the number of replicas to use for the change log stream.
stores.store-name.changelog.
kafka.topic-level-property
This property allows you to specify topic-level settings for the changelog topic to be created. For example, you can specify the cleanup policy as "stores.mystore.changelog.cleanup.policy=delete". Please refer to http://kafka.apache.org/documentation.html#configuration for more topic-level configurations.
Consuming all Kafka topics matching a regular expression
(This section applies if you have set job.config.rewriter.*.class = org.apache.samza.config.RegExTopicGenerator)
job.config.rewriter.
rewriter-name.system
Set this property to the system-name of the Kafka system from which you want to consume all matching topics.
job.config.rewriter.
rewriter-name.regex
A regular expression specifying which topics you want to consume within the Kafka system job.config.rewriter.*.system. Any topics matched by this regular expression will be consumed in addition to any topics you specify with task.inputs.
job.config.rewriter.
rewriter-name.config.*
Any properties specified within this namespace are applied to the configuration of streams that match the regex in job.config.rewriter.*.regex. For example, you can set job.config.rewriter.*.config.samza.msg.serde to configure the deserializer for messages in the matching streams, which is equivalent to setting systems.*.streams.*.samza.msg.serde for each topic that matches the regex.
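As a rough illustration of how the regex rewriter extends the list of inputs, the Python sketch below adds every matching topic to the streams already named in task.inputs. The function and topic names are hypothetical. Two assumptions are made: the regular expression must match the whole topic name, and Python's `re` syntax stands in for Java's, which differs in some details.

```python
import re


def resolve_inputs(system, regex, available_topics, task_inputs):
    """Return task inputs plus all topics in `system` matching `regex` (illustrative only)."""
    pattern = re.compile(regex)
    # Assumption: the pattern has to match the entire topic name.
    matched = [f"{system}.{topic}" for topic in available_topics if pattern.fullmatch(topic)]
    result = list(task_inputs)
    for stream in matched:
        if stream not in result:  # avoid listing a stream twice
            result.append(stream)
    return result


print(resolve_inputs(
    "kafka",
    r"page-view-.*",
    ["page-view-us", "page-view-eu", "clicks"],
    ["kafka.clicks"],
))
```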
Storage and State Management
stores.store-name.factory This property defines a store, Samza's mechanism for efficient stateful stream processing. You can give a store any store-name, and use that name to get a reference to the store in your stream task (call TaskContext.getStore() in your task's init() method). The value of this property is the fully-qualified name of a Java class that implements StorageEngineFactory. Samza currently ships with one storage engine implementation:
org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
An on-disk storage engine with a key-value interface, implemented using RocksDB. It supports fast random-access reads and writes, as well as range queries on keys. RocksDB can be configured with various additional tuning parameters.
stores.store-name.key.serde If the storage engine expects keys in the store to be simple byte arrays, this serde allows the stream task to access the store using another object type as key. The value of this property must be a serde-name that is registered with serializers.registry.*.class. If this property is not set, keys are passed unmodified to the storage engine (and the changelog stream, if appropriate).
stores.store-name.msg.serde If the storage engine expects values in the store to be simple byte arrays, this serde allows the stream task to access the store using another object type as value. The value of this property must be a serde-name that is registered with serializers.registry.*.class. If this property is not set, values are passed unmodified to the storage engine (and the changelog stream, if appropriate).
stores.store-name.changelog Samza stores are local to a container. If the container fails, the contents of the store are lost. To prevent loss of data, you need to set this property to configure a changelog stream: Samza then ensures that writes to the store are replicated to this stream, and the store is restored from this stream after a failure. The value of this property is given in the form system-name.stream-name. The "system-name" part is optional. If it is omitted you must specify the system in job.changelog.system config. Any output stream can be used as changelog, but you must ensure that only one job ever writes to a given changelog stream (each instance of a job and each store needs its own changelog stream).
Using RocksDB for key-value storage
(This section applies if you have set stores.*.factory = org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory)
stores.store-name.
write.batch.size
500 For better write performance, the storage engine buffers writes and applies them to the underlying store in a batch. If the same key is written multiple times in quick succession, this buffer also deduplicates writes to the same key. This property is set to the number of key/value pairs that should be kept in this in-memory buffer, per task instance. The number cannot be greater than stores.*.object.cache.size.
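The buffering and deduplication behaviour described above can be pictured with a toy Python class. This is a minimal sketch of the idea, not Samza's implementation: the class name, the dictionary standing in for the underlying store, and the flush-when-full policy are all assumptions.

```python
class BatchingStore:
    """Toy write buffer that deduplicates by key and flushes in batches."""

    def __init__(self, write_batch_size):
        self.write_batch_size = write_batch_size
        self.buffer = {}   # pending writes, latest value per key
        self.store = {}    # stands in for the underlying store
        self.flushes = 0

    def put(self, key, value):
        # Writing the same key again overwrites the pending entry (deduplication).
        self.buffer[key] = value
        if len(self.buffer) >= self.write_batch_size:
            self.flush()

    def get(self, key):
        # Pending writes must be visible to reads.
        if key in self.buffer:
            return self.buffer[key]
        return self.store.get(key)

    def flush(self):
        if self.buffer:
            self.store.update(self.buffer)  # apply the whole batch at once
            self.buffer.clear()
            self.flushes += 1


store = BatchingStore(write_batch_size=3)
store.put("a", 1)
store.put("a", 2)   # same key: still one buffered entry
store.put("b", 1)
store.put("c", 1)   # third distinct key triggers the batch write
print(store.flushes, store.store)
```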
stores.store-name.
object.cache.size
1000 Samza maintains an additional cache in front of RocksDB for frequently-accessed objects. This cache contains deserialized objects (avoiding the deserialization overhead on cache hits), in contrast to the RocksDB block cache (stores.*.container.cache.size.bytes), which caches serialized objects. This property determines the number of objects to keep in Samza's cache, per task instance. This same cache is also used for write buffering (see stores.*.write.batch.size). A value of 0 disables all caching and batching.
stores.store-name.container.
cache.size.bytes
104857600 The size of RocksDB's block cache in bytes, per container. If there are several task instances within one container, each is given a proportional share of this cache. Note that this is an off-heap memory allocation, so the container's total memory use is the maximum JVM heap size plus the size of this cache.
stores.store-name.container.
write.buffer.size.bytes
33554432 The amount of memory (in bytes) that RocksDB uses for buffering writes before they are written to disk, per container. If there are several task instances within one container, each is given a proportional share of this buffer. This setting also determines the size of RocksDB's segment files.
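Because both the block cache and the write buffer are sized per container, the share available to each task instance shrinks as more tasks are packed into a container. The Python sketch below assumes that "proportional share" means an equal split between task instances; the function name and the task count of 4 are hypothetical.

```python
def per_task_share(container_bytes, num_tasks):
    """Equal split of a per-container allocation between task instances (assumption)."""
    return container_bytes // num_tasks


# Default values from this section, for a hypothetical container with 4 task instances.
print(per_task_share(104857600, 4))  # block cache share: 26214400 bytes
print(per_task_share(33554432, 4))   # write buffer share: 8388608 bytes
```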
stores.store-name.
rocksdb.compression
snappy This property controls whether RocksDB should compress data on disk and in the block cache. The following values are valid:
snappy
Compress data using the Snappy codec.
bzip2
Compress data using the bzip2 codec.
zlib
Compress data using the zlib codec.
lz4
Compress data using the lz4 codec.
lz4hc
Compress data using the lz4hc (high compression) codec.
none
Do not compress data.
stores.store-name.
rocksdb.block.size.bytes
4096 If compression is enabled, RocksDB groups approximately this many uncompressed bytes into one compressed block. You probably don't need to change this property.
stores.store-name.
rocksdb.ttl.ms
The time-to-live of the store. Note that this is not a strict TTL limit: expired entries are removed only after compaction. Use caution when opening the same database both with and without TTL, as this might corrupt the database. Please make sure to read the constraints before using this property.
stores.store-name.
rocksdb.compaction.style
universal This property controls the compaction style that RocksDB will employ when compacting its levels. The following values are valid:
universal
Use universal compaction.
fifo
Use FIFO compaction.
level
Use RocksDB's standard leveled compaction.
stores.store-name.
rocksdb.num.write.buffers
3 Configures the number of write buffers that a RocksDB store uses. This allows RocksDB to continue taking writes to other buffers even while a given write buffer is being flushed to disk.
stores.store-name.
rocksdb.max.log.file.size.bytes
67108864 The maximum size in bytes of the RocksDB LOG file before it is rotated.
stores.store-name.
rocksdb.keep.log.file.num
2 The number of RocksDB LOG files (including rotated LOG.old.* files) to keep.
Running Samza with a cluster manager
cluster-manager.container.memory.mb 1024 How much memory, in megabytes, to request from the cluster manager per container of your job. Along with cluster-manager.container.cpu.cores, this property determines how many containers the cluster manager will run on one machine. If the container exceeds this limit, it will be killed, so it is important that the container's actual memory use remains below the limit. The amount of memory used is normally the JVM heap size (configured with task.opts), plus the size of any off-heap memory allocation (for example stores.*.container.cache.size.bytes), plus a safety margin to allow for JVM overheads.
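The memory budget described above (heap, plus off-heap allocations, plus a safety margin) can be checked with a small Python sketch. The function name is hypothetical, and the heap size of 768 MB and margin of 128 MB are assumed values chosen only for illustration; pick figures that match your own task.opts and workload.

```python
MB = 1024 * 1024


def fits_in_container(container_memory_mb, heap_mb, off_heap_bytes, safety_margin_mb):
    """Return (needed_mb, fits) for one container (illustrative only)."""
    needed_mb = heap_mb + sum(off_heap_bytes) / MB + safety_margin_mb
    return needed_mb, needed_mb <= container_memory_mb


# One store with the default 104857600-byte block cache.
print(fits_in_container(1024, 768, [104857600], 128))
# Two such stores no longer fit into the default 1024 MB request.
print(fits_in_container(1024, 768, [104857600, 104857600], 128))
```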
cluster-manager.container.cpu.cores 1 The number of CPU cores to request per container of your job. Each node in the cluster has a certain number of CPU cores available, so this number (along with cluster-manager.container.memory.mb) determines how many containers can be run on one machine.
cluster-manager.container.
retry.count
8 If a container fails, it is automatically restarted by Samza. However, if a container keeps failing shortly after startup, that indicates a deeper problem, so we should kill the job rather than retrying indefinitely. This property determines the maximum number of times we are willing to restart a failed container in quick succession (the time period is configured with cluster-manager.container.retry.window.ms). Each container in the job is counted separately. If this property is set to 0, any failed container immediately causes the whole job to fail. If it is set to a negative number, there is no limit on the number of retries.
cluster-manager.container.
retry.window.ms
300000 This property determines how frequently a container is allowed to fail before we give up and fail the job. If the same container has failed more than cluster-manager.container.retry.count times, and the time between failures was less than this property cluster-manager.container.retry.window.ms (in milliseconds), then we fail the job. There is no limit to the number of times we will restart a container if the time between failures is greater than cluster-manager.container.retry.window.ms.
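The interaction between retry.count and retry.window.ms can be sketched in Python as below. This is a simplified reading of the two descriptions, not Samza's actual bookkeeping: the function name is hypothetical, and resetting the counter whenever two failures are at least one window apart is an assumption of the sketch.

```python
def job_should_fail(failure_times_ms, retry_count=8, retry_window_ms=300000):
    """Decide whether the failures of ONE container should fail the whole job."""
    if retry_count < 0:
        return False                       # negative: no limit on retries
    if retry_count == 0:
        return len(failure_times_ms) > 0   # zero: the first failure fails the job
    quick_failures = 0
    previous = None
    for t in failure_times_ms:
        if previous is not None and t - previous >= retry_window_ms:
            quick_failures = 0             # failures far apart: start counting afresh
        quick_failures += 1
        previous = t
        if quick_failures > retry_count:
            return True                    # more than retry_count failures in quick succession
    return False


print(job_should_fail([0, 1000, 2000], retry_count=2, retry_window_ms=5000))
print(job_should_fail([0, 10000, 20000], retry_count=2, retry_window_ms=5000))
```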
cluster-manager.jobcoordinator.jmx.enabled true Determines whether a JMX server should be started on the job's JobCoordinator (true or false).
cluster-manager.allocator.sleep.ms 3600 The container allocator thread is responsible for matching requests to allocated containers. This property configures the sleep interval of this thread, in milliseconds.
cluster-manager.container.request.timeout.ms 5000 The allocator thread periodically checks the state of the container requests and allocated containers to determine the assignment of a container to an allocated resource. This property determines the number of milliseconds before a container request is considered to have expired / timed-out. When a request expires, it gets allocated to any available container that was returned by the cluster manager.
Running your job on a YARN cluster
(This section applies if you have set job.factory.class = org.apache.samza.job.yarn.YarnJobFactory)
yarn.package.path Required for YARN jobs: The URL from which the job package can be downloaded, for example a http:// or hdfs:// URL. The job package is a .tar.gz file with a specific directory structure.
yarn.container.memory.mb 1024 This is deprecated in favor of cluster-manager.container.memory.mb
yarn.container.cpu.cores 1 This is deprecated in favor of cluster-manager.container.cpu.cores
yarn.container.
retry.count
8 This is deprecated in favor of cluster-manager.container.retry.count
yarn.container.
retry.window.ms
300000 This is deprecated in favor of cluster-manager.container.retry.window.ms
yarn.am.container.
memory.mb
1024 Each Samza job, when running in YARN, has one special container, the ApplicationMaster (AM), which manages the execution of the job. This property determines how much memory, in megabytes, to request from YARN for running the ApplicationMaster.
yarn.am.opts Any JVM options to include in the command line when executing the Samza ApplicationMaster. For example, this can be used to set the JVM heap size, to tune the garbage collector, or to enable remote debugging.
yarn.am.java.home The JAVA_HOME path for the Samza AM. By setting this property, you can use a Java version that is different from your cluster's Java version. Remember to set task.java.home as well.
Example: yarn.am.java.home=/usr/java/jdk1.8.0_05
yarn.am.poll.interval.ms 1000 The Samza ApplicationMaster sends regular heartbeats to the YARN ResourceManager to confirm that it is alive. This property determines the time (in milliseconds) between heartbeats.
yarn.am.jmx.enabled true This is deprecated in favor of cluster-manager.jobcoordinator.jmx.enabled
yarn.allocator.sleep.ms 3600 This is deprecated in favor of cluster-manager.allocator.sleep.ms
yarn.samza.host-affinity.enabled false This is deprecated in favor of job.host-affinity.enabled
yarn.container.request.timeout.ms 5000 This is deprecated in favor of cluster-manager.container.request.timeout.ms
yarn.queue Determines which YARN queue will be used for the Samza job.
yarn.kerberos.principal The principal that the Samza job uses to authenticate itself to the KDC when running on a Kerberos-enabled YARN cluster.
yarn.kerberos.keytab The full path to the file containing the keytab for the principal specified by yarn.kerberos.principal. The keytab file is uploaded to the staging directory unique to each application on HDFS, and the Application Master then uses the keytab and principal to periodically log in and recreate the delegation tokens.
yarn.token.renewal.interval.seconds The time interval at which the Application Master re-authenticates and renews the delegation tokens. This value should be smaller than the length of time a delegation token is valid on the Hadoop NameNodes before expiration.
Metrics
metrics.reporter.
reporter-name.class
Samza automatically tracks various metrics which are useful for monitoring the health of a job, and you can also track your own metrics. With this property, you can define any number of metrics reporters which send the metrics to a system of your choice (for graphing, alerting etc). You give each reporter an arbitrary reporter-name. To enable the reporter, you need to reference the reporter-name in metrics.reporters. The value of this property is the fully-qualified name of a Java class that implements MetricsReporterFactory. Samza ships with these implementations by default:
org.apache.samza.metrics.reporter.JmxReporterFactory
With this reporter, every container exposes its own metrics as JMX MBeans. The JMX server is started on a random port to avoid collisions between containers running on the same machine.
org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
This reporter sends the latest values of all metrics as messages to an output stream once per minute. The output stream is configured with metrics.reporter.*.stream and it can use any system supported by Samza.
metrics.reporters If you have defined any metrics reporters with metrics.reporter.*.class, you need to list them here in order to enable them. The value of this property is a comma-separated list of reporter-name tokens.
metrics.reporter.
reporter-name.stream
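If you have registered the metrics reporter metrics.reporter.*.class = org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory, you need to set this property to configure the output stream to which the metrics data should be sent. The stream is given in the form system-name.stream-name, and the system must be defined in the job configuration. It's fine for many different jobs to publish their metrics to the same metrics stream. Samza defines a simple JSON encoding for metrics; in order to use this encoding, you also need to configure a serde for the metrics stream:
systems.*.streams.*.samza.msg.serde = metrics-serde (replacing the asterisks with the system-name and stream-name of the metrics stream)
serializers.registry.metrics-serde.class = org.apache.samza.serializers.MetricsSnapshotSerdeFactory (registering the serde under a serde-name of your choice)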
metrics.reporter.
reporter-name.interval
If you have registered the metrics reporter metrics.reporter.*.class = org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory, you can use this property to configure how frequently the reporter reports the metrics registered with it. The value is the length of the interval between consecutive reports, in seconds, and must be a positive integer. This property is optional and defaults to 60, which means metrics are reported every 60 seconds.
Writing to HDFS
systems.*.producer.hdfs.writer.class org.apache.samza.system.hdfs.writer.BinarySequenceFileHdfsWriter Fully-qualified class name of the HdfsWriter implementation this HDFS Producer system should use.
systems.*.producer.hdfs.compression.type none A human-readable label for the compression type to use, such as "gzip", "snappy", etc. This label will be interpreted differently (or ignored) depending on the nature of the HdfsWriter implementation.
systems.*.producer.hdfs.base.output.dir /user/USERNAME/SYSTEMNAME The base output directory for HDFS writes. Defaults to the home directory of the user who ran the job, followed by the systemName for this HdfsSystemProducer as defined in the job.properties file.
systems.*.producer.hdfs.bucketer.class org.apache.samza.system.hdfs.writer.JobNameDateTimeBucketer Fully-qualified class name of the Bucketer implementation that will manage HDFS paths and file names. Used to batch writes by time, or other similar partitioning methods.
systems.*.producer.hdfs.bucketer.date.path.format A date format (using Java's SimpleDateFormat syntax) appropriate for use in an HDFS Path, which can configure time-based bucketing of output files.
systems.*.producer.hdfs.write.batch.size.bytes 268435456 The number of bytes of outgoing messages to write to each HDFS output file before cutting a new file. Defaults to 256MB if not set.
systems.*.producer.hdfs.write.batch.size.records 262144 The number of outgoing messages to write to each HDFS output file before cutting a new file. Defaults to 262144 if not set.
Reading from HDFS
systems.*.consumer.bufferCapacity 10 Capacity of the HDFS consumer buffer, the blocking queue used for storing messages. A larger buffer capacity typically leads to better throughput but consumes more memory.
systems.*.consumer.numMaxRetries 10 The number of retry attempts when there is a failure to fetch messages from HDFS, before the container fails.
systems.*.partitioner.defaultPartitioner.whitelist .* Whitelist used by the directory partitioner to select files in an HDFS directory, in Java Pattern style.
systems.*.partitioner.defaultPartitioner.blacklist Blacklist used by the directory partitioner to filter out unwanted files in an HDFS directory, in Java Pattern style.
systems.*.partitioner.defaultPartitioner.groupPattern Group pattern used by the directory partitioner for advanced partitioning. Advanced partitioning goes beyond the basic assumption that each file is a partition: it lets you group files into partitions arbitrarily. For example, if you have a set of files [part-01-a.avro, part-01-b.avro, part-02-a.avro, part-02-b.avro, part-03-a.avro], and you want to organize the partitions as (part-01-a.avro, part-01-b.avro), (part-02-a.avro, part-02-b.avro), (part-03-a.avro), where the numbers in the middle act as a "group identifier", you can set this property to "part-[id]-.*" (note that "[id]" is a reserved term here, i.e. you have to write it literally as "[id]"). The partitioner applies this pattern to all file names, extracts the "group identifier" ("[id]" in the pattern), and then uses the "group identifier" to group files into partitions. See the HdfsSystemConsumer design doc for more details.
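The grouping in the example above can be reproduced with the following Python sketch. It is not the directory partitioner's implementation: the function name is hypothetical, the "[id]" placeholder is assumed to capture a run of digits (as in the example file names), and files that do not match the pattern are simply left out.

```python
import re
from collections import defaultdict


def group_files(file_names, group_pattern):
    """Group file names into partitions by the "[id]" part of the pattern (illustrative only)."""
    # Assumption: the reserved term "[id]" stands for a run of digits.
    regex = re.compile(group_pattern.replace("[id]", r"(\d+)"))
    groups = defaultdict(list)
    for name in file_names:
        match = regex.fullmatch(name)
        if match:
            groups[match.group(1)].append(name)
    # One partition per group identifier, ordered by identifier.
    return [groups[group_id] for group_id in sorted(groups)]


files = ["part-01-a.avro", "part-01-b.avro", "part-02-a.avro", "part-02-b.avro", "part-03-a.avro"]
for partition in group_files(files, "part-[id]-.*"):
    print(partition)
```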
systems.*.consumer.reader avro Type of the file reader for different event formats (avro, plain, json, etc.). "avro" is the only type supported for now.
systems.*.stagingDirectory Staging directory for storing the partition description. By default (if not set by users) the value is inherited from "yarn.job.staging.directory" internally. The default value is typically good enough unless you explicitly want to use a separate location.
Migrating from Samza 0.9.1 to 0.10.0
(This section applies if you are upgrading from Samza 0.9.1 to 0.10.0 and have set task.checkpoint.factory to anything other than org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory)
task.checkpoint.skip-migration false When migrating from 0.9.1 to 0.10.0, the taskName-to-changelog partition mapping was moved from the checkpoint stream to the coordinator stream.
If you are using a checkpoint manager other than Kafka (org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory), you have to manually migrate the taskName-to-changelog partition mapping to the coordinator stream.
This can be achieved with the assistance of checkpoint-tool.sh.