Luigi is a Python package that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization, handling failures, command line integration, and much more.
It is easy to set up and integrates well with python code. This, for example, make it possible to call tasks directly from python and/or to set task dependencies dinamically.
You can install it directly with pip:
pip install luigi
Luigi work with tasks that are defined with classes that extend the luigi.Task
class.
The basic usage is to overwrite the run
function (with what to do) and the output
(with the file that will output).
All Luigi tasks need to end with writting a file. This is how luigi tracks which tasks are completed.
For example a basic task would look like:
import luigi from datetime import date, datetime class ReportTask(luigi.Task): def run(self): with open(f"output.txt", "w") as stream: stream.write(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) def output(self): return luigi.LocalTarget("output.txt")
This task will write a file called output.txt
with the execution datetime.
For a real example tasks should also have dependencies.
First of all, for this example let's create two python files (register.py
and report.py
) which run simple python task.
They both will accept a name as parameter so that it can be run for different days.
The first one will simply write a file with the execution time:
from datetime import date, datetime def main(filename): uri = f"output/{filename}.txt" with open(uri, "w") as stream: stream.write(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) print(f"File '{uri}' wrote")
And the other will write the same as an html file:
from datetime import date, datetime from markdown import markdown def main(filename): uri = f"output/{filename}.html" html = markdown( f"""# Report {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} """ ) with open(uri, "w") as stream: stream.write(html) print(f"File '{uri}' wrote")
With those files it is possible to create the associated luigi tasks:
from datetime import date, datetime import luigi class RegisterTask(luigi.Task): mdate = luigi.DateParameter(default=date.today()) def run(self): from register import main main(self.mdate.strftime("%Y_%m_%d")) def output(self): return luigi.LocalTarget(self.mdate.strftime("output/%Y_%m_%d.txt")) class ReportTask(luigi.Task): mdate = luigi.DateParameter(default=date.today()) def run(self): from report import main main(self.mdate.strftime("%Y_%m_%d")) def output(self): return luigi.LocalTarget(self.mdate.strftime("output/%Y_%m_%d.html"))
Both tasks have a parameter called mdate
so that each day the task runs it will write a different file.
Finally let's add a dummy task that requires both RegisterTask
and ReportTask
.
class DoAllTask(luigi.WrapperTask): mdate = luigi.DateParameter(default=date.today()) def requires(self): return RegisterTask(self.mdate), ReportTask(self.mdate)
Since
DoAllTask
does not need to do anything it extendsluigi.WrapperTask
.
To run the example you first need to start the luigi server with:
luigid
And then you put the three tasks to a file called master.py
and add:
if __name__ == "__main__": luigi.build([DoAllTask()])
You can run:
python master.py
And you should see how Luigi completes the 3 tasks.
If you go to localhost:8082
you can see luigi server.
It gives a summary of the tasks and their status.
If you select a task (for example DoAllTask
) you can see the dependency tree.
Remember to unmark the
Hide Done
button to see all task.
It is also possible view the dependency graph created with D3
.
The best way to orchestrate tasks in a fault tolerant way is by having a script (or package) for each task.
This is the same as the last example where there ware report.py
and register.py
scripts.
Then since not all tasks will output a file let's create a template for task that writes a yaml
file with metadata about the execution.
This will include the start_time
, end_time
and duration
among others.
If the task fails it will also show info about the failure.
When a task fails will output file with have a slightly different name so that if it is run another time it will try to repeat the task.
The full template would be:
import os import time from datetime import date, datetime import luigi import oyaml as yaml PATH_LUIGI_YAML = "runs/" class StandardTask(luigi.Task): """ Extends luigi task, instead of calling run, one must call run_std Params: mdate: date of execution t_data: is a dictionary with instance data worker_timeout: maximum time allowed for a task to run in seconds """ mdate = luigi.DateParameter(default=date.today()) worker_timeout = 1 * 3600 # Default timeout is 1h per task t_data = {} # This is meant to be overwritten module = "change_this_to_module_name" def output_filename(self, success=True): """ Get output filename """ # output will be a yaml file inside a folder with date uri = f"{PATH_LUIGI_YAML}{self.mdate:%Y%m%d}/" # make sure folder exists os.makedirs(uri, exist_ok=True) # add task name uri += self.__class__.__name__ # If task fails write a file with different name # This allows re-runs to retry the failed task while keeping info about fails if not success: uri += datetime.now().strftime("_fail_%Y%m%d_%H%M%S") return f"{uri}.yaml" def output(self, success=True): return luigi.LocalTarget(self.output_filename()) def save_result(self, success=True, **kwa): """ Stores result as a yaml file """ # Store basic execution info self.t_data["end_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.t_data["duration"] = time.time() - self.start_time self.t_data["success"] = success # Allow extra params like 'exception' self.t_data.update(**kwa) # Export them as an ordered yaml with open(self.output_filename(success), "w") as stream: yaml.dump(self.t_data, stream) def on_failure(self, exception): # If there is an error store it anyway self.save_result(success=False, exception=repr(exception)) self.disabled = True # If needed, do extra stuff (like log.error) # End up raising the error to Luigi super().on_failure(exception) def run_std(self): """ This is what the task will actually do. If it is not overwritten it will 'import module' and then run: module.main(mdate) """ # By default run the 'main' function of the asked module module = __import__(self.module) module.main(self.mdate.strftime("%Y_%m_%d")) def run(self): # Store start time and task name self.t_data["name"] = self.__class__.__name__ self.start_time = time.time() self.t_data["start_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # Run the task and store the resutls self.run_std() self.save_result()
Then is easy to create a StandardTask. The two task from the example below would be:
import luigi from luigi_default import StandardTask, date class RegisterTask(StandardTask): module = "register" class ReportTask(StandardTask): module = "report" class DoAllTask(luigi.WrapperTask): mdate = luigi.DateParameter(default=date.today()) def requires(self): return RegisterTask(self.mdate), ReportTask(self.mdate) if __name__ == "__main__": luigi.build([DoAllTask()])
So it is only needed to say the name of the script/package as parameter called module
of the StandardTask
instance.
This script/package needs a function called main
since is what would be run by default.
It is even possible to adapt the StandardTask
to do something different by overwritting the run_std
function.
For example:
class ReportTask(StandardTask): def run_std(): with open(f"output.txt", "w") as stream: stream.write(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
If you want a process always runing and to be restarted on failure you should use supervisor.
You should you use it for luigid
.
For runing the tasks itselfs Luigi does not have a scheduler but you can use cron instead.
First install it with:
sudo apt install supervisor -y
Then create some auxiliar folders that you will need:
#!/bin/bash # For luigi logs sudo mkdir /var/log/luigi/ # For cron logs (and give writte permission) sudo mkdir /var/log/cron/ sudo chmod 777 /var/log/cron/ # This is to allow luigi to store the worker state sudo mkdir /var/lib/luigi-server/
And now declare the luigid
service by creating the file /etc/supervisor/conf.d/luigid.conf
[program:luigid] command=/home/ubuntu/.local/bin/luigid stopsignal=QUIT stdout_logfile=/var/log/luigi/luigid.log stderr_logfile=/var/log/luigi/luigid.error.log autorestart=true user=ubuntu
It is important to use the full path for the command that supervisor will run.
Be careful to set user
to one that has visbility of the luigid
file.
Remember you can call
whereis luigid
to get the full path
Then start supervisor:
sudo supervisorctl reread
sudo service supervisor restart
# Check the result
sudo supervisorctl status
If there are no errors you can start using Luigi.
First install virtualenv with:
pip3 install virtualenv
Then create the virtual environment:
# Make a folder for virtual environments mkdir /home/ubuntu/venv/ # Create a virtualenv called vtasks virtualenv -p /home/ubuntu/venv/ vtasks
Then to activate the environment you can call:
source /home/ubuntu/venv/vtasks/bin/activate
And deactivate
to deactivate the environment.
Let's create the run_luigi.sh
file with:
#!/bin/bash # Add ssh keys eval `keychain --agents ssh --eval github_ssh` # Go to desired path cd /home/ubuntu/villoro_tasks/ # Activate virtual environment source /home/ubuntu/venv/vtasks/bin/activate # Git fetch and checkout git fetch git checkout master git pull origin master # Install requirements pip install -r requirements.txt # Run luigi python src/master.py deactivate
Do not forget the
#!/bin/bash
part. If you do, thesource
command won't work.
This script does some things:
In order to run this you should give it executable permissions with:
chmod +x /home/ubuntu/run_luigi.sh
Te best way to run Luigi automatically is using cron.
After doing all the previous steps you can set cron to run the script with:
crontab -e
Add this adding this line at the end:
0 4 * * * /home/ubuntu/run_luigi.sh >> /var/log/cron/luigi.log 2>&1
In this example it will run each day at 04:00 am GMT. If you want to run at different moment you can use crontab.guru to check cron expressions.
It will also write the output as log file called /var/log/cron/luigi.log
.