kafka package¶
Submodules¶
kafka.client module¶
-
class
kafka.client.
KafkaClient
(hosts, client_id='kafka-python', timeout=120, correlation_id=0)¶ Bases:
object
-
CLIENT_ID
= 'kafka-python'¶
-
close
()¶
-
copy
()¶ Create an inactive copy of the client object, suitable for passing to a separate thread.
Note that the copied connections are not initialized, so reinit() must be called on the returned copy.
-
ensure_topic_exists
(topic, timeout=30)¶
-
get_partition_ids_for_topic
(topic)¶
-
has_metadata_for_topic
(topic)¶
-
load_metadata_for_topics
(*topics)¶ Fetch broker and topic-partition metadata from the server, and update internal data: broker list, topic/partition list, and topic/parition -> broker map
This method should be called after receiving any error
Parameters: *topics (optional) – If a list of topics is provided, the metadata refresh will be limited to the specified topics only. If the broker is configured to not auto-create topics, expect UnknownTopicOrPartitionError for topics that don’t exist
If the broker is configured to auto-create topics, expect LeaderNotAvailableError for new topics until partitions have been initialized.
Exceptions will not be raised in a full refresh (i.e. no topic list) In this case, error codes will be logged as errors
Partition-level errors will also not be raised here (a single partition w/o a leader, for example)
-
reinit
()¶
-
reset_all_metadata
()¶
-
reset_topic_metadata
(*topics)¶
-
send_fetch_request
(payloads=[], fail_on_error=True, callback=None, max_wait_time=100, min_bytes=4096)¶ Encode and send a FetchRequest
Payloads are grouped by topic and partition so they can be pipelined to the same brokers.
-
send_metadata_request
(payloads=[], fail_on_error=True, callback=None)¶
-
send_offset_commit_request
(group, payloads=[], fail_on_error=True, callback=None)¶
-
send_offset_fetch_request
(group, payloads=[], fail_on_error=True, callback=None)¶
-
send_offset_request
(payloads=[], fail_on_error=True, callback=None)¶
-
send_produce_request
(payloads=[], acks=1, timeout=1000, fail_on_error=True, callback=None)¶ Encode and send some ProduceRequests
ProduceRequests will be grouped by (topic, partition) and then sent to a specific broker. Output is a list of responses in the same order as the list of payloads specified
Parameters: - payloads (list of ProduceRequest) – produce requests to send to kafka ProduceRequest payloads must not contain duplicates for any topic-partition.
- acks (int, optional) – how many acks the servers should receive from replica brokers before responding to the request. If it is 0, the server will not send any response. If it is 1, the server will wait until the data is written to the local log before sending a response. If it is -1, the server will wait until the message is committed by all in-sync replicas before sending a response. For any value > 1, the server will wait for this number of acks to occur (but the server will never wait for more acknowledgements than there are in-sync replicas). defaults to 1.
- timeout (int, optional) – maximum time in milliseconds the server can await the receipt of the number of acks, defaults to 1000.
- fail_on_error (bool, optional) – raise exceptions on connection and server response errors, defaults to True.
- callback (function, optional) – instead of returning the ProduceResponse, first pass it through this function, defaults to None.
Returns: list of ProduceResponses, or callback results if supplied, in the order of input payloads
-
topics
¶
-
kafka.codec module¶
-
kafka.codec.
gzip_decode
(payload)¶
-
kafka.codec.
gzip_encode
(payload)¶
-
kafka.codec.
has_gzip
()¶
-
kafka.codec.
has_snappy
()¶
-
kafka.codec.
snappy_decode
(payload)¶
-
kafka.codec.
snappy_encode
(payload, xerial_compatible=False, xerial_blocksize=32768)¶ Encodes the given data with snappy if xerial_compatible is set then the stream is encoded in a fashion compatible with the xerial snappy library
The block size (xerial_blocksize) controls how frequent the blocking occurs 32k is the default in the xerial library.
- The format winds up being
- Header16 bytes
Block1 len Block1 data Blockn len Blockn datasnappy bytesBE int32 snappy bytes BE int32 It is important to not that the blocksize is the amount of uncompressed data presented to snappy at each block, whereas the blocklen is the number of bytes that will be present in the stream, that is the length will always be <= blocksize.
kafka.common module¶
-
exception
kafka.common.
AsyncProducerQueueFull
(failed_msgs, *args)¶ Bases:
kafka.common.KafkaError
-
class
kafka.common.
BrokerMetadata
(nodeId, host, port)¶ Bases:
tuple
-
host
¶ Alias for field number 1
-
nodeId
¶ Alias for field number 0
-
port
¶ Alias for field number 2
-
-
exception
kafka.common.
BrokerNotAvailableError
¶ Bases:
kafka.common.BrokerResponseError
-
errno
= 8¶
-
message
= 'BROKER_NOT_AVAILABLE'¶
-
-
exception
kafka.common.
BrokerResponseError
¶ Bases:
kafka.common.KafkaError
-
exception
kafka.common.
BufferUnderflowError
¶ Bases:
kafka.common.KafkaError
-
exception
kafka.common.
ChecksumError
¶ Bases:
kafka.common.KafkaError
-
exception
kafka.common.
ConnectionError
¶ Bases:
kafka.common.KafkaError
-
exception
kafka.common.
ConsumerFetchSizeTooSmall
¶ Bases:
kafka.common.KafkaError
-
exception
kafka.common.
ConsumerNoMoreData
¶ Bases:
kafka.common.KafkaError
-
exception
kafka.common.
ConsumerTimeout
¶ Bases:
kafka.common.KafkaError
-
exception
kafka.common.
FailedPayloadsError
(payload, *args)¶ Bases:
kafka.common.KafkaError
-
class
kafka.common.
FetchRequest
(topic, partition, offset, max_bytes)¶ Bases:
tuple
-
max_bytes
¶ Alias for field number 3
-
offset
¶ Alias for field number 2
-
partition
¶ Alias for field number 1
-
topic
¶ Alias for field number 0
-
-
class
kafka.common.
FetchResponse
(topic, partition, error, highwaterMark, messages)¶ Bases:
tuple
-
error
¶ Alias for field number 2
-
highwaterMark
¶ Alias for field number 3
-
messages
¶ Alias for field number 4
-
partition
¶ Alias for field number 1
-
topic
¶ Alias for field number 0
-
-
exception
kafka.common.
InvalidFetchRequestError
¶ Bases:
kafka.common.BrokerResponseError
-
errno
= 4¶
-
message
= 'INVALID_FETCH_SIZE'¶
-
-
exception
kafka.common.
InvalidMessageError
¶ Bases:
kafka.common.BrokerResponseError
-
errno
= 2¶
-
message
= 'INVALID_MESSAGE'¶
-
-
exception
kafka.common.
KafkaConfigurationError
¶ Bases:
kafka.common.KafkaError
-
exception
kafka.common.
KafkaError
¶ Bases:
exceptions.RuntimeError
-
class
kafka.common.
KafkaMessage
(topic, partition, offset, key, value)¶ Bases:
tuple
-
key
¶ Alias for field number 3
-
offset
¶ Alias for field number 2
-
partition
¶ Alias for field number 1
-
topic
¶ Alias for field number 0
-
value
¶ Alias for field number 4
-
-
exception
kafka.common.
KafkaTimeoutError
¶ Bases:
kafka.common.KafkaError
Bases:
kafka.common.KafkaError
-
exception
kafka.common.
LeaderNotAvailableError
¶ Bases:
kafka.common.BrokerResponseError
-
errno
= 5¶
-
message
= 'LEADER_NOT_AVAILABLE'¶
-
-
class
kafka.common.
Message
(magic, attributes, key, value)¶ Bases:
tuple
-
attributes
¶ Alias for field number 1
-
key
¶ Alias for field number 2
-
magic
¶ Alias for field number 0
-
value
¶ Alias for field number 3
-
-
exception
kafka.common.
MessageSizeTooLargeError
¶ Bases:
kafka.common.BrokerResponseError
-
errno
= 10¶
-
message
= 'MESSAGE_SIZE_TOO_LARGE'¶
-
-
class
kafka.common.
MetadataResponse
(brokers, topics)¶ Bases:
tuple
-
brokers
¶ Alias for field number 0
-
topics
¶ Alias for field number 1
-
-
exception
kafka.common.
NotLeaderForPartitionError
¶ Bases:
kafka.common.BrokerResponseError
-
errno
= 6¶
-
message
= 'NOT_LEADER_FOR_PARTITION'¶
-
-
class
kafka.common.
OffsetAndMessage
(offset, message)¶ Bases:
tuple
-
message
¶ Alias for field number 1
-
offset
¶ Alias for field number 0
-
-
class
kafka.common.
OffsetCommitRequest
(topic, partition, offset, metadata)¶ Bases:
tuple
-
metadata
¶ Alias for field number 3
-
offset
¶ Alias for field number 2
-
partition
¶ Alias for field number 1
-
topic
¶ Alias for field number 0
-
-
class
kafka.common.
OffsetCommitResponse
(topic, partition, error)¶ Bases:
tuple
-
error
¶ Alias for field number 2
-
partition
¶ Alias for field number 1
-
topic
¶ Alias for field number 0
-
-
class
kafka.common.
OffsetFetchRequest
(topic, partition)¶ Bases:
tuple
-
partition
¶ Alias for field number 1
-
topic
¶ Alias for field number 0
-
-
class
kafka.common.
OffsetFetchResponse
(topic, partition, offset, metadata, error)¶ Bases:
tuple
-
error
¶ Alias for field number 4
-
metadata
¶ Alias for field number 3
-
offset
¶ Alias for field number 2
-
partition
¶ Alias for field number 1
-
topic
¶ Alias for field number 0
-
-
exception
kafka.common.
OffsetMetadataTooLargeError
¶ Bases:
kafka.common.BrokerResponseError
-
errno
= 12¶
-
message
= 'OFFSET_METADATA_TOO_LARGE'¶
-
-
exception
kafka.common.
OffsetOutOfRangeError
¶ Bases:
kafka.common.BrokerResponseError
-
errno
= 1¶
-
message
= 'OFFSET_OUT_OF_RANGE'¶
-
-
class
kafka.common.
OffsetRequest
(topic, partition, time, max_offsets)¶ Bases:
tuple
-
max_offsets
¶ Alias for field number 3
-
partition
¶ Alias for field number 1
-
time
¶ Alias for field number 2
-
topic
¶ Alias for field number 0
-
-
class
kafka.common.
OffsetResponse
(topic, partition, error, offsets)¶ Bases:
tuple
-
error
¶ Alias for field number 2
-
offsets
¶ Alias for field number 3
-
partition
¶ Alias for field number 1
-
topic
¶ Alias for field number 0
-
-
class
kafka.common.
PartitionMetadata
(topic, partition, leader, replicas, isr, error)¶ Bases:
tuple
-
error
¶ Alias for field number 5
-
isr
¶ Alias for field number 4
-
leader
¶ Alias for field number 2
-
partition
¶ Alias for field number 1
-
replicas
¶ Alias for field number 3
-
topic
¶ Alias for field number 0
-
-
class
kafka.common.
ProduceRequest
(topic, partition, messages)¶ Bases:
tuple
-
messages
¶ Alias for field number 2
-
partition
¶ Alias for field number 1
-
topic
¶ Alias for field number 0
-
-
class
kafka.common.
ProduceResponse
(topic, partition, error, offset)¶ Bases:
tuple
-
error
¶ Alias for field number 2
-
offset
¶ Alias for field number 3
-
partition
¶ Alias for field number 1
-
topic
¶ Alias for field number 0
-
-
exception
kafka.common.
ProtocolError
¶ Bases:
kafka.common.KafkaError
-
exception
kafka.common.
ReplicaNotAvailableError
¶ Bases:
kafka.common.BrokerResponseError
-
errno
= 9¶
-
message
= 'REPLICA_NOT_AVAILABLE'¶
-
-
exception
kafka.common.
RequestTimedOutError
¶ Bases:
kafka.common.BrokerResponseError
-
errno
= 7¶
-
message
= 'REQUEST_TIMED_OUT'¶
-
-
class
kafka.common.
RetryOptions
(limit, backoff_ms, retry_on_timeouts)¶ Bases:
tuple
-
backoff_ms
¶ Alias for field number 1
-
limit
¶ Alias for field number 0
-
retry_on_timeouts
¶ Alias for field number 2
-
-
exception
kafka.common.
StaleControllerEpochError
¶ Bases:
kafka.common.BrokerResponseError
-
errno
= 11¶
-
message
= 'STALE_CONTROLLER_EPOCH'¶
-
-
exception
kafka.common.
StaleLeaderEpochCodeError
¶ Bases:
kafka.common.BrokerResponseError
-
errno
= 13¶
-
message
= 'STALE_LEADER_EPOCH_CODE'¶
-
-
class
kafka.common.
TopicAndPartition
(topic, partition)¶ Bases:
tuple
-
partition
¶ Alias for field number 1
-
topic
¶ Alias for field number 0
-
-
class
kafka.common.
TopicMetadata
(topic, error, partitions)¶ Bases:
tuple
-
error
¶ Alias for field number 1
-
partitions
¶ Alias for field number 2
-
topic
¶ Alias for field number 0
-
-
exception
kafka.common.
UnknownError
¶ Bases:
kafka.common.BrokerResponseError
-
errno
= -1¶
-
message
= 'UNKNOWN'¶
-
-
exception
kafka.common.
UnknownTopicOrPartitionError
¶ Bases:
kafka.common.BrokerResponseError
-
errno
= 3¶
-
message
= 'UNKNOWN_TOPIC_OR_PARTITON'¶
-
-
exception
kafka.common.
UnsupportedCodecError
¶ Bases:
kafka.common.KafkaError
-
kafka.common.
check_error
(response)¶
-
kafka.common.
x
¶ alias of
UnknownTopicOrPartitionError
kafka.conn module¶
-
class
kafka.conn.
KafkaConnection
(host, port, timeout=120)¶ Bases:
thread._local
A socket connection to a single Kafka broker
This class is _not_ thread safe. Each call to send must be followed by a call to recv in order to get the correct response. Eventually, we can do something in here to facilitate multiplexed requests/responses since the Kafka API includes a correlation id.
Parameters: - host – the host name or IP address of a kafka broker
- port – the port number the kafka broker is listening on
- timeout – default 120. The socket timeout for sending and receiving data in seconds. None means no timeout, so a request can block forever.
-
close
()¶ Shutdown and close the connection socket
-
copy
()¶ Create an inactive copy of the connection object, suitable for passing to a background thread.
The returned copy is not connected; you must call reinit() before using.
-
recv
(request_id)¶ Get a response packet from Kafka
Parameters: request_id – can be any int (only used for debug logging...) Returns: Encoded kafka packet response from server Return type: str
-
reinit
()¶ Re-initialize the socket connection close current socket (if open) and start a fresh connection raise ConnectionError on error
-
send
(request_id, payload)¶ Send a request to Kafka
- Arguments::
- request_id (int): can be any int (used only for debug logging...) payload: an encoded kafka packet (see KafkaProtocol)
-
kafka.conn.
collect_hosts
(hosts, randomize=True)¶ Collects a comma-separated set of hosts (host:port) and optionally randomize the returned list.
kafka.context module¶
Context manager to commit/rollback consumer offsets.
-
class
kafka.context.
OffsetCommitContext
(consumer)¶ Bases:
object
Provides commit/rollback semantics around a SimpleConsumer.
Usage assumes that auto_commit is disabled, that messages are consumed in batches, and that the consuming process will record its own successful processing of each message. Both the commit and rollback operations respect a “high-water mark” to ensure that last unsuccessfully processed message will be retried.
Example:
consumer = SimpleConsumer(client, group, topic, auto_commit=False) consumer.provide_partition_info() consumer.fetch_last_known_offsets() while some_condition: with OffsetCommitContext(consumer) as context: messages = consumer.get_messages(count, block=False) for partition, message in messages: if can_process(message): context.mark(partition, message.offset) else: break if not context: sleep(delay)
These semantics allow for deferred message processing (e.g. if can_process compares message time to clock time) and for repeated processing of the last unsuccessful message (until some external error is resolved).
-
commit
()¶ Commit this context’s offsets:
- If the high-water mark has moved, commit up to and position the consumer at the high-water mark.
- Otherwise, reset to the consumer to the initial offsets.
-
commit_partition_offsets
(partition_offsets)¶ Commit explicit partition/offset pairs.
-
handle_out_of_range
()¶ Handle out of range condition by seeking to the beginning of valid ranges.
This assumes that an out of range doesn’t happen by seeking past the end of valid ranges – which is far less likely.
-
mark
(partition, offset)¶ Set the high-water mark in the current context.
In order to know the current partition, it is helpful to initialize the consumer to provide partition info via:
consumer.provide_partition_info()
-
rollback
()¶ Rollback this context:
- Position the consumer at the initial offsets.
-
update_consumer_offsets
(partition_offsets)¶ Update consumer offsets to explicit positions.
-
kafka.protocol module¶
-
class
kafka.protocol.
KafkaProtocol
¶ Bases:
object
Class to encapsulate all of the protocol encoding/decoding. This class does not have any state associated with it, it is purely for organization.
-
FETCH_KEY
= 1¶
-
METADATA_KEY
= 3¶
-
OFFSET_COMMIT_KEY
= 8¶
-
OFFSET_FETCH_KEY
= 9¶
-
OFFSET_KEY
= 2¶
-
PRODUCE_KEY
= 0¶
-
classmethod
decode_fetch_response
(data)¶ Decode bytes to a FetchResponse
Parameters: data – bytes to decode
-
classmethod
decode_metadata_response
(data)¶ Decode bytes to a MetadataResponse
Parameters: data – bytes to decode
-
classmethod
decode_offset_commit_response
(data)¶ Decode bytes to an OffsetCommitResponse
Parameters: data – bytes to decode
-
classmethod
decode_offset_fetch_response
(data)¶ Decode bytes to an OffsetFetchResponse
Parameters: data – bytes to decode
-
classmethod
decode_offset_response
(data)¶ Decode bytes to an OffsetResponse
Parameters: data – bytes to decode
-
classmethod
decode_produce_response
(data)¶ Decode bytes to a ProduceResponse
Parameters: data – bytes to decode
-
classmethod
encode_fetch_request
(client_id, correlation_id, payloads=None, max_wait_time=100, min_bytes=4096)¶ Encodes some FetchRequest structs
Parameters: - client_id – string
- correlation_id – int
- payloads – list of FetchRequest
- max_wait_time – int, how long to block waiting on min_bytes of data
- min_bytes – int, the minimum number of bytes to accumulate before returning the response
-
classmethod
encode_metadata_request
(client_id, correlation_id, topics=None, payloads=None)¶ Encode a MetadataRequest
Parameters: - client_id – string
- correlation_id – int
- topics – list of strings
-
classmethod
encode_offset_commit_request
(client_id, correlation_id, group, payloads)¶ Encode some OffsetCommitRequest structs
Parameters: - client_id – string
- correlation_id – int
- group – string, the consumer group you are committing offsets for
- payloads – list of OffsetCommitRequest
-
classmethod
encode_offset_fetch_request
(client_id, correlation_id, group, payloads)¶ Encode some OffsetFetchRequest structs
Parameters: - client_id – string
- correlation_id – int
- group – string, the consumer group you are fetching offsets for
- payloads – list of OffsetFetchRequest
-
classmethod
encode_offset_request
(client_id, correlation_id, payloads=None)¶
-
classmethod
encode_produce_request
(client_id, correlation_id, payloads=None, acks=1, timeout=1000)¶ Encode some ProduceRequest structs
Parameters: - client_id – string
- correlation_id – int
- payloads – list of ProduceRequest
- acks – How “acky” you want the request to be 0: immediate response 1: written to disk by the leader 2+: waits for this many number of replicas to sync -1: waits for all replicas to be in sync
- timeout – Maximum time the server will wait for acks from replicas. This is _not_ a socket timeout
-
-
kafka.protocol.
create_gzip_message
(payloads, key=None)¶ Construct a Gzipped Message containing multiple Messages
The given payloads will be encoded, compressed, and sent as a single atomic message to Kafka.
Parameters: - payloads – list(bytes), a list of payload to send be sent to Kafka
- key – bytes, a key used for partition routing (optional)
-
kafka.protocol.
create_message
(payload, key=None)¶ Construct a Message
Parameters: - payload – bytes, the payload to send to Kafka
- key – bytes, a key used for partition routing (optional)
-
kafka.protocol.
create_message_set
(messages, codec=0, key=None)¶ Create a message set using the given codec.
If codec is CODEC_NONE, return a list of raw Kafka messages. Otherwise, return a list containing a single codec-encoded message.
-
kafka.protocol.
create_snappy_message
(payloads, key=None)¶ Construct a Snappy Message containing multiple Messages
The given payloads will be encoded, compressed, and sent as a single atomic message to Kafka.
Parameters: - payloads – list(bytes), a list of payload to send be sent to Kafka
- key – bytes, a key used for partition routing (optional)
kafka.util module¶
-
class
kafka.util.
ReentrantTimer
(t, fn, *args, **kwargs)¶ Bases:
object
A timer that can be restarted, unlike threading.Timer (although this uses threading.Timer)
Parameters: - t – timer interval in milliseconds
- fn – a callable to invoke
- args – tuple of args to be passed to function
- kwargs – keyword arguments to be passed to function
-
start
()¶
-
stop
()¶
-
kafka.util.
crc32
(data)¶
-
kafka.util.
group_by_topic_and_partition
(tuples)¶
-
kafka.util.
kafka_bytestring
(s)¶ Takes a string or bytes instance Returns bytes, encoding strings in utf-8 as necessary
-
kafka.util.
read_int_string
(data, cur)¶
-
kafka.util.
read_short_string
(data, cur)¶
-
kafka.util.
relative_unpack
(fmt, data, cur)¶
-
kafka.util.
write_int_string
(s)¶
-
kafka.util.
write_short_string
(s)¶
Module contents¶
-
class
kafka.
KafkaClient
(hosts, client_id='kafka-python', timeout=120, correlation_id=0)¶ Bases:
object
-
CLIENT_ID
= 'kafka-python'¶
-
close
()¶
-
copy
()¶ Create an inactive copy of the client object, suitable for passing to a separate thread.
Note that the copied connections are not initialized, so reinit() must be called on the returned copy.
-
ensure_topic_exists
(topic, timeout=30)¶
-
get_partition_ids_for_topic
(topic)¶
-
has_metadata_for_topic
(topic)¶
-
load_metadata_for_topics
(*topics)¶ Fetch broker and topic-partition metadata from the server, and update internal data: broker list, topic/partition list, and topic/parition -> broker map
This method should be called after receiving any error
Parameters: *topics (optional) – If a list of topics is provided, the metadata refresh will be limited to the specified topics only. If the broker is configured to not auto-create topics, expect UnknownTopicOrPartitionError for topics that don’t exist
If the broker is configured to auto-create topics, expect LeaderNotAvailableError for new topics until partitions have been initialized.
Exceptions will not be raised in a full refresh (i.e. no topic list) In this case, error codes will be logged as errors
Partition-level errors will also not be raised here (a single partition w/o a leader, for example)
-
reinit
()¶
-
reset_all_metadata
()¶
-
reset_topic_metadata
(*topics)¶
-
send_fetch_request
(payloads=[], fail_on_error=True, callback=None, max_wait_time=100, min_bytes=4096)¶ Encode and send a FetchRequest
Payloads are grouped by topic and partition so they can be pipelined to the same brokers.
-
send_metadata_request
(payloads=[], fail_on_error=True, callback=None)¶
-
send_offset_commit_request
(group, payloads=[], fail_on_error=True, callback=None)¶
-
send_offset_fetch_request
(group, payloads=[], fail_on_error=True, callback=None)¶
-
send_offset_request
(payloads=[], fail_on_error=True, callback=None)¶
-
send_produce_request
(payloads=[], acks=1, timeout=1000, fail_on_error=True, callback=None)¶ Encode and send some ProduceRequests
ProduceRequests will be grouped by (topic, partition) and then sent to a specific broker. Output is a list of responses in the same order as the list of payloads specified
Parameters: - payloads (list of ProduceRequest) – produce requests to send to kafka ProduceRequest payloads must not contain duplicates for any topic-partition.
- acks (int, optional) – how many acks the servers should receive from replica brokers before responding to the request. If it is 0, the server will not send any response. If it is 1, the server will wait until the data is written to the local log before sending a response. If it is -1, the server will wait until the message is committed by all in-sync replicas before sending a response. For any value > 1, the server will wait for this number of acks to occur (but the server will never wait for more acknowledgements than there are in-sync replicas). defaults to 1.
- timeout (int, optional) – maximum time in milliseconds the server can await the receipt of the number of acks, defaults to 1000.
- fail_on_error (bool, optional) – raise exceptions on connection and server response errors, defaults to True.
- callback (function, optional) – instead of returning the ProduceResponse, first pass it through this function, defaults to None.
Returns: list of ProduceResponses, or callback results if supplied, in the order of input payloads
-
topics
¶
-
-
class
kafka.
KafkaConnection
(host, port, timeout=120)¶ Bases:
thread._local
A socket connection to a single Kafka broker
This class is _not_ thread safe. Each call to send must be followed by a call to recv in order to get the correct response. Eventually, we can do something in here to facilitate multiplexed requests/responses since the Kafka API includes a correlation id.
Parameters: - host – the host name or IP address of a kafka broker
- port – the port number the kafka broker is listening on
- timeout – default 120. The socket timeout for sending and receiving data in seconds. None means no timeout, so a request can block forever.
-
close
()¶ Shutdown and close the connection socket
-
copy
()¶ Create an inactive copy of the connection object, suitable for passing to a background thread.
The returned copy is not connected; you must call reinit() before using.
-
recv
(request_id)¶ Get a response packet from Kafka
Parameters: request_id – can be any int (only used for debug logging...) Returns: Encoded kafka packet response from server Return type: str
-
reinit
()¶ Re-initialize the socket connection close current socket (if open) and start a fresh connection raise ConnectionError on error
-
send
(request_id, payload)¶ Send a request to Kafka
- Arguments::
- request_id (int): can be any int (used only for debug logging...) payload: an encoded kafka packet (see KafkaProtocol)
-
class
kafka.
SimpleProducer
(*args, **kwargs)¶ Bases:
kafka.producer.base.Producer
A simple, round-robin producer.
See Producer class for Base Arguments
- Additional Arguments:
- random_start (bool, optional): randomize the initial partition which
- the first message block will be published to, otherwise if false, the first message block will always publish to partition 0 before cycling through each partition, defaults to True.
-
send_messages
(topic, *msg)¶
-
class
kafka.
KeyedProducer
(*args, **kwargs)¶ Bases:
kafka.producer.base.Producer
A producer which distributes messages to partitions based on the key
See Producer class for Arguments
- Additional Arguments:
- partitioner: A partitioner class that will be used to get the partition
- to send the message to. Must be derived from Partitioner. Defaults to HashedPartitioner.
-
send
(topic, key, msg)¶
-
send_messages
(topic, key, *msg)¶
-
class
kafka.
RoundRobinPartitioner
(partitions)¶ Bases:
kafka.partitioner.base.Partitioner
Implements a round robin partitioner which sends data to partitions in a round robin fashion
-
partition
(key, partitions=None)¶
-
-
kafka.
HashedPartitioner
¶ alias of
LegacyPartitioner
-
class
kafka.
SimpleConsumer
(client, group, topic, auto_commit=True, partitions=None, auto_commit_every_n=100, auto_commit_every_t=5000, fetch_size_bytes=4096, buffer_size=4096, max_buffer_size=32768, iter_timeout=None, auto_offset_reset='largest')¶ Bases:
kafka.consumer.base.Consumer
A simple consumer implementation that consumes all/specified partitions for a topic
Parameters: - client – a connected KafkaClient
- group – a name for this consumer, used for offset storage and must be unique If you are connecting to a server that does not support offset commit/fetch (any prior to 0.8.1.1), then you must set this to None
- topic – the topic to consume
Keyword Arguments: - partitions – An optional list of partitions to consume the data from
- auto_commit – default True. Whether or not to auto commit the offsets
- auto_commit_every_n – default 100. How many messages to consume before a commit
- auto_commit_every_t – default 5000. How much time (in milliseconds) to wait before commit
- fetch_size_bytes – number of bytes to request in a FetchRequest
- buffer_size – default 4K. Initial number of bytes to tell kafka we have available. This will double as needed.
- max_buffer_size – default 16K. Max number of bytes to tell kafka we have available. None means no limit.
- iter_timeout – default None. How much time (in seconds) to wait for a message in the iterator before exiting. None means no timeout, so it will wait forever.
- auto_offset_reset – default largest. Reset partition offsets upon OffsetOutOfRangeError. Valid values are largest and smallest. Otherwise, do not reset the offsets and raise OffsetOutOfRangeError.
Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will reset one another when one is triggered. These triggers simply call the commit method on this class. A manual call to commit will also reset these triggers
-
get_message
(block=True, timeout=0.1, get_partition_info=None)¶
-
get_messages
(count=1, block=True, timeout=0.1)¶ Fetch the specified number of messages
Keyword Arguments: - count – Indicates the maximum number of messages to be fetched
- block – If True, the API will block till some messages are fetched.
- timeout – If block is True, the function will block for the specified time (in seconds) until count messages is fetched. If None, it will block forever.
-
provide_partition_info
()¶ Indicates that partition info must be returned by the consumer
-
reset_partition_offset
(partition)¶ Update offsets using auto_offset_reset policy (smallest|largest)
Parameters: partition (int) – the partition for which offsets should be updated Returns: Updated offset on success, None on failure
-
seek
(offset, whence)¶ Alter the current offset in the consumer, similar to fseek
Parameters: - offset – how much to modify the offset
- whence –
where to modify it from
- 0 is relative to the earliest available offset (head)
- 1 is relative to the current offset
- 2 is relative to the latest known offset (tail)
-
class
kafka.
MultiProcessConsumer
(client, group, topic, partitions=None, auto_commit=True, auto_commit_every_n=100, auto_commit_every_t=5000, num_procs=1, partitions_per_proc=0, **simple_consumer_options)¶ Bases:
kafka.consumer.base.Consumer
A consumer implementation that consumes partitions for a topic in parallel using multiple processes
Parameters: - client – a connected KafkaClient
- group – a name for this consumer, used for offset storage and must be unique If you are connecting to a server that does not support offset commit/fetch (any prior to 0.8.1.1), then you must set this to None
- topic – the topic to consume
Keyword Arguments: - partitions – An optional list of partitions to consume the data from
- auto_commit – default True. Whether or not to auto commit the offsets
- auto_commit_every_n – default 100. How many messages to consume before a commit
- auto_commit_every_t – default 5000. How much time (in milliseconds) to wait before commit
- num_procs – Number of processes to start for consuming messages. The available partitions will be divided among these processes
- partitions_per_proc – Number of partitions to be allocated per process (overrides num_procs)
Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will reset one another when one is triggered. These triggers simply call the commit method on this class. A manual call to commit will also reset these triggers
-
get_messages
(count=1, block=True, timeout=10)¶ Fetch the specified number of messages
Keyword Arguments: - count – Indicates the maximum number of messages to be fetched
- block – If True, the API will block till some messages are fetched.
- timeout – If block is True, the function will block for the specified time (in seconds) until count messages is fetched. If None, it will block forever.
-
stop
()¶
-
kafka.
create_message
(payload, key=None)¶ Construct a Message
Parameters: - payload – bytes, the payload to send to Kafka
- key – bytes, a key used for partition routing (optional)
-
kafka.
create_gzip_message
(payloads, key=None)¶ Construct a Gzipped Message containing multiple Messages
The given payloads will be encoded, compressed, and sent as a single atomic message to Kafka.
Parameters: - payloads – list(bytes), a list of payload to send be sent to Kafka
- key – bytes, a key used for partition routing (optional)
-
kafka.
create_snappy_message
(payloads, key=None)¶ Construct a Snappy Message containing multiple Messages
The given payloads will be encoded, compressed, and sent as a single atomic message to Kafka.
Parameters: - payloads – list(bytes), a list of payload to send be sent to Kafka
- key – bytes, a key used for partition routing (optional)
-
class
kafka.
KafkaConsumer
(*topics, **configs)¶ Bases:
object
A simpler kafka consumer
-
commit
()¶ Store consumed message offsets (marked via task_done()) to kafka cluster for this consumer_group.
Returns: True on success, or False if no offsets were found for commit Note
this functionality requires server version >=0.8.1.1 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
-
configure
(**configs)¶ Configure the consumer instance
Configuration settings can be passed to constructor, otherwise defaults will be used:
Keyword Arguments: - bootstrap_servers (list) – List of initial broker nodes the consumer should contact to bootstrap initial cluster metadata. This does not have to be the full node list. It just needs to have at least one broker that will respond to a Metadata API Request.
- client_id (str) – a unique name for this client. Defaults to ‘kafka.consumer.kafka’.
- group_id (str) – the name of the consumer group to join, Offsets are fetched / committed to this group name.
- fetch_message_max_bytes (int, optional) – Maximum bytes for each topic/partition fetch request. Defaults to 1024*1024.
- fetch_min_bytes (int, optional) – Minimum amount of data the server should return for a fetch request, otherwise wait up to fetch_wait_max_ms for more data to accumulate. Defaults to 1.
- fetch_wait_max_ms (int, optional) – Maximum time for the server to block waiting for fetch_min_bytes messages to accumulate. Defaults to 100.
- refresh_leader_backoff_ms (int, optional) – Milliseconds to backoff when refreshing metadata on errors (subject to random jitter). Defaults to 200.
- socket_timeout_ms (int, optional) – TCP socket timeout in milliseconds. Defaults to 30*1000.
- auto_offset_reset (str, optional) – A policy for resetting offsets on OffsetOutOfRange errors. ‘smallest’ will move to the oldest available message, ‘largest’ will move to the most recent. Any ofther value will raise the exception. Defaults to ‘largest’.
- deserializer_class (callable, optional) –
Any callable that takes a raw message value and returns a deserialized value. Defaults to
lambda msg: msg. - auto_commit_enable (bool, optional) – Enabling auto-commit will cause the KafkaConsumer to periodically commit offsets without an explicit call to commit(). Defaults to False.
- auto_commit_interval_ms (int, optional) – If auto_commit_enabled, the milliseconds between automatic offset commits. Defaults to 60 * 1000.
- auto_commit_interval_messages (int, optional) – If auto_commit_enabled, a number of messages consumed between automatic offset commits. Defaults to None (disabled).
- consumer_timeout_ms (int, optional) – number of millisecond to throw a timeout exception to the consumer if no message is available for consumption. Defaults to -1 (dont throw exception).
Configuration parameters are described in more detail at http://kafka.apache.org/documentation.html#highlevelconsumerapi
-
fetch_messages
()¶ Sends FetchRequests for all topic/partitions set for consumption
Returns: Generator that yields KafkaMessage structs after deserializing with the configured deserializer_class Note
Refreshes metadata on errors, and resets fetch offset on OffsetOutOfRange, per the configured auto_offset_reset policy
See also
Key KafkaConsumer configuration parameters: * fetch_message_max_bytes * fetch_max_wait_ms * fetch_min_bytes * deserializer_class * auto_offset_reset
-
get_partition_offsets
(topic, partition, request_time_ms, max_num_offsets)¶ Request available fetch offsets for a single topic/partition
Keyword Arguments: - topic (str) – topic for offset request
- partition (int) – partition for offset request
- request_time_ms (int) – Used to ask for all messages before a certain time (ms). There are two special values. Specify -1 to receive the latest offset (i.e. the offset of the next coming message) and -2 to receive the earliest available offset. Note that because offsets are pulled in descending order, asking for the earliest offset will always return you a single element.
- max_num_offsets (int) – Maximum offsets to include in the OffsetResponse
Returns: a list of offsets in the OffsetResponse submitted for the provided topic / partition. See: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
-
next
()¶ Return the next available message
Blocks indefinitely unless consumer_timeout_ms > 0
Returns: a single KafkaMessage from the message iterator Raises: ConsumerTimeout after consumer_timeout_ms and no message Note
This is also the method called internally during iteration
-
offsets
(group=None)¶ Get internal consumer offset values
Keyword Arguments: group – Either “fetch”, “commit”, “task_done”, or “highwater”. If no group specified, returns all groups. Returns: A copy of internal offsets struct
-
set_topic_partitions
(*topics)¶ Set the topic/partitions to consume Optionally specify offsets to start from
Accepts types:
- str (utf-8): topic name (will consume all available partitions)
- tuple: (topic, partition)
- dict:
- { topic: partition }
- { topic: [partition list] }
- { topic: (partition tuple,) }
Optionally, offsets can be specified directly:
- tuple: (topic, partition, offset)
- dict: { (topic, partition): offset, ... }
Example:
kafka = KafkaConsumer() # Consume topic1-all; topic2-partition2; topic3-partition0 kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0}) # Consume topic1-0 starting at offset 12, and topic2-1 at offset 45 # using tuples -- kafka.set_topic_partitions(("topic1", 0, 12), ("topic2", 1, 45)) # using dict -- kafka.set_topic_partitions({ ("topic1", 0): 12, ("topic2", 1): 45 })
-
task_done
(message)¶ Mark a fetched message as consumed.
Offsets for messages marked as “task_done” will be stored back to the kafka cluster for this consumer group on commit()
Parameters: message (KafkaMessage) – the message to mark as complete Returns: True, unless the topic-partition for this message has not been configured for the consumer. In normal operation, this should not happen. But see github issue 364.
-