Source

iredadmin-ose / libs / ldaplib / admin.py

Full commit
Zhang Huangbin 878d625 

Zhang Huangbin 551aed9 

Zhang Huangbin 878d625 

















Zhang Huangbin 3e2eaa4 
Zhang Huangbin 878d625 



Zhang Huangbin 3e2eaa4 

Zhang Huangbin 878d625 






















Zhang Huangbin 551aed9 


Zhang Huangbin 878d625 






















Zhang Huangbin 551aed9 
Zhang Huangbin 878d625 









Zhang Huangbin 551aed9 
Zhang Huangbin 878d625 











Zhang Huangbin 551aed9 

Zhang Huangbin 878d625 



















Zhang Huangbin 551aed9 

Zhang Huangbin 878d625 



Zhang Huangbin 3e2eaa4 

Zhang Huangbin 878d625 










Zhang Huangbin 551aed9 
Zhang Huangbin 878d625 



Zhang Huangbin 3e2eaa4 


Zhang Huangbin 878d625 




Zhang Huangbin 551aed9 

Zhang Huangbin 878d625 




















Zhang Huangbin 551aed9 

Zhang Huangbin 878d625 









Zhang Huangbin 551aed9 

Zhang Huangbin 878d625 

Zhang Huangbin 551aed9 
Zhang Huangbin 878d625 






















Zhang Huangbin 551aed9 

Zhang Huangbin 878d625 















Zhang Huangbin 551aed9 
























# Author: Zhang Huangbin <zhb@iredmail.org>

import ldap
import ldap.filter
import web
from libs import iredutils
from libs.ldaplib import core, attrs, ldaputils, iredldif, deltree, connUtils, decorators

cfg = web.iredconfig
session = web.config.get('_session')


class Admin(core.LDAPWrap):
    def __del__(self):
        try:
            self.conn.unbind()
        except:
            pass

    # Get preferredLanguage.
    def getPreferredLanguage(self, dn):
        dn = ldap.filter.escape_filter_chars(dn)
        lang = self.conn.search_s(
            dn,
            ldap.SCOPE_BASE,
            attrlist=['preferredLanguage'],
        )
        if 'preferredLanguage' in lang[0][1].keys():
            lang = lang[0][1]['preferredLanguage'][0]
        else:
            lang = web.ctx.lang
        return lang

    # List all admin accounts.
    @decorators.require_global_admin
    def listAccounts(self, attrs=attrs.ADMIN_SEARCH_ATTRS):
        filter = "(objectClass=mailAdmin)"
        try:
            result = self.conn.search_s(
                self.domainadmin_dn,
                ldap.SCOPE_ONELEVEL,
                filter,
                attrs,
            )
            return (True, result)
        except Exception, e:
            return (False, ldaputils.getExceptionDesc(e))

    # Get admin profile.
    def profile(self, mail):
        self.mail = web.safestr(mail)
        self.dn = ldaputils.convKeywordToDN(self.mail, accountType='admin')
        if self.dn[0] is False:
            return self.dn

        try:
            self.admin_profile = self.conn.search_s(
                self.dn,
                ldap.SCOPE_BASE,
                '(&(objectClass=mailAdmin)(mail=%s))' % self.mail,
                attrs.ADMIN_ATTRS_ALL,
            )
            return (True, self.admin_profile)
        except ldap.NO_SUCH_OBJECT:
            return (False, 'NO_SUCH_OBJECT')
        except Exception, e:
            return (False, ldaputils.getExceptionDesc(e))

    # Add new admin.
    @decorators.require_global_admin
    def add(self, data):
        self.cn = data.get('cn')
        self.mail = web.safestr(data.get('mail')).strip().lower()

        if not iredutils.isEmail(self.mail):
            return (False, 'INVALID_MAIL')

        self.domainGlobalAdmin = web.safestr(data.get('domainGlobalAdmin', 'no'))
        if self.domainGlobalAdmin not in ['yes', 'no', ]:
            self.domainGlobalAdmin = 'no'

        self.preferredLanguage = web.safestr(data.get('preferredLanguage', 'en_US'))

        # Check password.
        self.newpw = web.safestr(data.get('newpw'))
        self.confirmpw = web.safestr(data.get('confirmpw'))

        result = iredutils.verifyNewPasswords(self.newpw, self.confirmpw)
        if result[0] is True:
            self.passwd = ldaputils.generateLDAPPasswd(result[1])
        else:
            return result

        ldif = iredldif.ldif_mailadmin(
                mail=self.mail,
                passwd=self.passwd,
                cn=self.cn,
                preferredLanguage=self.preferredLanguage,
                domainGlobalAdmin=self.domainGlobalAdmin,
                )

        self.dn = ldaputils.convKeywordToDN(self.mail, accountType='admin')
        if self.dn[0] is False:
            return self.dn

        try:
            self.conn.add_s(self.dn, ldif)
            web.logger(msg="Create admin: %s." % (self.mail), event='create',)
            return (True,)
        except ldap.ALREADY_EXISTS:
            return (False, 'ALREADY_EXISTS')
        except Exception, e:
            return (False, ldaputils.getExceptionDesc(e))

    # Update admin profile.
    def update(self, profile_type, mail, data):
        self.profile_type = web.safestr(profile_type)
        self.mail = web.safestr(mail)

        if session.get('domainGlobalAdmin') is not True and session.get('username') != self.mail:
            # Don't allow to view/update other admins' profile.
            return (False, 'PERMISSION_DENIED')

        self.dn = ldaputils.convKeywordToDN(self.mail, accountType='admin')
        if self.dn[0] is False:
            return self.dn

        mod_attrs = []
        if self.profile_type == 'general':
            # Get preferredLanguage.
            lang = web.safestr(data.get('preferredLanguage', 'en_US'))
            mod_attrs += [(ldap.MOD_REPLACE, 'preferredLanguage', lang)]

            # Get cn.
            cn = data.get('cn', None)
            mod_attrs += ldaputils.getSingleModAttr(attr='cn', value=cn, default=self.mail.split('@')[0],)

            # Get accountStatus.
            if 'accountStatus' in data.keys():
                accountStatus = 'active'
            else:
                accountStatus = 'disabled'

            mod_attrs += [(ldap.MOD_REPLACE, 'accountStatus', accountStatus)]

            try:
                # Modify profiles.
                self.conn.modify_s(self.dn, mod_attrs)
                if session.get('username') == self.mail and \
                   session.get('lang', 'en_US') != lang:
                    session['lang'] = lang
            except ldap.LDAPError, e:
                return (False, ldaputils.getExceptionDesc(e))

        elif self.profile_type == 'password':
            self.cur_passwd = data.get('oldpw', None)
            self.newpw = web.safestr(data.get('newpw'))
            self.confirmpw = web.safestr(data.get('confirmpw'))

            result = iredutils.verifyNewPasswords(self.newpw, self.confirmpw)
            if result[0] is True:
                self.passwd = result[1]
            else:
                return result

            # Change password.
            if self.cur_passwd is None and session.get('domainGlobalAdmin') is True:
                # Reset password without verify old password.
                self.cur_passwd = None
            else:
                self.cur_passwd = str(self.cur_passwd)

            connutils = connUtils.Utils()
            result = connutils.changePasswd(dn=self.dn, cur_passwd=self.cur_passwd, newpw=self.passwd,)
            if result[0] is True:
                return (True,)
            else:
                return result

        return (True,)

    @decorators.require_global_admin
    def delete(self, mails):
        if mails is None or len(mails) == 0:
            return (False, 'NO_ACCOUNT_SELECTED')

        result = {}

        for mail in mails:
            self.mail = web.safestr(mail)
            dn = ldaputils.convKeywordToDN(self.mail, accountType='admin')
            if dn[0] is False:
                return dn

            try:
                deltree.DelTree(self.conn, dn, ldap.SCOPE_SUBTREE)
                web.logger(msg="Delete admin: %s." % (self.mail,), event='delete',)
            except ldap.LDAPError, e:
                result[self.mail] = str(e)

        if result == {}:
            return (True,)
        else:
            return (False, ldaputils.getExceptionDesc(result))

    @decorators.require_global_admin
    def enableOrDisableAccount(self, mails, action, attr='accountStatus',):
        if mails is None or len(mails) == 0:
            return (False, 'NO_ACCOUNT_SELECTED')

        result = {}
        connutils = connUtils.Utils()
        for mail in mails:
            self.mail = web.safestr(mail).strip().lower()
            if not iredutils.isEmail(self.mail):
                continue

            self.domain = self.mail.split('@')[-1]
            self.dn = ldaputils.convKeywordToDN(self.mail, accountType='admin')
            if self.dn[0] is False:
                return self.dn

            try:
                connutils.enableOrDisableAccount(
                    domain=self.domain,
                    account=self.mail,
                    dn=self.dn,
                    action=web.safestr(action).strip().lower(),
                    accountTypeInLogger='admin',
                )
            except ldap.LDAPError, e:
                result[self.mail] = str(e)

        if result == {}:
            return (True,)
        else:
            return (False, ldaputils.getExceptionDesc(result))

    def getNumberOfManagedAccounts(self, admin=None, accountType='domain', domains=[],):
        if admin is None:
            admin = session.get('username')
        else:
            admin = str(admin)

        if not iredutils.isEmail(admin):
            return 0

        domains = []
        if len(domains) > 0:
            domains = [str(d).lower() for d in domains if iredutils.isDomain(d)]
        else:
            connutils = connUtils.Utils()
            qr = connutils.getManagedDomains(mail=admin, attrs=['domainName'], listedOnly=True)
            if qr[0] is True:
                domains = qr[1]

        if accountType == 'domain':
            try:
                return len(domains)
            except Exception:
                pass

        return 0