#!/usr/bin/env python
"""
Use the Zenodo API to deposit or retrieve data.
Needs an API token - to create one:
Sign-in or create an account at https://zenodo.org
Create an API token by going to this page:
https://zenodo.org/account/settings/applications/tokens/new/
tick "deposit:actions" and "deposit:write" in the "Scopes" section
and click Create.
Then copy the created token into a file called "zenodo_api_token" in the pyveg/configs/ directory.
OR, to use the "Sandbox" API for testing, follow the same steps but replacing "zenodo.org"
with "sandbox.zenodo.org" in the URLs, and put the token into a file named "zenodo_test_api_token"
then call the functions in this module with the "test" argument set to True.
"""
import os
import shutil
import json
import re
import requests
import tempfile
from zipfile import ZipFile, BadZipFile
from pyveg.src.file_utils import get_filepath_after_directory
from pyveg.src.coordinate_utils import find_coords_string
try:
    import pyveg.zenodo_config as config
except Exception:
    # Importing the config is best-effort: the rest of the module can still be
    # imported without it.  ``Exception`` (rather than a bare ``except``) keeps
    # that behaviour for any failure while importing the config module, but no
    # longer swallows KeyboardInterrupt / SystemExit.
    print("File zenodo_config.py not found - will not be able to access the Zenodo API")
def get_base_url_and_token(test=False):
    """
    Look up the API base URL and the access token to use in requests.

    Parameters
    ==========
    test: bool, True to use the sandbox credentials, False for the production ones

    Returns
    =======
    base_url: str, the first part of the URL for the API
    api_token: str, the personal access token stored in the config module
    """
    credentials = config.test_api_credentials if test else config.prod_api_credentials
    return credentials["base_url"], credentials["api_token"]
def get_deposition_id(json_or_csv="json", test=False):
    """
    Return the ID of a previously created deposition, as stored in the
    credentials dictionaries of the config module.

    Parameters
    ==========
    json_or_csv: str, "json" selects the "deposition_id_summary_json" entry,
                 any other value selects "deposition_id_ts_csv"
    test: bool, True to read the sandbox credentials, False for production

    Returns
    =======
    The deposition ID stored under the selected key.
    """
    credentials = config.test_api_credentials if test else config.prod_api_credentials
    if json_or_csv == "json":
        key = "deposition_id_summary_json"
    else:
        key = "deposition_id_ts_csv"
    return credentials[key]
def list_depositions(test=False):
    """
    Ask the API for every deposition belonging to this account.

    Parameters
    ==========
    test: bool, True if we will use the sandbox API, False otherwise

    Returns
    =======
    The decoded JSON response from the API on status 200,
    or False (after printing the response content) on any other status.
    """
    base_url, api_token = get_base_url_and_token(test)
    url = '{}/deposit/depositions'.format(base_url)
    response = requests.get(url, params={'access_token': api_token})
    if response.status_code == 200:
        return response.json()
    print("Error retrieving depositions", response.content)
    return False
def create_deposition(test=False):
    """
    Ask the API to create a new, empty deposition.

    Parameters
    ==========
    test: bool, True if we will use the sandbox API, False otherwise

    Returns
    =======
    The decoded JSON response describing the new deposition on status 201,
    or None (after printing the response content) on any other status.
    """
    base_url, api_token = get_base_url_and_token(test)
    url = '{}/deposit/depositions'.format(base_url)
    response = requests.post(url,
                             params={'access_token': api_token},
                             json={},
                             headers={"Content-Type": "application/json"})
    if response.status_code == 201:
        return response.json()
    print("Error creating deposition", response.content)
    return None
def get_deposition_info(deposition_id, test=False):
    """
    Fetch the JSON description of a single deposition.

    Parameters
    ==========
    deposition_id: int, ID of the deposition.
    test: bool, if True use the sandbox API, if False will use the real one.

    Returns
    =======
    dep_info: dict, the decoded JSON response on status 200,
              or an empty dict (after printing the response content) otherwise.
    """
    base_url, api_token = get_base_url_and_token(test)
    url = '{}/deposit/depositions/{}'.format(base_url, deposition_id)
    response = requests.get(url,
                            params={'access_token': api_token},
                            json={},
                            headers={"Content-Type": "application/json"})
    if response.status_code == 200:
        return response.json()
    print("Error getting deposition", response.content)
    return {}
def get_bucket_url(deposition_id, test=False):
    """
    Look up the bucket URL of a deposition, i.e. the URL that the upload and
    download helpers in this module append a file name to.

    Parameters
    ==========
    deposition_id: int, ID of the deposition.
    test: bool, if True use the sandbox API, if False will use the real one.

    Returns
    =======
    bucket_url: str, the ["links"]["bucket"] entry of the deposition info,
                or an empty string if no deposition info could be retrieved.
    """
    info = get_deposition_info(deposition_id, test)
    if info:
        return info["links"]["bucket"]
    print("Deposition {} not found".format(deposition_id))
    return ""
def upload_file(filename, deposition_id, test=False):
    """
    Upload a file to a deposition.

    Parameters
    ==========
    filename: str, full path to the file to be uploaded
    deposition_id: int, ID of the deposition to which we want to upload.
    test: bool, True if we will use the sandbox API, False otherwise

    Returns
    =======
    uploaded_ok: bool, True if we get status code 200 from the API,
                 False if the deposition's bucket could not be found or the
                 API returned any other status code.
    """
    _, api_token = get_base_url_and_token(test)
    bucket_url = get_bucket_url(deposition_id, test)
    if not bucket_url:
        # get_bucket_url returns "" (and reports why) when the deposition is
        # unknown; building a URL from it would make requests raise instead
        # of letting us report a failed upload.
        print("Error uploading file: no bucket found for deposition {}".format(deposition_id))
        return False
    # The file is stored in the bucket under its base name only.
    upload_url = "{}/{}".format(bucket_url, os.path.basename(filename))
    with open(filename, "rb") as f:
        r = requests.put(upload_url,
                         data=f,
                         params={'access_token': api_token})
    if r.status_code != 200:
        print("Error uploading file", r.content)
        return False
    return True
def list_files(deposition_id, json_or_csv="json", test=False):
    """
    List all the files in a deposition.

    Parameters
    ==========
    deposition_id: int, ID of the deposition on which to list files.  If it is
                   empty/None, the ID is looked up via get_deposition_id.
    json_or_csv: str, only used when deposition_id is not given: "json" selects
                 the results-summary deposition, anything else the
                 time-series csv one.
    test: bool, True if using the sandbox API, False otherwise

    Returns
    =======
    files: list[str], list of all filenames in the deposition, or an empty
           list if the API did not return status code 200.
    """
    base_url, api_token = get_base_url_and_token(test)
    # Honour the ID the caller asked for; only fall back to the configured
    # one when none was supplied (previously the argument was always ignored).
    if not deposition_id:
        deposition_id = get_deposition_id(json_or_csv, test=test)
    r = requests.get("{}/deposit/depositions/{}/files".format(base_url, deposition_id),
                     params={"access_token": api_token})
    if r.status_code != 200:
        print("Error getting file list for deposition {}".format(deposition_id))
        # Do not try to interpret an error payload as a file listing.
        return []
    return [f["filename"] for f in r.json()]
def download_file(filename, deposition_id, destination_path=".", test=False):
    """
    Download a file from a deposition.

    Parameters
    ==========
    filename: str, name of the file in the deposition (only its base name is used)
    deposition_id: int, ID of the deposition containing this file
    destination_path: str, directory in which to put the downloaded file;
                      created if it does not exist
    test: bool, True if we will use the sandbox API, False otherwise

    Returns
    =======
    filepath: str, location of downloaded file, or an empty dict (falsy) if
              the file could not be downloaded.
    """
    _, api_token = get_base_url_and_token(test)
    bucket_url = get_bucket_url(deposition_id, test)
    if not bucket_url:
        # No bucket means the deposition lookup failed; requesting
        # "/<filename>" would only make requests raise on an invalid URL.
        print("Error downloading file: no bucket found for deposition {}".format(deposition_id))
        return {}
    # Use the base name for both the remote object and the local file, so a
    # filename containing directories cannot escape destination_path.
    basename = os.path.basename(filename)
    r = requests.get("{}/{}".format(bucket_url, basename),
                     params={'access_token': api_token})
    if r.status_code != 200:
        print("Error downloading file", r.content)
        return {}
    os.makedirs(destination_path, exist_ok=True)
    destination = os.path.join(destination_path, basename)
    with open(destination, "wb") as output:
        output.write(r.content)
    return destination
def delete_file(filename, deposition_id, test=False):
    """
    Ask the API to delete a file from a deposition.

    NOTE(review): the value of `filename` is placed directly in the
    ".../files/<filename>" part of the URL - confirm against the Zenodo API
    documentation whether a file name or a file id is expected there.

    Parameters
    ==========
    filename: str, identifier of the file to be deleted, as used in the URL
    deposition_id: int, ID of the deposition containing this file
    test: bool, True if we will use the sandbox API, False otherwise

    Returns
    =======
    True if the API answered with status code 204, False otherwise.
    """
    base_url, api_token = get_base_url_and_token(test)
    url = "{}/deposit/depositions/{}/files/{}".format(base_url, deposition_id, filename)
    response = requests.delete(url, params={'access_token': api_token})
    deleted_ok = response.status_code == 204
    if not deleted_ok:
        print("Error deleting file", response.content)
    return deleted_ok
def publish_deposition(deposition_id, test=False):
    """
    Call the "publish" action on a deposition, so it will be findable on
    Zenodo and have a DOI.

    Parameters
    ==========
    deposition_id: int, ID of the deposition to publish
    test: bool, True if we will use the sandbox API, False otherwise

    Returns
    =======
    The decoded JSON response on status 202,
    or False (after printing the response content) on any other status.
    """
    base_url, api_token = get_base_url_and_token(test)
    url = "{}/deposit/depositions/{}/actions/publish".format(base_url, deposition_id)
    response = requests.post(url, params={"access_token": api_token})
    if response.status_code == 202:
        return response.json()
    print("Error publishing", response.content)
    return False
def unlock_deposition(deposition_id, test=False):
    """
    Call the "edit" action on a previously submitted deposition, so we can
    add to it.

    Parameters
    ==========
    deposition_id: int, ID of the deposition to unlock
    test: bool, True if we will use the sandbox API, False otherwise

    Returns
    =======
    The decoded JSON response on status 201,
    or False (after printing the response content) on any other status.
    """
    base_url, api_token = get_base_url_and_token(test)
    url = "{}/deposit/depositions/{}/actions/edit".format(base_url, deposition_id)
    response = requests.post(url, params={"access_token": api_token})
    if response.status_code == 201:
        return response.json()
    print("Error unlocking", response.content)
    return False
def prepare_results_zipfile(collection_name,
                            png_location,
                            png_location_type="local",
                            json_location=None,
                            json_location_type="local"):
    """
    Create a zipfile called results_<coords>_<collection>.zip (the coords part
    is omitted if no coordinate string is found in png_location) containing
    the 'results_summary.json' and the contents of the analysis/ directory.

    Only locations of type "local" are read; for any other location type the
    corresponding content is simply not added to the zipfile.

    Parameters
    ==========
    collection_name: str, typically "Sentinel2" or "Landsat8" or similar
    png_location: str, directory containing analysis/ subdirectory
    png_location_type: str, either "local" or "azure"
    json_location: str, directory containing "results_summary.json".
                   If not specified, assume same as png_location
    json_location_type: str, either "local" or "azure"

    Returns
    =======
    zip_filename: str, location of the produced zipfile (inside a newly
                  created temporary directory)

    Raises
    ======
    RuntimeError if a local json_location is not a directory or lacks
    results_summary.json, or if a local png_location has no analysis/ entry.
    """
    if not json_location:
        # assume json and png are in the same directory
        json_location = png_location

    # Validate all inputs before touching the filesystem, so a bad argument
    # does not leave behind a temp directory holding a half-written zipfile.
    json_filepath = None
    if json_location_type == "local":
        if not os.path.isdir(json_location):
            raise RuntimeError("{} is not a directory".format(json_location))
        json_filepath = os.path.join(json_location, "results_summary.json")
        if not os.path.exists(json_filepath):
            raise RuntimeError("Could not find results_summary.json in {}".format(json_location))
    analysis_dir = None
    if png_location_type == "local":
        analysis_dir = os.path.join(png_location, "analysis")
        if not os.path.exists(analysis_dir):
            raise RuntimeError("Could not find analysis dir in {}".format(png_location))

    tmpdir = tempfile.mkdtemp()
    zip_filename = os.path.join(tmpdir, "results_")
    coords_string = find_coords_string(png_location)
    if coords_string:
        zip_filename += coords_string + "_"
    zip_filename += collection_name + ".zip"

    # The context manager closes the archive even if adding a file fails.
    with ZipFile(zip_filename, mode="w") as zf:
        if json_filepath:
            zf.write(json_filepath, arcname="results_summary.json")
        if analysis_dir:
            for root, _dirnames, filenames in os.walk(analysis_dir):
                for filename in filenames:
                    full_filepath = os.path.join(root, filename)
                    # archive name is derived from the part of the path
                    # relative to the "analysis" directory
                    short_filepath = get_filepath_after_directory(full_filepath, "analysis")
                    zf.write(full_filepath, arcname=short_filepath)
    return zip_filename
def get_results_summary_json(coords_string, collection, deposition_id, test=False):
    """
    Assuming the zipfile is named following the convention
    results_<long>_<lat>_<collection>.zip
    download this from the deposition, and extract the
    results_summary.json.

    Parameters
    ==========
    coords_string: str, coordinate part of the zipfile name
    collection: str, collection part of the zipfile name
    deposition_id: int, ID of the deposition holding the zipfile
    test: bool, True if we will use the sandbox API, False otherwise

    Returns
    =======
    The decoded contents of results_summary.json, or
    None if the zipfile is not listed in the deposition or could not be
    downloaded, or an empty dict if the zipfile has no results_summary.json.
    """
    zip_filename = "results_{}_{}.zip".format(coords_string, collection)
    # `test` must be passed by keyword: the second positional parameter of
    # list_files is json_or_csv, not test.
    if zip_filename not in list_files(deposition_id, test=test):
        print("Unable to find file {} in deposition {}".format(zip_filename, deposition_id))
        return None
    with tempfile.TemporaryDirectory() as tmpdir:
        tmp_zipfile = download_file(zip_filename, deposition_id, tmpdir, test)
        if not tmp_zipfile:
            # download_file has already reported the failure
            return None
        # Close the archive before the temporary directory is removed.
        with ZipFile(tmp_zipfile) as zf:
            try:
                data = zf.read("results_summary.json")
            except KeyError:
                print("results_summary.json not found in {}".format(zip_filename))
                return {}
    return json.loads(data)
def download_results_by_coord_id(coords_id, json_or_csv="json", destination_path=None, deposition_id=None, test=False):
    """
    Search the deposition (defined by the deposition_id in zenodo_config.py)
    for files whose name begins with 'coords_id' and contains
    "results_summary", and download the most recent one.

    NOTE(review): the file-name filter is "results_summary" regardless of
    json_or_csv - confirm that the csv files in the deposition follow that
    naming too.

    Parameters
    ==========
    coords_id: str, two-digit string identifying the row of the location in coordinates.py
    json_or_csv: str, if "json", use the results-summary deposition, otherwise
                 the time-series csv one (when deposition_id is not given).
    destination_path: str, directory to download to. If not given, put in temporary dir
    deposition_id: str, deposition ID in Zenodo. If not given, use the one from zenodo_config.py
    test: bool, if True, use the sandbox Zenodo repository

    Returns
    =======
    destination: str, location of the downloaded file as returned by
                 download_file, or an empty string if no matching file
                 exists in the deposition.

    Raises
    ======
    RuntimeError if coords_id does not contain two consecutive digits.
    """
    # coords_id should be two digits, e.g. '00'
    # (raw string: "\d" in a normal string literal is an invalid escape)
    if not re.search(r"\d{2}", coords_id):
        raise RuntimeError("coords_id should be a 2-digit string")
    if not deposition_id:
        deposition_id = get_deposition_id(json_or_csv, test=test)
    if destination_path:
        os.makedirs(destination_path, exist_ok=True)
    # list the files in the deposition
    file_list = [f for f in list_files(deposition_id, json_or_csv, test=test)
                 if f.startswith(coords_id) and "results_summary" in f]
    if not file_list:
        print("No files for coords_id {} found.".format(coords_id))
        return ""
    if not destination_path:
        # mkdtemp gives a directory that stays until the caller removes it;
        # TemporaryDirectory().name pointed at a directory whose owner object
        # was discarded (and the directory deleted) straight away.
        destination_path = tempfile.mkdtemp()
    # files should follow the same naming convention, and have the date at the end.
    # this means they should be sort-able. Find the most recent:
    latest_file = max(file_list)
    # download this
    return download_file(latest_file, deposition_id, destination_path, test)