Decorator to Prevent Duplicate Execution of Django Management Commands

Django
2015-11-27 09:40 (9 years ago) ytyng
def find_process_ids(pattern):
    """
    Find the IDs of running processes whose command line matches ``pattern``.

    The lookup is delegated to ``pgrep -f``, which matches ``pattern``
    against each process's full command line. The current process is
    removed from the result.

    :param pattern: pattern handed to ``pgrep -f``.
    :return: matching process IDs without the current one; an empty list
        when nothing matches.
    :rtype: list
    :raises subprocess.CalledProcessError: if ``pgrep`` fails with an exit
        status other than 1 (for example because of an invalid pattern).
    """
    self_pid = os.getpid()
    try:
        # '--' ends option parsing, so a pattern that starts with '-' is
        # treated as a pattern and not as a pgrep option.
        out = subprocess.check_output(['pgrep', '-f', '--', pattern])
    except subprocess.CalledProcessError as exc:
        # pgrep exits with 1 when no process matched. Any other non-zero
        # status is a real failure and must not be reported as "no match",
        # otherwise callers would believe nothing else is running.
        if exc.returncode == 1:
            return []
        raise
    return [pid for pid in map(int, out.splitlines()) if pid != self_pid]


def runs_once(batch_name):
    """
    Build a decorator that skips the wrapped function while another process
    matching ``batch_name`` is already running.

    Intended for management commands::

        @runs_once(__file__)
        def handle(...):
            ...

    :param batch_name: either a pattern for ``find_process_ids`` or the path
        of a ``.py`` / ``.pyc`` / ``.pyo`` file. For a path whose file name
        consists of word characters, the file name without directory and
        extension is used as the pattern; anything else is used unchanged.
    :return: a decorator. The decorated function prints a notice and returns
        ``None`` without calling the original when ``find_process_ids``
        reports a match; otherwise it returns the original's result.
    """
    # Reduce a module path such as ``__file__`` to the bare command name.
    # Compiled-file suffixes and relative paths are accepted too: if the raw
    # path were used as the pattern it would not match a command line that
    # only names the command, and the guard would silently never trigger.
    m = re.match(r'^(?:.*/)?(\w+)\.py[co]?$', batch_name)
    if m:
        batch_name = m.group(1)

    def _inner(func):
        @wraps(func)
        def decorate(*args, **kwargs):
            # NOTE: the check and the call below are not atomic.
            process_ids = find_process_ids(batch_name)
            if process_ids:
                print('Process already exists. {}, {}'.format(
                    batch_name, process_ids)
                )
                return None
            return func(*args, **kwargs)

        return decorate

    return _inner
Currently unrated
The author runs the application development company Cyberneura.
We look forward to discussing your development needs.

Archive

2025
2024
2023
2022
2021
2020
2019
2018
2017
2016
2015
2014
2013
2012
2011