From 88f0dd7c8f181f2e1c0a38cf5a2933c1f5286fd3 Mon Sep 17 00:00:00 2001
From: wanghao <sxmatch1986@gmail.com>
Date: Tue, 15 Jan 2019 16:57:49 +0800
Subject: [PATCH] Introduce the Topic resource into Zaqar-1

This patch will Introduce the Topic resource into Zaqar
and implement the storage in mongodb.

Change-Id: I6d37c93aa75d7df78d3939044b0e8fefa5d9a5f5
Implements: bp introduce-topic-resource-for-notification
---
 ...oduce-topic-resource-9b40674cac06bdc2.yaml |   8 +
 setup.cfg                                     |   9 +
 zaqar/common/policies/__init__.py             |   4 +-
 zaqar/common/policies/topics.py               | 101 ++
 zaqar/common/transport/wsgi/helpers.py        |  41 +
 zaqar/conf/storage.py                         |  10 +-
 zaqar/storage/__init__.py                     |   2 +
 zaqar/storage/base.py                         | 121 +++
 zaqar/storage/errors.py                       |  16 +
 zaqar/storage/mongodb/controllers.py          |   2 +
 zaqar/storage/mongodb/driver.py               |  21 +
 zaqar/storage/mongodb/topic_messages.py       | 976 ++++++++++++++++++
 zaqar/storage/mongodb/topics.py               | 279 +++++
 zaqar/storage/mongodb/utils.py                |   7 +-
 zaqar/storage/pipeline.py                     |   8 +
 zaqar/storage/pooling.py                      | 131 +++
 zaqar/storage/redis/driver.py                 |   4 +
 zaqar/storage/redis/messages.py               |  50 +
 zaqar/storage/sqlalchemy/driver.py            |   4 +
 zaqar/storage/swift/messages.py               |  86 ++
 zaqar/tests/faulty_storage.py                 |  37 +
 .../wsgi/v2_0/test_topic_lifecycle.py         | 608 +++++++++++
 zaqar/transport/validation.py                 |  24 +
 zaqar/transport/wsgi/driver.py                |   9 +-
 zaqar/transport/wsgi/v2_0/__init__.py         |  40 +
 zaqar/transport/wsgi/v2_0/topic.py            | 333 ++++++
 zaqar/transport/wsgi/v2_0/topic_purge.py      |  82 ++
 zaqar/transport/wsgi/v2_0/topic_stats.py      |  78 ++
 28 files changed, 3086 insertions(+), 5 deletions(-)
 create mode 100644 releasenotes/notes/introduce-topic-resource-9b40674cac06bdc2.yaml
 create mode 100644 zaqar/common/policies/topics.py
 create mode 100644 zaqar/storage/mongodb/topic_messages.py
 create mode 100644 zaqar/storage/mongodb/topics.py
 create mode 100644 zaqar/tests/unit/transport/wsgi/v2_0/test_topic_lifecycle.py
 create mode 100644 zaqar/transport/wsgi/v2_0/topic.py
 create mode 100644 zaqar/transport/wsgi/v2_0/topic_purge.py
 create mode 100644 zaqar/transport/wsgi/v2_0/topic_stats.py

diff --git a/releasenotes/notes/introduce-topic-resource-9b40674cac06bdc2.yaml b/releasenotes/notes/introduce-topic-resource-9b40674cac06bdc2.yaml
new file mode 100644
index 000000000..c10c4ccde
--- /dev/null
+++ b/releasenotes/notes/introduce-topic-resource-9b40674cac06bdc2.yaml
@@ -0,0 +1,8 @@
+---
+features:
+  - Introduce a new resource called Topic into Zaqar.
+    Topic is a concept from AWS Simple Notification Service (SNS), it will has
+    relevance with subscriptions. User can send message to a topic, and then
+    the subscribers will get the message according to different protocols,
+    like http, email, sms, etc. This feature will help Zaqar to split
+    Messaging Queue Service and Notification Service clearly.
diff --git a/setup.cfg b/setup.cfg
index a0681b267..98a79eb15 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -61,6 +61,15 @@ zaqar.storage.redis.driver.queue.stages =
 zaqar.storage.swift.driver.queue.stages =
     message_queue_handler = zaqar.storage.swift.messages:MessageQueueHandler
 
+zaqar.storage.mongodb.driver.topic.stages =
+    message_queue_handler = zaqar.storage.mongodb.topic_messages:MessageTopicHandler
+
+zaqar.storage.redis.driver.topic.stages =
+    message_queue_handler = zaqar.storage.redis.messages:MessageTopicHandler
+
+zaqar.storage.swift.driver.topic.stages =
+    message_queue_handler = zaqar.storage.swift.messages:MessageTopicHandler
+
 zaqar.notification.tasks =
     http = zaqar.notification.tasks.webhook:WebhookTask
     https = zaqar.notification.tasks.webhook:WebhookTask
diff --git a/zaqar/common/policies/__init__.py b/zaqar/common/policies/__init__.py
index b13e8829f..58c9e1fa3 100644
--- a/zaqar/common/policies/__init__.py
+++ b/zaqar/common/policies/__init__.py
@@ -20,6 +20,7 @@ from zaqar.common.policies import messages
 from zaqar.common.policies import pools
 from zaqar.common.policies import queues
 from zaqar.common.policies import subscription
+from zaqar.common.policies import topics
 
 
 def list_rules():
@@ -31,5 +32,6 @@ def list_rules():
         messages.list_rules(),
         pools.list_rules(),
         queues.list_rules(),
-        subscription.list_rules()
+        subscription.list_rules(),
+        topics.list_rules(),
     )
diff --git a/zaqar/common/policies/topics.py b/zaqar/common/policies/topics.py
new file mode 100644
index 000000000..171bbf729
--- /dev/null
+++ b/zaqar/common/policies/topics.py
@@ -0,0 +1,101 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo_policy import policy
+
+from zaqar.common.policies import base
+
+TOPICS = 'topics:%s'
+
+rules = [
+    policy.DocumentedRuleDefault(
+        name=TOPICS % 'get_all',
+        check_str=base.UNPROTECTED,
+        description='List all topics.',
+        operations=[
+            {
+                'path': '/v2/topics',
+                'method': 'GET'
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        name=TOPICS % 'create',
+        check_str=base.UNPROTECTED,
+        description='Create a topic.',
+        operations=[
+            {
+                'path': '/v2/topics/{topic_name}',
+                'method': 'PUT'
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        name=TOPICS % 'get',
+        check_str=base.UNPROTECTED,
+        description='Get details about a specific topic.',
+        operations=[
+            {
+                'path': '/v2/topics/{topic_name}',
+                'method': 'GET'
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        name=TOPICS % 'delete',
+        check_str=base.UNPROTECTED,
+        description='Delete a topic.',
+        operations=[
+            {
+                'path': '/v2/topics/{topic_name}',
+                'method': 'DELETE'
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        name=TOPICS % 'update',
+        check_str=base.UNPROTECTED,
+        description='Update a topic.',
+        operations=[
+            {
+                'path': '/v2/topics/{topic_name}',
+                'method': 'PATCH'
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        name=TOPICS % 'stats',
+        check_str=base.UNPROTECTED,
+        description='Get statistics about a specific topic.',
+        operations=[
+            {
+                'path': '/v2/topics/{topic_name}/stats',
+                'method': 'GET'
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        name=TOPICS % 'purge',
+        check_str=base.UNPROTECTED,
+        description='Purge resources from a particular topic.',
+        operations=[
+            {
+                'path': '/v2/topic/{topic_name}/purge',
+                'method': 'POST'
+            }
+        ]
+    )
+]
+
+
+def list_rules():
+    return rules
diff --git a/zaqar/common/transport/wsgi/helpers.py b/zaqar/common/transport/wsgi/helpers.py
index 9e9a2ecce..5d4448d42 100644
--- a/zaqar/common/transport/wsgi/helpers.py
+++ b/zaqar/common/transport/wsgi/helpers.py
@@ -277,3 +277,44 @@ def inject_context(req, resp, params):
                                   project_domain_id=project_domain_id,
                                   user_domain_id=user_domain_id)
     req.env['zaqar.context'] = ctxt
+
+
+def validate_topic_identification(validate, req, resp, params):
+    """Hook for validating the topic name and project id in requests.
+
+    The queue name validation is short-circuited if 'topic_name' does
+    not exist in `params`.
+
+    This hook depends on the `get_project` hook, which must be
+    installed upstream.
+
+
+    :param validate: A validator function that will
+        be used to check the topic name against configured
+        limits. functools.partial or a closure must be used to
+        set this first arg, and expose the remaining ones as
+        a Falcon hook interface.
+    :param req: Falcon request object
+    :param resp: Falcon response object
+    :param params: Responder params dict
+    """
+
+    try:
+        validate(params['topic_name'],
+                 params['project_id'])
+    except KeyError:
+        # NOTE(kgriffs): topic not in params, so nothing to do
+        pass
+    except validation.ValidationFailed:
+        project = params['project_id']
+        queue = params['topic_name']
+        if six.PY2:
+            queue = queue.decode('utf-8', 'replace')
+
+        LOG.debug(u'Invalid topic name "%(topic)s" submitted for '
+                  u'project: %(project)s',
+                  {'topic': queue, 'project': project})
+
+        raise falcon.HTTPBadRequest(_(u'Invalid topic identification'),
+                                    _(u'The format of the submitted topic '
+                                      u'name or project id is not valid.'))
diff --git a/zaqar/conf/storage.py b/zaqar/conf/storage.py
index 2779e3c83..2db74a2c0 100644
--- a/zaqar/conf/storage.py
+++ b/zaqar/conf/storage.py
@@ -44,12 +44,20 @@ subscription_pipeline = cfg.ListOpt(
            'controller methods.'))
 
 
+topic_pipeline = cfg.ListOpt(
+    'topic_pipeline', default=[],
+    help=_('Pipeline to use for processing topic operations. This '
+           'pipeline will be consumed before calling the storage driver\'s '
+           'controller methods.'))
+
+
 GROUP_NAME = 'storage'
 ALL_OPTS = [
     queue_pipeline,
     message_pipeline,
     claim_pipeline,
-    subscription_pipeline
+    subscription_pipeline,
+    topic_pipeline
 ]
 
 
diff --git a/zaqar/storage/__init__.py b/zaqar/storage/__init__.py
index 795647b1d..e663df51d 100644
--- a/zaqar/storage/__init__.py
+++ b/zaqar/storage/__init__.py
@@ -27,10 +27,12 @@ Queue = base.Queue
 Subscription = base.Subscription
 PoolsBase = base.PoolsBase
 FlavorsBase = base.FlavorsBase
+Topic = base.Topic
 
 DEFAULT_QUEUES_PER_PAGE = base.DEFAULT_QUEUES_PER_PAGE
 DEFAULT_MESSAGES_PER_PAGE = base.DEFAULT_MESSAGES_PER_PAGE
 DEFAULT_POOLS_PER_PAGE = base.DEFAULT_POOLS_PER_PAGE
 DEFAULT_SUBSCRIPTIONS_PER_PAGE = base.DEFAULT_SUBSCRIPTIONS_PER_PAGE
+DEFAULT_TOPICS_PER_PAGE = base.DEFAULT_TOPICS_PER_PAGE
 
 DEFAULT_MESSAGES_PER_CLAIM = base.DEFAULT_MESSAGES_PER_CLAIM
diff --git a/zaqar/storage/base.py b/zaqar/storage/base.py
index 82cceffc9..be025262a 100644
--- a/zaqar/storage/base.py
+++ b/zaqar/storage/base.py
@@ -35,6 +35,7 @@ DEFAULT_QUEUES_PER_PAGE = 10
 DEFAULT_MESSAGES_PER_PAGE = 10
 DEFAULT_POOLS_PER_PAGE = 10
 DEFAULT_SUBSCRIPTIONS_PER_PAGE = 10
+DEFAULT_TOPICS_PER_PAGE = 10
 
 DEFAULT_MESSAGES_PER_CLAIM = 10
 
@@ -242,6 +243,11 @@ class DataDriverBase(DriverBase):
         """Returns the driver's subscription controller."""
         raise NotImplementedError
 
+    @decorators.lazy_property(write=False)
+    def topic_controller(self):
+        """Returns the driver's topic controller."""
+        return self.control_driver.topic_controller
+
 
 @six.add_metaclass(abc.ABCMeta)
 class ControlDriverBase(DriverBase):
@@ -281,6 +287,11 @@ class ControlDriverBase(DriverBase):
         """Returns the driver's queue controller."""
         raise NotImplementedError
 
+    @abc.abstractproperty
+    def topic_controller(self):
+        """Returns the driver's topic controller."""
+        raise NotImplementedError
+
     @abc.abstractmethod
     def close(self):
         """Close connections to the backend."""
@@ -1096,3 +1107,113 @@ class FlavorsBase(ControllerBase):
         """Deletes all flavors from storage."""
 
         raise NotImplementedError
+
+
+@six.add_metaclass(abc.ABCMeta)
+class Topic(ControllerBase):
+    """This class is responsible for managing topics.
+
+    Topic operations include CRUD, etc.
+
+    Storage driver implementations of this class should
+    be capable of handling high workloads and huge
+    numbers of topics.
+    """
+
+    def list(self, project=None, kfilter={}, marker=None,
+             limit=DEFAULT_TOPICS_PER_PAGE, detailed=False, name=None):
+        """Base method for listing topics.
+
+        :param project: Project id
+        :param kfilter: The key-value of metadata which user want to filter
+        :param marker: The last topic name
+        :param limit: (Default 10) Max number of topics to return
+        :param detailed: Whether metadata is included
+        :param name: The topic name which user want to filter
+
+        :returns: An iterator giving a sequence of topics
+            and the marker of the next page.
+        """
+        return self._list(project, kfilter, marker, limit, detailed, name)
+
+    _list = abc.abstractmethod(lambda x: None)
+
+    def get(self, name, project=None):
+        """Base method for topic metadata retrieval.
+
+        :param name: The topic name
+        :param project: Project id
+
+        :returns: Dictionary containing topic metadata
+        :raises DoesNotExist: if topic metadata does not exist
+        """
+        return self._get(name, project)
+
+    _get = abc.abstractmethod(lambda x: None)
+
+    def get_metadata(self, name, project=None):
+        """Base method for topic metadata retrieval.
+
+        :param name: The topic name
+        :param project: Project id
+
+        :returns: Dictionary containing topic metadata
+        :raises DoesNotExist: if topic metadata does not exist
+        """
+        raise NotImplementedError
+
+    def set_metadata(self, name, metadata, project=None):
+        """Base method for updating a topic metadata.
+
+        :param name: The topic name
+        :param metadata: Topic metadata as a dict
+        :param project: Project id
+        :raises DoesNotExist: if topic metadata can not be updated
+        """
+        raise NotImplementedError
+
+    def create(self, name, metadata=None, project=None):
+        """Base method for topic creation.
+
+        :param name: The topic name
+        :param project: Project id
+        :returns: True if a topic was created and False
+            if it was updated.
+        """
+        return self._create(name, metadata, project)
+
+    _create = abc.abstractmethod(lambda x: None)
+
+    def exists(self, name, project=None):
+        """Base method for testing topic existence.
+
+        :param name: The topic name
+        :param project: Project id
+        :returns: True if a topic exists and False
+            if it does not.
+        """
+        return self._exists(name, project)
+
+    _exists = abc.abstractmethod(lambda x: None)
+
+    def delete(self, name, project=None):
+        """Base method for deleting a topic.
+
+        :param name: The topic name
+        :param project: Project id
+        """
+        return self._delete(name, project)
+
+    _delete = abc.abstractmethod(lambda x: None)
+
+    def stats(self, name, project=None):
+        """Base method for topic stats.
+
+        :param name: The topic name
+        :param project: Project id
+        :returns: Dictionary with the
+            queue stats
+        """
+        return self._stats(name, project)
+
+    _stats = abc.abstractmethod(lambda x: None)
diff --git a/zaqar/storage/errors.py b/zaqar/storage/errors.py
index 9ac908224..d35c8a282 100644
--- a/zaqar/storage/errors.py
+++ b/zaqar/storage/errors.py
@@ -225,3 +225,19 @@ class SubscriptionAlreadyExists(Conflict):
 
     msg_format = (u'Such subscription already exists. Subscriptions '
                   u'are unique by project + queue + subscriber URI.')
+
+
+class TopicDoesNotExist(DoesNotExist):
+
+    msg_format = u'Topic {name} does not exist for project {project}'
+
+    def __init__(self, name, project):
+        super(TopicDoesNotExist, self).__init__(name=name, project=project)
+
+
+class TopicIsEmpty(ExceptionBase):
+
+    msg_format = u'Topic {name} in project {project} is empty'
+
+    def __init__(self, name, project):
+        super(TopicIsEmpty, self).__init__(name=name, project=project)
diff --git a/zaqar/storage/mongodb/controllers.py b/zaqar/storage/mongodb/controllers.py
index 7706f1538..90021a585 100644
--- a/zaqar/storage/mongodb/controllers.py
+++ b/zaqar/storage/mongodb/controllers.py
@@ -29,6 +29,7 @@ from zaqar.storage.mongodb import messages
 from zaqar.storage.mongodb import pools
 from zaqar.storage.mongodb import queues
 from zaqar.storage.mongodb import subscriptions
+from zaqar.storage.mongodb import topics
 
 
 CatalogueController = catalogue.CatalogueController
@@ -39,3 +40,4 @@ FIFOMessageController = messages.FIFOMessageController
 QueueController = queues.QueueController
 PoolsController = pools.PoolsController
 SubscriptionController = subscriptions.SubscriptionController
+TopicController = topics.TopicController
diff --git a/zaqar/storage/mongodb/driver.py b/zaqar/storage/mongodb/driver.py
index 2e84c4afa..f1a677734 100644
--- a/zaqar/storage/mongodb/driver.py
+++ b/zaqar/storage/mongodb/driver.py
@@ -271,6 +271,17 @@ class ControlDriver(storage.ControlDriverBase):
         name = self.mongodb_conf.database + '_queues'
         return self.connection[name]
 
+    @decorators.lazy_property(write=False)
+    def topics_database(self):
+        """Database dedicated to the "topics" collection.
+
+        The topics collection is separated out into its own database
+        to avoid writer lock contention with the messages collections.
+        """
+
+        name = self.mongodb_conf.database + '_topics'
+        return self.connection[name]
+
     @decorators.lazy_property(write=False)
     def queue_controller(self):
         controller = controllers.QueueController(self)
@@ -308,3 +319,13 @@ class ControlDriver(storage.ControlDriverBase):
             return profiler.trace_cls("mongodb_flavors_controller")(controller)
         else:
             return controller
+
+    @decorators.lazy_property(write=False)
+    def topic_controller(self):
+        controller = controllers.TopicController(self)
+        if (self.conf.profiler.enabled and
+                (self.conf.profiler.trace_message_store or
+                    self.conf.profiler.trace_management_store)):
+            return profiler.trace_cls("mongodb_topics_controller")(controller)
+        else:
+            return controller
diff --git a/zaqar/storage/mongodb/topic_messages.py b/zaqar/storage/mongodb/topic_messages.py
new file mode 100644
index 000000000..55bd2cc23
--- /dev/null
+++ b/zaqar/storage/mongodb/topic_messages.py
@@ -0,0 +1,976 @@
+# Copyright (c) 2013 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Implements MongoDB the storage controller for messages.
+
+Field Mappings:
+    In order to reduce the disk / memory space used,
+    field names will be, most of the time, the first
+    letter of their long name.
+"""
+
+import datetime
+import time
+
+from bson import errors as bsonerror
+from bson import objectid
+from oslo_log import log as logging
+from oslo_utils import timeutils
+import pymongo.errors
+import pymongo.read_preferences
+
+from zaqar.i18n import _
+from zaqar import storage
+from zaqar.storage import errors
+from zaqar.storage.mongodb import utils
+from zaqar.storage import utils as s_utils
+
+
+LOG = logging.getLogger(__name__)
+
+# NOTE(kgriffs): This value, in seconds, should be at least less than the
+# minimum allowed TTL for messages (60 seconds). Make it 45 to allow for
+# some fudge room.
+MAX_RETRY_POST_DURATION = 45
+
+# NOTE(kgriffs): It is extremely unlikely that all workers would somehow hang
+# for more than 5 seconds, without a single one being able to succeed in
+# posting some messages and incrementing the counter, thus allowing the other
+# producers to succeed in turn.
+COUNTER_STALL_WINDOW = 5
+
+# For hinting
+ID_INDEX_FIELDS = [('_id', 1)]
+
+# For removing expired messages
+TTL_INDEX_FIELDS = [
+    ('e', 1),
+]
+
+# to unify use of project/topic across mongodb
+# storage impls.
+PROJ_TOPIC = utils.PROJ_TOPIC_KEY
+
+# NOTE(kgriffs): This index is for listing messages, usually
+# filtering out claimed ones.
+ACTIVE_INDEX_FIELDS = [
+    (PROJ_TOPIC, 1),  # Project will be unique, so put first
+    ('k', 1),  # Used for sorting and paging, must come before range queries
+]
+
+# For counting
+COUNTING_INDEX_FIELDS = [
+    (PROJ_TOPIC, 1),  # Project will be unique, so put first
+]
+
+# This index is meant to be used as a shard-key and to ensure
+# uniqueness for markers.
+#
+# As for other compound indexes, order matters. The marker `k`
+# gives enough cardinality to ensure chunks are evenly distributed,
+# whereas the `p_q` field helps keeping chunks from the same project
+# and queue together.
+#
+# In a sharded environment, uniqueness of this index is still guaranteed
+# because it's used as a shard key.
+MARKER_INDEX_FIELDS = [
+    ('k', 1),
+    (PROJ_TOPIC, 1),
+]
+
+TRANSACTION_INDEX_FIELDS = [
+    ('tx', 1),
+]
+
+
+class MessageController(storage.Message):
+    """Implements message resource operations using MongoDB.
+
+    Messages are scoped by project + topic.
+
+    ::
+
+        Messages:
+            Name                Field
+            -------------------------
+            scope            ->   p_t
+            ttl              ->     t
+            expires          ->     e
+            marker           ->     k
+            body             ->     b
+            client uuid      ->     u
+            transaction      ->    tx
+            delay            ->     d
+            checksum         ->    cs
+    """
+
+    def __init__(self, *args, **kwargs):
+        super(MessageController, self).__init__(*args, **kwargs)
+
+        # Cache for convenience and performance
+        self._num_partitions = self.driver.mongodb_conf.partitions
+        self._topic_ctrl = self.driver.topic_controller
+        self._retry_range = range(self.driver.mongodb_conf.max_attempts)
+
+        # Create a list of 'messages' collections, one for each database
+        # partition, ordered by partition number.
+        #
+        # NOTE(kgriffs): Order matters, since it is used to lookup the
+        # collection by partition number. For example, self._collections[2]
+        # would provide access to zaqar_p2.messages (partition numbers are
+        # zero-based).
+        self._collections = [db.messages
+                             for db in self.driver.message_databases]
+
+        # Ensure indexes are initialized before any queries are performed
+        for collection in self._collections:
+            self._ensure_indexes(collection)
+
+    # ----------------------------------------------------------------------
+    # Helpers
+    # ----------------------------------------------------------------------
+
+    def _ensure_indexes(self, collection):
+        """Ensures that all indexes are created."""
+
+        collection.ensure_index(TTL_INDEX_FIELDS,
+                                name='ttl',
+                                expireAfterSeconds=0,
+                                background=True)
+
+        collection.ensure_index(ACTIVE_INDEX_FIELDS,
+                                name='active',
+                                background=True)
+
+        collection.ensure_index(COUNTING_INDEX_FIELDS,
+                                name='counting',
+                                background=True)
+
+        collection.ensure_index(MARKER_INDEX_FIELDS,
+                                name='queue_marker',
+                                background=True)
+
+        collection.ensure_index(TRANSACTION_INDEX_FIELDS,
+                                name='transaction',
+                                background=True)
+
+    def _collection(self, topic_name, project=None):
+        """Get a partitioned collection instance."""
+        return self._collections[utils.get_partition(self._num_partitions,
+                                                     topic_name, project)]
+
+    def _backoff_sleep(self, attempt):
+        """Sleep between retries using a jitter algorithm.
+
+        Mitigates thrashing between multiple parallel requests, and
+        creates backpressure on clients to slow down the rate
+        at which they submit requests.
+
+        :param attempt: current attempt number, zero-based
+        """
+        conf = self.driver.mongodb_conf
+        seconds = utils.calculate_backoff(attempt, conf.max_attempts,
+                                          conf.max_retry_sleep,
+                                          conf.max_retry_jitter)
+
+        time.sleep(seconds)
+
+    def _purge_topic(self, topic_name, project=None):
+        """Removes all messages from the queue.
+
+        Warning: Only use this when deleting the queue; otherwise
+        you can cause a side-effect of reseting the marker counter
+        which can cause clients to miss tons of messages.
+
+        If the queue does not exist, this method fails silently.
+
+        :param topic_name: name of the queue to purge
+        :param project: ID of the project to which the queue belongs
+        """
+        scope = utils.scope_queue_name(topic_name, project)
+        collection = self._collection(topic_name, project)
+        collection.delete_many({PROJ_TOPIC: scope})
+
+    def _list(self, topic_name, project=None, marker=None,
+              echo=False, client_uuid=None, projection=None,
+              include_claimed=False, include_delayed=False,
+              sort=1, limit=None):
+        """Message document listing helper.
+
+        :param topic_name: Name of the topic to list
+        :param project: (Default None) Project `topic_name` belongs to. If
+            not specified, queries the "global" namespace/project.
+        :param marker: (Default None) Message marker from which to start
+            iterating. If not specified, starts with the first message
+            available in the topic.
+        :param echo: (Default False) Whether to return messages that match
+            client_uuid
+        :param client_uuid: (Default None) UUID for the client that
+            originated this request
+        :param projection: (Default None) a list of field names that should be
+            returned in the result set or a dict specifying the fields to
+            include or exclude
+        :param include_claimed: (Default False) Whether to include
+            claimed messages, not just active ones
+        :param include_delayed: (Default False) Whether to include
+            delayed messages, not just active ones
+        :param sort: (Default 1) Sort order for the listing. Pass 1 for
+            ascending (oldest message first), or -1 for descending (newest
+            message first).
+        :param limit: (Default None) The maximum number of messages
+            to list. The results may include fewer messages than the
+            requested `limit` if not enough are available. If limit is
+            not specified
+
+        :returns: Generator yielding up to `limit` messages.
+        """
+
+        if sort not in (1, -1):
+            raise ValueError(u'sort must be either 1 (ascending) '
+                             u'or -1 (descending)')
+
+        now = timeutils.utcnow_ts()
+
+        query = {
+            # Messages must belong to this topic and project.
+            PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
+
+            # NOTE(kgriffs): Messages must be finalized (i.e., must not
+            # be part of an unfinalized transaction).
+            #
+            # See also the note wrt 'tx' within the definition
+            # of ACTIVE_INDEX_FIELDS.
+            'tx': None,
+        }
+
+        if not echo:
+            query['u'] = {'$ne': client_uuid}
+
+        if marker is not None:
+            query['k'] = {'$gt': marker}
+
+        collection = self._collection(topic_name, project)
+
+        if not include_delayed:
+            # NOTE(cdyangzhenyu): Only include messages that are not
+            # part of any delay, or are part of an expired delay. if
+            # the message has no attribute 'd', it will also be obtained.
+            # This is for compatibility with old data.
+            query['$or'] = [{'d': {'$lte': now}},
+                            {'d': {'$exists': False}}]
+
+        # Construct the request
+        cursor = collection.find(query,
+                                 projection=projection,
+                                 sort=[('k', sort)])
+
+        if limit is not None:
+            cursor.limit(limit)
+
+        # NOTE(flaper87): Suggest the index to use for this query to
+        # ensure the most performant one is chosen.
+        return cursor.hint(ACTIVE_INDEX_FIELDS)
+
+    # ----------------------------------------------------------------------
+    # "Friends" interface
+    # ----------------------------------------------------------------------
+
+    def _count(self, topic_name, project=None, include_claimed=False):
+        """Return total number of messages in a topic.
+
+        This method is designed to very quickly count the number
+        of messages in a given topic. Expired messages are not
+        counted, of course. If the queue does not exist, the
+        count will always be 0.
+
+        Note: Some expired messages may be included in the count if
+            they haven't been GC'd yet. This is done for performance.
+        """
+        query = {
+            # Messages must belong to this queue and project.
+            PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
+
+            # NOTE(kgriffs): Messages must be finalized (i.e., must not
+            # be part of an unfinalized transaction).
+            #
+            # See also the note wrt 'tx' within the definition
+            # of ACTIVE_INDEX_FIELDS.
+            'tx': None,
+        }
+
+        collection = self._collection(topic_name, project)
+        return collection.count(filter=query, hint=COUNTING_INDEX_FIELDS)
+
+    def _active(self, topic_name, marker=None, echo=False,
+                client_uuid=None, projection=None, project=None,
+                limit=None, include_delayed=False):
+
+        return self._list(topic_name, project=project, marker=marker,
+                          echo=echo, client_uuid=client_uuid,
+                          projection=projection, include_claimed=False,
+                          include_delayed=include_delayed, limit=limit)
+
+    def _inc_counter(self, topic_name, project=None, amount=1, window=None):
+        """Increments the message counter and returns the new value.
+
+        :param topic_name: Name of the topic to which the counter is scoped
+        :param project: Queue's project name
+        :param amount: (Default 1) Amount by which to increment the counter
+        :param window: (Default None) A time window, in seconds, that
+            must have elapsed since the counter was last updated, in
+            order to increment the counter.
+
+        :returns: Updated message counter value, or None if window
+            was specified, and the counter has already been updated
+            within the specified time period.
+
+        :raises QueueDoesNotExist: if not found
+        """
+
+        # NOTE(flaper87): If this `if` is True, it means we're
+        # using a mongodb in the control plane. To avoid breaking
+        # environments doing so already, we'll keep using the counter
+        # in the mongodb topic_controller rather than the one in the
+        # message_controller. This should go away, eventually
+        if hasattr(self._topic_ctrl, '_inc_counter'):
+            return self._topic_ctrl._inc_counter(topic_name, project,
+                                                 amount, window)
+
+        now = timeutils.utcnow_ts()
+
+        update = {'$inc': {'c.v': amount}, '$set': {'c.t': now}}
+        query = _get_scoped_query(topic_name, project)
+        if window is not None:
+            threshold = now - window
+            query['c.t'] = {'$lt': threshold}
+
+        while True:
+            try:
+                collection = self._collection(topic_name, project).stats
+                doc = collection.find_one_and_update(
+                    query, update,
+                    return_document=pymongo.ReturnDocument.AFTER,
+                    projection={'c.v': 1, '_id': 0})
+
+                break
+            except pymongo.errors.AutoReconnect as ex:
+                LOG.exception(ex)
+
+        if doc is None:
+            if window is None:
+                # NOTE(kgriffs): Since we did not filter by a time window,
+                # the topic should have been found and updated. Perhaps
+                # the topic has been deleted?
+                message = (u'Failed to increment the message '
+                           u'counter for topic %(name)s and '
+                           u'project %(project)s')
+                message %= dict(name=topic_name, project=project)
+
+                LOG.warning(message)
+
+                raise errors.TopicDoesNotExist(topic_name, project)
+
+            # NOTE(kgriffs): Assume the queue existed, but the counter
+            # was recently updated, causing the range query on 'c.t' to
+            # exclude the record.
+            return None
+
+        return doc['c']['v']
+
+    def _get_counter(self, topic_name, project=None):
+        """Retrieves the current message counter value for a given topic.
+
+        This helper is used to generate monotonic pagination
+        markers that are saved as part of the message
+        document.
+
+        Note 1: Markers are scoped per-queue and so are *not*
+            globally unique or globally ordered.
+
+        Note 2: If two or more requests to this method are made
+            in parallel, this method will return the same counter
+            value. This is done intentionally so that the caller
+            can detect a parallel message post, allowing it to
+            mitigate race conditions between producer and
+            observer clients.
+
+        :param topic_name: Name of the topic to which the counter is scoped
+        :param project: Topic's project
+        :returns: current message counter as an integer
+        """
+
+        # NOTE(flaper87): If this `if` is True, it means we're
+        # using a mongodb in the control plane. To avoid breaking
+        # environments doing so already, we'll keep using the counter
+        # in the mongodb queue_controller rather than the one in the
+        # message_controller. This should go away, eventually
+        if hasattr(self._topic_ctrl, '_get_counter'):
+            return self._topic_ctrl._get_counter(topic_name, project)
+
+        update = {'$inc': {'c.v': 0, 'c.t': 0}}
+        query = _get_scoped_query(topic_name, project)
+
+        try:
+            collection = self._collection(topic_name, project).stats
+            doc = collection.find_one_and_update(
+                query, update, upsert=True,
+                return_document=pymongo.ReturnDocument.AFTER,
+                projection={'c.v': 1, '_id': 0})
+
+            return doc['c']['v']
+        except pymongo.errors.AutoReconnect as ex:
+            LOG.exception(ex)
+
+    # ----------------------------------------------------------------------
+    # Public interface
+    # ----------------------------------------------------------------------
+
+    def list(self, topic_name, project=None, marker=None,
+             limit=storage.DEFAULT_MESSAGES_PER_PAGE,
+             echo=False, client_uuid=None, include_claimed=False,
+             include_delayed=False):
+
+        if marker is not None:
+            try:
+                marker = int(marker)
+            except ValueError:
+                yield iter([])
+
+        messages = self._list(topic_name, project=project, marker=marker,
+                              client_uuid=client_uuid, echo=echo,
+                              include_claimed=include_claimed,
+                              include_delayed=include_delayed, limit=limit)
+
+        marker_id = {}
+
+        now = timeutils.utcnow_ts()
+
+        # NOTE (kgriffs) @utils.raises_conn_error not needed on this
+        # function, since utils.HookedCursor already has it.
+        def denormalizer(msg):
+            marker_id['next'] = msg['k']
+
+            return _basic_message(msg, now)
+
+        yield utils.HookedCursor(messages, denormalizer)
+        yield str(marker_id['next'])
+
+    @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
+    def first(self, topic_name, project=None, sort=1):
+        cursor = self._list(topic_name, project=project,
+                            include_claimed=True, sort=sort,
+                            limit=1)
+        try:
+            message = next(cursor)
+        except StopIteration:
+            raise errors.TopicIsEmpty(topic_name, project)
+
+        now = timeutils.utcnow_ts()
+        return _basic_message(message, now)
+
+    @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
+    def get(self, topic_name, message_id, project=None):
+        mid = utils.to_oid(message_id)
+        if mid is None:
+            raise errors.MessageDoesNotExist(message_id, topic_name,
+                                             project)
+
+        now = timeutils.utcnow_ts()
+
+        query = {
+            '_id': mid,
+            PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
+        }
+
+        collection = self._collection(topic_name, project)
+        message = list(collection.find(query).limit(1).hint(ID_INDEX_FIELDS))
+
+        if not message:
+            raise errors.MessageDoesNotExist(message_id, topic_name,
+                                             project)
+
+        return _basic_message(message[0], now)
+
+    @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
+    def bulk_get(self, topic_name, message_ids, project=None):
+        message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid]
+        if not message_ids:
+            return iter([])
+
+        now = timeutils.utcnow_ts()
+
+        # Base query, always check expire time
+        query = {
+            '_id': {'$in': message_ids},
+            PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
+        }
+
+        collection = self._collection(topic_name, project)
+
+        # NOTE(flaper87): Should this query
+        # be sorted?
+        messages = collection.find(query).hint(ID_INDEX_FIELDS)
+
+        def denormalizer(msg):
+            return _basic_message(msg, now)
+
+        return utils.HookedCursor(messages, denormalizer)
+
+    @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
+    def post(self, topic_name, messages, client_uuid, project=None):
+        # NOTE(flaper87): This method should be safe to retry on
+        # autoreconnect, since we've a 2-step insert for messages.
+        # The worst-case scenario is that we'll increase the counter
+        # several times and we'd end up with some non-active messages.
+
+        if not self._topic_ctrl.exists(topic_name, project):
+            raise errors.TopicDoesNotExist(topic_name, project)
+
+        # NOTE(flaper87): Make sure the counter exists. This method
+        # is an upsert.
+        self._get_counter(topic_name, project)
+        now = timeutils.utcnow_ts()
+        now_dt = datetime.datetime.utcfromtimestamp(now)
+        collection = self._collection(topic_name, project)
+
+        messages = list(messages)
+        msgs_n = len(messages)
+        next_marker = self._inc_counter(topic_name,
+                                        project,
+                                        amount=msgs_n) - msgs_n
+
+        prepared_messages = []
+        for index, message in enumerate(messages):
+            msg = {
+                PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
+                't': message['ttl'],
+                'e': now_dt + datetime.timedelta(seconds=message['ttl']),
+                'u': client_uuid,
+                'd': now + message.get('delay', 0),
+                'b': message['body'] if 'body' in message else {},
+                'k': next_marker + index,
+                'tx': None
+                }
+            if self.driver.conf.enable_checksum:
+                msg['cs'] = s_utils.get_checksum(message.get('body', None))
+
+            prepared_messages.append(msg)
+
+        res = collection.insert_many(prepared_messages,
+                                     bypass_document_validation=True)
+
+        return [str(id_) for id_ in res.inserted_ids]
+
+    @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
+    def delete(self, topic_name, message_id, project=None, claim=None):
+        # NOTE(cpp-cabrera): return early - this is an invalid message
+        # id so we won't be able to find it any way
+        mid = utils.to_oid(message_id)
+        if mid is None:
+            return
+
+        collection = self._collection(topic_name, project)
+
+        query = {
+            '_id': mid,
+            PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
+        }
+
+        cid = utils.to_oid(claim)
+        if cid is None:
+            raise errors.ClaimDoesNotExist(claim, topic_name, project)
+
+        now = timeutils.utcnow_ts()
+        cursor = collection.find(query).hint(ID_INDEX_FIELDS)
+
+        try:
+            message = next(cursor)
+        except StopIteration:
+            return
+
+        if claim is None:
+            if _is_claimed(message, now):
+                raise errors.MessageIsClaimed(message_id)
+
+        else:
+            if message['c']['id'] != cid:
+                kwargs = {}
+                # NOTE(flaper87): In pymongo 3.0 PRIMARY is the default and
+                # `read_preference` is read only. We'd need to set it when the
+                # client is created.
+                # NOTE(kgriffs): Read from primary in case the message
+                # was just barely claimed, and claim hasn't made it to
+                # the secondary.
+                message = collection.find_one(query, **kwargs)
+
+                if message['c']['id'] != cid:
+                    if _is_claimed(message, now):
+                        raise errors.MessageNotClaimedBy(message_id, claim)
+
+                    raise errors.MessageNotClaimed(message_id)
+
+        collection.delete_one(query)
+
+    @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
+    def bulk_delete(self, topic_name, message_ids, project=None,
+                    claim_ids=None):
+        message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid]
+        if claim_ids:
+            claim_ids = [cid for cid in map(utils.to_oid, claim_ids) if cid]
+        query = {
+            '_id': {'$in': message_ids},
+            PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
+        }
+
+        collection = self._collection(topic_name, project)
+        if claim_ids:
+            message_claim_ids = []
+            messages = collection.find(query).hint(ID_INDEX_FIELDS)
+            for message in messages:
+                message_claim_ids.append(message['c']['id'])
+            for cid in claim_ids:
+                if cid not in message_claim_ids:
+                    raise errors.ClaimDoesNotExist(cid, topic_name, project)
+
+        collection.delete_many(query)
+
+    @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
+    def pop(self, topic_name, limit, project=None):
+        query = {
+            PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
+        }
+
+        # Only include messages that are not part of
+        # any claim, or are part of an expired claim.
+        now = timeutils.utcnow_ts()
+        query['c.e'] = {'$lte': now}
+
+        collection = self._collection(topic_name, project)
+        projection = {'_id': 1, 't': 1, 'b': 1, 'c.id': 1}
+
+        messages = (collection.find_one_and_delete(query,
+                                                   projection=projection)
+                    for _ in range(limit))
+
+        final_messages = [_basic_message(message, now)
+                          for message in messages
+                          if message]
+
+        return final_messages
+
+
+class FIFOMessageController(MessageController):
+
+    def _ensure_indexes(self, collection):
+        """Ensures that all indexes are created."""
+
+        collection.ensure_index(TTL_INDEX_FIELDS,
+                                name='ttl',
+                                expireAfterSeconds=0,
+                                background=True)
+
+        collection.ensure_index(ACTIVE_INDEX_FIELDS,
+                                name='active',
+                                background=True)
+
+        collection.ensure_index(COUNTING_INDEX_FIELDS,
+                                name='counting',
+                                background=True)
+
+        # NOTE(kgriffs): This index must be unique so that
+        # inserting a message with the same marker to the
+        # same queue will fail; this is used to detect a
+        # race condition which can cause an observer client
+        # to miss a message when there is more than one
+        # producer posting messages to the same queue, in
+        # parallel.
+        collection.ensure_index(MARKER_INDEX_FIELDS,
+                                name='queue_marker',
+                                unique=True,
+                                background=True)
+
+        collection.ensure_index(TRANSACTION_INDEX_FIELDS,
+                                name='transaction',
+                                background=True)
+
+    @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
+    def post(self, topic_name, messages, client_uuid, project=None):
+        # NOTE(flaper87): This method should be safe to retry on
+        # autoreconnect, since we've a 2-step insert for messages.
+        # The worst-case scenario is that we'll increase the counter
+        # several times and we'd end up with some non-active messages.
+
+        if not self._topic_ctrl.exists(topic_name, project):
+            raise errors.TopicDoesNotExist(topic_name, project)
+
+        # NOTE(flaper87): Make sure the counter exists. This method
+        # is an upsert.
+        self._get_counter(topic_name, project)
+        now = timeutils.utcnow_ts()
+        now_dt = datetime.datetime.utcfromtimestamp(now)
+        collection = self._collection(topic_name, project)
+
+        # Set the next basis marker for the first attempt.
+        #
+        # Note that we don't increment the counter right away because
+        # if 2 concurrent posts happen and the one with the higher counter
+        # ends before the one with the lower counter, there's a window
+        # where a client paging through the queue may get the messages
+        # with the higher counter and skip the previous ones. This would
+        # make our FIFO guarantee unsound.
+        next_marker = self._get_counter(topic_name, project)
+
+        # Unique transaction ID to facilitate atomic batch inserts
+        transaction = objectid.ObjectId()
+
+        prepared_messages = []
+        for index, message in enumerate(messages):
+            msg = {
+                PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
+                't': message['ttl'],
+                'e': now_dt + datetime.timedelta(seconds=message['ttl']),
+                'u': client_uuid,
+                'd': now + message.get('delay', 0),
+                'b': message['body'] if 'body' in message else {},
+                'k': next_marker + index,
+                'tx': None
+                }
+            if self.driver.conf.enable_checksum:
+                msg['cs'] = s_utils.get_checksum(message.get('body', None))
+
+            prepared_messages.append(msg)
+
+        # NOTE(kgriffs): Don't take the time to do a 2-phase insert
+        # if there is no way for it to partially succeed.
+        if len(prepared_messages) == 1:
+            transaction = None
+            prepared_messages[0]['tx'] = None
+
+        # Use a retry range for sanity, although we expect
+        # to rarely, if ever, reach the maximum number of
+        # retries.
+        #
+        # NOTE(kgriffs): With the default configuration (100 ms
+        # max sleep, 1000 max attempts), the max stall time
+        # before the operation is abandoned is 49.95 seconds.
+        for attempt in self._retry_range:
+            try:
+                res = collection.insert_many(prepared_messages,
+                                             bypass_document_validation=True)
+
+                # Log a message if we retried, for debugging perf issues
+                if attempt != 0:
+                    msgtmpl = _(u'%(attempts)d attempt(s) required to post '
+                                u'%(num_messages)d messages to queue '
+                                u'"%(topic)s" under project %(project)s')
+
+                    LOG.debug(msgtmpl,
+                              dict(topic=topic_name,
+                                   attempts=attempt + 1,
+                                   num_messages=len(res.inserted_ids),
+                                   project=project))
+
+                # Update the counter in preparation for the next batch
+                #
+                # NOTE(kgriffs): Due to the unique index on the messages
+                # collection, competing inserts will fail as a whole,
+                # and keep retrying until the counter is incremented
+                # such that the competing marker's will start at a
+                # unique number, 1 past the max of the messages just
+                # inserted above.
+                self._inc_counter(topic_name, project,
+                                  amount=len(res.inserted_ids))
+
+                # NOTE(kgriffs): Finalize the insert once we can say that
+                # all the messages made it. This makes bulk inserts
+                # atomic, assuming queries filter out any non-finalized
+                # messages.
+                if transaction is not None:
+                    collection.update_many({'tx': transaction},
+                                           {'$set': {'tx': None}},
+                                           upsert=False)
+
+                return [str(id_) for id_ in res.inserted_ids]
+
+            except (pymongo.errors.DuplicateKeyError,
+                    pymongo.errors.BulkWriteError) as ex:
+                # TODO(kgriffs): Record stats of how often retries happen,
+                # and how many attempts, on average, are required to insert
+                # messages.
+
+                # NOTE(kgriffs): This can be used in conjunction with the
+                # log line, above, that is emitted after all messages have
+                # been posted, to gauge how long it is taking for messages
+                # to be posted to a given topic, or overall.
+                #
+                # TODO(kgriffs): Add transaction ID to help match up loglines
+                if attempt == 0:
+                    msgtmpl = _(u'First attempt failed while '
+                                u'adding messages to topic '
+                                u'"%(topic)s" under project %(project)s')
+
+                    LOG.debug(msgtmpl, dict(topic=topic_name, project=project))
+
+                # NOTE(kgriffs): Never retry past the point that competing
+                # messages expire and are GC'd, since once they are gone,
+                # the unique index no longer protects us from getting out
+                # of order, which could cause an observer to miss this
+                # message. The code below provides a sanity-check to ensure
+                # this situation can not happen.
+                elapsed = timeutils.utcnow_ts() - now
+                if elapsed > MAX_RETRY_POST_DURATION:
+                    msgtmpl = (u'Exceeded maximum retry duration for topic '
+                               u'"%(topic)s" under project %(project)s')
+
+                    LOG.warning(msgtmpl,
+                                dict(topic=topic_name, project=project))
+                    break
+
+                # Chill out for a moment to mitigate thrashing/thundering
+                self._backoff_sleep(attempt)
+
+                # NOTE(kgriffs): Perhaps we failed because a worker crashed
+                # after inserting messages, but before incrementing the
+                # counter; that would cause all future requests to stall,
+                # since they would keep getting the same base marker that is
+                # conflicting with existing messages, until the messages that
+                # "won" expire, at which time we would end up reusing markers,
+                # and that could make some messages invisible to an observer
+                # that is querying with a marker that is large than the ones
+                # being reused.
+                #
+                # To mitigate this, we apply a heuristic to determine whether
+                # a counter has stalled. We attempt to increment the counter,
+                # but only if it hasn't been updated for a few seconds, which
+                # should mean that nobody is left to update it!
+                #
+                # Note that we increment one at a time until the logjam is
+                # broken, since we don't know how many messages were posted
+                # by the worker before it crashed.
+                next_marker = self._inc_counter(
+                    topic_name, project, window=COUNTER_STALL_WINDOW)
+
+                # Retry the entire batch with a new sequence of markers.
+                #
+                # NOTE(kgriffs): Due to the unique index, and how
+                # MongoDB works with batch requests, we will never
+                # end up with a partially-successful update. The first
+                # document in the batch will fail to insert, and the
+                # remainder of the documents will not be attempted.
+                if next_marker is None:
+                    # NOTE(kgriffs): Usually we will end up here, since
+                    # it should be rare that a counter becomes stalled.
+                    next_marker = self._get_counter(
+                        topic_name, project)
+                else:
+                    msgtmpl = (u'Detected a stalled message counter '
+                               u'for topic "%(topic)s" under '
+                               u'project %(project)s.'
+                               u'The counter was incremented to %(value)d.')
+
+                    LOG.warning(msgtmpl,
+                                dict(topic=topic_name,
+                                     project=project,
+                                     value=next_marker))
+
+                for index, message in enumerate(prepared_messages):
+                    message['k'] = next_marker + index
+            except bsonerror.InvalidDocument as ex:
+                LOG.exception(ex)
+                raise
+            except Exception as ex:
+                LOG.exception(ex)
+                raise
+
+        msgtmpl = (u'Hit maximum number of attempts (%(max)s) for topic '
+                   u'"%(topic)s" under project %(project)s')
+
+        LOG.warning(msgtmpl,
+                    dict(max=self.driver.mongodb_conf.max_attempts,
+                         topic=topic_name,
+                         project=project))
+
+        raise errors.MessageConflict(topic_name, project)
+
+
+def _is_claimed(msg, now):
+    return (msg['c']['id'] is not None and
+            msg['c']['e'] > now)
+
+
+def _basic_message(msg, now):
+    oid = msg['_id']
+    age = now - utils.oid_ts(oid)
+    res = {
+        'id': str(oid),
+        'age': int(age),
+        'ttl': msg['t'],
+        'body': msg['b']
+        }
+    if msg.get('cs'):
+        res['checksum'] = msg.get('cs')
+
+    return res
+
+
+class MessageTopicHandler(object):
+
+    def __init__(self, driver, control_driver):
+        self.driver = driver
+        self._cache = self.driver.cache
+        self.topic_controller = self.driver.topic_controller
+        self.message_controller = self.driver.message_controller
+
+    def delete(self, topic_name, project=None):
+        self.message_controller._purge_queue(topic_name, project)
+
+    @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
+    def stats(self, name, project=None):
+        if not self.topic_controller.exists(name, project=project):
+            raise errors.TopicDoesNotExist(name, project)
+
+        controller = self.message_controller
+
+        total = controller._count(name, project=project,
+                                  include_claimed=True)
+
+        message_stats = {
+            'total': total,
+        }
+
+        try:
+            oldest = controller.first(name, project=project, sort=1)
+            newest = controller.first(name, project=project, sort=-1)
+        except errors.QueueIsEmpty:
+            pass
+        else:
+            now = timeutils.utcnow_ts()
+            message_stats['oldest'] = utils.stat_message(oldest, now)
+            message_stats['newest'] = utils.stat_message(newest, now)
+
+        return {'messages': message_stats}
+
+
+def _get_scoped_query(name, project):
+    return {'p_t': utils.scope_queue_name(name, project)}
diff --git a/zaqar/storage/mongodb/topics.py b/zaqar/storage/mongodb/topics.py
new file mode 100644
index 000000000..1dca69052
--- /dev/null
+++ b/zaqar/storage/mongodb/topics.py
@@ -0,0 +1,279 @@
+# Copyright (c) 2019 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Implements the MongoDB storage controller for topics.
+
+Field Mappings:
+    In order to reduce the disk / memory space used,
+    field names will be, most of the time, the first
+    letter of their long name.
+"""
+
+from oslo_log import log as logging
+from oslo_utils import timeutils
+from pymongo.collection import ReturnDocument
+import pymongo.errors
+
+from zaqar.common import decorators
+from zaqar.i18n import _
+from zaqar import storage
+from zaqar.storage import errors
+from zaqar.storage.mongodb import utils
+
+LOG = logging.getLogger(__name__)
+
+# NOTE(wanghao): Keep this as same as queues'
+_TOPIC_CACHE_PREFIX = 'topiccontroller:'
+_TOPIC_CACHE_TTL = 5
+
+
+def _topic_exists_key(topic, project=None):
+    # NOTE(kgriffs): Use string concatenation for performance,
+    # also put project first since it is guaranteed to be
+    # unique, which should reduce lookup time.
+    return _TOPIC_CACHE_PREFIX + 'exists:' + str(project) + '/' + topic
+
+
+class TopicController(storage.Topic):
+    """Implements Topic resource operations using MongoDB.
+
+    Topics are scoped by project, which is prefixed to the
+    topic name.
+
+    ::
+
+        Topic:
+
+            Name            Field
+            ---------------------
+            name         ->   p_t
+            msg counter  ->     c
+            metadata     ->     m
+
+        Message Counter:
+
+            Name          Field
+            -------------------
+            value        ->   v
+            modified ts  ->   t
+    """
+
+    def __init__(self, *args, **kwargs):
+        super(TopicController, self).__init__(*args, **kwargs)
+
+        self._cache = self.driver.cache
+        self._collection = self.driver.topics_database.topics
+
+        # NOTE(flaper87): This creates a unique index for
+        # project and name. Using project as the prefix
+        # allows for querying by project and project+name.
+        # This is also useful for retrieving the queues list for
+        # a specific project, for example. Order matters!
+        self._collection.ensure_index([('p_t', 1)], unique=True)
+
+    # ----------------------------------------------------------------------
+    # Helpers
+    # ----------------------------------------------------------------------
+
+    def _get_counter(self, name, project=None):
+        """Retrieves the current message counter value for a given topic.
+
+        This helper is used to generate monotonic pagination
+        markers that are saved as part of the message
+        document.
+
+        Note 1: Markers are scoped per-topic and so are *not*
+            globally unique or globally ordered.
+
+        Note 2: If two or more requests to this method are made
+            in parallel, this method will return the same counter
+            value. This is done intentionally so that the caller
+            can detect a parallel message post, allowing it to
+            mitigate race conditions between producer and
+            observer clients.
+
+        :param name: Name of the queue to which the counter is scoped
+        :param project: Topic's project
+        :returns: current message counter as an integer
+        """
+
+        doc = self._collection.find_one(_get_scoped_query(name, project),
+                                        projection={'c.v': 1, '_id': 0})
+
+        if doc is None:
+            raise errors.TopicDoesNotExist(name, project)
+
+        return doc['c']['v']
+
+    def _inc_counter(self, name, project=None, amount=1, window=None):
+        """Increments the message counter and returns the new value.
+
+        :param name: Name of the topic to which the counter is scoped
+        :param project: Topic's project name
+        :param amount: (Default 1) Amount by which to increment the counter
+        :param window: (Default None) A time window, in seconds, that
+            must have elapsed since the counter was last updated, in
+            order to increment the counter.
+
+        :returns: Updated message counter value, or None if window
+            was specified, and the counter has already been updated
+            within the specified time period.
+
+        :raises TopicDoesNotExist: if not found
+        """
+        now = timeutils.utcnow_ts()
+
+        update = {'$inc': {'c.v': amount}, '$set': {'c.t': now}}
+        query = _get_scoped_query(name, project)
+        if window is not None:
+            threshold = now - window
+            query['c.t'] = {'$lt': threshold}
+
+        while True:
+            try:
+                doc = self._collection.find_one_and_update(
+                    query, update, return_document=ReturnDocument.AFTER,
+                    projection={'c.v': 1, '_id': 0})
+
+                break
+            except pymongo.errors.AutoReconnect as ex:
+                LOG.exception(ex)
+
+        if doc is None:
+            if window is None:
+                # NOTE(kgriffs): Since we did not filter by a time window,
+                # the topic should have been found and updated. Perhaps
+                # the topic has been deleted?
+                message = _(u'Failed to increment the message '
+                            u'counter for topic %(name)s and '
+                            u'project %(project)s')
+                message %= dict(name=name, project=project)
+
+                LOG.warning(message)
+
+                raise errors.TopicDoesNotExist(name, project)
+
+            # NOTE(kgriffs): Assume the topic existed, but the counter
+            # was recently updated, causing the range topic on 'c.t' to
+            # exclude the record.
+            return None
+
+        return doc['c']['v']
+
+    # ----------------------------------------------------------------------
+    # Interface
+    # ----------------------------------------------------------------------
+
+    def _get(self, name, project=None):
+        try:
+            return self.get_metadata(name, project)
+        except errors.TopicDoesNotExist:
+            return {}
+
+    def _list(self, project=None, kfilter={}, marker=None,
+              limit=storage.DEFAULT_TOPICS_PER_PAGE, detailed=False,
+              name=None):
+
+        query = utils.scoped_query(marker, project, name, kfilter,
+                                   key_value='p_t')
+
+        projection = {'p_t': 1, '_id': 0}
+        if detailed:
+            projection['m'] = 1
+
+        cursor = self._collection.find(query, projection=projection)
+        cursor = cursor.limit(limit).sort('p_t')
+        marker_name = {}
+
+        def normalizer(record):
+            topic = {'name': utils.descope_queue_name(record['p_t'])}
+            marker_name['next'] = topic['name']
+            if detailed:
+                topic['metadata'] = record['m']
+            return topic
+
+        yield utils.HookedCursor(cursor, normalizer)
+        yield marker_name and marker_name['next']
+
+    @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
+    def get_metadata(self, name, project=None):
+        queue = self._collection.find_one(_get_scoped_query(name, project),
+                                          projection={'m': 1, '_id': 0})
+        if queue is None:
+            raise errors.TopicDoesNotExist(name, project)
+
+        return queue.get('m', {})
+
+    @utils.raises_conn_error
+    # @utils.retries_on_autoreconnect
+    def _create(self, name, metadata=None, project=None):
+        # NOTE(flaper87): If the connection fails after it was called
+        # and we retry to insert the topic, we could end up returning
+        # `False` because of the `DuplicatedKeyError` although the
+        # topic was indeed created by this API call.
+        #
+        # TODO(kgriffs): Commented out `retries_on_autoreconnect` for
+        # now due to the above issue, since creating a topic is less
+        # important to make super HA.
+
+        try:
+            # NOTE(kgriffs): Start counting at 1, and assume the first
+            # message ever posted will succeed and set t to a UNIX
+            # "modified at" timestamp.
+            counter = {'v': 1, 't': 0}
+
+            scoped_name = utils.scope_queue_name(name, project)
+            self._collection.insert_one(
+                {'p_t': scoped_name, 'm': metadata or {},
+                 'c': counter})
+
+        except pymongo.errors.DuplicateKeyError:
+            return False
+        else:
+            return True
+
+    # NOTE(kgriffs): Only cache when it exists; if it doesn't exist, and
+    # someone creates it, we want it to be immediately visible.
+    @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
+    @decorators.caches(_topic_exists_key, _TOPIC_CACHE_TTL, lambda v: v)
+    def _exists(self, name, project=None):
+        query = _get_scoped_query(name, project)
+        return self._collection.find_one(query) is not None
+
+    @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
+    def set_metadata(self, name, metadata, project=None):
+        rst = self._collection.update_one(_get_scoped_query(name, project),
+                                          {'$set': {'m': metadata}})
+
+        if rst.matched_count == 0:
+            raise errors.TopicDoesNotExist(name, project)
+
+    @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
+    @_exists.purges
+    def _delete(self, name, project=None):
+        self._collection.delete_one(_get_scoped_query(name, project))
+
+    @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
+    def _stats(self, name, project=None):
+        pass
+
+
+def _get_scoped_query(name, project):
+    return {'p_t': utils.scope_queue_name(name, project)}
diff --git a/zaqar/storage/mongodb/utils.py b/zaqar/storage/mongodb/utils.py
index 58c9682e1..2227b3385 100644
--- a/zaqar/storage/mongodb/utils.py
+++ b/zaqar/storage/mongodb/utils.py
@@ -38,6 +38,8 @@ EPOCH = datetime.datetime.utcfromtimestamp(0).replace(tzinfo=tz_util.utc)
 # NOTE(cpp-cabrera): the authoritative form of project/queue keys.
 PROJ_QUEUE_KEY = 'p_q'
 
+PROJ_TOPIC_KEY = 'p_t'
+
 LOG = logging.getLogger(__name__)
 
 
@@ -191,7 +193,8 @@ def parse_scoped_project_queue(scoped_name):
     return scoped_name.split('/')
 
 
-def scoped_query(queue, project, name=None, kfilter={}):
+def scoped_query(queue, project, name=None, kfilter={},
+                 key_value=PROJ_QUEUE_KEY):
     """Returns a dict usable for querying for scoped project/queues.
 
     :param queue: name of queue to seek
@@ -201,7 +204,7 @@ def scoped_query(queue, project, name=None, kfilter={}):
     :returns: query to issue
     :rtype: dict
     """
-    key = PROJ_QUEUE_KEY
+    key = key_value
     query = {}
     scoped_name = scope_queue_name(queue, project)
 
diff --git a/zaqar/storage/pipeline.py b/zaqar/storage/pipeline.py
index f49439608..3309f72f6 100644
--- a/zaqar/storage/pipeline.py
+++ b/zaqar/storage/pipeline.py
@@ -159,3 +159,11 @@ class DataDriver(base.DataDriverBase):
         stages.extend(_get_storage_pipeline('subscription', self.conf))
         stages.append(self._storage.subscription_controller)
         return common.Pipeline(stages)
+
+    @decorators.lazy_property(write=False)
+    def topic_controller(self):
+        stages = _get_builtin_entry_points('topic', self._storage,
+                                           self.control_driver, self.conf)
+        stages.extend(_get_storage_pipeline('topic', self.conf))
+        stages.append(self._storage.topic_controller)
+        return common.Pipeline(stages)
diff --git a/zaqar/storage/pooling.py b/zaqar/storage/pooling.py
index 6992f7e95..ec45f31a6 100644
--- a/zaqar/storage/pooling.py
+++ b/zaqar/storage/pooling.py
@@ -147,6 +147,14 @@ class DataDriver(storage.DataDriverBase):
         else:
             return controller
 
+    @decorators.lazy_property(write=False)
+    def topic_controller(self):
+        controller = TopicController(self._pool_catalog)
+        if self.conf.profiler.enabled:
+            return profiler.trace_cls("pooling_topic_controller")(controller)
+        else:
+            return controller
+
 
 class QueueController(storage.Queue):
     """Routes operations to get the appropriate queue controller.
@@ -637,6 +645,20 @@ class Catalog(object):
         target = self.lookup(queue, project)
         return target and target.subscription_controller
 
+    def get_topic_controller(self, topic, project=None):
+        """Lookup the topic controller for the given queue and project.
+
+        :param topic: Name of the topic for which to find a pool
+        :param project: Project to which the topic belongs, or
+            None to specify the "global" or "generic" project.
+
+        :returns: The topic controller associated with the data driver for
+            the pool containing (queue, project) or None if this doesn't exist.
+        :rtype: Maybe TopicController
+        """
+        target = self.lookup(topic, project)
+        return target and target.topic_controller
+
     def get_default_pool(self, use_listing=True):
         if use_listing:
             cursor = self._pools_ctrl.list(limit=0)
@@ -716,3 +738,112 @@ class Catalog(object):
             self._drivers[pool_id] = self._init_driver(pool_id, pool_conf)
 
             return self._drivers[pool_id]
+
+
+class TopicController(storage.Topic):
+    """Routes operations to get the appropriate topic controller.
+
+    :param pool_catalog: a catalog of available pools
+    :type pool_catalog: queues.pooling.base.Catalog
+    """
+
+    def __init__(self, pool_catalog):
+        super(TopicController, self).__init__(None)
+        self._pool_catalog = pool_catalog
+        self._mgt_topic_ctrl = self._pool_catalog.control.topic_controller
+        self._get_controller = self._pool_catalog.get_topic_controller
+
+    def _list(self, project=None, kfilter={}, marker=None,
+              limit=storage.DEFAULT_TOPICS_PER_PAGE, detailed=False,
+              name=None):
+
+        def all_pages():
+            yield next(self._mgt_topic_ctrl.list(
+                project=project,
+                kfilter=kfilter,
+                marker=marker,
+                limit=limit,
+                detailed=detailed,
+                name=name))
+
+        # make a heap compared with 'name'
+        ls = heapq.merge(*[
+            utils.keyify('name', page)
+            for page in all_pages()
+        ])
+
+        marker_name = {}
+
+        # limit the iterator and strip out the comparison wrapper
+        def it():
+            for topic_cmp in itertools.islice(ls, limit):
+                marker_name['next'] = topic_cmp.obj['name']
+                yield topic_cmp.obj
+
+        yield it()
+        yield marker_name and marker_name['next']
+
+    def _get(self, name, project=None):
+        try:
+            return self.get_metadata(name, project)
+        except errors.TopicDoesNotExist:
+            return {}
+
+    def _create(self, name, metadata=None, project=None):
+        flavor = None
+        if isinstance(metadata, dict):
+            flavor = metadata.get('_flavor')
+
+        self._pool_catalog.register(name, project=project, flavor=flavor)
+
+        # NOTE(cpp-cabrera): This should always succeed since we just
+        # registered the project/topic. There is a race condition,
+        # however. If between the time we register a topic and go to
+        # look it up, the topic is deleted, then this assertion will
+        # fail.
+        pool = self._pool_catalog.lookup(name, project)
+        if not pool:
+            raise RuntimeError('Failed to register topic')
+        return self._mgt_topic_ctrl.create(name, metadata=metadata,
+                                           project=project)
+
+    def _delete(self, name, project=None):
+        mtHandler = self._get_controller(name, project)
+        if mtHandler:
+            # NOTE(cpp-cabrera): delete from the catalogue first. If
+            # zaqar crashes in the middle of these two operations,
+            # it is desirable that the entry be missing from the
+            # catalogue and present in storage, rather than the
+            # reverse. The former case leads to all operations
+            # behaving as expected: 404s across the board, and a
+            # functionally equivalent 204 on a create queue. The
+            # latter case is more difficult to reason about, and may
+            # yield 500s in some operations.
+            self._pool_catalog.deregister(name, project)
+            mtHandler.delete(name, project)
+
+        return self._mgt_topic_ctrl.delete(name, project)
+
+    def _exists(self, name, project=None):
+        return self._mgt_topic_ctrl.exists(name, project=project)
+
+    def get_metadata(self, name, project=None):
+        return self._mgt_topic_ctrl.get_metadata(name, project=project)
+
+    def set_metadata(self, name, metadata, project=None):
+        # NOTE(gengchc2): If flavor metadata is modified in topic,
+        # The topic needs to be re-registered to pools, otherwise
+        # the topic flavor parameter is not consistent with the pool.
+        flavor = None
+        if isinstance(metadata, dict):
+            flavor = metadata.get('_flavor')
+        self._pool_catalog.register(name, project=project, flavor=flavor)
+
+        return self._mgt_topic_ctrl.set_metadata(name, metadata=metadata,
+                                                 project=project)
+
+    def _stats(self, name, project=None):
+        mtHandler = self._get_controller(name, project)
+        if mtHandler:
+            return mtHandler.stats(name, project=project)
+        raise errors.TopicDoesNotExist(name, project)
diff --git a/zaqar/storage/redis/driver.py b/zaqar/storage/redis/driver.py
index ef891ac51..e8eba6e49 100644
--- a/zaqar/storage/redis/driver.py
+++ b/zaqar/storage/redis/driver.py
@@ -296,6 +296,10 @@ class ControlDriver(storage.ControlDriverBase):
         else:
             return controller
 
+    @decorators.lazy_property(write=False)
+    def topic_controller(self):
+        pass
+
 
 def _get_redis_client(driver):
     conf = driver.redis_conf
diff --git a/zaqar/storage/redis/messages.py b/zaqar/storage/redis/messages.py
index 9f35ad080..183f0fb51 100644
--- a/zaqar/storage/redis/messages.py
+++ b/zaqar/storage/redis/messages.py
@@ -637,3 +637,53 @@ class MessageQueueHandler(object):
                 message_stats['oldest'] = oldest
 
         return {'messages': message_stats}
+
+
+class MessageTopicHandler(object):
+    def __init__(self, driver, control_driver):
+        self.driver = driver
+        self._client = self.driver.connection
+        self._topic_ctrl = self.driver.topic_controller
+        self._message_ctrl = self.driver.message_controller
+
+    @utils.raises_conn_error
+    def create(self, name, metadata=None, project=None):
+        with self._client.pipeline() as pipe:
+            self._message_ctrl._create_msgset(name, project, pipe)
+
+            try:
+                pipe.execute()
+            except redis.exceptions.ResponseError:
+                return False
+
+    @utils.raises_conn_error
+    @utils.retries_on_connection_error
+    def delete(self, name, project=None):
+        with self._client.pipeline() as pipe:
+            self._message_ctrl._delete_msgset(name, project, pipe)
+            self._message_ctrl._delete_queue_messages(name, project, pipe)
+            pipe.execute()
+
+    @utils.raises_conn_error
+    @utils.retries_on_connection_error
+    def stats(self, name, project=None):
+        if not self._topic_ctrl.exists(name, project=project):
+            raise errors.TopicDoesNotExist(name, project)
+
+        total = self._message_ctrl._count(name, project)
+
+        message_stats = {
+            'total': total
+        }
+
+        if total:
+            try:
+                newest = self._message_ctrl.first(name, project, -1)
+                oldest = self._message_ctrl.first(name, project, 1)
+            except errors.QueueIsEmpty:
+                pass
+            else:
+                message_stats['newest'] = newest
+                message_stats['oldest'] = oldest
+
+        return {'messages': message_stats}
diff --git a/zaqar/storage/sqlalchemy/driver.py b/zaqar/storage/sqlalchemy/driver.py
index 966d63540..b74acbae8 100644
--- a/zaqar/storage/sqlalchemy/driver.py
+++ b/zaqar/storage/sqlalchemy/driver.py
@@ -110,3 +110,7 @@ class ControlDriver(storage.ControlDriverBase):
                                       "controller")(controller)
         else:
             return controller
+
+    @property
+    def topic_controller(self):
+        pass
diff --git a/zaqar/storage/swift/messages.py b/zaqar/storage/swift/messages.py
index c8ddae74e..833b85e07 100644
--- a/zaqar/storage/swift/messages.py
+++ b/zaqar/storage/swift/messages.py
@@ -386,3 +386,89 @@ class MessageQueueHandler(object):
             raise
         else:
             return True
+
+
+class MessageTopicHandler(object):
+    def __init__(self, driver, control_driver):
+        self.driver = driver
+        self._client = self.driver.connection
+        self._topic_ctrl = self.driver.topic_controller
+        self._message_ctrl = self.driver.message_controller
+
+    def create(self, name, metadata=None, project=None):
+        self._client.put_container(utils._message_container(name, project))
+
+    def delete(self, name, project=None):
+        for container in [utils._message_container(name, project)]:
+            try:
+                headers, objects = self._client.get_container(container)
+            except swiftclient.ClientException as exc:
+                if exc.http_status != 404:
+                    raise
+            else:
+                for obj in objects:
+                    try:
+                        self._client.delete_object(container, obj['name'])
+                    except swiftclient.ClientException as exc:
+                        if exc.http_status != 404:
+                            raise
+                try:
+                    self._client.delete_container(container)
+                except swiftclient.ClientException as exc:
+                    if exc.http_status not in (404, 409):
+                        raise
+
+    def stats(self, name, project=None):
+        if not self._topic_ctrl.exists(name, project=project):
+            raise errors.TopicDoesNotExist(name, project)
+
+        total = 0
+        container = utils._message_container(name, project)
+
+        try:
+            _, objects = self._client.get_container(container)
+        except swiftclient.ClientException as exc:
+            if exc.http_status == 404:
+                raise errors.QueueIsEmpty(name, project)
+
+        newest = None
+        oldest = None
+        now = timeutils.utcnow_ts(True)
+        for obj in objects:
+            try:
+                headers = self._client.head_object(container, obj['name'])
+            except swiftclient.ClientException as exc:
+                if exc.http_status != 404:
+                    raise
+            else:
+                created = float(headers['x-timestamp'])
+                created_iso = datetime.datetime.utcfromtimestamp(
+                    created).strftime('%Y-%m-%dT%H:%M:%SZ')
+                newest = {
+                    'id': obj['name'],
+                    'age': now - created,
+                    'created': created_iso}
+                if oldest is None:
+                    oldest = copy.deepcopy(newest)
+                total += 1
+
+        msg_stats = {
+            'total': total,
+        }
+        if newest is not None:
+            msg_stats['newest'] = newest
+            msg_stats['oldest'] = oldest
+
+        return {'messages': msg_stats}
+
+    def exists(self, topic, project=None):
+        try:
+            self._client.head_container(utils._message_container(topic,
+                                                                 project))
+
+        except swiftclient.ClientException as exc:
+            if exc.http_status == 404:
+                return False
+            raise
+        else:
+            return True
diff --git a/zaqar/tests/faulty_storage.py b/zaqar/tests/faulty_storage.py
index b9bbea7a5..2be44faf4 100644
--- a/zaqar/tests/faulty_storage.py
+++ b/zaqar/tests/faulty_storage.py
@@ -61,6 +61,10 @@ class DataDriver(storage.DataDriverBase):
     def subscription_controller(self):
         return None
 
+    @property
+    def topic_controller(self):
+        return self.control_driver.topic_controller
+
 
 class ControlDriver(storage.ControlDriverBase):
 
@@ -86,6 +90,10 @@ class ControlDriver(storage.ControlDriverBase):
     def flavors_controller(self):
         return None
 
+    @property
+    def topic_controller(self):
+        return TopicController(self)
+
 
 class QueueController(storage.Queue):
     def __init__(self, driver):
@@ -144,3 +152,32 @@ class MessageController(storage.Message):
 
     def bulk_delete(self, queue, message_ids, project=None, claim_ids=None):
         raise NotImplementedError()
+
+
+class TopicController(storage.Topic):
+    def __init__(self, driver):
+        pass
+
+    def _list(self, project=None):
+        raise NotImplementedError()
+
+    def _get(self, name, project=None):
+        raise NotImplementedError()
+
+    def get_metadata(self, name, project=None):
+        raise NotImplementedError()
+
+    def _create(self, name, metadata=None, project=None):
+        raise NotImplementedError()
+
+    def _exists(self, name, project=None):
+        raise NotImplementedError()
+
+    def set_metadata(self, name, metadata, project=None):
+        raise NotImplementedError()
+
+    def _delete(self, name, project=None):
+        raise NotImplementedError()
+
+    def _stats(self, name, project=None):
+        raise NotImplementedError()
diff --git a/zaqar/tests/unit/transport/wsgi/v2_0/test_topic_lifecycle.py b/zaqar/tests/unit/transport/wsgi/v2_0/test_topic_lifecycle.py
new file mode 100644
index 000000000..99f0948f3
--- /dev/null
+++ b/zaqar/tests/unit/transport/wsgi/v2_0/test_topic_lifecycle.py
@@ -0,0 +1,608 @@
+# Copyright (c) 2019 Rackspace, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License.  You may obtain a copy
+# of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+
+import ddt
+import falcon
+import mock
+from oslo_serialization import jsonutils
+from oslo_utils import uuidutils
+import six
+
+from zaqar.storage import errors as storage_errors
+from zaqar import tests as testing
+from zaqar.tests.unit.transport.wsgi import base
+
+
+@ddt.ddt
+class TestTopicLifecycleMongoDB(base.V2Base):
+
+    config_file = 'wsgi_mongodb.conf'
+
+    @testing.requires_mongodb
+    def setUp(self):
+        super(TestTopicLifecycleMongoDB, self).setUp()
+
+        self.topic_path = self.url_prefix + '/topics'
+        self.mars_topic_path = self.topic_path + '/mars'
+        self.venus_topic_path = self.topic_path + '/venus'
+
+        self.headers = {
+            'Client-ID': uuidutils.generate_uuid(),
+            'X-Project-ID': '3387309841abc_'
+        }
+
+    def tearDown(self):
+        control = self.boot.control
+        storage = self.boot.storage._storage
+        connection = storage.connection
+
+        connection.drop_database(control.topics_database)
+
+        for db in storage.message_databases:
+            connection.drop_database(db)
+
+        super(TestTopicLifecycleMongoDB, self).tearDown()
+
+    def test_without_project_id(self):
+        headers = {
+            'Client-ID': uuidutils.generate_uuid(),
+        }
+
+        self.simulate_put(self.mars_topic_path, headers=headers,
+                          need_project_id=False)
+        self.assertEqual(falcon.HTTP_400, self.srmock.status)
+
+        self.simulate_delete(self.mars_topic_path, headers=headers,
+                             need_project_id=False)
+        self.assertEqual(falcon.HTTP_400, self.srmock.status)
+
+    def test_empty_project_id(self):
+        headers = {
+            'Client-ID': uuidutils.generate_uuid(),
+            'X-Project-ID': ''
+        }
+
+        self.simulate_put(self.mars_topic_path, headers=headers)
+        self.assertEqual(falcon.HTTP_400, self.srmock.status)
+
+        self.simulate_delete(self.mars_topic_path, headers=headers)
+        self.assertEqual(falcon.HTTP_400, self.srmock.status)
+
+    @ddt.data('480924', 'foo')
+    def test_basics_thoroughly(self, project_id):
+        headers = {
+            'Client-ID': uuidutils.generate_uuid(),
+            'X-Project-ID': project_id
+        }
+        mars_topic_path_stats = self.mars_topic_path + '/stats'
+
+        # Stats are empty - topic not created yet
+        self.simulate_get(mars_topic_path_stats, headers=headers)
+        self.assertEqual(falcon.HTTP_200, self.srmock.status)
+
+        # Create
+        doc = '{"messages": {"ttl": 600}}'
+        self.simulate_put(self.mars_topic_path,
+                          headers=headers, body=doc)
+        self.assertEqual(falcon.HTTP_201, self.srmock.status)
+
+        location = self.srmock.headers_dict['Location']
+        self.assertEqual(location, self.mars_topic_path)
+
+        # Fetch metadata
+        result = self.simulate_get(self.mars_topic_path,
+                                   headers=headers)
+        result_doc = jsonutils.loads(result[0])
+        self.assertEqual(falcon.HTTP_200, self.srmock.status)
+        ref_doc = jsonutils.loads(doc)
+        ref_doc['_default_message_ttl'] = 3600
+        ref_doc['_max_messages_post_size'] = 262144
+        ref_doc['_default_message_delay'] = 0
+        self.assertEqual(ref_doc, result_doc)
+
+        # Stats empty topic
+        self.simulate_get(mars_topic_path_stats, headers=headers)
+        self.assertEqual(falcon.HTTP_200, self.srmock.status)
+
+        # Delete
+        self.simulate_delete(self.mars_topic_path, headers=headers)
+        self.assertEqual(falcon.HTTP_204, self.srmock.status)
+
+        # Get non-existent stats
+        self.simulate_get(mars_topic_path_stats, headers=headers)
+        self.assertEqual(falcon.HTTP_200, self.srmock.status)
+
+    @ddt.data('1234567890', '11111111111111111111111111111111111')
+    def test_basics_thoroughly_with_different_client_id(self, client_id):
+        self.conf.set_override('client_id_uuid_safe', 'off', 'transport')
+        headers = {
+            'Client-ID': client_id,
+            'X-Project-ID': '480924'
+        }
+        mars_topic_path_stats = self.mars_topic_path + '/stats'
+
+        # Stats are empty - topic not created yet
+        self.simulate_get(mars_topic_path_stats, headers=headers)
+        self.assertEqual(falcon.HTTP_200, self.srmock.status)
+
+        # Create
+        doc = '{"messages": {"ttl": 600}}'
+        self.simulate_put(self.mars_topic_path,
+                          headers=headers, body=doc)
+        self.assertEqual(falcon.HTTP_201, self.srmock.status)
+
+        location = self.srmock.headers_dict['Location']
+        self.assertEqual(location, self.mars_topic_path)
+
+        # Fetch metadata
+        result = self.simulate_get(self.mars_topic_path,
+                                   headers=headers)
+        result_doc = jsonutils.loads(result[0])
+        self.assertEqual(falcon.HTTP_200, self.srmock.status)
+        ref_doc = jsonutils.loads(doc)
+        ref_doc['_default_message_ttl'] = 3600
+        ref_doc['_max_messages_post_size'] = 262144
+        ref_doc['_default_message_delay'] = 0
+        self.assertEqual(ref_doc, result_doc)
+
+        # Stats empty topic
+        self.simulate_get(mars_topic_path_stats, headers=headers)
+        self.assertEqual(falcon.HTTP_200, self.srmock.status)
+
+        # Delete
+        self.simulate_delete(self.mars_topic_path, headers=headers)
+        self.assertEqual(falcon.HTTP_204, self.srmock.status)
+
+        # Get non-existent stats
+        self.simulate_get(mars_topic_path_stats, headers=headers)
+        self.assertEqual(falcon.HTTP_200, self.srmock.status)
+
+    def test_name_restrictions(self):
+        self.simulate_put(self.topic_path + '/Nice-Boat_2',
+                          headers=self.headers)
+        self.assertEqual(falcon.HTTP_201, self.srmock.status)
+
+        self.simulate_put(self.topic_path + '/Nice-Bo@t',
+                          headers=self.headers)
+        self.assertEqual(falcon.HTTP_400, self.srmock.status)
+
+        self.simulate_put(self.topic_path + '/_' + 'niceboat' * 8,
+                          headers=self.headers)
+        self.assertEqual(falcon.HTTP_400, self.srmock.status)
+
+        self.simulate_put(self.topic_path + '/Service.test_topic',
+                          headers=self.headers)
+        self.assertEqual(falcon.HTTP_201, self.srmock.status)
+
+    def test_project_id_restriction(self):
+        muvluv_topic_path = self.topic_path + '/Muv-Luv'
+
+        self.simulate_put(muvluv_topic_path,
+                          headers={'Client-ID': uuidutils.generate_uuid(),
+                                   'X-Project-ID': 'JAM Project' * 24})
+        self.assertEqual(falcon.HTTP_400, self.srmock.status)
+
+        # no charset restrictions
+        self.simulate_put(muvluv_topic_path,
+                          headers={'Client-ID': uuidutils.generate_uuid(),
+                                   'X-Project-ID': 'JAM Project'})
+        self.assertEqual(falcon.HTTP_201, self.srmock.status)
+
+    def test_non_ascii_name(self):
+        test_params = ((u'/topics/non-ascii-n\u0153me', 'utf-8'),
+                       (u'/topics/non-ascii-n\xc4me', 'iso8859-1'))
+
+        for uri, enc in test_params:
+            uri = self.url_prefix + uri
+
+            if six.PY2:
+                uri = uri.encode(enc)
+
+            self.simulate_put(uri, headers=self.headers)
+            self.assertEqual(falcon.HTTP_400, self.srmock.status)
+
+            self.simulate_delete(uri, headers=self.headers)
+            self.assertEqual(falcon.HTTP_400, self.srmock.status)
+
+    def test_no_metadata(self):
+        self.simulate_put(self.venus_topic_path,
+                          headers=self.headers)
+        self.assertEqual(falcon.HTTP_201, self.srmock.status)
+
+        self.simulate_put(self.venus_topic_path, body='',
+                          headers=self.headers)
+        self.assertEqual(falcon.HTTP_204, self.srmock.status)
+
+        result = self.simulate_get(self.venus_topic_path,
+                                   headers=self.headers)
+        result_doc = jsonutils.loads(result[0])
+        self.assertEqual(256 * 1024,
+                         result_doc.get('_max_messages_post_size'))
+        self.assertEqual(3600,
+                         result_doc.get('_default_message_ttl'))
+        self.assertEqual(0,
+                         result_doc.get('_default_message_delay'))
+
+    @ddt.data('{', '[]', '.', '  ')
+    def test_bad_metadata(self, document):
+        self.simulate_put(self.venus_topic_path,
+                          headers=self.headers,
+                          body=document)
+        self.assertEqual(falcon.HTTP_400, self.srmock.status)
+
+    def test_too_much_metadata(self):
+        self.simulate_put(self.venus_topic_path, headers=self.headers)
+        self.assertEqual(falcon.HTTP_201, self.srmock.status)
+        doc = '{{"messages": {{"ttl": 600}}, "padding": "{pad}"}}'
+
+        max_size = self.transport_cfg.max_queue_metadata
+        padding_len = max_size - (len(doc) - 10) + 1
+
+        doc = doc.format(pad='x' * padding_len)
+
+        self.simulate_put(self.venus_topic_path,
+                          headers=self.headers,
+                          body=doc)
+        self.assertEqual(falcon.HTTP_400, self.srmock.status)
+
+    def test_way_too_much_metadata(self):
+        self.simulate_put(self.venus_topic_path, headers=self.headers)
+        self.assertEqual(falcon.HTTP_201, self.srmock.status)
+        doc = '{{"messages": {{"ttl": 600}}, "padding": "{pad}"}}'
+
+        max_size = self.transport_cfg.max_queue_metadata
+        padding_len = max_size * 100
+
+        doc = doc.format(pad='x' * padding_len)
+
+        self.simulate_put(self.venus_topic_path,
+                          headers=self.headers, body=doc)
+        self.assertEqual(falcon.HTTP_400, self.srmock.status)
+
+    def test_custom_metadata(self):
+        # Set
+        doc = '{{"messages": {{"ttl": 600}}, "padding": "{pad}"}}'
+
+        max_size = self.transport_cfg.max_queue_metadata
+        padding_len = max_size - (len(doc) - 2)
+
+        doc = doc.format(pad='x' * padding_len)
+        self.simulate_put(self.venus_topic_path,
+                          headers=self.headers,
+                          body=doc)
+        self.assertEqual(falcon.HTTP_201, self.srmock.status)
+
+        # Get
+        result = self.simulate_get(self.venus_topic_path,
+                                   headers=self.headers)
+        result_doc = jsonutils.loads(result[0])
+        ref_doc = jsonutils.loads(doc)
+        ref_doc['_default_message_ttl'] = 3600
+        ref_doc['_max_messages_post_size'] = 262144
+        ref_doc['_default_message_delay'] = 0
+        self.assertEqual(ref_doc, result_doc)
+        self.assertEqual(falcon.HTTP_200, self.srmock.status)
+
+    def test_update_metadata(self):
+        xyz_topic_path = self.url_prefix + '/topics/xyz'
+        xyz_topic_path_metadata = xyz_topic_path
+        headers = {
+            'Client-ID': uuidutils.generate_uuid(),
+            'X-Project-ID': uuidutils.generate_uuid()
+        }
+        # Create
+        self.simulate_put(xyz_topic_path, headers=headers)
+        self.assertEqual(falcon.HTTP_201, self.srmock.status)
+
+        headers.update({'Content-Type':
+                        "application/openstack-messaging-v2.0-json-patch"})
+        # add metadata
+        doc1 = ('[{"op":"add", "path": "/metadata/key1", "value": 1},'
+                '{"op":"add", "path": "/metadata/key2", "value": 1}]')
+        self.simulate_patch(xyz_topic_path_metadata,
+                            headers=headers,
+                            body=doc1)
+        self.assertEqual(falcon.HTTP_200, self.srmock.status)
+
+        # remove reserved metadata, zaqar will do nothing and return 200,
+        # because
+        doc3 = '[{"op":"remove", "path": "/metadata/_default_message_ttl"}]'
+        self.simulate_patch(xyz_topic_path_metadata,
+                            headers=headers,
+                            body=doc3)
+        self.assertEqual(falcon.HTTP_200, self.srmock.status)
+
+        # replace metadata
+        doc2 = '[{"op":"replace", "path": "/metadata/key1", "value": 2}]'
+        self.simulate_patch(xyz_topic_path_metadata,
+                            headers=headers,
+                            body=doc2)
+        self.assertEqual(falcon.HTTP_200, self.srmock.status)
+
+        # replace reserved metadata, zaqar will store the reserved metadata
+        doc2 = ('[{"op":"replace", "path": "/metadata/_default_message_ttl",'
+                '"value": 300}]')
+        self.simulate_patch(xyz_topic_path_metadata,
+                            headers=headers,
+                            body=doc2)
+        self.assertEqual(falcon.HTTP_200, self.srmock.status)
+
+        # Get
+        result = self.simulate_get(xyz_topic_path_metadata,
+                                   headers=headers)
+        result_doc = jsonutils.loads(result[0])
+        self.assertEqual({'key1': 2, 'key2': 1,
+                          '_default_message_ttl': 300,
+                          '_max_messages_post_size': 262144,
+                          '_default_message_delay': 0}, result_doc)
+
+        # remove metadata
+        doc3 = '[{"op":"remove", "path": "/metadata/key1"}]'
+        self.simulate_patch(xyz_topic_path_metadata,
+                            headers=headers,
+                            body=doc3)
+        self.assertEqual(falcon.HTTP_200, self.srmock.status)
+
+        # remove reserved metadata
+        doc3 = '[{"op":"remove", "path": "/metadata/_default_message_ttl"}]'
+        self.simulate_patch(xyz_topic_path_metadata,
+                            headers=headers,
+                            body=doc3)
+        self.assertEqual(falcon.HTTP_200, self.srmock.status)
+
+        # Get
+        result = self.simulate_get(xyz_topic_path_metadata,
+                                   headers=headers)
+        result_doc = jsonutils.loads(result[0])
+        self.assertEqual({'key2': 1, '_default_message_ttl': 3600,
+                          '_max_messages_post_size': 262144,
+                          '_default_message_delay': 0}, result_doc)
+
+        # replace non-existent metadata
+        doc4 = '[{"op":"replace", "path": "/metadata/key3", "value":2}]'
+        self.simulate_patch(xyz_topic_path_metadata,
+                            headers=headers,
+                            body=doc4)
+        self.assertEqual(falcon.HTTP_409, self.srmock.status)
+
+        # remove non-existent metadata
+        doc5 = '[{"op":"remove", "path": "/metadata/key3"}]'
+        self.simulate_patch(xyz_topic_path_metadata,
+                            headers=headers,
+                            body=doc5)
+        self.assertEqual(falcon.HTTP_409, self.srmock.status)
+
+        self.simulate_delete(xyz_topic_path, headers=headers)
+
+        # add metadata to non-existent topic
+        doc1 = ('[{"op":"add", "path": "/metadata/key1", "value": 1},'
+                '{"op":"add", "path": "/metadata/key2", "value": 1}]')
+        self.simulate_patch(xyz_topic_path_metadata,
+                            headers=headers,
+                            body=doc1)
+        self.assertEqual(falcon.HTTP_404, self.srmock.status)
+
+        # replace metadata in non-existent topic
+        doc4 = '[{"op":"replace", "path": "/metadata/key3", "value":2}]'
+        self.simulate_patch(xyz_topic_path_metadata,
+                            headers=headers,
+                            body=doc4)
+        self.assertEqual(falcon.HTTP_404, self.srmock.status)
+
+        # remove metadata from non-existent topic
+        doc5 = '[{"op":"remove", "path": "/metadata/key3"}]'
+        self.simulate_patch(xyz_topic_path_metadata,
+                            headers=headers,
+                            body=doc5)
+        self.assertEqual(falcon.HTTP_404, self.srmock.status)
+
+    def test_list(self):
+        arbitrary_number = 644079696574693
+        project_id = str(arbitrary_number)
+        client_id = uuidutils.generate_uuid()
+        header = {
+            'X-Project-ID': project_id,
+            'Client-ID': client_id
+        }
+
+        # NOTE(kgriffs): It's important that this one sort after the one
+        # above. This is in order to prove that bug/1236605 is fixed, and
+        # stays fixed!
+        alt_project_id = str(arbitrary_number + 1)
+
+        # List empty
+        result = self.simulate_get(self.topic_path, headers=header)
+        self.assertEqual(falcon.HTTP_200, self.srmock.status)
+        results = jsonutils.loads(result[0])
+        self.assertEqual([], results['topics'])
+        self.assertIn('links', results)
+        self.assertEqual(0, len(results['links']))
+
+        # Payload exceeded
+        self.simulate_get(self.topic_path, headers=header,
+                          query_string='limit=21')
+        self.assertEqual(falcon.HTTP_400, self.srmock.status)
+
+        # Create some
+        def create_topic(name, project_id, body):
+            altheader = {'Client-ID': client_id}
+            if project_id is not None:
+                altheader['X-Project-ID'] = project_id
+            uri = self.topic_path + '/' + name
+            self.simulate_put(uri, headers=altheader, body=body)
+
+        create_topic('q1', project_id, '{"node": 31}')
+        create_topic('q2', project_id, '{"node": 32}')
+        create_topic('q3', project_id, '{"node": 33}')
+
+        create_topic('q3', alt_project_id, '{"alt": 1}')
+
+        # List (limit)
+        result = self.simulate_get(self.topic_path, headers=header,
+                                   query_string='limit=2')
+
+        result_doc = jsonutils.loads(result[0])
+        self.assertEqual(2, len(result_doc['topics']))
+
+        # List (no metadata, get all)
+        result = self.simulate_get(self.topic_path,
+                                   headers=header, query_string='limit=5')
+
+        result_doc = jsonutils.loads(result[0])
+        [target, params] = result_doc['links'][0]['href'].split('?')
+        self.simulate_get(target, headers=header, query_string=params)
+        self.assertEqual(falcon.HTTP_200, self.srmock.status)
+
+        # Ensure we didn't pick up the topic from the alt project.
+        topics = result_doc['topics']
+        self.assertEqual(3, len(topics))
+
+        # List with metadata
+        result = self.simulate_get(self.topic_path, headers=header,
+                                   query_string='detailed=true')
+
+        self.assertEqual(falcon.HTTP_200, self.srmock.status)
+        result_doc = jsonutils.loads(result[0])
+        [target, params] = result_doc['links'][0]['href'].split('?')
+
+        topic = result_doc['topics'][0]
+        result = self.simulate_get(topic['href'], headers=header)
+        result_doc = jsonutils.loads(result[0])
+        self.assertEqual(topic['metadata'], result_doc)
+        self.assertEqual({'node': 31, '_default_message_ttl': 3600,
+                          '_max_messages_post_size': 262144,
+                          '_default_message_delay': 0},  result_doc)
+
+        # topic filter
+        result = self.simulate_get(self.topic_path, headers=header,
+                                   query_string='node=34')
+        self.assertEqual(falcon.HTTP_200, self.srmock.status)
+        result_doc = jsonutils.loads(result[0])
+        self.assertEqual(0, len(result_doc['topics']))
+
+        # List tail
+        self.simulate_get(target, headers=header, query_string=params)
+        self.assertEqual(falcon.HTTP_200, self.srmock.status)
+
+        # List manually-constructed tail
+        self.simulate_get(target, headers=header, query_string='marker=zzz')
+        self.assertEqual(falcon.HTTP_200, self.srmock.status)
+
+    def test_list_returns_503_on_nopoolfound_exception(self):
+        arbitrary_number = 644079696574693
+        project_id = str(arbitrary_number)
+        client_id = uuidutils.generate_uuid()
+        header = {
+            'X-Project-ID': project_id,
+            'Client-ID': client_id
+        }
+
+        topic_controller = self.boot.storage.topic_controller
+
+        with mock.patch.object(topic_controller, 'list') as mock_topic_list:
+
+            def topic_generator():
+                raise storage_errors.NoPoolFound()
+
+            # This generator tries to be like topic controller list generator
+            # in some ways.
+            def fake_generator():
+                yield topic_generator()
+                yield {}
+            mock_topic_list.return_value = fake_generator()
+            self.simulate_get(self.topic_path, headers=header)
+            self.assertEqual(falcon.HTTP_503, self.srmock.status)
+
+    def test_list_with_filter(self):
+        arbitrary_number = 644079696574693
+        project_id = str(arbitrary_number)
+        client_id = uuidutils.generate_uuid()
+        header = {
+            'X-Project-ID': project_id,
+            'Client-ID': client_id
+        }
+
+        # Create some
+        def create_topic(name, project_id, body):
+            altheader = {'Client-ID': client_id}
+            if project_id is not None:
+                altheader['X-Project-ID'] = project_id
+            uri = self.topic_path + '/' + name
+            self.simulate_put(uri, headers=altheader, body=body)
+
+        create_topic('q1', project_id, '{"test_metadata_key1": "value1"}')
+        create_topic('q2', project_id, '{"_max_messages_post_size": 2000}')
+        create_topic('q3', project_id, '{"test_metadata_key2": 30}')
+
+        # List (filter query)
+        result = self.simulate_get(self.topic_path, headers=header,
+                                   query_string='name=q&test_metadata_key2=30')
+
+        result_doc = jsonutils.loads(result[0])
+        self.assertEqual(1, len(result_doc['topics']))
+        self.assertEqual('q3', result_doc['topics'][0]['name'])
+
+        # List (filter query)
+        result = self.simulate_get(self.topic_path, headers=header,
+                                   query_string='_max_messages_post_size=2000')
+
+        result_doc = jsonutils.loads(result[0])
+        self.assertEqual(1, len(result_doc['topics']))
+        self.assertEqual('q2', result_doc['topics'][0]['name'])
+
+        # List (filter query)
+        result = self.simulate_get(self.topic_path, headers=header,
+                                   query_string='name=q')
+
+        result_doc = jsonutils.loads(result[0])
+        self.assertEqual(3, len(result_doc['topics']))
+
+
+class TestTopicLifecycleFaultyDriver(base.V2BaseFaulty):
+
+    config_file = 'wsgi_faulty.conf'
+
+    def test_simple(self):
+        self.headers = {
+            'Client-ID': uuidutils.generate_uuid(),
+            'X-Project-ID': '338730984abc_1'
+        }
+
+        mars_topic_path = self.url_prefix + '/topics/mars'
+        doc = '{"messages": {"ttl": 600}}'
+        self.simulate_put(mars_topic_path,
+                          headers=self.headers,
+                          body=doc)
+        self.assertEqual(falcon.HTTP_503, self.srmock.status)
+
+        location = ('Location', mars_topic_path)
+        self.assertNotIn(location, self.srmock.headers)
+
+        result = self.simulate_get(mars_topic_path,
+                                   headers=self.headers)
+        result_doc = jsonutils.loads(result[0])
+        self.assertEqual(falcon.HTTP_503, self.srmock.status)
+        self.assertNotEqual(result_doc, jsonutils.loads(doc))
+
+        self.simulate_get(mars_topic_path + '/stats',
+                          headers=self.headers)
+        self.assertEqual(falcon.HTTP_503, self.srmock.status)
+
+        self.simulate_get(self.url_prefix + '/topics',
+                          headers=self.headers)
+        self.assertEqual(falcon.HTTP_503, self.srmock.status)
+
+        self.simulate_delete(mars_topic_path, headers=self.headers)
+        self.assertEqual(falcon.HTTP_503, self.srmock.status)
diff --git a/zaqar/transport/validation.py b/zaqar/transport/validation.py
index 501f93610..e66784b3f 100644
--- a/zaqar/transport/validation.py
+++ b/zaqar/transport/validation.py
@@ -676,3 +676,27 @@ class Validator(object):
                                        self._limits_conf.max_length_client_id)
         if self._limits_conf.client_id_uuid_safe == 'strict':
             uuid.UUID(client_id)
+
+    def topic_identification(self, topic, project):
+        """Restrictions on a project id & topic name pair.
+
+        :param queue: Name of the topic
+        :param project: Project id
+        :raises ValidationFailed: if the `name` is longer than 64
+            characters or contains anything other than ASCII digits and
+            letters, underscores, and dashes.  Also raises if `project`
+            is not None but longer than 256 characters.
+        """
+
+        if project is not None and len(project) > PROJECT_ID_MAX_LEN:
+            msg = _(u'Project ids may not be more than {0} characters long.')
+            raise ValidationFailed(msg, PROJECT_ID_MAX_LEN)
+
+        if len(topic) > QUEUE_NAME_MAX_LEN:
+            msg = _(u'Topic names may not be more than {0} characters long.')
+            raise ValidationFailed(msg, QUEUE_NAME_MAX_LEN)
+
+        if not QUEUE_NAME_REGEX.match(topic):
+            raise ValidationFailed(
+                _(u'Topic names may only contain ASCII letters, digits, '
+                  'underscores, and dashes.'))
diff --git a/zaqar/transport/wsgi/driver.py b/zaqar/transport/wsgi/driver.py
index afcc9cd5f..81516262b 100644
--- a/zaqar/transport/wsgi/driver.py
+++ b/zaqar/transport/wsgi/driver.py
@@ -72,6 +72,10 @@ class Driver(transport.DriverBase):
         return helpers.validate_queue_identification(
             self._validate.queue_identification, req, resp, params)
 
+    def _validate_topic_identification(self, req, resp, params):
+        return helpers.validate_topic_identification(
+            self._validate.topic_identification, req, resp, params)
+
     def _require_client_id(self, req, resp, params):
         return helpers.require_client_id(
             self._validate.client_id_uuid_safe, req, resp, params)
@@ -91,7 +95,10 @@ class Driver(transport.DriverBase):
             helpers.inject_context,
 
             # NOTE(kgriffs): Depends on project_id being extracted, above
-            self._validate_queue_identification
+            self._validate_queue_identification,
+
+            # NOTE(kgriffs): Depends on project_id being extracted, above
+            self._validate_topic_identification
         ]
 
     def _init_routes(self):
diff --git a/zaqar/transport/wsgi/v2_0/__init__.py b/zaqar/transport/wsgi/v2_0/__init__.py
index cc4b5b0ce..266e27fea 100644
--- a/zaqar/transport/wsgi/v2_0/__init__.py
+++ b/zaqar/transport/wsgi/v2_0/__init__.py
@@ -24,6 +24,9 @@ from zaqar.transport.wsgi.v2_0 import purge
 from zaqar.transport.wsgi.v2_0 import queues
 from zaqar.transport.wsgi.v2_0 import stats
 from zaqar.transport.wsgi.v2_0 import subscriptions
+from zaqar.transport.wsgi.v2_0 import topic
+from zaqar.transport.wsgi.v2_0 import topic_purge
+from zaqar.transport.wsgi.v2_0 import topic_stats
 from zaqar.transport.wsgi.v2_0 import urls
 
 
@@ -52,6 +55,7 @@ def public_endpoints(driver, conf):
     message_controller = driver._storage.message_controller
     claim_controller = driver._storage.claim_controller
     subscription_controller = driver._storage.subscription_controller
+    topic_controller = driver._storage.topic_controller
 
     defaults = driver._defaults
 
@@ -119,6 +123,42 @@ def public_endpoints(driver, conf):
 
         # Pre-Signed URL Endpoint
         ('/queues/{queue_name}/share', urls.Resource(driver)),
+
+        # Topics Endpoints
+        ('/topics',
+         topic.CollectionResource(driver._validate, topic_controller)),
+        ('/topics/{topic_name}',
+         topic.ItemResource(driver._validate, topic_controller,
+                            message_controller)),
+        ('/topics/{topic_name}/stats',
+         topic_stats.Resource(topic_controller)),
+        ('/topics/{topic_name}/purge',
+         topic_purge.Resource(driver)),
+        # Topic Messages Endpoints
+        ('/topics/{topic_name}/messages',
+         messages.CollectionResource(driver._wsgi_conf,
+                                     driver._validate,
+                                     message_controller,
+                                     topic_controller,
+                                     defaults.message_ttl)),
+        ('/topics/{topic_name}/messages/{message_id}',
+         messages.ItemResource(message_controller)),
+        # Topic Subscription Endpoints
+        ('/topics/{topic_name}/subscriptions',
+         subscriptions.CollectionResource(driver._validate,
+                                          subscription_controller,
+                                          defaults.subscription_ttl,
+                                          topic_controller,
+                                          conf)),
+
+        ('/topics/{topic_name}/subscriptions/{subscription_id}',
+         subscriptions.ItemResource(driver._validate,
+                                    subscription_controller)),
+
+        ('/topics/{topic_name}/subscriptions/{subscription_id}/confirm',
+         subscriptions.ConfirmResource(driver._validate,
+                                       subscription_controller,
+                                       conf)),
     ]
 
 
diff --git a/zaqar/transport/wsgi/v2_0/topic.py b/zaqar/transport/wsgi/v2_0/topic.py
new file mode 100644
index 000000000..c8171aad4
--- /dev/null
+++ b/zaqar/transport/wsgi/v2_0/topic.py
@@ -0,0 +1,333 @@
+# Copyright (c) 2019 Rackspace, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy
+import falcon
+from oslo_log import log as logging
+import six
+
+from zaqar.common import decorators
+from zaqar.i18n import _
+from zaqar.storage import errors as storage_errors
+from zaqar.transport import acl
+from zaqar.transport import utils
+from zaqar.transport import validation
+from zaqar.transport.wsgi import errors as wsgi_errors
+from zaqar.transport.wsgi import utils as wsgi_utils
+
+LOG = logging.getLogger(__name__)
+
+
+def _get_reserved_metadata(validate):
+    _reserved_metadata = ['max_messages_post_size', 'default_message_ttl',
+                          'default_message_delay']
+    reserved_metadata = {
+        '_%s' % meta:
+            validate.get_limit_conf_value(meta)
+        for meta in _reserved_metadata
+    }
+
+    return reserved_metadata
+
+
+class ItemResource(object):
+
+    __slots__ = ('_validate', '_topic_controller', '_message_controller',
+                 '_reserved_metadata')
+
+    def __init__(self, validate, topic_controller, message_controller):
+        self._validate = validate
+        self._topic_controller = topic_controller
+        self._message_controller = message_controller
+
+    @decorators.TransportLog("Topics item")
+    @acl.enforce("topics:get")
+    def on_get(self, req, resp, project_id, topic_name):
+        try:
+            resp_dict = self._topic_controller.get(topic_name,
+                                                   project=project_id)
+            for meta, value in _get_reserved_metadata(self._validate).items():
+                if not resp_dict.get(meta):
+                    resp_dict[meta] = value
+        except storage_errors.DoesNotExist as ex:
+            LOG.debug(ex)
+            raise wsgi_errors.HTTPNotFound(six.text_type(ex))
+
+        except Exception as ex:
+            LOG.exception(ex)
+            description = _(u'Topic metadata could not be retrieved.')
+            raise wsgi_errors.HTTPServiceUnavailable(description)
+
+        resp.body = utils.to_json(resp_dict)
+        # status defaults to 200
+
+    @decorators.TransportLog("Topics item")
+    @acl.enforce("topics:create")
+    def on_put(self, req, resp, project_id, topic_name):
+        try:
+            # Place JSON size restriction before parsing
+            self._validate.queue_metadata_length(req.content_length)
+            # Deserialize Topic metadata
+            metadata = None
+            if req.content_length:
+                document = wsgi_utils.deserialize(req.stream,
+                                                  req.content_length)
+                metadata = wsgi_utils.sanitize(document)
+            self._validate.queue_metadata_putting(metadata)
+        except validation.ValidationFailed as ex:
+            LOG.debug(ex)
+            raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
+
+        try:
+            created = self._topic_controller.create(topic_name,
+                                                    metadata=metadata,
+                                                    project=project_id)
+
+        except storage_errors.FlavorDoesNotExist as ex:
+            LOG.exception(ex)
+            raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
+        except Exception as ex:
+            LOG.exception(ex)
+            description = _(u'Topic could not be created.')
+            raise wsgi_errors.HTTPServiceUnavailable(description)
+
+        resp.status = falcon.HTTP_201 if created else falcon.HTTP_204
+        resp.location = req.path
+
+    @decorators.TransportLog("Topics item")
+    @acl.enforce("topics:delete")
+    def on_delete(self, req, resp, project_id, topic_name):
+        LOG.debug(u'Topic item DELETE - topic: %(topic)s, '
+                  u'project: %(project)s',
+                  {'topic': topic_name, 'project': project_id})
+        try:
+            self._topic_controller.delete(topic_name, project=project_id)
+
+        except Exception as ex:
+            LOG.exception(ex)
+            description = _(u'Topic could not be deleted.')
+            raise wsgi_errors.HTTPServiceUnavailable(description)
+
+        resp.status = falcon.HTTP_204
+
+    @decorators.TransportLog("Topics item")
+    @acl.enforce("topics:update")
+    def on_patch(self, req, resp, project_id, topic_name):
+        """Allows one to update a topic's metadata.
+
+        This method expects the user to submit a JSON object. There is also
+        strict format checking through the use of
+        jsonschema. Appropriate errors are returned in each case for
+        badly formatted input.
+
+        :returns: HTTP | 200,400,409,503
+        """
+        LOG.debug(u'PATCH topic - name: %s', topic_name)
+
+        try:
+            # Place JSON size restriction before parsing
+            self._validate.queue_metadata_length(req.content_length)
+        except validation.ValidationFailed as ex:
+            LOG.debug(ex)
+            raise wsgi_errors.HTTPBadRequestBody(six.text_type(ex))
+
+        # NOTE(flwang): See below link to get more details about draft 10,
+        # tools.ietf.org/html/draft-ietf-appsawg-json-patch-10
+        content_types = {
+            'application/openstack-messaging-v2.0-json-patch': 10,
+        }
+
+        if req.content_type not in content_types:
+            headers = {'Accept-Patch':
+                       ', '.join(sorted(content_types.keys()))}
+            msg = _("Accepted media type for PATCH: %s.")
+            LOG.debug(msg, headers)
+            raise wsgi_errors.HTTPUnsupportedMediaType(msg % headers)
+
+        if req.content_length:
+            try:
+                changes = utils.read_json(req.stream, req.content_length)
+                changes = wsgi_utils.sanitize(changes, doctype=list)
+            except utils.MalformedJSON as ex:
+                LOG.debug(ex)
+                description = _(u'Request body could not be parsed.')
+                raise wsgi_errors.HTTPBadRequestBody(description)
+
+            except utils.OverflowedJSONInteger as ex:
+                LOG.debug(ex)
+                description = _(u'JSON contains integer that is too large.')
+                raise wsgi_errors.HTTPBadRequestBody(description)
+
+            except Exception as ex:
+                # Error while reading from the network/server
+                LOG.exception(ex)
+                description = _(u'Request body could not be read.')
+                raise wsgi_errors.HTTPServiceUnavailable(description)
+        else:
+            msg = _("PATCH body could not be empty for update.")
+            LOG.debug(msg)
+            raise wsgi_errors.HTTPBadRequestBody(msg)
+
+        try:
+            changes = self._validate.queue_patching(req, changes)
+
+            # NOTE(Eva-i): using 'get_metadata' instead of 'get', so
+            # QueueDoesNotExist error will be thrown in case of non-existent
+            # queue.
+            metadata = self._topic_controller.get_metadata(topic_name,
+                                                           project=project_id)
+            reserved_metadata = _get_reserved_metadata(self._validate)
+            for change in changes:
+                change_method_name = '_do_%s' % change['op']
+                change_method = getattr(self, change_method_name)
+                change_method(req, metadata, reserved_metadata, change)
+
+            self._validate.queue_metadata_putting(metadata)
+
+            self._topic_controller.set_metadata(topic_name,
+                                                metadata,
+                                                project_id)
+        except storage_errors.DoesNotExist as ex:
+            LOG.debug(ex)
+            raise wsgi_errors.HTTPNotFound(six.text_type(ex))
+        except validation.ValidationFailed as ex:
+            LOG.debug(ex)
+            raise wsgi_errors.HTTPBadRequestBody(six.text_type(ex))
+        except wsgi_errors.HTTPConflict as ex:
+            raise ex
+        except Exception as ex:
+            LOG.exception(ex)
+            description = _(u'Topic could not be updated.')
+            raise wsgi_errors.HTTPServiceUnavailable(description)
+        for meta, value in _get_reserved_metadata(self._validate).items():
+            if not metadata.get(meta):
+                metadata[meta] = value
+        resp.body = utils.to_json(metadata)
+
+    def _do_replace(self, req, metadata, reserved_metadata, change):
+        path = change['path']
+        path_child = path[1]
+        value = change['value']
+        if path_child in metadata or path_child in reserved_metadata:
+            metadata[path_child] = value
+        else:
+            msg = _("Can't replace non-existent object %s.")
+            raise wsgi_errors.HTTPConflict(msg % path_child)
+
+    def _do_add(self, req, metadata, reserved_metadata, change):
+        path = change['path']
+        path_child = path[1]
+        value = change['value']
+        metadata[path_child] = value
+
+    def _do_remove(self, req, metadata, reserved_metadata, change):
+        path = change['path']
+        path_child = path[1]
+        if path_child in metadata:
+            metadata.pop(path_child)
+        elif path_child not in reserved_metadata:
+            msg = _("Can't remove non-existent object %s.")
+            raise wsgi_errors.HTTPConflict(msg % path_child)
+
+
+class CollectionResource(object):
+
+    __slots__ = ('_topic_controller', '_validate', '_reserved_metadata')
+
+    def __init__(self, validate, topic_controller):
+        self._topic_controller = topic_controller
+        self._validate = validate
+
+    def _topic_list(self, project_id, path, kfilter, **kwargs):
+        try:
+            self._validate.queue_listing(**kwargs)
+            results = self._topic_controller.list(project=project_id,
+                                                  kfilter=kfilter, **kwargs)
+
+            # Buffer list of topics
+            topics = list(next(results))
+
+        except validation.ValidationFailed as ex:
+            LOG.debug(ex)
+            raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
+
+        except Exception as ex:
+            LOG.exception(ex)
+            description = _(u'Topics could not be listed.')
+            raise wsgi_errors.HTTPServiceUnavailable(description)
+
+        # Got some. Prepare the response.
+        kwargs['marker'] = next(results) or kwargs.get('marker', '')
+        reserved_metadata = _get_reserved_metadata(self._validate).items()
+        for each_topic in topics:
+            each_topic['href'] = path + '/' + each_topic['name']
+            if kwargs.get('detailed'):
+                for meta, value in reserved_metadata:
+                    if not each_topic.get('metadata', {}).get(meta):
+                        each_topic['metadata'][meta] = value
+
+        return topics, kwargs['marker']
+
+    def _on_get_with_kfilter(self, req, resp, project_id, kfilter={}):
+        kwargs = {}
+
+        # NOTE(kgriffs): This syntax ensures that
+        # we don't clobber default values with None.
+        req.get_param('marker', store=kwargs)
+        req.get_param_as_int('limit', store=kwargs)
+        req.get_param_as_bool('detailed', store=kwargs)
+        req.get_param('name', store=kwargs)
+
+        topics, marker = self._topic_list(project_id,
+                                          req.path, kfilter, **kwargs)
+
+        links = []
+        kwargs['marker'] = marker
+        if topics:
+            links = [
+                {
+                    'rel': 'next',
+                    'href': req.path + falcon.to_query_str(kwargs)
+                }
+            ]
+
+        response_body = {
+            'topics': topics,
+            'links': links
+        }
+
+        resp.body = utils.to_json(response_body)
+        # status defaults to 200
+
+    @decorators.TransportLog("Topics collection")
+    @acl.enforce("topics:get_all")
+    def on_get(self, req, resp, project_id):
+        field = ('marker', 'limit', 'detailed', 'name')
+        kfilter = copy.deepcopy(req.params)
+
+        for key in req.params.keys():
+            if key in field:
+                kfilter.pop(key)
+
+        kfilter = kfilter if len(kfilter) > 0 else {}
+        for key in kfilter.keys():
+            # Since we get the filter value from URL, so need to
+            # turn the string to integer if using integer filter value.
+            try:
+                kfilter[key] = int(kfilter[key])
+            except ValueError:
+                continue
+        self._on_get_with_kfilter(req, resp, project_id, kfilter)
+        # status defaults to 200
diff --git a/zaqar/transport/wsgi/v2_0/topic_purge.py b/zaqar/transport/wsgi/v2_0/topic_purge.py
new file mode 100644
index 000000000..3869616bb
--- /dev/null
+++ b/zaqar/transport/wsgi/v2_0/topic_purge.py
@@ -0,0 +1,82 @@
+# Copyright 2019 Catalyst IT Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License.  You may obtain a copy
+# of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+import falcon
+
+from oslo_log import log as logging
+import six
+
+from zaqar.common import decorators
+from zaqar.i18n import _
+from zaqar.transport import acl
+from zaqar.transport import validation
+from zaqar.transport.wsgi import errors as wsgi_errors
+from zaqar.transport.wsgi import utils as wsgi_utils
+
+LOG = logging.getLogger(__name__)
+
+
+class Resource(object):
+
+    __slots__ = ('_driver', '_conf',
+                 '_message_ctrl', '_subscription_ctrl', '_validate')
+
+    def __init__(self, driver):
+        self._driver = driver
+        self._conf = driver._conf
+        self._message_ctrl = driver._storage.message_controller
+        self._subscription_ctrl = driver._storage.subscription_controller
+        self._validate = driver._validate
+
+    @decorators.TransportLog("Topics item")
+    @acl.enforce("topics:purge")
+    def on_post(self, req, resp, project_id, topic_name):
+        try:
+            if req.content_length:
+                document = wsgi_utils.deserialize(req.stream,
+                                                  req.content_length)
+                self._validate.queue_purging(document)
+            else:
+                document = {'resource_types': ['messages', 'subscriptions']}
+        except validation.ValidationFailed as ex:
+            LOG.debug(ex)
+            raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
+
+        try:
+            if "messages" in document['resource_types']:
+                pop_limit = 100
+                LOG.debug("Purge all messages under topic %s", topic_name)
+                messages = self._message_ctrl.pop(topic_name, pop_limit,
+                                                  project=project_id)
+                while messages:
+                    messages = self._message_ctrl.pop(topic_name, pop_limit,
+                                                      project=project_id)
+
+            if "subscriptions" in document['resource_types']:
+                LOG.debug("Purge all subscriptions under topic %s", topic_name)
+                results = self._subscription_ctrl.list(topic_name,
+                                                       project=project_id)
+                subscriptions = list(next(results))
+                for sub in subscriptions:
+                    self._subscription_ctrl.delete(topic_name,
+                                                   sub['id'],
+                                                   project=project_id)
+        except ValueError as err:
+            raise wsgi_errors.HTTPBadRequestAPI(str(err))
+        except Exception as ex:
+            LOG.exception(ex)
+            description = _(u'Topic could not be purged.')
+            raise wsgi_errors.HTTPServiceUnavailable(description)
+
+        resp.status = falcon.HTTP_204
diff --git a/zaqar/transport/wsgi/v2_0/topic_stats.py b/zaqar/transport/wsgi/v2_0/topic_stats.py
new file mode 100644
index 000000000..89c6c4f5c
--- /dev/null
+++ b/zaqar/transport/wsgi/v2_0/topic_stats.py
@@ -0,0 +1,78 @@
+# Copyright (c) 2019 Rackspace, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from oslo_log import log as logging
+import six
+
+from zaqar.common import decorators
+from zaqar.i18n import _
+from zaqar.storage import errors as storage_errors
+from zaqar.transport import acl
+from zaqar.transport import utils
+from zaqar.transport.wsgi import errors as wsgi_errors
+
+
+LOG = logging.getLogger(__name__)
+
+
+class Resource(object):
+
+    __slots__ = '_topic_ctrl'
+
+    def __init__(self, topic_controller):
+        self._topic_ctrl = topic_controller
+
+    @decorators.TransportLog("Topics stats item")
+    @acl.enforce("topics:stats")
+    def on_get(self, req, resp, project_id, topic_name):
+        try:
+            resp_dict = self._topic_ctrl.stats(topic_name,
+                                               project=project_id)
+
+            message_stats = resp_dict['messages']
+
+            if message_stats['total'] != 0:
+                base_path = req.path[:req.path.rindex('/')] + '/messages/'
+
+                newest = message_stats['newest']
+                newest['href'] = base_path + newest['id']
+                del newest['id']
+
+                oldest = message_stats['oldest']
+                oldest['href'] = base_path + oldest['id']
+                del oldest['id']
+
+            resp.body = utils.to_json(resp_dict)
+            # status defaults to 200
+
+        except (storage_errors.TopicDoesNotExist,
+                storage_errors.TopicIsEmpty) as ex:
+            resp_dict = {
+                'messages': {
+                    'claimed': 0,
+                    'free': 0,
+                    'total': 0
+                }
+            }
+            resp.body = utils.to_json(resp_dict)
+
+        except storage_errors.DoesNotExist as ex:
+            LOG.debug(ex)
+            raise wsgi_errors.HTTPNotFound(six.text_type(ex))
+
+        except Exception as ex:
+            LOG.exception(ex)
+            description = _(u'Topic stats could not be read.')
+            raise wsgi_errors.HTTPServiceUnavailable(description)