Skip to content

chore(cloudrun): refactor to sample 'cloudrun_service_to_service_receive' and its test #13292

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions run/service-auth/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from http import HTTPStatus
import os

from flask import Flask, request
Expand All @@ -25,9 +26,15 @@
def main() -> str:
"""Example route for receiving authorized requests."""
try:
return receive_request_and_parse_auth_header(request)
response = receive_request_and_parse_auth_header(request)

status = HTTPStatus.UNAUTHORIZED
if "Hello" in response:
status = HTTPStatus.OK

return response, status
except Exception as e:
return f"Error verifying ID token: {e}"
return f"Error verifying ID token: {e}", HTTPStatus.UNAUTHORIZED


if __name__ == "__main__":
Expand Down
27 changes: 27 additions & 0 deletions run/service-auth/data/privatekey.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEA4ej0p7bQ7L/r4rVGUz9RN4VQWoej1Bg1mYWIDYslvKrk1gpj
7wZgkdmM7oVK2OfgrSj/FCTkInKPqaCR0gD7K80q+mLBrN3PUkDrJQZpvRZIff3/
xmVU1WeruQLFJjnFb2dqu0s/FY/2kWiJtBCakXvXEOb7zfbINuayL+MSsCGSdVYs
SliS5qQpgyDap+8b5fpXZVJkq92hrcNtbkg7hCYUJczt8n9hcCTJCfUpApvaFQ18
pe+zpyl4+WzkP66I28hniMQyUlA1hBiskT7qiouq0m8IOodhv2fagSZKjOTTU2xk
SBc//fy3ZpsL7WqgsZS7Q+0VRK8gKfqkxg5OYQIDAQABAoIBAQDGGHzQxGKX+ANk
nQi53v/c6632dJKYXVJC+PDAz4+bzU800Y+n/bOYsWf/kCp94XcG4Lgsdd0Gx+Zq
HD9CI1IcqqBRR2AFscsmmX6YzPLTuEKBGMW8twaYy3utlFxElMwoUEsrSWRcCA1y
nHSDzTt871c7nxCXHxuZ6Nm/XCL7Bg8uidRTSC1sQrQyKgTPhtQdYrPQ4WZ1A4J9
IisyDYmZodSNZe5P+LTJ6M1SCgH8KH9ZGIxv3diMwzNNpk3kxJc9yCnja4mjiGE2
YCNusSycU5IhZwVeCTlhQGcNeV/skfg64xkiJE34c2y2ttFbdwBTPixStGaF09nU
Z422D40BAoGBAPvVyRRsC3BF+qZdaSMFwI1yiXY7vQw5+JZh01tD28NuYdRFzjcJ
vzT2n8LFpj5ZfZFvSMLMVEFVMgQvWnN0O6xdXvGov6qlRUSGaH9u+TCPNnIldjMP
B8+xTwFMqI7uQr54wBB+Poq7dVRP+0oHb0NYAwUBXoEuvYo3c/nDoRcZAoGBAOWl
aLHjMv4CJbArzT8sPfic/8waSiLV9Ixs3Re5YREUTtnLq7LoymqB57UXJB3BNz/2
eCueuW71avlWlRtE/wXASj5jx6y5mIrlV4nZbVuyYff0QlcG+fgb6pcJQuO9DxMI
aqFGrWP3zye+LK87a6iR76dS9vRU+bHZpSVvGMKJAoGAFGt3TIKeQtJJyqeUWNSk
klORNdcOMymYMIlqG+JatXQD1rR6ThgqOt8sgRyJqFCVT++YFMOAqXOBBLnaObZZ
CFbh1fJ66BlSjoXff0W+SuOx5HuJJAa5+WtFHrPajwxeuRcNa8jwxUsB7n41wADu
UqWWSRedVBg4Ijbw3nWwYDECgYB0pLew4z4bVuvdt+HgnJA9n0EuYowVdadpTEJg
soBjNHV4msLzdNqbjrAqgz6M/n8Ztg8D2PNHMNDNJPVHjJwcR7duSTA6w2p/4k28
bvvk/45Ta3XmzlxZcZSOct3O31Cw0i2XDVc018IY5be8qendDYM08icNo7vQYkRH
504kQQKBgQDjx60zpz8ozvm1XAj0wVhi7GwXe+5lTxiLi9Fxq721WDxPMiHDW2XL
YXfFVy/9/GIMvEiGYdmarK1NW+VhWl1DC5xhDg0kvMfxplt4tynoq1uTsQTY31Mx
BeF5CT/JuNYk3bEBF0H/Q3VGO1/ggVS+YezdFbLWIRoMnLj6XCFEGg==
-----END RSA PRIVATE KEY-----
195 changes: 100 additions & 95 deletions run/service-auth/receive_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,53 +15,79 @@
# This test deploys a secure application running on Cloud Run
# to test that the authentication sample works properly.

from http import HTTPStatus
import os
import subprocess
import time
from urllib import error, request
import uuid

from google.auth import crypt
from google.auth import jwt

import pytest

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from requests.sessions import Session

PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]

HTTP_STATUS_OK = 200
HTTP_STATUS_BAD_REQUEST = 400
HTTP_STATUS_UNAUTHORIZED = 401
HTTP_STATUS_FORBIDDEN = 403
HTTP_STATUS_NOT_FOUND = 404
HTTP_STATUS_INTERNAL_SERVER_ERROR = 500
HTTP_STATUS_BAD_GATEWAY = 502
HTTP_STATUS_SERVICE_UNAVAILABLE = 503
HTTP_STATUS_GATEWAY_TIMEOUT = 504

STATUS_FORCELIST = [
HTTP_STATUS_BAD_REQUEST,
HTTP_STATUS_UNAUTHORIZED,
HTTP_STATUS_FORBIDDEN,
HTTP_STATUS_NOT_FOUND,
HTTP_STATUS_INTERNAL_SERVER_ERROR,
HTTP_STATUS_BAD_GATEWAY,
HTTP_STATUS_SERVICE_UNAVAILABLE,
HTTP_STATUS_GATEWAY_TIMEOUT,
HTTPStatus.BAD_REQUEST,
HTTPStatus.UNAUTHORIZED,
HTTPStatus.FORBIDDEN,
HTTPStatus.NOT_FOUND,
HTTPStatus.INTERNAL_SERVER_ERROR,
HTTPStatus.BAD_GATEWAY,
HTTPStatus.SERVICE_UNAVAILABLE,
HTTPStatus.GATEWAY_TIMEOUT,
],


DATA_DIR = os.path.join(os.path.dirname(__file__), "data")

with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
PRIVATE_KEY_BYTES = fh.read()


@pytest.fixture(scope="module")
def signer() -> crypt.RSASigner:
return crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")


@pytest.fixture
def fake_token(signer: crypt.RSASigner) -> str:
now = int(time.time())
payload = {
"aud": "example.com",
"azp": "1234567890",
"email": "[email protected]",
"email_verified": True,
"iat": now,
"exp": now + 3600,
"iss": "https://accounts.google.com",
"sub": "1234567890",
}
header = {"alg": "RS256", "kid": signer.key_id, "typ": "JWT"}
token_str = jwt.encode(signer, payload, header=header).decode("utf-8")

return token_str


@pytest.fixture(scope="module")
def service() -> tuple[str, str]:
"""Deploys a Cloud Run service and returns its URL and a valid token."""
def service_name() -> str:
# Add a unique suffix to create distinct service names.
service_name = f"receive-{uuid.uuid4()}"
service_name_str = f"receive-{uuid.uuid4().hex}"

# Deploy the Cloud Run Service.
subprocess.run(
[
"gcloud",
"run",
"deploy",
service_name,
service_name_str,
"--project",
PROJECT_ID,
"--source",
Expand All @@ -74,7 +100,29 @@ def service() -> tuple[str, str]:
check=True,
)

endpoint_url = (
yield service_name_str

# Clean-up after running the test.
subprocess.run(
[
"gcloud",
"run",
"services",
"delete",
service_name_str,
"--project",
PROJECT_ID,
"--async",
"--region=us-central1",
"--quiet",
],
check=True,
)


@pytest.fixture(scope="module")
def endpoint_url(service_name: str) -> str:
endpoint_url_str = (
subprocess.run(
[
"gcloud",
Expand All @@ -94,7 +142,12 @@ def service() -> tuple[str, str]:
.decode()
)

token = (
return endpoint_url_str


@pytest.fixture(scope="module")
def token() -> str:
token_str = (
subprocess.run(
["gcloud", "auth", "print-identity-token"],
stdout=subprocess.PIPE,
Expand All @@ -104,35 +157,16 @@ def service() -> tuple[str, str]:
.decode()
)

yield endpoint_url, token

# Clean-up after running the test.
subprocess.run(
[
"gcloud",
"run",
"services",
"delete",
service_name,
"--project",
PROJECT_ID,
"--async",
"--region=us-central1",
"--quiet",
],
check=True,
)

return token_str

def test_authentication_on_cloud_run(service: tuple[str, str]) -> None:
endpoint_url = service[0]
token = service[1]

@pytest.fixture(scope="module")
def client(endpoint_url: str) -> Session:
req = request.Request(endpoint_url)
try:
_ = request.urlopen(req)
except error.HTTPError as e:
assert e.code == HTTP_STATUS_FORBIDDEN
assert e.code == HTTPStatus.FORBIDDEN

retry_strategy = Retry(
total=3,
Expand All @@ -145,65 +179,36 @@ def test_authentication_on_cloud_run(service: tuple[str, str]) -> None:
client = requests.session()
client.mount("https://", adapter)

response = client.get(endpoint_url, headers={"Authorization": f"Bearer {token}"})
response_content = response.content.decode("UTF-8")

assert response.status_code == HTTP_STATUS_OK
assert "Hello" in response_content
assert "anonymous" not in response_content


def test_anonymous_request_on_cloud_run(service: tuple[str, str]) -> None:
endpoint_url = service[0]
return client

req = request.Request(endpoint_url)
try:
_ = request.urlopen(req)
except error.HTTPError as e:
assert e.code == HTTP_STATUS_FORBIDDEN

retry_strategy = Retry(
total=3,
status_forcelist=STATUS_FORCELIST,
allowed_methods=["GET", "POST"],
backoff_factor=3,
def test_authentication_on_cloud_run(
client: Session, endpoint_url: str, token: str
) -> None:
response = client.get(
endpoint_url, headers={"Authorization": f"Bearer {token}"}
)
adapter = HTTPAdapter(max_retries=retry_strategy)
response_content = response.content.decode("utf-8")

assert response.status_code == HTTPStatus.OK
assert "Hello" in response_content
assert "anonymous" not in response_content

client = requests.session()
client.mount("https://", adapter)

def test_anonymous_request_on_cloud_run(client: Session, endpoint_url: str) -> None:
response = client.get(endpoint_url)
response_content = response.content.decode("UTF-8")
response_content = response.content.decode("utf-8")

assert response.status_code == HTTP_STATUS_OK
assert response.status_code == HTTPStatus.OK
assert "Hello" in response_content
assert "anonymous" in response_content


def test_invalid_token(service: tuple[str, str]) -> None:
endpoint_url = service[0]

req = request.Request(endpoint_url)
try:
_ = request.urlopen(req)
except error.HTTPError as e:
assert e.code == HTTP_STATUS_FORBIDDEN

retry_strategy = Retry(
total=3,
status_forcelist=STATUS_FORCELIST,
allowed_methods=["GET", "POST"],
backoff_factor=3,
def test_invalid_token(client: Session, endpoint_url: str, fake_token: str) -> None:
response = client.get(
endpoint_url, headers={"Authorization": f"Bearer {fake_token}"}
)
adapter = HTTPAdapter(max_retries=retry_strategy)

client = requests.session()
client.mount("https://", adapter)

# Sample token from https://jwt.io for John Doe.
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
response = client.get(endpoint_url, headers={"Authorization": f"Bearer {token}"})
response_content = response.content.decode("UTF-8")
response_content = response.content.decode("utf-8")

assert response.status_code == HTTPStatus.UNAUTHORIZED
assert "Invalid token" in response_content