kafka.consumer package
Submodules
kafka.consumer.base module
class kafka.consumer.base.Consumer(client, group, topic, partitions=None, auto_commit=True, auto_commit_every_n=100, auto_commit_every_t=5000)
Bases: object
Base class to be used by other consumers. Not to be used directly.
This base class provides logic for
- initialization and fetching metadata of partitions
- Auto-commit logic
- APIs for fetching pending message count
commit(partitions=None)
Commit stored offsets to Kafka via OffsetCommitRequest (v0).
Keyword Arguments: partitions (list) – list of partitions to commit; defaults to committing all of them.
Returns: True on success, False on failure.
fetch_last_known_offsets(partitions=None)
pending(partitions=None)
Gets the pending message count.
Keyword Arguments: partitions (list) – list of partitions to check; defaults to checking all of them.
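The pending count can be pictured as the gap between each partition's latest offset and the consumer's current offset, summed over the selected partitions. The sketch below models that with plain dictionaries; `pending_count`, `current_offsets`, and `latest_offsets` are hypothetical names, and it assumes the real method obtains the latest offsets from the broker rather than from a local dict.

```python
from typing import Dict, Iterable, Optional


def pending_count(current_offsets: Dict[int, int],
                  latest_offsets: Dict[int, int],
                  partitions: Optional[Iterable[int]] = None) -> int:
    """Sum of (latest offset - current offset) over the selected partitions.

    Hypothetical helper: latest_offsets stands in for the "next offset to be
    written" that a broker would report for each partition.
    """
    if partitions is None:
        # Documented default: check every partition.
        partitions = current_offsets.keys()
    total = 0
    for partition in partitions:
        total += latest_offsets[partition] - current_offsets[partition]
    return total


current = {0: 10, 1: 40}
latest = {0: 15, 1: 42}
print(pending_count(current, latest))       # 7
print(pending_count(current, latest, [1]))  # 2
```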
provide_partition_info()
Indicates that partition info must be returned by the consumer.
stop()
kafka.consumer.kafka module
class kafka.consumer.kafka.KafkaConsumer(*topics, **configs)
Bases: object
A simpler kafka consumer
DEFAULT_CONFIG = {
    'fetch_message_max_bytes': 1048576,
    'group_id': None,
    'consumer_timeout_ms': -1,
    'auto_commit_interval_messages': None,
    'auto_commit_interval_ms': 60000,
    'refresh_leader_backoff_ms': 200,
    'deserializer_class': lambda msg: msg,
    'rebalance_max_retries': 4,
    'auto_commit_enable': False,
    'rebalance_backoff_ms': 2000,
    'queued_max_message_chunks': 10,
    'default_fetcher_backoff_ms': 1000,
    'client_id': 'kafka.consumer.kafka',
    'fetch_wait_max_ms': 100,
    'auto_offset_reset': 'largest',
    'bootstrap_servers': [],
    'socket_timeout_ms': 30000,
    'socket_receive_buffer_bytes': 65536,
    'fetch_min_bytes': 1,
    'num_consumer_fetchers': 1,
}
close()
Close this consumer’s underlying client.
commit()
Store consumed message offsets (marked via task_done()) to the Kafka cluster for this consumer group.
Returns: True on success, or False if no offsets were found for commit.
Note: this functionality requires server version >= 0.8.1.1. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
configure(**configs)
Configure the consumer instance.
Configuration settings can be passed to the constructor; otherwise defaults will be used.
Keyword Arguments: bootstrap_servers (list) – List of initial broker nodes the consumer should contact to bootstrap initial cluster metadata. This does not have to be the full node list; it just needs at least one broker that will respond to a Metadata API request.
client_id (str) – a unique name for this client. Defaults to ‘kafka.consumer.kafka’.
group_id (str) – the name of the consumer group to join. Offsets are fetched from and committed to this group name.
fetch_message_max_bytes (int, optional) – Maximum bytes for each topic/partition fetch request. Defaults to 1024*1024.
fetch_min_bytes (int, optional) – Minimum amount of data the server should return for a fetch request; otherwise the server waits up to fetch_wait_max_ms for more data to accumulate. Defaults to 1.
fetch_wait_max_ms (int, optional) – Maximum time for the server to block waiting for fetch_min_bytes of data to accumulate. Defaults to 100.
refresh_leader_backoff_ms (int, optional) – Milliseconds to back off when refreshing metadata on errors (subject to random jitter). Defaults to 200.
socket_timeout_ms (int, optional) – TCP socket timeout in milliseconds. Defaults to 30*1000.
auto_offset_reset (str, optional) – A policy for resetting offsets on OffsetOutOfRange errors. ‘smallest’ will move to the oldest available message, ‘largest’ will move to the most recent. Any other value will raise the exception. Defaults to ‘largest’.
deserializer_class (callable, optional) – Any callable that takes a raw message value and returns a deserialized value. Defaults to lambda msg: msg.
auto_commit_enable (bool, optional) – Enabling auto-commit will cause the KafkaConsumer to periodically commit offsets without an explicit call to commit(). Defaults to False.
auto_commit_interval_ms (int, optional) – If auto_commit_enable is True, the number of milliseconds between automatic offset commits. Defaults to 60 * 1000.
auto_commit_interval_messages (int, optional) – If auto_commit_enable is True, the number of messages consumed between automatic offset commits. Defaults to None (disabled).
consumer_timeout_ms (int, optional) – number of milliseconds after which a timeout exception is raised to the consumer if no message is available for consumption. Defaults to -1 (never raise).
Configuration parameters are described in more detail at http://kafka.apache.org/documentation.html#highlevelconsumerapi
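One way to picture how constructor settings combine with DEFAULT_CONFIG is a dictionary merge in which caller-supplied keyword arguments override the defaults and unrecognized keys are rejected. The sketch below is hypothetical (`merge_config` and the trimmed `DEFAULTS` dict are illustrative, not the library's code); it also shows a JSON deserializer_class, assuming message values are UTF-8 encoded JSON.

```python
import json
from typing import Any, Dict

# Trimmed copy of a few DEFAULT_CONFIG entries, for illustration only.
DEFAULTS: Dict[str, Any] = {
    "group_id": None,
    "auto_commit_enable": False,
    "auto_commit_interval_ms": 60 * 1000,
    "consumer_timeout_ms": -1,
    "deserializer_class": lambda msg: msg,
}


def merge_config(defaults: Dict[str, Any], **configs: Any) -> Dict[str, Any]:
    """Return defaults overridden by configs; reject unknown keys (hypothetical)."""
    unknown = set(configs) - set(defaults)
    if unknown:
        raise ValueError("Unknown configuration key(s): %s" % sorted(unknown))
    merged = dict(defaults)  # copy so the defaults are never mutated
    merged.update(configs)
    return merged


config = merge_config(
    DEFAULTS,
    group_id="my_group",
    auto_commit_enable=True,
    # Assumes message values are UTF-8 encoded JSON documents.
    deserializer_class=lambda raw: json.loads(raw.decode("utf-8")),
)
print(config["deserializer_class"](b'{"id": 1}'))  # {'id': 1}
```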
fetch_messages()
Sends FetchRequests for all topic/partitions set for consumption.
Returns: Generator that yields KafkaMessage structs after deserializing with the configured deserializer_class.
Note: Refreshes metadata on errors, and resets the fetch offset on OffsetOutOfRange, per the configured auto_offset_reset policy.
See also: Key KafkaConsumer configuration parameters:
- fetch_message_max_bytes
- fetch_wait_max_ms
- fetch_min_bytes
- deserializer_class
- auto_offset_reset
get_partition_offsets(topic, partition, request_time_ms, max_num_offsets)
Request available fetch offsets for a single topic/partition.
Keyword Arguments:
- topic (str) – topic for the offset request
- partition (int) – partition for the offset request
- request_time_ms (int) – Used to ask for all messages before a certain time (ms). There are two special values: specify -1 to receive the latest offset (i.e. the offset of the next incoming message) and -2 to receive the earliest available offset. Because offsets are returned in descending order, asking for the earliest offset will always return a single element.
- max_num_offsets (int) – Maximum number of offsets to include in the OffsetResponse
Returns: a list of offsets in the OffsetResponse submitted for the provided topic/partition. See: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
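As a usage sketch, the two special request_time_ms values can be combined to find the range of offsets a partition currently retains. `partition_bounds`, `FakeConsumer`, and the constant names are hypothetical; FakeConsumer only stands in for a connected KafkaConsumer so the example runs without a broker, and it relies solely on the get_partition_offsets signature documented above.

```python
from typing import List, Tuple

# Special request_time_ms values documented for get_partition_offsets.
LATEST_OFFSET = -1    # offset of the next message to be written
EARLIEST_OFFSET = -2  # earliest offset still available


def partition_bounds(consumer, topic: str, partition: int) -> Tuple[int, int]:
    """Return (earliest, latest) offsets for one topic/partition."""
    earliest = consumer.get_partition_offsets(topic, partition, EARLIEST_OFFSET, 1)[0]
    latest = consumer.get_partition_offsets(topic, partition, LATEST_OFFSET, 1)[0]
    return earliest, latest


class FakeConsumer:
    """Offline stand-in exposing the documented method signature."""

    def get_partition_offsets(self, topic: str, partition: int,
                              request_time_ms: int,
                              max_num_offsets: int) -> List[int]:
        return [120] if request_time_ms == LATEST_OFFSET else [0]


print(partition_bounds(FakeConsumer(), "my-topic", 0))  # (0, 120)
```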
next()
Return the next available message.
Blocks indefinitely unless consumer_timeout_ms > 0.
Returns: a single KafkaMessage from the message iterator.
Raises: ConsumerTimeout after consumer_timeout_ms with no message.
Note: This is also the method called internally during iteration.
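The blocking behaviour of next() can be modelled with a queue: when consumer_timeout_ms is not positive the call waits forever; otherwise it gives up after the timeout and raises. This is a hypothetical sketch. The local ConsumerTimeout class stands in for the library's exception, and `next_message` and the queue are illustrative rather than how KafkaConsumer buffers messages internally.

```python
import queue
from typing import Any


class ConsumerTimeout(Exception):
    """Local stand-in for the timeout exception raised by next()."""


def next_message(messages: queue.Queue, consumer_timeout_ms: int = -1) -> Any:
    """Return the next message, blocking as documented for next()."""
    if consumer_timeout_ms <= 0:
        # No positive timeout: wait indefinitely for a message.
        return messages.get()
    try:
        return messages.get(timeout=consumer_timeout_ms / 1000.0)
    except queue.Empty:
        raise ConsumerTimeout(
            "no message within %d ms" % consumer_timeout_ms) from None


inbox: queue.Queue = queue.Queue()
inbox.put("hello")
print(next_message(inbox, consumer_timeout_ms=100))  # hello
try:
    next_message(inbox, consumer_timeout_ms=50)
except ConsumerTimeout as exc:
    print("timed out:", exc)  # timed out: no message within 50 ms
```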
offsets(group=None)
Get internal consumer offset values.
Keyword Arguments: group – Either “fetch”, “commit”, “task_done”, or “highwater”. If no group is specified, returns all groups.
Returns: A copy of the internal offsets struct.
set_topic_partitions(*topics)
Set the topic/partitions to consume. Optionally specify offsets to start from.
Accepts types:
- str (utf-8): topic name (will consume all available partitions)
- tuple: (topic, partition)
- dict:
  - { topic: partition }
  - { topic: [partition list] }
  - { topic: (partition tuple,) }
Optionally, offsets can be specified directly:
- tuple: (topic, partition, offset)
- dict: { (topic, partition): offset, ... }
Example:
    kafka = KafkaConsumer()

    # Consume topic1-all; topic2-partition2; topic3-partition0
    kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})

    # Consume topic1-0 starting at offset 12, and topic2-1 at offset 45
    # using tuples --
    kafka.set_topic_partitions(("topic1", 0, 12), ("topic2", 1, 45))

    # using dict --
    kafka.set_topic_partitions({ ("topic1", 0): 12, ("topic2", 1): 45 })
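To make the accepted argument forms concrete, the sketch below normalizes them into a single mapping from (topic, partition) to a starting offset, with None meaning "no explicit offset". It is a hypothetical helper (`normalize_topic_partitions`), not the library's implementation; because a bare topic name means "all available partitions", it takes an `all_partitions` mapping that stands in for cluster metadata.

```python
from typing import Dict, List, Optional, Tuple

TopicPartition = Tuple[str, int]


def normalize_topic_partitions(
    *topics, all_partitions: Dict[str, List[int]]
) -> Dict[TopicPartition, Optional[int]]:
    """Map every (topic, partition) to a starting offset (None = unspecified)."""
    result: Dict[TopicPartition, Optional[int]] = {}
    for arg in topics:
        if isinstance(arg, str):
            # Bare topic name: every partition known from metadata.
            for partition in all_partitions[arg]:
                result[(arg, partition)] = None
        elif isinstance(arg, tuple) and len(arg) == 2:
            result[(arg[0], arg[1])] = None
        elif isinstance(arg, tuple) and len(arg) == 3:
            topic, partition, offset = arg
            result[(topic, partition)] = offset
        elif isinstance(arg, dict):
            for key, value in arg.items():
                if isinstance(key, tuple):
                    result[key] = value  # { (topic, partition): offset }
                elif isinstance(value, int):
                    result[(key, value)] = None  # { topic: partition }
                else:
                    # { topic: [partitions] } or { topic: (partitions,) }
                    for partition in value:
                        result[(key, partition)] = None
        else:
            raise TypeError("Unsupported argument: %r" % (arg,))
    return result


meta = {"topic1": [0, 1]}
print(normalize_topic_partitions("topic1", ("topic2", 2), {"topic3": 0},
                                 all_partitions=meta))
# {('topic1', 0): None, ('topic1', 1): None, ('topic2', 2): None, ('topic3', 0): None}
```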
task_done(message)
Mark a fetched message as consumed.
Offsets for messages marked as “task_done” will be stored back to the Kafka cluster for this consumer group on commit().
Parameters: message (KafkaMessage) – the message to mark as complete.
Returns: True, unless the topic-partition for this message has not been configured for the consumer. In normal operation this should not happen, but see GitHub issue 364.
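The interplay between task_done() and commit() can be sketched as per-partition bookkeeping: task_done() records the offset of the last processed message, and commit() turns those records into offsets to store for the group. The sketch assumes the usual Kafka convention that a committed offset names the next message to read (hence the + 1). `OffsetTracker` and the `Message` tuple are hypothetical and do not talk to a broker.

```python
from collections import namedtuple
from typing import Dict, Tuple

# Minimal stand-in for KafkaMessage: only the fields used here.
Message = namedtuple("Message", ["topic", "partition", "offset", "value"])


class OffsetTracker:
    """Hypothetical bookkeeping for task_done() and commit()."""

    def __init__(self) -> None:
        self.task_done: Dict[Tuple[str, int], int] = {}
        self.committed: Dict[Tuple[str, int], int] = {}

    def mark_done(self, message: Message) -> None:
        # Remember the last processed offset for this topic-partition.
        self.task_done[(message.topic, message.partition)] = message.offset

    def commit(self) -> bool:
        # Assumed convention: commit the offset of the next message to read,
        # and only for partitions that moved since the last commit.
        pending = {tp: off + 1 for tp, off in self.task_done.items()
                   if self.committed.get(tp) != off + 1}
        if not pending:
            return False  # mirrors "False if no offsets were found"
        self.committed.update(pending)
        return True


tracker = OffsetTracker()
print(tracker.commit())  # False
tracker.mark_done(Message("my-topic", 0, 41, b"a"))
tracker.mark_done(Message("my-topic", 0, 42, b"b"))
print(tracker.commit(), tracker.committed)  # True {('my-topic', 0): 43}
```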
kafka.consumer.multiprocess module
class kafka.consumer.multiprocess.Events(start, pause, exit)
Bases: tuple
exit – Alias for field number 2
pause – Alias for field number 1
start – Alias for field number 0
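Events is a named tuple whose fields are, in order, start, pause and exit. A plausible way to populate it is with event objects shared between a controller and its workers; the sketch below assumes that and is illustrative rather than MultiProcessConsumer's actual code. It uses threading.Event so it runs in a single process; multiprocessing.Event offers the same set()/clear()/is_set()/wait() interface for use across processes.

```python
import threading
from collections import namedtuple

# Field order as documented: start is field 0, pause 1, exit 2.
Events = namedtuple("Events", ["start", "pause", "exit"])

events = Events(start=threading.Event(), pause=threading.Event(),
                exit=threading.Event())

# Illustrative controller actions: let workers start, then ask them to exit.
events.start.set()
events.exit.set()

print(events[0] is events.start)  # True
print(events.start.is_set(), events.pause.is_set(), events.exit.is_set())
# True False True
```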
class kafka.consumer.multiprocess.MultiProcessConsumer(client, group, topic, partitions=None, auto_commit=True, auto_commit_every_n=100, auto_commit_every_t=5000, num_procs=1, partitions_per_proc=0, **simple_consumer_options)
Bases: kafka.consumer.base.Consumer
A consumer implementation that consumes partitions for a topic in parallel using multiple processes.
Parameters:
- client – a connected KafkaClient
- group – a name for this consumer, used for offset storage; it must be unique. If you are connecting to a server that does not support offset commit/fetch (any version prior to 0.8.1.1), you must set this to None.
- topic – the topic to consume
Keyword Arguments:
- partitions – An optional list of partitions to consume data from
- auto_commit – default True. Whether or not to auto-commit the offsets
- auto_commit_every_n – default 100. How many messages to consume before a commit
- auto_commit_every_t – default 5000. How much time (in milliseconds) to wait before a commit
- num_procs – default 1. Number of processes to start for consuming messages. The available partitions will be divided among these processes.
- partitions_per_proc – default 0. Number of partitions to be allocated per process (overrides num_procs)
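The split between processes can be illustrated with a small helper: when partitions_per_proc is set, the process count becomes the ceiling of the partition count divided by it; otherwise num_procs is used, and partitions are dealt out round-robin. This is one plausible scheme written for illustration (`split_partitions` is hypothetical, and capping the process count at the partition count is an assumption of the sketch); the library's exact assignment may differ.

```python
import math
from typing import List


def split_partitions(partitions: List[int], num_procs: int = 1,
                     partitions_per_proc: int = 0) -> List[List[int]]:
    """Divide partitions among worker processes (illustrative round-robin)."""
    if partitions_per_proc:
        # partitions_per_proc overrides num_procs.
        num_procs = math.ceil(len(partitions) / partitions_per_proc)
    # Sketch assumption: never start more processes than there are partitions.
    num_procs = max(1, min(num_procs, len(partitions)))
    return [partitions[i::num_procs] for i in range(num_procs)]


print(split_partitions([0, 1, 2, 3, 4], partitions_per_proc=2))
# [[0, 3], [1, 4], [2]]
print(split_partitions([0, 1, 2, 3], num_procs=2))
# [[0, 2], [1, 3]]
```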
Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will reset one another when one is triggered. These triggers simply call the commit method on this class. A manual call to commit will also reset these triggers.
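The trigger rules above (commit after auto_commit_every_n messages or auto_commit_every_t milliseconds, whichever fires first, with any commit resetting both) can be sketched as a small counter/timer class. `AutoCommitTrigger` is hypothetical, the injected clock exists only to make the example deterministic, and the real consumers may check the timer differently (for example from a background timer rather than on each message).

```python
import time
from typing import Callable, Optional


class AutoCommitTrigger:
    """Hypothetical model of the every_n / every_t auto-commit rules."""

    def __init__(self, commit: Callable[[], None], every_n: Optional[int] = 100,
                 every_t_ms: Optional[int] = 5000,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._commit = commit
        self.every_n = every_n
        self.every_t_ms = every_t_ms
        self._clock = clock
        self.reset()

    def reset(self) -> None:
        # Both triggers restart after any commit, automatic or manual.
        self.count = 0
        self.last_commit = self._clock()

    def commit(self) -> None:
        self._commit()
        self.reset()

    def on_message(self) -> None:
        self.count += 1
        elapsed_ms = (self._clock() - self.last_commit) * 1000
        if self.every_n and self.count >= self.every_n:
            self.commit()
        elif self.every_t_ms and elapsed_ms >= self.every_t_ms:
            self.commit()


now = [0.0]  # fake clock, in seconds
commits = []
trigger = AutoCommitTrigger(lambda: commits.append(now[0]), every_n=3,
                            every_t_ms=5000, clock=lambda: now[0])
for t in (1.0, 2.0, 3.0, 9.0):
    now[0] = t
    trigger.on_message()
print(commits)  # [3.0, 9.0]
```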
get_messages(count=1, block=True, timeout=10)
Fetch the specified number of messages.
Keyword Arguments:
- count – the maximum number of messages to fetch
- block – If True, the API will block until all messages are fetched. If block is a positive integer, the API will block until that many messages are fetched.
- timeout – When blocking is requested, the function will block for the specified time (in seconds) until count messages are fetched. If None, it will block forever.
stop()
kafka.consumer.simple module
class kafka.consumer.simple.FetchContext(consumer, block, timeout)
Bases: object
Class for managing the state of a consumer during fetch.
class kafka.consumer.simple.SimpleConsumer(client, group, topic, auto_commit=True, partitions=None, auto_commit_every_n=100, auto_commit_every_t=5000, fetch_size_bytes=4096, buffer_size=4096, max_buffer_size=32768, iter_timeout=None, auto_offset_reset='largest')
Bases: kafka.consumer.base.Consumer
A simple consumer implementation that consumes all/specified partitions for a topic.
Parameters:
- client – a connected KafkaClient
- group – a name for this consumer, used for offset storage; it must be unique. If you are connecting to a server that does not support offset commit/fetch (any version prior to 0.8.1.1), you must set this to None.
- topic – the topic to consume
Keyword Arguments:
- partitions – An optional list of partitions to consume data from
- auto_commit – default True. Whether or not to auto-commit the offsets
- auto_commit_every_n – default 100. How many messages to consume before a commit
- auto_commit_every_t – default 5000. How much time (in milliseconds) to wait before a commit
- fetch_size_bytes – default 4096. Number of bytes to request in a FetchRequest
- buffer_size – default 4K (4096). Initial number of bytes to tell Kafka we have available. This will double as needed.
- max_buffer_size – default 32K (32768). Maximum number of bytes to tell Kafka we have available. None means no limit.
- iter_timeout – default None. How much time (in seconds) to wait for a message in the iterator before exiting. None means no timeout, so it will wait forever.
- auto_offset_reset – default 'largest'. Reset partition offsets upon OffsetOutOfRangeError. Valid values are 'largest' and 'smallest'. For any other value, the offsets are not reset and OffsetOutOfRangeError is raised.
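The buffer_size / max_buffer_size interplay can be sketched as a doubling rule: when a message does not fit, the requested size doubles, capped at max_buffer_size, and None removes the cap. `next_buffer_size` is a hypothetical helper; what the real consumer does once the cap is reached is not described here, so the sketch simply raises ValueError.

```python
from typing import Optional


def next_buffer_size(current: int, max_buffer_size: Optional[int]) -> int:
    """Double the fetch buffer, respecting an optional cap (illustrative)."""
    if max_buffer_size is not None and current >= max_buffer_size:
        # Sketch choice: signal that the buffer cannot grow any further.
        raise ValueError("message larger than max_buffer_size=%d" % max_buffer_size)
    new_size = current * 2
    if max_buffer_size is not None:
        new_size = min(new_size, max_buffer_size)
    return new_size


size = 4096  # documented buffer_size default
sizes = [size]
while True:
    try:
        size = next_buffer_size(size, 32768)  # documented max_buffer_size default
    except ValueError:
        break
    sizes.append(size)
print(sizes)  # [4096, 8192, 16384, 32768]
```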
Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will reset one another when one is triggered. These triggers simply call the commit method on this class. A manual call to commit will also reset these triggers.
get_message(block=True, timeout=0.1, get_partition_info=None)
get_messages(count=1, block=True, timeout=0.1)
Fetch the specified number of messages.
Keyword Arguments:
- count – the maximum number of messages to fetch
- block – If True, the API will block until all messages are fetched. If block is a positive integer, the API will block until that many messages are fetched.
- timeout – When blocking is requested, the function will block for the specified time (in seconds) until count messages are fetched. If None, it will block forever.
reset_partition_offset(partition)
Update offsets using the auto_offset_reset policy (smallest|largest).
Parameters: partition (int) – the partition for which offsets should be updated
Returns: Updated offset on success, None on failure
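The reset policy amounts to choosing between a partition's earliest and latest available offsets. In the hypothetical sketch below, `reset_offset` receives those two offsets directly (the real method asks the broker for them). The documentation above says an unrecognized policy leaves offsets unchanged and lets OffsetOutOfRangeError surface; the sketch raises ValueError as a stand-in.

```python
def reset_offset(policy: str, earliest: int, latest: int) -> int:
    """Pick the new fetch offset for a partition under auto_offset_reset."""
    if policy == "smallest":
        return earliest  # oldest message still available
    if policy == "largest":
        return latest    # next message to be written
    # Stand-in for the documented "do not reset; raise" behaviour.
    raise ValueError("auto_offset_reset must be 'smallest' or 'largest', got %r" % policy)


print(reset_offset("smallest", earliest=100, latest=250))  # 100
print(reset_offset("largest", earliest=100, latest=250))   # 250
```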
seek(offset, whence=None, partition=None)
Alter the current offset in the consumer, similar to fseek.
Parameters:
- offset – how much to modify the offset
- whence – where to modify it from; default is None
  - None is an absolute offset
  - 0 is relative to the earliest available offset (head)
  - 1 is relative to the current offset
  - 2 is relative to the latest known offset (tail)
- partition – which partition to modify; default is None. If partition is None, all partitions are modified.
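The whence rules map onto a small piece of arithmetic. The sketch below computes the new offset for a single partition from the documented whence values; `resolve_seek` and its head/tail arguments are hypothetical, and it does not model the partition=None case, where the real method applies the change across all partitions.

```python
from typing import Optional


def resolve_seek(offset: int, whence: Optional[int], current: int,
                 head: int, tail: int) -> int:
    """Return the new offset for one partition, following the whence table."""
    if whence is None:
        return offset            # absolute offset
    if whence == 0:
        return head + offset     # relative to earliest available offset
    if whence == 1:
        return current + offset  # relative to current offset
    if whence == 2:
        return tail + offset     # relative to latest known offset
    raise ValueError("whence must be None, 0, 1 or 2, got %r" % (whence,))


# A partition with head 100 and tail 500, currently at offset 320.
for whence, off in ((None, 200), (0, 10), (1, -20), (2, -5)):
    print(whence, resolve_seek(off, whence, current=320, head=100, tail=500))
# None 200
# 0 110
# 1 300
# 2 495
```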
Module contents
The kafka.consumer package re-exports the following classes at package level; their documentation is identical to the submodule entries above:
- kafka.consumer.SimpleConsumer (see kafka.consumer.simple.SimpleConsumer)
- kafka.consumer.MultiProcessConsumer (see kafka.consumer.multiprocess.MultiProcessConsumer)
- kafka.consumer.KafkaConsumer (see kafka.consumer.kafka.KafkaConsumer)