Examples
All code examples can be found here.
An example job
A job can contain anything from a single task to an incredibly complex task list. Rather than covering this entire range, we have created a Jupyter notebook for you to experiment with creating a DANE job yourself.
https://github.com/CLARIAH/DANE/blob/master/examples/job_examples.ipynb
Instead of interacting with a DANE server, the job examples work with a DummyHandler, which implements all the required handler functionality. The DummyHandler simply stores all information in variables, so the data is not persisted, but it does allow for experimenting with DANE jobs.
An example worker
DANE workers have been designed such that very little boilerplate code is necessary.
Nonetheless, some boilerplate is required to ensure we can rely on the logic defined
in the DANE.base_classes.base_worker.
In this example we break down the code for a worker which returns the file size of a source file.
We’ll start by defining a new class, which we have appropriately named
filesize_worker. We ensure that it inherits from DANE.base_classes.base_worker,
and then we’re ready to start adding logic.
import json
from os.path import exists, getsize  # used in the callback further down

import DANE.base_classes

class filesize_worker(DANE.base_classes.base_worker):
    __queue_name = 'filesize_queue'
    __binding_key = '#.FILESIZE'

    def __init__(self, config):
        super().__init__(queue=self.__queue_name,
                         binding_key=self.__binding_key, config=config)
First, we define two class constants with the name of the queue the worker should use, and the binding key. We want all workers of the same type to share the same queue name, so if we start multiple workers they can divide the work.
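The idea of several workers dividing the work on one shared queue can be sketched without RabbitMQ. The following is only an illustration using Python's standard library: `queue.Queue` stands in for the shared RabbitMQ queue, and the worker and task names are hypothetical. It does not use any DANE code.

```python
import queue
import threading

def worker(name, tasks, done):
    """Take tasks off the shared queue until it is empty."""
    while True:
        try:
            task = tasks.get_nowait()
        except queue.Empty:
            return
        # Record which worker handled which task.
        done.append((name, task))

# One shared queue, standing in for 'filesize_queue'.
tasks = queue.Queue()
for i in range(10):
    tasks.put(f'task-{i}')

done = []
threads = [threading.Thread(target=worker, args=(f'worker-{n}', tasks, done))
           for n in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Every task is handled exactly once, whichever worker picked it up.
handled = sorted(task for _, task in done)
print(len(handled), 'tasks handled')
```

How the tasks are split between the two workers varies from run to run, but each task is taken from the queue only once.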
The binding key follows the pattern <file type>.<task key>, where the file type
can be a wildcard such as * for any type of source material, or a specific type if we want
to build a worker which only processes, for example, video files. The example above uses #,
which in RabbitMQ topic exchanges matches zero or more words, whereas * matches exactly one.
The task_key is the key used in the job task list to specify that we mean this type of worker.
In theory, multiple different workers can have the same task_key while having different queue names. This could be used, for example, to do logging. However, this is risky: if the queue for the intended task is not initialised, the task might never be assigned to the correct queue.
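To get a feel for which routing keys a binding key accepts, the matching rules of a RabbitMQ topic exchange can be approximated in a few lines. This is a simplified sketch and not code from DANE or RabbitMQ; the function name `topic_matches` and the routing keys used below are hypothetical.

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key matches binding_key under topic rules.

    '*' stands for exactly one word, '#' for zero or more words;
    words are separated by dots.
    """
    def match(pattern, words):
        if not pattern:
            # Pattern exhausted: only a match if no words are left over.
            return not words
        head, rest = pattern[0], pattern[1:]
        if head == '#':
            # Let '#' absorb 0, 1, 2, ... of the remaining words.
            return any(match(rest, words[i:]) for i in range(len(words) + 1))
        if not words:
            return False
        if head == '*' or head == words[0]:
            return match(rest, words[1:])
        return False

    return match(binding_key.split('.'), routing_key.split('.'))

# Hypothetical routing keys of the form <file type>.<task key>
print(topic_matches('#.FILESIZE', 'Video.FILESIZE'))      # True
print(topic_matches('*.FILESIZE', 'Video.FILESIZE'))      # True
print(topic_matches('Video.FILESIZE', 'Sound.FILESIZE'))  # False
print(topic_matches('#.FILESIZE', 'Video.SHOTDETECT'))    # False
```

A worker bound with a specific file type therefore only receives tasks for that type, while a wildcard binding receives the task for every type.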
Up next is the __init__ function. In order to properly set up the worker we need to call the init of the base_worker class, provide the queue name, binding key, and the config parameters to connect to the RabbitMQ instance. If the worker requires any set up, the init can be extended to include this as well.
Besides any set-up logic in the init, the majority of worker-specific logic is contained in the callback. This function is called whenever a new job is read from the queue.
The base_worker contains all the code for interacting with the queue, so in the callback we can focus on actually doing the work.
    def callback(self, job):
        if exists(job.source_url):
            fs = getsize(job.source_url)
            return json.dumps({'state': 200,
                               'message': 'Success',
                               'size': fs})
        else:
            return json.dumps({'state': 404,
                               'message': 'No file found at source_url'})
The callback receives a job, including the entire task list and any responses
from preceding tasks. For the file size worker we are only interested in the
source material. We assume that the source material is a local file, so we can
rely on functionality from os.path.
The first step is to check if the source material actually exists. In general, any input verification and validity checking is relegated to the workers themselves. If the file exists, we retrieve its size and return a JSON serialised dict containing the success state (200), a message detailing that we have succeeded, and the actual file size.
The file size will be added to the job, in the response attribute of the DANE.Job,
and as such it will be available to subsequent workers. The response attribute is a dictionary
in which each first-level key matches the task_key of the task that produced the response, and the
value of that key is a dictionary of the responses. Workers that run after this one
can then use job.response['FILESIZE']['size'] to retrieve the file size.
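As an illustration of how a later worker might read this value, here is a minimal sketch of a follow-up callback. It is written as a plain function and uses a `SimpleNamespace` as a stand-in for a DANE.Job, so it runs without DANE. The function name, the threshold, the `large` field, and the choice of a 404 state for a missing response are all assumptions made for this example.

```python
import json
from types import SimpleNamespace

def large_file_callback(job, threshold=1_000_000):
    """Hypothetical follow-up task: flag files larger than `threshold` bytes."""
    # Look up the response stored under the FILESIZE task_key, if any.
    filesize = job.response.get('FILESIZE', {})
    if 'size' not in filesize:
        # Assumed state for a missing upstream response; see Task states.
        return json.dumps({'state': 404,
                           'message': 'No FILESIZE response found in job'})
    return json.dumps({'state': 200,
                       'message': 'Success',
                       'large': filesize['size'] > threshold})

# Stand-in for a job that has already passed through the file size worker.
job = SimpleNamespace(source_url='/hypothetical/path/video.mp4',
                      response={'FILESIZE': {'size': 2048}})
result = json.loads(large_file_callback(job))
print(result['state'], result['large'])

# A job without a FILESIZE response yields the error state instead.
empty_job = SimpleNamespace(source_url='/hypothetical/path/video.mp4',
                            response={})
empty_result = json.loads(large_file_callback(empty_job))
print(empty_result['state'])
```

Checking that the expected key is present before using it follows the earlier point that input verification is left to the workers themselves.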
For the else clause, we can simply return a 404 state, and a descriptive message to indicate that the source material was not found. In all cases a job must return at least a state and a message. For more on states see Task states.
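The callback logic can be tried out without a queue or a DANE server. The sketch below copies the body of the callback into a standalone function (the name `filesize_response` is hypothetical) and runs it once against a temporary file and once against a path that no longer exists.

```python
import json
import os
import tempfile
from os.path import exists, getsize

def filesize_response(source_url):
    """Standalone copy of the callback logic, taking a path instead of a job."""
    if exists(source_url):
        return json.dumps({'state': 200,
                           'message': 'Success',
                           'size': getsize(source_url)})
    return json.dumps({'state': 404,
                       'message': 'No file found at source_url'})

# Create a small temporary file of five bytes to act as the source material.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b'hello')
    path = tmp.name

found = json.loads(filesize_response(path))

# Remove the file and ask again to exercise the else branch.
os.remove(path)
missing = json.loads(filesize_response(path))

print(found)
print(missing)
```

Both results contain a state and a message, and only the successful one carries the size.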
Lastly, we need some code to start the worker.
if __name__ == '__main__':
    fsw = filesize_worker(cfg)
    print(' # Initialising worker. Ctrl+C to exit')
    try:
        fsw.run()
    except KeyboardInterrupt:
        fsw.stop()
To start a worker, we first initialise it with a config. By default a worker only needs access to the RabbitMQ details provided by the DANE.config, so that it can set up a queue and listen for work to perform. However, this can be extended with worker-specific configuration options. More details on how to work with the configuration can be found in the Usage guide.
After initialising the worker we can simply call the DANE.base_classes.base_worker.run()
method to start listening for work. As this starts a blocking process, we have
added a way to interrupt it (slightly) more elegantly: pressing Ctrl+C raises the
KeyboardInterrupt exception, which we catch with the try-except block, after which
we call the stop method.
To test this worker it is necessary to have access to a RabbitMQ instance. However, to simulate job requests we have constructed a generator which can be run without having to set up the other components of a DANE server.