Open
Description
Environment details
OS type and version
: MacOS 15.1.1Python version
: 3.11.6pip version
: pip 24.0google-cloud-pubsub
: 2.27.1
Description
I'm encountering an issue where my FastAPI application does not terminate gracefully when shutting down the Pub/Sub subscriber. This results in lingering background threads and the following error:
ValueError('Cannot invoke RPC: Channel closed!')
This behavior aligns with the GitHub issue #747 related to incomplete shutdown handling in the google-cloud-pubsub
library.
Steps to Reproduce
-
Set Up a FastAPI Application with Pub/Sub Subscriber:
- Initialize a FastAPI application.
- Integrate a Pub/Sub subscriber using the
google-cloud-pubsub
library. - Implement startup and shutdown logic using an async context manager to manage the subscriber lifecycle.
-
Run the Application:
- Start the FastAPI server.
- Ensure the Pub/Sub subscriber is actively listening to messages.
-
Initiate Shutdown:
- Terminate the application using
Ctrl+C
or another shutdown signal. - Observe the shutdown process and logs.
- Terminate the application using
-
Verify Shutdown Behavior:
- Notice that the application does not terminate cleanly.
- Check for lingering background threads and error logs indicating issues with shutting down the Pub/Sub subscriber.
Code Example
Below is a minimalistic example that reproduces the issue. This setup initializes a FastAPI application with a Pub/Sub subscriber and attempts to shut it down gracefully.
# main.py
from fastapi import FastAPI
import uvicorn
from contextlib import asynccontextmanager
import asyncio
from google.cloud import pubsub_v1
import concurrent.futures
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class PubSubSubscriber:
def __init__(self, project_id: str, subscription_name: str):
self.project_id = project_id
self.subscription_name = subscription_name
self.subscriber = pubsub_v1.SubscriberClient()
self.streaming_pull_future = None
def _get_subscription_path(self) -> str:
return self.subscriber.subscription_path(self.project_id, self.subscription_name)
async def _callback(self, message):
"""Asynchronous callback for processing messages."""
try:
data = message.data.decode('utf-8')
logger.info(f"Received message: {data}")
message.ack()
except Exception as e:
logger.error(f"Error processing message: {e}")
message.nack()
async def start(self):
"""Starts the Pub/Sub subscriber."""
subscription_path = self._get_subscription_path()
self.streaming_pull_future = self.subscriber.subscribe(
subscription_path, callback=self._callback_wrapper
)
logger.info("Pub/Sub subscriber started.")
await self._monitor_streaming_future()
async def _callback_wrapper(self, message):
"""Wrapper to ensure the callback is asynchronous."""
await self._callback(message)
async def _monitor_streaming_future(self):
"""Monitors the streaming pull for errors."""
loop = asyncio.get_running_loop()
try:
await loop.run_in_executor(None, self.streaming_pull_future.result)
except Exception as e:
logger.error(f"Streaming pull future threw an exception: {e}")
self.stop()
def stop(self):
"""Stops the Pub/Sub subscriber."""
logger.info("Stopping Pub/Sub subscriber...")
if self.streaming_pull_future:
self.streaming_pull_future.cancel()
try:
self.streaming_pull_future.result(timeout=5.0)
except concurrent.futures.TimeoutError:
logger.warning("Timed out waiting for streaming pull future to terminate.")
except Exception as e:
logger.debug(f"Ignoring exception after cancel: {e}")
self.subscriber.close()
logger.info("Pub/Sub subscriber stopped.")
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Defines startup and shutdown events for the FastAPI application."""
project_id = "your-google-project-id"
subscription_name = "your-subscription-name"
# Startup logic
subscriber = PubSubSubscriber(project_id, subscription_name)
app.state.pubsub_task = asyncio.create_task(subscriber.start())
app.state.pubsub_instance = subscriber
logger.info("Application startup complete.")
yield # Application is running
# Shutdown logic
logger.info("Shutting down Pub/Sub subscriber.")
pubsub_instance = getattr(app.state, "pubsub_instance", None)
pubsub_task = getattr(app.state, "pubsub_task", None)
if pubsub_instance:
try:
pubsub_instance.stop()
except Exception as e:
logger.error(f"Error while stopping Pub/Sub subscriber: {e}")
if pubsub_task:
pubsub_task.cancel()
try:
await pubsub_task
except asyncio.CancelledError:
pass
logger.info("Pub/Sub subscriber shutdown complete.")
app = FastAPI(title="MailFlowAI", lifespan=lifespan)
@app.get("/")
async def read_root():
return {"Hello": "World"}
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000)
Stack Trace
Upon initiating shutdown (e.g., pressing Ctrl+C
), the following logs and errors are produced:
^CINFO: Shutting down
INFO: Waiting for application shutdown.
INFO:main:Shutting down Pub/Sub subscriber.
INFO:main:Stopping Pub/Sub subscriber...
WARNING:google.api_core.bidi:Background thread did not exit.
ERROR:root:Exception in callback <bound method ResumableBidiRpc._on_call_done of <google.api_core.bidi.ResumableBidiRpc object at 0x105a5af50>>: ValueError('Cannot invoke RPC: Channel closed!')
INFO:main:Pub/Sub subscriber stopped.
INFO:main:Pub/Sub subscriber shutdown complete.
INFO: Application shutdown complete.
INFO: Finished server process [7040]
- Additional Context:
- The issue appears to be linked to the
google-cloud-pubsub
library's handling of shutdown sequences, where the background gRPC threads do not terminate as expected, leading toValueError
exceptions and lingering threads that prevent the application from exiting gracefully.
- The issue appears to be linked to the
Thank you for your time and assistance!
Notes for Maintainer
- Replace Placeholder Values:
- E.g. "your-google-project-id"
and
"your-subscription-name"` with your actual Google Cloud project ID and Pub/Sub subscription name in the code example.
- E.g. "your-google-project-id"
Thanks!