# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# A derivative of a dnspython VersionedZone and related classes, using a BTreeDict and
# a separate per-version delegation index. These additions let us
#
# 1) Do efficient CoW versioning (useful for future online updates).
# 2) Maintain sort order
# 3) Allow delegations to be found easily
# 4) Handle glue
# 5) Add Node flags ORIGIN, DELEGATION, and GLUE whenever relevant. The ORIGIN
# flag is set at the origin node, the DELEGATION FLAG is set at delegation
# points, and the GLUE flag is set on nodes beneath delegation points.
import enum
from collections.abc import Callable, MutableMapping
from dataclasses import dataclass
from typing import cast
import dns.btree
import dns.immutable
import dns.name
import dns.node
import dns.rdataclass
import dns.rdataset
import dns.rdatatype
import dns.versioned
import dns.zone
[docs]
class NodeFlags(enum.IntFlag):
"""Flags that classify a node's role in the zone.
``ORIGIN`` is set on the zone origin node.
``DELEGATION`` is set at NS delegation points (not at the origin, and not
on nodes beneath a delegation).
``GLUE`` is set on nodes that are proper subdomains of a delegation point.
"""
ORIGIN = 0x01
DELEGATION = 0x02
GLUE = 0x04
[docs]
class Node(dns.node.Node):
"""A BTree zone node, extending :py:class:`dns.node.Node` with ``flags`` and
``id`` fields.
.. attribute:: flags
The node's role flags.
:type: :py:class:`dns.btreezone.NodeFlags`
.. attribute:: id
The version id of the last write that touched this node.
:type: int
"""
__slots__ = ["flags", "id"]
def __init__(self, flags: NodeFlags | None = None):
super().__init__()
if flags is None:
# We allow optional flags rather than a default
# as pyright doesn't like assigning a literal 0
# to flags.
flags = NodeFlags(0)
self.flags = flags
self.id = 0
[docs]
def is_delegation(self):
"""Return ``True`` if this node is an NS delegation point.
:rtype: bool
"""
return (self.flags & NodeFlags.DELEGATION) != 0
[docs]
def is_glue(self):
"""Return ``True`` if this node is beneath a delegation point.
:rtype: bool
"""
return (self.flags & NodeFlags.GLUE) != 0
[docs]
def is_origin(self):
"""Return ``True`` if this node is the zone origin.
:rtype: bool
"""
return (self.flags & NodeFlags.ORIGIN) != 0
[docs]
def is_origin_or_glue(self):
"""Return ``True`` if this node is at the origin or beneath a delegation.
:rtype: bool
"""
return (self.flags & (NodeFlags.ORIGIN | NodeFlags.GLUE)) != 0
[docs]
@dns.immutable.immutable
class ImmutableNode(Node):
"""An immutable :py:class:`dns.btreezone.Node`.
Mutation methods raise :py:exc:`TypeError`.
"""
def __init__(self, node: Node):
super().__init__()
self.id = node.id
self.rdatasets = tuple( # pyright: ignore
[dns.rdataset.ImmutableRdataset(rds) for rds in node.rdatasets]
)
self.flags = node.flags
def find_rdataset(
self,
rdclass: dns.rdataclass.RdataClass,
rdtype: dns.rdatatype.RdataType,
covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
create: bool = False,
) -> dns.rdataset.Rdataset:
if create:
raise TypeError("immutable")
return super().find_rdataset(rdclass, rdtype, covers, False)
def get_rdataset(
self,
rdclass: dns.rdataclass.RdataClass,
rdtype: dns.rdatatype.RdataType,
covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
create: bool = False,
) -> dns.rdataset.Rdataset | None:
if create:
raise TypeError("immutable")
return super().get_rdataset(rdclass, rdtype, covers, False)
def delete_rdataset(
self,
rdclass: dns.rdataclass.RdataClass,
rdtype: dns.rdatatype.RdataType,
covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
) -> None:
raise TypeError("immutable")
def replace_rdataset(self, replacement: dns.rdataset.Rdataset) -> None:
raise TypeError("immutable")
def is_immutable(self) -> bool:
return True
[docs]
class Delegations(dns.btree.BTreeSet[dns.name.Name]):
"""A sorted set of delegation-point names.
Used by :py:class:`dns.btreezone.WritableVersion` and
:py:class:`dns.btreezone.ImmutableVersion` to efficiently determine
whether a given name is at or beneath a delegation point.
"""
[docs]
def get_delegation(self, name: dns.name.Name) -> tuple[dns.name.Name | None, bool]:
"""Get the delegation applicable to *name*, if it exists.
:returns: A tuple of the delegation point name and a boolean which is
``True`` if *name* is a proper subdomain of the delegation point,
or ``False`` if it is equal to the delegation point. If there is
no applicable delegation, returns ``(None, False)``.
:rtype: tuple[:py:class:`dns.name.Name` or ``None``, bool]
"""
cursor = self.cursor()
cursor.seek(name, before=False)
prev = cursor.prev()
if prev is None:
return None, False
cut = prev.key()
reln, _, _ = name.fullcompare(cut)
is_subdomain = reln == dns.name.NameRelation.SUBDOMAIN
if is_subdomain or reln == dns.name.NameRelation.EQUAL:
return cut, is_subdomain
else:
return None, False
[docs]
def is_glue(self, name: dns.name.Name) -> bool:
"""Is *name* glue, i.e. is it beneath a delegation?"""
cursor = self.cursor()
cursor.seek(name, before=False)
cut, is_subdomain = self.get_delegation(name)
if cut is None:
return False
return is_subdomain
[docs]
class WritableVersion(dns.zone.WritableVersion):
"""A mutable version of a :py:class:`dns.btreezone.Zone`.
Extends :py:class:`dns.zone.WritableVersion` with a
:py:class:`dns.btreezone.Delegations` index and automatic management of
``NodeFlags.ORIGIN``, ``NodeFlags.DELEGATION``, and ``NodeFlags.GLUE``
flags on every node.
Instances are created internally by the zone; callers should not
construct them directly.
"""
def __init__(self, zone: dns.zone.Zone, replacement: bool = False):
super().__init__(zone, True)
if not replacement:
assert isinstance(zone, dns.versioned.Zone)
version = zone._versions[-1]
self.nodes: dns.btree.BTreeDict[dns.name.Name, Node] = dns.btree.BTreeDict[
dns.name.Name, Node
](
original=version.nodes # type: ignore
)
self.delegations = Delegations(original=version.delegations) # type: ignore
else:
self.delegations = Delegations()
def _is_origin(self, name: dns.name.Name) -> bool:
# Assumes name has already been validated (and thus adjusted to the right
# relativity too)
if self.zone.relativize:
return name == dns.name.empty
else:
return name == self.zone.origin
def _maybe_cow_with_name(
self, name: dns.name.Name
) -> tuple[dns.node.Node, dns.name.Name]:
node, name = super()._maybe_cow_with_name(name)
node = cast(Node, node)
if self._is_origin(name):
node.flags |= NodeFlags.ORIGIN
elif self.delegations.is_glue(name):
node.flags |= NodeFlags.GLUE
return (node, name)
[docs]
def update_glue_flag(self, name: dns.name.Name, is_glue: bool) -> None:
"""Set or clear the ``NodeFlags.GLUE`` flag on all nodes that are
subdomains of *name*.
:param name: The delegation-point name whose subtree should be updated.
:type name: :py:class:`dns.name.Name`
:param is_glue: ``True`` to set the GLUE flag; ``False`` to clear it.
:type is_glue: bool
"""
cursor = self.nodes.cursor() # pyright: ignore
cursor.seek(name, False)
updates = []
while True:
elt = cursor.next()
if elt is None:
break
ename = elt.key()
if not ename.is_subdomain(name):
break
node = cast(dns.node.Node, elt.value())
if ename not in self.changed:
new_node = self.zone.node_factory()
new_node.id = self.id # type: ignore
new_node.rdatasets.extend(node.rdatasets)
self.changed.add(ename)
node = new_node
assert isinstance(node, Node)
if is_glue:
node.flags |= NodeFlags.GLUE
else:
node.flags &= ~NodeFlags.GLUE
# We don't update node here as any insertion could disturb the
# btree and invalidate our cursor. We could use the cursor in a
# with block and avoid this, but it would do a lot of parking and
# unparking so the deferred update mode may still be better.
updates.append((ename, node))
for ename, node in updates:
self.nodes[ename] = node
[docs]
def delete_node(self, name: dns.name.Name) -> None:
"""Delete the node at *name*, updating delegation tracking as needed.
If *name* is a delegation point, it is removed from the delegations
index and the GLUE flag is cleared from its subtree. If *name* does
not exist in the zone, this method is a no-op.
:param name: The name of the node to delete.
:type name: :py:class:`dns.name.Name`
"""
name = self._validate_name(name)
node = self.nodes.get(name)
if node is not None:
if node.is_delegation(): # pyright: ignore
self.delegations.discard(name)
self.update_glue_flag(name, False)
del self.nodes[name]
self.changed.add(name)
[docs]
def put_rdataset(
self, name: dns.name.Name, rdataset: dns.rdataset.Rdataset
) -> None:
"""Store *rdataset* at *name*, updating delegation flags as needed.
If *rdataset* is an NS rdataset and *name* is not the origin or beneath
an existing delegation, the ``DELEGATION`` flag is set on the node and
the ``GLUE`` flag is set on all nodes in *name*'s subtree.
:param name: The owner name.
:type name: :py:class:`dns.name.Name`
:param rdataset: The rdataset to store.
:type rdataset: :py:class:`dns.rdataset.Rdataset`
"""
node, name = self._maybe_cow_with_name(name)
if (
rdataset.rdtype == dns.rdatatype.NS
and not node.is_origin_or_glue() # type: ignore
):
node.flags |= NodeFlags.DELEGATION # type: ignore
if name not in self.delegations:
self.delegations.add(name)
self.update_glue_flag(name, True)
node.replace_rdataset(rdataset)
[docs]
def delete_rdataset(
self,
name: dns.name.Name,
rdtype: dns.rdatatype.RdataType,
covers: dns.rdatatype.RdataType,
) -> None:
"""Delete the rdataset with *rdtype* and *covers* at *name*.
If the deleted rdataset was the NS rdataset at a delegation point,
the ``DELEGATION`` flag is cleared from that node and the ``GLUE``
flag is cleared from all nodes in its subtree.
:param name: The owner name.
:type name: :py:class:`dns.name.Name`
:param rdtype: The rdata type to remove.
:type rdtype: :py:class:`dns.rdatatype.RdataType`
:param covers: The covered type (usually ``dns.rdatatype.NONE``).
:type covers: :py:class:`dns.rdatatype.RdataType`
"""
node, name = self._maybe_cow_with_name(name)
if rdtype == dns.rdatatype.NS and name in self.delegations: # pyright: ignore
node.flags &= ~NodeFlags.DELEGATION # type: ignore
self.delegations.discard(name) # pyright: ignore
self.update_glue_flag(name, False)
node.delete_rdataset(self.zone.rdclass, rdtype, covers)
if len(node) == 0:
del self.nodes[name]
[docs]
@dataclass(frozen=True)
class Bounds:
"""The result of a :py:meth:`~dns.btreezone.ImmutableVersion.bounds` query.
Useful for constructing authoritative responses and for on-the-fly DNSSEC
signatures.
.. attribute:: name
The queried name.
:type: :py:class:`dns.name.Name`
.. attribute:: left
The greatest name in the zone that is less than or equal to ``name``.
:type: :py:class:`dns.name.Name`
.. attribute:: right
The least name in the zone that is greater than ``name``, or ``None``
if ``name`` is greater than every name in the zone.
:type: :py:class:`dns.name.Name` or ``None``
.. attribute:: closest_encloser
The name with the greatest number of labels that is a common ancestor
of ``name`` and is present in the zone (explicitly or as an implied
empty non-terminal).
:type: :py:class:`dns.name.Name`
.. attribute:: is_equal
``True`` if ``name`` is present in the zone (i.e. ``name == left``).
:type: bool
.. attribute:: is_delegation
``True`` if the left bound is a delegation point.
:type: bool
"""
name: dns.name.Name
left: dns.name.Name
right: dns.name.Name | None
closest_encloser: dns.name.Name
is_equal: bool
is_delegation: bool
def __str__(self):
if self.is_equal:
op = "="
else:
op = "<"
if self.is_delegation:
zonecut = " zonecut"
else:
zonecut = ""
return (
f"{self.left} {op} {self.name} < {self.right}{zonecut}; "
f"{self.closest_encloser}"
)
[docs]
@dns.immutable.immutable
class ImmutableVersion(dns.zone.Version):
"""An immutable, committed version of a :py:class:`dns.btreezone.Zone`.
In addition to the standard read-only zone API, provides the
:py:meth:`bounds` method for DNSSEC and authoritative-response support.
Instances are created internally when a
:py:class:`dns.btreezone.WritableVersion` is committed; callers should
not construct them directly.
"""
def __init__(self, version: dns.zone.Version):
if not isinstance(version, WritableVersion):
raise ValueError(
"a dns.btreezone.ImmutableVersion requires a "
"dns.btreezone.WritableVersion"
)
super().__init__(version.zone, True)
self.id = version.id
self.origin = version.origin
for name in version.changed:
node = version.nodes.get(name)
if node:
version.nodes[name] = ImmutableNode(node)
self.nodes = cast(MutableMapping[dns.name.Name, dns.node.Node], version.nodes)
self.nodes.make_immutable() # type: ignore
self.delegations = version.delegations
self.delegations.make_immutable()
[docs]
def bounds(self, name: dns.name.Name | str) -> Bounds:
"""Return the bounds of *name* in its zone.
The bounds information is useful when making an authoritative response, as
it can be used to determine whether the query name is at or beneath a delegation
point. The other data in the :py:class:`dns.btreezone.Bounds` object is useful
for making on-the-fly DNSSEC signatures.
The left bound of *name* is *name* itself if it is in the zone, or the greatest
predecessor which is in the zone.
The right bound of *name* is the least successor of *name*, or ``None`` if
no name in the zone is greater than *name*.
The closest encloser of *name* is *name* itself, if *name* is in the zone;
otherwise it is the name with the largest number of labels in common with
*name* that is in the zone, either explicitly or by the implied existence
of empty non-terminals.
The *is_equal* field of the result is ``True`` if and only if *name* is equal
to its left bound.
The *is_delegation* field of the result is ``True`` if and only if the left
bound is a delegation point.
:param name: The name to look up.
:type name: :py:class:`dns.name.Name` or str
:rtype: :py:class:`dns.btreezone.Bounds`
"""
assert self.origin is not None
# validate the origin because we may need to relativize
origin = self.zone._validate_name(self.origin)
name = self.zone._validate_name(name)
cut, _ = self.delegations.get_delegation(name)
if cut is not None:
target = cut
is_delegation = True
else:
target = name
is_delegation = False
c = cast(dns.btree.BTreeDict, self.nodes).cursor()
c.seek(target, False)
left = c.prev()
assert left is not None
c.next() # skip over left
while True:
right = c.next()
if right is None or not right.value().is_glue():
break
left_comparison = left.key().fullcompare(name)
if right is not None:
right_key = right.key()
right_comparison = right_key.fullcompare(name)
else:
right_comparison = (
dns.name.NAMERELN_COMMONANCESTOR,
-1,
len(origin),
)
right_key = None
closest_encloser = dns.name.Name(
name[-max(left_comparison[2], right_comparison[2]) :]
)
return Bounds(
name,
left.key(),
right_key,
closest_encloser,
left_comparison[0] == dns.name.NameRelation.EQUAL,
is_delegation,
)
[docs]
class Zone(dns.versioned.Zone):
"""A versioned DNS zone backed by a BTree.
Extends :py:class:`dns.versioned.Zone` with:
- **Sorted iteration order**: names are always visited in DNS canonical order.
- **Automatic flag tracking**: every node is tagged with
:py:class:`dns.btreezone.NodeFlags` (``ORIGIN``, ``DELEGATION``,
``GLUE``) as rdatasets are added and removed.
- **Efficient copy-on-write versioning**: the underlying
:py:class:`dns.btree.BTreeDict` shares structure between versions so
that creating a new version is cheap.
- **DNSSEC / authoritative-response support**: committed versions expose
:py:meth:`~dns.btreezone.ImmutableVersion.bounds`, which returns the
nearest names and closest encloser for any query name.
"""
node_factory: Callable[[], dns.node.Node] = Node
map_factory: Callable[[], MutableMapping[dns.name.Name, dns.node.Node]] = cast(
Callable[[], MutableMapping[dns.name.Name, dns.node.Node]],
dns.btree.BTreeDict[dns.name.Name, Node],
)
writable_version_factory: (
Callable[[dns.zone.Zone, bool], dns.zone.Version] | None
) = WritableVersion
immutable_version_factory: Callable[[dns.zone.Version], dns.zone.Version] | None = (
ImmutableVersion
)