osmapi.dom

View Source
from datetime import datetime
import xml.dom.minidom
import xml.parsers.expat
import logging

from . import errors
from . import xmlbuilder


logger = logging.getLogger(__name__)


def OsmResponseToDom(response, tag, single=False, allow_empty=False):
    """
    Returns the (sub-) DOM parsed from an OSM response
    """
    try:
        dom = xml.dom.minidom.parseString(response)
        osm_dom = dom.getElementsByTagName("osm")[0]
        all_data = osm_dom.getElementsByTagName(tag)
        first_element = all_data[0]
    except (IndexError) as e:
        if allow_empty:
            return []
        raise errors.XmlResponseInvalidError(
            "The XML response from the OSM API is invalid: %r" % e
        )
    except (xml.parsers.expat.ExpatError) as e:
        raise errors.XmlResponseInvalidError(
            "The XML response from the OSM API is invalid: %r" % e
        )

    if single:
        return first_element
    return all_data


def DomParseNode(DomElement):
    """
    Returns NodeData for the node.
    """
    result = _DomGetAttributes(DomElement)
    result["tag"] = _DomGetTag(DomElement)
    return result


def DomParseWay(DomElement):
    """
    Returns WayData for the way.
    """
    result = _DomGetAttributes(DomElement)
    result["tag"] = _DomGetTag(DomElement)
    result["nd"] = _DomGetNd(DomElement)
    return result


def DomParseRelation(DomElement):
    """
    Returns RelationData for the relation.
    """
    result = _DomGetAttributes(DomElement)
    result["tag"] = _DomGetTag(DomElement)
    result["member"] = _DomGetMember(DomElement)
    return result


def DomParseChangeset(DomElement):
    """
    Returns ChangesetData for the changeset.
    """
    result = _DomGetAttributes(DomElement)
    result["tag"] = _DomGetTag(DomElement)
    result["discussion"] = _DomGetDiscussion(DomElement)

    return result


def DomParseNote(DomElement):
    """
    Returns NoteData for the note.
    """
    result = _DomGetAttributes(DomElement)
    result["id"] = xmlbuilder._GetXmlValue(DomElement, "id")
    result["status"] = xmlbuilder._GetXmlValue(DomElement, "status")

    result["date_created"] = _ParseDate(
        xmlbuilder._GetXmlValue(DomElement, "date_created")
    )
    result["date_closed"] = _ParseDate(
        xmlbuilder._GetXmlValue(DomElement, "date_closed")
    )
    result["comments"] = _DomGetComments(DomElement)

    return result


def _DomGetAttributes(DomElement):
    """
    Returns a formated dictionnary of attributes of a DomElement.
    """

    def is_true(v):
        return (v == "true")

    attribute_mapping = {
        'uid': int,
        'changeset': int,
        'version': int,
        'id': int,
        'lat': float,
        'lon': float,
        'open': is_true,
        'visible': is_true,
        'ref': int,
        'comments_count': int,
        'timestamp': _ParseDate,
        'created_at': _ParseDate,
        'closed_at': _ParseDate,
        'date': _ParseDate,
    }
    result = {}
    for k, v in DomElement.attributes.items():
        try:
            result[k] = attribute_mapping[k](v)
        except KeyError:
            result[k] = v
    return result


def _DomGetTag(DomElement):
    """
    Returns the dictionnary of tags of a DomElement.
    """
    result = {}
    for t in DomElement.getElementsByTagName("tag"):
        k = t.attributes["k"].value
        v = t.attributes["v"].value
        result[k] = v
    return result


def _DomGetNd(DomElement):
    """
    Returns the list of nodes of a DomElement.
    """
    result = []
    for t in DomElement.getElementsByTagName("nd"):
        result.append(int(int(t.attributes["ref"].value)))
    return result


def _DomGetDiscussion(DomElement):
    """
    Returns the dictionnary of comments of a DomElement.
    """
    result = []
    try:
        discussion = DomElement.getElementsByTagName("discussion")[0]
        for t in discussion.getElementsByTagName("comment"):
            comment = _DomGetAttributes(t)
            comment['text'] = xmlbuilder._GetXmlValue(t, "text")
            result.append(comment)
    except IndexError:
        pass
    return result


def _DomGetComments(DomElement):
    """
    Returns the list of comments of a DomElement.
    """
    result = []
    for t in DomElement.getElementsByTagName("comment"):
        comment = {}
        comment['date'] = _ParseDate(xmlbuilder._GetXmlValue(t, "date"))
        comment['action'] = xmlbuilder._GetXmlValue(t, "action")
        comment['text'] = xmlbuilder._GetXmlValue(t, "text")
        comment['html'] = xmlbuilder._GetXmlValue(t, "html")
        comment['uid'] = xmlbuilder._GetXmlValue(t, "uid")
        comment['user'] = xmlbuilder._GetXmlValue(t, "user")
        result.append(comment)
    return result


def _DomGetMember(DomElement):
    """
    Returns a list of relation members.
    """
    result = []
    for m in DomElement.getElementsByTagName("member"):
        result.append(_DomGetAttributes(m))
    return result


def _ParseDate(DateString):
    date_formats = ["%Y-%m-%d %H:%M:%S UTC", "%Y-%m-%dT%H:%M:%SZ"]
    for date_format in date_formats:
        try:
            result = datetime.strptime(DateString, date_format)
            return result
        except (ValueError, TypeError):
            logger.debug(f"{DateString} does not match {date_format}")

    return DateString
#   def OsmResponseToDom(response, tag, single=False, allow_empty=False):
View Source
def OsmResponseToDom(response, tag, single=False, allow_empty=False):
    """
    Returns the (sub-) DOM parsed from an OSM response
    """
    try:
        dom = xml.dom.minidom.parseString(response)
        osm_dom = dom.getElementsByTagName("osm")[0]
        all_data = osm_dom.getElementsByTagName(tag)
        first_element = all_data[0]
    except (IndexError) as e:
        if allow_empty:
            return []
        raise errors.XmlResponseInvalidError(
            "The XML response from the OSM API is invalid: %r" % e
        )
    except (xml.parsers.expat.ExpatError) as e:
        raise errors.XmlResponseInvalidError(
            "The XML response from the OSM API is invalid: %r" % e
        )

    if single:
        return first_element
    return all_data

Returns the (sub-) DOM parsed from an OSM response

#   def DomParseNode(DomElement):
View Source
def DomParseNode(DomElement):
    """
    Returns NodeData for the node.
    """
    result = _DomGetAttributes(DomElement)
    result["tag"] = _DomGetTag(DomElement)
    return result

Returns NodeData for the node.

#   def DomParseWay(DomElement):
View Source
def DomParseWay(DomElement):
    """
    Returns WayData for the way.
    """
    result = _DomGetAttributes(DomElement)
    result["tag"] = _DomGetTag(DomElement)
    result["nd"] = _DomGetNd(DomElement)
    return result

Returns WayData for the way.

#   def DomParseRelation(DomElement):
View Source
def DomParseRelation(DomElement):
    """
    Returns RelationData for the relation.
    """
    result = _DomGetAttributes(DomElement)
    result["tag"] = _DomGetTag(DomElement)
    result["member"] = _DomGetMember(DomElement)
    return result

Returns RelationData for the relation.

#   def DomParseChangeset(DomElement):
View Source
def DomParseChangeset(DomElement):
    """
    Returns ChangesetData for the changeset.
    """
    result = _DomGetAttributes(DomElement)
    result["tag"] = _DomGetTag(DomElement)
    result["discussion"] = _DomGetDiscussion(DomElement)

    return result

Returns ChangesetData for the changeset.

#   def DomParseNote(DomElement):
View Source
def DomParseNote(DomElement):
    """
    Returns NoteData for the note.
    """
    result = _DomGetAttributes(DomElement)
    result["id"] = xmlbuilder._GetXmlValue(DomElement, "id")
    result["status"] = xmlbuilder._GetXmlValue(DomElement, "status")

    result["date_created"] = _ParseDate(
        xmlbuilder._GetXmlValue(DomElement, "date_created")
    )
    result["date_closed"] = _ParseDate(
        xmlbuilder._GetXmlValue(DomElement, "date_closed")
    )
    result["comments"] = _DomGetComments(DomElement)

    return result

Returns NoteData for the note.