"""
Definitions:
============
A PIPELINE is the whole analysis procedure for one set of coordinates.
It will likely consist of a couple of SEQUENCES - e.g.
one for vegetation data and one for weather data.
A SEQUENCE is composed of one or more MODULES, that each do specific tasks,
e.g. download data, process images, calculate quantities from image.
A special type of MODULE may be placed at the end of a PIPELINE to combine
the results of the different SEQUENCES into one output file.
"""
import os
import json
import subprocess
import time
import logging
from logging.handlers import RotatingFileHandler
logger = logging.getLogger("pyveg_logger")
formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
logger.setLevel(logging.INFO)
c_handler = logging.StreamHandler()
c_handler.setFormatter(formatter)
f_handler = RotatingFileHandler(
"pyveg_{}.log".format(time.strftime("%Y-%m-%d_%H-%M-%S")),
maxBytes=5 * 1024 * 1024, backupCount=10
)
f_handler.setFormatter(formatter)
logger.addHandler(f_handler)
logger.addHandler(c_handler)
from pyveg.src.file_utils import save_json
from shutil import copyfile
try:
from pyveg.src import azure_utils
from pyveg.src import batch_utils
except:
print("Azure utils could not be imported - is Azure SDK installed?")
[docs]class Pipeline(object):
"""
A Pipeline contains all the Sequences we want to run on a particular
set of coordinates and a date range. e.g. there might be one Sequence
for vegetation data and one for weather data.
"""
def __init__(self, name):
self.name = name
self.sequences = []
self.coords = None
self.date_range = None
self.output_location = None
self.output_location_type = None
self.is_configured = False
def __iadd__(self, sequence):
"""
Overload the '+=' operator, so we can add sequences
directly to the pipeline.
"""
sequence.parent = self
self.__setattr__(sequence.name, sequence)
self.sequences.append(sequence)
return self
def __repr__(self):
"""
overload the builtin operator for printing the pipeline.
"""
output = "\n[Pipeline]: {} \n".format(self.name)
output += "=======================\n"
output += "coordinates: {}\n".format(self.coords)
output += "date_range: {}\n".format(self.date_range)
output += "output_location: {}\n".format(self.output_location)
output += "output_location_type: {}\n".format(self.output_location_type)
output += "\n ------- Sequences ----------\n\n"
for s in self.sequences:
output += s.__repr__()
output += "=======================\n"
return output
[docs] def get(self, seq_name):
"""
Return a sequence object when asked for by name.
"""
for sequence in self.sequences:
if sequence.name == seq_name:
return sequence
[docs] def run(self):
"""
run all the sequences in this pipeline
"""
for sequence in self.sequences:
sequence.run()
self.print_run_status()
self.cleanup()
[docs] def print_run_status(self):
for sequence in self.sequences:
sequence.print_run_status()
[docs] def cleanup(self):
"""
Call cleanup() for all our sequences
"""
for sequence in self.sequences:
sequence.cleanup()
[docs]class Sequence(object):
"""
A Sequence is a collection of Modules where the output of one module is
typically the input to the next one.
It will typically correspond to a particular data collection, e.g. for
vegetation imagery, we might have one module to download the images,
one to process them, and one to analyze the processed images.
"""
def __init__(self, name):
self.name = name
self.modules = []
self.depends_on = []
self.parent = None
self.output_location = None
self.output_location_type = None
self.is_configured = False
self.is_finished = False
self.run_status = {}
def __iadd__(self, module):
"""
overload the += operator so we can add modules directly to the sequence
"""
module.parent = self
self.modules.append(module)
# if the module doesn't already have a name, or only the default one,
# give it a name <sequence_name>_<class_name> here
if (not module.name) or (module.name == module.__class__.__name__):
module.name = "{}_{}".format(self.name, module.__class__.__name__)
# add module name as an attribute of the sequence
self.__setattr__(module.name, module)
return self
[docs] def join_path(self, *path_elements):
"""
If output_location_type is 'local', we will just use
os.path.join, which puts a "/" separator in for posix, or "\" for windows.
However, if output_location_type is 'azure', we always want "/".
Parameters
==========
path_elements: list of strings. Directory-like path elements.
Returns
=======
path: str, the path elements joined by "/" or "\".
"""
if self.output_location_type == "azure":
path = "/".join(path_elements)
else:
path = os.path.join(*path_elements)
return path
[docs] def set_output_location(self):
if self.parent:
self.output_location_type = self.parent.output_location_type
self.output_location = self.join_path(
self.parent.output_location,
f"gee_{self.coords[0]}_{self.coords[1]}"
+ "_"
+ self.name.replace("/", "-"),
)
else:
self.output_location = (
f"gee_{self.coords[0]}_{self.coords[1]}"
+ "_"
+ self.name.replace("/", "-")
)
self.output_location_type = "local"
[docs] def set_config(self, config_dict):
for k, v in config_dict.items():
logger.info("{}: setting {} to {}".format(self.name, k, v))
self.__setattr__(k, v)
[docs] def run(self):
"""
Before we run the Modules in this Sequence, check if there are any other Sequences
on which we depend, and if so, wait for them to finish.
"""
if len(self.depends_on) > 0:
logger.info(
"{} Checking if all dependency Sequences have finished".format(
self.name
)
)
dependencies_finished = False
while not dependencies_finished:
num_seq_finished = 0
for seq_name in self.depends_on:
seq = self.parent.get(seq_name)
logger.info("{}: checking status of {}".format(self.name, seq.name))
if seq.check_if_finished():
logger.info(" {} ... finished".format(seq.name))
num_seq_finished += 1
dependencies_finished = num_seq_finished == len(self.depends_on)
logger.info(
"{}: {} / {} dependencies finished".format(
self.name,
num_seq_finished, len(self.depends_on)
)
)
time.sleep(10)
self.create_batch_job_if_needed()
for module in self.modules:
self.run_status[module.name] = module.run()
def __repr__(self):
if not self.is_configured:
return "Sequence not configured\n"
output = "\n [Sequence]: {} \n".format(self.name)
output += " =======================\n"
for k, v in vars(self).items():
# exclude the things we don't want to print
if (
k == "name"
or k == "modules"
or k == "parent"
or isinstance(v, BaseModule)
):
continue
output += " {}: {}\n".format(k, v)
output += "\n ------- Modules ----------\n\n"
for m in self.modules:
output += m.__repr__()
output += " =======================\n\n"
return output
[docs] def print_run_status(self):
"""
For all modules in the sequence, print out how many jobs
succeeded or failed.
"""
logger.info("\nSequence {}".format(self.name))
logger.info("-----------------\n")
for module in self.modules:
module.print_run_status()
logger.info("\n")
[docs] def get(self, mod_name):
"""
Return a module object when asked for by name, or by class name
"""
for module in self.modules:
if module.name == mod_name:
return module
elif module.__class__.__name__ == mod_name:
return module
[docs] def has_batch_job(self):
"""
Do any of the Modules in this sequence have run_mode == 'batch'?
"""
for module in self.modules:
if "run_mode" in vars(module) and module.run_mode == "batch":
return True
return False
[docs] def create_batch_job_if_needed(self):
"""
If any modules in this sequence are to be run in batch mode,
create a batch job for them.
"""
if self.has_batch_job():
self.batch_job_id = self.name + "_" + time.strftime("%Y-%m-%d_%H-%M-%S")
batch_utils.create_job(self.batch_job_id)
logger.info(
"Sequence {}: Creating batch job {}".format(
self.name, self.batch_job_id
)
)
[docs] def check_if_finished(self):
"""
Only relevant when one or more modules are running in batch mode,
Sequences that depend on this Sequence will call this function
while they wait for all Modules to finish.
"""
num_modules_finished = 0
for module in self.modules:
logger.info("{}: checking status of {}".format(self.name, module.name))
if module.check_if_finished():
logger.info("{} ... finished".format(module.name))
num_modules_finished += 1
logger.info(
"{} / {} modules finished".format(num_modules_finished, len(self.modules))
)
self.is_finished = num_modules_finished == len(self.modules)
return self.is_finished
[docs] def cleanup(self):
"""
If we have batch resources (job/pool), remove them to avoid charges
"""
if self.has_batch_job():
if "batch_job_id" in vars(self):
batch_utils.delete_job(self.batch_job_id)
batch_utils.delete_pool()
[docs]class BaseModule(object):
"""
A "Module" is a building block of a sequence - takes some input, does something
(e.g. Downloads from GEE, processes some images, ...) and produces some output.
The working directory for all modules within a sequence will be given by the sequence -
modules may write output to subdirectories of this (e.g. for different dates), but what
we call "output_location" will be the base directory common to all modules, and will contain
info about the image collection name, and the coordinates.
"""
def __init__(self, name=None):
if name:
self.name = name
else:
self.name = self.__class__.__name__
self.params = []
self.parent = None
self.depends_on = []
self.is_configured = False
self.is_finished = False
self.run_status = {"succeeded": 0, "failed": 0, "incomplete": 0}
[docs] def set_parameters(self, config_dict):
for k, v in config_dict.items():
logger.info("{}: setting {} to {}".format(self.name, k, v))
self.__setattr__(k, v)
[docs] def check_config(self):
"""
Loop through list of parameters, which will each be a tuple (name, [allowed_types])
and check that the parameter exists, and is of the correct type.
"""
for param in self.params:
if not param[0] in vars(self):
raise RuntimeError(
"{}: {} needs to be set.".format(self.name, param[0])
)
val = self.__getattribute__(param[0])
type_ok = False
for param_type in param[1]:
if isinstance(val, param_type):
type_ok = True
break
if not type_ok:
raise TypeError(
"{}: {} should be {}, got {}:{}".format(
self.name, param[0], param[1], val, type(val)
)
)
return True
[docs] def set_default_parameters(self):
pass
[docs] def prepare_for_run(self):
if not self.is_configured:
raise RuntimeError(
"Module {} needs to be configured before running".format(self.name)
)
if self.output_location_type == "azure":
# if we're running this module standalone on azure, we might need to
# create the output container on the blob storage account"
output_location_base = self.output_location.split("/")[0]
container_name = azure_utils.sanitize_container_name(output_location_base)
if not azure_utils.check_container_exists(container_name):
logger.info("Create container {}".format(container_name))
azure_utils.create_container(container_name)
elif self.output_location_type == "local" and not os.path.exists(
self.output_location
):
os.makedirs(self.output_location, exist_ok=True)
[docs] def check_if_finished(self):
return self.is_finished
def __repr__(self):
if not self.is_configured:
return "\n Module not configured"
output = " [Module]: {} \n".format(self.name)
output += " =======================\n"
for k, v in vars(self).items():
# exclude the things we don't want to print
if k == "name" or k == "parent" or k == "params":
continue
output += " {}: {}\n".format(k, v)
output += " =======================\n\n"
return output
[docs] def join_path(self, *path_elements):
"""
If output_location_type is 'local', we will just use
os.path.join, which puts a "/" separator in for posix, or "\" for windows.
However, if output_location_type is 'azure', we always want "/".
Parameters
==========
path_elements: list of strings. Directory-like path elements.
Returns
=======
path: str, the path elements joined by "/" or "\".
"""
if self.output_location_type == "azure":
path = "/".join(path_elements)
else:
path = os.path.join(*path_elements)
return path
[docs] def copy_to_output_location(self, tmpdir, output_location, file_endings=[]):
"""
Copy contents of a temporary directory to a specified output location.
Parameters
==========
tmpdir: str, location of temporary directory
output_location: str, either path to a local directory (if self.output_location_type is "local")
or to Azure <container>/<blob_path> if self.output_location_type=="azure")
file_endings: list of str, optional. If given, only files with those endings will be copied.
"""
if self.output_location_type == "local":
os.makedirs(output_location, exist_ok=True)
for root, dirs, files in os.walk(tmpdir):
for filename in files:
if file_endings:
for ending in file_endings:
if filename.endswith(ending):
copyfile(os.path.join(root,filename),
os.path.join(output_location,filename))
else:
subprocess.run(
[
"cp",
"-r",
os.path.join(root, filename),
os.path.join(output_location, filename),
]
)
elif self.output_location_type == "azure":
# first part of self.output_location should be the container name
container_name = self.output_location.split("/")[0]
azure_utils.write_files_to_blob(
tmpdir, container_name, output_location, file_endings
)
[docs] def list_directory(self, directory_path, location_type):
"""
List contents of a directory, either on local file system
or Azure blob storage.
"""
if location_type == "local":
if not os.path.isdir(directory_path):
return []
return os.listdir(directory_path)
elif location_type == "azure":
# first part of self.output_location should be the container name
container_name = self.output_location.split("/")[0]
return azure_utils.list_directory(directory_path, container_name)
else:
raise RuntimeError("Unknown location_type - must be 'local' or 'azure'")
[docs] def save_json(self, data, filename, location, location_type):
"""
Save json to local filesystem or blob storage depending on location_type
"""
if location_type == "local":
save_json(data, location, filename)
elif location_type == "azure":
# first part of self.output_location should be the container name
container_name = self.output_location.split("/")[0]
azure_utils.save_json(data, location, filename, container_name)
else:
raise RuntimeError("Unknown location_type - must be 'local' or 'azure'")
[docs] def get_json(self, filepath, location_type):
"""
Read a json file either local or blob storage.
"""
if location_type == "local":
if os.path.exists(filepath):
return json.load(open(filepath))
else:
return None
elif location_type == "azure":
# first part of filepath should be the container name
container_name = filepath.split("/")[0]
return azure_utils.read_json(filepath, container_name)
else:
raise RuntimeError("Unknown location_type - must be 'local' or 'azure'")
[docs] def get_file(self, filename, location_type):
"""
Just return the filename if location _type is "local".
Otherwise return a tempfile with the contents of a blob if the location
is "azure".
"""
if location_type == "local":
return filename
elif location_type == "azure":
# first part of self.output_location should be the container name
container_name = self.output_location.split("/")[0]
return azure_utils.get_blob_to_tempfile(filename, container_name)
else:
raise RuntimeError("Unknown location_type - must be 'local' or 'azure'")
[docs] def check_for_existing_files(self, location, num_files_expected):
"""
See if there are already num_files in the specified location.
If "replace_existing_files" is set to True, always return False
"""
if self.output_location_type == "local":
os.makedirs(location, exist_ok=True)
# if we haven't specified number of expected files per point it will be -1
if num_files_expected < 0:
return False
if self.replace_existing_files:
return False
existing_files = self.list_directory(location, self.output_location_type)
if len(existing_files) == num_files_expected:
logger.info(
"{}: Already found {} files in {} - skipping".format(
self.name, num_files_expected, location
)
)
return True
return False
[docs] def get_config(self):
"""
Get the configuration of this module as a dict.
"""
config_dict = {}
for param, _ in self.params:
config_dict[param] = self.__getattribute__(param)
config_dict["class_name"] = self.__class__.__name__
return config_dict
[docs] def save_config(self, config_location):
"""
Write out the configuration of this module as a json file.
"""
config_dict = self.get_config()
output_config_dir = os.path.dirname(config_location)
if output_config_dir and not os.path.exists(output_config_dir):
os.makedirs(output_config_dir)
with open(config_location, "w") as output_json:
json.dump(config_dict, output_json)
logger.info("{}: wrote config to {}".format(self.name, config_location))
[docs] def print_run_status(self):
"""
Print out how many jobs succeeded or failed
"""
logger.info(
"{}: Succeeded: {} Failed: {} Incomplete: {}".format(
self.name,
self.run_status["succeeded"],
self.run_status["failed"],
self.run_status["incomplete"]
)
)