django-powerdns / powerdns /

# -*- coding: utf-8 -*-

import base64
import hashlib
import string
import time

import ipaddr

from django.conf import settings
from django.core.exceptions import ValidationError
from django.core.validators import validate_ipv4_address
from django.db import models
from django.utils.translation import ugettext_lazy as _
from lck.django.common.models import TimeTrackable

    'A', 'AAAA', 'CNAME', 'HINFO', 'MX', 'NAPTR', 'NS', 'PTR', 'SOA', 'SRV',



RECORD_TYPES = sorted(set(

except AttributeError:

b32_trans = string.maketrans('ABCDEFGHIJKLMNOPQRSTUVWXYZ234567',

def validate_dns_nodot(value):
    PowerDNS considers the whole zone to be invalid if any of the records end
    with a period so this custom validator is used to catch them
    if value.endswith('.'):
        raise ValidationError(
            _(u'%s is not allowed to end in a period!') % value,

def validate_ipv6_address(value):
    except ipaddr.AddressValueError:
        raise ValidationError(
            _(u'Enter a valid IPv6 address.'), code='invalid',

class Domain(TimeTrackable):
    PowerDNS domains
        ('MASTER', 'MASTER'),
        ('NATIVE', 'NATIVE'),
        ('SLAVE', 'SLAVE'),
    name = models.CharField(_("name"), unique=True, max_length=255)
    master = models.CharField(
        _("master"), max_length=128, blank=True, null=True,
    last_check = models.IntegerField(_("last check"), blank=True, null=True)
    type = models.CharField(
        _("type"), max_length=6, blank=True, null=True, choices=DOMAIN_TYPE,
    notified_serial = models.PositiveIntegerField(
        _("notified serial"), blank=True, null=True,
    account = models.CharField(
        _("account"), max_length=40, blank=True, null=True,

    class Meta:
        db_table = u'domains'
        verbose_name = _("domain")
        verbose_name_plural = _("domains")

    def __unicode__(self):

    def clean(self): =

class Record(TimeTrackable):
    PowerDNS DNS records
    RECORD_TYPE = [(r, r) for r in RECORD_TYPES]
    domain = models.ForeignKey(Domain, verbose_name=_("domain"))
    name = models.CharField(
        _("name"), max_length=255, blank=True, null=True,
        help_text=_("Actual name of a record. Must not end in a '.' and be"
                    " fully qualified - it is not relative to the name of the"
                    " domain!"),
    type = models.CharField(
        _("type"), max_length=6, blank=True, null=True,
        choices=RECORD_TYPE, help_text=_("Record qtype"),
    content = models.CharField(
        _("content"), max_length=255, blank=True, null=True,
        help_text=_("The 'right hand side' of a DNS record. For an A"
                    " record, this is the IP address"),
    ttl = models.PositiveIntegerField(
        _("TTL"), blank=True, null=True, default=3600,
        help_text=_("TTL in seconds"),
    prio = models.PositiveIntegerField(
        _("priority"), blank=True, null=True,
        help_text=_("For MX records, this should be the priority of the"
                    " mail exchanger specified"),
    change_date = models.PositiveIntegerField(
        _("change date"), blank=True, null=True,
        help_text=_("Set automatically by the system to trigger SOA"
                    " updates and slave notifications"),
    ordername = models.CharField(
        _("DNSSEC Order"), max_length=255, blank=True, null=True,
    auth = models.NullBooleanField(
        help_text=_("Should be set for data for which is itself"
                    " authoritative, which includes the SOA record and our own"
                    " NS records but not set for NS records which are used for"
                    " delegation or any delegation related glue (A, AAAA)"
                    " records"),

    class Meta:
        db_table = u'records'
        ordering = ('name', 'type')
        unique_together = ('name', 'type', 'content')
        verbose_name = _("record")
        verbose_name_plural = _("records")

    def __unicode__(self):
        if self.prio is not None:
            content = "%d %s" % (self.prio, self.content)
            content = self.content
        return "%s IN %s %s" % (, self.type, content)

    def _generate_ordername(self):
        Check which DNSSEC Mode the domain is in and fill the `ordername`
        field depending on the mode.
        cryptokey = CryptoKey.objects.filter(domain=self.domain)
        if not cryptokey.count():
            return None
        metadata = DomainMetadata.objects.filter(domain=self.domain)
        nsec3param = metadata.filter(kind='NSEC3PARAM')
        nsec3narrow = metadata.filter(kind='NSEC3NARROW')
        if nsec3param.count():
            if nsec3narrow.count():
                # When running in NSEC3 'Narrow' mode, the ordername field is
                # ignored and best left empty.
                return ''
            return self._generate_ordername_nsec3(nsec3param[0])
        return self._generate_ordername_nsec()

    def _generate_ordername_nsec(self):
        In 'NSEC' mode, it should contain the relative part of a domain name,
        in reverse order, with dots replaced by spaces
        domain_words ='.')
        host_words ='.')
        relative_word_count = len(host_words) - len(domain_words)
        relative_words = host_words[0:relative_word_count]
        ordername = ' '.join(relative_words[::-1])
        return ordername

    def _generate_ordername_nsec3(self, nsec3param):
        In 'NSEC3' non-narrow mode, the ordername should contain a lowercase
        base32hex encoded representation of the salted & iterated hash of the
        full record name.  "pdnssec hash-zone-record zone record" can be used
        to calculate this hash.
            algo, flags, iterations, salt = nsec3param.content.split()
            if algo != '1':
                raise ValueError("Incompatible hash algorithm.")
            if flags != '1':
                raise ValueError("Incompatible flags.")
            salt = salt.decode('hex')
            # convert the record name to the DNSSEC canonical form, e.g.
            # a format suitable for digesting in hashes
            record_name = '%s.' %'.')
            parts = ["%s%s" % (chr(len(x)), x) for x in record_name.split('.')]
            record_name = ''.join(parts)
        except (ValueError, TypeError, AttributeError):
            return None  # incompatible input
        record_name = self._sha1(record_name, salt)
        i = 0
        while i < int(iterations):
            record_name = self._sha1(record_name, salt)
            i += 1
        result = base64.b32encode(record_name)
        result = result.translate(b32_trans)
        return result.lower()

    def _sha1(self, value, salt):
        s = hashlib.sha1()
        return s.digest()

    def clean(self):
        if self.type == 'A':
        if self.type == 'AAAA':
        if self.type:
            self.type = self.type.upper()

    def save(self, *args, **kwargs):
        self.change_date = int(time.time())
        self.ordername = self._generate_ordername()
        super(Record, self).save(*args, **kwargs)

class SuperMaster(TimeTrackable):
    PowerDNS DNS Servers that should be trusted to push new domains to us
    ip = models.CharField(_("IP"), max_length=25)
    nameserver = models.CharField(_("name server"), max_length=255)
    account = models.CharField(
        _("account"), max_length=40, blank=True, null=True,

    class Meta:
        db_table = u'supermasters'
        ordering = ('nameserver', 'account')
        unique_together = ('nameserver', 'account')
        verbose_name = _("supermaster")
        verbose_name_plural = _("supermasters")

    def __unicode__(self):
        return self.ip

class DomainMetadata(TimeTrackable):
    domain = models.ForeignKey(Domain, verbose_name=_("domain"))
    kind = models.CharField(_("kind"), max_length=15)
    content = models.TextField(_("content"), blank=True, null=True)

    class Meta:
        db_table = u'domainmetadata'
        ordering = ('domain',)
        verbose_name = _("domain metadata")
        verbose_name_plural = _("domain metadata")

    def __unicode__(self):
        return unicode(self.domain)

class CryptoKey(TimeTrackable):
    domain = models.ForeignKey(
        Domain, verbose_name=_("domain"), blank=True, null=True,
    flags = models.PositiveIntegerField(_("flags"))
    active = models.NullBooleanField(_("active"))
    content = models.TextField(_("content"), blank=True, null=True)

    class Meta:
        db_table = u'cryptokeys'
        ordering = ('domain',)
        verbose_name = _("crypto key")
        verbose_name_plural = _("crypto keys")

    def __unicode__(self):
        return unicode(self.domain)