Luigi task scheduler

2020-01-06
Python Automation




1. What is Luigi

Luigi is a Python package that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization, handling failures, command line integration, and much more.

It is easy to set up and integrates well with Python code. For example, this makes it possible to call tasks directly from Python and/or to set task dependencies dynamically.

You can install it directly with pip:

pip install luigi

2. Usage

2.1. How it works

Luigi works with tasks, which are defined as classes that extend luigi.Task. The basic usage is to override the run method (what the task does) and the output method (the file the task produces).

A Luigi task should end by writing its output file, since this is how Luigi tracks which tasks are completed.

For example a basic task would look like:

import luigi
from datetime import datetime

class ReportTask(luigi.Task):

    def run(self):
        # Write the current datetime to the output file
        with open("output.txt", "w") as stream:
            stream.write(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    def output(self):
        # Luigi considers the task complete once this file exists
        return luigi.LocalTarget("output.txt")

This task will write a file called output.txt with the execution datetime.
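
Because completion is tracked through the output file, re-running a finished task does nothing. The following is a minimal plain-Python sketch of that idea, not Luigi's actual implementation; run_if_needed and the temporary path are hypothetical names used only for illustration.

```python
import os
import tempfile
from datetime import datetime


def run_if_needed(path):
    """Write the report only when the output file is missing.

    Returns True if the work was done and False if it was skipped.
    """
    if os.path.exists(path):
        # The output is already there, so the task counts as complete
        return False

    with open(path, "w") as stream:
        stream.write(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    return True


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as folder:
        path = os.path.join(folder, "output.txt")
        print(run_if_needed(path))  # the first call does the work
        print(run_if_needed(path))  # the second call is skipped
```

In practice this means that deleting output.txt is the way to make Luigi run the task again.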

2.2. Basic example

A more realistic example also has dependencies between tasks.

First, let's create two Python files (register.py and report.py), each of which runs a simple task. Both accept a name as a parameter so that they can be run for different days. The first one simply writes a file with the execution time:

register.py
import os
from datetime import datetime


def main(filename):

    # Make sure the output folder exists before writing
    os.makedirs("output", exist_ok=True)
    uri = f"output/{filename}.txt"

    with open(uri, "w") as stream:
        stream.write(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    print(f"File '{uri}' written")

The second one writes the same information as an HTML file:

report.py
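import os
from datetime import datetime

from markdown import markdown


def main(filename):

    # Make sure the output folder exists before writing
    os.makedirs("output", exist_ok=True)
    uri = f"output/{filename}.html"

    # Keep the lines unindented: markdown renders indented text as a code block
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    html = markdown(f"# Report\n\n{now}\n")

    with open(uri, "w") as stream:
        stream.write(html)

    print(f"File '{uri}' written")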

With those files it is possible to create the associated luigi tasks:

from datetime import date, datetime
import luigi

class RegisterTask(luigi.Task):

    mdate = luigi.DateParameter(default=date.today())

    def run(self):
        from register import main

        main(self.mdate.strftime("%Y_%m_%d"))

    def output(self):
        return luigi.LocalTarget(self.mdate.strftime("output/%Y_%m_%d.txt"))


class ReportTask(luigi.Task):

    mdate = luigi.DateParameter(default=date.today())

    def run(self):
        from report import main

        main(self.mdate.strftime("%Y_%m_%d"))

    def output(self):
        return luigi.LocalTarget(self.mdate.strftime("output/%Y_%m_%d.html"))

Both tasks have a parameter called mdate, so each day's run writes a different file.
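
To see how the parameter maps to file names, the sketch below applies the same strftime patterns used in the two output methods to a pair of dates. output_paths is a hypothetical helper written for this illustration; it is not part of Luigi.

```python
from datetime import date


def output_paths(mdate):
    """Return the (register, report) file names used for a given date."""
    return (
        mdate.strftime("output/%Y_%m_%d.txt"),
        mdate.strftime("output/%Y_%m_%d.html"),
    )


if __name__ == "__main__":
    # Two consecutive days give two different pairs of files
    print(output_paths(date(2020, 1, 6)))
    print(output_paths(date(2020, 1, 7)))
```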

Finally let's add a dummy task that requires both RegisterTask and ReportTask.

class DoAllTask(luigi.WrapperTask):

    mdate = luigi.DateParameter(default=date.today())

    def requires(self):
        return RegisterTask(self.mdate), ReportTask(self.mdate)

Since DoAllTask does not need to do anything it extends luigi.WrapperTask.
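
The effect of requires can be pictured with a small plain-Python model: requirements are visited before the task that needs them, and anything already complete is skipped. This is only a simplified sketch and not how Luigi's scheduler is implemented (Luigi may, for instance, run independent requirements in a different order or in parallel). The resolve function and the dictionary of requirements are assumptions made for the illustration.

```python
def resolve(task, requires, done=None, order=None):
    """Depth-first walk that lists requirements before the task itself.

    'done' holds the tasks that are already complete and must be skipped.
    """
    done = set() if done is None else done
    order = [] if order is None else order

    for dependency in requires.get(task, ()):
        resolve(dependency, requires, done, order)

    if task not in done:
        done.add(task)
        order.append(task)
    return order


if __name__ == "__main__":
    requires = {"DoAllTask": ("RegisterTask", "ReportTask")}

    # Nothing has run yet: both requirements come before the wrapper
    print(resolve("DoAllTask", requires))

    # RegisterTask already has its output, so it is not scheduled again
    print(resolve("DoAllTask", requires, done={"RegisterTask"}))
```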

To run the example you first need to start the luigi server with:

luigid

Then put the three tasks in a file called master.py and add:

if __name__ == "__main__":
    luigi.build([DoAllTask()])

You can run:

python master.py

You should see Luigi complete the three tasks.

2.3. Luigi server

If you go to localhost:8082 you can see the Luigi server.

(image: luigi_server)

It gives a summary of the tasks and their status.

If you select a task (for example DoAllTask) you can see the dependency tree.

(image: luigi_pipeline_svg)

Remember to uncheck the Hide Done option to see all the tasks.

It is also possible to view the dependency graph rendered with D3.

(image: luigi_pipeline_d3)

3. Using task templates

The best way to orchestrate tasks in a fault-tolerant way is to have a script (or package) for each task. This is the same as in the previous example, where there were the report.py and register.py scripts.

Then, since not all tasks will output a file, let's create a template for tasks that writes a YAML file with metadata about the execution. This includes start_time, end_time and duration, among others. If the task fails, it also records information about the failure.

When a task fails, the output file gets a slightly different name. Since the regular output file is still missing, Luigi will try the task again the next time the pipeline runs.
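
As a sketch of that naming rule, the standalone function below mirrors the output_filename method of the template. The function signature and the now argument are assumptions added here so that the example can be run without Luigi.

```python
from datetime import date, datetime

PATH_LUIGI_YAML = "runs/"


def output_filename(task_name, mdate, success=True, now=None):
    """Build the YAML path for a task run on a given date."""
    uri = f"{PATH_LUIGI_YAML}{mdate:%Y%m%d}/{task_name}"

    # Failed runs get a timestamped suffix, so the regular file stays missing
    if not success:
        now = now or datetime.now()
        uri += now.strftime("_fail_%Y%m%d_%H%M%S")

    return f"{uri}.yaml"


if __name__ == "__main__":
    day = date(2020, 1, 6)
    print(output_filename("ReportTask", day))
    print(output_filename("ReportTask", day, success=False))
```

Only the first path is reported as the task output, so a run that left just a _fail_ file still counts as pending.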

The full template would be:

luigi_default.py
import os
import time
from datetime import date, datetime

import luigi
import oyaml as yaml


PATH_LUIGI_YAML = "runs/"


class StandardTask(luigi.Task):
    """
        Extends luigi.Task. Subclasses should override run_std instead of run.

        Params:
            mdate:          date of execution
            t_data:         is a dictionary with instance data
            worker_timeout: maximum time allowed for a task to run in seconds
    """

    mdate = luigi.DateParameter(default=date.today())
    worker_timeout = 1 * 3600  # Default timeout is 1h per task
    t_data = {}

    # This is meant to be overwritten
    module = "change_this_to_module_name"

    def output_filename(self, success=True):
        """ Get output filename """

        # output will be a yaml file inside a folder with date
        uri = f"{PATH_LUIGI_YAML}{self.mdate:%Y%m%d}/"

        # make sure folder exists
        os.makedirs(uri, exist_ok=True)

        # add task name
        uri += self.__class__.__name__

        # If task fails write a file with different name
        # This allows re-runs to retry the failed task while keeping info about fails
        if not success:
            uri += datetime.now().strftime("_fail_%Y%m%d_%H%M%S")

        return f"{uri}.yaml"

    def output(self, success=True):
        return luigi.LocalTarget(self.output_filename())

    def save_result(self, success=True, **kwa):
        """ Stores result as a yaml file """

        # Store basic execution info
        self.t_data["end_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.t_data["duration"] = time.time() - self.start_time
        self.t_data["success"] = success

        # Allow extra params like 'exception'
        self.t_data.update(**kwa)

        # Export them as an ordered yaml
        with open(self.output_filename(success), "w") as stream:
            yaml.dump(self.t_data, stream)

    def on_failure(self, exception):

        # If there is an error store it anyway
        self.save_result(success=False, exception=repr(exception))
        self.disabled = True

        # If needed, do extra stuff (like log.error)

        # End up raising the error to Luigi
        super().on_failure(exception)

    def run_std(self):
        """
            This is what the task will actually do.

            If it is not overwritten it will 'import module' and then run:

                module.main(mdate)
        """

        # By default run the 'main' function of the asked module
        module = __import__(self.module)
        module.main(self.mdate.strftime("%Y_%m_%d"))

    def run(self):
        # Use a fresh dict per instance: the class-level 't_data' is shared by all tasks
        self.t_data = {"name": self.__class__.__name__}

        # Store start time
        self.start_time = time.time()
        self.t_data["start_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Run the task and store the results
        self.run_std()
        self.save_result()
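
To get a feeling for the metadata that ends up in the YAML file, here is a simplified plain-Python model of what run, save_result and on_failure collect. It is a sketch under assumptions: run_with_metadata is a hypothetical helper, and unlike the template it swallows the exception and returns the dictionary instead of writing a file and passing the error on to Luigi.

```python
import time
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"


def run_with_metadata(name, func):
    """Run func() and return a dict similar to the one the template stores."""
    data = {"name": name, "start_time": datetime.now().strftime(FMT)}
    start = time.time()

    try:
        func()
        data["success"] = True
    except Exception as exception:
        # Keep a readable description of the failure
        data["success"] = False
        data["exception"] = repr(exception)

    data["end_time"] = datetime.now().strftime(FMT)
    data["duration"] = time.time() - start
    return data


if __name__ == "__main__":
    print(run_with_metadata("ok_task", lambda: None))
    print(run_with_metadata("bad_task", lambda: 1 / 0))
```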

Then it is easy to create a StandardTask. The two tasks from the previous example would be:

master.py
from datetime import date

import luigi

from luigi_default import StandardTask


class RegisterTask(StandardTask):
    module = "register"


class ReportTask(StandardTask):
    module = "report"


class DoAllTask(luigi.WrapperTask):

    mdate = luigi.DateParameter(default=date.today())

    def requires(self):
        return RegisterTask(self.mdate), ReportTask(self.mdate)


if __name__ == "__main__":
    luigi.build([DoAllTask()])

So you only need to set the name of the script/package in the module attribute of the StandardTask subclass. That script/package needs a function called main, since that is what runs by default.
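
The lookup by name can be tried outside Luigi. The sketch below uses importlib.import_module, the standard-library equivalent of the __import__ call in the template, on a throwaway module. Both demo_task and run_module are hypothetical names created only for this demonstration.

```python
import importlib
import os
import sys
import tempfile


def run_module(name, argument):
    """Import a module by name and call its main function."""
    module = importlib.import_module(name)
    return module.main(argument)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as folder:
        # Hypothetical throwaway module, created only for this demo
        with open(os.path.join(folder, "demo_task.py"), "w") as stream:
            stream.write("def main(filename):\n    return f'ran with {filename}'\n")

        # Make the folder importable and refresh the import caches
        sys.path.insert(0, folder)
        importlib.invalidate_caches()
        try:
            print(run_module("demo_task", "2020_01_06"))
        finally:
            sys.path.remove(folder)
```

A module without a main function would raise an AttributeError at this point, which is why every script used this way needs one.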

It is even possible to adapt the StandardTask to do something different by overriding the run_std method. For example:

from datetime import datetime


class ReportTask(StandardTask):
    def run_std(self):
        with open("output.txt", "w") as stream:
            stream.write(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

4. Automatically run luigi

If you want a process that is always running and is restarted on failure, you should use supervisor. You should use it for luigid.

For running the tasks themselves, Luigi does not include a way to trigger them on a schedule, but you can use cron instead.

4.1. Install supervisor

First install it with:

sudo apt install supervisor -y

Then create some auxiliary folders that you will need:

#!/bin/bash 

# For luigi logs
sudo mkdir /var/log/luigi/

# For cron logs (and give write permission)
sudo mkdir /var/log/cron/
sudo chmod 777 /var/log/cron/

# This is to allow luigi to store the worker state
sudo mkdir /var/lib/luigi-server/

And now declare the luigid service by creating the file /etc/supervisor/conf.d/luigid.conf

/etc/supervisor/conf.d/luigid.conf
[program:luigid]
command=/home/ubuntu/.local/bin/luigid
stopsignal=QUIT
stdout_logfile=/var/log/luigi/luigid.log
stderr_logfile=/var/log/luigi/luigid.error.log
autorestart=true
user=ubuntu

It is important to use the full path for the command that supervisor will run. Be careful to set user to one that has access to the luigid executable.
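
A quick way to catch a relative command before restarting supervisor is to parse the ini-style file and check the path. The following is only a sketch with the standard-library configparser: relative_commands is a hypothetical helper, and supervisor's own parsing may differ in details.

```python
import configparser
import posixpath

CONFIG = """
[program:luigid]
command=/home/ubuntu/.local/bin/luigid
stopsignal=QUIT
stdout_logfile=/var/log/luigi/luigid.log
stderr_logfile=/var/log/luigi/luigid.error.log
autorestart=true
user=ubuntu
"""


def relative_commands(text):
    """Return the sections whose command does not start with a full path."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)

    bad = []
    for section in parser.sections():
        command = parser.get(section, "command", fallback=None)
        if not command:
            continue  # sections without a command are ignored

        # Only the executable (first word) has to be an absolute path
        executable = command.split()[0]
        if not posixpath.isabs(executable):
            bad.append(section)
    return bad


if __name__ == "__main__":
    print(relative_commands(CONFIG))
```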

Remember that you can call whereis luigid to get the full path.

Then start supervisor:

sudo supervisorctl reread
sudo service supervisor restart

# Check the result
sudo supervisorctl status

If there are no errors you can start using Luigi.

4.2. Create a virtual environment

First install virtualenv with:

pip3 install virtualenv

Then create the virtual environment:

# Make a folder for virtual environments
mkdir /home/ubuntu/venv/

# Create a virtualenv called vtasks (-p selects the Python interpreter)
virtualenv -p python3 /home/ubuntu/venv/vtasks

Then to activate the environment you can call:

source /home/ubuntu/venv/vtasks/bin/activate

Call deactivate to leave the environment.

4.3. Script file to run luigi

Let's create the run_luigi.sh file with:

~/run_luigi.sh
#!/bin/bash 

# Add ssh keys
eval `keychain --agents ssh --eval github_ssh`

# Go to desired path
cd /home/ubuntu/villoro_tasks/

# Activate virtual environment
source /home/ubuntu/venv/vtasks/bin/activate

# Git fetch and checkout
git fetch
git checkout master
git pull origin master

# Install requirements
pip install -r requirements.txt

# Run luigi
python src/master.py

deactivate

Do not forget the #!/bin/bash line. Without it the script may be run by sh, where the source command is not available.

This script does the following:

  1. Loads the ssh keys so that it can work with git.
  2. Changes to the repository path.
  3. Activates the virtual environment.
  4. Fetches and checks out the latest changes in master.
  5. Installs the requirements.
  6. Runs Luigi.

In order to run this you should give it executable permissions with:

chmod +x /home/ubuntu/run_luigi.sh

4.4. Running luigi tasks with cron

The best way to run Luigi automatically is using cron.

After doing all the previous steps you can set cron to run the script with:

crontab -e

And add this line at the end:

0 4 * * * /home/ubuntu/run_luigi.sh >> /var/log/cron/luigi.log 2>&1

In this example it will run every day at 04:00 in the server's time zone (GMT here). If you want to run it at a different time, you can use crontab.guru to check cron expressions.

It will also write the output to a log file called /var/log/cron/luigi.log.
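
To double check when a daily entry such as 0 4 * * * fires next, the schedule can be reproduced with a few lines of Python. This is a sketch that only covers the simple "fixed minute and hour, every day" case; next_daily_run is a hypothetical helper and not a general cron parser.

```python
from datetime import datetime, timedelta


def next_daily_run(now, hour=4, minute=0):
    """Next time a daily 'minute hour * * *' cron entry fires after 'now'."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)

    # If today's slot is already gone, the next run is tomorrow
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


if __name__ == "__main__":
    print(next_daily_run(datetime(2020, 1, 6, 3, 0)))
    print(next_daily_run(datetime(2020, 1, 6, 12, 30)))
```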