Snippets
Created by
Lucas Correia
# :coding: utf-8
# :copyright: Copyright (c) 2016 ftrack
# This snippet was last tested with ftrack 3.5.9 and Python API 1.3.1
import logging
import functools
import ftrack_api
import ftrack_api.inspection
# Module-wide logger, named after the event listener defined in this file.
logger = logging.getLogger('cascade_status_changes_event_listener')

# Maps each aggregate condition of a shot's tasks to the id of the status the
# shot should be given. The keys are looked up verbatim by
# get_new_shot_status, so their spelling must not change here alone.
# NOTE(review): the ids are presumably specific to one ftrack server -- verify
# them against the target installation before deploying.
status_map = dict(
    any_blocked='a0bc2444-15e2-11e1-b21a-0019bb4983d8',
    all_not_started='44dd9fb2-4164-11df-9218-0019bb4983d8',
    any_in_progres='44ddd0fe-4164-11df-9218-0019bb4983d8',
    all_done='44de097a-4164-11df-9218-0019bb4983d8',
)
def is_status_change(entity):
    '''Return whether updated *entity* describes a task status change.

    *entity* is a mapping describing one changed entity. It counts as a
    status change when all of the following hold:

    * its ``entityType`` is ``'task'``,
    * its ``action`` is ``'add'`` or ``'update'``,
    * ``'statusid'`` is listed among its changed ``keys``.

    A missing field, or a ``keys`` value of ``None``, yields ``False``
    instead of raising.

    '''
    if entity.get('entityType') != 'task':
        return False

    if entity.get('action') not in ('add', 'update'):
        return False

    # ``keys`` may be absent or explicitly None; treat both as "no keys".
    changed_keys = entity.get('keys') or ()
    return 'statusid' in changed_keys
def get_new_shot_status(shot, tasks):
    '''Return the status id *shot* should get based on the states of *tasks*.

    Nothing is modified; the caller decides whether to apply the result.

    Each item of *tasks* contributes the short name of its status state.
    Items whose state cannot be read, or whose state is not one of
    ``BLOCKED``, ``NOT_STARTED``, ``IN_PROGRESS`` or ``DONE``, are logged
    and ignored. The remaining states are evaluated in this order:

    1. every state is ``DONE`` -> ``status_map['all_done']``
    2. every state is ``NOT_STARTED`` -> ``status_map['all_not_started']``
    3. any state is ``BLOCKED`` -> ``status_map['any_blocked']``
    4. any state is ``IN_PROGRESS`` -> ``status_map['any_in_progres']``

    Return ``None`` when no rule matches (for example a mix of ``DONE`` and
    ``NOT_STARTED``) or when no task contributed a usable state. Callers
    should treat ``None`` as "leave the shot status unchanged".

    '''
    logger.info('Current shot status: {}'.format(shot['status']['name']))

    known_states = ('BLOCKED', 'NOT_STARTED', 'IN_PROGRESS', 'DONE')
    seen_states = set()

    for child in tasks:
        try:
            state = child['status']['state']['short']
        except (KeyError, TypeError):
            # TypeError covers a status (or state) that is present but None.
            logger.info(u'Child {} has no status'.format(
                ftrack_api.inspection.identity(child)
            ))
            continue

        if state in known_states:
            seen_states.add(state)
        else:
            logger.warning(u'Unknown state returned: {}'.format(state))

    if not seen_states:
        # Without this guard an empty task list would be vacuously
        # "all done" and the shot would wrongly be marked as done.
        logger.info('No usable task states found, no new shot status')
        return None

    new_status_id = None
    if seen_states == {'DONE'}:
        new_status_id = status_map['all_done']
    elif seen_states == {'NOT_STARTED'}:
        new_status_id = status_map['all_not_started']
    elif 'BLOCKED' in seen_states:
        new_status_id = status_map['any_blocked']
    elif 'IN_PROGRESS' in seen_states:
        new_status_id = status_map['any_in_progres']

    if new_status_id is None:
        logger.info(u'No shot status rule matches task states: {}'.format(
            ', '.join(sorted(seen_states))
        ))
    else:
        logger.info(u'Updating shot status to {}'.format(new_status_id))

    return new_status_id
def cascade_status_changes_event_listener(session, event):
    '''Handle *event* by cascading task status changes up to their shot.

    For every entity in the event that is a task status change, look up the
    Shot that has that entity among its children, query the shot's tasks
    and compute the status the shot should have. The shot is updated only
    when a new status could be determined and it differs from the current
    one.

    All modifications are committed once, after every entity has been
    processed. If the commit fails the error is logged, the session is
    rolled back and the exception is re-raised. When something changed and
    the event carries a source user id, a message is published to that
    user's web client.

    '''
    # ``user`` may be missing or None in the event source.
    user_id = (event['source'].get('user') or {}).get('id')
    if not user_id:
        logger.warning('No source user...')

    status_changed = False

    for entity in event['data'].get('entities', []):
        if not is_status_change(entity):
            continue

        shot = session.query(
            'select status_id, status.name from Shot '
            'where children any (id is "{0}")'.format(entity['entityId'])
        ).first()

        if not shot:
            logger.info('No shot found, ignoring update')
            continue

        tasks = session.query(
            'select type.name, status.state.short from Task '
            'where parent_id is "{}"'.format(shot['id'])
        )

        new_shot_status_id = get_new_shot_status(shot, tasks)
        if new_shot_status_id is None:
            # No rule matched the task states; never write a null status.
            logger.info('No new shot status determined, leaving shot as is')
            continue

        if shot['status_id'] != new_shot_status_id:
            shot['status_id'] = new_shot_status_id
            status_changed = True

    if not status_changed:
        return

    # Persist changes.
    try:
        session.commit()
    except Exception:
        logger.exception('Failed to update status')
        session.rollback()
        raise

    if user_id:
        # Trigger a message to the user (new in ftrack 3.3.31).
        session.event_hub.publish(
            ftrack_api.event.base.Event(
                topic='ftrack.action.trigger-user-interface',
                data=dict(
                    type='message',
                    success=True,
                    message='Shot status updated automatically'
                ),
                target=(
                    'applicationId=ftrack.client.web and user.id="{0}"'
                    .format(user_id)
                )
            ),
            on_error='ignore'
        )
def register(session, **kw):
    '''Subscribe the cascade status listener to update events on *session*.

    Do nothing when *session* is not an ``ftrack_api.Session``: in that case
    register is assumed to have been called from an incompatible API, and
    returning early avoids registering this plugin again. Extra keyword
    arguments are accepted and ignored.

    '''
    if isinstance(session, ftrack_api.Session):
        # Bind the session so the hub can invoke the handler with only the
        # event as argument.
        session.event_hub.subscribe(
            'topic=ftrack.update',
            functools.partial(cascade_status_changes_event_listener, session)
        )
if __name__ == '__main__':
    # Script entry point: enable INFO logging, create a session, hook up
    # the listener, then wait for events.
    logging.basicConfig(level=logging.INFO)

    session = ftrack_api.Session()
    register(session)

    # Wait for events.
    session.event_hub.wait()
|
Comments (0)
You can clone a snippet to your computer for local editing. Learn more.