Source

django-statemachine / statemachine / fsm.py

Full commit
from copy import deepcopy
from django.conf import settings
from django.forms.forms import pretty_name
"""
Some Exceptions
"""
class FSM_Exception(Exception): pass
class FSM_TransitionNotAllowed(FSM_Exception): pass
class FSM_StateDoesNotExist(FSM_Exception): pass
class FSM_NotAllowed(FSM_Exception): pass

class FSM_VerificationError(FSM_Exception):
    def __init__(self, message, states=None):
        self.message = message
        self.states = states

    def __str__(self):
        if self.states:
            return "%s %s" % (self.message, self.states)
        else:
            return self.message

"""
Django settings
"""
try:
    STATE_MACHINE_KEEP_HISTORY = settings.STATE_MACHINE_KEEP_HISTORY
except AttributeError:
    STATE_MACHINE_KEEP_HISTORY = True
try:
    STATE_MACHINE_DEFAULT = settings.STATE_MACHINE_DEFAULT
except AttributeError:
    STATE_MACHINE_DEFAULT = "start"


from django.utils.datastructures import SortedDict

def get_declared_states(bases, attrs, with_base_states=True):
    """
    ** "Inspired by" django.forms.forms **

    Create a list of State instances from the passed in 'attrs', plus any
    similar states on the base classes (in 'bases').

    If 'with_base_states' is True, all states from the bases are used.
    Otherwise, only states in the 'declared_states' attribute on the bases are
    used.
    """
    states = [(state_name, attrs.pop(state_name)) for state_name, obj in attrs.items() if isinstance(obj, State)]
    for name, state in states:
        state.name = name
    states.sort(key=lambda x: x[1].creation_counter)

    # If this class is subclassing another StateMachine, add that StateMachine's states.
    # Note that we loop over the bases in *reverse*. This is necessary in
    # order to preserve the correct order of statess.
    if with_base_states:
        for base in bases[::-1]:
            if hasattr(base, 'base_states'):
                states = base.base_states.items() + states
    else:
        for base in bases[::-1]:
            if hasattr(base, 'declared_states'):
                states = base.declared_states.items() + states

    return SortedDict(states)

class DeclarativeStatesMetaclass(type):
    """
    Metaclass that converts State attributes to a dictionary called
    'base_states', taking into account parent class 'base_states' as well.
    """
    def __new__(cls, name, bases, attrs):
        attrs['base_states'] = get_declared_states(bases, attrs)
        new_class = super(DeclarativeStatesMetaclass,
                     cls).__new__(cls, name, bases, attrs)
        return new_class

class State(object):
    """
    Represents each individual state of the machine. 

    exit_states: 
        a list of strings representing allowed transitions from this state

    entry_action(exited_state, model): 
        a function to run on entry into the state. 

    exit action(target_state, model): 
        a function to run on exit from the state, must return
        FSM_TransitionNotAllowed if conditions for the transition are not
        met. 

    """
    name = ""
    exit_states = []
    entry_action = None
    exit_action = None
    creation_counter = 0

    def __init__(self, label=None, exits_to=None, entry_action=None, exit_action=None, **kwargs):
        self.label = label
        self.exit_states = exits_to or []
        self.entry_action = entry_action
        self.exit_action = exit_action
        for k, v in kwargs.items():
            setattr(self, k, v)
            

    def __unicode__(self):
        return self.name

    def exit(self, target_state, *args, **kwargs):
        """ 
        Checks states and exits if possible, if not possible it raises FSM_TransitionNotAllowed 
        """
        if self.exit_action:
            self.exit_action(target_state, *args, **kwargs)

        return True

    def enter(self, exited_state, *args, **kwargs):
        """
        Runs an entry action if it is set
        """
        if self.entry_action:
            return self.entry_action(exited_state, *args, **kwargs)
        
class FSM(object):
    """
    Simple FSM implementation.
    """
    __metaclass__ = DeclarativeStatesMetaclass

    def __init__(self, verify_on_execute=True):
        self.states = deepcopy(self.base_states)
        self.__state = self.states[self.states.keys()[0]].name
        self.dbg = None
        self.verify_on_execute = verify_on_execute

        for k, s in self.states.items():
            if '*' in s.exit_states:
                s.exit_states = [key for key in self.states.keys()]

    def __unicode__(self):
        return self.__state

    def getstate(self):
        return self.__state
    def setstate(self, value):
        raise FSM_NotAllowed("State is read only, please use change() to change state")
    state = property(getstate, setstate)

    def set_initial_state(self, state):
        self.__state = state

    def available_states(self):
        return [self.states[s] for s in self.states[self.__state].exit_states]
        
    def verify(self):
        """
        Check that all the named exit states exist, list the ones that don't
        """
        bad_states = []
        state_names = set(self.states.keys())
        for name, state in self.states.items():
            bad_states.extend(set(state.exit_states) - state_names)

        if len(bad_states):
            raise FSM_VerificationError("Invalid exit state(s)", bad_states)

    def change(self, new_state, *args, **kwargs):
        """
        Transitions the machine to its new state.
        """
        if self.verify_on_execute:
            self.verify()

        if not new_state in self.states:
            raise FSM_StateDoesNotExist(new_state)

        exiting_state = self.states[self.state]

        if not new_state in exiting_state.exit_states:
            raise FSM_TransitionNotAllowed(new_state)

        entering_state = self.states[new_state]

        #TODO: this needs to be transactional, shouldn't exit unless it
        #can enter
        exiting_state.exit(entering_state, *args, **kwargs)
        entering_state.enter(entering_state, *args, **kwargs)

        self.__state = new_state

        return self.state