From 49a3d8948872dca41aafbe80d28fc8f8bc9271d4 Mon Sep 17 00:00:00 2001 From: Jan Dittberner Date: Sat, 4 Sep 2021 17:15:27 +0200 Subject: [PATCH] Reimplement using Sphinx 4 APIs - use ObjectDescription for IPRange - reimplement data handling in IPDomain - store target docname and anchors in data to avoid later lookups - remove broken entries from index - adapt tests --- jandd/sphinxext/ip.py | 591 +++++++++++++++++++++------------------ tests/root/conf.py | 8 +- tests/root/testpage3.rst | 3 +- tests/run.py | 2 +- tests/test_ip.py | 16 +- 5 files changed, 331 insertions(+), 289 deletions(-) diff --git a/jandd/sphinxext/ip.py b/jandd/sphinxext/ip.py index b83cb22..2352c92 100644 --- a/jandd/sphinxext/ip.py +++ b/jandd/sphinxext/ip.py @@ -5,171 +5,151 @@ The IP domain. - :copyright: Copyright (c) 2016 Jan Dittberner + :copyright: Copyright (c) 2016-2021 Jan Dittberner :license: GPLv3+, see COPYING file for details. """ +__version__ = "0.5.0-dev" -import re +from collections import defaultdict +from typing import Iterable, List, Optional, Tuple from docutils import nodes -from docutils.parsers.rst import Directive -from ipcalc import Network +from ipcalc import IP, Network from sphinx import addnodes +from sphinx.addnodes import desc_signature, pending_xref +from sphinx.builders import Builder +from sphinx.directives import ObjectDescription, T from sphinx.domains import Domain, ObjType +from sphinx.environment import BuildEnvironment from sphinx.errors import NoUri from sphinx.locale import _ from sphinx.roles import XRefRole from sphinx.util import logging from sphinx.util.nodes import make_refnode -__version__ = "0.4.0" - - logger = logging.getLogger(__name__) -def ip_object_anchor(typ, path): - path = re.sub(r"[.:/]", "-", path) - return typ.lower() + "-" + path - - -class ip_node(nodes.Inline, nodes.TextElement): - pass - - class ip_range(nodes.General, nodes.Element): pass -class IPXRefRole(XRefRole): - """ - Cross referencing role for the IP domain. - """ +class IPRangeDirective(ObjectDescription): + """A custom directive that describes an IP address range.""" - def __init__(self, method, index_type, **kwargs): - self.method = method - self.index_type = index_type - innernodeclass = None - if method in ("v4", "v6"): - innernodeclass = ip_node - super(IPXRefRole, self).__init__(innernodeclass=innernodeclass, **kwargs) - - def __cal__(self, typ, rawtext, text, lineno, inliner, options=None, content=None): - if content is None: - content = [] - if options is None: - options = {} - try: - Network(text) - except ValueError as e: - env = inliner.document.settings.env - logger.warning( - "invalid ip address/range %s" % text, location=(env.docname, lineno) - ) - return [nodes.literal(text, text), []] - return super(IPXRefRole, self).__call__( - typ, rawtext, text, lineno, inliner, options, content - ) - - def process_link(self, env, refnode, has_explicit_title, title, target): - domaindata = env.domaindata["ip"] - domaindata[self.method][target] = (target, refnode) - return title, target - - def result_nodes(self, document, env, node, is_ref): - try: - node["typ"] = self.method - indexnode = addnodes.index() - targetid = "index-%s" % env.new_serialno("index") - targetnode = nodes.target("", "", ids=[targetid]) - doctitle = list(document.traverse(nodes.title))[0].astext() - idxtext = "%s; %s" % (node.astext(), doctitle) - idxtext2 = "%s; %s" % (self.index_type, node.astext()) - indexnode["entries"] = [ - ("single", idxtext, targetid, "", None), - ("single", idxtext2, targetid, "", None), - ] - return [indexnode, targetnode, node], [] - except KeyError as e: - return [node], [e.args[0]] - - -class IPRange(Directive): has_content = True required_arguments = 1 - optional_arguments = 0 - final_argument_whitespace = False - option_spec = {} + title_prefix = None + range_spec = None - def handle_rangespec(self, node): - titlenode = nodes.title() - node.append(titlenode) - titlenode.append(nodes.inline("", self.get_prefix_title())) - titlenode.append(nodes.literal("", self.rangespec)) - ids = ip_object_anchor(self.typ, self.rangespec) - node["ids"].append(ids) - self.env.domaindata[self.domain][self.typ][ids] = ( - self.env.docname, - self.options.get("synopsis", ""), - ) - return ids + def get_title_prefix(self) -> str: + if self.title_prefix is None: + raise NotImplemented("subclasses must set title_prefix") + return self.title_prefix - def run(self): - if ":" in self.name: - self.domain, self.objtype = self.name.split(":", 1) - else: - self.domain, self.objtype = "", self.name - self.env = self.state.document.settings.env - self.rangespec = self.arguments[0] - node = nodes.section() - name = self.handle_rangespec(node) - if self.env.docname in self.env.titles: - doctitle = self.env.titles[self.env.docname] - else: - doctitle = list(self.state.document.traverse(nodes.title))[0].astext() - idx_text = "%s; %s" % (self.rangespec, doctitle) - self.indexnode = addnodes.index( - entries=[ - ("single", idx_text, name, "", None), - ("single", self.get_index_text(), name, "", None), - ] - ) + def handle_signature(self, sig: str, signode: desc_signature) -> T: + signode += addnodes.desc_name(text="{} {}".format(self.get_title_prefix(), sig)) + self.range_spec = sig + return sig - if self.content: - contentnode = nodes.paragraph("") - node.append(contentnode) - self.state.nested_parse(self.content, self.content_offset, contentnode) - - iprange = ip_range() - node.append(iprange) - iprange["rangespec"] = self.rangespec - return [self.indexnode, node] + def transform_content(self, contentnode: addnodes.desc_content) -> None: + ip_range_node = ip_range() + ip_range_node["range_spec"] = self.range_spec + contentnode += ip_range_node -class IPv4Range(IPRange): - typ = "v4range" +class IPV4RangeDirective(IPRangeDirective): + title_prefix = _("IPv4 range") - def get_prefix_title(self): - return _("IPv4 address range ") - - def get_index_text(self): - return "%s; %s" % (_("IPv4 range"), self.rangespec) + def add_target_and_index(self, name: T, sig: str, signode: desc_signature) -> None: + signode["ids"].append("ip4_range" + "-" + sig) + ips = self.env.get_domain("ip") + ips.add_ip4_range(sig) + idx_text = "{}; {}".format(self.title_prefix, name) + self.indexnode["entries"] = [ + ("single", idx_text, name, "", None), + ] -class IPv6Range(IPRange): - typ = "v6range" +class IPV6RangeDirective(IPRangeDirective): + title_prefix = _("IPv6 range") - def get_prefix_title(self): - return _("IPv6 address range ") + def add_target_and_index(self, name: T, sig: str, signode: desc_signature) -> None: + signode["ids"].append("ip6_range" + "-" + sig) + ips = self.env.get_domain("ip") + ips.add_ip6_range(sig) + idx_text = "{}; {}".format(self.title_prefix, name) + self.indexnode["entries"] = [ + ("single", idx_text, name, "", None), + ] - def get_index_text(self): - return "%s; %s" % (_("IPv6 range"), self.rangespec) + +class IPXRefRole(XRefRole): + def __init__(self, index_type): + self.index_type = index_type + super().__init__() + + def process_link( + self, + env: BuildEnvironment, + refnode: nodes.Element, + has_explicit_title: bool, + title: str, + target: str, + ) -> Tuple[str, str]: + refnode.attributes.update(env.ref_context) + + ips = env.get_domain("ip") + if refnode["reftype"] == "v4": + ips.add_ip4_address_reference(target) + elif refnode["reftype"] == "v6": + ips.add_ip6_address_reference(target) + elif refnode["reftype"] == "v4range": + ips.add_ip4_range_reference(target) + elif refnode["reftype"] == "v6range": + ips.add_ip6_range_reference(target) + + return title, target + + def result_nodes( + self, + document: nodes.document, + env: BuildEnvironment, + node: nodes.Element, + is_ref: bool, + ) -> Tuple[List[nodes.Node], List[nodes.system_message]]: + node_list, message = super().result_nodes(document, env, node, is_ref) + + ip = env.get_domain("ip") + if self.reftype in ["v4", "v6"] and self.target not in ip.data["ips"]: + return node_list, message + if ( + self.reftype in ["v4range", "v6range"] + and self.target not in ip.data["ranges"] + ): + return node_list, message + + index_node = addnodes.index() + target_id = "index-{}".format(env.new_serialno("index")) + target_node = nodes.target("", "", ids=[target_id]) + doc_title = next(d for d in document.traverse(nodes.title)).astext() + + node_text = node.astext() + + idx_text = "{}; {}".format(node_text, doc_title) + idx_text_2 = "{}; {}".format(self.index_type, node.astext()) + index_node["entries"] = [ + ("single", idx_text, target_id, "", None), + ("single", idx_text_2, target_id, "", None), + ] + + node_list.insert(0, target_node) + node_list.insert(0, index_node) + return node_list, message class IPDomain(Domain): - """ - IP address and range domain. - """ + """Custom domain for IP addresses and ranges.""" name = "ip" label = "IP addresses and ranges." @@ -182,99 +162,226 @@ class IPDomain(Domain): } directives = { - "v4range": IPv4Range, - "v6range": IPv6Range, + "v4range": IPV4RangeDirective, + "v6range": IPV6RangeDirective, } roles = { - "v4": IPXRefRole("v4", _("IPv4 address")), - "v6": IPXRefRole("v6", _("IPv6 address")), - "v4range": IPXRefRole("v4range", _("IPv4 range")), - "v6range": IPXRefRole("v6range", _("IPv6 range")), + "v4": IPXRefRole("IPv4 address"), + "v6": IPXRefRole("IPv6 address"), + "v4range": IPXRefRole("IPv4 range"), + "v6range": IPXRefRole("IPv6 range"), } initial_data = { - "v4": {}, - "v6": {}, - "v4range": {}, - "v6range": {}, - "ips": [], + "range_nodes": [], + "ip_refs": [], + "range_refs": [], + "ranges": defaultdict(list), + "ips": defaultdict(list), + "ip_dict": {}, } - def clear_doc(self, docname): - to_remove = [] - for key, value in self.data["v4range"].items(): - if docname == value[0]: - to_remove.append(key) - for key in to_remove: - del self.data["v4range"][key] + def get_full_qualified_name(self, node: nodes.Element) -> Optional[str]: + return "{}.{}".format("ip", node) - to_remove = [] - for key, value in self.data["v6range"].items(): - if docname == value[0]: - to_remove.append(key) - for key in to_remove: - del self.data["v6range"][key] - self.data["ips"] = [ - item for item in self.data["ips"] if item["docname"] != docname - ] + def get_objects(self) -> Iterable[Tuple[str, str, str, str, str, int]]: + for obj in self.data["range_nodes"]: + yield obj - def resolve_xref(self, env, fromdocname, builder, typ, target, node, contnode): - key = ip_object_anchor(typ, target) + def add_ip4_range(self, sig: desc_signature): + logger.debug("add_ip4_range: %s", sig) + self._add_ip_range("v4", sig) + + def add_ip6_range(self, sig: desc_signature): + logger.debug("add_ip6_range: %s", sig) + self._add_ip_range("v6", sig) + + def _add_ip_range(self, family: str, sig: desc_signature): + name = "ip{}range.{}".format(family, sig) + anchor = "ip-ip{}range-{}".format(family, sig) try: - info = self.data[typ][key] - except KeyError: - text = contnode.rawsource - role = self.roles.get(typ) - if role is None: - return None - resnode = role.result_nodes(env.get_doctree(fromdocname), env, node, True)[ - 0 - ][2] - if isinstance(resnode, addnodes.pending_xref): - text = node[0][0] - reporter = env.get_doctree(fromdocname).reporter - reporter.warning( - "Cannot resolve reference to %r" % text, line=node.line + ip_range = Network(sig) + self.data["range_nodes"].append( + (name, family, sig, self.env.docname, anchor, 0) + ) + self.data["ranges"][sig].append((ip_range, self.env.docname, anchor)) + except ValueError as e: + logger.error("invalid ip range address '%s': %s", sig, e) + + def add_ip4_address_reference(self, ip_address: str): + logger.debug("add_ip4_address_reference") + self._add_ip_address_reference("v4", ip_address) + + def add_ip6_address_reference(self, ip_address: str): + logger.debug("add_ip4_address_reference") + self._add_ip_address_reference("v6", ip_address) + + def _add_ip_address_reference(self, family, sig): + name = "ip{}.{}".format(family, sig) + anchor = "ip-ip{}-{}".format(family, sig) + try: + ip = IP(sig) + self.data["ip_refs"].append( + ( + name, + sig, + "IP{}".format(family), + str(ip), + self.env.docname, + anchor, + 0, ) - return node.children - return resnode + ) + self.data["ips"][sig].append((ip, self.env.docname, anchor)) + self.data["ip_dict"][sig] = ip + except ValueError as e: + logger.error("invalid ip address '%s': %s", sig, e) + + def add_ip4_range_reference(self, ip_range: str): + logger.debug("add_ip4_range_reference") + self._add_ip_range_reference("v4", ip_range) + + def add_ip6_range_reference(self, ip_range: str): + logger.debug("add_ip6_address_reference") + self._add_ip_range_reference("v6", ip_range) + + def _add_ip_range_reference(self, family, sig): + name = "iprange{}.{}".format(family, sig) + anchor = "ip-iprange{}-{}".format(family, sig) + try: + ip_range = Network(sig) + self.data["range_refs"].append( + ( + name, + sig, + "IP{} range".format(family), + str(ip_range), + self.env.docname, + anchor, + 0, + ) + ) + except ValueError as e: + logger.error("invalid ip range '%s': %s", sig, e) + + def resolve_xref( + self, + env: BuildEnvironment, + fromdocname: str, + builder: Builder, + typ: str, + target: str, + node: pending_xref, + contnode: nodes.Element, + ) -> Optional[nodes.Element]: + match = [] + if typ in ("v4", "v6"): + # reference to IP range + if target not in self.data["ip_dict"]: + # invalid ip address + raise NoUri(target) + match = [ + (docname, anchor) + for ip_range, docname, anchor in [ + r + for range_nodes in self.data["ranges"].values() + for r in range_nodes + ] + if self.data["ip_dict"][target] in ip_range + ] + elif typ in ("v4range", "v6range"): + if target in self.data["ranges"]: + match = [ + (docname, anchor) + for ip_range, docname, anchor in [ + range_nodes for range_nodes in self.data["ranges"][target] + ] + ] + if len(match) > 0: + todocname = match[0][0] + targ = match[0][1] + + return make_refnode(builder, fromdocname, todocname, targ, contnode, targ) else: - title = typ.upper() + " " + target - return make_refnode(builder, fromdocname, info[0], key, contnode, title) - - @property - def items(self): - return dict((key, self.data[key]) for key in self.object_types) - - def get_objects(self): - for typ, items in self.items.items(): - for path, info in items.items(): - anchor = ip_object_anchor(typ, path) - yield (path, path, typ, info[0], anchor, 1) + logger.error("found no link target for %s", target) + return None -def process_ips(app, doctree): +def process_ip_nodes(app, doctree, fromdocname): env = app.builder.env - domaindata = env.domaindata[IPDomain.name] + ips = env.get_domain(IPDomain.name) - for node in doctree.traverse(ip_node): - ip = node.astext() - domaindata["ips"].append( - { - "docname": env.docname, - "source": node.parent.source or env.doc2path(env.docname), - "lineno": node.parent.line, - "ip": ip, - "typ": node.parent["typ"], - } - ) - replacement = nodes.literal(ip, ip) - node.replace_self(replacement) + header = (_("IP address"), _("Used by")) + column_widths = (2, 5) + for node in doctree.traverse(ip_range): + content = [] + net = Network(node["range_spec"]) + addresses = defaultdict(list) + for ip_address_sig, refs in ips.data["ips"].items(): + for ip_address, todocname, anchor in refs: + if ip_address in net: + addresses[ip_address_sig].append((ip_address, todocname, anchor)) + logger.debug( + "found %s in network %s on %s", + ip_address_sig, + net.to_compressed(), + todocname, + ) + if addresses: + table = nodes.table() + table.attributes["classes"].append("indextable") + table.attributes["align"] = "left" -def sort_ip(item): - return Network(item).ip + tgroup = nodes.tgroup(cols=len(header)) + table += tgroup + for column_width in column_widths: + tgroup += nodes.colspec(colwidth=column_width) + + thead = nodes.thead() + tgroup += thead + thead += create_table_row([nodes.paragraph(text=label) for label in header]) + + tbody = nodes.tbody() + tgroup += tbody + + for ip_address_sig, ip_info in [(key, addresses[key]) for key in sorted(addresses, key=sort_by_ip)]: + para = nodes.paragraph() + para += nodes.literal("", ip_info[0][0].to_compressed()) + + ref_node = nodes.paragraph() + ref_uris = set() + ref_nodes = [] + + for item in ip_info: + ip_address, todocname, anchor = item + + newnode = nodes.reference("", "", internal=True) + try: + newnode["refuri"] = app.builder.get_relative_uri( + fromdocname, todocname + ) + if newnode["refuri"] in ref_uris: + continue + ref_uris.add(newnode["refuri"]) + except NoUri: + pass + title = env.titles[todocname] + innernode = nodes.Text(title.astext()) + newnode.append(innernode) + ref_nodes.append(newnode) + for count in range(len(ref_nodes)): + ref_node.append(ref_nodes[count]) + if count < len(ref_nodes) - 1: + ref_node.append(nodes.Text(", ")) + tbody += create_table_row([para, ref_node]) + content.append(table) + else: + para = nodes.paragraph(_("No IP addresses in this range")) + content.append(para) + + node.replace_self(content) def create_table_row(rowdata): @@ -286,78 +393,16 @@ def create_table_row(rowdata): return row -def process_ip_nodes(app, doctree, fromdocname): - env = app.builder.env - domaindata = env.domaindata[IPDomain.name] - - header = (_("IP address"), _("Used by")) - colwidths = (1, 3) - - for node in doctree.traverse(ip_range): - content = [] - net = Network(node["rangespec"]) - ips = {} - for key, value in [(ip_info["ip"], ip_info) for ip_info in domaindata["ips"]]: - try: - if not key in net: - continue - except ValueError as e: - logger.info("invalid IP address info %s", e.args) - continue - addrlist = ips.get(key, []) - addrlist.append(value) - ips[key] = addrlist - if ips: - table = nodes.table() - tgroup = nodes.tgroup(cols=len(header)) - table += tgroup - for colwidth in colwidths: - tgroup += nodes.colspec(colwidth=colwidth) - thead = nodes.thead() - tgroup += thead - thead += create_table_row([nodes.paragraph(text=label) for label in header]) - tbody = nodes.tbody() - tgroup += tbody - for ip, ip_info in [(ip, ips[ip]) for ip in sorted(ips, key=sort_ip)]: - para = nodes.paragraph() - para += nodes.literal("", ip) - refnode = nodes.paragraph() - refuris = set() - refnodes = [] - for item in ip_info: - ids = ip_object_anchor(item["typ"], item["ip"]) - if ids not in para["ids"]: - para["ids"].append(ids) - - domaindata[item["typ"]][ids] = (fromdocname, "") - newnode = nodes.reference("", "", internal=True) - try: - newnode["refuri"] = app.builder.get_relative_uri( - fromdocname, item["docname"] - ) - if newnode["refuri"] in refuris: - continue - refuris.add(newnode["refuri"]) - except NoUri: - pass - title = env.titles[item["docname"]] - innernode = nodes.Text(title.astext()) - newnode.append(innernode) - refnodes.append(newnode) - for count in range(len(refnodes)): - refnode.append(refnodes[count]) - if count < len(refnodes) - 1: - refnode.append(nodes.Text(", ")) - tbody += create_table_row([para, refnode]) - content.append(table) - else: - para = nodes.paragraph(_("No IP addresses in this range")) - content.append(para) - node.replace_self(content) +def sort_by_ip(item): + return IP(item).ip def setup(app): app.add_domain(IPDomain) - app.connect("doctree-read", process_ips) app.connect("doctree-resolved", process_ip_nodes) - return {"version": __version__} + return { + "version": __version__, + "env_version": 4, + "parallel_read_safe": True, + "parallel_write_safe": True, + } diff --git a/tests/root/conf.py b/tests/root/conf.py index cf90b39..7f8b00e 100644 --- a/tests/root/conf.py +++ b/tests/root/conf.py @@ -40,14 +40,14 @@ templates_path = ["_templates"] source_suffix = ".rst" # The encoding of source files. -# source_encoding = 'utf-8-sig' +# source_encoding = 'utf-8-ip_range' # The master toctree document. master_doc = "index" # General information about the project. project = "Sphinxext IP Tests" -copyright = "2016, Jan Dittberner" +copyright = "2016-2021, Jan Dittberner" author = "Jan Dittberner" # The version info for the project you're documenting, acts as replacement for @@ -55,9 +55,9 @@ author = "Jan Dittberner" # built documents. # # The short X.Y version. -version = "0.1.0" +version = "0.5.0" # The full version, including alpha/beta/rc tags. -release = "0.1.0" +release = version + "-dev" # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/tests/root/testpage3.rst b/tests/root/testpage3.rst index ae38493..6abd74a 100644 --- a/tests/root/testpage3.rst +++ b/tests/root/testpage3.rst @@ -1,4 +1,5 @@ Test page 3 =========== -This page contains :ip:v6:`2001:dead:beef::1` like :doc:`testpage2` does. +This page contains :ip:v6:`2001:dead:beef::1` from :ip:v6range:`2001:dead:beef::/64` +like :doc:`testpage2` does. diff --git a/tests/run.py b/tests/run.py index 4274a09..019c157 100755 --- a/tests/run.py +++ b/tests/run.py @@ -31,7 +31,7 @@ def run(extra_args=[]): "The sphinx package is needed to run the jandd.sphinxext.ip " "test suite." ) - from .test_ip import TestIPExtension + from tests.test_ip import TestIPExtension print("Running jandd.sphinxext.ip test suite ...") diff --git a/tests/test_ip.py b/tests/test_ip.py index 49a2dde..7c3cf7d 100644 --- a/tests/test_ip.py +++ b/tests/test_ip.py @@ -28,10 +28,10 @@ class TestIPExtension(unittest.TestCase): def test_ip_domaindata(self): self.assertIn("ip", self.app.env.domaindata) ipdomdata = self.app.env.domaindata["ip"] - self.assertIn("v4", ipdomdata) - self.assertIn("v6", ipdomdata) - self.assertIn("v4range", ipdomdata) - self.assertIn("v6range", ipdomdata) + self.assertIn("ip_refs", ipdomdata) + self.assertIn("range_refs", ipdomdata) + self.assertIn("range_nodes", ipdomdata) + self.assertIn("ranges", ipdomdata) self.assertIn("ips", ipdomdata) def find_in_index(self, entry): @@ -43,19 +43,15 @@ class TestIPExtension(unittest.TestCase): self.fail("%s not found in index" % entry) def test_ip4_addresses(self): - ipv4 = self.app.env.domaindata["ip"]["v4"] ips = self.app.env.domaindata["ip"]["ips"] for ip in IP4_ADDRESSES: - self.assertIn(ip, ipv4) - self.assertIn(ip, [item["ip"] for item in ips]) + self.assertIn(ip, ips) self.find_in_index("IPv4 address; %s" % ip) self.find_in_index("%s; Test page 2" % ip) def test_ip6_addresses(self): - ipv6 = self.app.env.domaindata["ip"]["v6"] ips = self.app.env.domaindata["ip"]["ips"] for ip in IP6_ADDRESSES: - self.assertIn(ip, ipv6) - self.assertIn(ip, [item["ip"] for item in ips]) + self.assertIn(ip, ips) self.find_in_index("IPv6 address; %s" % ip) self.find_in_index("%s; Test page 2" % ip)