From 1c8ce2ccdaca0d4dbf145c718d1517ac229d0c9a Mon Sep 17 00:00:00 2001 From: Albin Skott Date: Sun, 14 Sep 2025 18:33:20 +0200 Subject: [PATCH 1/7] chore(python): update pyo3-stub-gen to latest version on crates.io --- foreign/python/apache_iggy.pyi | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/foreign/python/apache_iggy.pyi b/foreign/python/apache_iggy.pyi index a5141681bf..ef2f26bf9c 100644 --- a/foreign/python/apache_iggy.pyi +++ b/foreign/python/apache_iggy.pyi @@ -477,10 +477,10 @@ class SendMessage: """ def __new__(cls, data: builtins.str | bytes) -> SendMessage: r""" - Constructs a new `SendMessage` instance from a string. + Constructs a new `SendMessage` instance from a string or bytes. This method allows for the creation of a `SendMessage` instance - directly from Python using the provided string data. + directly from Python using the provided string or bytes data. """ class StreamDetails: From a217aef6346cf7fc0622fe6509ca606112ba664a Mon Sep 17 00:00:00 2001 From: Albin Skott Date: Sun, 14 Sep 2025 20:50:01 +0200 Subject: [PATCH 2/7] feat(python): add AsyncIterator interface to IggyConsumer This adds a `.iter()` method for asynchronously iterating over Iggy messages from a consumer. Adding this method requires the consumer group to be initialized so `consumer_group` is made async and initialization is made upfront. `partition_id` is also exposed on `ReceiveMessage` to allow manual commit of offsets. --- foreign/python/Cargo.toml | 7 ++++ foreign/python/apache_iggy.pyi | 10 ++++- foreign/python/src/client.rs | 28 +++++++++---- foreign/python/src/consumer.rs | 21 ++++++---- foreign/python/src/iterator.rs | 59 +++++++++++++++++++++++++++ foreign/python/src/lib.rs | 3 ++ foreign/python/src/receive_message.rs | 15 +++---- foreign/python/tests/test_iggy_sdk.py | 48 ++++++++++++++++++++-- 8 files changed, 160 insertions(+), 31 deletions(-) create mode 100644 foreign/python/src/iterator.rs diff --git a/foreign/python/Cargo.toml b/foreign/python/Cargo.toml index 5d1b1b4d77..5ed988b80e 100644 --- a/foreign/python/Cargo.toml +++ b/foreign/python/Cargo.toml @@ -30,9 +30,16 @@ repository = "https://github.com/apache/iggy" [dependencies] bytes = "1.10.1" +<<<<<<< HEAD iggy = { path = "../../core/sdk", version = "0.8.1-edge.1" } pyo3 = "0.26.0" pyo3-async-runtimes = { version = "0.26.0", features = [ +======= +futures = "0.3.31" +iggy = { path = "../../core/sdk", version = "0.7.0" } +pyo3 = "0.25.0" +pyo3-async-runtimes = { version = "0.25.0", features = [ +>>>>>>> 2ce2ba0a (feat(python): add AsyncIterator interface to IggyConsumer) "attributes", "tokio-runtime", ] } diff --git a/foreign/python/apache_iggy.pyi b/foreign/python/apache_iggy.pyi index ef2f26bf9c..4138f36853 100644 --- a/foreign/python/apache_iggy.pyi +++ b/foreign/python/apache_iggy.pyi @@ -324,7 +324,7 @@ class IggyClient: init_retries: typing.Optional[builtins.int] = None, init_retry_interval: typing.Optional[datetime.timedelta] = None, allow_replay: builtins.bool = False, - ) -> IggyConsumer: + ) -> collections.abc.Awaitable[IggyConsumer]: r""" Creates a new consumer group consumer. @@ -385,6 +385,10 @@ class IggyConsumer: Returns `Ok(())` if the server responds successfully, or a `PyRuntimeError` if the operation fails. """ + def iter(self) -> collections.abc.AsyncIterator[ReceiveMessage]: + r""" + Asynchronously iterate over `ReceiveMessage`s. + """ def consume_messages( self, callback: collections.abc.Callable[ @@ -467,6 +471,10 @@ class ReceiveMessage: The length represents the length of the payload. """ + def partition_id(self) -> builtins.int: + r""" + Retrieves the partition this message belongs to. + """ class SendMessage: r""" diff --git a/foreign/python/src/client.rs b/foreign/python/src/client.rs index f30000d9fe..b0db32fc3f 100644 --- a/foreign/python/src/client.rs +++ b/foreign/python/src/client.rs @@ -16,9 +16,6 @@ * under the License. */ -use std::str::FromStr; -use std::sync::Arc; - use iggy::prelude::{ Consumer as RustConsumer, IggyClient as RustIggyClient, IggyMessage as RustMessage, PollingStrategy as RustPollingStrategy, *, @@ -28,6 +25,8 @@ use pyo3::types::{PyDelta, PyList, PyType}; use pyo3_async_runtimes::tokio::future_into_py; use pyo3_stub_gen::define_stub_info_gatherer; use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; +use std::str::FromStr; +use std::sync::Arc; use crate::consumer::{py_delta_to_iggy_duration, AutoCommit, IggyConsumer}; use crate::identifier::PyIdentifier; @@ -314,7 +313,10 @@ impl IggyClient { let messages = polled_messages .messages .into_iter() - .map(ReceiveMessage::from_rust_message) + .map(|m| ReceiveMessage { + inner: m, + partition_id, + }) .collect::>(); Ok(messages) }) @@ -340,8 +342,10 @@ impl IggyClient { init_retry_interval=None, allow_replay=false, ))] - fn consumer_group( + #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[IggyConsumer]", imports=("collections.abc")))] + fn consumer_group<'a>( &self, + py: Python<'a>, name: &str, stream: &str, topic: &str, @@ -356,7 +360,7 @@ impl IggyClient { init_retries: Option, init_retry_interval: Option>, allow_replay: bool, - ) -> PyResult { + ) -> PyResult> { let mut builder = self .inner .consumer_group(name, stream, topic) @@ -412,10 +416,16 @@ impl IggyClient { if allow_replay { builder = builder.allow_replay() } - let consumer = builder.build(); + let mut consumer = builder.build(); - Ok(IggyConsumer { - inner: Arc::new(Mutex::new(consumer)), + future_into_py(py, async move { + consumer + .init() + .await + .map_err(|e| PyErr::new::(format!("{e:?}")))?; + Ok(IggyConsumer { + inner: Arc::new(Mutex::new(consumer)), + }) }) } } diff --git a/foreign/python/src/consumer.rs b/foreign/python/src/consumer.rs index 8a830aeb21..795b41bd59 100644 --- a/foreign/python/src/consumer.rs +++ b/foreign/python/src/consumer.rs @@ -36,6 +36,7 @@ use tokio::sync::Mutex; use tokio::task::JoinHandle; use crate::identifier::PyIdentifier; +use crate::iterator::ReceiveMessageIterator; use crate::receive_message::ReceiveMessage; /// A Python class representing the Iggy consumer. @@ -129,6 +130,13 @@ impl IggyConsumer { }) } + /// Asynchronously iterate over `ReceiveMessage`s. + #[gen_stub(override_return_type(type_repr="collections.abc.AsyncIterator[ReceiveMessage]", imports=("collections.abc")))] + fn iter<'a>(&self) -> ReceiveMessageIterator { + let inner = self.inner.clone(); + ReceiveMessageIterator { inner } + } + /// Consumes messages continuously using a callback function and an optional `asyncio.Event` for signaling shutdown. /// /// Returns an awaitable that completes when shutdown is signaled or a PyRuntimeError on failure. @@ -148,14 +156,6 @@ impl IggyConsumer { future_into_py(py, async { let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); - let inner_init = inner.clone(); - let mut inner_init = inner_init.lock().await; - inner_init - .init() - .await - .map_err(|e| PyErr::new::(format!("{e:?}")))?; - drop(inner_init); - let task_locals = Python::with_gil(pyo3_async_runtimes::tokio::get_current_locals)?; let handle_consume = get_runtime().spawn(scope(task_locals, async move { let task_locals = @@ -219,7 +219,10 @@ impl MessageConsumer for PyCallbackConsumer { let callback = self.callback.clone(); let task_locals = self.task_locals.clone().lock_owned().await; let task_locals = Python::with_gil(|py| task_locals.clone_ref(py)); - let message = ReceiveMessage::from_rust_message(received.message); + let message = ReceiveMessage { + inner: received.message, + partition_id: received.partition_id, + }; get_runtime() .spawn(scope(task_locals, async move { Python::with_gil(|py| { diff --git a/foreign/python/src/iterator.rs b/foreign/python/src/iterator.rs new file mode 100644 index 0000000000..3131d28c41 --- /dev/null +++ b/foreign/python/src/iterator.rs @@ -0,0 +1,59 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use std::sync::Arc; + +use futures::StreamExt; +use iggy::prelude::IggyConsumer as RustIggyConsumer; +use pyo3::exceptions::PyStopIteration; + +use crate::receive_message::ReceiveMessage; +use pyo3::prelude::*; +use pyo3_async_runtimes::tokio::future_into_py; +use tokio::sync::Mutex; + +#[pyclass] +pub struct ReceiveMessageIterator { + pub(crate) inner: Arc>, +} + +#[pymethods] +impl ReceiveMessageIterator { + pub fn __anext__<'a>(&self, py: Python<'a>) -> PyResult> { + let inner = self.inner.clone(); + future_into_py(py, async move { + let mut inner = inner.lock().await; + if let Some(message) = inner.next().await { + Ok(message + .map(|m| ReceiveMessage { + inner: m.message, + partition_id: m.partition_id, + }) + .map_err(|e| { + PyErr::new::(format!("{e:?}")) + })?) + } else { + Err(PyStopIteration::new_err("No more messages")) + } + }) + } + + pub fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { + slf + } +} diff --git a/foreign/python/src/lib.rs b/foreign/python/src/lib.rs index efc12cab54..63afe785a0 100644 --- a/foreign/python/src/lib.rs +++ b/foreign/python/src/lib.rs @@ -19,6 +19,7 @@ pub mod client; mod consumer; mod identifier; +mod iterator; mod receive_message; mod send_message; mod stream; @@ -26,6 +27,7 @@ mod topic; use client::IggyClient; use consumer::{AutoCommit, AutoCommitAfter, AutoCommitWhen, IggyConsumer}; +use iterator::ReceiveMessageIterator; use pyo3::prelude::*; use receive_message::{PollingStrategy, ReceiveMessage}; use send_message::SendMessage; @@ -45,5 +47,6 @@ fn apache_iggy(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/foreign/python/src/receive_message.rs b/foreign/python/src/receive_message.rs index cd3be2978d..078e590451 100644 --- a/foreign/python/src/receive_message.rs +++ b/foreign/python/src/receive_message.rs @@ -28,15 +28,7 @@ use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pyclass_complex_enum, gen #[gen_stub_pyclass] pub struct ReceiveMessage { pub(crate) inner: RustReceiveMessage, -} - -impl ReceiveMessage { - /// Converts a Rust message into its corresponding Python representation. - /// - /// This is an internal utility function, not exposed to Python. - pub(crate) fn from_rust_message(message: RustReceiveMessage) -> Self { - Self { inner: message } - } + pub(crate) partition_id: u32, } #[gen_stub_pymethods] @@ -83,6 +75,11 @@ impl ReceiveMessage { pub fn length(&self) -> u32 { self.inner.header.payload_length } + + /// Retrieves the partition this message belongs to. + pub fn partition_id(&self) -> u32 { + self.partition_id + } } #[derive(Clone, Copy)] diff --git a/foreign/python/tests/test_iggy_sdk.py b/foreign/python/tests/test_iggy_sdk.py index 142e9ce620..5ac72fa7ee 100644 --- a/foreign/python/tests/test_iggy_sdk.py +++ b/foreign/python/tests/test_iggy_sdk.py @@ -445,7 +445,7 @@ async def test_meta(self, iggy_client: IggyClient, consumer_group_setup): await iggy_client.create_topic( stream=stream_name, name=topic_name, partitions_count=1 ) - consumer = iggy_client.consumer_group( + consumer = await iggy_client.consumer_group( consumer_name, stream_name, topic_name, @@ -482,7 +482,7 @@ async def test_consume_messages( stream=stream_name, name=topic_name, partitions_count=1 ) - consumer = iggy_client.consumer_group( + consumer = await iggy_client.consumer_group( consumer_name, stream_name, topic_name, @@ -510,6 +510,48 @@ async def send() -> None: assert received_messages == test_messages + @pytest.mark.asyncio + async def test_iter_messages(self, iggy_client: IggyClient, consumer_group_setup): + """Test that the consumer group can consume messages.""" + consumer_name = consumer_group_setup["consumer"] + stream_name = consumer_group_setup["stream"] + topic_name = consumer_group_setup["topic"] + partition_id = consumer_group_setup["partition_id"] + test_messages = consumer_group_setup["messages"] + + # Setup + received_messages = [] + shutdown_event = asyncio.Event() + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + consumer = await iggy_client.consumer_group( + consumer_name, + stream_name, + topic_name, + partition_id, + PollingStrategy.Next(), + 10, + auto_commit=AutoCommit.Interval(timedelta(seconds=5)), + poll_interval=timedelta(seconds=1), + ) + + await iggy_client.send_messages( + stream_name, + topic_name, + partition_id, + [Message(m) for m in test_messages], + ) + + async for message in consumer.iter(): + received_messages.append(message.payload().decode()) + if len(received_messages) == 5: + break + + assert received_messages == test_messages + @pytest.mark.asyncio async def test_shutdown(self, iggy_client: IggyClient, consumer_group_setup): """Test that the consumer group can be signaled to shutdown.""" @@ -525,7 +567,7 @@ async def test_shutdown(self, iggy_client: IggyClient, consumer_group_setup): stream=stream_name, name=topic_name, partitions_count=1 ) - consumer = iggy_client.consumer_group( + consumer = await iggy_client.consumer_group( consumer_name, stream_name, topic_name, From df3e0809b3d1950723c2ea7973bbe2a47db60819 Mon Sep 17 00:00:00 2001 From: Rimuksh Kansal Date: Sat, 10 Jan 2026 20:17:26 +0900 Subject: [PATCH 3/7] change python method name; add documentation --- foreign/python/apache_iggy.pyi | 257 ++++++++++++--------------------- foreign/python/pyproject.toml | 3 + foreign/python/src/consumer.rs | 10 +- 3 files changed, 102 insertions(+), 168 deletions(-) diff --git a/foreign/python/apache_iggy.pyi b/foreign/python/apache_iggy.pyi index 4138f36853..3d0901acf7 100644 --- a/foreign/python/apache_iggy.pyi +++ b/foreign/python/apache_iggy.pyi @@ -32,86 +32,70 @@ class AutoCommit: r""" The auto-commit is disabled and the offset must be stored manually by the consumer. """ - __match_args__ = ((),) def __new__(cls) -> AutoCommit.Disabled: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key: builtins.int) -> typing.Any: ... - + def __getitem__(self, key:builtins.int) -> typing.Any: ... + class Interval(AutoCommit): r""" The auto-commit is enabled and the offset is stored on the server after a certain interval. """ - __match_args__ = ("_0",) @property def _0(self) -> datetime.timedelta: ... - def __new__(cls, _0: datetime.timedelta) -> AutoCommit.Interval: ... + def __new__(cls, _0:datetime.timedelta) -> AutoCommit.Interval: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key: builtins.int) -> typing.Any: ... - + def __getitem__(self, key:builtins.int) -> typing.Any: ... + class IntervalOrWhen(AutoCommit): r""" The auto-commit is enabled and the offset is stored on the server after a certain interval or depending on the mode when consuming the messages. """ - - __match_args__ = ( - "_0", - "_1", - ) + __match_args__ = ("_0", "_1",) @property def _0(self) -> datetime.timedelta: ... @property def _1(self) -> AutoCommitWhen: ... - def __new__( - cls, _0: datetime.timedelta, _1: AutoCommitWhen - ) -> AutoCommit.IntervalOrWhen: ... + def __new__(cls, _0:datetime.timedelta, _1:AutoCommitWhen) -> AutoCommit.IntervalOrWhen: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key: builtins.int) -> typing.Any: ... - + def __getitem__(self, key:builtins.int) -> typing.Any: ... + class IntervalOrAfter(AutoCommit): r""" The auto-commit is enabled and the offset is stored on the server after a certain interval or depending on the mode after consuming the messages. """ - - __match_args__ = ( - "_0", - "_1", - ) + __match_args__ = ("_0", "_1",) @property def _0(self) -> datetime.timedelta: ... @property def _1(self) -> AutoCommitAfter: ... - def __new__( - cls, _0: datetime.timedelta, _1: AutoCommitAfter - ) -> AutoCommit.IntervalOrAfter: ... + def __new__(cls, _0:datetime.timedelta, _1:AutoCommitAfter) -> AutoCommit.IntervalOrAfter: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key: builtins.int) -> typing.Any: ... - + def __getitem__(self, key:builtins.int) -> typing.Any: ... + class When(AutoCommit): r""" The auto-commit is enabled and the offset is stored on the server depending on the mode when consuming the messages. """ - __match_args__ = ("_0",) @property def _0(self) -> AutoCommitWhen: ... - def __new__(cls, _0: AutoCommitWhen) -> AutoCommit.When: ... + def __new__(cls, _0:AutoCommitWhen) -> AutoCommit.When: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key: builtins.int) -> typing.Any: ... - + def __getitem__(self, key:builtins.int) -> typing.Any: ... + class After(AutoCommit): r""" The auto-commit is enabled and the offset is stored on the server depending on the mode after consuming the messages. """ - __match_args__ = ("_0",) @property def _0(self) -> AutoCommitAfter: ... - def __new__(cls, _0: AutoCommitAfter) -> AutoCommit.After: ... + def __new__(cls, _0:AutoCommitAfter) -> AutoCommit.After: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key: builtins.int) -> typing.Any: ... - + def __getitem__(self, key:builtins.int) -> typing.Any: ... + ... class AutoCommitAfter: @@ -122,36 +106,31 @@ class AutoCommitAfter: r""" The offset is stored on the server after all the messages are consumed. """ - __match_args__ = ((),) def __new__(cls) -> AutoCommitAfter.ConsumingAllMessages: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key: builtins.int) -> typing.Any: ... - + def __getitem__(self, key:builtins.int) -> typing.Any: ... + class ConsumingEachMessage(AutoCommitAfter): r""" The offset is stored on the server after consuming each message. """ - __match_args__ = ((),) def __new__(cls) -> AutoCommitAfter.ConsumingEachMessage: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key: builtins.int) -> typing.Any: ... - + def __getitem__(self, key:builtins.int) -> typing.Any: ... + class ConsumingEveryNthMessage(AutoCommitAfter): r""" The offset is stored on the server after consuming every Nth message. """ - __match_args__ = ("_0",) @property def _0(self) -> builtins.int: ... - def __new__( - cls, _0: builtins.int - ) -> AutoCommitAfter.ConsumingEveryNthMessage: ... + def __new__(cls, _0:builtins.int) -> AutoCommitAfter.ConsumingEveryNthMessage: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key: builtins.int) -> typing.Any: ... - + def __getitem__(self, key:builtins.int) -> typing.Any: ... + ... class AutoCommitWhen: @@ -162,46 +141,40 @@ class AutoCommitWhen: r""" The offset is stored on the server when the messages are received. """ - __match_args__ = ((),) def __new__(cls) -> AutoCommitWhen.PollingMessages: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key: builtins.int) -> typing.Any: ... - + def __getitem__(self, key:builtins.int) -> typing.Any: ... + class ConsumingAllMessages(AutoCommitWhen): r""" The offset is stored on the server when all the messages are consumed. """ - __match_args__ = ((),) def __new__(cls) -> AutoCommitWhen.ConsumingAllMessages: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key: builtins.int) -> typing.Any: ... - + def __getitem__(self, key:builtins.int) -> typing.Any: ... + class ConsumingEachMessage(AutoCommitWhen): r""" The offset is stored on the server when consuming each message. """ - __match_args__ = ((),) def __new__(cls) -> AutoCommitWhen.ConsumingEachMessage: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key: builtins.int) -> typing.Any: ... - + def __getitem__(self, key:builtins.int) -> typing.Any: ... + class ConsumingEveryNthMessage(AutoCommitWhen): r""" The offset is stored on the server when consuming every Nth message. """ - __match_args__ = ("_0",) @property def _0(self) -> builtins.int: ... - def __new__( - cls, _0: builtins.int - ) -> AutoCommitWhen.ConsumingEveryNthMessage: ... + def __new__(cls, _0:builtins.int) -> AutoCommitWhen.ConsumingEveryNthMessage: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key: builtins.int) -> typing.Any: ... - + def __getitem__(self, key:builtins.int) -> typing.Any: ... + ... class IggyClient: @@ -210,39 +183,37 @@ class IggyClient: It wraps the RustIggyClient and provides asynchronous functionality through the contained runtime. """ - def __new__(cls, conn: typing.Optional[builtins.str] = None) -> IggyClient: + def __new__(cls, conn:typing.Optional[builtins.str]=None) -> IggyClient: r""" Constructs a new IggyClient from a TCP server address. - + This initializes a new runtime for asynchronous operations. Future versions might utilize asyncio for more Pythonic async. """ @classmethod - def from_connection_string(cls, connection_string: builtins.str) -> IggyClient: + def from_connection_string(cls, connection_string:builtins.str) -> IggyClient: r""" Constructs a new IggyClient from a connection string. - + Returns an error if the connection string provided is invalid. """ def ping(self) -> collections.abc.Awaitable[None]: r""" Sends a ping request to the server to check connectivity. - + Returns `Ok(())` if the server responds successfully, or a `PyRuntimeError` if the connection fails. """ - def login_user( - self, username: builtins.str, password: builtins.str - ) -> collections.abc.Awaitable[None]: + def login_user(self, username:builtins.str, password:builtins.str) -> collections.abc.Awaitable[None]: r""" Logs in the user with the given credentials. - + Returns `Ok(())` on success, or a PyRuntimeError on failure. """ def connect(self) -> collections.abc.Awaitable[None]: r""" Connects the IggyClient to its service. - + Returns Ok(()) on successful connection or a PyRuntimeError on failure. """ def create_stream(self, name: builtins.str) -> collections.abc.Awaitable[None]: @@ -251,83 +222,40 @@ class IggyClient: Returns Ok(()) on successful stream creation or a PyRuntimeError on failure. """ - def get_stream( - self, stream_id: builtins.str | builtins.int - ) -> collections.abc.Awaitable[typing.Optional[StreamDetails]]: + def get_stream(self, stream_id:builtins.str | builtins.int) -> collections.abc.Awaitable[typing.Optional[StreamDetails]]: r""" Gets stream by id. - + Returns Option of stream details or a PyRuntimeError on failure. """ - def create_topic( - self, - stream: builtins.str | builtins.int, - name: builtins.str, - partitions_count: builtins.int, - compression_algorithm: typing.Optional[builtins.str] = None, - replication_factor: typing.Optional[builtins.int] = None, - ) -> collections.abc.Awaitable[None]: + def create_topic(self, stream:builtins.str | builtins.int, name:builtins.str, partitions_count:builtins.int, compression_algorithm:typing.Optional[builtins.str]=None, topic_id:typing.Optional[builtins.int]=None, replication_factor:typing.Optional[builtins.int]=None) -> collections.abc.Awaitable[None]: r""" Creates a new topic with the given parameters. - + Returns Ok(()) on successful topic creation or a PyRuntimeError on failure. """ - def get_topic( - self, - stream_id: builtins.str | builtins.int, - topic_id: builtins.str | builtins.int, - ) -> collections.abc.Awaitable[typing.Optional[TopicDetails]]: + def get_topic(self, stream_id:builtins.str | builtins.int, topic_id:builtins.str | builtins.int) -> collections.abc.Awaitable[typing.Optional[TopicDetails]]: r""" Gets topic by stream and id. - + Returns Option of topic details or a PyRuntimeError on failure. """ - def send_messages( - self, - stream: builtins.str | builtins.int, - topic: builtins.str | builtins.int, - partitioning: builtins.int, - messages: list[SendMessage], - ) -> collections.abc.Awaitable[None]: + def send_messages(self, stream:builtins.str | builtins.int, topic:builtins.str | builtins.int, partitioning:builtins.int, messages:list[SendMessage]) -> collections.abc.Awaitable[None]: r""" Sends a list of messages to the specified topic. - + Returns Ok(()) on successful sending or a PyRuntimeError on failure. """ - def poll_messages( - self, - stream: builtins.str | builtins.int, - topic: builtins.str | builtins.int, - partition_id: builtins.int, - polling_strategy: PollingStrategy, - count: builtins.int, - auto_commit: builtins.bool, - ) -> collections.abc.Awaitable[list[ReceiveMessage]]: + def poll_messages(self, stream:builtins.str | builtins.int, topic:builtins.str | builtins.int, partition_id:builtins.int, polling_strategy:PollingStrategy, count:builtins.int, auto_commit:builtins.bool) -> collections.abc.Awaitable[list[ReceiveMessage]]: r""" Polls for messages from the specified topic and partition. - + Returns a list of received messages or a PyRuntimeError on failure. """ - def consumer_group( - self, - name: builtins.str, - stream: builtins.str, - topic: builtins.str, - partition_id: typing.Optional[builtins.int] = None, - polling_strategy: typing.Optional[PollingStrategy] = None, - batch_length: typing.Optional[builtins.int] = None, - auto_commit: typing.Optional[AutoCommit] = None, - create_consumer_group_if_not_exists: builtins.bool = True, - auto_join_consumer_group: builtins.bool = True, - poll_interval: typing.Optional[datetime.timedelta] = None, - polling_retry_interval: typing.Optional[datetime.timedelta] = None, - init_retries: typing.Optional[builtins.int] = None, - init_retry_interval: typing.Optional[datetime.timedelta] = None, - allow_replay: builtins.bool = False, - ) -> collections.abc.Awaitable[IggyConsumer]: + def consumer_group(self, name:builtins.str, stream:builtins.str, topic:builtins.str, partition_id:typing.Optional[builtins.int]=None, polling_strategy:typing.Optional[PollingStrategy]=None, batch_length:typing.Optional[builtins.int]=None, auto_commit:typing.Optional[AutoCommit]=None, create_consumer_group_if_not_exists:builtins.bool=True, auto_join_consumer_group:builtins.bool=True, poll_interval:typing.Optional[datetime.timedelta]=None, polling_retry_interval:typing.Optional[datetime.timedelta]=None, init_retries:typing.Optional[builtins.int]=None, init_retry_interval:typing.Optional[datetime.timedelta]=None, allow_replay:builtins.bool=False) -> collections.abc.Awaitable[IggyConsumer]: r""" Creates a new consumer group consumer. - + Returns the consumer or a PyRuntimeError on failure. """ @@ -337,15 +265,11 @@ class IggyConsumer: It wraps the RustIggyConsumer and provides asynchronous functionality through the contained runtime. """ - def get_last_consumed_offset( - self, partition_id: builtins.int - ) -> typing.Optional[builtins.int]: + def get_last_consumed_offset(self, partition_id:builtins.int) -> typing.Optional[builtins.int]: r""" Get the last consumed offset or `None` if no offset has been consumed yet. """ - def get_last_stored_offset( - self, partition_id: builtins.int - ) -> typing.Optional[builtins.int]: + def get_last_stored_offset(self, partition_id:builtins.int) -> typing.Optional[builtins.int]: r""" Get the last stored offset or `None` if no offset has been stored yet. """ @@ -365,40 +289,38 @@ class IggyConsumer: r""" Gets the name of the topic this consumer group is configured for. """ - def store_offset( - self, offset: builtins.int, partition_id: typing.Optional[builtins.int] - ) -> collections.abc.Awaitable[None]: + def store_offset(self, offset:builtins.int, partition_id:typing.Optional[builtins.int]) -> collections.abc.Awaitable[None]: r""" Stores the provided offset for the provided partition id or if none is specified uses the current partition id for the consumer group. - + Returns `Ok(())` if the server responds successfully, or a `PyRuntimeError` if the operation fails. """ - def delete_offset( - self, partition_id: typing.Optional[builtins.int] - ) -> collections.abc.Awaitable[None]: + def delete_offset(self, partition_id:typing.Optional[builtins.int]) -> collections.abc.Awaitable[None]: r""" Deletes the offset for the provided partition id or if none is specified uses the current partition id for the consumer group. - + Returns `Ok(())` if the server responds successfully, or a `PyRuntimeError` if the operation fails. """ - def iter(self) -> collections.abc.AsyncIterator[ReceiveMessage]: + def iter_messages(self) -> collections.abc.AsyncIterator[ReceiveMessage]: r""" Asynchronously iterate over `ReceiveMessage`s. + + Returns an async iterator that raises `StopAsyncIteration` when no more messages are available + or a `PyRuntimeError` on failure. + + Note: This method does not currently support `AutoCommit.After`. + For `AutoCommit.IntervalOrAfter(datetime.timedelta, AutoCommitAfter)`, + only the interval part is applied; the `after` mode is ignored. + Use `consume_messages()` if you need commit-after-processing semantics. """ - def consume_messages( - self, - callback: collections.abc.Callable[ - [ReceiveMessage], collections.abc.Awaitable[None] - ], - shutdown_event: typing.Optional[asyncio.Event], - ) -> collections.abc.Awaitable[None]: + def consume_messages(self, callback:collections.abc.Callable[[ReceiveMessage], collections.abc.Awaitable[None]], shutdown_event:typing.Optional[asyncio.Event]) -> collections.abc.Awaitable[None]: r""" Consumes messages continuously using a callback function and an optional `asyncio.Event` for signaling shutdown. - + Returns an awaitable that completes when shutdown is signaled or a PyRuntimeError on failure. """ @@ -407,68 +329,68 @@ class PollingStrategy: __match_args__ = ("value",) @property def value(self) -> builtins.int: ... - def __new__(cls, value: builtins.int) -> PollingStrategy.Offset: ... - + def __new__(cls, value:builtins.int) -> PollingStrategy.Offset: ... + class Timestamp(PollingStrategy): __match_args__ = ("value",) @property def value(self) -> builtins.int: ... - def __new__(cls, value: builtins.int) -> PollingStrategy.Timestamp: ... - + def __new__(cls, value:builtins.int) -> PollingStrategy.Timestamp: ... + class First(PollingStrategy): __match_args__ = ((),) def __new__(cls) -> PollingStrategy.First: ... - + class Last(PollingStrategy): __match_args__ = ((),) def __new__(cls) -> PollingStrategy.Last: ... - + class Next(PollingStrategy): __match_args__ = ((),) def __new__(cls) -> PollingStrategy.Next: ... - + ... class ReceiveMessage: r""" A Python class representing a received message. - + This class wraps a Rust message, allowing for access to its payload and offset from Python. """ def payload(self) -> bytes: r""" Retrieves the payload of the received message. - + The payload is returned as a Python bytes object. """ def offset(self) -> builtins.int: r""" Retrieves the offset of the received message. - + The offset represents the position of the message within its topic. """ def timestamp(self) -> builtins.int: r""" Retrieves the timestamp of the received message. - + The timestamp represents the time of the message within its topic. """ def id(self) -> builtins.int: r""" Retrieves the id of the received message. - + The id represents unique identifier of the message within its topic. """ def checksum(self) -> builtins.int: r""" Retrieves the checksum of the received message. - + The checksum represents the integrity of the message within its topic. """ def length(self) -> builtins.int: r""" Retrieves the length of the received message. - + The length represents the length of the payload. """ def partition_id(self) -> builtins.int: @@ -479,14 +401,14 @@ class ReceiveMessage: class SendMessage: r""" A Python class representing a message to be sent. - + This class wraps a Rust message meant for sending, facilitating the creation of such messages from Python and their subsequent use in Rust. """ - def __new__(cls, data: builtins.str | bytes) -> SendMessage: + def __new__(cls, data:builtins.str | bytes) -> SendMessage: r""" Constructs a new `SendMessage` instance from a string or bytes. - + This method allows for the creation of a `SendMessage` instance directly from Python using the provided string or bytes data. """ @@ -510,3 +432,4 @@ class TopicDetails: def messages_count(self) -> builtins.int: ... @property def partitions_count(self) -> builtins.int: ... + diff --git a/foreign/python/pyproject.toml b/foreign/python/pyproject.toml index 9bb86e3ad4..3f3dbdc758 100644 --- a/foreign/python/pyproject.toml +++ b/foreign/python/pyproject.toml @@ -50,6 +50,9 @@ classifiers = [ "Topic :: System :: Networking", "Topic :: System :: Distributed Computing", ] +dependencies = [ + "maturin[patchelf]>=1.10.2", +] [project.urls] "Homepage" = "https://iggy.apache.org" diff --git a/foreign/python/src/consumer.rs b/foreign/python/src/consumer.rs index 795b41bd59..a5d4d7dc8d 100644 --- a/foreign/python/src/consumer.rs +++ b/foreign/python/src/consumer.rs @@ -131,8 +131,16 @@ impl IggyConsumer { } /// Asynchronously iterate over `ReceiveMessage`s. + /// + /// Returns an async iterator that raises `StopAsyncIteration` when no more messages are available + /// or a `PyRuntimeError` on failure. + /// + /// Note: This method does not currently support `AutoCommit.After`. + /// For `AutoCommit.IntervalOrAfter(datetime.timedelta, AutoCommitAfter)`, + /// only the interval part is applied; the `after` mode is ignored. + /// Use `consume_messages()` if you need commit-after-processing semantics. #[gen_stub(override_return_type(type_repr="collections.abc.AsyncIterator[ReceiveMessage]", imports=("collections.abc")))] - fn iter<'a>(&self) -> ReceiveMessageIterator { + fn iter_messages<'a>(&self) -> ReceiveMessageIterator { let inner = self.inner.clone(); ReceiveMessageIterator { inner } } From a56ee4575962640956cf2e46f508f464c16449fe Mon Sep 17 00:00:00 2001 From: Rimuksh Kansal Date: Sat, 10 Jan 2026 20:18:53 +0900 Subject: [PATCH 4/7] fmt --- foreign/python/src/consumer.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/foreign/python/src/consumer.rs b/foreign/python/src/consumer.rs index a5d4d7dc8d..b2ed74f766 100644 --- a/foreign/python/src/consumer.rs +++ b/foreign/python/src/consumer.rs @@ -131,11 +131,11 @@ impl IggyConsumer { } /// Asynchronously iterate over `ReceiveMessage`s. - /// + /// /// Returns an async iterator that raises `StopAsyncIteration` when no more messages are available /// or a `PyRuntimeError` on failure. - /// - /// Note: This method does not currently support `AutoCommit.After`. + /// + /// Note: This method does not currently support `AutoCommit.After`. /// For `AutoCommit.IntervalOrAfter(datetime.timedelta, AutoCommitAfter)`, /// only the interval part is applied; the `after` mode is ignored. /// Use `consume_messages()` if you need commit-after-processing semantics. From 9df56c8f0e4e8d55b32274485fbbb801e2c0c56f Mon Sep 17 00:00:00 2001 From: Rimuksh Kansal Date: Sat, 10 Jan 2026 20:55:40 +0900 Subject: [PATCH 5/7] fix conflicts --- foreign/python/Cargo.toml | 8 +------- foreign/python/pyproject.toml | 3 --- foreign/python/tests/test_iggy_sdk.py | 2 +- 3 files changed, 2 insertions(+), 11 deletions(-) diff --git a/foreign/python/Cargo.toml b/foreign/python/Cargo.toml index 5ed988b80e..1b7a5c6dc3 100644 --- a/foreign/python/Cargo.toml +++ b/foreign/python/Cargo.toml @@ -30,16 +30,10 @@ repository = "https://github.com/apache/iggy" [dependencies] bytes = "1.10.1" -<<<<<<< HEAD +futures = "0.3.31" iggy = { path = "../../core/sdk", version = "0.8.1-edge.1" } pyo3 = "0.26.0" pyo3-async-runtimes = { version = "0.26.0", features = [ -======= -futures = "0.3.31" -iggy = { path = "../../core/sdk", version = "0.7.0" } -pyo3 = "0.25.0" -pyo3-async-runtimes = { version = "0.25.0", features = [ ->>>>>>> 2ce2ba0a (feat(python): add AsyncIterator interface to IggyConsumer) "attributes", "tokio-runtime", ] } diff --git a/foreign/python/pyproject.toml b/foreign/python/pyproject.toml index 3f3dbdc758..9bb86e3ad4 100644 --- a/foreign/python/pyproject.toml +++ b/foreign/python/pyproject.toml @@ -50,9 +50,6 @@ classifiers = [ "Topic :: System :: Networking", "Topic :: System :: Distributed Computing", ] -dependencies = [ - "maturin[patchelf]>=1.10.2", -] [project.urls] "Homepage" = "https://iggy.apache.org" diff --git a/foreign/python/tests/test_iggy_sdk.py b/foreign/python/tests/test_iggy_sdk.py index 5ac72fa7ee..abe6bb0ba5 100644 --- a/foreign/python/tests/test_iggy_sdk.py +++ b/foreign/python/tests/test_iggy_sdk.py @@ -545,7 +545,7 @@ async def test_iter_messages(self, iggy_client: IggyClient, consumer_group_setup [Message(m) for m in test_messages], ) - async for message in consumer.iter(): + async for message in consumer.iter_messages(): received_messages.append(message.payload().decode()) if len(received_messages) == 5: break From c88a66efc6405fe6eb5303c291520b014317bc07 Mon Sep 17 00:00:00 2001 From: Rimuksh Kansal Date: Sat, 10 Jan 2026 21:11:07 +0900 Subject: [PATCH 6/7] lint/fmt --- foreign/python/apache_iggy.pyi | 254 ++++++++++++++------- foreign/python/python_examples/consumer.py | 3 +- foreign/python/python_examples/producer.py | 3 +- foreign/python/tests/conftest.py | 1 + foreign/python/tests/test_iggy_sdk.py | 2 +- 5 files changed, 176 insertions(+), 87 deletions(-) diff --git a/foreign/python/apache_iggy.pyi b/foreign/python/apache_iggy.pyi index 3d0901acf7..0e319c5ace 100644 --- a/foreign/python/apache_iggy.pyi +++ b/foreign/python/apache_iggy.pyi @@ -32,70 +32,86 @@ class AutoCommit: r""" The auto-commit is disabled and the offset must be stored manually by the consumer. """ + __match_args__ = ((),) def __new__(cls) -> AutoCommit.Disabled: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key:builtins.int) -> typing.Any: ... - + def __getitem__(self, key: builtins.int) -> typing.Any: ... + class Interval(AutoCommit): r""" The auto-commit is enabled and the offset is stored on the server after a certain interval. """ + __match_args__ = ("_0",) @property def _0(self) -> datetime.timedelta: ... - def __new__(cls, _0:datetime.timedelta) -> AutoCommit.Interval: ... + def __new__(cls, _0: datetime.timedelta) -> AutoCommit.Interval: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key:builtins.int) -> typing.Any: ... - + def __getitem__(self, key: builtins.int) -> typing.Any: ... + class IntervalOrWhen(AutoCommit): r""" The auto-commit is enabled and the offset is stored on the server after a certain interval or depending on the mode when consuming the messages. """ - __match_args__ = ("_0", "_1",) + + __match_args__ = ( + "_0", + "_1", + ) @property def _0(self) -> datetime.timedelta: ... @property def _1(self) -> AutoCommitWhen: ... - def __new__(cls, _0:datetime.timedelta, _1:AutoCommitWhen) -> AutoCommit.IntervalOrWhen: ... + def __new__( + cls, _0: datetime.timedelta, _1: AutoCommitWhen + ) -> AutoCommit.IntervalOrWhen: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key:builtins.int) -> typing.Any: ... - + def __getitem__(self, key: builtins.int) -> typing.Any: ... + class IntervalOrAfter(AutoCommit): r""" The auto-commit is enabled and the offset is stored on the server after a certain interval or depending on the mode after consuming the messages. """ - __match_args__ = ("_0", "_1",) + + __match_args__ = ( + "_0", + "_1", + ) @property def _0(self) -> datetime.timedelta: ... @property def _1(self) -> AutoCommitAfter: ... - def __new__(cls, _0:datetime.timedelta, _1:AutoCommitAfter) -> AutoCommit.IntervalOrAfter: ... + def __new__( + cls, _0: datetime.timedelta, _1: AutoCommitAfter + ) -> AutoCommit.IntervalOrAfter: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key:builtins.int) -> typing.Any: ... - + def __getitem__(self, key: builtins.int) -> typing.Any: ... + class When(AutoCommit): r""" The auto-commit is enabled and the offset is stored on the server depending on the mode when consuming the messages. """ + __match_args__ = ("_0",) @property def _0(self) -> AutoCommitWhen: ... - def __new__(cls, _0:AutoCommitWhen) -> AutoCommit.When: ... + def __new__(cls, _0: AutoCommitWhen) -> AutoCommit.When: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key:builtins.int) -> typing.Any: ... - + def __getitem__(self, key: builtins.int) -> typing.Any: ... + class After(AutoCommit): r""" The auto-commit is enabled and the offset is stored on the server depending on the mode after consuming the messages. """ + __match_args__ = ("_0",) @property def _0(self) -> AutoCommitAfter: ... - def __new__(cls, _0:AutoCommitAfter) -> AutoCommit.After: ... + def __new__(cls, _0: AutoCommitAfter) -> AutoCommit.After: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key:builtins.int) -> typing.Any: ... - + def __getitem__(self, key: builtins.int) -> typing.Any: ... + ... class AutoCommitAfter: @@ -106,31 +122,36 @@ class AutoCommitAfter: r""" The offset is stored on the server after all the messages are consumed. """ + __match_args__ = ((),) def __new__(cls) -> AutoCommitAfter.ConsumingAllMessages: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key:builtins.int) -> typing.Any: ... - + def __getitem__(self, key: builtins.int) -> typing.Any: ... + class ConsumingEachMessage(AutoCommitAfter): r""" The offset is stored on the server after consuming each message. """ + __match_args__ = ((),) def __new__(cls) -> AutoCommitAfter.ConsumingEachMessage: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key:builtins.int) -> typing.Any: ... - + def __getitem__(self, key: builtins.int) -> typing.Any: ... + class ConsumingEveryNthMessage(AutoCommitAfter): r""" The offset is stored on the server after consuming every Nth message. """ + __match_args__ = ("_0",) @property def _0(self) -> builtins.int: ... - def __new__(cls, _0:builtins.int) -> AutoCommitAfter.ConsumingEveryNthMessage: ... + def __new__( + cls, _0: builtins.int + ) -> AutoCommitAfter.ConsumingEveryNthMessage: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key:builtins.int) -> typing.Any: ... - + def __getitem__(self, key: builtins.int) -> typing.Any: ... + ... class AutoCommitWhen: @@ -141,40 +162,46 @@ class AutoCommitWhen: r""" The offset is stored on the server when the messages are received. """ + __match_args__ = ((),) def __new__(cls) -> AutoCommitWhen.PollingMessages: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key:builtins.int) -> typing.Any: ... - + def __getitem__(self, key: builtins.int) -> typing.Any: ... + class ConsumingAllMessages(AutoCommitWhen): r""" The offset is stored on the server when all the messages are consumed. """ + __match_args__ = ((),) def __new__(cls) -> AutoCommitWhen.ConsumingAllMessages: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key:builtins.int) -> typing.Any: ... - + def __getitem__(self, key: builtins.int) -> typing.Any: ... + class ConsumingEachMessage(AutoCommitWhen): r""" The offset is stored on the server when consuming each message. """ + __match_args__ = ((),) def __new__(cls) -> AutoCommitWhen.ConsumingEachMessage: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key:builtins.int) -> typing.Any: ... - + def __getitem__(self, key: builtins.int) -> typing.Any: ... + class ConsumingEveryNthMessage(AutoCommitWhen): r""" The offset is stored on the server when consuming every Nth message. """ + __match_args__ = ("_0",) @property def _0(self) -> builtins.int: ... - def __new__(cls, _0:builtins.int) -> AutoCommitWhen.ConsumingEveryNthMessage: ... + def __new__( + cls, _0: builtins.int + ) -> AutoCommitWhen.ConsumingEveryNthMessage: ... def __len__(self) -> builtins.int: ... - def __getitem__(self, key:builtins.int) -> typing.Any: ... - + def __getitem__(self, key: builtins.int) -> typing.Any: ... + ... class IggyClient: @@ -183,37 +210,39 @@ class IggyClient: It wraps the RustIggyClient and provides asynchronous functionality through the contained runtime. """ - def __new__(cls, conn:typing.Optional[builtins.str]=None) -> IggyClient: + def __new__(cls, conn: typing.Optional[builtins.str] = None) -> IggyClient: r""" Constructs a new IggyClient from a TCP server address. - + This initializes a new runtime for asynchronous operations. Future versions might utilize asyncio for more Pythonic async. """ @classmethod - def from_connection_string(cls, connection_string:builtins.str) -> IggyClient: + def from_connection_string(cls, connection_string: builtins.str) -> IggyClient: r""" Constructs a new IggyClient from a connection string. - + Returns an error if the connection string provided is invalid. """ def ping(self) -> collections.abc.Awaitable[None]: r""" Sends a ping request to the server to check connectivity. - + Returns `Ok(())` if the server responds successfully, or a `PyRuntimeError` if the connection fails. """ - def login_user(self, username:builtins.str, password:builtins.str) -> collections.abc.Awaitable[None]: + def login_user( + self, username: builtins.str, password: builtins.str + ) -> collections.abc.Awaitable[None]: r""" Logs in the user with the given credentials. - + Returns `Ok(())` on success, or a PyRuntimeError on failure. """ def connect(self) -> collections.abc.Awaitable[None]: r""" Connects the IggyClient to its service. - + Returns Ok(()) on successful connection or a PyRuntimeError on failure. """ def create_stream(self, name: builtins.str) -> collections.abc.Awaitable[None]: @@ -222,40 +251,84 @@ class IggyClient: Returns Ok(()) on successful stream creation or a PyRuntimeError on failure. """ - def get_stream(self, stream_id:builtins.str | builtins.int) -> collections.abc.Awaitable[typing.Optional[StreamDetails]]: + def get_stream( + self, stream_id: builtins.str | builtins.int + ) -> collections.abc.Awaitable[typing.Optional[StreamDetails]]: r""" Gets stream by id. - + Returns Option of stream details or a PyRuntimeError on failure. """ - def create_topic(self, stream:builtins.str | builtins.int, name:builtins.str, partitions_count:builtins.int, compression_algorithm:typing.Optional[builtins.str]=None, topic_id:typing.Optional[builtins.int]=None, replication_factor:typing.Optional[builtins.int]=None) -> collections.abc.Awaitable[None]: + def create_topic( + self, + stream: builtins.str | builtins.int, + name: builtins.str, + partitions_count: builtins.int, + compression_algorithm: typing.Optional[builtins.str] = None, + topic_id: typing.Optional[builtins.int] = None, + replication_factor: typing.Optional[builtins.int] = None, + ) -> collections.abc.Awaitable[None]: r""" Creates a new topic with the given parameters. - + Returns Ok(()) on successful topic creation or a PyRuntimeError on failure. """ - def get_topic(self, stream_id:builtins.str | builtins.int, topic_id:builtins.str | builtins.int) -> collections.abc.Awaitable[typing.Optional[TopicDetails]]: + def get_topic( + self, + stream_id: builtins.str | builtins.int, + topic_id: builtins.str | builtins.int, + ) -> collections.abc.Awaitable[typing.Optional[TopicDetails]]: r""" Gets topic by stream and id. - + Returns Option of topic details or a PyRuntimeError on failure. """ - def send_messages(self, stream:builtins.str | builtins.int, topic:builtins.str | builtins.int, partitioning:builtins.int, messages:list[SendMessage]) -> collections.abc.Awaitable[None]: + def send_messages( + self, + stream: builtins.str | builtins.int, + topic: builtins.str | builtins.int, + partitioning: builtins.int, + messages: list[SendMessage], + ) -> collections.abc.Awaitable[None]: r""" Sends a list of messages to the specified topic. - + Returns Ok(()) on successful sending or a PyRuntimeError on failure. """ - def poll_messages(self, stream:builtins.str | builtins.int, topic:builtins.str | builtins.int, partition_id:builtins.int, polling_strategy:PollingStrategy, count:builtins.int, auto_commit:builtins.bool) -> collections.abc.Awaitable[list[ReceiveMessage]]: + def poll_messages( + self, + stream: builtins.str | builtins.int, + topic: builtins.str | builtins.int, + partition_id: builtins.int, + polling_strategy: PollingStrategy, + count: builtins.int, + auto_commit: builtins.bool, + ) -> collections.abc.Awaitable[list[ReceiveMessage]]: r""" Polls for messages from the specified topic and partition. - + Returns a list of received messages or a PyRuntimeError on failure. """ - def consumer_group(self, name:builtins.str, stream:builtins.str, topic:builtins.str, partition_id:typing.Optional[builtins.int]=None, polling_strategy:typing.Optional[PollingStrategy]=None, batch_length:typing.Optional[builtins.int]=None, auto_commit:typing.Optional[AutoCommit]=None, create_consumer_group_if_not_exists:builtins.bool=True, auto_join_consumer_group:builtins.bool=True, poll_interval:typing.Optional[datetime.timedelta]=None, polling_retry_interval:typing.Optional[datetime.timedelta]=None, init_retries:typing.Optional[builtins.int]=None, init_retry_interval:typing.Optional[datetime.timedelta]=None, allow_replay:builtins.bool=False) -> collections.abc.Awaitable[IggyConsumer]: + def consumer_group( + self, + name: builtins.str, + stream: builtins.str, + topic: builtins.str, + partition_id: typing.Optional[builtins.int] = None, + polling_strategy: typing.Optional[PollingStrategy] = None, + batch_length: typing.Optional[builtins.int] = None, + auto_commit: typing.Optional[AutoCommit] = None, + create_consumer_group_if_not_exists: builtins.bool = True, + auto_join_consumer_group: builtins.bool = True, + poll_interval: typing.Optional[datetime.timedelta] = None, + polling_retry_interval: typing.Optional[datetime.timedelta] = None, + init_retries: typing.Optional[builtins.int] = None, + init_retry_interval: typing.Optional[datetime.timedelta] = None, + allow_replay: builtins.bool = False, + ) -> collections.abc.Awaitable[IggyConsumer]: r""" Creates a new consumer group consumer. - + Returns the consumer or a PyRuntimeError on failure. """ @@ -265,11 +338,15 @@ class IggyConsumer: It wraps the RustIggyConsumer and provides asynchronous functionality through the contained runtime. """ - def get_last_consumed_offset(self, partition_id:builtins.int) -> typing.Optional[builtins.int]: + def get_last_consumed_offset( + self, partition_id: builtins.int + ) -> typing.Optional[builtins.int]: r""" Get the last consumed offset or `None` if no offset has been consumed yet. """ - def get_last_stored_offset(self, partition_id:builtins.int) -> typing.Optional[builtins.int]: + def get_last_stored_offset( + self, partition_id: builtins.int + ) -> typing.Optional[builtins.int]: r""" Get the last stored offset or `None` if no offset has been stored yet. """ @@ -289,38 +366,48 @@ class IggyConsumer: r""" Gets the name of the topic this consumer group is configured for. """ - def store_offset(self, offset:builtins.int, partition_id:typing.Optional[builtins.int]) -> collections.abc.Awaitable[None]: + def store_offset( + self, offset: builtins.int, partition_id: typing.Optional[builtins.int] + ) -> collections.abc.Awaitable[None]: r""" Stores the provided offset for the provided partition id or if none is specified uses the current partition id for the consumer group. - + Returns `Ok(())` if the server responds successfully, or a `PyRuntimeError` if the operation fails. """ - def delete_offset(self, partition_id:typing.Optional[builtins.int]) -> collections.abc.Awaitable[None]: + def delete_offset( + self, partition_id: typing.Optional[builtins.int] + ) -> collections.abc.Awaitable[None]: r""" Deletes the offset for the provided partition id or if none is specified uses the current partition id for the consumer group. - + Returns `Ok(())` if the server responds successfully, or a `PyRuntimeError` if the operation fails. """ def iter_messages(self) -> collections.abc.AsyncIterator[ReceiveMessage]: r""" Asynchronously iterate over `ReceiveMessage`s. - + Returns an async iterator that raises `StopAsyncIteration` when no more messages are available or a `PyRuntimeError` on failure. - - Note: This method does not currently support `AutoCommit.After`. + + Note: This method does not currently support `AutoCommit.After`. For `AutoCommit.IntervalOrAfter(datetime.timedelta, AutoCommitAfter)`, only the interval part is applied; the `after` mode is ignored. Use `consume_messages()` if you need commit-after-processing semantics. """ - def consume_messages(self, callback:collections.abc.Callable[[ReceiveMessage], collections.abc.Awaitable[None]], shutdown_event:typing.Optional[asyncio.Event]) -> collections.abc.Awaitable[None]: + def consume_messages( + self, + callback: collections.abc.Callable[ + [ReceiveMessage], collections.abc.Awaitable[None] + ], + shutdown_event: typing.Optional[asyncio.Event], + ) -> collections.abc.Awaitable[None]: r""" Consumes messages continuously using a callback function and an optional `asyncio.Event` for signaling shutdown. - + Returns an awaitable that completes when shutdown is signaled or a PyRuntimeError on failure. """ @@ -329,68 +416,68 @@ class PollingStrategy: __match_args__ = ("value",) @property def value(self) -> builtins.int: ... - def __new__(cls, value:builtins.int) -> PollingStrategy.Offset: ... - + def __new__(cls, value: builtins.int) -> PollingStrategy.Offset: ... + class Timestamp(PollingStrategy): __match_args__ = ("value",) @property def value(self) -> builtins.int: ... - def __new__(cls, value:builtins.int) -> PollingStrategy.Timestamp: ... - + def __new__(cls, value: builtins.int) -> PollingStrategy.Timestamp: ... + class First(PollingStrategy): __match_args__ = ((),) def __new__(cls) -> PollingStrategy.First: ... - + class Last(PollingStrategy): __match_args__ = ((),) def __new__(cls) -> PollingStrategy.Last: ... - + class Next(PollingStrategy): __match_args__ = ((),) def __new__(cls) -> PollingStrategy.Next: ... - + ... class ReceiveMessage: r""" A Python class representing a received message. - + This class wraps a Rust message, allowing for access to its payload and offset from Python. """ def payload(self) -> bytes: r""" Retrieves the payload of the received message. - + The payload is returned as a Python bytes object. """ def offset(self) -> builtins.int: r""" Retrieves the offset of the received message. - + The offset represents the position of the message within its topic. """ def timestamp(self) -> builtins.int: r""" Retrieves the timestamp of the received message. - + The timestamp represents the time of the message within its topic. """ def id(self) -> builtins.int: r""" Retrieves the id of the received message. - + The id represents unique identifier of the message within its topic. """ def checksum(self) -> builtins.int: r""" Retrieves the checksum of the received message. - + The checksum represents the integrity of the message within its topic. """ def length(self) -> builtins.int: r""" Retrieves the length of the received message. - + The length represents the length of the payload. """ def partition_id(self) -> builtins.int: @@ -401,14 +488,14 @@ class ReceiveMessage: class SendMessage: r""" A Python class representing a message to be sent. - + This class wraps a Rust message meant for sending, facilitating the creation of such messages from Python and their subsequent use in Rust. """ - def __new__(cls, data:builtins.str | bytes) -> SendMessage: + def __new__(cls, data: builtins.str | bytes) -> SendMessage: r""" Constructs a new `SendMessage` instance from a string or bytes. - + This method allows for the creation of a `SendMessage` instance directly from Python using the provided string or bytes data. """ @@ -432,4 +519,3 @@ class TopicDetails: def messages_count(self) -> builtins.int: ... @property def partitions_count(self) -> builtins.int: ... - diff --git a/foreign/python/python_examples/consumer.py b/foreign/python/python_examples/consumer.py index 1c20280eb0..b656b356cd 100644 --- a/foreign/python/python_examples/consumer.py +++ b/foreign/python/python_examples/consumer.py @@ -17,9 +17,10 @@ import asyncio +from loguru import logger + # Assuming there's a Python module for iggy with similar functionalities. from apache_iggy import IggyClient, PollingStrategy, ReceiveMessage -from loguru import logger STREAM_NAME = "sample-stream" TOPIC_NAME = "sample-topic" diff --git a/foreign/python/python_examples/producer.py b/foreign/python/python_examples/producer.py index acf578ed25..c68238f886 100644 --- a/foreign/python/python_examples/producer.py +++ b/foreign/python/python_examples/producer.py @@ -17,10 +17,11 @@ import asyncio +from loguru import logger + # Assuming we have a Python module for iggy with similar functionality as the Rust one. from apache_iggy import IggyClient, StreamDetails, TopicDetails from apache_iggy import SendMessage as Message -from loguru import logger STREAM_NAME = "sample-stream" TOPIC_NAME = "sample-topic" diff --git a/foreign/python/tests/conftest.py b/foreign/python/tests/conftest.py index 578d0bf292..60d7765cb8 100644 --- a/foreign/python/tests/conftest.py +++ b/foreign/python/tests/conftest.py @@ -26,6 +26,7 @@ import os import pytest + from apache_iggy import IggyClient from .utils import get_server_config, wait_for_ping, wait_for_server diff --git a/foreign/python/tests/test_iggy_sdk.py b/foreign/python/tests/test_iggy_sdk.py index abe6bb0ba5..226e6b0078 100644 --- a/foreign/python/tests/test_iggy_sdk.py +++ b/foreign/python/tests/test_iggy_sdk.py @@ -27,6 +27,7 @@ from datetime import timedelta import pytest + from apache_iggy import AutoCommit, IggyClient, PollingStrategy, ReceiveMessage from apache_iggy import SendMessage as Message @@ -521,7 +522,6 @@ async def test_iter_messages(self, iggy_client: IggyClient, consumer_group_setup # Setup received_messages = [] - shutdown_event = asyncio.Event() await iggy_client.create_stream(stream_name) await iggy_client.create_topic( stream=stream_name, name=topic_name, partitions_count=1 From 26d851f6ccb866ea1ae75b6b37ea6a103bbbb1a3 Mon Sep 17 00:00:00 2001 From: Rimuksh Kansal Date: Sat, 10 Jan 2026 21:54:18 +0900 Subject: [PATCH 7/7] lint/fmt --- foreign/python/tests/conftest.py | 1 - foreign/python/tests/test_iggy_sdk.py | 1 - 2 files changed, 2 deletions(-) diff --git a/foreign/python/tests/conftest.py b/foreign/python/tests/conftest.py index 60d7765cb8..578d0bf292 100644 --- a/foreign/python/tests/conftest.py +++ b/foreign/python/tests/conftest.py @@ -26,7 +26,6 @@ import os import pytest - from apache_iggy import IggyClient from .utils import get_server_config, wait_for_ping, wait_for_server diff --git a/foreign/python/tests/test_iggy_sdk.py b/foreign/python/tests/test_iggy_sdk.py index 226e6b0078..e826205307 100644 --- a/foreign/python/tests/test_iggy_sdk.py +++ b/foreign/python/tests/test_iggy_sdk.py @@ -27,7 +27,6 @@ from datetime import timedelta import pytest - from apache_iggy import AutoCommit, IggyClient, PollingStrategy, ReceiveMessage from apache_iggy import SendMessage as Message