django-statemachine / statemachine / fsm.py

from copy import deepcopy
from django.conf import settings
from django.utils.datastructures import SortedDict

# Django settings: each value is read from the project settings and
# falls back to the default given here when the setting is not defined.
STATE_MACHINE_KEEP_HISTORY = getattr(
    settings, 'STATE_MACHINE_KEEP_HISTORY', True)
STATE_MACHINE_DEFAULT = getattr(settings, 'STATE_MACHINE_DEFAULT', "start")


# Exceptions raised by the state machine
class FSM_Exception(Exception):
    """Root of the exception hierarchy used by the FSM implementation."""


class FSM_TransitionNotAllowed(FSM_Exception):
    """Raised when a state change is prevented, generally because the
    attempted target is not in the current state's exit_states list.

    """


class FSM_StateDoesNotExist(FSM_Exception):
    """Raised when a change to a state the machine does not define is
    requested.

    """


class FSM_NotAllowed(FSM_Exception):
    """Raised when an operation on the machine is not permitted, e.g.
    assigning directly to the read-only FSM.state property.

    """


class FSM_VerificationError(FSM_Exception):
    """Verification error, raised when FSM.verify() finds exit states
    that do not exist.

    Attributes:
        message: human readable description of the failure.
        states: the offending state names, or None when not supplied.

    """
    def __init__(self, message, states=None):
        # Hand the message to the base class so that ``args`` is
        # populated.  Exceptions are copied and unpickled by calling the
        # class with ``args``; with an empty ``args`` that call fails
        # because ``message`` is a required argument.
        super(FSM_VerificationError, self).__init__(message)
        self.message = message
        self.states = states

    def __str__(self):
        """Return the message, followed by the bad states when present."""
        if self.states:
            return "%s %s" % (self.message, self.states)
        return self.message


def get_declared_states(bases, attrs, with_base_states=True):
    """
    *Adapted from the equivalent helper in django.forms*

    Collect the State instances found in 'attrs' (removing them from
    'attrs' in the process) and combine them with the states already
    gathered on the classes in 'bases'.

    When 'with_base_states' is True the 'base_states' attribute of each
    base is used; otherwise the 'declared_states' attribute is consulted.
    Bases lacking the relevant attribute are skipped.

    Returns a SortedDict mapping state name to State.

    """
    # Snapshot the attribute names first, because matching entries are
    # popped out of 'attrs' while they are being collected.
    declared = []
    for attr_name in list(attrs):
        if isinstance(attrs[attr_name], State):
            declared.append((attr_name, attrs.pop(attr_name)))

    # Each state learns the attribute name it was declared under.
    for attr_name, declared_state in declared:
        declared_state.name = attr_name
    declared.sort(key=lambda pair: pair[1].creation_counter)

    # Prepend the states of any parent StateMachine.  The bases are
    # walked in *reverse* so that, after repeated prepending, the states
    # of the first base end up first.
    inherited_attr = 'base_states' if with_base_states else 'declared_states'
    for base in bases[::-1]:
        if hasattr(base, inherited_attr):
            declared = list(getattr(base, inherited_attr).items()) + declared

    return SortedDict(declared)


class DeclarativeStatesMetaclass(type):
    """
    *Adapted from the equivalent metaclass in django.forms*

    Metaclass that moves the State attributes of a class body into a
    mapping stored as 'base_states', merged with the 'base_states' of any
    parent classes.

    """
    def __new__(mcs, name, bases, attrs):
        # get_declared_states() pops the State entries out of attrs, so
        # they only live on in the 'base_states' mapping.
        declared = get_declared_states(bases, attrs)
        attrs['base_states'] = declared
        parent = super(DeclarativeStatesMetaclass, mcs)
        return parent.__new__(mcs, name, bases, attrs)


class State(object):
    """
    Represents each individual state of the machine.

    exit_states:
        a list of strings representing allowed transitions from this state

    entry_action(exited_state, *args, **kwargs):
        optional function to run on entry into the state.

    exit_action(target_state, *args, **kwargs):
        optional function to run on exit from the state. Its return
        value is ignored; to veto the transition it must raise
        (e.g. FSM_TransitionNotAllowed).

    creation_counter:
        position of this instance in the order State objects were
        created, used to sort declared states into declaration order.

    """
    name = ""
    exit_states = []
    entry_action = None
    exit_action = None
    # Class-level tally of the State instances created so far; each
    # instance takes the current value as its own creation_counter.
    creation_counter = 0

    def __init__(self, label=None, exits_to=None,
                 entry_action=None, exit_action=None, **kwargs):
        """Create a state.

        label: optional display text, used by __unicode__.
        exits_to: list of state names reachable from this state.
        entry_action / exit_action: optional callables, see class docs.
        kwargs: any extra values are set as attributes on the instance.
        """
        self.label = label
        self.exit_states = exits_to or []
        self.entry_action = entry_action
        self.exit_action = exit_action
        # Stamp the instance with a monotonically increasing number.
        # Without this every state carries the class default of 0 and
        # sorting by creation_counter cannot recover declaration order.
        self.creation_counter = State.creation_counter
        State.creation_counter += 1
        for k, v in kwargs.items():
            setattr(self, k, v)

    def __unicode__(self):
        """Return the label when one is set, otherwise the state name."""
        if self.label:
            return self.label
        return self.name

    def exit(self, target_state, *args, **kwargs):
        """Runs the exit action if one is set, passing it target_state
        and any extra arguments.  An exception raised by the action
        propagates to the caller; otherwise True is returned.

        """
        if self.exit_action:
            self.exit_action(target_state, *args, **kwargs)

        return True

    def enter(self, exited_state, *args, **kwargs):
        """
        Runs the entry action if one is set and returns its result;
        returns None when there is no entry action.
        """
        if self.entry_action:
            return self.entry_action(exited_state, *args, **kwargs)


class FSM(object):
    """
    Simple FSM implementation. Takes a declarative approach to
    defining states, and their interactions and capabilities.

    States are declared as State class attributes; the metaclass gathers
    them into 'base_states' and every instance works on its own deep
    copy, held in 'states'.

    """
    __metaclass__ = DeclarativeStatesMetaclass

    def __init__(self, verify_on_execute=True):
        """Set the machine up in its first state.

        verify_on_execute: when True, change() calls verify() before
        every transition.
        """
        self.states = deepcopy(self.base_states)
        # Start in the first state, or '' when no states are declared.
        self.__state = next(iter(self.states), '')
        self.dbg = None
        self.verify_on_execute = verify_on_execute

        # A '*' entry is a wildcard: the state may exit to every state
        # of the machine, itself included.
        for s in self.states.values():
            if '*' in s.exit_states:
                s.exit_states = list(self.states.keys())

    def __unicode__(self):
        """Return the name of the current state."""
        return self.__state

    def getstate(self):
        """Return the current State, or None when there is none."""
        if self.__state and self.__state in self.states:
            return self.states[self.__state]

    def setstate(self, value):
        """Always raises FSM_NotAllowed; the state is read only."""
        raise FSM_NotAllowed("State is read only, use change() instead.")
    state = property(getstate, setstate)

    def set_initial_state(self, state):
        """
        Sets the initial state of the FSM, should only be used when
        restoring a FSM into a state that has been previously attained
        from the initial state.

        An unknown state name resets the machine to its first state, or
        to '' when the machine has no states (matching __init__).
        """
        if state in self.states:
            self.__state = state
        else:
            self.__state = next(iter(self.states), '')

    def available_states(self):
        """Returns a list containing the available exit states from
        the current state; the list is empty when there is no current
        state.

        """
        current = self.state
        if current is None:
            return []
        return [self.states[s] for s in current.exit_states]

    def verify(self):
        """
        Check that all the states named in exit_states exist.

        If any named states are missing verify() throws a
        FSM_VerificationError which contains a list of the bad states.
        """
        bad_states = []
        state_names = set(self.states.keys())
        for state in self.states.values():
            bad_states.extend(set(state.exit_states) - state_names)

        if bad_states:
            raise FSM_VerificationError("Invalid exit state(s)", bad_states)

    def change(self, new_state, *args, **kwargs):
        """
        Transitions the machine to its new state, assuming it is a
        valid exit state and that entry and exit functions allow it.

        The exit function of the state being left receives the state
        being entered, and the entry function of the state being entered
        receives the state being left; both also receive all the extra
        arguments provided.

        Returns the State that is current after the transition.

        Raises FSM_VerificationError when verify_on_execute is set and
        verification fails, FSM_StateDoesNotExist when new_state is not
        a state of this machine, and FSM_TransitionNotAllowed when
        new_state is not among the current state's exit_states.
        """
        if self.verify_on_execute:
            self.verify()

        if new_state not in self.states:
            raise FSM_StateDoesNotExist(new_state)

        exiting_state = self.state

        if new_state not in exiting_state.exit_states:
            raise FSM_TransitionNotAllowed(new_state)

        entering_state = self.states[new_state]

        #TODO: this needs to be transactional, shouldn't exit unless it
        #can enter
        exiting_state.exit(entering_state, *args, **kwargs)
        # The entry action is told which state was left, not the state
        # it belongs to.
        entering_state.enter(exiting_state, *args, **kwargs)

        self.__state = new_state

        return self.state
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.