# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""DNS Zones."""
import contextlib
import dataclasses
import io
import os
import struct
from collections.abc import Callable, Iterable, Iterator, MutableMapping
from typing import Any, BinaryIO, TextIO, cast
import dns.exception
import dns.immutable
import dns.name
import dns.node
import dns.rdata
import dns.rdataclass
import dns.rdataset
import dns.rdatatype
import dns.rdtypes.ANY.SOA
import dns.rdtypes.ANY.ZONEMD
import dns.rrset
import dns.tokenizer
import dns.transaction
import dns.zonefile
from dns.zonetypes import DigestHashAlgorithm, DigestScheme, _digest_hashers
[docs]
# Base class for the zone-structure errors in this module; NoSOA, NoNS and
# UnknownOrigin derive from it.
# NOTE(review): dns.exception.DNSException may use the class docstring as the
# default exception message -- confirm before rewording the docstring.
class BadZone(dns.exception.DNSException):
    """The DNS zone is malformed."""
[docs]
# Raised by Zone.check_origin() and Zone.get_soa() when no SOA rdataset can be
# found at the zone origin.
class NoSOA(BadZone):
    """The DNS zone has no SOA RR at its origin."""
[docs]
# Raised by Zone.check_origin() when no NS rdataset can be found at the zone
# origin.
class NoNS(BadZone):
    """The DNS zone has no NS RRset at its origin."""
[docs]
# NOTE(review): not raised anywhere in the visible part of this module;
# presumably used by the zone-loading helpers -- confirm against callers.
class UnknownOrigin(BadZone):
    """The DNS zone's origin is unknown."""
# Raised by Zone._compute_digest() when the requested scheme is anything other
# than DigestScheme.SIMPLE.
class UnsupportedDigestScheme(dns.exception.DNSException):
    """The zone digest's scheme is unsupported."""
# Raised by Zone._compute_digest() when the requested hash algorithm has no
# entry in the table of supported digest hashers.
class UnsupportedDigestHashAlgorithm(dns.exception.DNSException):
    """The zone digest's hash algorithm is unsupported."""
# Raised by Zone.verify_digest() when no explicit ZONEMD is supplied and the
# origin has no ZONEMD rdataset.
class NoDigest(dns.exception.DNSException):
    """The DNS zone has no ZONEMD RRset at its origin."""
# Raised by Zone.verify_digest() when none of the candidate ZONEMD records
# matches the digest computed over the zone.
class DigestVerificationFailure(dns.exception.DNSException):
    """The ZONEMD digest failed to verify."""
def _validate_name(
    name: dns.name.Name,
    origin: dns.name.Name | None,
    relativize: bool,
) -> dns.name.Name:
    """Check *name* against a zone origin and return it in the zone's form.

    This helper is shared by ``Zone`` and ``Version``.

    :param name: The name to validate.
    :param origin: The zone origin, or ``None`` if it is not yet known.
    :param bool relativize: Whether the zone stores relativized names.
    :raises KeyError: if there is no origin, if an absolute *name* is not a
        subdomain of *origin*, or if a relative *name* would be too long once
        made absolute.
    :returns: *name* relativized to *origin* when *relativize* is true,
        otherwise the absolute form of *name*.
    """
    if origin is None:
        # Other code (e.g. _rr_line) should notice a missing origin before we
        # are ever called, but we check just in case.
        raise KeyError("no zone origin is defined")

    if name.is_absolute():
        if not name.is_subdomain(origin):
            raise KeyError("name parameter must be a subdomain of the zone origin")
        return name.relativize(origin) if relativize else name

    # Relative name: always derelativize, even if we end up returning the
    # relative form, so that over-long names are rejected.
    try:
        absolute = name.derelativize(origin)
    except dns.name.NameTooLong:
        # Mapped to KeyError for consistency with the failures above.
        raise KeyError("relative name too long for zone")
    return name if relativize else absolute
[docs]
@dataclasses.dataclass(frozen=True)
class ZoneStyle(dns.node.NodeStyle):
    """Zone text styles.

    A :py:class:`dns.zone.ZoneStyle` is also a :py:class:`dns.name.NameStyle`,
    :py:class:`dns.rdata.RdataStyle`, :py:class:`dns.rdataset.RdatasetStyle`,
    and :py:class:`dns.node.NodeStyle`.
    See those classes for a description of their options.

    :param bool sorted: If ``True`` (the default), the file will be written
        with the names sorted in DNSSEC order from least to greatest.
        Otherwise the names will be written in whatever order they happen
        to have in the zone's dictionary.
    :param nl: The end of line string. If ``None`` (the default), the
        output will use the platform's native end-of-line marker (i.e. LF
        on POSIX, CRLF on Windows).
    :type nl: str or ``None``
    :param bool want_origin: If ``True``, emit a ``$ORIGIN`` line at the
        start of the output. If ``False`` (the default), do not emit one.
    :param bool want_unicode_directive: If ``True`` (the default) and the
        zone has a non-empty ``unicode`` attribute, then emit a ``$UNICODE``
        line in the output. This directive is not standard, but allows
        dnspython and other aware software to read and write Unicode
        zonefiles without changing the rendering of names and TXT-like
        records.
    """

    # Write owner names in sorted order rather than mapping order.
    sorted: bool = True
    # End-of-line string; None selects the platform default when writing.
    nl: str | None = None
    # Emit a leading $ORIGIN line.
    want_origin: bool = False
    # Emit a $UNICODE line (and apply the zone's unicode settings) on output.
    want_unicode_directive: bool = True
[docs]
class Zone(dns.transaction.TransactionManager):
    """A DNS zone.

    A ``Zone`` is a mapping from names to nodes. The zone object may be
    treated like a Python dictionary, e.g. ``zone[name]`` will retrieve
    the node associated with that name. The *name* may be a
    ``dns.name.Name`` object, or it may be a string. In either case,
    if the name is relative it is treated as relative to the origin of
    the zone.
    """

    # Called with no arguments to create each new node (see find_node()).
    node_factory: Callable[[], dns.node.Node] = dns.node.Node
    # Called with no arguments to create the name -> node mapping.
    map_factory: Callable[[], MutableMapping[dns.name.Name, dns.node.Node]] = dict
    # We only require the version types as "Version" to allow for flexibility, as
    # only the version protocol matters
    writable_version_factory: Callable[["Zone", bool], "Version"] | None = None
    immutable_version_factory: Callable[["Version"], "Version"] | None = None
    # NOTE(review): __init__ also assigns ``self.unicode``, which is not listed
    # here; that relies on instances still having a __dict__ via a base class
    # -- confirm.
    __slots__ = ["rdclass", "origin", "nodes", "relativize"]
def __init__(
    self,
    origin: dns.name.Name | str | None,
    rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
    relativize: bool = True,
):
    """Create an empty zone.

    :param origin: The zone origin as a :py:class:`dns.name.Name`, a
        ``str``, or ``None``. If ``None``, the origin will be set by the
        first ``$ORIGIN`` line in a zone file.
    :type origin: :py:class:`dns.name.Name`, str, or ``None``
    :param rdclass: The zone's rdata class; class IN by default.
    :type rdclass: :py:class:`dns.rdataclass.RdataClass`
    :param bool relativize: Whether domain names are relativized to the
        zone's origin (default ``True``).
    :raises ValueError: if *origin* is neither a name nor a string, or is
        not absolute.
    """
    if isinstance(origin, str):
        origin = dns.name.from_text(origin)
    elif origin is not None and not isinstance(origin, dns.name.Name):
        raise ValueError("origin parameter must be convertible to a DNS name")
    if origin is not None and not origin.is_absolute():
        raise ValueError("origin parameter must be an absolute name")

    self.origin: dns.name.Name | None = origin
    self.rdclass = rdclass
    self.nodes: MutableMapping[dns.name.Name, dns.node.Node] = self.map_factory()
    self.relativize = relativize
    # Tokens consulted by to_styled_file() ("2008", "2003", "TXT").
    self.unicode: set[str] = set()
def __eq__(self, other):
    """Return ``True`` if *other* is a zone with the same class, origin, and
    nodes as this one.

    :rtype: bool
    """
    if not isinstance(other, Zone):
        return False
    differs = (
        self.rdclass != other.rdclass
        or self.origin != other.origin
        or self.nodes != other.nodes
    )
    return not differs
def __ne__(self, other):
    """Return the negation of :py:meth:`__eq__`.

    :rtype: bool
    """
    same = self.__eq__(other)
    return not same
def _validate_name(self, name: dns.name.Name | str) -> dns.name.Name:
    """Convert *name* to a ``dns.name.Name`` if needed and validate it
    against this zone's origin and relativization setting.

    :raises KeyError: if *name* is neither a name nor a string, or fails the
        shared module-level validation.
    """
    # Note that any changes in this method should have corresponding changes
    # made in the Version _validate_name() method.
    if isinstance(name, str):
        parsed = dns.name.from_text(name, None)
    elif isinstance(name, dns.name.Name):
        parsed = name
    else:
        raise KeyError("name parameter must be convertible to a DNS name")
    return _validate_name(parsed, self.origin, self.relativize)
def __getitem__(self, key):
    """Return the node stored under *key* after validating the name."""
    return self.nodes[self._validate_name(key)]
def __setitem__(self, key, value):
    """Store *value* as the node for *key* after validating the name."""
    self.nodes[self._validate_name(key)] = value
def __delitem__(self, key):
    """Remove the node stored under *key* after validating the name."""
    del self.nodes[self._validate_name(key)]
def __iter__(self):
    """Iterate over the names in the zone's node mapping."""
    return iter(self.nodes)
def keys(self):
    """Return the keys view of the zone's node mapping (the owner names)."""
    return self.nodes.keys()
def values(self):
    """Return the values view of the zone's node mapping (the nodes)."""
    return self.nodes.values()
def items(self):
    """Return the items view of the zone's node mapping, i.e.
    ``(name, node)`` pairs."""
    return self.nodes.items()
def get(self, key):
    """Return the result of ``self.nodes.get()`` for the validated *key*.

    The name is validated first, so an invalid name still raises
    ``KeyError``.
    """
    return self.nodes.get(self._validate_name(key))
def __contains__(self, key):
    """Return whether the validated form of *key* is in the node mapping.

    The name is validated first, so an invalid name raises ``KeyError``
    rather than yielding ``False``.
    """
    return self._validate_name(key) in self.nodes
[docs]
def find_node(
    self, name: dns.name.Name | str, create: bool = False
) -> dns.node.Node:
    """Return the node at *name*, optionally creating it.

    :param name: The node's name, as a :py:class:`dns.name.Name` or a
        ``str``. An absolute name must be a subdomain of the zone's
        origin; it is relativized if ``zone.relativize`` is ``True``.
    :type name: :py:class:`dns.name.Name` or str
    :param bool create: If ``True``, a missing node is created with
        ``node_factory`` and stored in the zone.
    :raises KeyError: if the node does not exist and *create* is false, or
        if *name* fails validation.
    :rtype: :py:class:`dns.node.Node`
    """
    key = self._validate_name(name)
    node = self.nodes.get(key)
    if node is not None:
        return node
    if not create:
        raise KeyError
    node = self.node_factory()
    self.nodes[key] = node
    return node
[docs]
def get_node(
    self, name: dns.name.Name | str, create: bool = False
) -> dns.node.Node | None:
    """Return the node at *name*, or ``None`` if lookup raises ``KeyError``.

    This wraps :py:meth:`find_node` and converts its ``KeyError`` (missing
    node or invalid name) into a ``None`` result.

    :param name: The node's name, as a :py:class:`dns.name.Name` or a
        ``str``. An absolute name must be a subdomain of the zone's
        origin; it is relativized if ``zone.relativize`` is ``True``.
    :type name: :py:class:`dns.name.Name` or str
    :param bool create: If ``True``, the node will be created if it does
        not exist.
    :rtype: :py:class:`dns.node.Node` or ``None``
    """
    try:
        return self.find_node(name, create)
    except KeyError:
        return None
[docs]
def delete_node(self, name: dns.name.Name | str) -> None:
    """Remove the node at *name*, doing nothing if there is no such node.

    :param name: The node's name, as a :py:class:`dns.name.Name` or a
        ``str``. An absolute name must be a subdomain of the zone's
        origin; it is relativized if ``zone.relativize`` is ``True``.
    :type name: :py:class:`dns.name.Name` or str
    """
    key = self._validate_name(name)
    if key in self.nodes:
        del self.nodes[key]
[docs]
def find_rdataset(
    self,
    name: dns.name.Name | str,
    rdtype: dns.rdatatype.RdataType | str,
    covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE,
    create: bool = False,
) -> dns.rdataset.Rdataset:
    """Return the rdataset of the given type at *name*.

    The returned rdataset is the zone's own object, not a copy, so
    changing it changes the zone.

    :param name: The node's name, as a :py:class:`dns.name.Name` or a
        ``str``. An absolute name must be a subdomain of the zone's
        origin; it is relativized if ``zone.relativize`` is ``True``.
    :type name: :py:class:`dns.name.Name` or str
    :param rdtype: The rdata type desired.
    :type rdtype: :py:class:`dns.rdatatype.RdataType` or str
    :param covers: The covered type. Usually
        :py:attr:`dns.rdatatype.NONE`; for SIG/RRSIG lookups it is the type
        the signature covers, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
    :type covers: :py:class:`dns.rdatatype.RdataType` or str
    :param bool create: Passed to both the node lookup and the node's
        rdataset lookup; if ``True``, missing objects are created.
    :raises KeyError: if the name or type is not found and *create* is
        false, or if *name* fails validation.
    :rtype: :py:class:`dns.rdataset.Rdataset`
    """
    vname = self._validate_name(name)
    the_rdtype = dns.rdatatype.RdataType.make(rdtype)
    the_covers = dns.rdatatype.RdataType.make(covers)
    return self.find_node(vname, create).find_rdataset(
        self.rdclass, the_rdtype, the_covers, create
    )
[docs]
def get_rdataset(
    self,
    name: dns.name.Name | str,
    rdtype: dns.rdatatype.RdataType | str,
    covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE,
    create: bool = False,
) -> dns.rdataset.Rdataset | None:
    """Return the rdataset of the given type at *name*, or ``None``.

    This wraps :py:meth:`find_rdataset` and converts its ``KeyError`` into
    a ``None`` result. The returned rdataset is the zone's own object, not
    a copy, so changing it changes the zone.

    :param name: The node's name, as a :py:class:`dns.name.Name` or a
        ``str``. An absolute name must be a subdomain of the zone's
        origin; it is relativized if ``zone.relativize`` is ``True``.
    :type name: :py:class:`dns.name.Name` or str
    :param rdtype: The rdata type desired.
    :type rdtype: :py:class:`dns.rdatatype.RdataType` or str
    :param covers: The covered type. Usually
        :py:attr:`dns.rdatatype.NONE`; for SIG/RRSIG lookups it is the type
        the signature covers, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
    :type covers: :py:class:`dns.rdatatype.RdataType` or str
    :param bool create: If ``True``, missing objects are created.
    :rtype: :py:class:`dns.rdataset.Rdataset` or ``None``
    """
    try:
        return self.find_rdataset(name, rdtype, covers, create)
    except KeyError:
        return None
[docs]
def delete_rdataset(
    self,
    name: dns.name.Name | str,
    rdtype: dns.rdatatype.RdataType | str,
    covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE,
) -> None:
    """Remove the rdataset matching *rdtype* and *covers* from the node at
    *name*.

    A missing node is not an error. If the node is left with no rdatasets,
    the node itself is removed from the zone.

    :param name: The node's name, as a :py:class:`dns.name.Name` or a
        ``str``. An absolute name must be a subdomain of the zone's
        origin; it is relativized if ``zone.relativize`` is ``True``.
    :type name: :py:class:`dns.name.Name` or str
    :param rdtype: The rdata type desired.
    :type rdtype: :py:class:`dns.rdatatype.RdataType` or str
    :param covers: The covered type. Usually
        :py:attr:`dns.rdatatype.NONE`; for SIG/RRSIG lookups it is the type
        the signature covers, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
    :type covers: :py:class:`dns.rdatatype.RdataType` or str
    """
    vname = self._validate_name(name)
    the_rdtype = dns.rdatatype.RdataType.make(rdtype)
    the_covers = dns.rdatatype.RdataType.make(covers)
    node = self.get_node(vname)
    if node is None:
        return
    node.delete_rdataset(self.rdclass, the_rdtype, the_covers)
    if len(node) == 0:
        # Do not leave an empty node behind.
        self.delete_node(vname)
[docs]
def replace_rdataset(
    self, name: dns.name.Name | str, replacement: dns.rdataset.Rdataset
) -> None:
    """Install *replacement* at the node for *name*, creating the node if
    needed.

    The zone stores *replacement* itself rather than a copy, so ownership
    of the object passes to the zone. It is not an error if no matching
    rdataset existed beforehand.

    :param name: The node's name, as a :py:class:`dns.name.Name` or a
        ``str``. An absolute name must be a subdomain of the zone's
        origin; it is relativized if ``zone.relativize`` is ``True``.
    :type name: :py:class:`dns.name.Name` or str
    :param replacement: The replacement rdataset.
    :type replacement: :py:class:`dns.rdataset.Rdataset`
    :raises ValueError: if the replacement's class differs from the zone's.
    """
    if replacement.rdclass != self.rdclass:
        raise ValueError("replacement.rdclass != zone.rdclass")
    self.find_node(name, True).replace_rdataset(replacement)
[docs]
def find_rrset(
    self,
    name: dns.name.Name | str,
    rdtype: dns.rdatatype.RdataType | str,
    covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE,
) -> dns.rrset.RRset:
    """Return a new RRset built from the rdataset of the given type at
    *name*.

    Unlike :py:meth:`find_rdataset`, this builds a fresh
    :py:class:`dns.rrset.RRset` bound to the validated owner name and
    updates it from the stored rdataset, so it costs more but pairs the
    name with the data. It never creates nodes or rdatasets; use
    :py:meth:`find_rdataset` for that.

    :param name: The node's name, as a :py:class:`dns.name.Name` or a
        ``str``. An absolute name must be a subdomain of the zone's
        origin; it is relativized if ``zone.relativize`` is ``True``.
    :type name: :py:class:`dns.name.Name` or str
    :param rdtype: The rdata type desired.
    :type rdtype: :py:class:`dns.rdatatype.RdataType` or str
    :param covers: The covered type. Usually
        :py:attr:`dns.rdatatype.NONE`; for SIG/RRSIG lookups it is the type
        the signature covers, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
    :type covers: :py:class:`dns.rdatatype.RdataType` or str
    :raises KeyError: if the name or type is not found, or if *name* fails
        validation.
    :rtype: :py:class:`dns.rrset.RRset`
    """
    vname = self._validate_name(name)
    the_rdtype = dns.rdatatype.RdataType.make(rdtype)
    the_covers = dns.rdatatype.RdataType.make(covers)
    node = self.nodes[vname]
    stored = node.find_rdataset(self.rdclass, the_rdtype, the_covers)
    result = dns.rrset.RRset(vname, self.rdclass, the_rdtype, the_covers)
    result.update(stored)
    return result
[docs]
def get_rrset(
    self,
    name: dns.name.Name | str,
    rdtype: dns.rdatatype.RdataType | str,
    covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE,
) -> dns.rrset.RRset | None:
    """Return a new RRset for the rdataset of the given type at *name*, or
    ``None``.

    This wraps :py:meth:`find_rrset` and converts its ``KeyError`` into a
    ``None`` result. It never creates nodes or rdatasets; use
    :py:meth:`get_rdataset` for that.

    :param name: The node's name, as a :py:class:`dns.name.Name` or a
        ``str``. An absolute name must be a subdomain of the zone's
        origin; it is relativized if ``zone.relativize`` is ``True``.
    :type name: :py:class:`dns.name.Name` or str
    :param rdtype: The rdata type desired.
    :type rdtype: :py:class:`dns.rdatatype.RdataType` or str
    :param covers: The covered type. Usually
        :py:attr:`dns.rdatatype.NONE`; for SIG/RRSIG lookups it is the type
        the signature covers, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
    :type covers: :py:class:`dns.rdatatype.RdataType` or str
    :rtype: :py:class:`dns.rrset.RRset` or ``None``
    """
    try:
        return self.find_rrset(name, rdtype, covers)
    except KeyError:
        return None
[docs]
def iterate_rdatasets(
    self,
    rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.ANY,
    covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE,
) -> Iterator[tuple[dns.name.Name, dns.rdataset.Rdataset]]:
    """Yield ``(name, rdataset)`` for every rdataset in the zone that
    matches *rdtype* and *covers*.

    With *rdtype* :py:attr:`dns.rdatatype.ANY` (the default) every
    rdataset matches and *covers* is not consulted.

    :param rdtype: The rdata type desired.
    :type rdtype: :py:class:`dns.rdatatype.RdataType` or str
    :param covers: The covered type. Usually
        :py:attr:`dns.rdatatype.NONE`; for SIG/RRSIG lookups it is the type
        the signature covers, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
    :type covers: :py:class:`dns.rdatatype.RdataType` or str
    """
    the_rdtype = dns.rdatatype.RdataType.make(rdtype)
    the_covers = dns.rdatatype.RdataType.make(covers)
    match_all = the_rdtype == dns.rdatatype.ANY
    for name, node in self.items():
        for rds in node:
            if not match_all and (
                rds.rdtype != the_rdtype or rds.covers != the_covers
            ):
                continue
            yield (name, rds)
[docs]
def iterate_rdatas(
    self,
    rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.ANY,
    covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE,
) -> Iterator[tuple[dns.name.Name, int, dns.rdata.Rdata]]:
    """Yield ``(name, ttl, rdata)`` for every rdata in the zone whose
    rdataset matches *rdtype* and *covers*.

    With *rdtype* :py:attr:`dns.rdatatype.ANY` (the default) every rdata
    matches and *covers* is not consulted. The TTL is that of the
    containing rdataset.

    :param rdtype: The rdata type desired.
    :type rdtype: :py:class:`dns.rdatatype.RdataType` or str
    :param covers: The covered type. Usually
        :py:attr:`dns.rdatatype.NONE`; for SIG/RRSIG lookups it is the type
        the signature covers, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
    :type covers: :py:class:`dns.rdatatype.RdataType` or str
    """
    the_rdtype = dns.rdatatype.RdataType.make(rdtype)
    the_covers = dns.rdatatype.RdataType.make(covers)
    match_all = the_rdtype == dns.rdatatype.ANY
    for name, node in self.items():
        for rds in node:
            if not match_all and (
                rds.rdtype != the_rdtype or rds.covers != the_covers
            ):
                continue
            for rdata in rds:
                yield (name, rds.ttl, rdata)
[docs]
def to_file(
    self,
    f: Any,
    sorted: bool = True,
    relativize: bool = True,
    nl: str | bytes | None = None,
    want_comments: bool = False,
    want_origin: bool = False,
    style: ZoneStyle | None = None,
) -> None:
    """Write the zone to a file.

    When *style* is ``None`` a :py:class:`dns.zone.ZoneStyle` is built from
    the keyword parameters; the actual writing is done by
    :py:meth:`to_styled_file`.

    :param f: A file object or a ``str`` filename. If a string, it is
        treated as the name of a file to open.
    :param bool sorted: If ``True`` (the default), the file will be
        written with the names sorted in DNSSEC order from least to
        greatest. Otherwise the names will be written in whatever order
        they happen to have in the zone's dictionary.
    :param bool relativize: If ``True`` (the default), domain names in
        the output will be relativized to the zone's origin if possible.
    :param nl: The end of line string. If ``None`` (the default), the
        output will use the platform's native end-of-line marker (i.e. LF
        on POSIX, CRLF on Windows).
    :type nl: str, bytes, or ``None``
    :param bool want_comments: If ``True``, emit end-of-line comments as
        part of writing the file. If ``False`` (the default), do not
        emit them.
    :param bool want_origin: If ``True``, emit a ``$ORIGIN`` line at the
        start of the file. If ``False`` (the default), do not emit one.
    :param style: If specified, the style overrides the other parameters.
    :type style: :py:class:`dns.zone.ZoneStyle` or ``None``
    """
    if style is None:
        kw: dict[str, Any] = {"sorted": sorted, "relativize": relativize}
        if relativize:
            # Relativized output needs an origin to relativize against.
            assert self.origin is not None
            kw["origin"] = self.origin
        kw.update(nl=nl, want_comments=want_comments, want_origin=want_origin)
        style = ZoneStyle.from_keywords(kw)
    return self.to_styled_file(style, f)
def _write_line(self, output: Any, l: str, l_b: bytes, nl: str, nl_b: bytes) -> None:
    """Write one line plus its end-of-line marker to *output*.

    The pre-encoded bytes (*l_b*, *nl_b*) are tried first; if the stream
    raises ``TypeError``, it is treated as a text stream and the ``str``
    forms (*l*, *nl*) are written instead.
    """
    try:
        bout = cast(BinaryIO, output)
        bout.write(l_b)
        bout.write(nl_b)
    except TypeError:  # textual mode
        tout = cast(TextIO, output)
        tout.write(l)
        tout.write(nl)
[docs]
def to_styled_file(
    self,
    style: ZoneStyle,
    f: Any,
) -> None:
    """Write the zone to a file using *style*.

    Output order is: an optional ``$UNICODE`` line, an optional ``$ORIGIN``
    line, an optional ``$TTL`` line (when the style has a default TTL), and
    then the styled text of each node.

    :param style: The style to apply.
    :type style: :py:class:`dns.zone.ZoneStyle`
    :param f: A file object or a ``str`` filename. If a string, it is
        treated as the name of a file to open (in binary mode).
    """
    if style.want_unicode_directive:
        # Apply style items we learned from $UNICODE when we loaded the zone
        # (if any).
        codec: dns.name.IDNACodec | None = style.idna_codec
        if "2008" in self.unicode:
            codec = dns.name.IDNA_2008_Practical
        elif "2003" in self.unicode:
            codec = dns.name.IDNA_2003_Practical
        utf8_txt = True if "TXT" in self.unicode else style.txt_is_utf8
        style = style.replace(idna_codec=codec, txt_is_utf8=utf8_txt)

    cm: contextlib.AbstractContextManager
    if isinstance(f, str):
        cm = open(f, "wb")
    else:
        cm = contextlib.nullcontext(f)

    with cm as output:
        # f.encoding may be None, or the attribute may not exist at all.
        file_enc = getattr(f, "encoding", None)
        if file_enc is None:
            file_enc = "utf-8"

        eol = style.nl
        if eol is None:
            # In binary mode '\n' is not enough, so use the platform marker.
            nl_b = os.linesep.encode(file_enc)
            nl = "\n"
        elif isinstance(eol, str):
            nl_b = eol.encode(file_enc)
            nl = eol
        else:
            nl_b = eol
            nl = eol.decode()
        assert nl is not None
        assert nl_b is not None

        def emit(text: str) -> None:
            # Encode first, then let _write_line pick bytes or text output.
            self._write_line(output, text, text.encode(file_enc), nl, nl_b)

        if style.want_unicode_directive and len(self.unicode) > 0:
            emit("$UNICODE " + " ".join(sorted(self.unicode)))
        if style.want_origin:
            assert self.origin is not None
            # Ensure we don't relativize the origin to the origin in $ORIGIN!
            origin_style = style.replace(origin=None)
            emit("$ORIGIN " + self.origin.to_styled_text(origin_style))
        if style.default_ttl is not None:
            emit(f"$TTL {style.default_ttl}")

        names = sorted(self.keys()) if style.sorted else self.keys()
        for n in names:
            emit(self[n].to_styled_text(style, n))
[docs]
def to_text(
    self,
    sorted: bool = True,
    relativize: bool = True,
    nl: str | None = None,
    want_comments: bool = False,
    want_origin: bool = False,
    style: ZoneStyle | None = None,
) -> str:
    """Return a zone's text as though it were written to a file.

    :param bool sorted: If ``True`` (the default), the output will be
        written with the names sorted in DNSSEC order from least to
        greatest. Otherwise the names will be written in whatever order
        they happen to have in the zone's dictionary.
    :param bool relativize: If ``True`` (the default), domain names in
        the output will be relativized to the zone's origin if possible.
    :param nl: The end of line string. If ``None`` (the default), the
        output will use the platform's native end-of-line marker (i.e. LF
        on POSIX, CRLF on Windows).
    :type nl: str or ``None``
    :param bool want_comments: If ``True``, emit end-of-line comments as
        part of the output. If ``False`` (the default), do not emit them.
    :param bool want_origin: If ``True``, emit a ``$ORIGIN`` line at the
        start of the output. If ``False`` (the default), do not emit one.
    :param style: If specified, the style overrides the other parameters.
    :type style: :py:class:`dns.zone.ZoneStyle` or ``None``
    :rtype: str
    """
    with io.StringIO() as temp_buffer:
        # Forward *style* too; without it the documented override is ignored.
        self.to_file(
            temp_buffer,
            sorted,
            relativize,
            nl,
            want_comments,
            want_origin,
            style,
        )
        return temp_buffer.getvalue()
[docs]
def to_styled_text(self, style: ZoneStyle):
    """Return the zone's styled text as though it were written to a file.

    The zone is written to an in-memory text buffer with
    :py:meth:`to_styled_file` and the buffer's contents are returned. See
    :py:class:`dns.zone.ZoneStyle` for the style parameters.

    :param style: The style to apply.
    :type style: :py:class:`dns.zone.ZoneStyle`
    :rtype: str
    """
    with io.StringIO() as buffer:
        self.to_styled_file(style, buffer)
        return buffer.getvalue()
[docs]
def check_origin(self) -> None:
    """Do some simple checking of the zone's origin.

    The origin is looked up as the empty name when the zone is relativized
    and as ``self.origin`` otherwise. Lookups go through
    :py:meth:`get_rdataset`, which turns ``KeyError`` into ``None``, so a
    missing origin node is reported as :py:exc:`NoSOA`.

    :raises dns.zone.NoSOA: if there is no SOA RRset.
    :raises dns.zone.NoNS: if there is no NS RRset.
    """
    if self.relativize:
        apex = dns.name.empty
    else:
        assert self.origin is not None
        apex = self.origin
    required = (
        (dns.rdatatype.SOA, NoSOA),
        (dns.rdatatype.NS, NoNS),
    )
    for rdtype, missing in required:
        if self.get_rdataset(apex, rdtype) is None:
            raise missing
[docs]
def get_soa(
    self, txn: dns.transaction.Transaction | None = None
) -> dns.rdtypes.ANY.SOA.SOA:
    """Return the first rdata of the SOA rdataset at the zone origin.

    :param txn: If given (and truthy), the SOA is read through this
        transaction's ``get()`` instead of directly from the zone.
    :raises dns.zone.NoSOA: if there is no SOA RRset, or if the zone is not
        relativized and has no origin yet.
    :rtype: :py:class:`dns.rdtypes.ANY.SOA.SOA`
    """
    if self.relativize:
        apex = dns.name.empty
    elif self.origin is None:
        # get_soa() has been called very early, and there must not be
        # an SOA if there is no origin.
        raise NoSOA
    else:
        apex = self.origin

    soa_rds: dns.rdataset.Rdataset | None
    if txn:
        soa_rds = txn.get(apex, dns.rdatatype.SOA)
    else:
        soa_rds = self.get_rdataset(apex, dns.rdatatype.SOA)
    if soa_rds is None:
        raise NoSOA
    return cast(dns.rdtypes.ANY.SOA.SOA, soa_rds[0])
def _compute_digest(
    self,
    hash_algorithm: DigestHashAlgorithm,
    scheme: DigestScheme = DigestScheme.SIMPLE,
) -> bytes:
    """Compute the raw digest of the zone's contents.

    Names, the rdatasets within each node (by type, then covered type),
    and the digestable forms of the rdatas are each processed in sorted
    order. For every rdata the hasher is fed the digestable owner name, the
    packed type/class/TTL, the packed rdata length, and the rdata itself.
    Rdatasets at the origin whose type or covered type is ZONEMD are
    excluded.

    :raises UnsupportedDigestHashAlgorithm: if *hash_algorithm* has no
        registered hasher.
    :raises UnsupportedDigestScheme: if *scheme* is not ``SIMPLE``.
    :rtype: bytes
    """
    hashinfo = _digest_hashers.get(hash_algorithm)
    if not hashinfo:
        raise UnsupportedDigestHashAlgorithm
    if scheme != DigestScheme.SIMPLE:
        raise UnsupportedDigestScheme

    if self.relativize:
        apex = dns.name.empty
    else:
        assert self.origin is not None
        apex = self.origin

    hasher = hashinfo()
    for name, node in sorted(self.items()):
        owner_wire = name.to_digestable(self.origin)
        at_apex = name == apex
        for rdataset in sorted(node, key=lambda rds: (rds.rdtype, rds.covers)):
            if at_apex and dns.rdatatype.ZONEMD in (
                rdataset.rdtype,
                rdataset.covers,
            ):
                # The digest must not cover the digest records themselves.
                continue
            prefix = owner_wire + struct.pack(
                "!HHI", rdataset.rdtype, rdataset.rdclass, rdataset.ttl
            )
            wires = sorted(rdata.to_digestable(self.origin) for rdata in rdataset)
            for wire in wires:
                hasher.update(prefix + struct.pack("!H", len(wire)) + wire)
    return hasher.digest()
def compute_digest(
    self,
    hash_algorithm: DigestHashAlgorithm,
    scheme: DigestScheme = DigestScheme.SIMPLE,
) -> dns.rdtypes.ANY.ZONEMD.ZONEMD:
    """Build a ZONEMD rdata for the zone's current contents.

    The serial comes from the zone's SOA (so :py:meth:`get_soa` errors
    propagate) and the digest from :py:meth:`_compute_digest`.

    :rtype: :py:class:`dns.rdtypes.ANY.ZONEMD.ZONEMD`
    """
    soa = self.get_soa()
    serial = soa.serial
    digest = self._compute_digest(hash_algorithm, scheme)
    zonemd = dns.rdtypes.ANY.ZONEMD.ZONEMD(
        self.rdclass, dns.rdatatype.ZONEMD, serial, scheme, hash_algorithm, digest
    )
    return zonemd
def verify_digest(
    self, zonemd: dns.rdtypes.ANY.ZONEMD.ZONEMD | None = None
) -> None:
    """Verify the zone against a ZONEMD digest.

    If *zonemd* is given it is the only candidate; otherwise the candidates
    are the ZONEMD rdatas stored at ``self.origin``. Verification succeeds
    (returns ``None``) as soon as one candidate's digest equals the
    computed digest. Any exception raised while checking a candidate is
    ignored and the next candidate is tried.

    :raises NoDigest: if no *zonemd* is given and the origin has no ZONEMD
        rdataset.
    :raises DigestVerificationFailure: if no candidate matches.
    """
    candidates: dns.rdataset.Rdataset | list[dns.rdtypes.ANY.ZONEMD.ZONEMD]
    if zonemd:
        candidates = [zonemd]
    else:
        assert self.origin is not None
        stored = self.get_rdataset(self.origin, dns.rdatatype.ZONEMD)
        if stored is None:
            raise NoDigest
        candidates = stored

    for candidate in candidates:
        # Best effort: an unusable candidate must not stop us from trying
        # the remaining ones.
        with contextlib.suppress(Exception):
            computed = self._compute_digest(
                candidate.hash_algorithm, candidate.scheme
            )
            if computed == candidate.digest:
                return
    raise DigestVerificationFailure
# TransactionManager methods
[docs]
def reader(self) -> "Transaction":
    """Return a non-replacement transaction over a ``Version`` (id 1) that
    wraps the zone's current nodes and origin."""
    snapshot = Version(self, 1, self.nodes, self.origin)
    return Transaction(self, False, snapshot)
[docs]
def writer(self, replacement: bool = False) -> "Transaction":
    """Create a transaction for this zone, set up its version, and return
    it.

    :param bool replacement: Passed through to the ``Transaction``
        constructor.
    """
    txn = Transaction(self, replacement)
    # NOTE(review): Transaction is defined outside the visible source;
    # presumably this creates the writable version -- confirm.
    txn._setup_version()
    return txn
[docs]
def get_class(self):
    """Return the zone's rdata class (``self.rdclass``)."""
    return self.rdclass
# Transaction methods
def _end_read(self, txn):
    # Nothing to do for this zone type when a read transaction ends.
    pass
def _end_write(self, txn):
    # Nothing to do for this zone type when a write transaction ends.
    pass
def _commit_version(self, txn, version, origin):
    # Adopt the committed version's node mapping as the zone's contents.
    self.nodes = version.nodes
    # Take the origin from the commit only if the zone does not have one yet.
    if self.origin is None:
        self.origin = origin
def _get_next_version_id(self) -> int:
    """Return the id to give the next version of this zone (always 1)."""
    # Versions are ephemeral and all have id 1
    return 1
# These classes used to be in dns.versioned, but have moved here so we can use
# the copy-on-write transaction mechanism for both kinds of zones. In a
# regular zone, the version only exists during the transaction, and the nodes
# are regular dns.node.Nodes.
# A node with a version id.
[docs]
class VersionedNode(dns.node.Node):  # lgtm[py/missing-equals]
    """A node that carries the id of a version in its ``id`` attribute."""

    __slots__ = ["id"]

    def __init__(self):
        super().__init__()
        # A proper id will get set by the Version
        self.id = 0
[docs]
@dns.immutable.immutable
class ImmutableVersionedNode(VersionedNode):
    """A read-only copy of a versioned node.

    The source node's id is copied and its rdatasets are stored as a tuple
    of ``dns.rdataset.ImmutableRdataset`` objects. Every mutating operation
    raises ``TypeError("immutable")``.
    """

    def __init__(self, node):
        super().__init__()
        self.id = node.id
        self.rdatasets = tuple(
            dns.rdataset.ImmutableRdataset(rds) for rds in node.rdatasets
        )

    def find_rdataset(
        self,
        rdclass: dns.rdataclass.RdataClass,
        rdtype: dns.rdatatype.RdataType,
        covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
        create: bool = False,
    ) -> dns.rdataset.Rdataset:
        """Find an rdataset; asking for creation raises ``TypeError``."""
        if create:
            raise TypeError("immutable")
        found = super().find_rdataset(rdclass, rdtype, covers, False)
        return found

    def get_rdataset(
        self,
        rdclass: dns.rdataclass.RdataClass,
        rdtype: dns.rdatatype.RdataType,
        covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
        create: bool = False,
    ) -> dns.rdataset.Rdataset | None:
        """Get an rdataset; asking for creation raises ``TypeError``."""
        if create:
            raise TypeError("immutable")
        found = super().get_rdataset(rdclass, rdtype, covers, False)
        return found

    def delete_rdataset(
        self,
        rdclass: dns.rdataclass.RdataClass,
        rdtype: dns.rdatatype.RdataType,
        covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
    ) -> None:
        """Always raise ``TypeError``; the node cannot be changed."""
        raise TypeError("immutable")

    def replace_rdataset(self, replacement: dns.rdataset.Rdataset) -> None:
        """Always raise ``TypeError``; the node cannot be changed."""
        raise TypeError("immutable")

    def is_immutable(self) -> bool:
        """Return ``True``."""
        return True
[docs]
class Version:
    """The contents of a zone as seen by one transaction.

    A version holds the owning zone, an id, a mapping from names to nodes,
    and an origin.  Names given to the lookup methods are first passed
    through the module-level ``_validate_name`` together with this version's
    origin and the zone's ``relativize`` setting.
    """

    def __init__(
        self,
        zone: Zone,
        id: int,
        nodes: MutableMapping[dns.name.Name, dns.node.Node] | None = None,
        origin: dns.name.Name | None = None,
    ):
        """Initialize the version.

        :param zone: The zone this version belongs to.
        :param int id: The version id.
        :param nodes: The name-to-node mapping to use.  If ``None``, a new
            mapping is obtained from ``zone.map_factory()``.
        :param origin: The origin of this version, or ``None``.
        """
        self.zone = zone
        self.id = id
        self.nodes = zone.map_factory() if nodes is None else nodes
        self.origin = origin

    def _validate_name(self, name: dns.name.Name) -> dns.name.Name:
        """Return *name* as validated for this version's origin."""
        relativize = self.zone.relativize
        return _validate_name(name, self.origin, relativize)

    def get_node(self, name: dns.name.Name) -> dns.node.Node | None:
        """Return the node stored for *name*, or ``None`` if there is none."""
        key = self._validate_name(name)
        return self.nodes.get(key)

    def get_rdataset(
        self,
        name: dns.name.Name,
        rdtype: dns.rdatatype.RdataType,
        covers: dns.rdatatype.RdataType,
    ) -> dns.rdataset.Rdataset | None:
        """Return the matching rdataset at *name* in the zone's class.

        :returns: ``None`` if there is no node for *name*; otherwise whatever
            the node's ``get_rdataset()`` returns.
        """
        node = self.get_node(name)
        if node is not None:
            return node.get_rdataset(self.zone.rdclass, rdtype, covers)
        return None

    def keys(self):
        """Return the keys view of the node mapping."""
        return self.nodes.keys()

    def items(self):
        """Return the items view of the node mapping."""
        return self.nodes.items()
[docs]
class WritableVersion(Version):
    """A :py:class:`Version` that can be modified.

    Every name that is touched by a modification is recorded in the
    ``changed`` set.  A node is replaced by a fresh node the first time it is
    modified in this version, so node objects that came from the zone are
    never mutated.
    """

    def __init__(self, zone: Zone, replacement: bool = False):
        """Initialize the writable version.

        :param zone: The zone to create a new version of.
        :param bool replacement: If ``False`` (the default), the new version
            starts with the entries of ``zone.nodes``.  If ``True``, it starts
            with an empty node mapping.
        """
        # The zone._versions_lock must be held by our caller in a versioned
        # zone.
        version_id = zone._get_next_version_id()
        super().__init__(zone, version_id)
        if not replacement:
            # We copy the map, because that gives us a simple and thread-safe
            # way of doing versions, and we have a garbage collector to help
            # us.  We only make new node objects if we actually change the
            # node.
            self.nodes.update(zone.nodes)
        # We have to copy the zone origin as it may be None in the first
        # version, and we don't want to mutate the zone until we commit.
        self.origin = zone.origin
        self.changed: set[dns.name.Name] = set()

    def _maybe_cow_with_name(
        self, name: dns.name.Name
    ) -> tuple[dns.node.Node, dns.name.Name]:
        """Return a node for *name* that is safe to modify, plus its key.

        If the name has no node, or its node has not yet been changed in this
        version, a new node is created (starting with the existing node's
        rdatasets, if any), stored in the mapping, and the name is recorded
        as changed.

        :param name: The name of the node to modify.
        :returns: A ``(node, validated_name)`` tuple, where *validated_name*
            is the key under which the node is stored in ``self.nodes``.
        """
        name = self._validate_name(name)
        node = self.nodes.get(name)
        if node is not None and name in self.changed:
            # Already replaced in this version; modify it in place.
            return (node, name)
        new_node = self.zone.node_factory()
        if hasattr(new_node, "id"):
            # We keep doing this for backwards compatibility, as earlier
            # code used new_node.id != self.id for the "do we need to CoW?"
            # test.  Now we use the changed set as this works with both
            # regular zones and versioned zones.
            #
            # We ignore the type error as this is safe but checkers have trouble.
            new_node.id = self.id  # type: ignore
        if node is not None:
            # moo!  copy on write!
            new_node.rdatasets.extend(node.rdatasets)
        self.nodes[name] = new_node
        self.changed.add(name)
        return (new_node, name)

    def _maybe_cow(self, name: dns.name.Name) -> dns.node.Node:
        """Like ``_maybe_cow_with_name()`` but return only the node."""
        return self._maybe_cow_with_name(name)[0]

    def delete_node(self, name: dns.name.Name) -> None:
        """Delete the node at *name*, if there is one.

        The name is only recorded as changed when a node was actually
        deleted.
        """
        name = self._validate_name(name)
        if name in self.nodes:
            del self.nodes[name]
            self.changed.add(name)

    def put_rdataset(
        self, name: dns.name.Name, rdataset: dns.rdataset.Rdataset
    ) -> None:
        """Store *rdataset* at *name* via the node's ``replace_rdataset()``."""
        node = self._maybe_cow(name)
        node.replace_rdataset(rdataset)

    def delete_rdataset(
        self,
        name: dns.name.Name,
        rdtype: dns.rdatatype.RdataType,
        covers: dns.rdatatype.RdataType,
    ) -> None:
        """Delete the matching rdataset at *name* in the zone's class.

        If the node has no rdatasets left afterwards, the node itself is
        removed from the mapping.
        """
        # Use the validated name returned alongside the node: that is the key
        # the node is stored under, and it can differ from the caller's name
        # (e.g. an absolute name in a relativized zone).
        node, name = self._maybe_cow_with_name(name)
        node.delete_rdataset(self.zone.rdclass, rdtype, covers)
        if len(node) == 0:
            del self.nodes[name]
[docs]
@dns.immutable.immutable
class ImmutableVersion(Version):
    """An immutable version built from a :py:class:`WritableVersion`.

    The new version has the same zone, id and origin as the writable version.
    Every node that the writable version changed (and that still exists) is
    replaced by an :py:class:`ImmutableVersionedNode`, and the node mapping
    is wrapped in a ``dns.immutable.Dict``.
    """

    def __init__(self, version: Version):
        """Initialize from *version*.

        Note that the changed nodes are replaced in ``version.nodes`` itself,
        so *version* is modified by this call.

        :param version: The writable version to freeze.
        :raises ValueError: if *version* is not a
            :py:class:`WritableVersion`.
        """
        if not isinstance(version, WritableVersion):
            raise ValueError(
                "a dns.zone.ImmutableVersion requires a dns.zone.WritableVersion"
            )
        # Version.__init__() takes (zone, id, nodes, origin), so give it the
        # right id and origin directly.  We don't pass the nodes, as we're
        # about to install them below wrapped in an immutable Dict.
        super().__init__(version.zone, version.id, origin=version.origin)
        # Make changed nodes immutable
        for name in version.changed:
            node = version.nodes.get(name)
            # It might not exist if we deleted it in the version.  Compare
            # with None explicitly: a node with no rdatasets has length zero
            # and is falsy, but it still has to be made immutable.
            if node is not None:
                version.nodes[name] = ImmutableVersionedNode(node)
        # We're changing the type of the nodes dictionary here on purpose, so
        # we ignore the mypy error.
        self.nodes = dns.immutable.Dict(
            version.nodes, True, self.zone.map_factory
        )  # pyright: ignore
[docs]
class Transaction(dns.transaction.Transaction):
    """A transaction on a zone, implemented on top of a :py:class:`Version`.

    A transaction constructed with a version is read-only and works on that
    version.  One constructed without a version is writable; its version is
    created later by ``_setup_version()``.
    """

    def __init__(
        self,
        zone: Zone,
        replacement: bool,
        version: Version | None = None,
        make_immutable: bool = False,
    ):
        """Initialize the transaction.

        :param zone: The zone, which acts as the transaction manager.
        :param bool replacement: Passed to the base class and, for writable
            transactions, to the writable version factory.
        :param version: The version to read from.  If not ``None``, the
            transaction is read-only.
        :param bool make_immutable: If ``True``, a committed version is
            converted to an immutable version before it is handed to the
            zone.
        """
        # Being given a version is what makes the transaction read-only.
        super().__init__(zone, replacement, version is not None)
        self.version = version
        self.make_immutable = make_immutable

    @property
    def zone(self) -> Zone:
        """The transaction manager, typed as a zone."""
        return cast(Zone, self.manager)

    def _setup_version(self):
        """Create the writable version for this transaction.

        The manager's ``writable_version_factory`` is used if it is not
        ``None``; otherwise :py:class:`WritableVersion` is used.
        """
        assert self.version is None
        configured = self.manager.writable_version_factory  # type: ignore
        factory = WritableVersion if configured is None else configured
        self.version = factory(self.zone, self.replacement)  # pyright: ignore

    def _get_rdataset(self, name, rdtype, covers):
        """Return the version's rdataset for the given name and types."""
        assert self.version is not None
        return self.version.get_rdataset(name, rdtype, covers)

    def _put_rdataset(self, name, rdataset):
        """Store *rdataset* at *name* in the writable version."""
        assert not self.read_only
        assert self.version is not None
        cast(WritableVersion, self.version).put_rdataset(name, rdataset)

    def _delete_name(self, name):
        """Delete the node at *name* from the writable version."""
        assert not self.read_only
        assert self.version is not None
        cast(WritableVersion, self.version).delete_node(name)

    def _delete_rdataset(self, name, rdtype, covers):
        """Delete the matching rdataset from the writable version."""
        assert not self.read_only
        assert self.version is not None
        cast(WritableVersion, self.version).delete_rdataset(name, rdtype, covers)

    def _name_exists(self, name):
        """Return whether the version has a node for *name*."""
        assert self.version is not None
        node = self.version.get_node(name)
        return node is not None

    def _changed(self):
        """Return whether this transaction has recorded any changed names.

        A read-only transaction never has changes.
        """
        if self.read_only:
            return False
        assert self.version is not None
        writable = cast(WritableVersion, self.version)
        return len(writable.changed) > 0

    def _end_transaction(self, commit):
        """Finish the transaction by notifying the zone.

        * Read-only: the zone's ``_end_read()`` is called.
        * Writable, *commit* is true, and something changed: the version
          (made immutable first if ``make_immutable`` is set) is passed to
          the zone's ``_commit_version()``.
        * Otherwise: the zone's ``_end_write()`` is called.
        """
        assert self.zone is not None
        assert self.version is not None
        if self.read_only:
            self.zone._end_read(self)  # pyright: ignore
            return
        if not (commit and len(cast(WritableVersion, self.version).changed) > 0):
            # rollback
            self.zone._end_write(self)  # pyright: ignore
            return
        if self.make_immutable:
            configured = self.manager.immutable_version_factory  # type: ignore
            factory = ImmutableVersion if configured is None else configured
            version = factory(self.version)
        else:
            version = self.version
        self.zone._commit_version(  # pyright: ignore
            self, version, self.version.origin
        )

    def _set_origin(self, origin):
        """Set the version's origin, unless it already has one."""
        assert self.version is not None
        if self.version.origin is not None:
            return
        self.version.origin = origin

    def _iterate_rdatasets(self):
        """Yield a ``(name, rdataset)`` tuple for every rdataset in the version."""
        assert self.version is not None
        for name, node in self.version.items():
            yield from ((name, rdataset) for rdataset in node)

    def _iterate_names(self):
        """Return the names (mapping keys) in the version."""
        assert self.version is not None
        return self.version.keys()

    def _get_node(self, name):
        """Return the version's node for *name*, or ``None``."""
        assert self.version is not None
        return self.version.get_node(name)

    def _origin_information(
        self,
    ) -> tuple[dns.name.Name | None, bool, dns.name.Name | None]:
        """Return the ``(absolute, relativize, effective)`` origin tuple.

        The values come from the manager, except when the manager has no
        absolute origin yet and this transaction's version does.
        """
        assert self.version is not None
        absolute, relativize, effective = self.manager.origin_information()
        if absolute is None and self.version.origin is not None:
            # No origin has been committed yet, but we've learned one as part of
            # this txn.  Use it.
            absolute = self.version.origin
            effective = dns.name.empty if relativize else absolute
        return (absolute, relativize, effective)
def _from_text(
    text: Any,
    origin: dns.name.Name | str | None = None,
    rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
    relativize: bool = True,
    zone_factory: Any = Zone,
    filename: str | None = None,
    allow_include: bool = False,
    check_origin: bool = True,
    idna_codec: dns.name.IDNACodec | None = None,
    allow_directives: bool | Iterable[str] = True,
) -> Zone:
    """Build a zone from zone file format text or from a file object.

    This is the shared implementation of the public ``from_text()`` and
    ``from_file()`` functions; see them for details of the parameters.

    :raises UnknownOrigin: if the zone file reader raises
        ``dns.zonefile.UnknownOrigin``.
    :returns: The zone created by *zone_factory*.
    """
    # See the comments for the public APIs from_text() and from_file() for
    # details.

    # 'text' can also be a file, but we don't publish that fact
    # since it's an implementation detail.  The official file
    # interface is from_file().

    # The public documentation promises that None selects the default zone
    # class, so honor that instead of failing when calling None.
    if zone_factory is None:
        zone_factory = Zone
    if filename is None:
        filename = "<string>"
    zone = zone_factory(origin, rdclass, relativize=relativize)
    with zone.writer(True) as txn:
        tok = dns.tokenizer.Tokenizer(text, filename, idna_codec=idna_codec)
        reader = dns.zonefile.Reader(
            tok,
            rdclass,
            txn,
            allow_include=allow_include,
            allow_directives=allow_directives,
        )
        try:
            reader.read()
            zone.unicode = txn.unicode
        except dns.zonefile.UnknownOrigin:
            # for backwards compatibility
            raise UnknownOrigin
    # Now that we're done reading, do some basic checking of the zone.
    if check_origin:
        zone.check_origin()
    return zone
[docs]
def from_text(
    text: str,
    origin: dns.name.Name | str | None = None,
    rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
    relativize: bool = True,
    zone_factory: Any = Zone,
    filename: str | None = None,
    allow_include: bool = False,
    check_origin: bool = True,
    idna_codec: dns.name.IDNACodec | None = None,
    allow_directives: bool | Iterable[str] = True,
) -> Zone:
    """Build a zone object from a zone file format string.

    :param str text: The zone file format input.
    :param origin: The origin of the zone.  If not specified, the first
        ``$ORIGIN`` statement in the zone file will determine the origin.
    :type origin: :py:class:`dns.name.Name`, str, or ``None``
    :param rdclass: The zone's rdata class; the default is class IN.
    :type rdclass: :py:class:`dns.rdataclass.RdataClass`
    :param bool relativize: Whether domain names are relativized to the
        zone's origin.  The default is ``True``.
    :param zone_factory: The zone factory to use.  If ``None``, then
        :py:class:`dns.zone.Zone` will be used.  The value may be any class
        or callable that returns a subclass of :py:class:`dns.zone.Zone`.
    :param filename: The filename to emit when describing where an error
        occurred; the default is ``'<string>'``.
    :type filename: str or ``None``
    :param bool allow_include: If ``True``, then ``$INCLUDE`` directives are
        permitted.  If ``False`` (the default), then encountering a
        ``$INCLUDE`` will raise a :py:exc:`SyntaxError`.
    :param bool check_origin: If ``True`` (the default), then sanity checks
        of the origin node will be made by calling
        :py:meth:`dns.zone.Zone.check_origin`.
    :param idna_codec: The IDNA encoder/decoder.  If ``None``, the default
        IDNA encoder/decoder is used.
    :type idna_codec: :py:class:`dns.name.IDNACodec` or ``None``
    :param allow_directives: If ``True`` (the default), directives are
        permitted and *allow_include* controls ``$INCLUDE``.  If ``False``
        or an empty iterable, no directive processing is done.  If a
        non-empty iterable, only the listed directives (including the ``$``)
        are allowed.
    :type allow_directives: bool or Iterable[str]
    :raises dns.zone.NoSOA: if there is no SOA RRset.
    :raises dns.zone.NoNS: if there is no NS RRset.
    :raises dns.zone.UnknownOrigin: if the zone file reader reports that the
        origin is unknown.
    :raises KeyError: if there is no origin node.
    :returns: A subclass of :py:class:`dns.zone.Zone`.
    """
    # The documentation above promises that None selects the default zone
    # class, so make that true here rather than passing None along.
    if zone_factory is None:
        zone_factory = Zone
    return _from_text(
        text,
        origin=origin,
        rdclass=rdclass,
        relativize=relativize,
        zone_factory=zone_factory,
        filename=filename,
        allow_include=allow_include,
        check_origin=check_origin,
        idna_codec=idna_codec,
        allow_directives=allow_directives,
    )
[docs]
def from_file(
    f: Any,
    origin: dns.name.Name | str | None = None,
    rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
    relativize: bool = True,
    zone_factory: Any = Zone,
    filename: str | None = None,
    allow_include: bool = True,
    check_origin: bool = True,
    idna_codec: dns.name.IDNACodec | None = None,
    allow_directives: bool | Iterable[str] = True,
) -> Zone:
    """Read a zone file and build a zone object.

    :param f: A file object or a ``str`` filename.  If a string, it is
        treated as the name of a file to open; the file is opened with UTF-8
        encoding and closed before this function returns.  A file object
        that is passed in is not closed.
    :param origin: The origin of the zone.  If not specified, the first
        ``$ORIGIN`` statement in the zone file will determine the origin.
    :type origin: :py:class:`dns.name.Name`, str, or ``None``
    :param rdclass: The zone's rdata class; the default is class IN.
    :type rdclass: :py:class:`dns.rdataclass.RdataClass`
    :param bool relativize: Whether domain names are relativized to the
        zone's origin.  The default is ``True``.
    :param zone_factory: The zone factory to use.  If ``None``, then
        :py:class:`dns.zone.Zone` will be used.  The value may be any class
        or callable that returns a subclass of :py:class:`dns.zone.Zone`.
    :param filename: The filename to emit when describing where an error
        occurred.  If ``None`` and *f* is a string, *f* is used; otherwise
        the default is ``'<string>'``.
    :type filename: str or ``None``
    :param bool allow_include: If ``True`` (the default), then ``$INCLUDE``
        directives are permitted.  If ``False``, then encountering a
        ``$INCLUDE`` will raise a :py:exc:`SyntaxError`.
    :param bool check_origin: If ``True`` (the default), then sanity checks
        of the origin node will be made by calling
        :py:meth:`dns.zone.Zone.check_origin`.
    :param idna_codec: The IDNA encoder/decoder.  If ``None``, the default
        IDNA encoder/decoder is used.
    :type idna_codec: :py:class:`dns.name.IDNACodec` or ``None``
    :param allow_directives: If ``True`` (the default), directives are
        permitted and *allow_include* controls ``$INCLUDE``.  If ``False``
        or an empty iterable, no directive processing is done.  If a
        non-empty iterable, only the listed directives (including the ``$``)
        are allowed.
    :type allow_directives: bool or Iterable[str]
    :raises dns.zone.NoSOA: if there is no SOA RRset.
    :raises dns.zone.NoNS: if there is no NS RRset.
    :raises dns.zone.UnknownOrigin: if the zone file reader reports that the
        origin is unknown.
    :raises KeyError: if there is no origin node.
    :returns: A subclass of :py:class:`dns.zone.Zone`.
    """
    # The documentation above promises that None selects the default zone
    # class, so make that true here rather than passing None along.
    if zone_factory is None:
        zone_factory = Zone
    if isinstance(f, str):
        if filename is None:
            filename = f
        # We opened it, so the with statement below closes it.
        cm: contextlib.AbstractContextManager = open(f, encoding="utf-8")
    else:
        # The caller owns the file object; nullcontext leaves it open.
        cm = contextlib.nullcontext(f)
    with cm as f:
        return _from_text(
            f,
            origin,
            rdclass,
            relativize,
            zone_factory,
            filename,
            allow_include,
            check_origin,
            idna_codec,
            allow_directives,
        )
    assert False  # make mypy happy  lgtm[py/unreachable-statement]
[docs]
def from_xfr(
    xfr: Any,
    zone_factory: Any = Zone,
    relativize: bool = True,
    check_origin: bool = True,
) -> Zone:
    """Convert the output of a zone transfer generator into a zone object.

    The zone is created from the first message: its origin is the message's
    ``origin`` when relativizing, or the name of the first answer RRset
    otherwise, and its class is the class of the first answer RRset.

    :param xfr: A generator of :py:class:`dns.message.Message` objects,
        typically from :py:func:`dns.query.xfr`.
    :param zone_factory: The zone factory to use.  If ``None``, then
        :py:class:`dns.zone.Zone` will be used.
    :param bool relativize: Whether domain names are relativized to the
        zone's origin.  The default is ``True``.  It is essential that this
        setting matches the one specified to the generator.
    :param bool check_origin: If ``True`` (the default), then sanity checks
        of the origin node will be made by calling
        :py:meth:`dns.zone.Zone.check_origin`.
    :raises dns.zone.NoSOA: if there is no SOA RRset.
    :raises dns.zone.NoNS: if there is no NS RRset.
    :raises KeyError: if there is no origin node.
    :raises ValueError: if no messages are yielded by the generator.
    :returns: A subclass of :py:class:`dns.zone.Zone`.
    """
    # The documentation above promises that None selects the default zone
    # class, so make that true here rather than failing when calling None.
    if zone_factory is None:
        zone_factory = Zone
    z = None
    for r in xfr:
        if z is None:
            # The first message determines the zone's origin and class.
            if relativize:
                origin = r.origin
            else:
                origin = r.answer[0].name
            rdclass = r.answer[0].rdclass
            z = zone_factory(origin, rdclass, relativize=relativize)
        for rrset in r.answer:
            znode = z.nodes.get(rrset.name)
            # Compare with None explicitly; a node with no rdatasets has
            # length zero and would be falsy.
            if znode is None:
                znode = z.node_factory()
                z.nodes[rrset.name] = znode
            zrds = znode.find_rdataset(rrset.rdclass, rrset.rdtype, rrset.covers, True)
            zrds.update_ttl(rrset.ttl)
            for rd in rrset:
                zrds.add(rd)
    if z is None:
        raise ValueError("empty transfer")
    if check_origin:
        z.check_origin()
    return z