Source code for pyveg.src.pyveg_pipeline


A PIPELINE is the whole analysis procedure for one set of coordinates.
It will likely consist of a couple of SEQUENCES - e.g.
one for vegetation data and one for weather data.

A SEQUENCE is composed of one or more MODULES, that each do specific tasks,
e.g. download data, process images, calculate quantities from image.

A special type of MODULE may be placed at the end of a PIPELINE to combine
the results of the different SEQUENCES into one output file.

import os
import json
import subprocess
import time

import logging
from logging.handlers import RotatingFileHandler

logger = logging.getLogger("pyveg_logger")
formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")

c_handler = logging.StreamHandler()
f_handler = RotatingFileHandler(
    maxBytes=5 * 1024 * 1024, backupCount=10


from pyveg.src.file_utils import save_json

from shutil import copyfile

    from pyveg.src import azure_utils
    from pyveg.src import batch_utils
    print("Azure utils could not be imported - is Azure SDK installed?")

[docs]class Pipeline(object): """ A Pipeline contains all the Sequences we want to run on a particular set of coordinates and a date range. e.g. there might be one Sequence for vegetation data and one for weather data. """ def __init__(self, name): = name self.sequences = [] self.coords = None self.date_range = None self.output_location = None self.output_location_type = None self.is_configured = False def __iadd__(self, sequence): """ Overload the '+=' operator, so we can add sequences directly to the pipeline. """ sequence.parent = self self.__setattr__(, sequence) self.sequences.append(sequence) return self def __repr__(self): """ overload the builtin operator for printing the pipeline. """ output = "\n[Pipeline]: {} \n".format( output += "=======================\n" output += "coordinates: {}\n".format(self.coords) output += "date_range: {}\n".format(self.date_range) output += "output_location: {}\n".format(self.output_location) output += "output_location_type: {}\n".format(self.output_location_type) output += "\n ------- Sequences ----------\n\n" for s in self.sequences: output += s.__repr__() output += "=======================\n" return output
[docs] def get(self, seq_name): """ Return a sequence object when asked for by name. """ for sequence in self.sequences: if == seq_name: return sequence
[docs] def configure(self): """ Configure all the sequences in this pipeline. """ for var in ["coords", "date_range", "output_location", "output_location_type"]: if (not var in vars(self)) or (not self.__getattribute__(var)): raise RuntimeError( "{}: need to set {} before calling configure()".format(, var ) ) if self.output_location_type == "azure": # ensure that the output location conforms to azure container-name requirements container_name = azure_utils.sanitize_container_name(self.output_location) self.output_location = container_name for sequence in self.sequences: if not "coords" in vars(sequence): sequence.coords = self.coords if not "date_range" in vars(sequence): sequence.date_range = self.date_range sequence.configure() self.is_configured = True
[docs] def run(self): """ run all the sequences in this pipeline """ for sequence in self.sequences: self.print_run_status() self.cleanup()
[docs] def print_run_status(self): for sequence in self.sequences: sequence.print_run_status()
[docs] def cleanup(self): """ Call cleanup() for all our sequences """ for sequence in self.sequences: sequence.cleanup()
[docs]class Sequence(object): """ A Sequence is a collection of Modules where the output of one module is typically the input to the next one. It will typically correspond to a particular data collection, e.g. for vegetation imagery, we might have one module to download the images, one to process them, and one to analyze the processed images. """ def __init__(self, name): = name self.modules = [] self.depends_on = [] self.parent = None self.output_location = None self.output_location_type = None self.is_configured = False self.is_finished = False self.run_status = {} def __iadd__(self, module): """ overload the += operator so we can add modules directly to the sequence """ module.parent = self self.modules.append(module) # if the module doesn't already have a name, or only the default one, # give it a name <sequence_name>_<class_name> here if (not or ( == module.__class__.__name__): = "{}_{}".format(, module.__class__.__name__) # add module name as an attribute of the sequence self.__setattr__(, module) return self
[docs] def join_path(self, *path_elements): """ If output_location_type is 'local', we will just use os.path.join, which puts a "/" separator in for posix, or "\" for windows. However, if output_location_type is 'azure', we always want "/". Parameters ========== path_elements: list of strings. Directory-like path elements. Returns ======= path: str, the path elements joined by "/" or "\". """ if self.output_location_type == "azure": path = "/".join(path_elements) else: path = os.path.join(*path_elements) return path
[docs] def set_output_location(self): if self.parent: self.output_location_type = self.parent.output_location_type self.output_location = self.join_path( self.parent.output_location, f"gee_{self.coords[0]}_{self.coords[1]}" + "_" +"/", "-"), ) else: self.output_location = ( f"gee_{self.coords[0]}_{self.coords[1]}" + "_" +"/", "-") ) self.output_location_type = "local"
[docs] def set_config(self, config_dict): for k, v in config_dict.items():"{}: setting {} to {}".format(, k, v)) self.__setattr__(k, v)
[docs] def configure(self): if (not self.coords) or (not self.date_range): raise RuntimeError( "{}: Need to set coords and date range before calling configure()".format( ) ) if not self.output_location: self.set_output_location() # set the input location for each module to be the output of the previous one. for i, module in enumerate(self.modules): module.output_location = self.output_location module.output_location_type = self.output_location_type if i > 0: module.input_location = self.modules[i - 1].output_location module.input_location_type = self.modules[i - 1].output_location_type # modules will depend on the previous module in the sequence module.depends_on.append(self.modules[i - 1].name) module.coords = self.coords module.date_range = self.date_range module.configure() self.is_configured = True
[docs] def run(self): """ Before we run the Modules in this Sequence, check if there are any other Sequences on which we depend, and if so, wait for them to finish. """ if len(self.depends_on) > 0: "{} Checking if all dependency Sequences have finished".format( ) ) dependencies_finished = False while not dependencies_finished: num_seq_finished = 0 for seq_name in self.depends_on: seq = self.parent.get(seq_name)"{}: checking status of {}".format(, if seq.check_if_finished():" {} ... finished".format( num_seq_finished += 1 dependencies_finished = num_seq_finished == len(self.depends_on) "{}: {} / {} dependencies finished".format(, num_seq_finished, len(self.depends_on) ) ) time.sleep(10) self.create_batch_job_if_needed() for module in self.modules: self.run_status[] =
def __repr__(self): if not self.is_configured: return "Sequence not configured\n" output = "\n [Sequence]: {} \n".format( output += " =======================\n" for k, v in vars(self).items(): # exclude the things we don't want to print if ( k == "name" or k == "modules" or k == "parent" or isinstance(v, BaseModule) ): continue output += " {}: {}\n".format(k, v) output += "\n ------- Modules ----------\n\n" for m in self.modules: output += m.__repr__() output += " =======================\n\n" return output
[docs] def print_run_status(self): """ For all modules in the sequence, print out how many jobs succeeded or failed. """"\nSequence {}".format("-----------------\n") for module in self.modules: module.print_run_status()"\n")
[docs] def get(self, mod_name): """ Return a module object when asked for by name, or by class name """ for module in self.modules: if == mod_name: return module elif module.__class__.__name__ == mod_name: return module
[docs] def has_batch_job(self): """ Do any of the Modules in this sequence have run_mode == 'batch'? """ for module in self.modules: if "run_mode" in vars(module) and module.run_mode == "batch": return True return False
[docs] def create_batch_job_if_needed(self): """ If any modules in this sequence are to be run in batch mode, create a batch job for them. """ if self.has_batch_job(): self.batch_job_id = + "_" + time.strftime("%Y-%m-%d_%H-%M-%S") batch_utils.create_job(self.batch_job_id) "Sequence {}: Creating batch job {}".format(, self.batch_job_id ) )
[docs] def check_if_finished(self): """ Only relevant when one or more modules are running in batch mode, Sequences that depend on this Sequence will call this function while they wait for all Modules to finish. """ num_modules_finished = 0 for module in self.modules:"{}: checking status of {}".format(, if module.check_if_finished():"{} ... finished".format( num_modules_finished += 1 "{} / {} modules finished".format(num_modules_finished, len(self.modules)) ) self.is_finished = num_modules_finished == len(self.modules) return self.is_finished
[docs] def cleanup(self): """ If we have batch resources (job/pool), remove them to avoid charges """ if self.has_batch_job(): if "batch_job_id" in vars(self): batch_utils.delete_job(self.batch_job_id) batch_utils.delete_pool()
[docs]class BaseModule(object): """ A "Module" is a building block of a sequence - takes some input, does something (e.g. Downloads from GEE, processes some images, ...) and produces some output. The working directory for all modules within a sequence will be given by the sequence - modules may write output to subdirectories of this (e.g. for different dates), but what we call "output_location" will be the base directory common to all modules, and will contain info about the image collection name, and the coordinates. """ def __init__(self, name=None): if name: = name else: = self.__class__.__name__ self.params = [] self.parent = None self.depends_on = [] self.is_configured = False self.is_finished = False self.run_status = {"succeeded": 0, "failed": 0, "incomplete": 0}
[docs] def set_parameters(self, config_dict): for k, v in config_dict.items():"{}: setting {} to {}".format(, k, v)) self.__setattr__(k, v)
[docs] def configure(self, config_dict=None): """ Order of preference for configuriation: 1) config_dict 2) values held by the parent Sequence 3) default values So we set them in reverse order here, so higher priorities will override. """ self.set_default_parameters() if self.parent: for param, param_type in self.params: if param in vars(self.parent): self.__setattr__(param, self.parent.__getattribute__(param)) if config_dict: self.set_parameters(config_dict) self.check_config() self.is_configured = True
[docs] def check_config(self): """ Loop through list of parameters, which will each be a tuple (name, [allowed_types]) and check that the parameter exists, and is of the correct type. """ for param in self.params: if not param[0] in vars(self): raise RuntimeError( "{}: {} needs to be set.".format(, param[0]) ) val = self.__getattribute__(param[0]) type_ok = False for param_type in param[1]: if isinstance(val, param_type): type_ok = True break if not type_ok: raise TypeError( "{}: {} should be {}, got {}:{}".format(, param[0], param[1], val, type(val) ) ) return True
[docs] def set_default_parameters(self): pass
[docs] def prepare_for_run(self): if not self.is_configured: raise RuntimeError( "Module {} needs to be configured before running".format( ) if self.output_location_type == "azure": # if we're running this module standalone on azure, we might need to # create the output container on the blob storage account" output_location_base = self.output_location.split("/")[0] container_name = azure_utils.sanitize_container_name(output_location_base) if not azure_utils.check_container_exists(container_name):"Create container {}".format(container_name)) azure_utils.create_container(container_name) elif self.output_location_type == "local" and not os.path.exists( self.output_location ): os.makedirs(self.output_location, exist_ok=True)
[docs] def check_if_finished(self): return self.is_finished
def __repr__(self): if not self.is_configured: return "\n Module not configured" output = " [Module]: {} \n".format( output += " =======================\n" for k, v in vars(self).items(): # exclude the things we don't want to print if k == "name" or k == "parent" or k == "params": continue output += " {}: {}\n".format(k, v) output += " =======================\n\n" return output
[docs] def join_path(self, *path_elements): """ If output_location_type is 'local', we will just use os.path.join, which puts a "/" separator in for posix, or "\" for windows. However, if output_location_type is 'azure', we always want "/". Parameters ========== path_elements: list of strings. Directory-like path elements. Returns ======= path: str, the path elements joined by "/" or "\". """ if self.output_location_type == "azure": path = "/".join(path_elements) else: path = os.path.join(*path_elements) return path
[docs] def copy_to_output_location(self, tmpdir, output_location, file_endings=[]): """ Copy contents of a temporary directory to a specified output location. Parameters ========== tmpdir: str, location of temporary directory output_location: str, either path to a local directory (if self.output_location_type is "local") or to Azure <container>/<blob_path> if self.output_location_type=="azure") file_endings: list of str, optional. If given, only files with those endings will be copied. """ if self.output_location_type == "local": os.makedirs(output_location, exist_ok=True) for root, dirs, files in os.walk(tmpdir): for filename in files: if file_endings: for ending in file_endings: if filename.endswith(ending): copyfile(os.path.join(root,filename), os.path.join(output_location,filename)) else: [ "cp", "-r", os.path.join(root, filename), os.path.join(output_location, filename), ] ) elif self.output_location_type == "azure": # first part of self.output_location should be the container name container_name = self.output_location.split("/")[0] azure_utils.write_files_to_blob( tmpdir, container_name, output_location, file_endings )
[docs] def list_directory(self, directory_path, location_type): """ List contents of a directory, either on local file system or Azure blob storage. """ if location_type == "local": if not os.path.isdir(directory_path): return [] return os.listdir(directory_path) elif location_type == "azure": # first part of self.output_location should be the container name container_name = self.output_location.split("/")[0] return azure_utils.list_directory(directory_path, container_name) else: raise RuntimeError("Unknown location_type - must be 'local' or 'azure'")
[docs] def save_json(self, data, filename, location, location_type): """ Save json to local filesystem or blob storage depending on location_type """ if location_type == "local": save_json(data, location, filename) elif location_type == "azure": # first part of self.output_location should be the container name container_name = self.output_location.split("/")[0] azure_utils.save_json(data, location, filename, container_name) else: raise RuntimeError("Unknown location_type - must be 'local' or 'azure'")
[docs] def get_json(self, filepath, location_type): """ Read a json file either local or blob storage. """ if location_type == "local": if os.path.exists(filepath): return json.load(open(filepath)) else: return None elif location_type == "azure": # first part of filepath should be the container name container_name = filepath.split("/")[0] return azure_utils.read_json(filepath, container_name) else: raise RuntimeError("Unknown location_type - must be 'local' or 'azure'")
[docs] def get_file(self, filename, location_type): """ Just return the filename if location _type is "local". Otherwise return a tempfile with the contents of a blob if the location is "azure". """ if location_type == "local": return filename elif location_type == "azure": # first part of self.output_location should be the container name container_name = self.output_location.split("/")[0] return azure_utils.get_blob_to_tempfile(filename, container_name) else: raise RuntimeError("Unknown location_type - must be 'local' or 'azure'")
[docs] def check_for_existing_files(self, location, num_files_expected): """ See if there are already num_files in the specified location. If "replace_existing_files" is set to True, always return False """ if self.output_location_type == "local": os.makedirs(location, exist_ok=True) # if we haven't specified number of expected files per point it will be -1 if num_files_expected < 0: return False if self.replace_existing_files: return False existing_files = self.list_directory(location, self.output_location_type) if len(existing_files) == num_files_expected: "{}: Already found {} files in {} - skipping".format(, num_files_expected, location ) ) return True return False
[docs] def get_config(self): """ Get the configuration of this module as a dict. """ config_dict = {} for param, _ in self.params: config_dict[param] = self.__getattribute__(param) config_dict["class_name"] = self.__class__.__name__ return config_dict
[docs] def save_config(self, config_location): """ Write out the configuration of this module as a json file. """ config_dict = self.get_config() output_config_dir = os.path.dirname(config_location) if output_config_dir and not os.path.exists(output_config_dir): os.makedirs(output_config_dir) with open(config_location, "w") as output_json: json.dump(config_dict, output_json)"{}: wrote config to {}".format(, config_location))
[docs] def print_run_status(self): """ Print out how many jobs succeeded or failed """ "{}: Succeeded: {} Failed: {} Incomplete: {}".format(, self.run_status["succeeded"], self.run_status["failed"], self.run_status["incomplete"] ) )