# Source: ytmanager/gdata/apps/audit/service.py

# Copyright (C) 2008 Google, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Allow Google Apps domain administrators to audit user data.

  AuditService: Set auditing."""

__author__ = 'jlee@pbu.edu'

from base64 import b64encode

import gdata.apps
import gdata.apps.service
import gdata.service

class AuditService(gdata.apps.service.PropertyService):
  """Client for the Google Apps Audit service.

  Every public method builds a feed URI below '/a/feeds/compliance/audit/'
  with _serviceUrl and hands it to one of the property helpers called on
  self (_PostProperties, _GetProperties, _GetPropertiesList,
  _DeleteProperties), which are expected to be provided by the
  PropertyService base class.

  Methods that wrap gdata.service.RequestError re-raise it as
  gdata.apps.service.AppsForYourDomainException, built from the first
  argument of the original error.
  """

  def _serviceUrl(self, setting_id, domain=None, user=None):
    """Builds the audit feed URI for a setting.

    Args:
      setting_id: string, feed path segment such as 'publickey',
                  'mail/monitor', 'account' or 'mail/export'
      domain: string, domain to address; defaults to self.domain
      user: string, optional trailing path (a user name, possibly followed
            by '/<id>'); omitted from the URI when None

    Returns:
      The URI path as a string."""

    if domain is None:
      domain = self.domain
    if user is None:
      return '/a/feeds/compliance/audit/%s/%s' % (setting_id, domain)
    else:
      return '/a/feeds/compliance/audit/%s/%s/%s' % (setting_id, domain, user)

  def updatePGPKey(self, pgpkey):
    """Updates Public PGP Key Google uses to encrypt audit data

    Args:
      pgpkey: string, ASCII text of PGP Public Key to be used

    Returns:
      A dict containing the result of the POST operation."""

    uri = self._serviceUrl('publickey')
    # The key is sent base64-encoded in the 'publicKey' property.
    b64pgpkey = b64encode(pgpkey)
    properties = {}
    properties['publicKey'] = b64pgpkey
    return self._PostProperties(uri, properties)

  def createEmailMonitor(self, source_user, destination_user, end_date,
                         begin_date=None, incoming_headers_only=False,
                         outgoing_headers_only=False, drafts=False,
                         drafts_headers_only=False, chats=False,
                         chats_headers_only=False):
    """Creates an email monitor, forwarding the source_user's emails/chats

    Args:
      source_user: string, the user whose email will be audited
      destination_user: string, the user to receive the audited email
      end_date: string, the date the audit will end in
                "yyyy-MM-dd HH:mm" format, required
      begin_date: string, the date the audit will start in
                  "yyyy-MM-dd HH:mm" format, leave blank to use current time
      incoming_headers_only: boolean, whether to audit only the headers of
                             mail delivered to source user
      outgoing_headers_only: boolean, whether to audit only the headers of
                             mail sent from the source user
      drafts: boolean, whether to audit draft messages of the source user
      drafts_headers_only: boolean, whether to audit only the headers of
                           mail drafts saved by the user
      chats: boolean, whether to audit archived chats of the source user
      chats_headers_only: boolean, whether to audit only the headers of
                          archived chats of the source user

    Returns:
      A dict containing the result of the POST operation."""

    uri = self._serviceUrl('mail/monitor', user=source_user)
    properties = {}
    properties['destUserName'] = destination_user
    if begin_date is not None:
      properties['beginDate'] = begin_date
    properties['endDate'] = end_date
    # Incoming and outgoing levels are always sent.
    if incoming_headers_only:
      properties['incomingEmailMonitorLevel'] = 'HEADER_ONLY'
    else:
      properties['incomingEmailMonitorLevel'] = 'FULL_MESSAGE'
    if outgoing_headers_only:
      properties['outgoingEmailMonitorLevel'] = 'HEADER_ONLY'
    else:
      properties['outgoingEmailMonitorLevel'] = 'FULL_MESSAGE'
    # Draft and chat levels are only sent when that kind of monitoring
    # was requested; the *_headers_only flags are ignored otherwise.
    if drafts:
      if drafts_headers_only:
        properties['draftMonitorLevel'] = 'HEADER_ONLY'
      else:
        properties['draftMonitorLevel'] = 'FULL_MESSAGE'
    if chats:
      if chats_headers_only:
        properties['chatMonitorLevel'] = 'HEADER_ONLY'
      else:
        properties['chatMonitorLevel'] = 'FULL_MESSAGE'
    return self._PostProperties(uri, properties)

  def getEmailMonitors(self, user):
    """Gets the email monitors for the given user

    Args:
      user: string, the user to retrieve email monitors for

    Returns:
      The list returned by _GetPropertiesList for the user's
      'mail/monitor' feed.
    """

    uri = self._serviceUrl('mail/monitor', user=user)
    return self._GetPropertiesList(uri)

  def deleteEmailMonitor(self, source_user, destination_user):
    """Deletes the email monitor for the given user

    Args:
      source_user: string, the user who is being monitored
      destination_user: string, the user who receives the monitored emails

    Returns:
      Whatever _DeleteProperties returns for the request.

    Raises:
      gdata.apps.service.AppsForYourDomainException: if the request fails
          with gdata.service.RequestError.
    """

    uri = self._serviceUrl('mail/monitor', user=source_user+'/'+destination_user)
    try:
      return self._DeleteProperties(uri)
    except gdata.service.RequestError as e:
      # Fully qualified: the exception class is not imported by name here.
      raise gdata.apps.service.AppsForYourDomainException(e.args[0])

  def createAccountInformationRequest(self, user):
    """Creates a request for account auditing details

    Args:
      user: string, the user to request account information for

    Returns:
      A dict containing the result of the post operation.

    Raises:
      gdata.apps.service.AppsForYourDomainException: if the request fails
          with gdata.service.RequestError."""

    uri = self._serviceUrl('account', user=user)
    properties = {}
    #XML Body is left empty
    try:
      return self._PostProperties(uri, properties)
    except gdata.service.RequestError as e:
      raise gdata.apps.service.AppsForYourDomainException(e.args[0])

  def getAccountInformationRequestStatus(self, user, request_id):
    """Gets the status of an account auditing request

    Args:
      user: string, the user whose account auditing details were requested
      request_id: string, the request_id

    Returns:
      A dict containing the result of the get operation.

    Raises:
      gdata.apps.service.AppsForYourDomainException: if the request fails
          with gdata.service.RequestError."""

    uri = self._serviceUrl('account', user=user+'/'+request_id)
    try:
      return self._GetProperties(uri)
    except gdata.service.RequestError as e:
      raise gdata.apps.service.AppsForYourDomainException(e.args[0])

  def getAllAccountInformationRequestsStatus(self):
    """Gets the status of all account auditing requests for the domain

    Args:
      None

    Returns:
      The list returned by _GetPropertiesList for the domain's
      'account' feed.
    """

    uri = self._serviceUrl('account')
    return self._GetPropertiesList(uri)

  def deleteAccountInformationRequest(self, user, request_id):
    """Deletes the request for account auditing information

    Args:
      user: string, the user whose account auditing details were requested
      request_id: string, the request_id

    Returns:
      Whatever _DeleteProperties returns for the request.

    Raises:
      gdata.apps.service.AppsForYourDomainException: if the request fails
          with gdata.service.RequestError.
    """

    uri = self._serviceUrl('account', user=user+'/'+request_id)
    try:
      return self._DeleteProperties(uri)
    except gdata.service.RequestError as e:
      raise gdata.apps.service.AppsForYourDomainException(e.args[0])

  def createMailboxExportRequest(self, user, begin_date=None, end_date=None,
                                 include_deleted=False, search_query=None,
                                 headers_only=False):
    """Creates a mailbox export request

    Args:
      user: string, the user whose mailbox export is being requested
      begin_date: string, date of earliest emails to export, optional,
                  defaults to date of account creation
                  format is 'yyyy-MM-dd HH:mm'
      end_date: string, date of latest emails to export, optional,
                defaults to current date
                format is 'yyyy-MM-dd HH:mm'
      include_deleted: boolean, whether to include deleted emails in export,
                       mutually exclusive with search_query; pass None to
                       leave the property out of the request
      search_query: string, gmail style search query, matched emails will be
                    exported, mutually exclusive with include_deleted
      headers_only: boolean, only the value True selects 'HEADER_ONLY'
                    package content; anything else selects 'FULL_MESSAGE'

    Returns:
      A dict containing the result of the post operation."""

    uri = self._serviceUrl('mail/export', user=user)
    properties = {}
    if begin_date is not None:
      properties['beginDate'] = begin_date
    if end_date is not None:
      properties['endDate'] = end_date
    # With the default of False this property is always sent; only an
    # explicit None suppresses it.
    if include_deleted is not None:
      properties['includeDeleted'] = gdata.apps.service._bool2str(include_deleted)
    if search_query is not None:
      properties['searchQuery'] = search_query
    if headers_only is True:
      properties['packageContent'] = 'HEADER_ONLY'
    else:
      properties['packageContent'] = 'FULL_MESSAGE'
    return self._PostProperties(uri, properties)

  def getMailboxExportRequestStatus(self, user, request_id):
    """Gets the status of a mailbox export request

    Args:
      user: string, the user whose mailbox was requested
      request_id: string, the request_id

    Returns:
      A dict containing the result of the get operation.

    Raises:
      gdata.apps.service.AppsForYourDomainException: if the request fails
          with gdata.service.RequestError."""

    uri = self._serviceUrl('mail/export', user=user+'/'+request_id)
    try:
      return self._GetProperties(uri)
    except gdata.service.RequestError as e:
      raise gdata.apps.service.AppsForYourDomainException(e.args[0])

  def getAllMailboxExportRequestsStatus(self):
    """Gets the status of all mailbox export requests for the domain

    Args:
      None

    Returns:
      The list returned by _GetPropertiesList for the domain's
      'mail/export' feed.
    """

    uri = self._serviceUrl('mail/export')
    return self._GetPropertiesList(uri)

  def deleteMailboxExportRequest(self, user, request_id):
    """Deletes the request for mailbox export

    Args:
      user: string, the user whose mailbox was requested
      request_id: string, the request_id

    Returns:
      Whatever _DeleteProperties returns for the request.

    Raises:
      gdata.apps.service.AppsForYourDomainException: if the request fails
          with gdata.service.RequestError.
    """

    uri = self._serviceUrl('mail/export', user=user+'/'+request_id)
    try:
      return self._DeleteProperties(uri)
    except gdata.service.RequestError as e:
      raise gdata.apps.service.AppsForYourDomainException(e.args[0])