Decorator to Prevent Duplicate Execution of Django Management Commands
Django
2015-11-27 00:40 (10 years ago)

def find_process_ids(pattern):
    """
    Return the IDs of running processes whose command line matches ``pattern``.

    The lookup is delegated to ``pgrep -f``, which matches ``pattern`` (a
    regular expression) against each process's full command line.  The
    current process is always left out of the result.

    :param pattern: pattern handed to ``pgrep -f``.
    :returns: matching process IDs, excluding the caller's own PID; an empty
        list when ``pgrep`` exits with a non-zero status.
    :rtype: list
    """
    self_pid = os.getpid()
    try:
        # ``--`` ends option parsing, so a pattern that starts with ``-`` is
        # searched for instead of being mistaken for a pgrep option (which
        # made pgrep fail and this function silently report "no processes").
        out = subprocess.check_output(['pgrep', '-f', '--', pattern])
    except subprocess.CalledProcessError:
        # pgrep exits non-zero when nothing matched; any other pgrep failure
        # is treated the same way, as it always has been here.
        return []
    return [pid for pid in map(int, out.splitlines()) if pid != self_pid]
def runs_once(batch_name):
    """
    Build a decorator that skips a call while the same batch is already running.

    Intended for the ``handle`` method of a management command::

        @runs_once(__file__)
        def handle(...):
            ...

    ``batch_name`` may be a bare name or the path of a Python file; a path is
    reduced to its module name (``/app/commands/foo.py`` -> ``foo``).  On each
    call the wrapper asks ``find_process_ids`` for other processes matching
    that name.  If any are found it prints a notice and returns ``None``
    without calling the wrapped function; otherwise it calls the function and
    returns its result.

    :param batch_name: name, or ``__file__``-style path, identifying the batch.
    :returns: a decorator to apply to the function being guarded.
    """
    # Accept relative paths and compiled files (``.pyc``/``.pyo``) as well as
    # absolute ``.py`` paths, since ``__file__`` can take any of those forms.
    # An unreduced path would be used verbatim as the search pattern and
    # would not normally appear in the command line of a running command.
    m = re.match(r'^(?:.*/)?(\w+)\.py[co]?$', batch_name)
    if m:
        batch_name = m.group(1)

    def _inner(func):
        @wraps(func)
        def decorate(*args, **kwargs):
            process_ids = find_process_ids(batch_name)
            if process_ids:
                # Another instance is running: report it and skip this call.
                print('Process already exists. {}, {}'.format(
                    batch_name, process_ids)
                )
                return
            return func(*args, **kwargs)
        return decorate
    return _inner
Please rate this article
Currently unrated
The author runs the application development company Cyberneura.
We look forward to discussing your development needs.