diff --git a/foreign/python/Cargo.toml b/foreign/python/Cargo.toml index 5d1b1b4d77..1b7a5c6dc3 100644 --- a/foreign/python/Cargo.toml +++ b/foreign/python/Cargo.toml @@ -30,6 +30,7 @@ repository = "https://github.com/apache/iggy" [dependencies] bytes = "1.10.1" +futures = "0.3.31" iggy = { path = "../../core/sdk", version = "0.8.1-edge.1" } pyo3 = "0.26.0" pyo3-async-runtimes = { version = "0.26.0", features = [ diff --git a/foreign/python/apache_iggy.pyi b/foreign/python/apache_iggy.pyi index a5141681bf..0e319c5ace 100644 --- a/foreign/python/apache_iggy.pyi +++ b/foreign/python/apache_iggy.pyi @@ -265,6 +265,7 @@ class IggyClient: name: builtins.str, partitions_count: builtins.int, compression_algorithm: typing.Optional[builtins.str] = None, + topic_id: typing.Optional[builtins.int] = None, replication_factor: typing.Optional[builtins.int] = None, ) -> collections.abc.Awaitable[None]: r""" @@ -324,7 +325,7 @@ class IggyClient: init_retries: typing.Optional[builtins.int] = None, init_retry_interval: typing.Optional[datetime.timedelta] = None, allow_replay: builtins.bool = False, - ) -> IggyConsumer: + ) -> collections.abc.Awaitable[IggyConsumer]: r""" Creates a new consumer group consumer. @@ -385,6 +386,18 @@ class IggyConsumer: Returns `Ok(())` if the server responds successfully, or a `PyRuntimeError` if the operation fails. """ + def iter_messages(self) -> collections.abc.AsyncIterator[ReceiveMessage]: + r""" + Asynchronously iterate over `ReceiveMessage`s. + + Returns an async iterator that raises `StopAsyncIteration` when no more messages are available + or a `PyRuntimeError` on failure. + + Note: This method does not currently support `AutoCommit.After`. + For `AutoCommit.IntervalOrAfter(datetime.timedelta, AutoCommitAfter)`, + only the interval part is applied; the `after` mode is ignored. + Use `consume_messages()` if you need commit-after-processing semantics. 
+ """ def consume_messages( self, callback: collections.abc.Callable[ @@ -467,6 +480,10 @@ class ReceiveMessage: The length represents the length of the payload. """ + def partition_id(self) -> builtins.int: + r""" + Retrieves the partition this message belongs to. + """ class SendMessage: r""" @@ -477,10 +494,10 @@ class SendMessage: """ def __new__(cls, data: builtins.str | bytes) -> SendMessage: r""" - Constructs a new `SendMessage` instance from a string. + Constructs a new `SendMessage` instance from a string or bytes. This method allows for the creation of a `SendMessage` instance - directly from Python using the provided string data. + directly from Python using the provided string or bytes data. """ class StreamDetails: diff --git a/foreign/python/src/client.rs b/foreign/python/src/client.rs index f30000d9fe..b0db32fc3f 100644 --- a/foreign/python/src/client.rs +++ b/foreign/python/src/client.rs @@ -16,9 +16,6 @@ * under the License. */ -use std::str::FromStr; -use std::sync::Arc; - use iggy::prelude::{ Consumer as RustConsumer, IggyClient as RustIggyClient, IggyMessage as RustMessage, PollingStrategy as RustPollingStrategy, *, @@ -28,6 +25,8 @@ use pyo3::types::{PyDelta, PyList, PyType}; use pyo3_async_runtimes::tokio::future_into_py; use pyo3_stub_gen::define_stub_info_gatherer; use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; +use std::str::FromStr; +use std::sync::Arc; use crate::consumer::{py_delta_to_iggy_duration, AutoCommit, IggyConsumer}; use crate::identifier::PyIdentifier; @@ -314,7 +313,10 @@ impl IggyClient { let messages = polled_messages .messages .into_iter() - .map(ReceiveMessage::from_rust_message) + .map(|m| ReceiveMessage { + inner: m, + partition_id, + }) .collect::>(); Ok(messages) }) @@ -340,8 +342,10 @@ impl IggyClient { init_retry_interval=None, allow_replay=false, ))] - fn consumer_group( + #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[IggyConsumer]", imports=("collections.abc")))] + fn 
consumer_group<'a>( &self, + py: Python<'a>, name: &str, stream: &str, topic: &str, @@ -356,7 +360,7 @@ impl IggyClient { init_retries: Option, init_retry_interval: Option>, allow_replay: bool, - ) -> PyResult { + ) -> PyResult> { let mut builder = self .inner .consumer_group(name, stream, topic) @@ -412,10 +416,16 @@ impl IggyClient { if allow_replay { builder = builder.allow_replay() } - let consumer = builder.build(); + let mut consumer = builder.build(); - Ok(IggyConsumer { - inner: Arc::new(Mutex::new(consumer)), + future_into_py(py, async move { + consumer + .init() + .await + .map_err(|e| PyErr::new::(format!("{e:?}")))?; + Ok(IggyConsumer { + inner: Arc::new(Mutex::new(consumer)), + }) }) } } diff --git a/foreign/python/src/consumer.rs b/foreign/python/src/consumer.rs index 8a830aeb21..b2ed74f766 100644 --- a/foreign/python/src/consumer.rs +++ b/foreign/python/src/consumer.rs @@ -36,6 +36,7 @@ use tokio::sync::Mutex; use tokio::task::JoinHandle; use crate::identifier::PyIdentifier; +use crate::iterator::ReceiveMessageIterator; use crate::receive_message::ReceiveMessage; /// A Python class representing the Iggy consumer. @@ -129,6 +130,21 @@ impl IggyConsumer { }) } + /// Asynchronously iterate over `ReceiveMessage`s. + /// + /// Returns an async iterator that raises `StopAsyncIteration` when no more messages are available + /// or a `PyRuntimeError` on failure. + /// + /// Note: This method does not currently support `AutoCommit.After`. + /// For `AutoCommit.IntervalOrAfter(datetime.timedelta, AutoCommitAfter)`, + /// only the interval part is applied; the `after` mode is ignored. + /// Use `consume_messages()` if you need commit-after-processing semantics. 
+ #[gen_stub(override_return_type(type_repr="collections.abc.AsyncIterator[ReceiveMessage]", imports=("collections.abc")))] + fn iter_messages<'a>(&self) -> ReceiveMessageIterator { + let inner = self.inner.clone(); + ReceiveMessageIterator { inner } + } + /// Consumes messages continuously using a callback function and an optional `asyncio.Event` for signaling shutdown. /// /// Returns an awaitable that completes when shutdown is signaled or a PyRuntimeError on failure. @@ -148,14 +164,6 @@ impl IggyConsumer { future_into_py(py, async { let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); - let inner_init = inner.clone(); - let mut inner_init = inner_init.lock().await; - inner_init - .init() - .await - .map_err(|e| PyErr::new::(format!("{e:?}")))?; - drop(inner_init); - let task_locals = Python::with_gil(pyo3_async_runtimes::tokio::get_current_locals)?; let handle_consume = get_runtime().spawn(scope(task_locals, async move { let task_locals = @@ -219,7 +227,10 @@ impl MessageConsumer for PyCallbackConsumer { let callback = self.callback.clone(); let task_locals = self.task_locals.clone().lock_owned().await; let task_locals = Python::with_gil(|py| task_locals.clone_ref(py)); - let message = ReceiveMessage::from_rust_message(received.message); + let message = ReceiveMessage { + inner: received.message, + partition_id: received.partition_id, + }; get_runtime() .spawn(scope(task_locals, async move { Python::with_gil(|py| { diff --git a/foreign/python/src/iterator.rs b/foreign/python/src/iterator.rs new file mode 100644 index 0000000000..3131d28c41 --- /dev/null +++ b/foreign/python/src/iterator.rs @@ -0,0 +1,59 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::sync::Arc;
+
+use futures::StreamExt;
+use iggy::prelude::IggyConsumer as RustIggyConsumer;
+use pyo3::exceptions::{PyRuntimeError, PyStopAsyncIteration};
+
+use crate::receive_message::ReceiveMessage;
+use pyo3::prelude::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use tokio::sync::Mutex;
+
+/// Async iterator over the messages received by a shared Iggy consumer.
+#[pyclass]
+pub struct ReceiveMessageIterator {
+    pub(crate) inner: Arc<Mutex<RustIggyConsumer>>,
+}
+
+#[pymethods]
+impl ReceiveMessageIterator {
+    /// Awaitable for the next message; raises `RuntimeError` if the consumer fails.
+    pub fn __anext__<'a>(&self, py: Python<'a>) -> PyResult<Bound<'a, PyAny>> {
+        let inner = self.inner.clone();
+        future_into_py(py, async move {
+            let mut consumer = inner.lock().await;
+            match consumer.next().await {
+                Some(Ok(received)) => Ok(ReceiveMessage {
+                    inner: received.message,
+                    partition_id: received.partition_id,
+                }),
+                Some(Err(e)) => Err(PyRuntimeError::new_err(format!("{e:?}"))),
+                // End of stream: `async for` stops on `StopAsyncIteration`, not `StopIteration`.
+                None => Err(PyStopAsyncIteration::new_err("No more messages")),
+            }
+        })
+    }
+
+    /// Returns the iterator itself, as the async-iterator protocol requires.
+    pub fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
+        slf
+    }
+}
diff --git a/foreign/python/src/lib.rs b/foreign/python/src/lib.rs
index efc12cab54..63afe785a0 100644
--- a/foreign/python/src/lib.rs
+++ b/foreign/python/src/lib.rs
@@ -19,6 +19,7 @@
 pub mod client;
 mod consumer;
 mod identifier;
+mod iterator;
 mod receive_message;
 mod send_message;
 mod stream;
@@ -26,6 +27,7 @@ mod topic;
 
 use client::IggyClient;
 use consumer::{AutoCommit, AutoCommitAfter, AutoCommitWhen, IggyConsumer};
+use iterator::ReceiveMessageIterator;
 use pyo3::prelude::*;
 use receive_message::{PollingStrategy, ReceiveMessage};
 use send_message::SendMessage;
@@ -45,5 +47,6 @@ fn apache_iggy(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::()?;
     m.add_class::()?;
     m.add_class::()?;
+    m.add_class::()?;
     Ok(())
 }
diff --git a/foreign/python/src/receive_message.rs b/foreign/python/src/receive_message.rs
index cd3be2978d..078e590451 100644
--- a/foreign/python/src/receive_message.rs
+++ b/foreign/python/src/receive_message.rs
@@ -28,15 +28,7 @@ use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pyclass_complex_enum, gen
 #[gen_stub_pyclass]
 pub struct ReceiveMessage {
     pub(crate) inner: RustReceiveMessage,
-}
-
-impl ReceiveMessage {
-    /// Converts a Rust message into its corresponding Python representation.
-    ///
-    /// This is an internal utility function, not exposed to Python.
-    pub(crate) fn from_rust_message(message: RustReceiveMessage) -> Self {
-        Self { inner: message }
-    }
+    pub(crate) partition_id: u32,
 }
 
 #[gen_stub_pymethods]
@@ -83,6 +75,11 @@ impl ReceiveMessage {
     pub fn length(&self) -> u32 {
         self.inner.header.payload_length
     }
+
+    /// Retrieves the partition this message belongs to.
+ pub fn partition_id(&self) -> u32 { + self.partition_id + } } #[derive(Clone, Copy)] diff --git a/foreign/python/tests/test_iggy_sdk.py b/foreign/python/tests/test_iggy_sdk.py index 142e9ce620..e826205307 100644 --- a/foreign/python/tests/test_iggy_sdk.py +++ b/foreign/python/tests/test_iggy_sdk.py @@ -445,7 +445,7 @@ async def test_meta(self, iggy_client: IggyClient, consumer_group_setup): await iggy_client.create_topic( stream=stream_name, name=topic_name, partitions_count=1 ) - consumer = iggy_client.consumer_group( + consumer = await iggy_client.consumer_group( consumer_name, stream_name, topic_name, @@ -482,7 +482,7 @@ async def test_consume_messages( stream=stream_name, name=topic_name, partitions_count=1 ) - consumer = iggy_client.consumer_group( + consumer = await iggy_client.consumer_group( consumer_name, stream_name, topic_name, @@ -510,6 +510,47 @@ async def send() -> None: assert received_messages == test_messages + @pytest.mark.asyncio + async def test_iter_messages(self, iggy_client: IggyClient, consumer_group_setup): + """Test that the consumer group can consume messages.""" + consumer_name = consumer_group_setup["consumer"] + stream_name = consumer_group_setup["stream"] + topic_name = consumer_group_setup["topic"] + partition_id = consumer_group_setup["partition_id"] + test_messages = consumer_group_setup["messages"] + + # Setup + received_messages = [] + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + consumer = await iggy_client.consumer_group( + consumer_name, + stream_name, + topic_name, + partition_id, + PollingStrategy.Next(), + 10, + auto_commit=AutoCommit.Interval(timedelta(seconds=5)), + poll_interval=timedelta(seconds=1), + ) + + await iggy_client.send_messages( + stream_name, + topic_name, + partition_id, + [Message(m) for m in test_messages], + ) + + async for message in consumer.iter_messages(): + 
received_messages.append(message.payload().decode()) + if len(received_messages) == 5: + break + + assert received_messages == test_messages + @pytest.mark.asyncio async def test_shutdown(self, iggy_client: IggyClient, consumer_group_setup): """Test that the consumer group can be signaled to shutdown.""" @@ -525,7 +566,7 @@ async def test_shutdown(self, iggy_client: IggyClient, consumer_group_setup): stream=stream_name, name=topic_name, partitions_count=1 ) - consumer = iggy_client.consumer_group( + consumer = await iggy_client.consumer_group( consumer_name, stream_name, topic_name,