Skip to content

chore: update tests and README to use lambda over getconn func #430

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 44 additions & 82 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,10 @@ import sqlalchemy
# initialize Connector object
connector = Connector()

# function to return the database connection
def getconn():
conn = connector.connect(
# initialize SQLAlchemy connection pool with Connector
pool = sqlalchemy.create_engine(
"postgresql+pg8000://",
creator=lambda: connector.connect(
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
"pg8000",
user="my-user",
Expand All @@ -145,13 +146,7 @@ def getconn():
# NOTE: this assumes private IP by default.
# Add the following keyword arg to use public IP:
# ip_type="PUBLIC"
)
return conn

# create connection pool
pool = sqlalchemy.create_engine(
"postgresql+pg8000://",
creator=getconn,
),
)
```

Expand Down Expand Up @@ -196,30 +191,19 @@ Connector as a context manager:
from google.cloud.alloydb.connector import Connector
import sqlalchemy

# helper function to return SQLAlchemy connection pool
def init_connection_pool(connector: Connector) -> sqlalchemy.engine.Engine:
# function used to generate database connection
def getconn():
conn = connector.connect(
# initialize Connector as context manager
with Connector() as connector:
# initialize SQLAlchemy connection pool with Connector
pool = sqlalchemy.create_engine(
"postgresql+pg8000://",
creator=lambda: connector.connect(
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
"pg8000",
user="my-user",
password="my-password",
db="my-db-name"
)
return conn

# create connection pool
pool = sqlalchemy.create_engine(
"postgresql+pg8000://",
creator=getconn,
),
)
return pool

# initialize Connector as context manager
with Connector() as connector:
# initialize connection pool
pool = init_connection_pool(connector)
# insert statement
insert_stmt = sqlalchemy.text(
"INSERT INTO my_table (id, title) VALUES (:id, :title)",
Expand Down Expand Up @@ -258,30 +242,26 @@ import asyncpg
from google.cloud.alloydb.connector import AsyncConnector

async def main():
# initialize Connector object for connections to AlloyDB
# initialize AsyncConnector object for connections to AlloyDB
connector = AsyncConnector()

# creation function to generate asyncpg connections as the 'connect' arg
async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection:
return await connector.connect(
# initialize asyncpg connection pool with AsyncConnector
pool = await asyncpg.create_pool(
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
connect=lambda instance_connection_name, **kwargs: connector.connect(
instance_connection_name,
"asyncpg",
user="my-user",
password="my-password",
db="my-db",
)

# initialize connection pool
pool = await asyncpg.create_pool(
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
connect=getconn,
),
)

# acquire connection and query AlloyDB database
async with pool.acquire() as conn:
res = await conn.fetch("SELECT NOW()")

# close Connector
# close AsyncConnector
await connector.close()
```

Expand All @@ -296,9 +276,11 @@ from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from google.cloud.alloydb.connector import AsyncConnector

async def init_connection_pool(connector: AsyncConnector) -> AsyncEngine:
# creation function to generate asyncpg connections as 'async_creator' arg
async def getconn() -> asyncpg.Connection:
conn: asyncpg.Connection = await connector.connect(
# The AlloyDB Python Connector can be used along with SQLAlchemy using the
# 'async_creator' argument to 'create_async_engine'
pool = create_async_engine(
"postgresql+asyncpg://",
async_creator=lambda: connector.connect(
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
"asyncpg",
user="my-user",
Expand All @@ -308,14 +290,7 @@ async def init_connection_pool(connector: AsyncConnector) -> AsyncEngine:
# Add the following keyword arg to use public IP:
# ip_type="PUBLIC"
# ... additional database driver args
)
return conn

# The AlloyDB Python Connector can be used along with SQLAlchemy using the
# 'async_creator' argument to 'create_async_engine'
pool = create_async_engine(
"postgresql+asyncpg://",
async_creator=getconn,
),
)
return pool

Expand Down Expand Up @@ -357,20 +332,16 @@ async def main():
# initialize AsyncConnector object for connections to AlloyDB
async with AsyncConnector() as connector:

# creation function to generate asyncpg connections as the 'connect' arg
async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection:
return await connector.connect(
# create connection pool
pool = await asyncpg.create_pool(
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
connect=lambda instance_connection_name, **kwargs: connector.connect(
instance_connection_name,
"asyncpg",
user="my-user",
password="my-password",
db="my-db",
)

# create connection pool
pool = await asyncpg.create_pool(
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
connect=getconn,
),
)

# acquire connection and query AlloyDB database
Expand All @@ -390,23 +361,18 @@ from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from google.cloud.alloydb.connector import AsyncConnector

async def init_connection_pool(connector: AsyncConnector) -> AsyncEngine:
# creation function to generate asyncpg connections as 'async_creator' arg
async def getconn() -> asyncpg.Connection:
conn: asyncpg.Connection = await connector.connect(
# The AlloyDB Python Connector can be used along with SQLAlchemy using the
# 'async_creator' argument to 'create_async_engine'
pool = create_async_engine(
"postgresql+asyncpg://",
async_creator=lambda: connector.connect(
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
"asyncpg",
user="my-user",
password="my-password",
db="my-db-name"
# ... additional database driver args
)
return conn

# The AlloyDB Python Connector can be used along with SQLAlchemy using the
# 'async_creator' argument to 'create_async_engine'
pool = create_async_engine(
"postgresql+asyncpg://",
async_creator=getconn,
),
)
return pool

Expand Down Expand Up @@ -494,21 +460,17 @@ import sqlalchemy
# initialize Connector object
connector = Connector()

# function to return the database connection
def getconn():
return connector.connect(
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
"pg8000",
user="my-user",
password="my-password",
db="my-db-name",
ip_type="PUBLIC", # use public IP
)

# create connection pool
pool = sqlalchemy.create_engine(
"postgresql+pg8000://",
creator=getconn,
creator=lambda: connector.connect(
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
"pg8000",
user="my-user",
password="my-password",
db="my-db-name",
ip_type="PUBLIC", # use public IP
),
)

# use connection pool...
Expand Down
30 changes: 11 additions & 19 deletions tests/system/test_asyncpg_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.

import os
from typing import Any

# [START alloydb_sqlalchemy_connect_async_connector]
import asyncpg
Expand Down Expand Up @@ -66,20 +65,16 @@ async def create_sqlalchemy_engine(
"""
connector = AsyncConnector(refresh_strategy=refresh_strategy)

async def getconn() -> asyncpg.Connection:
conn: asyncpg.Connection = await connector.connect(
# create SQLAlchemy connection pool
engine = sqlalchemy.ext.asyncio.create_async_engine(
"postgresql+asyncpg://",
async_creator=lambda: connector.connect(
inst_uri,
"asyncpg",
user=user,
password=password,
db=db,
)
return conn

# create SQLAlchemy connection pool
engine = sqlalchemy.ext.asyncio.create_async_engine(
"postgresql+asyncpg://",
async_creator=getconn,
),
execution_options={"isolation_level": "AUTOCOMMIT"},
)
return engine, connector
Expand Down Expand Up @@ -129,20 +124,17 @@ async def create_asyncpg_pool(
"""
connector = AsyncConnector(refresh_strategy=refresh_strategy)

async def getconn(
instance_connection_name: str, **kwargs: Any
) -> asyncpg.Connection:
conn: asyncpg.Connection = await connector.connect(
# create native asyncpg pool (requires asyncpg version >=0.30.0)
pool = await asyncpg.create_pool(
instance_connection_name,
connect=lambda instance_connection_name, **kwargs: connector.connect(
instance_connection_name,
"asyncpg",
user=user,
password=password,
db=db,
)
return conn

# create native asyncpg pool (requires asyncpg version >=0.30.0)
pool = await asyncpg.create_pool(instance_connection_name, connect=getconn)
),
)
return pool, connector


Expand Down
15 changes: 5 additions & 10 deletions tests/system/test_asyncpg_iam_authn.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import os

# [START alloydb_sqlalchemy_connect_async_connector_iam_authn]
import asyncpg
import sqlalchemy
import sqlalchemy.ext.asyncio

Expand Down Expand Up @@ -61,20 +60,16 @@ async def create_sqlalchemy_engine(
"""
connector = AsyncConnector(refresh_strategy=refresh_strategy)

async def getconn() -> asyncpg.Connection:
conn: asyncpg.Connection = await connector.connect(
# create async SQLAlchemy connection pool
engine = sqlalchemy.ext.asyncio.create_async_engine(
"postgresql+asyncpg://",
async_creator=lambda: connector.connect(
inst_uri,
"asyncpg",
user=user,
db=db,
enable_iam_auth=True,
)
return conn

# create async SQLAlchemy connection pool
engine = sqlalchemy.ext.asyncio.create_async_engine(
"postgresql+asyncpg://",
async_creator=getconn,
),
execution_options={"isolation_level": "AUTOCOMMIT"},
)
return engine, connector
Expand Down
15 changes: 5 additions & 10 deletions tests/system/test_asyncpg_psc.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import os

import asyncpg
import pytest
import sqlalchemy
import sqlalchemy.ext.asyncio
Expand Down Expand Up @@ -60,21 +59,17 @@ async def create_sqlalchemy_engine(
"""
connector = AsyncConnector()

async def getconn() -> asyncpg.Connection:
conn: asyncpg.Connection = await connector.connect(
# create SQLAlchemy connection pool
engine = sqlalchemy.ext.asyncio.create_async_engine(
"postgresql+asyncpg://",
async_creator=lambda: connector.connect(
inst_uri,
"asyncpg",
user=user,
password=password,
db=db,
ip_type="PSC",
)
return conn

# create SQLAlchemy connection pool
engine = sqlalchemy.ext.asyncio.create_async_engine(
"postgresql+asyncpg://",
async_creator=getconn,
),
execution_options={"isolation_level": "AUTOCOMMIT"},
)
return engine, connector
Expand Down
15 changes: 5 additions & 10 deletions tests/system/test_asyncpg_public_ip.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import os

# [START alloydb_sqlalchemy_connect_async_connector_public_ip]
import asyncpg
import pytest
import sqlalchemy
import sqlalchemy.ext.asyncio
Expand Down Expand Up @@ -61,21 +60,17 @@ async def create_sqlalchemy_engine(
"""
connector = AsyncConnector()

async def getconn() -> asyncpg.Connection:
conn: asyncpg.Connection = await connector.connect(
# create SQLAlchemy connection pool
engine = sqlalchemy.ext.asyncio.create_async_engine(
"postgresql+asyncpg://",
async_creator=lambda: connector.connect(
inst_uri,
"asyncpg",
user=user,
password=password,
db=db,
ip_type="PUBLIC",
)
return conn

# create SQLAlchemy connection pool
engine = sqlalchemy.ext.asyncio.create_async_engine(
"postgresql+asyncpg://",
async_creator=getconn,
),
execution_options={"isolation_level": "AUTOCOMMIT"},
)
return engine, connector
Expand Down
Loading
Loading