kafka.producer package

Submodules

kafka.producer.base module

class kafka.producer.base.Producer(client, req_acks=1, ack_timeout=1000, codec=None, codec_compresslevel=None, sync_fail_on_error=True, async=False, batch_send=False, batch_send_every_n=20, batch_send_every_t=20, async_retry_limit=None, async_retry_backoff_ms=100, async_retry_on_timeouts=True, async_queue_maxsize=0, async_queue_put_timeout=0, async_log_messages_on_error=True, async_stop_timeout=30)

Bases: object

Base class to be used by producers

Parameters:
  • client (KafkaClient) – instance to use for broker communications. If async=True, the background thread will use client.copy(), which is expected to return a thread-safe object.
  • codec (kafka.protocol.ALL_CODECS) – compression codec to use.
  • req_acks (int, optional) – A value indicating the acknowledgements that the server must receive before responding to the request, defaults to 1 (local ack).
  • ack_timeout (int, optional) – millisecond timeout to wait for the configured req_acks, defaults to 1000.
  • sync_fail_on_error (bool, optional) – whether sync producer should raise exceptions (True), or just return errors (False), defaults to True.
  • async (bool, optional) – send message using a background thread, defaults to False.
  • batch_send_every_n (int, optional) – If async is True, messages are sent in batches of this size, defaults to 20.
  • batch_send_every_t (int or float, optional) – If async is True, messages are sent immediately after this timeout in seconds, even if there are fewer than batch_send_every_n, defaults to 20.
  • async_retry_limit (int, optional) – number of retries for failed messages or None for unlimited, defaults to None / unlimited.
  • async_retry_backoff_ms (int, optional) – milliseconds to backoff on failed messages, defaults to 100.
  • async_retry_on_timeouts (bool, optional) – whether to retry on RequestTimeoutError, defaults to True.
  • async_queue_maxsize (int, optional) – limit to the size of the internal message queue in number of messages (not size), defaults to 0 (no limit).
  • async_queue_put_timeout (int or float, optional) – timeout in seconds for queue.put in send_messages for async producers. Applies only if async_queue_maxsize > 0 and the queue is full; defaults to 0 (fail immediately on a full queue).
  • async_log_messages_on_error (bool, optional) – if False, the async producer logs only the hash() of each message on failed produce requests; defaults to True (log full messages). Hash logging does not identify the specific message that failed, but it does let you match failures with retries.
  • async_stop_timeout (int or float, optional) – seconds to continue attempting to send queued messages after producer.stop(), defaults to 30.
Deprecated Arguments:
  • batch_send (bool, optional) – If True, messages are sent by a background thread in batches, defaults to False. Deprecated, use async instead.
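The interaction between batch_send_every_n and batch_send_every_t can be illustrated with a minimal sketch. This is not the library's implementation: collect_batch is a hypothetical helper, and the plain queue.Queue stands in for the producer's internal message queue. It only shows the rule described above, namely that a batch is closed when it reaches n messages or when t seconds have elapsed, whichever comes first.

```python
import queue
import time


def collect_batch(q, batch_send_every_n=20, batch_send_every_t=20.0):
    """Gather up to n messages, or whatever arrived before t seconds elapse.

    Hypothetical helper for illustration only; not part of kafka.producer.
    """
    batch = []
    deadline = time.monotonic() + batch_send_every_t
    while len(batch) < batch_send_every_n:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # timeout reached: hand back a partial batch
        try:
            batch.append(q.get(timeout=remaining))
        except queue.Empty:
            break  # nothing else arrived before the deadline
    return batch


# Five queued messages, collected in batches of at most three.
q = queue.Queue()
for i in range(5):
    q.put(("msg-%d" % i).encode("utf-8"))

first = collect_batch(q, batch_send_every_n=3, batch_send_every_t=0.1)
second = collect_batch(q, batch_send_every_n=3, batch_send_every_t=0.1)
```

Here the first batch fills up by size, while the second is released by the timeout with only the two remaining messages.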
ACK_AFTER_CLUSTER_COMMIT = -1
ACK_AFTER_LOCAL_WRITE = 1
ACK_NOT_REQUIRED = 0
DEFAULT_ACK_TIMEOUT = 1000
send_messages(topic, partition, *msg)

Helper method to send produce requests.

Parameters:
  • topic (str) – name of topic for produce request.
  • partition (int) – partition number for produce request.
  • *msg (bytes) – one or more message payloads.

Returns: ResponseRequest returned by server. Raises on error.

Note that each msg must be encoded to bytes by the caller. Passing a unicode message will not work; encode it before calling send_messages, for example with unicode_message.encode('utf-8').

All messages produced via this method have the message 'key' set to Null.
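The encoding requirement above can be handled with a small helper, sketched below. to_payload is a hypothetical name, not part of the library; it passes bytes through unchanged and encodes text as UTF-8 so that every payload is bytes before it reaches send_messages.

```python
def to_payload(message, encoding="utf-8"):
    """Return bytes suitable for a message payload.

    Hypothetical helper for illustration only; not part of kafka.producer.
    """
    if isinstance(message, bytes):
        return message  # already encoded, pass through unchanged
    return message.encode(encoding)


# A mix of text and pre-encoded input, normalised to bytes.
payloads = [to_payload(m) for m in (u"caf\u00e9", b"raw-bytes")]

# With a configured producer, the payloads could then be sent along the
# lines of (topic name and partition number are placeholders):
#   producer.send_messages("my-topic", 0, *payloads)
```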

stop(timeout=None)

Stop the producer (async mode). Blocks until async thread completes.

kafka.producer.keyed module

class kafka.producer.keyed.KeyedProducer(*args, **kwargs)

Bases: kafka.producer.base.Producer

A producer which distributes messages to partitions based on the key

See Producer class for Arguments

Additional Arguments:
  • partitioner – A partitioner class that will be used to get the partition to send the message to. Must be derived from Partitioner. Defaults to HashedPartitioner.
send(topic, key, msg)
send_messages(topic, key, *msg)
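The idea behind key-based partitioning can be sketched as follows. This is an assumption-laden illustration, not the HashedPartitioner implementation: pick_partition is a hypothetical function, and CRC32 is used only as a stand-in for whatever hash the library applies. The property it demonstrates is that equal keys always map to the same partition.

```python
import zlib


def pick_partition(key, partitions):
    """Map a bytes key to one of the given partitions.

    Hypothetical stand-in: CRC32 is used here for a deterministic hash and
    is not claimed to match the library's HashedPartitioner.
    """
    return partitions[zlib.crc32(key) % len(partitions)]


partitions = [0, 1, 2, 3]
first_choice = pick_partition(b"user-42", partitions)
second_choice = pick_partition(b"user-42", partitions)
```

Because the choice depends only on the key and the partition list, messages sharing a key stay together as long as the set of partitions does not change.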

kafka.producer.simple module

class kafka.producer.simple.SimpleProducer(*args, **kwargs)

Bases: kafka.producer.base.Producer

A simple, round-robin producer.

See Producer class for Base Arguments

Additional Arguments:
  • random_start (bool, optional) – randomize the initial partition to which the first message block is published; if False, the first message block is always published to partition 0 before cycling through each partition. Defaults to True.
send_messages(topic, *msg)
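Round-robin partition selection with an optional random starting point can be sketched like this. partition_cycle is a hypothetical helper written for illustration, and it assumes the partition list is known up front; the library's own bookkeeping may differ.

```python
import itertools
import random


def partition_cycle(partitions, random_start=True):
    """Return an endless round-robin iterator over the partitions.

    Hypothetical helper for illustration only; not part of kafka.producer.
    With random_start=False the cycle always begins at the lowest partition.
    """
    ordered = sorted(partitions)
    start = random.randrange(len(ordered)) if random_start else 0
    rotated = ordered[start:] + ordered[:start]
    return itertools.cycle(rotated)


# Deterministic case: start at partition 0 and wrap around.
cycle = partition_cycle([0, 1, 2], random_start=False)
order = [next(cycle) for _ in range(5)]
```

With random_start=True only the starting offset changes; the partitions are still visited in the same rotating order afterwards.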

Module contents

class kafka.producer.SimpleProducer(*args, **kwargs)

Bases: kafka.producer.base.Producer

A simple, round-robin producer.

See Producer class for Base Arguments

Additional Arguments:
  • random_start (bool, optional) – randomize the initial partition to which the first message block is published; if False, the first message block is always published to partition 0 before cycling through each partition. Defaults to True.
send_messages(topic, *msg)
class kafka.producer.KeyedProducer(*args, **kwargs)

Bases: kafka.producer.base.Producer

A producer which distributes messages to partitions based on the key

See Producer class for Arguments

Additional Arguments:
  • partitioner – A partitioner class that will be used to get the partition to send the message to. Must be derived from Partitioner. Defaults to HashedPartitioner.
send(topic, key, msg)
send_messages(topic, key, *msg)