# vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2011 OpenStack LLC. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. # Only (de)serialization utils hasn't been removed to decrease requirements # number. """Utility methods for working with WSGI servers.""" import datetime from xml.dom import minidom from xml.parsers import expat from muranorepository.openstack.common import exception from muranorepository.openstack.common.gettextutils import _ from muranorepository.openstack.common import jsonutils from muranorepository.openstack.common import log as logging from muranorepository.openstack.common import xmlutils LOG = logging.getLogger(__name__) class ActionDispatcher(object): """Maps method name to local methods through action name.""" def dispatch(self, *args, **kwargs): """Find and call local method.""" action = kwargs.pop('action', 'default') action_method = getattr(self, str(action), self.default) return action_method(*args, **kwargs) def default(self, data): raise NotImplementedError() class DictSerializer(ActionDispatcher): """Default request body serialization""" def serialize(self, data, action='default'): return self.dispatch(data, action=action) def default(self, data): return "" class JSONDictSerializer(DictSerializer): """Default JSON request body serialization""" def default(self, data): def sanitizer(obj): if isinstance(obj, datetime.datetime): _dtime = obj - datetime.timedelta(microseconds=obj.microsecond) return _dtime.isoformat() return unicode(obj) return jsonutils.dumps(data, default=sanitizer) class XMLDictSerializer(DictSerializer): def __init__(self, metadata=None, xmlns=None): """ :param metadata: information needed to deserialize xml into a dictionary. :param xmlns: XML namespace to include with serialized xml """ super(XMLDictSerializer, self).__init__() self.metadata = metadata or {} self.xmlns = xmlns def default(self, data): # We expect data to contain a single key which is the XML root. root_key = data.keys()[0] doc = minidom.Document() node = self._to_xml_node(doc, self.metadata, root_key, data[root_key]) return self.to_xml_string(node) def to_xml_string(self, node, has_atom=False): self._add_xmlns(node, has_atom) return node.toprettyxml(indent=' ', encoding='UTF-8') #NOTE (ameade): the has_atom should be removed after all of the # xml serializers and view builders have been updated to the current # spec that required all responses include the xmlns:atom, the has_atom # flag is to prevent current tests from breaking def _add_xmlns(self, node, has_atom=False): if self.xmlns is not None: node.setAttribute('xmlns', self.xmlns) if has_atom: node.setAttribute('xmlns:atom', "http://www.w3.org/2005/Atom") def _to_xml_node(self, doc, metadata, nodename, data): """Recursive method to convert data members to XML nodes.""" result = doc.createElement(nodename) # Set the xml namespace if one is specified # TODO(justinsb): We could also use prefixes on the keys xmlns = metadata.get('xmlns', None) if xmlns: result.setAttribute('xmlns', xmlns) #TODO(bcwaldon): accomplish this without a type-check if type(data) is list: collections = metadata.get('list_collections', {}) if nodename in collections: metadata = collections[nodename] for item in data: node = doc.createElement(metadata['item_name']) node.setAttribute(metadata['item_key'], str(item)) result.appendChild(node) return result singular = metadata.get('plurals', {}).get(nodename, None) if singular is None: if nodename.endswith('s'): singular = nodename[:-1] else: singular = 'item' for item in data: node = self._to_xml_node(doc, metadata, singular, item) result.appendChild(node) #TODO(bcwaldon): accomplish this without a type-check elif type(data) is dict: collections = metadata.get('dict_collections', {}) if nodename in collections: metadata = collections[nodename] for k, v in data.items(): node = doc.createElement(metadata['item_name']) node.setAttribute(metadata['item_key'], str(k)) text = doc.createTextNode(str(v)) node.appendChild(text) result.appendChild(node) return result attrs = metadata.get('attributes', {}).get(nodename, {}) for k, v in data.items(): if k in attrs: result.setAttribute(k, str(v)) else: node = self._to_xml_node(doc, metadata, k, v) result.appendChild(node) else: # Type is atom node = doc.createTextNode(str(data)) result.appendChild(node) return result def _create_link_nodes(self, xml_doc, links): link_nodes = [] for link in links: link_node = xml_doc.createElement('atom:link') link_node.setAttribute('rel', link['rel']) link_node.setAttribute('href', link['href']) if 'type' in link: link_node.setAttribute('type', link['type']) link_nodes.append(link_node) return link_nodes class TextDeserializer(ActionDispatcher): """Default request body deserialization""" def deserialize(self, datastring, action='default'): return self.dispatch(datastring, action=action) def default(self, datastring): return {} class JSONDeserializer(TextDeserializer): def _from_json(self, datastring): try: return jsonutils.loads(datastring) except ValueError: msg = _("cannot understand JSON") raise exception.MalformedRequestBody(reason=msg) def default(self, datastring): return {'body': self._from_json(datastring)} class XMLDeserializer(TextDeserializer): def __init__(self, metadata=None): """ :param metadata: information needed to deserialize xml into a dictionary. """ super(XMLDeserializer, self).__init__() self.metadata = metadata or {} def _from_xml(self, datastring): plurals = set(self.metadata.get('plurals', {})) try: node = xmlutils.safe_minidom_parse_string(datastring).childNodes[0] return {node.nodeName: self._from_xml_node(node, plurals)} except expat.ExpatError: msg = _("cannot understand XML") raise exception.MalformedRequestBody(reason=msg) def _from_xml_node(self, node, listnames): """Convert a minidom node to a simple Python type. :param listnames: list of XML node names whose subnodes should be considered list items. """ if len(node.childNodes) == 1 and node.childNodes[0].nodeType == 3: return node.childNodes[0].nodeValue elif node.nodeName in listnames: return [self._from_xml_node(n, listnames) for n in node.childNodes] else: result = dict() for attr in node.attributes.keys(): result[attr] = node.attributes[attr].nodeValue for child in node.childNodes: if child.nodeType != node.TEXT_NODE: result[child.nodeName] = self._from_xml_node(child, listnames) return result def find_first_child_named(self, parent, name): """Search a nodes children for the first child with a given name""" for node in parent.childNodes: if node.nodeName == name: return node return None def find_children_named(self, parent, name): """Return all of a nodes children who have the given name""" for node in parent.childNodes: if node.nodeName == name: yield node def extract_text(self, node): """Get the text field contained by the given node""" if len(node.childNodes) == 1: child = node.childNodes[0] if child.nodeType == child.TEXT_NODE: return child.nodeValue return "" def default(self, datastring): return {'body': self._from_xml(datastring)}