From 923d44b48bd75cc5c43f299b765291dd53eb46db Mon Sep 17 00:00:00 2001 From: JJ Geewax Date: Mon, 11 May 2015 10:38:31 -0400 Subject: [PATCH] Initial commit of Cloud Search client. --- docs/_static/js/main.js | 2 +- docs/index.rst | 2 + docs/search-api.rst | 6 + docs/search-usage.rst | 314 ++++++++++++++++ gcloud/search/__init__.py | 27 ++ gcloud/search/client.py | 225 ++++++++++++ gcloud/search/connection.py | 39 ++ gcloud/search/index.py | 323 +++++++++++++++++ gcloud/search/iterator.py | 156 ++++++++ gcloud/search/message.py | 79 +++++ gcloud/search/subscription.py | 263 ++++++++++++++ gcloud/search/test_client.py | 342 ++++++++++++++++++ gcloud/search/test_connection.py | 46 +++ gcloud/search/test_message.py | 104 ++++++ gcloud/search/test_subscription.py | 520 +++++++++++++++++++++++++++ gcloud/search/test_topic.py | 550 +++++++++++++++++++++++++++++ gcloud/search/topic.py | 255 +++++++++++++ 17 files changed, 3252 insertions(+), 1 deletion(-) create mode 100644 docs/search-api.rst create mode 100644 docs/search-usage.rst create mode 100644 gcloud/search/__init__.py create mode 100644 gcloud/search/client.py create mode 100644 gcloud/search/connection.py create mode 100644 gcloud/search/index.py create mode 100644 gcloud/search/iterator.py create mode 100644 gcloud/search/message.py create mode 100644 gcloud/search/subscription.py create mode 100644 gcloud/search/test_client.py create mode 100644 gcloud/search/test_connection.py create mode 100644 gcloud/search/test_message.py create mode 100644 gcloud/search/test_subscription.py create mode 100644 gcloud/search/test_topic.py create mode 100644 gcloud/search/topic.py diff --git a/docs/_static/js/main.js b/docs/_static/js/main.js index 11b8202481a0..8f95b772f38a 100755 --- a/docs/_static/js/main.js +++ b/docs/_static/js/main.js @@ -16,7 +16,7 @@ $('.headerlink').parent().each(function() { $('.side-nav').children('ul:nth-child(2)').children().each(function() { var itemName = $(this).text(); if (itemName !== 'Datastore' && itemName !== 'Storage' && - itemName !== 'Pub/Sub') { + itemName !== 'Pub/Sub' && itemName !== 'Search') { $(this).css('padding-left','2em'); } }); diff --git a/docs/index.rst b/docs/index.rst index 0480c536f339..87d180d06624 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -10,6 +10,8 @@ datastore-transactions datastore-batches datastore-dataset + search-api + search-usage storage-api storage-blobs storage-buckets diff --git a/docs/search-api.rst b/docs/search-api.rst new file mode 100644 index 000000000000..7459630a4cec --- /dev/null +++ b/docs/search-api.rst @@ -0,0 +1,6 @@ +.. toctree:: + :maxdepth: 1 + :hidden: + +Search +------ diff --git a/docs/search-usage.rst b/docs/search-usage.rst new file mode 100644 index 000000000000..e87025e94636 --- /dev/null +++ b/docs/search-usage.rst @@ -0,0 +1,314 @@ +Using the API +============= + +Connection / Authorization +-------------------------- + +Implicitly use the default client: + +.. doctest:: + + >>> from gcloud import search + >>> # The search module has the same methods as a client, using the default. + >>> search.list_indexes() # API request + [] + +Configure the default client: + +.. doctest:: + + >>> from gcloud import search + >>> search.set_project_id('project-id') + >>> search.set_credentials(credentials) + >>> search.list_indexes() # API request + [] + +Explicitly use the default client: + +.. doctest:: + + >>> from gcloud.search import default_client as client + >>> # The default_client is equivalent to search.Client() + >>> client.list_indexes() # API request + [] + +Explicitly configure a client: + +.. doctest:: + + >>> from gcloud import search + >>> client = search.Client(project_id='project-id', credentials=credentials) + >>> client.list_indexes() # API request + [] + +Manage indexes for a project +---------------------------- + +Create a new index: + +.. doctest:: + + >>> from gcloud import search + >>> client = search.Client() + >>> index = client.create_index('index_id') # API request + >>> index.id + 'index_id' + +Create a new index with a name: + +.. doctest:: + + >>> from gcloud import search + >>> client = search.Client() + >>> index = client.create_index('index_id', name='Name') # API request + >>> index.name + 'Name' + +Get or create an index: + +.. doctest:: + + >>> from gcloud import search + >>> client = search.Client() + >>> index = client.get_or_create_index('index_id') # API request + >>> index.id + 'index_id' + +List the indexes: + +.. doctest:: + + >>> from gcloud import search + >>> client = search.Client() + >>> [index.id for index in client.list_indexes()] # API request + ['index_id'] + +Retrieve an index: + +.. doctest:: + + >>> from gcloud import search + >>> client = search.Client() + >>> index = client.get_index('missing_index_id') # API request + >>> index is None + True + >>> index = client.get_index('index_id') # API request + >>> index.id + 'index_id' + +Get an index without making an API request + +.. doctest:: + + >>> from gcloud import search + >>> client = search.Client() + >>> index = client.get_index('index_id', check=False) + >>> index.id + 'index_id' + +Update an index: + +.. doctest:: + + >>> from gcloud import search + >>> client = search.Client() + >>> index = client.get_index('index_id') # API request + >>> index.name = 'Name' + >>> client.update_index(index) + +Delete an index by ID: + +.. doctest:: + + >>> from gcloud import search + >>> client = search.Client() + >>> client.delete_index('index_id') # API request + +Delete an index: + +.. doctest:: + + >>> from gcloud import search + >>> client = search.Client() + >>> index = client.get_index('index_id') # API request + >>> index.id + 'index_id' + >>> client.delete_index(index) # API request + +Manage documents and fields +--------------------------- + +Create a document + +.. doctest:: + + >>> from gcloud import search + >>> document = search.Document('document_id', rank=0) + >>> document.id + 'document_id' + +Add a field to a document + +.. doctest:: + + >>> from gcloud import search + >>> document = search.Document('document_id') + >>> document.add_field(search.Field('fieldname')) + +Add values to a field + +.. doctest:: + + >>> from datetime import datetime + >>> from gcloud import search + >>> field = search.Field('fieldname') + >>> field.add_value('string') + >>> # Tokenization field ignored for non-string values. + >>> field.add_value('

string

', tokenization='html') + >>> field.add_value('string', tokenization='atom') + >>> field.add_value('string', tokenization='text') + >>> field.add_value(1234) + >>> field.add_value(datetime.now()) + >>> len(field.values) + 9 + +Add values to a field at initialization time + +.. doctest:: + + >>> from gcloud import search + >>> field = search.Field('fieldname', values=[ + 'string', + search.Value('

string2

', tokenization='html') + search.Value('string', tokenization='atom')]) + +Add a single document to an index: + +.. doctest:: + + >>> from gcloud import search + >>> client = search.Client() + >>> index = client.get_index('index_id') # API request + >>> document = search.Document('document_id', rank=0) + >>> index.add_document(document) # API request + +Add multiple documents to an index: + +.. doctest:: + + >>> from gcloud import search + >>> client = search.Client() + >>> index = client.get_index('index_id') # API request + >>> documents = [search.Document('document_id')] + >>> index.add_documents(documents) # API request + +Get a single document by ID: + +.. doctest:: + + >>> from gcloud import search + >>> client = search.Client() + >>> index = client.get_index('index_id') # API request + >>> document = index.get_document('missing_document_id') # API request + >>> document is None + True + >>> document = index.get_document('document_id') # API request + >>> document.fields + [] + +Delete a document by ID: + +.. doctest:: + + >>> from gcloud import search + >>> client = search.Client() + >>> index = client.get_index('index_id') # API request + >>> index.delete_document('document_id') # API request + >>> index.delete_document('missing_document_id') # API request + +Searching +--------- + +Create a query + +.. doctest:: + + >>> from gcloud import search + >>> query = search.Query('query text here') + >>> query.query + 'query text here' + +Specify the fields to return in a query + +.. doctest:: + + >>> from gcloud import search + >>> query = search.Query('query text here', fields=['field1', 'field2']) + >>> query.fields + ['field1', 'field2'] + +Set the sort order of a query + +.. doctest:: + + >>> from gcloud import search + >>> query = search.Query('query text here', order_by='field1') + >>> query.order_by + 'field1' + >>> query2 = search.Query('query text here', order_by=['field2', 'field3']) + >>> query2.order_by + ['field2', 'field3'] + >>> # Order descending by field1 and ascending by field2 + >>> query4 = search.Query('query text here', order_by=['-field1', 'field2']) + >>> query3.order_by + ['-field1', 'field2'] + +Set custom field expressions on a query + +.. doctest:: + + >>> from gcloud import search + >>> query = search.Query('query text here') + >>> query.add_field_expression('total_price', '(price + tax)') + >>> # We don't do any checks on the expression. These are checked at query time. + >>> query.add_field_expression('invalid', 'is_prime(num)') + >>> query.add_field_expression('bad_field', '(missing_field + tax)') + +Set custom field expressions at initialization time + +.. doctest:: + + >>> from gcloud import search + >>> query = search.Query('query text here', field_expressions={ + 'total_price': '(price + tax)'}) + +Search an index + +.. doctest:: + + >>> from gcloud import search + >>> client = search.Client() + >>> index = client.get_index('index_id') # API request + >>> matching = index.search(search.Query('query text here')) # API request + >>> for document in matching: + ... print document.id + +Search an index with a limit on number of results + +.. doctest:: + + >>> from gcloud import search + >>> client = search.Client() + >>> index = client.get_index('index_id') # API request + >>> matching = index.search(search.Query('query text here'), + ... limit=42) # API request + +Search an index with a custom page size (advanced) + +.. doctest:: + + >>> from gcloud import search + >>> client = search.Client() + >>> index = client.get_index('index_id') # API request + >>> matching = index.search(search.Query('query text here'), + ... page_size=20) # API request diff --git a/gcloud/search/__init__.py b/gcloud/search/__init__.py new file mode 100644 index 000000000000..4616e6075b6c --- /dev/null +++ b/gcloud/search/__init__.py @@ -0,0 +1,27 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Search API wrapper. + +The main concepts with this API are: + +- :class:`gcloud.pubsub.topic.Topic` represents an endpoint to which messages + can be published using the Cloud Storage Pubsub API. + +- :class:`gcloud.pubsub.subscription.Subscription` represents a named + subscription (either pull or push) to a topic. +""" + +from gcloud.search.client import Client +from gcloud.search.connection import SCOPE diff --git a/gcloud/search/client.py b/gcloud/search/client.py new file mode 100644 index 000000000000..a42f0cc4d322 --- /dev/null +++ b/gcloud/search/client.py @@ -0,0 +1,225 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""gcloud pubsub client for interacting with API.""" + + +from gcloud._helpers import _get_production_project +from gcloud.credentials import get_credentials +from gcloud.credentials import get_for_service_account_json +from gcloud.credentials import get_for_service_account_p12 +from gcloud.pubsub.connection import Connection +from gcloud.pubsub.subscription import Subscription +from gcloud.pubsub.topic import Topic + + +class Client(object): + """Client to bundle configuration needed for API requests. + + :type project: string + :param project: the project which the client acts on behalf of. Will be + passed when creating a topic. If not passed, + falls back to the default inferred from the environment. + + :type credentials: :class:`oauth2client.client.OAuth2Credentials` or + :class:`NoneType` + :param credentials: The OAuth2 Credentials to use for the connection + owned by this client. If not passed (and if no ``http`` + object is passed), falls back to the default inferred + from the environment. + + :type http: :class:`httplib2.Http` or class that defines ``request()``. + :param http: An optional HTTP object to make requests. If not passed, an + ``http`` object is created that is bound to the + ``credentials`` for the current object. + + :raises: :class:`ValueError` if the project is neither passed in nor + set in the environment. + """ + def __init__(self, project=None, credentials=None, http=None): + self._connection = None + self._credentials = None + self._http = None + self._project = project + + @property + def project(self): + if self._project is None: + self._project = _get_production_project() + if self._project is None: + raise ValueError('Project was not passed and could not be ' + 'determined from the environment.') + return self._project + + @property + def connection(self): + if self._connection is None: + self._connection = self._create_connection( + credentials=self._credentials, http=self._http) + return self._connection + + @staticmethod + def _create_connection(credentials=None, http=None): + return Connection(credentials=credentials or get_credentials(), + http=http) + + @classmethod + def from_service_account_json(cls, json_credentials_path, project=None): + """Factory to retrieve JSON credentials while creating client. + + :type json_credentials_path: string + :param json_credentials_path: The path to a private key file (this file + was given to you when you created the + service account). This file must contain + a JSON object with a private key and + other credentials information (downloaded + from the Google APIs console). + + :type project: string + :param project: the project which the client acts on behalf of. Will be + passed when creating a topic. If not passed, falls + back to the default inferred from the environment. + + :rtype: :class:`gcloud.pubsub.client.Client` + :returns: The client created with the retrieved JSON credentials. + """ + credentials = get_for_service_account_json(json_credentials_path) + return cls(project=project, credentials=credentials) + + @classmethod + def from_service_account_p12(cls, client_email, private_key_path, + project=None): + """Factory to retrieve P12 credentials while creating client. + + .. note:: + Unless you have an explicit reason to use a PKCS12 key for your + service account, we recommend using a JSON key. + + :type client_email: string + :param client_email: The e-mail attached to the service account. + + :type private_key_path: string + :param private_key_path: The path to a private key file (this file was + given to you when you created the service + account). This file must be in P12 format. + + :type project: string + :param project: the project which the client acts on behalf of. Will be + passed when creating a topic. If not passed, falls + back to the default inferred from the environment. + + :rtype: :class:`gcloud.pubsub.client.Client` + :returns: The client created with the retrieved P12 credentials. + """ + credentials = get_for_service_account_p12(client_email, + private_key_path) + return cls(project=project, credentials=credentials) + + def list_indexes(self, page_size=None, page_token=None): + """List topics for the project associated with this client. + + See: + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/list + + :type page_size: int + :param page_size: maximum number of topics to return, If not passed, + defaults to a value set by the API. + + :type page_token: string + :param page_token: opaque marker for the next "page" of topics. If not + passed, the API will return the first page of + topics. + + :rtype: tuple, (list, str) + :returns: list of :class:`gcloud.pubsub.topic.Topic`, plus a + "next page token" string: if not None, indicates that + more topics can be retrieved with another call (pass that + value as ``page_token``). + """ + params = {} + + if page_size is not None: + params['pageSize'] = page_size + + if page_token is not None: + params['pageToken'] = page_token + + path = '/projects/%s/topics' % (self.project,) + + return IndexIterator(client=self, extra_params=params, path=path) + + def list_subscriptions(self, page_size=None, page_token=None, + topic_name=None): + """List subscriptions for the project associated with this client. + + See: + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/list + + and (where ``topic_name`` is passed): + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/subscriptions/list + + :type page_size: int + :param page_size: maximum number of topics to return, If not passed, + defaults to a value set by the API. + + :type page_token: string + :param page_token: opaque marker for the next "page" of topics. If not + passed, the API will return the first page of + topics. + + :type topic_name: string + :param topic_name: limit results to subscriptions bound to the given + topic. + + :rtype: tuple, (list, str) + :returns: list of :class:`gcloud.pubsub.subscription.Subscription`, + plus a "next page token" string: if not None, indicates that + more topics can be retrieved with another call (pass that + value as ``page_token``). + """ + params = {} + + if page_size is not None: + params['pageSize'] = page_size + + if page_token is not None: + params['pageToken'] = page_token + + if topic_name is None: + path = '/projects/%s/subscriptions' % (self.project,) + else: + path = '/projects/%s/topics/%s/subscriptions' % (self.project, + topic_name) + + resp = self.connection.api_request(method='GET', path=path, + query_params=params) + topics = {} + subscriptions = [Subscription.from_api_repr(resource, self, + topics=topics) + for resource in resp['subscriptions']] + return subscriptions, resp.get('nextPageToken') + + def topic(self, name, timestamp_messages=False): + """Creates a topic bound to the current client. + + :type name: string + :param name: the name of the topic to be constructed. + + :type timestamp_messages: boolean + :param timestamp_messages: To be passed to ``Topic`` constructor. + + :rtype: :class:`gcloud.pubsub.topic.Topic` + :returns: Topic created with the current client. + """ + return Topic(name, client=self, timestamp_messages=timestamp_messages) diff --git a/gcloud/search/connection.py b/gcloud/search/connection.py new file mode 100644 index 000000000000..d73ed9cfeebd --- /dev/null +++ b/gcloud/search/connection.py @@ -0,0 +1,39 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Create / interact with gcloud pubsub connections.""" + +from gcloud import connection as base_connection + + +SCOPE = ('https://www.googleapis.com/auth/pubsub', + 'https://www.googleapis.com/auth/cloud-platform') +"""The scopes required for authenticating as a Cloud Pub/Sub consumer.""" + + +class Connection(base_connection.JSONConnection): + """A connection to Google Cloud Pubsub via the JSON REST API.""" + + API_BASE_URL = 'https://pubsub.googleapis.com' + """The base of the API call URL.""" + + API_VERSION = 'v1beta2' + """The version of the API, used in building the API call's URL.""" + + API_URL_TEMPLATE = '{api_base_url}/{api_version}{path}' + """A template for the URL of a particular API call.""" + + def __init__(self, credentials=None, http=None): + credentials = self._create_scoped_credentials(credentials, SCOPE) + super(Connection, self).__init__(credentials=credentials, http=http) diff --git a/gcloud/search/index.py b/gcloud/search/index.py new file mode 100644 index 000000000000..a05dc1ffa87e --- /dev/null +++ b/gcloud/search/index.py @@ -0,0 +1,323 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Define Projects.""" + +from gcloud.exceptions import MissingClientError +from gcloud.exceptions import NotFound + + +class Project(object): + """Projects are containers for your work on Google Cloud Platform. + + .. note:: + + It's unlikely that you'd need to instantiate this outside the context + of a :class:`.client.Client`, so in general, it's best to get a Project + from a Resource Manager client. + + To create a new project:: + + >>> from gcloud import resource_manager + >>> client = resource_manager.Client() + >>> project = client.project('purple-spaceship-123') + >>> project.name = 'Purple Spaceship Project!' + >>> project.create() + + To get an existing project:: + + >>> from gcloud import resource_manager + >>> client = resource_manager.Client() + >>> project = client.get_project('purple-spaceship-123') + >>> print project.name + Purple Spaceship Project! + + To manage labels:: + + >>> from gcloud import resource_manager + >>> client = resource_manager.Client() + >>> project = client.get_project('purple-spaceship-123') + >>> project.labels = {'color': 'purple'} + >>> project.labels['environment'] = 'production' + >>> project.update() + + See: + https://cloud.google.com/resource-manager/reference/rest/v1beta1/projects + + :type id: string + :param id: the globally unique id of the project + """ + def __init__(self, client, id): + self._client = client + self.id = id + self.name = None + self.number = None + self.labels = {} + self.status = None + + def __repr__(self): + if self.name: + display_name = '%s (%s)' % (self.name, self.id) + else: + display_name = self.id + return '' % display_name + + @classmethod + def from_api_repr(cls, resource, client): + """Factory: construct a project given its API representation. + + :type resource: dict + :param resource: project resource representation returned from the API + + :rtype: :class:`gcloud.resource_manager.project.Project` + """ + project = cls(id=resource['projectId'], client=client) + project._set_properties_from_api_repr(resource) + return project + + def _set_properties_from_api_repr(self, resource): + """Update specific properties from its API representation.""" + self.name = resource.get('name') + self.number = resource['projectNumber'] + self.labels = resource.get('labels', {}) + self.status = resource['lifecycleState'] + + @property + def full_name(self): + """Fully-qualified name (ie, ``'projects/purple-spaceship-123'``).""" + if not self.id: + raise ValueError('Missing project ID!') + return 'projects/%s' % (self.id) + + @property + def path(self): + """URL for the project (ie, ``'/projects/purple-spaceship-123'``).""" + return '/%s' % (self.full_name) + + @property + def client(self): + """The :class:`gcloud.resource_manager.client.Client` object.""" + return self._client + + def _require_client(self, client=None): + """Get either a client or raise an exception. + + We need to use this as the various methods could accept a client as a + parameter, which we need to evaluate. If the client provided is empty + and there is no client set as an instance variable, we'll raise a + :class:`gcloud.exceptions.MissingClientError`. + + :type client: :class:`gcloud.resource_manager.client.Client` + :param client: An optional client to test for existence. + """ + client = client or self.client + if not client: + raise MissingClientError() + return client + + def create(self, client=None): + """API call: create the project via a ``POST`` request. + + Example:: + + >>> from gcloud import resource_manager + >>> client = resource_manager.Client() + >>> project = client.project('new-spaceship-123') + >>> project.name = 'New Spaceship Project!' + >>> project.create() + + See + https://cloud.google.com/resource-manager/reference/rest/v1beta1/projects/create + + :type client: :class:`gcloud.resource_manager.client.Client` or None + :param client: the client to use. If not passed, falls back to + the ``client`` attribute. + """ + client = client or self.client + data = {'projectId': self.id, 'name': self.name, 'labels': self.labels} + resp = client.connection.api_request(method='POST', path='/projects', + data=data) + self._set_properties_from_api_repr(resource=resp) + + + def reload(self, client=None): + """API call: reload the project via a ``GET`` request. + + This method will reload the newest metadata for the project. + + .. warning:: + + This will overwrite any local changes you've made and not saved! + + Example:: + + >>> from gcloud import resource_manager + >>> client = resource_manager.Client() + >>> project = client.get_project('purple-spaceship-123') + >>> project.name = 'Locally changed name' + >>> print project + + >>> project.reload() + >>> print project + + + See + https://cloud.google.com/resource-manager/reference/rest/v1beta1/projects/get + + :type client: :class:`gcloud.resource_manager.client.Client` or None + :param client: the client to use. If not passed, falls back to + the ``client`` attribute. + """ + client = self._require_client(client=client) + + # We assume the project exists. If it doesn't it will raise a NotFound + # exception. + resp = client.connection.api_request(method='GET', path=self.path) + self._set_properties_from_api_repr(resource=resp) + + def update(self, client=None): + """API call: update the project via a ``PUT`` request. + + Example:: + + >>> from gcloud import resource_manager + >>> client = resource_manager.Client() + >>> project = client.get_project('purple-spaceship-123') + >>> project.name = 'New Purple Spaceship' + >>> project.labels['environment'] = 'prod' + >>> project.update() + + See + https://cloud.google.com/resource-manager/reference/rest/v1beta1/projects/update + + :type client: :class:`gcloud.resource_manager.client.Client` or None + :param client: the client to use. If not passed, falls back to + the ``client`` attribute. + """ + client = self._require_client(client=client) + + data = {'name': self.name, 'labels': self.labels} + resp = client.connection.api_request(method='PUT', path=self.path, + data=data) + self._set_properties_from_api_repr(resp) + + def exists(self, client=None): + """API call: test the existence of a project via a ``GET`` request. + + Example:: + + >>> from gcloud import resource_manager + >>> client = resource_manager.Client() + >>> project = client.project('purple-spaceship-456') + >>> project.exists() + False + + You can also use the + :func:`gcloud.resource_manager.client.Client.get_project` + method to check whether a project exists, as it will return ``None`` + if the project doesn't exist:: + + >>> from gcloud import resource_manager + >>> client = resource_manager.Client() + >>> print client.get_project('purple-spaceship-456') + None + + See + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/projects/get + + :type client: :class:`gcloud.resource_manager.client.Client` or None + :param client: the client to use. If not passed, falls back to + the ``client`` attribute. + """ + client = self._require_client(client=client) + + try: + client.connection.api_request(method='GET', path=self.path) + except NotFound: + return False + else: + return True + + def delete(self, client=None, reload=True): + """API call: delete the project via a ``DELETE`` request. + + See: + https://cloud.google.com/resource-manager/reference/rest/v1beta1/projects/delete + + This actually changes the status (``lifecycleState``) from ``ACTIVE`` + to ``DELETE_REQUESTED``. + Later (it's not specified when), the project will move into the + ``DELETE_IN_PROGRESS`` state, which means the deleting has actually + begun. + + Example:: + + >>> from gcloud import resource_manager + >>> client = resource_manager.Client() + >>> project = client.get_project('purple-spaceship-123') + >>> project.delete() + + :type client: :class:`gcloud.resource_manager.client.Client` or None + :param client: the client to use. If not passed, + falls back to the ``client`` attribute. + + :type reload: bool + :param reload: Whether to reload the project with the latest state. + Default: ``True``. + """ + client = self._require_client(client) + client.connection.api_request(method='DELETE', path=self.path) + + # If the reload flag is True, reload the project. + if reload: + self.reload() + + def undelete(self, client=None, reload=True): + """API call: undelete the project via a ``POST`` request. + + See + https://cloud.google.com/resource-manager/reference/rest/v1beta1/projects/undelete + + This actually changes the project status (``lifecycleState``) from + ``DELETE_REQUESTED`` to ``ACTIVE``. + If the project has already reached a status of ``DELETE_IN_PROGRESS`, + this request will fail and the project cannot be restored. + + Example:: + + >>> from gcloud import resource_manager + >>> client = resource_manager.Client() + >>> project = client.get_project('purple-spaceship-123') + >>> project.delete() + >>> print project.status + DELETE_REQUESTED + >>> project.undelete() + >>> print project.status + ACTIVE + + :type client: :class:`gcloud.resource_manager.client.Client` or None + :param client: the client to use. If not passed, + falls back to the ``client`` attribute. + + :type reload: bool + :param reload: Whether to reload the project with the latest state. + Default: ``True``. + """ + client = self._require_client(client) + client.connection.api_request(method='POST', + path=self.path + ':undelete') + + # If the reload flag is True, reload the project. + if reload: + self.reload() diff --git a/gcloud/search/iterator.py b/gcloud/search/iterator.py new file mode 100644 index 000000000000..43cff3b83422 --- /dev/null +++ b/gcloud/search/iterator.py @@ -0,0 +1,156 @@ +# Copyright 2014 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Iterators for paging through API responses. + +These iterators simplify the process of paging through API responses +where the response is a list of results with a ``nextPageToken``. + +To make an iterator work, just override the ``get_items_from_response`` +method so that given a response (containing a page of results) it parses +those results into an iterable of the actual objects you want:: + + class MyIterator(Iterator): + def get_items_from_response(self, response): + items = response.get('items', []) + for item in items: + my_item = MyItemClass(other_arg=True) + my_item._set_properties(item) + yield my_item + +You then can use this to get **all** the results from a resource:: + + >>> iterator = MyIterator(...) + >>> list(iterator) # Convert to a list (consumes all values). + +Or you can walk your way through items and call off the search early if +you find what you're looking for (resulting in possibly fewer +requests):: + + >>> for item in MyIterator(...): + >>> print item.name + >>> if not item.is_valid: + >>> break +""" + +from gcloud.resource_manager.project import Project + + +class Iterator(object): + """A generic class for iterating through Cloud list responses. + + :type client: :class:`gcloud.resource_manager.client.Client` + :param client: The client to use to make requests. + + :type path: string + :param path: The path to query for the list of items. + """ + + PAGE_TOKEN = 'pageToken' + RESERVED_PARAMS = frozenset([PAGE_TOKEN]) + + def __init__(self, client, path, extra_params=None): + self.client = client + self.path = path + self.page_number = 0 + self.next_page_token = None + self.extra_params = extra_params or {} + reserved_in_use = self.RESERVED_PARAMS.intersection( + self.extra_params) + if reserved_in_use: + raise ValueError(('Using a reserved parameter', + reserved_in_use)) + + def __iter__(self): + """Iterate through the list of items.""" + while self.has_next_page(): + response = self.get_next_page_response() + for item in self.get_items_from_response(response): + yield item + + def has_next_page(self): + """Determines whether or not this iterator has more pages. + + :rtype: boolean + :returns: Whether the iterator has more pages or not. + """ + if self.page_number == 0: + return True + + return self.next_page_token is not None + + def get_query_params(self): + """Getter for query parameters for the next request. + + :rtype: dict + :returns: A dictionary of query parameters. + """ + result = ({self.PAGE_TOKEN: self.next_page_token} + if self.next_page_token else {}) + result.update(self.extra_params) + return result + + def get_next_page_response(self): + """Requests the next page from the path provided. + + :rtype: dict + :returns: The parsed JSON response of the next page's contents. + """ + if not self.has_next_page(): + raise RuntimeError('No more pages. Try resetting the iterator.') + + response = self.client.connection.api_request( + method='GET', path=self.path, query_params=self.get_query_params()) + + self.page_number += 1 + self.next_page_token = response.get('nextPageToken') + + return response + + def reset(self): + """Resets the iterator to the beginning.""" + self.page_number = 0 + self.next_page_token = None + + def get_items_from_response(self, response): + """Factory method called while iterating. This should be overriden. + + This method should be overridden by a subclass. It should + accept the API response of a request for the next page of items, + and return a list (or other iterable) of items. + + Typically this method will construct a Bucket or a Blob from the + page of results in the response. + + :type response: dict + :param response: The response of asking for the next page of items. + + :rtype: iterable + :returns: Items that the iterator should yield. + """ + raise NotImplementedError + + +class IndexIterator(Iterator): + """An iterator over a list of Project resources.""" + + def get_items_from_response(self, response): + """Yield :class:`gcloud.search.index.Index` items from response. + + :type response: dict + :param response: The JSON API response for a page of indexes. + """ + for resource in response.get('projects', []): + item = Index.from_api_repr(resource, client=self.client) + yield item diff --git a/gcloud/search/message.py b/gcloud/search/message.py new file mode 100644 index 000000000000..b01c0e9c29b4 --- /dev/null +++ b/gcloud/search/message.py @@ -0,0 +1,79 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Define API Topics.""" + +import base64 +import datetime + +import pytz + +from gcloud._helpers import _RFC3339_MICROS + + +class Message(object): + """Messages can be published to a topic and received by subscribers. + + See: + https://cloud.google.com/pubsub/reference/rest/google/pubsub/v1beta2/PubsubMessage + + :type name: bytes + :param name: the payload of the message + + :type message_id: string + :param message_id: An ID assigned to the message by the API. + + :type attrs: dict or None + :param attrs: Extra metadata associated by the publisher with the message. + """ + def __init__(self, data, message_id, attributes=None): + self.data = data + self.message_id = message_id + self._attributes = attributes + + @property + def attributes(self): + """Lazily-constructed attribute dictionary""" + if self._attributes is None: + self._attributes = {} + return self._attributes + + @property + def timestamp(self): + """Return sortable timestamp from attributes, if passed. + + Allows sorting messages in publication order (assuming consistent + clocks across all publishers). + + :rtype: datetime + :returns: timestamp (in UTC timezone) parsed from RFC 3339 timestamp + :raises: ValueError if timestamp not in ``attributes``, or if it does + not match the RFC 3339 format. + """ + stamp = self.attributes.get('timestamp') + if stamp is None: + raise ValueError('No timestamp') + return datetime.datetime.strptime(stamp, _RFC3339_MICROS).replace( + tzinfo=pytz.UTC) + + @classmethod + def from_api_repr(cls, api_repr): + """Factory: construct message from API representation. + + :type api_repr: dict or None + :param api_repr: The API representation of the message + """ + data = base64.b64decode(api_repr['data']) + return cls(data=data, message_id=api_repr['messageId'], + attributes=api_repr.get('attributes')) diff --git a/gcloud/search/subscription.py b/gcloud/search/subscription.py new file mode 100644 index 000000000000..cb3023a286d7 --- /dev/null +++ b/gcloud/search/subscription.py @@ -0,0 +1,263 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Define API Subscriptions.""" + +from gcloud.exceptions import NotFound +from gcloud.pubsub.message import Message +from gcloud.pubsub.topic import Topic + + +class Subscription(object): + """Subscriptions receive messages published to their topics. + + See: + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions + + :type name: string + :param name: the name of the subscription + + :type topic: :class:`gcloud.pubsub.topic.Topic` + :param topic: the topic to which the subscription belongs.. + + :type ack_deadline: int + :param ack_deadline: the deadline (in seconds) by which messages pulled + from the back-end must be acknowledged. + + :type push_endpoint: string + :param push_endpoint: URL to which messages will be pushed by the back-end. + If not set, the application must pull messages. + """ + def __init__(self, name, topic, ack_deadline=None, push_endpoint=None): + self.name = name + self.topic = topic + self.ack_deadline = ack_deadline + self.push_endpoint = push_endpoint + + @classmethod + def from_api_repr(cls, resource, client, topics=None): + """Factory: construct a topic given its API representation + + :type resource: dict + :param resource: topic resource representation returned from the API + + :type client: :class:`gcloud.pubsub.client.Client` + :param client: Client which holds credentials and project + configuration for a topic. + + :type topics: dict or None + :param topics: A mapping of topic names -> topics. If not passed, + the subscription will have a newly-created topic. + + :rtype: :class:`gcloud.pubsub.subscription.Subscription` + :returns: Subscription parsed from ``resource``. + """ + if topics is None: + topics = {} + t_name = resource['topic'] + topic = topics.get(t_name) + if topic is None: + topic = topics[t_name] = Topic.from_api_repr({'name': t_name}, + client) + _, _, _, name = resource['name'].split('/') + ack_deadline = resource.get('ackDeadlineSeconds') + push_config = resource.get('pushConfig', {}) + push_endpoint = push_config.get('pushEndpoint') + return cls(name, topic, ack_deadline, push_endpoint) + + @property + def path(self): + """URL path for the subscription's APIs""" + project = self.topic.project + return '/projects/%s/subscriptions/%s' % (project, self.name) + + def _require_client(self, client): + """Check client or verify over-ride. + + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the topic of the + current subscription. + + :rtype: :class:`gcloud.pubsub.client.Client` + :returns: The client passed in or the currently bound client. + """ + if client is None: + client = self.topic._client + return client + + def create(self, client=None): + """API call: create the subscription via a PUT request + + See: + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/create + + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current subscription's topic. + """ + data = {'topic': self.topic.full_name} + + if self.ack_deadline is not None: + data['ackDeadline'] = self.ack_deadline + + if self.push_endpoint is not None: + data['pushConfig'] = {'pushEndpoint': self.push_endpoint} + + client = self._require_client(client) + client.connection.api_request(method='PUT', path=self.path, data=data) + + def exists(self, client=None): + """API call: test existence of the subscription via a GET request + + See + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/get + + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current subscription's topic. + """ + client = self._require_client(client) + try: + client.connection.api_request(method='GET', path=self.path) + except NotFound: + return False + else: + return True + + def reload(self, client=None): + """API call: sync local subscription configuration via a GET request + + See + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/get + + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current subscription's topic. + """ + client = self._require_client(client) + data = client.connection.api_request(method='GET', path=self.path) + self.ack_deadline = data.get('ackDeadline') + push_config = data.get('pushConfig', {}) + self.push_endpoint = push_config.get('pushEndpoint') + + def modify_push_configuration(self, push_endpoint, client=None): + """API call: update the push endpoint for the subscription. + + See: + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/modifyPushConfig + + :type push_endpoint: string + :param push_endpoint: URL to which messages will be pushed by the + back-end. If None, the application must pull + messages. + + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current subscription's topic. + """ + client = self._require_client(client) + data = {} + config = data['pushConfig'] = {} + if push_endpoint is not None: + config['pushEndpoint'] = push_endpoint + client.connection.api_request( + method='POST', path='%s:modifyPushConfig' % (self.path,), + data=data) + self.push_endpoint = push_endpoint + + def pull(self, return_immediately=False, max_messages=1, client=None): + """API call: retrieve messages for the subscription. + + See: + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/pull + + :type return_immediately: boolean + :param return_immediately: if True, the back-end returns even if no + messages are available; if False, the API + call blocks until one or more messages are + available. + + :type max_messages: int + :param max_messages: the maximum number of messages to return. + + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current subscription's topic. + + :rtype: list of (ack_id, message) tuples + :returns: sequence of tuples: ``ack_id`` is the ID to be used in a + subsequent call to :meth:`acknowledge`, and ``message`` + is an instance of :class:`gcloud.pubsub.message.Message`. + """ + client = self._require_client(client) + data = {'returnImmediately': return_immediately, + 'maxMessages': max_messages} + response = client.connection.api_request( + method='POST', path='%s:pull' % (self.path,), data=data) + return [(info['ackId'], Message.from_api_repr(info['message'])) + for info in response.get('receivedMessages', ())] + + def acknowledge(self, ack_ids, client=None): + """API call: acknowledge retrieved messages for the subscription. + + See: + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/acknowledge + + :type ack_ids: list of string + :param ack_ids: ack IDs of messages being acknowledged + + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current subscription's topic. + """ + client = self._require_client(client) + data = {'ackIds': ack_ids} + client.connection.api_request( + method='POST', path='%s:acknowledge' % (self.path,), data=data) + + def modify_ack_deadline(self, ack_id, ack_deadline, client=None): + """API call: update acknowledgement deadline for a retrieved message. + + See: + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/acknowledge + + :type ack_id: string + :param ack_id: ack ID of message being updated + + :type ack_deadline: int + :param ack_deadline: new deadline for the message, in seconds + + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current subscription's topic. + """ + client = self._require_client(client) + data = {'ackId': ack_id, 'ackDeadlineSeconds': ack_deadline} + client.connection.api_request( + method='POST', path='%s:modifyAckDeadline' % (self.path,), + data=data) + + def delete(self, client=None): + """API call: delete the subscription via a DELETE request. + + See: + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/delete + + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current subscription's topic. + """ + client = self._require_client(client) + client.connection.api_request(method='DELETE', path=self.path) diff --git a/gcloud/search/test_client.py b/gcloud/search/test_client.py new file mode 100644 index 000000000000..f0f1d30a81be --- /dev/null +++ b/gcloud/search/test_client.py @@ -0,0 +1,342 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest2 + + +class TestClient(unittest2.TestCase): + + def _getTargetClass(self): + from gcloud.pubsub.client import Client + return Client + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test_ctor_defaults(self): + from gcloud._testing import _Monkey + from gcloud.pubsub import SCOPE + from gcloud.pubsub import client + from gcloud.pubsub.connection import Connection + + PROJECT = 'PROJECT' + CREDS = _Credentials() + FUNC_CALLS = [] + + def mock_get_proj(): + FUNC_CALLS.append('_get_production_project') + return PROJECT + + def mock_get_credentials(): + FUNC_CALLS.append('get_credentials') + return CREDS + + with _Monkey(client, get_credentials=mock_get_credentials, + _get_production_project=mock_get_proj): + client_obj = self._makeOne() + + self.assertEqual(client_obj.project, PROJECT) + self.assertTrue(isinstance(client_obj.connection, Connection)) + self.assertTrue(client_obj.connection._credentials is CREDS) + self.assertEqual(client_obj.connection._credentials._scopes, SCOPE) + self.assertEqual(FUNC_CALLS, + ['_get_production_project', 'get_credentials']) + + def test_ctor_missing_project(self): + from gcloud._testing import _Monkey + from gcloud.pubsub import client + + FUNC_CALLS = [] + + def mock_get_proj(): + FUNC_CALLS.append('_get_production_project') + return None + + with _Monkey(client, _get_production_project=mock_get_proj): + self.assertRaises(ValueError, self._makeOne) + + self.assertEqual(FUNC_CALLS, ['_get_production_project']) + + def test_ctor_explicit(self): + from gcloud.pubsub import SCOPE + from gcloud.pubsub.connection import Connection + + PROJECT = 'PROJECT' + CREDS = _Credentials() + + client_obj = self._makeOne(project=PROJECT, credentials=CREDS) + + self.assertEqual(client_obj.project, PROJECT) + self.assertTrue(isinstance(client_obj.connection, Connection)) + self.assertTrue(client_obj.connection._credentials is CREDS) + self.assertEqual(CREDS._scopes, SCOPE) + + def test_from_service_account_json(self): + from gcloud._testing import _Monkey + from gcloud.pubsub import client + from gcloud.pubsub.connection import Connection + + PROJECT = 'PROJECT' + KLASS = self._getTargetClass() + CREDS = _Credentials() + _CALLED = [] + + def mock_creds(arg1): + _CALLED.append((arg1,)) + return CREDS + + BOGUS_ARG = object() + with _Monkey(client, get_for_service_account_json=mock_creds): + client_obj = KLASS.from_service_account_json( + BOGUS_ARG, project=PROJECT) + + self.assertEqual(client_obj.project, PROJECT) + self.assertTrue(isinstance(client_obj.connection, Connection)) + self.assertTrue(client_obj.connection._credentials is CREDS) + self.assertEqual(_CALLED, [(BOGUS_ARG,)]) + + def test_from_service_account_p12(self): + from gcloud._testing import _Monkey + from gcloud.pubsub import client + from gcloud.pubsub.connection import Connection + + PROJECT = 'PROJECT' + KLASS = self._getTargetClass() + CREDS = _Credentials() + _CALLED = [] + + def mock_creds(arg1, arg2): + _CALLED.append((arg1, arg2)) + return CREDS + + BOGUS_ARG1 = object() + BOGUS_ARG2 = object() + with _Monkey(client, get_for_service_account_p12=mock_creds): + client_obj = KLASS.from_service_account_p12( + BOGUS_ARG1, BOGUS_ARG2, project=PROJECT) + + self.assertEqual(client_obj.project, PROJECT) + self.assertTrue(isinstance(client_obj.connection, Connection)) + self.assertTrue(client_obj.connection._credentials is CREDS) + self.assertEqual(_CALLED, [(BOGUS_ARG1, BOGUS_ARG2)]) + + def test_list_topics_no_paging(self): + from gcloud.pubsub.topic import Topic + PROJECT = 'PROJECT' + CREDS = _Credentials() + + CLIENT_OBJ = self._makeOne(project=PROJECT, credentials=CREDS) + + TOPIC_NAME = 'topic_name' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + + RETURNED = {'topics': [{'name': TOPIC_PATH}]} + # Replace the connection on the client with one of our own. + CLIENT_OBJ.connection = _Connection(RETURNED) + + # Execute request. + topics, next_page_token = CLIENT_OBJ.list_topics() + # Test values are correct. + self.assertEqual(len(topics), 1) + self.assertTrue(isinstance(topics[0], Topic)) + self.assertEqual(topics[0].name, TOPIC_NAME) + self.assertEqual(next_page_token, None) + self.assertEqual(len(CLIENT_OBJ.connection._requested), 1) + req = CLIENT_OBJ.connection._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/projects/%s/topics' % PROJECT) + self.assertEqual(req['query_params'], {}) + + def test_list_topics_with_paging(self): + from gcloud.pubsub.topic import Topic + PROJECT = 'PROJECT' + CREDS = _Credentials() + + CLIENT_OBJ = self._makeOne(project=PROJECT, credentials=CREDS) + + TOPIC_NAME = 'topic_name' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + TOKEN1 = 'TOKEN1' + TOKEN2 = 'TOKEN2' + SIZE = 1 + RETURNED = {'topics': [{'name': TOPIC_PATH}], + 'nextPageToken': TOKEN2} + # Replace the connection on the client with one of our own. + CLIENT_OBJ.connection = _Connection(RETURNED) + + # Execute request. + topics, next_page_token = CLIENT_OBJ.list_topics(SIZE, TOKEN1) + # Test values are correct. + self.assertEqual(len(topics), 1) + self.assertTrue(isinstance(topics[0], Topic)) + self.assertEqual(topics[0].name, TOPIC_NAME) + self.assertEqual(next_page_token, TOKEN2) + self.assertEqual(len(CLIENT_OBJ.connection._requested), 1) + req = CLIENT_OBJ.connection._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/projects/%s/topics' % PROJECT) + self.assertEqual(req['query_params'], + {'pageSize': SIZE, 'pageToken': TOKEN1}) + + def test_list_subscriptions_no_paging(self): + from gcloud.pubsub.subscription import Subscription + PROJECT = 'PROJECT' + CREDS = _Credentials() + + CLIENT_OBJ = self._makeOne(project=PROJECT, credentials=CREDS) + + SUB_NAME = 'subscription_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + SUB_INFO = [{'name': SUB_PATH, 'topic': TOPIC_PATH}] + RETURNED = {'subscriptions': SUB_INFO} + # Replace the connection on the client with one of our own. + CLIENT_OBJ.connection = _Connection(RETURNED) + + # Execute request. + subscriptions, next_page_token = CLIENT_OBJ.list_subscriptions() + # Test values are correct. + self.assertEqual(len(subscriptions), 1) + self.assertTrue(isinstance(subscriptions[0], Subscription)) + self.assertEqual(subscriptions[0].name, SUB_NAME) + self.assertEqual(subscriptions[0].topic.name, TOPIC_NAME) + self.assertEqual(next_page_token, None) + self.assertEqual(len(CLIENT_OBJ.connection._requested), 1) + req = CLIENT_OBJ.connection._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/projects/%s/subscriptions' % PROJECT) + self.assertEqual(req['query_params'], {}) + + def test_list_subscriptions_with_paging(self): + from gcloud.pubsub.subscription import Subscription + PROJECT = 'PROJECT' + CREDS = _Credentials() + + CLIENT_OBJ = self._makeOne(project=PROJECT, credentials=CREDS) + + SUB_NAME = 'subscription_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + ACK_DEADLINE = 42 + PUSH_ENDPOINT = 'https://push.example.com/endpoint' + TOKEN1 = 'TOKEN1' + TOKEN2 = 'TOKEN2' + SIZE = 1 + SUB_INFO = [{'name': SUB_PATH, + 'topic': TOPIC_PATH, + 'ackDeadlineSeconds': ACK_DEADLINE, + 'pushConfig': {'pushEndpoint': PUSH_ENDPOINT}}] + RETURNED = {'subscriptions': SUB_INFO, 'nextPageToken': TOKEN2} + # Replace the connection on the client with one of our own. + CLIENT_OBJ.connection = _Connection(RETURNED) + + # Execute request. + subscriptions, next_page_token = CLIENT_OBJ.list_subscriptions( + SIZE, TOKEN1) + # Test values are correct. + self.assertEqual(len(subscriptions), 1) + self.assertTrue(isinstance(subscriptions[0], Subscription)) + self.assertEqual(subscriptions[0].name, SUB_NAME) + self.assertEqual(subscriptions[0].topic.name, TOPIC_NAME) + self.assertEqual(subscriptions[0].ack_deadline, ACK_DEADLINE) + self.assertEqual(subscriptions[0].push_endpoint, PUSH_ENDPOINT) + self.assertEqual(next_page_token, TOKEN2) + self.assertEqual(len(CLIENT_OBJ.connection._requested), 1) + req = CLIENT_OBJ.connection._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/projects/%s/subscriptions' % PROJECT) + self.assertEqual(req['query_params'], + {'pageSize': SIZE, 'pageToken': TOKEN1}) + + def test_list_subscriptions_with_topic_name(self): + from gcloud.pubsub.subscription import Subscription + PROJECT = 'PROJECT' + CREDS = _Credentials() + + CLIENT_OBJ = self._makeOne(project=PROJECT, credentials=CREDS) + + SUB_NAME_1 = 'subscription_1' + SUB_PATH_1 = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME_1) + SUB_NAME_2 = 'subscription_2' + SUB_PATH_2 = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME_2) + TOPIC_NAME = 'topic_name' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + SUB_INFO = [{'name': SUB_PATH_1, 'topic': TOPIC_PATH}, + {'name': SUB_PATH_2, 'topic': TOPIC_PATH}] + TOKEN = 'TOKEN' + RETURNED = {'subscriptions': SUB_INFO, 'nextPageToken': TOKEN} + # Replace the connection on the client with one of our own. + CLIENT_OBJ.connection = _Connection(RETURNED) + + # Execute request. + subscriptions, next_page_token = CLIENT_OBJ.list_subscriptions( + topic_name=TOPIC_NAME) + # Test values are correct. + self.assertEqual(len(subscriptions), 2) + self.assertTrue(isinstance(subscriptions[0], Subscription)) + self.assertEqual(subscriptions[0].name, SUB_NAME_1) + self.assertEqual(subscriptions[0].topic.name, TOPIC_NAME) + self.assertTrue(isinstance(subscriptions[1], Subscription)) + self.assertEqual(subscriptions[1].name, SUB_NAME_2) + self.assertEqual(subscriptions[1].topic.name, TOPIC_NAME) + self.assertTrue(subscriptions[1].topic is subscriptions[0].topic) + self.assertEqual(next_page_token, TOKEN) + self.assertEqual(len(CLIENT_OBJ.connection._requested), 1) + req = CLIENT_OBJ.connection._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], + '/projects/%s/topics/%s/subscriptions' + % (PROJECT, TOPIC_NAME)) + self.assertEqual(req['query_params'], {}) + + def test_topic(self): + PROJECT = 'PROJECT' + TOPIC_NAME = 'TOPIC_NAME' + CREDS = _Credentials() + + client_obj = self._makeOne(project=PROJECT, credentials=CREDS) + new_topic = client_obj.topic(TOPIC_NAME) + self.assertEqual(new_topic.name, TOPIC_NAME) + self.assertTrue(new_topic._client is client_obj) + self.assertEqual(new_topic.project, PROJECT) + self.assertEqual(new_topic.full_name, + 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)) + self.assertFalse(new_topic.timestamp_messages) + + +class _Credentials(object): + + _scopes = None + + @staticmethod + def create_scoped_required(): + return True + + def create_scoped(self, scope): + self._scopes = scope + return self + + +class _Connection(object): + + def __init__(self, *responses): + self._responses = responses + self._requested = [] + + def api_request(self, **kw): + self._requested.append(kw) + response, self._responses = self._responses[0], self._responses[1:] + return response diff --git a/gcloud/search/test_connection.py b/gcloud/search/test_connection.py new file mode 100644 index 000000000000..4a8618388e4e --- /dev/null +++ b/gcloud/search/test_connection.py @@ -0,0 +1,46 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest2 + + +class TestConnection(unittest2.TestCase): + + def _getTargetClass(self): + from gcloud.pubsub.connection import Connection + return Connection + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test_build_api_url_no_extra_query_params(self): + conn = self._makeOne() + URI = '/'.join([ + conn.API_BASE_URL, + conn.API_VERSION, + 'foo', + ]) + self.assertEqual(conn.build_api_url('/foo'), URI) + + def test_build_api_url_w_extra_query_params(self): + from six.moves.urllib.parse import parse_qsl + from six.moves.urllib.parse import urlsplit + conn = self._makeOne() + uri = conn.build_api_url('/foo', {'bar': 'baz'}) + scheme, netloc, path, qs, _ = urlsplit(uri) + self.assertEqual('%s://%s' % (scheme, netloc), conn.API_BASE_URL) + self.assertEqual(path, + '/'.join(['', conn.API_VERSION, 'foo'])) + parms = dict(parse_qsl(qs)) + self.assertEqual(parms['bar'], 'baz') diff --git a/gcloud/search/test_message.py b/gcloud/search/test_message.py new file mode 100644 index 000000000000..38ad240e6199 --- /dev/null +++ b/gcloud/search/test_message.py @@ -0,0 +1,104 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest2 + + +class TestMessage(unittest2.TestCase): + + def _getTargetClass(self): + from gcloud.pubsub.message import Message + return Message + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test_ctor_no_attributes(self): + DATA = b'DEADBEEF' + MESSAGE_ID = b'12345' + message = self._makeOne(data=DATA, message_id=MESSAGE_ID) + self.assertEqual(message.data, DATA) + self.assertEqual(message.message_id, MESSAGE_ID) + self.assertEqual(message.attributes, {}) + + def test_ctor_w_attributes(self): + DATA = b'DEADBEEF' + MESSAGE_ID = b'12345' + ATTRS = {'a': 'b'} + message = self._makeOne(data=DATA, message_id=MESSAGE_ID, + attributes=ATTRS) + self.assertEqual(message.data, DATA) + self.assertEqual(message.message_id, MESSAGE_ID) + self.assertEqual(message.attributes, ATTRS) + + def test_from_api_repr_no_attributes(self): + from base64 import b64encode as b64 + DATA = b'DEADBEEF' + B64_DATA = b64(DATA) + MESSAGE_ID = '12345' + api_repr = {'data': B64_DATA, 'messageId': MESSAGE_ID} + message = self._getTargetClass().from_api_repr(api_repr) + self.assertEqual(message.data, DATA) + self.assertEqual(message.message_id, MESSAGE_ID) + self.assertEqual(message.attributes, {}) + + def test_from_api_repr_w_attributes(self): + from base64 import b64encode as b64 + DATA = b'DEADBEEF' + B64_DATA = b64(DATA) + MESSAGE_ID = '12345' + ATTRS = {'a': 'b'} + api_repr = {'data': B64_DATA, + 'messageId': MESSAGE_ID, + 'attributes': ATTRS} + message = self._getTargetClass().from_api_repr(api_repr) + self.assertEqual(message.data, DATA) + self.assertEqual(message.message_id, MESSAGE_ID) + self.assertEqual(message.attributes, ATTRS) + + def test_timestamp_no_attributes(self): + DATA = b'DEADBEEF' + MESSAGE_ID = b'12345' + message = self._makeOne(data=DATA, message_id=MESSAGE_ID) + + def _to_fail(): + return message.timestamp + + self.assertRaises(ValueError, _to_fail) + + def test_timestamp_wo_timestamp_in_attributes(self): + DATA = b'DEADBEEF' + MESSAGE_ID = b'12345' + ATTRS = {'a': 'b'} + message = self._makeOne(data=DATA, message_id=MESSAGE_ID, + attributes=ATTRS) + + def _to_fail(): + return message.timestamp + + self.assertRaises(ValueError, _to_fail) + + def test_timestamp_w_timestamp_in_attributes(self): + from datetime import datetime + from pytz import utc + from gcloud._helpers import _RFC3339_MICROS + DATA = b'DEADBEEF' + MESSAGE_ID = b'12345' + TIMESTAMP = '2015-04-10T18:42:27.131956Z' + naive = datetime.strptime(TIMESTAMP, _RFC3339_MICROS) + timestamp = naive.replace(tzinfo=utc) + ATTRS = {'timestamp': TIMESTAMP} + message = self._makeOne(data=DATA, message_id=MESSAGE_ID, + attributes=ATTRS) + self.assertEqual(message.timestamp, timestamp) diff --git a/gcloud/search/test_subscription.py b/gcloud/search/test_subscription.py new file mode 100644 index 000000000000..db966ac3c12a --- /dev/null +++ b/gcloud/search/test_subscription.py @@ -0,0 +1,520 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest2 + + +class TestSubscription(unittest2.TestCase): + + def _getTargetClass(self): + from gcloud.pubsub.subscription import Subscription + return Subscription + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test_ctor_defaults(self): + SUB_NAME = 'sub_name' + topic = object() + subscription = self._makeOne(SUB_NAME, topic) + self.assertEqual(subscription.name, SUB_NAME) + self.assertTrue(subscription.topic is topic) + self.assertEqual(subscription.ack_deadline, None) + self.assertEqual(subscription.push_endpoint, None) + + def test_ctor_explicit(self): + SUB_NAME = 'sub_name' + DEADLINE = 42 + ENDPOINT = 'https://api.example.com/push' + topic = object() + subscription = self._makeOne(SUB_NAME, topic, DEADLINE, ENDPOINT) + self.assertEqual(subscription.name, SUB_NAME) + self.assertTrue(subscription.topic is topic) + self.assertEqual(subscription.ack_deadline, DEADLINE) + self.assertEqual(subscription.push_endpoint, ENDPOINT) + + def test_from_api_repr_no_topics(self): + from gcloud.pubsub.topic import Topic + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + DEADLINE = 42 + ENDPOINT = 'https://api.example.com/push' + resource = {'topic': TOPIC_PATH, + 'name': SUB_PATH, + 'ackDeadlineSeconds': DEADLINE, + 'pushConfig': {'pushEndpoint': ENDPOINT}} + klass = self._getTargetClass() + client = _Client(project=PROJECT) + subscription = klass.from_api_repr(resource, client) + self.assertEqual(subscription.name, SUB_NAME) + topic = subscription.topic + self.assertTrue(isinstance(topic, Topic)) + self.assertEqual(topic.name, TOPIC_NAME) + self.assertEqual(topic.project, PROJECT) + self.assertEqual(subscription.ack_deadline, DEADLINE) + self.assertEqual(subscription.push_endpoint, ENDPOINT) + + def test_from_api_repr_w_topics_no_topic_match(self): + from gcloud.pubsub.topic import Topic + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + DEADLINE = 42 + ENDPOINT = 'https://api.example.com/push' + resource = {'topic': TOPIC_PATH, + 'name': SUB_PATH, + 'ackDeadlineSeconds': DEADLINE, + 'pushConfig': {'pushEndpoint': ENDPOINT}} + topics = {} + klass = self._getTargetClass() + client = _Client(project=PROJECT) + subscription = klass.from_api_repr(resource, client, topics=topics) + self.assertEqual(subscription.name, SUB_NAME) + topic = subscription.topic + self.assertTrue(isinstance(topic, Topic)) + self.assertTrue(topic is topics[TOPIC_PATH]) + self.assertEqual(topic.name, TOPIC_NAME) + self.assertEqual(topic.project, PROJECT) + self.assertEqual(subscription.ack_deadline, DEADLINE) + self.assertEqual(subscription.push_endpoint, ENDPOINT) + + def test_from_api_repr_w_topics_w_topic_match(self): + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + DEADLINE = 42 + ENDPOINT = 'https://api.example.com/push' + resource = {'topic': TOPIC_PATH, + 'name': SUB_PATH, + 'ackDeadlineSeconds': DEADLINE, + 'pushConfig': {'pushEndpoint': ENDPOINT}} + topic = object() + topics = {TOPIC_PATH: topic} + klass = self._getTargetClass() + client = _Client(project=PROJECT) + subscription = klass.from_api_repr(resource, client, topics=topics) + self.assertEqual(subscription.name, SUB_NAME) + self.assertTrue(subscription.topic is topic) + self.assertEqual(subscription.ack_deadline, DEADLINE) + self.assertEqual(subscription.push_endpoint, ENDPOINT) + + def test_create_pull_wo_ack_deadline_w_bound_client(self): + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + BODY = {'topic': TOPIC_PATH} + conn = _Connection({'name': SUB_PATH}) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, client=CLIENT) + subscription = self._makeOne(SUB_NAME, topic) + subscription.create() + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'PUT') + self.assertEqual(req['path'], '/%s' % SUB_PATH) + self.assertEqual(req['data'], BODY) + + def test_create_push_w_ack_deadline_w_alternate_client(self): + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + DEADLINE = 42 + ENDPOINT = 'https://api.example.com/push' + BODY = {'topic': TOPIC_PATH, + 'ackDeadline': DEADLINE, + 'pushConfig': {'pushEndpoint': ENDPOINT}} + conn1 = _Connection({'name': SUB_PATH}) + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({'name': SUB_PATH}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + topic = _Topic(TOPIC_NAME, client=CLIENT1) + subscription = self._makeOne(SUB_NAME, topic, DEADLINE, ENDPOINT) + subscription.create(client=CLIENT2) + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] + self.assertEqual(req['method'], 'PUT') + self.assertEqual(req['path'], '/%s' % SUB_PATH) + self.assertEqual(req['data'], BODY) + + def test_exists_miss_w_bound_client(self): + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + conn = _Connection() + CLIENT = _Client(project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, client=CLIENT) + subscription = self._makeOne(SUB_NAME, topic) + self.assertFalse(subscription.exists()) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/%s' % SUB_PATH) + self.assertEqual(req.get('query_params'), None) + + def test_exists_hit_w_alternate_client(self): + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn1 = _Connection({'name': SUB_PATH, 'topic': TOPIC_PATH}) + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({'name': SUB_PATH, 'topic': TOPIC_PATH}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + topic = _Topic(TOPIC_NAME, client=CLIENT1) + subscription = self._makeOne(SUB_NAME, topic) + self.assertTrue(subscription.exists(client=CLIENT2)) + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/%s' % SUB_PATH) + self.assertEqual(req.get('query_params'), None) + + def test_reload_w_bound_client(self): + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + DEADLINE = 42 + ENDPOINT = 'https://api.example.com/push' + conn = _Connection({'name': SUB_PATH, + 'topic': TOPIC_PATH, + 'ackDeadline': DEADLINE, + 'pushConfig': {'pushEndpoint': ENDPOINT}}) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, client=CLIENT) + subscription = self._makeOne(SUB_NAME, topic) + subscription.reload() + self.assertEqual(subscription.ack_deadline, DEADLINE) + self.assertEqual(subscription.push_endpoint, ENDPOINT) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/%s' % SUB_PATH) + + def test_reload_w_alternate_client(self): + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + DEADLINE = 42 + ENDPOINT = 'https://api.example.com/push' + conn1 = _Connection() + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({'name': SUB_PATH, + 'topic': TOPIC_PATH, + 'ackDeadline': DEADLINE, + 'pushConfig': {'pushEndpoint': ENDPOINT}}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + topic = _Topic(TOPIC_NAME, client=CLIENT1) + subscription = self._makeOne(SUB_NAME, topic) + subscription.reload(client=CLIENT2) + self.assertEqual(subscription.ack_deadline, DEADLINE) + self.assertEqual(subscription.push_endpoint, ENDPOINT) + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/%s' % SUB_PATH) + + def test_modify_push_config_w_endpoint_w_bound_client(self): + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + ENDPOINT = 'https://api.example.com/push' + conn = _Connection({}) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, client=CLIENT) + subscription = self._makeOne(SUB_NAME, topic) + subscription.modify_push_configuration(push_endpoint=ENDPOINT) + self.assertEqual(subscription.push_endpoint, ENDPOINT) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:modifyPushConfig' % SUB_PATH) + self.assertEqual(req['data'], + {'pushConfig': {'pushEndpoint': ENDPOINT}}) + + def test_modify_push_config_wo_endpoint_w_alternate_client(self): + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + ENDPOINT = 'https://api.example.com/push' + conn1 = _Connection({}) + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + topic = _Topic(TOPIC_NAME, client=CLIENT1) + subscription = self._makeOne(SUB_NAME, topic, push_endpoint=ENDPOINT) + subscription.modify_push_configuration(push_endpoint=None, + client=CLIENT2) + self.assertEqual(subscription.push_endpoint, None) + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:modifyPushConfig' % SUB_PATH) + self.assertEqual(req['data'], {'pushConfig': {}}) + + def test_pull_wo_return_immediately_max_messages_w_bound_client(self): + import base64 + from gcloud.pubsub.message import Message + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + ACK_ID = 'DEADBEEF' + MSG_ID = 'BEADCAFE' + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD) + MESSAGE = {'messageId': MSG_ID, 'data': B64} + REC_MESSAGE = {'ackId': ACK_ID, 'message': MESSAGE} + conn = _Connection({'receivedMessages': [REC_MESSAGE]}) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, client=CLIENT) + subscription = self._makeOne(SUB_NAME, topic) + pulled = subscription.pull() + self.assertEqual(len(pulled), 1) + ack_id, message = pulled[0] + self.assertEqual(ack_id, ACK_ID) + self.assertTrue(isinstance(message, Message)) + self.assertEqual(message.data, PAYLOAD) + self.assertEqual(message.message_id, MSG_ID) + self.assertEqual(message.attributes, {}) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:pull' % SUB_PATH) + self.assertEqual(req['data'], + {'returnImmediately': False, 'maxMessages': 1}) + + def test_pull_w_return_immediately_w_max_messages_w_alt_client(self): + import base64 + from gcloud.pubsub.message import Message + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + ACK_ID = 'DEADBEEF' + MSG_ID = 'BEADCAFE' + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD) + MESSAGE = {'messageId': MSG_ID, 'data': B64, 'attributes': {'a': 'b'}} + REC_MESSAGE = {'ackId': ACK_ID, 'message': MESSAGE} + conn1 = _Connection() + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({'receivedMessages': [REC_MESSAGE]}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + topic = _Topic(TOPIC_NAME, client=CLIENT1) + subscription = self._makeOne(SUB_NAME, topic) + pulled = subscription.pull(return_immediately=True, max_messages=3, + client=CLIENT2) + self.assertEqual(len(pulled), 1) + ack_id, message = pulled[0] + self.assertEqual(ack_id, ACK_ID) + self.assertTrue(isinstance(message, Message)) + self.assertEqual(message.data, PAYLOAD) + self.assertEqual(message.message_id, MSG_ID) + self.assertEqual(message.attributes, {'a': 'b'}) + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:pull' % SUB_PATH) + self.assertEqual(req['data'], + {'returnImmediately': True, 'maxMessages': 3}) + + def test_pull_wo_receivedMessages(self): + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + conn = _Connection({}) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, client=CLIENT) + subscription = self._makeOne(SUB_NAME, topic) + pulled = subscription.pull(return_immediately=False) + self.assertEqual(len(pulled), 0) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:pull' % SUB_PATH) + self.assertEqual(req['data'], + {'returnImmediately': False, 'maxMessages': 1}) + + def test_acknowledge_w_bound_client(self): + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + ACK_ID1 = 'DEADBEEF' + ACK_ID2 = 'BEADCAFE' + conn = _Connection({}) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, client=CLIENT) + subscription = self._makeOne(SUB_NAME, topic) + subscription.acknowledge([ACK_ID1, ACK_ID2]) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:acknowledge' % SUB_PATH) + self.assertEqual(req['data'], {'ackIds': [ACK_ID1, ACK_ID2]}) + + def test_acknowledge_w_alternate_client(self): + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + ACK_ID1 = 'DEADBEEF' + ACK_ID2 = 'BEADCAFE' + conn1 = _Connection({}) + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + topic = _Topic(TOPIC_NAME, client=CLIENT1) + subscription = self._makeOne(SUB_NAME, topic) + subscription.acknowledge([ACK_ID1, ACK_ID2], client=CLIENT2) + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:acknowledge' % SUB_PATH) + self.assertEqual(req['data'], {'ackIds': [ACK_ID1, ACK_ID2]}) + + def test_modify_ack_deadline_w_bound_client(self): + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + ACK_ID = 'DEADBEEF' + DEADLINE = 42 + conn = _Connection({}) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, client=CLIENT) + subscription = self._makeOne(SUB_NAME, topic) + subscription.modify_ack_deadline(ACK_ID, DEADLINE) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:modifyAckDeadline' % SUB_PATH) + self.assertEqual(req['data'], + {'ackId': ACK_ID, 'ackDeadlineSeconds': DEADLINE}) + + def test_modify_ack_deadline_w_alternate_client(self): + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + ACK_ID = 'DEADBEEF' + DEADLINE = 42 + conn1 = _Connection({}) + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + topic = _Topic(TOPIC_NAME, client=CLIENT1) + subscription = self._makeOne(SUB_NAME, topic) + subscription.modify_ack_deadline(ACK_ID, DEADLINE, client=CLIENT2) + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:modifyAckDeadline' % SUB_PATH) + self.assertEqual(req['data'], + {'ackId': ACK_ID, 'ackDeadlineSeconds': DEADLINE}) + + def test_delete_w_bound_client(self): + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + conn = _Connection({}) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, client=CLIENT) + subscription = self._makeOne(SUB_NAME, topic) + subscription.delete() + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'DELETE') + self.assertEqual(req['path'], '/%s' % SUB_PATH) + + def test_delete_w_alternate_client(self): + PROJECT = 'PROJECT' + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + conn1 = _Connection({}) + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + topic = _Topic(TOPIC_NAME, client=CLIENT1) + subscription = self._makeOne(SUB_NAME, topic) + subscription.delete(client=CLIENT2) + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] + self.assertEqual(req['method'], 'DELETE') + self.assertEqual(req['path'], '/%s' % SUB_PATH) + + +class _Connection(object): + + def __init__(self, *responses): + self._responses = responses + self._requested = [] + + def api_request(self, **kw): + from gcloud.exceptions import NotFound + self._requested.append(kw) + + try: + response, self._responses = self._responses[0], self._responses[1:] + except: + raise NotFound('miss') + else: + return response + + +class _Topic(object): + + def __init__(self, name, client): + self.name = name + self._client = client + self.project = client.project + self.full_name = 'projects/%s/topics/%s' % (client.project, name) + self.path = '/projects/%s/topics/%s' % (client.project, name) + + +class _Client(object): + + def __init__(self, project, connection=None): + self.project = project + self.connection = connection diff --git a/gcloud/search/test_topic.py b/gcloud/search/test_topic.py new file mode 100644 index 000000000000..4d4942db6a23 --- /dev/null +++ b/gcloud/search/test_topic.py @@ -0,0 +1,550 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest2 + + +class TestTopic(unittest2.TestCase): + + def _getTargetClass(self): + from gcloud.pubsub.topic import Topic + return Topic + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test_ctor_w_explicit_timestamp(self): + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + CLIENT = _Client(project=PROJECT) + topic = self._makeOne(TOPIC_NAME, + client=CLIENT, + timestamp_messages=True) + self.assertEqual(topic.name, TOPIC_NAME) + self.assertEqual(topic.project, PROJECT) + self.assertEqual(topic.full_name, + 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)) + self.assertTrue(topic.timestamp_messages) + + def test_from_api_repr(self): + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + CLIENT = _Client(project=PROJECT) + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + resource = {'name': PATH} + klass = self._getTargetClass() + topic = klass.from_api_repr(resource, client=CLIENT) + self.assertEqual(topic.name, TOPIC_NAME) + self.assertTrue(topic._client is CLIENT) + self.assertEqual(topic.project, PROJECT) + self.assertEqual(topic.full_name, PATH) + + def test_from_api_repr_with_bad_client(self): + TOPIC_NAME = 'topic_name' + PROJECT1 = 'PROJECT1' + PROJECT2 = 'PROJECT2' + CLIENT = _Client(project=PROJECT1) + PATH = 'projects/%s/topics/%s' % (PROJECT2, TOPIC_NAME) + resource = {'name': PATH} + klass = self._getTargetClass() + self.assertRaises(ValueError, klass.from_api_repr, + resource, client=CLIENT) + + def test_create_w_bound_client(self): + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn = _Connection({'name': PATH}) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = self._makeOne(TOPIC_NAME, client=CLIENT) + topic.create() + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'PUT') + self.assertEqual(req['path'], '/%s' % PATH) + + def test_create_w_alternate_client(self): + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn1 = _Connection({'name': PATH}) + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({'name': PATH}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + topic = self._makeOne(TOPIC_NAME, client=CLIENT1) + topic.create(client=CLIENT2) + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] + self.assertEqual(req['method'], 'PUT') + self.assertEqual(req['path'], '/%s' % PATH) + + def test_exists_miss_w_bound_client(self): + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn = _Connection() + CLIENT = _Client(project=PROJECT, connection=conn) + topic = self._makeOne(TOPIC_NAME, client=CLIENT) + self.assertFalse(topic.exists()) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/%s' % PATH) + + def test_exists_hit_w_alternate_client(self): + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn1 = _Connection({'name': PATH}) + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({'name': PATH}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + topic = self._makeOne(TOPIC_NAME, client=CLIENT1) + self.assertTrue(topic.exists(client=CLIENT2)) + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/%s' % PATH) + + def test_publish_single_bytes_wo_attrs_w_bound_client(self): + import base64 + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD).decode('ascii') + MSGID = 'DEADBEEF' + MESSAGE = {'data': B64, + 'attributes': {}} + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn = _Connection({'messageIds': [MSGID]}) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = self._makeOne(TOPIC_NAME, client=CLIENT) + msgid = topic.publish(PAYLOAD) + self.assertEqual(msgid, MSGID) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:publish' % PATH) + self.assertEqual(req['data'], {'messages': [MESSAGE]}) + + def test_publish_single_bytes_wo_attrs_w_add_timestamp_alt_client(self): + import base64 + import datetime + from gcloud.pubsub import topic as MUT + from gcloud._helpers import _RFC3339_MICROS + from gcloud._testing import _Monkey + NOW = datetime.datetime.utcnow() + + def _utcnow(): + return NOW + + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD).decode('ascii') + MSGID = 'DEADBEEF' + MESSAGE = {'data': B64, + 'attributes': {'timestamp': NOW.strftime(_RFC3339_MICROS)}} + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn1 = _Connection({'messageIds': [MSGID]}) + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({'messageIds': [MSGID]}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + + topic = self._makeOne(TOPIC_NAME, client=CLIENT1, + timestamp_messages=True) + with _Monkey(MUT, _NOW=_utcnow): + msgid = topic.publish(PAYLOAD, client=CLIENT2) + + self.assertEqual(msgid, MSGID) + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:publish' % PATH) + self.assertEqual(req['data'], {'messages': [MESSAGE]}) + + def test_publish_single_bytes_w_add_timestamp_w_ts_in_attrs(self): + import base64 + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD).decode('ascii') + MSGID = 'DEADBEEF' + OVERRIDE = '2015-04-10T16:46:22.868399Z' + MESSAGE = {'data': B64, + 'attributes': {'timestamp': OVERRIDE}} + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn = _Connection({'messageIds': [MSGID]}) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = self._makeOne(TOPIC_NAME, client=CLIENT, + timestamp_messages=True) + msgid = topic.publish(PAYLOAD, timestamp=OVERRIDE) + self.assertEqual(msgid, MSGID) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:publish' % PATH) + self.assertEqual(req['data'], {'messages': [MESSAGE]}) + + def test_publish_single_w_attrs(self): + import base64 + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD).decode('ascii') + MSGID = 'DEADBEEF' + MESSAGE = {'data': B64, + 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn = _Connection({'messageIds': [MSGID]}) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = self._makeOne(TOPIC_NAME, client=CLIENT) + msgid = topic.publish(PAYLOAD, attr1='value1', attr2='value2') + self.assertEqual(msgid, MSGID) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:publish' % PATH) + self.assertEqual(req['data'], {'messages': [MESSAGE]}) + + def test_publish_multiple_w_bound_client(self): + import base64 + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + PAYLOAD1 = b'This is the first message text' + PAYLOAD2 = b'This is the second message text' + B64_1 = base64.b64encode(PAYLOAD1) + B64_2 = base64.b64encode(PAYLOAD2) + MSGID1 = 'DEADBEEF' + MSGID2 = 'BEADCAFE' + MESSAGE1 = {'data': B64_1.decode('ascii'), + 'attributes': {}} + MESSAGE2 = {'data': B64_2.decode('ascii'), + 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn = _Connection({'messageIds': [MSGID1, MSGID2]}) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = self._makeOne(TOPIC_NAME, client=CLIENT) + with topic.batch() as batch: + batch.publish(PAYLOAD1) + batch.publish(PAYLOAD2, attr1='value1', attr2='value2') + self.assertEqual(list(batch), [MSGID1, MSGID2]) + self.assertEqual(list(batch.messages), []) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:publish' % PATH) + self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]}) + + def test_publish_multiple_w_alternate_client(self): + import base64 + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + PAYLOAD1 = b'This is the first message text' + PAYLOAD2 = b'This is the second message text' + B64_1 = base64.b64encode(PAYLOAD1) + B64_2 = base64.b64encode(PAYLOAD2) + MSGID1 = 'DEADBEEF' + MSGID2 = 'BEADCAFE' + MESSAGE1 = {'data': B64_1.decode('ascii'), + 'attributes': {}} + MESSAGE2 = {'data': B64_2.decode('ascii'), + 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn1 = _Connection({'messageIds': [MSGID1, MSGID2]}) + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({'messageIds': [MSGID1, MSGID2]}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + topic = self._makeOne(TOPIC_NAME, client=CLIENT1) + with topic.batch(client=CLIENT2) as batch: + batch.publish(PAYLOAD1) + batch.publish(PAYLOAD2, attr1='value1', attr2='value2') + self.assertEqual(list(batch), [MSGID1, MSGID2]) + self.assertEqual(list(batch.messages), []) + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:publish' % PATH) + self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]}) + + def test_publish_multiple_error(self): + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + PAYLOAD1 = b'This is the first message text' + PAYLOAD2 = b'This is the second message text' + MSGID1 = 'DEADBEEF' + MSGID2 = 'BEADCAFE' + conn = _Connection({'messageIds': [MSGID1, MSGID2]}) + CLIENT = _Client(project=PROJECT) + topic = self._makeOne(TOPIC_NAME, client=CLIENT) + try: + with topic.batch() as batch: + batch.publish(PAYLOAD1) + batch.publish(PAYLOAD2, attr1='value1', attr2='value2') + raise _Bugout() + except _Bugout: + pass + self.assertEqual(list(batch), []) + self.assertEqual(len(conn._requested), 0) + + def test_delete_w_bound_client(self): + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn = _Connection({}) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = self._makeOne(TOPIC_NAME, client=CLIENT) + topic.delete() + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'DELETE') + self.assertEqual(req['path'], '/%s' % PATH) + + def test_delete_w_alternate_client(self): + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn1 = _Connection({}) + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + topic = self._makeOne(TOPIC_NAME, client=CLIENT1) + topic.delete(client=CLIENT2) + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] + self.assertEqual(req['method'], 'DELETE') + self.assertEqual(req['path'], '/%s' % PATH) + + +class TestBatch(unittest2.TestCase): + + def _getTargetClass(self): + from gcloud.pubsub.topic import Batch + return Batch + + def _makeOne(self, *args, **kwargs): + return self._getTargetClass()(*args, **kwargs) + + def test_ctor_defaults(self): + topic = _Topic() + CLIENT = _Client(project='PROJECT') + batch = self._makeOne(topic, CLIENT) + self.assertTrue(batch.topic is topic) + self.assertTrue(batch.client is CLIENT) + self.assertEqual(len(batch.messages), 0) + self.assertEqual(len(batch.message_ids), 0) + + def test___iter___empty(self): + topic = _Topic() + client = object() + batch = self._makeOne(topic, client) + self.assertEqual(list(batch), []) + + def test___iter___non_empty(self): + topic = _Topic() + client = object() + batch = self._makeOne(topic, client) + batch.message_ids[:] = ['ONE', 'TWO', 'THREE'] + self.assertEqual(list(batch), ['ONE', 'TWO', 'THREE']) + + def test_publish_bytes_wo_attrs(self): + import base64 + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD).decode('ascii') + MESSAGE = {'data': B64, + 'attributes': {}} + connection = _Connection() + CLIENT = _Client(project='PROJECT', connection=connection) + topic = _Topic() + batch = self._makeOne(topic, client=CLIENT) + batch.publish(PAYLOAD) + self.assertEqual(len(connection._requested), 0) + self.assertEqual(batch.messages, [MESSAGE]) + + def test_publish_bytes_w_add_timestamp(self): + import base64 + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD).decode('ascii') + MESSAGE = {'data': B64, + 'attributes': {'timestamp': 'TIMESTAMP'}} + connection = _Connection() + CLIENT = _Client(project='PROJECT', connection=connection) + topic = _Topic(timestamp_messages=True) + batch = self._makeOne(topic, client=CLIENT) + batch.publish(PAYLOAD) + self.assertEqual(len(connection._requested), 0) + self.assertEqual(batch.messages, [MESSAGE]) + + def test_commit_w_bound_client(self): + import base64 + PAYLOAD1 = b'This is the first message text' + PAYLOAD2 = b'This is the second message text' + B64_1 = base64.b64encode(PAYLOAD1) + B64_2 = base64.b64encode(PAYLOAD2) + MSGID1 = 'DEADBEEF' + MSGID2 = 'BEADCAFE' + MESSAGE1 = {'data': B64_1.decode('ascii'), + 'attributes': {}} + MESSAGE2 = {'data': B64_2.decode('ascii'), + 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} + conn = _Connection({'messageIds': [MSGID1, MSGID2]}) + CLIENT = _Client(project='PROJECT', connection=conn) + topic = _Topic() + batch = self._makeOne(topic, client=CLIENT) + batch.publish(PAYLOAD1) + batch.publish(PAYLOAD2, attr1='value1', attr2='value2') + batch.commit() + self.assertEqual(list(batch), [MSGID1, MSGID2]) + self.assertEqual(list(batch.messages), []) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '%s:publish' % topic.path) + self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]}) + + def test_commit_w_alternate_client(self): + import base64 + PAYLOAD1 = b'This is the first message text' + PAYLOAD2 = b'This is the second message text' + B64_1 = base64.b64encode(PAYLOAD1) + B64_2 = base64.b64encode(PAYLOAD2) + MSGID1 = 'DEADBEEF' + MSGID2 = 'BEADCAFE' + MESSAGE1 = {'data': B64_1.decode('ascii'), + 'attributes': {}} + MESSAGE2 = {'data': B64_2.decode('ascii'), + 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} + conn1 = _Connection({'messageIds': [MSGID1, MSGID2]}) + CLIENT1 = _Client(project='PROJECT', connection=conn1) + conn2 = _Connection({'messageIds': [MSGID1, MSGID2]}) + CLIENT2 = _Client(project='PROJECT', connection=conn2) + topic = _Topic() + batch = self._makeOne(topic, client=CLIENT1) + batch.publish(PAYLOAD1) + batch.publish(PAYLOAD2, attr1='value1', attr2='value2') + batch.commit(client=CLIENT2) + self.assertEqual(list(batch), [MSGID1, MSGID2]) + self.assertEqual(list(batch.messages), []) + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '%s:publish' % topic.path) + self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]}) + + def test_context_mgr_success(self): + import base64 + PAYLOAD1 = b'This is the first message text' + PAYLOAD2 = b'This is the second message text' + B64_1 = base64.b64encode(PAYLOAD1) + B64_2 = base64.b64encode(PAYLOAD2) + MSGID1 = 'DEADBEEF' + MSGID2 = 'BEADCAFE' + MESSAGE1 = {'data': B64_1.decode('ascii'), + 'attributes': {}} + MESSAGE2 = {'data': B64_2.decode('ascii'), + 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} + conn = _Connection({'messageIds': [MSGID1, MSGID2]}) + CLIENT = _Client(project='PROJECT', connection=conn) + topic = _Topic() + batch = self._makeOne(topic, client=CLIENT) + + with batch as other: + batch.publish(PAYLOAD1) + batch.publish(PAYLOAD2, attr1='value1', attr2='value2') + + self.assertTrue(other is batch) + self.assertEqual(list(batch), [MSGID1, MSGID2]) + self.assertEqual(list(batch.messages), []) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '%s:publish' % topic.path) + self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]}) + + def test_context_mgr_failure(self): + import base64 + PAYLOAD1 = b'This is the first message text' + PAYLOAD2 = b'This is the second message text' + B64_1 = base64.b64encode(PAYLOAD1) + B64_2 = base64.b64encode(PAYLOAD2) + MSGID1 = 'DEADBEEF' + MSGID2 = 'BEADCAFE' + MESSAGE1 = {'data': B64_1.decode('ascii'), + 'attributes': {}} + MESSAGE2 = {'data': B64_2.decode('ascii'), + 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} + conn = _Connection({'messageIds': [MSGID1, MSGID2]}) + CLIENT = _Client(project='PROJECT', connection=conn) + topic = _Topic() + batch = self._makeOne(topic, client=CLIENT) + + try: + with batch as other: + batch.publish(PAYLOAD1) + batch.publish(PAYLOAD2, attr1='value1', attr2='value2') + raise _Bugout() + except _Bugout: + pass + + self.assertTrue(other is batch) + self.assertEqual(list(batch), []) + self.assertEqual(list(batch.messages), [MESSAGE1, MESSAGE2]) + self.assertEqual(len(conn._requested), 0) + + +class _Connection(object): + + def __init__(self, *responses): + self._responses = responses + self._requested = [] + + def api_request(self, **kw): + from gcloud.exceptions import NotFound + self._requested.append(kw) + + try: + response, self._responses = self._responses[0], self._responses[1:] + except: + raise NotFound('miss') + else: + return response + + +class _Topic(object): + + def __init__(self, name="NAME", project="PROJECT", + timestamp_messages=False): + self.path = '/projects/%s/topics/%s' % (project, name) + self.timestamp_messages = timestamp_messages + + def _timestamp_message(self, attrs): + if self.timestamp_messages: + attrs['timestamp'] = 'TIMESTAMP' + + +class _Client(object): + + def __init__(self, project, connection=None): + self.project = project + self.connection = connection + + +class _Bugout(Exception): + pass diff --git a/gcloud/search/topic.py b/gcloud/search/topic.py new file mode 100644 index 000000000000..9a0af105747d --- /dev/null +++ b/gcloud/search/topic.py @@ -0,0 +1,255 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Define API Topics.""" + +import base64 +import datetime + +from gcloud._helpers import _RFC3339_MICROS +from gcloud.exceptions import NotFound + +_NOW = datetime.datetime.utcnow + + +class Topic(object): + """Topics are targets to which messages can be published. + + Subscribers then receive those messages. + + See: + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics + + :type name: string + :param name: the name of the topic + + :type client: :class:`gcloud.pubsub.client.Client` + :param client: A client which holds credentials and project configuration + for the topic (which requires a project). + + :type timestamp_messages: boolean + :param timestamp_messages: If true, the topic will add a ``timestamp`` key + to the attributes of each published message: + the value will be an RFC 3339 timestamp. + """ + def __init__(self, name, client, timestamp_messages=False): + self.name = name + self._client = client + self.timestamp_messages = timestamp_messages + + @classmethod + def from_api_repr(cls, resource, client): + """Factory: construct a topic given its API representation + + :type resource: dict + :param resource: topic resource representation returned from the API + + :type client: :class:`gcloud.pubsub.client.Client` + :param client: Client which holds credentials and project + configuration for the topic. + + :rtype: :class:`gcloud.pubsub.topic.Topic` + :returns: Topic parsed from ``resource``. + :raises: :class:`ValueError` if ``client`` is not ``None`` and the + project from the resource does not agree with the project + from the client. + """ + _, project, _, name = resource['name'].split('/') + if client.project != project: + raise ValueError('Project from clientshould agree with ' + 'project from resource.') + return cls(name, client=client) + + @property + def project(self): + """Project bound to the topic.""" + return self._client.project + + @property + def full_name(self): + """Fully-qualified name used in topic / subscription APIs""" + return 'projects/%s/topics/%s' % (self.project, self.name) + + @property + def path(self): + """URL path for the topic's APIs""" + return '/%s' % (self.full_name) + + def _require_client(self, client): + """Check client or verify over-ride. + + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current topic. + + :rtype: :class:`gcloud.pubsub.client.Client` + :returns: The client passed in or the currently bound client. + """ + if client is None: + client = self._client + return client + + def create(self, client=None): + """API call: create the topic via a PUT request + + See: + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/create + + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current topic. + """ + client = self._require_client(client) + client.connection.api_request(method='PUT', path=self.path) + + def exists(self, client=None): + """API call: test for the existence of the topic via a GET request + + See + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/get + + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current topic. + """ + client = self._require_client(client) + + try: + client.connection.api_request(method='GET', path=self.path) + except NotFound: + return False + else: + return True + + def _timestamp_message(self, attrs): + """Add a timestamp to ``attrs``, if the topic is so configured. + + If ``attrs`` already has the key, do nothing. + + Helper method for ``publish``/``Batch.publish``. + """ + if self.timestamp_messages and 'timestamp' not in attrs: + attrs['timestamp'] = _NOW().strftime(_RFC3339_MICROS) + + def publish(self, message, client=None, **attrs): + """API call: publish a message to a topic via a POST request + + See: + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/publish + + :type message: bytes + :param message: the message payload + + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current topic. + + :type attrs: dict (string -> string) + :message attrs: key-value pairs to send as message attributes + + :rtype: str + :returns: message ID assigned by the server to the published message + """ + client = self._require_client(client) + + self._timestamp_message(attrs) + message_b = base64.b64encode(message).decode('ascii') + message_data = {'data': message_b, 'attributes': attrs} + data = {'messages': [message_data]} + response = client.connection.api_request( + method='POST', path='%s:publish' % (self.path,), data=data) + return response['messageIds'][0] + + def batch(self, client=None): + """Return a batch to use as a context manager. + + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current topic. + + :rtype: :class:`Batch` + :returns: A batch to use as a context manager. + """ + client = self._require_client(client) + return Batch(self, client) + + def delete(self, client=None): + """API call: delete the topic via a DELETE request + + See: + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/delete + + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current topic. + """ + client = self._require_client(client) + client.connection.api_request(method='DELETE', path=self.path) + + +class Batch(object): + """Context manager: collect messages to publish via a single API call. + + Helper returned by :meth:Topic.batch + + :type topic: :class:`gcloud.pubsub.topic.Topic` + :param topic: the topic being published + + :type client: :class:`gcloud.pubsub.client.Client` + :param client: The client to use. + """ + def __init__(self, topic, client): + self.topic = topic + self.messages = [] + self.message_ids = [] + self.client = client + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is None: + self.commit() + + def __iter__(self): + return iter(self.message_ids) + + def publish(self, message, **attrs): + """Emulate publishing a message, but save it. + + :type message: bytes + :param message: the message payload + + :type attrs: dict (string -> string) + :message attrs: key-value pairs to send as message attributes + """ + self.topic._timestamp_message(attrs) + self.messages.append( + {'data': base64.b64encode(message).decode('ascii'), + 'attributes': attrs}) + + def commit(self, client=None): + """Send saved messages as a single API call. + + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current batch. + """ + if client is None: + client = self.client + response = client.connection.api_request( + method='POST', path='%s:publish' % self.topic.path, + data={'messages': self.messages[:]}) + self.message_ids.extend(response['messageIds']) + del self.messages[:]