sphinxext-ip/jandd/sphinxext/ip.py

350 lines
11 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
"""
jandd.sphinxext.ip
~~~~~~~~~~~~~~~~~~
The IP domain.
:copyright: Copyright (c) 2016 Jan Dittberner
:license: GPLv3+, see COPYING file for details.
"""
import re
from ipcalc import Network
from docutils import nodes
from docutils.parsers.rst import Directive
from sphinx import addnodes
from sphinx.domains import Domain, ObjType
from sphinx.environment import NoUri
from sphinx.locale import l_
from sphinx.roles import XRefRole
from sphinx.util.nodes import make_refnode
# Version of this extension; returned to Sphinx from ``setup()``.
__version__ = '0.2.5.dev1'
def ip_object_anchor(typ, path):
    """Build the anchor id for an IP object.

    Every ``.``, ``:`` and ``/`` in *path* is replaced by ``-``; the result
    is prefixed with the lower-cased *typ* followed by ``-``.
    """
    sanitized = re.sub(r'[.:/]', '-', path)
    return '-'.join((typ.lower(), sanitized))
class ip_node(nodes.Inline, nodes.TextElement):
    """Inline text node wrapping an IP address written through a role.

    ``IPXRefRole`` uses it as inner node class for the ``v4`` and ``v6``
    roles; ``process_ips`` later replaces each instance by a literal node.
    """
class ip_range(nodes.General, nodes.Element):
    """Placeholder node for the address listing of an IP range.

    ``IPRange.run`` stores the range in its ``rangespec`` attribute and
    ``process_ip_nodes`` replaces the node once references are resolved.
    """
class IPXRefRole(XRefRole):
    """
    Cross referencing role for the IP domain.
    """

    def __init__(self, method, index_type, **kwargs):
        """Create the role.

        :param method: role/object type name; ``'v4'`` and ``'v6'`` select
            :class:`ip_node` as inner node class, other values use none.
        :param index_type: label used as first part of the per-type index
            entry created in :meth:`result_nodes`.
        :param kwargs: passed through to ``XRefRole.__init__``.
        """
        self.method = method
        self.index_type = index_type
        innernodeclass = None
        if method in ('v4', 'v6'):
            innernodeclass = ip_node
        super(IPXRefRole, self).__init__(
            innernodeclass=innernodeclass, **kwargs)

    def __call__(self, typ, rawtext, text, lineno, inliner,
                 options=None, content=None):
        """Validate *text* as an IP address/range, then run the base role.

        Text that ``Network`` rejects with ``ValueError`` produces a build
        warning and is rendered as a plain literal instead of a reference.

        :returns: the ``(nodes, system_messages)`` pair of a docutils role.
        """
        # Avoid sharing mutable default arguments between calls.
        options = {} if options is None else options
        content = [] if content is None else content
        try:
            Network(text)
        except ValueError:
            env = inliner.document.settings.env
            env.warn(env.docname, "invalid ip address/range %s" % text, lineno)
            # A role returns (list of nodes, list of messages); the literal
            # node therefore has to be wrapped in a list of its own.
            return [nodes.literal(text, text)], []
        return super(IPXRefRole, self).__call__(
            typ, rawtext, text, lineno, inliner, options, content)

    def process_link(self, env, refnode, has_explicit_title, title, target):
        """Record the reference in the domain data under this role's type.

        Title and target are returned unchanged.
        """
        domaindata = env.domaindata['ip']
        domaindata[self.method][target] = (target, refnode)
        return title, target

    def result_nodes(self, document, env, node, is_ref):
        """Prefix the reference node with an index node and its target.

        Two ``single`` index entries are created: ``<address>; <document
        title>`` and ``<index type>; <address>``.

        :returns: ``([indexnode, targetnode, node], [])``; if a ``KeyError``
            is raised while building the entries, ``([node], [message])``.
        """
        try:
            node['typ'] = self.method
            indexnode = addnodes.index()
            targetid = 'index-%s' % env.new_serialno('index')
            targetnode = nodes.target('', '', ids=[targetid])
            doctitle = document.traverse(nodes.title)[0].astext()
            idxtext = "%s; %s" % (node.astext(), doctitle)
            idxtext2 = "%s; %s" % (self.index_type, node.astext())
            indexnode['entries'] = [
                ('single', idxtext, targetid, '', None),
                ('single', idxtext2, targetid, '', None),
            ]
            return [indexnode, targetnode, node], []
        except KeyError as e:
            return [node], [e.args[0]]
class IPRange(Directive):
    """Base directive documenting an IP address range.

    Takes the range specification as its single argument and optional body
    content.  Subclasses provide ``typ``, ``get_prefix_title()`` and
    ``get_index_text()``.
    """

    has_content = True
    required_arguments = 1
    optional_arguments = 0
    final_argument_whitespace = False
    option_spec = {}

    def handle_rangespec(self, node):
        """Add title and anchor for the range to *node* and register it.

        The range is stored in the domain data under its anchor id together
        with the current docname and the ``synopsis`` option.

        :returns: the anchor id of the range.
        """
        titlenode = nodes.title()
        node.append(titlenode)
        titlenode.append(nodes.inline('', self.get_prefix_title()))
        titlenode.append(nodes.literal('', self.rangespec))
        ids = ip_object_anchor(self.typ, self.rangespec)
        node['ids'].append(ids)
        self.env.domaindata[self.domain][self.typ][ids] = (
            self.env.docname,
            self.options.get('synopsis', ''))
        return ids

    def run(self):
        """Build the section for the range.

        :returns: an index node followed by the section node; the section
            holds the title, the parsed body content (if any) and an
            :class:`ip_range` placeholder carrying the range specification.
        """
        if ':' in self.name:
            self.domain, self.objtype = self.name.split(':', 1)
        else:
            self.domain, self.objtype = '', self.name
        self.env = self.state.document.settings.env
        self.rangespec = self.arguments[0]
        node = nodes.section()
        name = self.handle_rangespec(node)

        if self.env.docname in self.env.titles:
            # env.titles holds title nodes; use their text, as is done for
            # the titles found in the document itself.
            doctitle = self.env.titles[self.env.docname].astext()
        else:
            titles = self.state.document.traverse(nodes.title)
            # Fall back to the docname when no title exists (yet) instead of
            # failing with an IndexError.
            doctitle = titles[0].astext() if titles else self.env.docname

        idx_text = "%s; %s" % (self.rangespec, doctitle)
        self.indexnode = addnodes.index(entries=[
            ('single', idx_text, name, '', None),
            ('single', self.get_index_text(), name, '', None)
        ])

        if self.content:
            contentnode = nodes.paragraph('')
            node.append(contentnode)
            self.state.nested_parse(
                self.content, self.content_offset, contentnode)

        iprange = ip_range()
        node.append(iprange)
        iprange['rangespec'] = self.rangespec
        return [self.indexnode, node]
class IPv4Range(IPRange):
    """Directive for an IPv4 address range (type ``v4range``)."""

    typ = 'v4range'

    def get_prefix_title(self):
        """Return the text placed before the range in the section title."""
        prefix = l_('IPv4 address range ')
        return prefix

    def get_index_text(self):
        """Return the per-type index entry text for this range."""
        label = l_('IPv4 range')
        return "%s; %s" % (label, self.rangespec)
class IPv6Range(IPRange):
    """Directive for an IPv6 address range (type ``v6range``)."""

    typ = 'v6range'

    def get_prefix_title(self):
        """Return the text placed before the range in the section title."""
        prefix = l_('IPv6 address range ')
        return prefix

    def get_index_text(self):
        """Return the per-type index entry text for this range."""
        label = l_('IPv6 range')
        return "%s; %s" % (label, self.rangespec)
class IPDomain(Domain):
    """
    IP address and range domain.
    """

    name = 'ip'
    label = 'IP addresses and ranges.'

    object_types = {
        'v4': ObjType(l_('v4'), 'v4', 'obj'),
        'v6': ObjType(l_('v6'), 'v6', 'obj'),
        'v4range': ObjType(l_('v4range'), 'v4range', 'obj'),
        'v6range': ObjType(l_('v6range'), 'v6range', 'obj'),
    }

    directives = {
        'v4range': IPv4Range,
        'v6range': IPv6Range,
    }

    roles = {
        'v4': IPXRefRole('v4', l_('IPv4 address')),
        'v6': IPXRefRole('v6', l_('IPv6 address')),
        'v4range': IPXRefRole('v4range', l_('IPv4 range')),
        'v6range': IPXRefRole('v6range', l_('IPv6 range')),
    }

    # One mapping per object type plus the flat list of address usages
    # that ``process_ips`` appends to.
    initial_data = {
        'v4': {},
        'v6': {},
        'v4range': {},
        'v6range': {},
        'ips': [],
    }

    def clear_doc(self, docname):
        """Drop the ranges and address usages recorded for *docname*."""
        for typ in ('v4range', 'v6range'):
            ranges = self.data[typ]
            # Collect first: the mapping must not change during iteration.
            stale = [key for key, value in ranges.items()
                     if docname == value[0]]
            for key in stale:
                del ranges[key]
        self.data['ips'] = [
            item for item in self.data['ips'] if item['docname'] != docname
        ]

    def resolve_xref(self, env, fromdocname, builder, typ, target, node,
                     contnode):
        """Resolve a cross reference to an IP object.

        A target known to the domain yields a reference node from
        ``make_refnode``.  Otherwise the role for *typ* is asked for its
        result nodes; if that still yields a pending reference, a warning is
        reported and the children of *node* are returned.  ``None`` is
        returned when there is no role for *typ*.
        """
        key = ip_object_anchor(typ, target)
        try:
            info = self.data[typ][key]
        except KeyError:
            pass
        else:
            title = typ.upper() + ' ' + target
            return make_refnode(builder, fromdocname, info[0], key,
                                contnode, title)

        role = self.roles.get(typ)
        if role is None:
            return None
        # result_nodes returns ([indexnode, targetnode, node], messages);
        # pick the reference node itself.
        resnode = role.result_nodes(env.get_doctree(fromdocname),
                                    env, node, True)[0][2]
        if not isinstance(resnode, addnodes.pending_xref):
            return resnode
        text = node[0][0]
        reporter = env.get_doctree(fromdocname).reporter
        reporter.warning('Cannot resolve reference to %r' % text,
                         line=node.line)
        return node.children

    @property
    def items(self):
        """Mapping of each object type name to its domain data."""
        return {key: self.data[key] for key in self.object_types}

    def get_objects(self):
        """Yield ``(name, dispname, type, docname, anchor, priority)``.

        One tuple is produced for every entry of every object type.
        """
        # NOTE(review): IPRange.handle_rangespec and process_ip_nodes store
        # entries keyed by their anchor id, so the anchor computed here gets
        # the type prefix a second time -- confirm whether that is intended.
        for typ, items in self.items.items():
            for path, info in items.items():
                anchor = ip_object_anchor(typ, path)
                yield (path, path, typ, info[0], anchor, 1)
def process_ips(app, doctree):
    """Record every ``ip_node`` of *doctree* and turn it into a literal.

    For each node a dict with docname, source, line number, address text
    and the parent's ``typ`` is appended to the domain's ``ips`` list.
    """
    env = app.builder.env
    ip_data = env.domaindata[IPDomain.name]
    for ip_ref in doctree.traverse(ip_node):
        address = ip_ref.astext()
        owner = ip_ref.parent
        record = dict(
            docname=env.docname,
            source=owner.source or env.doc2path(env.docname),
            lineno=owner.line,
            ip=address,
            typ=owner['typ'],
        )
        ip_data['ips'].append(record)
        ip_ref.replace_self(nodes.literal(address, address))
def sort_ip(item):
    """Sort key for address strings: the ``ip`` value of ``Network(item)``."""
    network = Network(item)
    return network.ip
def create_table_row(rowdata):
    """Return a table row holding one entry per element of *rowdata*."""
    table_row = nodes.row()
    for content in rowdata:
        cell_entry = nodes.entry()
        cell_entry += content
        table_row += cell_entry
    return table_row
def process_ip_nodes(app, doctree, fromdocname):
    """Replace each ``ip_range`` placeholder by a table of its addresses.

    For every placeholder the recorded address usages that lie inside the
    range are grouped by address.  Each address becomes one table row with
    the address as a literal (carrying the anchor ids) and references to
    the documents using it.  A range without addresses gets a paragraph
    saying so.

    :param app: the Sphinx application.
    :param doctree: the resolved doctree of *fromdocname*.
    :param fromdocname: name of the document being processed.
    """
    env = app.builder.env
    domaindata = env.domaindata[IPDomain.name]
    header = (l_('IP address'), l_('Used by'))
    colwidths = (1, 3)

    for node in doctree.traverse(ip_range):
        net = Network(node['rangespec'])

        # Group the recorded usages by address, keeping only those in range.
        ips = {}
        for ip_info in domaindata['ips']:
            if ip_info['ip'] in net:
                ips.setdefault(ip_info['ip'], []).append(ip_info)

        if not ips:
            # The message has to be passed as ``text``; the first positional
            # argument of a paragraph is its raw source, which is not
            # rendered.
            node.replace_self(
                [nodes.paragraph(text=l_('No IP addresses in this range'))])
            continue

        table = nodes.table()
        tgroup = nodes.tgroup(cols=len(header))
        table += tgroup
        for colwidth in colwidths:
            tgroup += nodes.colspec(colwidth=colwidth)
        thead = nodes.thead()
        tgroup += thead
        thead += create_table_row([
            nodes.paragraph(text=label) for label in header])
        tbody = nodes.tbody()
        tgroup += tbody

        for ip in sorted(ips, key=sort_ip):
            para = nodes.paragraph()
            para += nodes.literal('', ip)
            refuris = set()
            refnodes = []
            for item in ips[ip]:
                ids = ip_object_anchor(item['typ'], item['ip'])
                if ids not in para['ids']:
                    para['ids'].append(ids)
                # Make the address resolvable: its anchor lives in this
                # document.
                domaindata[item['typ']][ids] = (fromdocname, '')
                newnode = nodes.reference('', '', internal=True)
                try:
                    newnode['refuri'] = app.builder.get_relative_uri(
                        fromdocname, item['docname'])
                    if newnode['refuri'] in refuris:
                        # List every referring document only once.
                        continue
                    refuris.add(newnode['refuri'])
                except NoUri:
                    pass
                title = env.titles[item['docname']]
                newnode.append(nodes.Text(title.astext()))
                refnodes.append(newnode)

            # Join the references with ", ".
            refnode = nodes.paragraph()
            for index, newnode in enumerate(refnodes):
                if index:
                    refnode.append(nodes.Text(", "))
                refnode.append(newnode)
            tbody += create_table_row([para, refnode])

        node.replace_self([table])
def setup(app):
    """Register the IP domain and its doctree event handlers with *app*.

    Returns the extension metadata dict containing the version.
    """
    app.add_domain(IPDomain)
    handlers = (
        ('doctree-read', process_ips),
        ('doctree-resolved', process_ip_nodes),
    )
    for event, callback in handlers:
        app.connect(event, callback)
    return dict(version=__version__)