"""
Data analysis code including functions to read the .json results file,
and functions to analyse and plot the data.
"""
import json
import math
import os
import datetime
import numpy as np
import pandas as pd
from shapely.geometry import Point
import geopandas as gpd
import matplotlib.pyplot as plt
import matplotlib.cm as cm
from scipy.fftpack import fft
from scipy.stats import sem, t, norm
from statsmodels.tsa.seasonal import STL
import ewstools
import scipy
import scipy.optimize as sco
def convert_to_geopandas(df):
"""
Given a pandas DataFrame with `latitude` and `longitude` columns, convert
it to a geopandas GeoDataFrame.
Parameters
----------
df : DataFrame
Pandas DataFrame with `latitude` and `longitude` columns.
Returns
----------
geopandas DataFrame
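Examples
--------
A minimal illustrative sketch; the two-row `df` below is hypothetical:

>>> import pandas as pd
>>> df = pd.DataFrame({"latitude": [27.95, 27.96], "longitude": [11.57, 11.58]})
>>> gdf = convert_to_geopandas(df)  # gdf now has a 'geometry' column of Points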
"""
df["geometry"] = [Point(xy) for xy in zip(df.latitude, df.longitude)]
crs = {"init": "epsg:4326"}
df = gpd.GeoDataFrame(df, crs=crs, geometry=df["geometry"])
return df
def calculate_ci(data, ci_level=0.99):
"""
Calculate the confidence interval on the mean for a set of data.
Parameters
----------
data : Series
Series of data to calculate the confidence interval of the mean.
ci_level : float, optional
Confidence level of the interval to calculate, e.g. 0.99 for a 99% CI.
Returns
----------
float
Confidence interval value where the CI is [mu - h, mu + h],
where mu is the mean.
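Examples
--------
Illustrative sketch with synthetic data:

>>> import numpy as np
>>> import pandas as pd
>>> ys = pd.Series(np.random.normal(size=100))
>>> h = calculate_ci(ys, ci_level=0.95)
>>> # the 95% CI on the mean is then [ys.mean() - h, ys.mean() + h]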
"""
# remove NaNs
ys = data.dropna().values
# calculate CI
n = len(ys)
std_err = sem(ys)
h = std_err * t.ppf((1 + ci_level) / 2, n - 1)
return h
def get_confidence_intervals(df, column, ci_level=0.99):
"""
Calculate the confidence interval at each time point of a
DataFrame containing data for a large image.
Parameters
----------
df : DataFrame
Time series data for multiple sub-image locations.
column : str
Name of the column to calculate the CI of.
ci_level : float, optional
Confidence level of the interval to calculate, e.g. 0.99 for a 99% CI.
Returns
----------
DataFrame
Time series data for multiple sub-image locations with
added column for the ci.
"""
# group all the data at each date
d = {}
for name, group in df.groupby(["date"]):
d[name] = group
# for each timepoint, calculate the CI
for df in d.values():
df["ci"] = calculate_ci(df[column], ci_level=ci_level)
# merge results (DataFrame.append was removed in pandas 2.0, so use concat)
df = pd.concat(d.values())
return df
def coarse_dataframe(geodf, side_square):
"""
Coarsen the granularity of a dataframe by grouping lat,long points
that are close to each other into squares of side length `side_square`.
Parameters
----------
geodf : GeoDataFrame
Input dataframe.
side_square : int
Side length of the square, in grid points.
Returns
----------
DataFrame
A coarser-grained version of the input dataframe.
"""
# initialise the categories
geodf["category"] = -1
# do calculations on the first date, then extrapolate to the rest
data_df = geodf[geodf["date"] == np.unique(geodf["date"])[0]]
data_df = data_df.sort_values(by=["longitude", "latitude"])
n_grids = int(math.sqrt(data_df.shape[0]))
category = 0
for n in range(data_df.shape[0]):
# only process lat,long points that do not yet have a category
if data_df["category"].iloc[n] == -1:
# get the side_square^2 nearest indexes to the point.
indexes = []
for i in range(side_square):
for j in range(side_square):
if (
n + n_grids * i + j < n_grids * n_grids
and data_df["category"].iloc[n + n_grids * i + j] == -1
):
indexes.append(n + n_grids * i + j)
# assign them all to the same category
data_df["category"].iloc[indexes] = str(category)
# get the geometry points of that category
cat_geometry = data_df[data_df["category"] == str(category)]["geometry"]
# get indexes of each point belonging to the category
indexes_all = []
for point in cat_geometry:
indexes_all.append(geodf[geodf["geometry"] == point].index.tolist())
indexes_all_flat = [item for sublist in indexes_all for item in sublist]
geodf["category"].iloc[indexes_all_flat] = str(category)
category = category + 1
geodf["category"] = (geodf["category"].astype(str)).str.cat(geodf["date"], sep="_")
geodf = geodf.dissolve(by=["category", "date"], aggfunc="mean")
# re-assign the date because it is lost in the dissolve
geodf["date"] = [i[1] for i in geodf.index]
geodf["category"] = [i[0] for i in geodf.index]
return geodf
def fft_series(time_series):
"""
Perform Fast Fourier Transform on an input series (assume one row per day).
Parameters
----------
time_series: a pandas Series with one row per day, and datetime index (which we'll ignore)
Returns
-------
xvals, yvals: np.arrays of frequencies (1/day) and strengths in frequency space.
Ready to be plotted directly in a matplotlib plot.
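Examples
--------
Illustrative sketch with a synthetic daily series containing a 30-day cycle:

>>> import numpy as np
>>> import pandas as pd
>>> days = pd.date_range("2020-01-01", periods=365, freq="D")
>>> ts = pd.Series(np.sin(2 * np.pi * np.arange(365) / 30), index=days)
>>> xvals, yvals = fft_series(ts)  # expect a peak in yvals near 1/30 cycles per day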
"""
ts = list(time_series)
# Number of points
N = len(ts)
# Sample spacing (days)
T = 1.0
fourier = fft(ts)
# x-axis values
xvals = np.linspace(0.0, 1.0 / (20 * T), N // 20)
yvals = 2.0 / N * np.abs(fourier[0: N // 20])
return xvals, yvals
def write_slimmed_csv(dfs, output_dir, filename_suffix=""):
"""
For each vegetation collection, combine the offset50 columns with the
ERA5 weather data and write the result to a csv file in `output_dir`.
"""
for collection_name, veg_df in dfs.items():
if collection_name == "COPERNICUS/S2" or "LANDSAT" in collection_name:
df_summary = dfs["ECMWF/ERA5/DAILY"]
df_summary.loc[veg_df.index, "offset50_mean"] = veg_df["offset50_mean"]
df_summary.loc[veg_df.index, "offset50_std"] = veg_df["offset50_std"]
df_summary.loc[veg_df.index, "offset50_smooth_mean"] = veg_df[
"offset50_smooth_mean"
]
df_summary.loc[veg_df.index, "offset50_smooth_std"] = veg_df[
"offset50_smooth_std"
]
summary_csv_filename = os.path.join(
output_dir,
collection_name.replace("/", "-")
+ "_time_series"
+ filename_suffix
+ ".csv",
)
print(f"\nWriting '{summary_csv_filename}'...")
df_summary.to_csv(summary_csv_filename)
def get_AR1_parameter_estimate(ys):
"""
Fit an AR(1) model to the time series data and return
the associated parameter of the model.
Parameters
----------
ys: array
Input time series data.
Returns
-------
float
The parameter value of the AR(1) model.
float
The parameter standard error
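Examples
--------
Illustrative sketch with a synthetic AR(1) series (true parameter 0.8) on a monthly index:

>>> import numpy as np
>>> import pandas as pd
>>> xs = np.random.default_rng(0).normal(size=120)
>>> for i in range(1, len(xs)):
...     xs[i] += 0.8 * xs[i - 1]
>>> ys = pd.Series(xs, index=pd.date_range("2010-01-01", periods=120, freq="MS"))
>>> param, se = get_AR1_parameter_estimate(ys)  # param should be close to 0.8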
"""
ys = ys.dropna()
if len(ys) < 4:
print("Time series too short to reliably calculate AR1")
return np.NaN, np.NaN
from statsmodels.tsa.ar_model import AutoReg
# more sophisticated models to consider:
# from statsmodels.tsa.statespace.sarimax import SARIMAX
# from statsmodels.tsa.arima_model import ARMA
# create and fit the AR(1) model
if pd.infer_freq(ys.index) is not None:
# explicitly add frequency to index to prevent warnings
ys.index = pd.DatetimeIndex(ys.index, freq=pd.infer_freq(ys.index))
model = AutoReg(ys, lags=1, missing="drop").fit() # currently warning
else:
# remove index
model = AutoReg(ys.values, lags=1, missing="drop").fit() # currently warning
# get the single parameter value
parameter = model.params[1]
se = model.bse[1]
return parameter, se
def get_kendell_tau(ys):
"""
Kendall's tau gives information about the trend of the time series.
It is just a rank correlation test with one variable being time
(or the vector 1 to the length of the time series), and the other
variable being the data itself. A tau value of 1 means that the
time series is always increasing, whereas -1 means always decreasing,
and 0 signifies no overall trend.
Parameters
----------
ys: array
Input time series data.
Returns
-------
float
The value of tau.
float
The p value of the rank correlation test.
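Examples
--------
Illustrative sketch; a steadily increasing series gives tau close to 1:

>>> import numpy as np
>>> ys = np.arange(50) + np.random.normal(scale=0.1, size=50)
>>> tau, p = get_kendell_tau(ys)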
"""
from scipy.stats import kendalltau
# calculate Kendall tau
tau, p = kendalltau(range(len(ys)), ys)
return tau, p
def write_to_json(filename, out_dict):
"""
Create or append the contents of `out_dict`
to json file `filename`.
Parameters
----------
filename : str
Output json filename.
out_dict: dict
Information to save.
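Examples
--------
Illustrative sketch; the path and keys below are hypothetical:

>>> write_to_json("output/results_summary.json", {"offset50_mean": 123.4})
>>> write_to_json("output/results_summary.json", {"precip_corr": 0.56})  # merged into the same file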
"""
# if file doesn't exist
if not os.path.exists(filename):
# make enclosing dir if needed
output_dir = os.path.dirname(filename)
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir, exist_ok=True)
# write new json file
with open(filename, "w") as json_file:
json.dump(out_dict, json_file, indent=2)
# file exists
else:
# json read
data = None
with open(filename, "r") as json_file:
data = json.load(json_file)
# update dict
for k, v in out_dict.items():
data[k] = v
# json write
with open(filename, "w") as json_file:
json.dump(data, json_file, indent=2)
def stl_decomposition(series, period=12):
"""
Run STL decomposition on a pandas Series object.
Parameters
----------
series : Series object
The observations to be deseasonalised.
period : int (optional)
Length of the seasonal period in observations.
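Examples
--------
Illustrative sketch on a synthetic monthly series:

>>> import numpy as np
>>> import pandas as pd
>>> idx = pd.date_range("2016-01-01", periods=48, freq="MS")
>>> series = pd.Series(np.sin(2 * np.pi * np.arange(48) / 12), index=idx)
>>> res = stl_decomposition(series, period=12)
>>> trend, seasonal, resid = res.trend, res.seasonal, res.resid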
"""
stl = STL(series, period, robust=True)
res = stl.fit()
return res
def get_max_lagged_cor(dirname, veg_prefix):
"""
Convenience function which returns the maximum correlation as a
function of lag (using a file saved earlier).
Parameters
----------
dirname : str
Path to the `analysis/` directory of the current analysis job.
veg_prefix : str
Compact representation of the satellite collection name used to
obtain vegetation data.
Returns
----------
tuple
Max correlation, and lag, for smoothed and unsmoothed vegetation time
series.
"""
# construct path to lagged correlations file
filename = os.path.join(dirname, "correlations", "lagged_correlations.json")
# check file exists
if not os.path.exists(filename):
raise FileNotFoundError(f'Could not find file "{os.path.abspath(filename)}".')
# read file
with open(filename) as json_file:
    lagged_cor = json.load(json_file)
# calculate max corr
lagged_cor = {k: np.array(v[:5]) for k, v in lagged_cor.items() if veg_prefix in k}
lagged_cor = {k: (np.max(v), np.argmax(v)) for k, v in lagged_cor.items()}
if veg_prefix + "_offset50_mean_lagged_correlation" in lagged_cor.keys():
max_corr_unsmoothed = lagged_cor[
veg_prefix + "_offset50_mean_lagged_correlation"
]
else:
max_corr_unsmoothed = (np.NaN, np.NaN)
if veg_prefix + "_offset50_smooth_mean_lagged_correlation" in lagged_cor.keys():
max_corr_smooth = lagged_cor[
veg_prefix + "_offset50_smooth_mean_lagged_correlation"
]
else:
max_corr_smooth = (np.NaN, np.NaN)
return max_corr_smooth, max_corr_unsmoothed
def variance_moving_average_time_series(series, length):
"""
Calculate a variance time series using a moving average
Parameters
----------
series : pandas Series
Time series observations.
length : int
Length of the moving window in number of observations.
Returns
-------
pandas Series:
pandas Series with datetime index, and one column, one row per date.
"""
# just in case the index isn't already datetime type
series.index = pd.to_datetime(series.index)
variance = series.rolling(length).var()
variance.name = series.name + "_var"
return variance
def ar1_moving_average_time_series(series, length=1):
"""
Calculate an AR1 time series using a moving average
Parameters
----------
series : pandas Series
Time series observations.
length : int
Length of the moving window in number of observations.
Returns
-------
pandas Series:
pandas Series with datetime index, and one column, one row per date
"""
# just in case the index isn't already datetime type
series.index = pd.to_datetime(series.index)
ar1 = []
ar1_se = []
index = []
for i in range(len(series) - length):
# print(series[i:(length + i)])
param, se = get_AR1_parameter_estimate(series[i: (length + i)])
ar1.append(param)
ar1_se.append(se)
index.append(series.index[length + i])
ar1_name = series.name + "_ar1"
ar1_se_name = series.name + "_ar1_se"
ar1_df = pd.DataFrame()
ar1_df[ar1_name] = pd.Series(ar1)
ar1_df[ar1_se_name] = pd.Series(ar1_se)
ar1_df.index = index
return ar1_df
def get_ar1_var_timeseries_df(series, window_size=0.5):
"""
Given a time series calculate AR1 and variance using
a moving window. Put the two resulting time series into
a new DataFrame and return the result.
Parameters
----------
series : pandas Series
Time series observations.
window_size: float (optional)
Size of the moving window as a fraction of the time series length.
Returns
----------
DataFrame
The AR1 and variance results in a time series dataframe.
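Examples
--------
Illustrative sketch on a synthetic monthly series (the name "offset50_mean" is only an example):

>>> import numpy as np
>>> import pandas as pd
>>> idx = pd.date_range("2016-01-01", periods=60, freq="MS")
>>> series = pd.Series(np.random.normal(size=60), index=idx, name="offset50_mean")
>>> ar1_var_df = get_ar1_var_timeseries_df(series, window_size=0.5)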
"""
# drop null values
series = series.dropna()
# calculate the length in number of time points of the moving window
length = round(len(series) * window_size)
# calculate the ar1 and variance
ar1_df = ar1_moving_average_time_series(series, length)
variance = variance_moving_average_time_series(series, length)
# merge results
ar1_var_df = pd.merge(variance, ar1_df, left_index=True, right_index=True)
return ar1_var_df
def get_corrs_by_lag(series_A, series_B):
"""
Return the correlations between `series_B` and lagged copies of `series_A`,
for lags of 0 to 5 time steps.
"""
# set up
max_lag = 6 # assuming monthly sampling we shouldn't need to go past this
correlations = []
# loop through offsets
for lag in range(0, max_lag):
# shift vegetation time series back
lagged_data = series_A.shift(-lag)
# correlate with series_B
corr = series_B.corr(lagged_data)
correlations.append(round(corr, 4))
return correlations
def get_correlation_lag_ts(series_A, series_B, window_size=0.5):
"""
Given two time series, calculate the lagged correlation between
them using a moving window. Additionally calculate, within each
window, the lag which maximises the correlation.
Parameters
----------
series_A : pandas Series
Observations of the first time series.
series_B : pandas Series
Observations of the second time series.
window_size: float (optional)
Size of the moving window as a fraction of the time series length.
Returns
----------
DataFrame
Time series of the lagged correlation, and of the lag which maximises the correlation.
"""
# get correlations as a function of lag
correlations = get_corrs_by_lag(series_A, series_B)
# get the lag which maximises the correlation
lag_max_cor = np.argmax(np.array(correlations))
# create an offset version of series_A
series_A_lagged = series_A.shift(-lag_max_cor)
# compute the length of the moving window in number of observations
length = round(len(series_A) * window_size)
# just in case the index isn't already datetime type
series_A.index = pd.to_datetime(series_A.index)
series_B.index = pd.to_datetime(series_B.index)
# place to store results
correlations_mw = []
mag_max_cors_mw = []
index = []
# for each step along the moving window
for i in range(len(series_A) - length):
# get the slices of the timeseries
frame_A = series_A[i: (length + i)]
frame_A_lagged = series_A_lagged[i: (length + i)]
frame_B = series_B[i: (length + i)]
# compute the lagged correlation using the lag
# which maximises the global correlation
frame_corr = frame_B.corr(frame_A_lagged)
# compute the correlation which maximises the lag
frame_correlations = get_corrs_by_lag(frame_A, frame_B)
frame_lag_max_cor = np.argmax(np.array(frame_correlations))
# store results
correlations_mw.append(frame_corr)
mag_max_cors_mw.append(frame_lag_max_cor)
index.append(series_A_lagged.index[length + i])
s = "ndvi" if "ndvi" in series_A else "offest50"
correlations_mva_series_name = (
series_A.name.split("_")[0] + "_" + s + "_precip_corr"
)
mag_max_cors_mw_series_name = series_A.name.split("_")[0] + "_" + s + "_precip_lag"
out_df = pd.DataFrame()
out_df[correlations_mva_series_name] = pd.Series(correlations_mw)
out_df[mag_max_cors_mw_series_name] = pd.Series(mag_max_cors_mw)
out_df.index = index
return out_df
def moving_window_analysis(df, output_dir, window_size=0.5):
"""
Run moving window AR1 and variance calculations for several
input time series.
Parameters
----------
df : DataFrame
Input time series DataFrame containing several time series.
output_dir : str
Path to the output plotting directory.
window_size: float (optional)
Size of the moving window as a fraction of the time series length.
Returns
----------
DataFrame
AR1 and variance time-series for each of the input time series.
"""
# new output dataframe
mwa_df = pd.DataFrame()
# loop through columns
for column in df.columns:
# run moving window analysis veg and precip columns
if (
("offset50" in column or "ndvi" in column)
and "mean" in column
or "total_precipitation" in column
):
# reindex the time series using the date column
time_series = df.set_index("date")[column]
# compute AR1 and variance time series
df_ = get_ar1_var_timeseries_df(time_series, window_size)
mwa_df = mwa_df.join(df_, how="outer")
# for the precipitation column, look at correlations to veg
if "total_precipitation" in column:
for column_veg in df.columns:
if (
("offset50" in column_veg or "ndvi" in column_veg)
and "mean" in column_veg
and "smooth" not in column_veg
):
mwa_df = mwa_df.merge(
get_correlation_lag_ts(
df.set_index("date")[column_veg],
df.set_index("date")[column],
window_size=window_size,
),
how="outer",
left_index=True,
right_index=True,
)
# use date as a column, and reset index
mwa_df.index.name = "date"
mwa_df = mwa_df.reset_index()
return mwa_df
def get_datetime_xs(df):
"""
Return the date column of `df` as datetime objects.
"""
# check the column exists
if "date" not in df.columns:
raise RuntimeError("Couldn't find column 'date' in input df")
# safely read date column and convert to datetime objects
try:
xs = [datetime.datetime.strptime(d, "%Y-%m-%d").date() for d in df.date]
except Exception:
# if the time series has been resampled the index is a TimeStamp object
xs = [
datetime.datetime.strptime(d._date_repr, "%Y-%m-%d").date() for d in df.date
]
return xs
def early_warnings_sensitivity_analysis(
series,
indicators=["var", "ac"],
winsizerange=[0.10, 0.8],
incrwinsize=0.10,
smooth="Gaussian",
bandwidthrange=[0.05, 1.0],
spanrange=[0.05, 1.1],
incrbandwidth=0.2,
incrspanrange=0.1,
):
"""
Function to estimate the sensitivity of the early warnings analysis to
the smoothing and windowsize used. The function returns a dataframe that
contains the Kendall tau rank correlation estimates for the rolling window
sizes (winsize variable) and bandwidths or span sizes depending on the
de-trending (smooth variable).
This function is inspired by the sensitivity_ews.R function from Vasilis
Dakos and Leo Lahti in the early-warnings-R package:
https://github.com/earlywarningtoolbox/earlywarnings-R.
Parameters
----------
series : pandas Series
Time series observations.
indicators: list of strings
The statistics (leading indicators) for which the sensitivity analysis is performed.
winsizerange: list of float
Range of the rolling window sizes expressed as a ratio of the timeseries length (must be numeric between 0 and 1). Default is 0.10 - 0.8.
incrwinsize: float
Increment for the rolling window size (must be numeric between 0 and 1). Default is 0.10.
smooth: string
Type of detrending. It can be {'Gaussian', 'Lowess', 'None'}.
bandwidthrange: list of float
Range of the bandwidth used for the Gaussian kernel when Gaussian filtering is selected. It is expressed as a fraction of the timeseries length (must be numeric between 0 and 1). Default is 0.05 - 1.0.
spanrange: list of float
Parameter that controls the degree of Lowess smoothing (numeric between 0 and 1). Default is 0.05 - 1.
incrbandwidth: float
Size to increment the bandwidth used for the Gaussian kernel when Gaussian filtering is applied. It is expressed as a fraction of the timeseries length (must be numeric between 0 and 1). Default is 0.2.
incrspanrange: float
Size to increment the span used for the Lowess smoothing. Default is 0.1.
Returns
--------
DataFrame:
A dataframe that contains the Kendall tau rank correlation estimates for the rolling window sizes (winsize variable)
and bandwidths or span sizes depending on the de-trending (smooth variable).
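Examples
--------
Illustrative sketch (coarse parameter grid to keep the run short); the synthetic series is an assumption:

>>> import numpy as np
>>> import pandas as pd
>>> idx = pd.date_range("2010-01-01", periods=120, freq="MS")
>>> series = pd.Series(np.random.normal(size=120), index=idx)
>>> sensitivity_df = early_warnings_sensitivity_analysis(
...     series, indicators=["var", "ac"], winsizerange=[0.4, 0.6], incrwinsize=0.2
... )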
"""
results_kendal_tau = []
for winsize in np.arange(winsizerange[0], winsizerange[1] + 0.01, incrwinsize):
winsize = round(winsize, 3)
if smooth == "Gaussian":
for bw in np.arange(
bandwidthrange[0], bandwidthrange[1] + 0.01, incrbandwidth
):
bw = round(bw, 3)
ews_dic_veg = ewstools.core.ews_compute(
series.dropna(),
roll_window=winsize,
smooth=smooth,
lag_times=[1, 2],
ews=indicators,
band_width=bw,
)
result = ews_dic_veg["Kendall tau"]
result["smooth"] = bw
result["winsize"] = winsize
results_kendal_tau.append(result)
elif smooth == "Lowess":
for span in np.arange(spanrange[0], spanrange[1] + 0.01, incrspanrange):
span = round(span, 2)
ews_dic_veg = ewstools.core.ews_compute(
series.dropna(),
roll_window=winsize,
smooth=smooth,
lag_times=[1, 2],
ews=indicators,
span=span,
)
result = ews_dic_veg["Kendall tau"]
result["smooth"] = bw
result["winsize"] = winsize
results_kendal_tau.append(result)
else:
ews_dic_veg = ewstools.core.ews_compute(
series.dropna(),
roll_window=winsize,
smooth="None",
lag_times=[1, 2],
ews=indicators,
)
result = ews_dic_veg["Kendall tau"]
result["smooth"] = 0
result["winsize"] = winsize
results_kendal_tau.append(result)
sensitivity_df = pd.concat(results_kendal_tau)
return sensitivity_df
def early_warnings_null_hypothesis(
series,
indicators=["var", "ac"],
roll_window=0.4,
smooth="Lowess",
span=0.1,
band_width=0.2,
lag_times=[1],
n_simulations=1000,
):
"""
Function to estimate the significance of the early warnings analysis
by performing a null hypothesis test. The function estimates distributions
of trends in early warning indicators from different surrogate timeseries
generated after fitting an ARMA(p,q) model on the original data.
The trends are estimated by the nonparametric Kendall tau correlation
coefficient and can be compared to the trends estimated in the original
timeseries to produce probabilities of false positives. The function
returns a dataframe that contains the Kendall tau rank correlation
estimates for the original data and surrogates.
Parameters
----------
series : pandas Series
Time series observations.
indicators: list of strings
The statistics (leading indicators) for which the analysis is performed.
roll_window: float
Rolling window size as a proportion of the length of the time-series
data.
smooth : string
Type of detrending. It can be {'Gaussian', 'Lowess', 'None'}.
span: float
Span of time-series data used for Lowess filtering. Taken as a
proportion of time-series length if in (0,1), otherwise taken as
absolute.
band_width: float
Bandwidth of Gaussian kernel. Taken as a proportion of time-series length if in (0,1),
otherwise taken as absolute.
lag_times: list of int
List of lag times at which to compute autocorrelation.
n_simulations: int
The number of surrogate data. Default is 1000.
Returns
--------
DataFrame:
A dataframe that contains the Kendall tau rank correlation estimates for each
indicator estimated on each surrogate dataset.
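Examples
--------
Illustrative sketch with a reduced number of surrogates to keep the runtime down:

>>> import numpy as np
>>> import pandas as pd
>>> idx = pd.date_range("2010-01-01", periods=120, freq="MS")
>>> series = pd.Series(np.random.normal(size=120), index=idx)
>>> kendall_tau_df = early_warnings_null_hypothesis(series, indicators=["var", "ac"], n_simulations=100)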
"""
ews_dic = ewstools.core.ews_compute(
series,
roll_window=roll_window,
smooth=smooth,
span=span,
band_width=band_width,
ews=indicators,
lag_times=lag_times,
)
from statsmodels.tsa.arima.model import ARIMA  # statsmodels.tsa.arima_model was removed in statsmodels 0.13
from statsmodels.tsa.arima_process import ArmaProcess
# Use the short_series EWS if smooth='None'. Otherwise use residuals.
eval_series = ews_dic["EWS metrics"]["Residuals"]
# Fit ARMA model based on AIC
aic_max = 10000
for i in range(0, 2):
for j in range(0, 2):
model = ARIMA(eval_series, order=(i, j, 0))
model_fit = model.fit()
aic = model_fit.aic
print("AR", "MA", "AIC")
print(i, j, aic)
if aic < aic_max:
aic_max = aic
result = model_fit
def compute_indicators(series):
"""
Rolling window indicators computation based on the ewstools.core.ews_compute function from
ewstools
"""
df_ews = pd.DataFrame()
# Compute the rolling window size (integer value)
rw_size = int(np.floor(roll_window * series.shape[0]))
# ------------ Compute temporal EWS---------------#
# Compute standard deviation as a Series and add to the DataFrame
if "sd" in indicators:
roll_sd = series.rolling(window=rw_size).std()
df_ews["Standard deviation"] = roll_sd
# Compute variance as a Series and add to the DataFrame
if "var" in indicators:
roll_var = series.rolling(window=rw_size).var()
df_ews["Variance"] = roll_var
# Compute autocorrelation for each lag in lag_times and add to the DataFrame
if "ac" in indicators:
for i in range(len(lag_times)):
roll_ac = series.rolling(window=rw_size).apply(
func=lambda x: pd.Series(x).autocorr(lag=lag_times[i]), raw=True
)
df_ews["Lag-" + str(lag_times[i]) + " AC"] = roll_ac
# Compute Coefficient of Variation (C.V) and add to the DataFrame
if "cv" in indicators:
# mean of raw_series
roll_mean = series.rolling(window=rw_size).mean()
# standard deviation of residuals
roll_std = series.rolling(window=rw_size).std()
# coefficient of variation
roll_cv = roll_std.divide(roll_mean)
df_ews["Coefficient of variation"] = roll_cv
# Compute skewness and add to the DataFrame
if "skew" in indicators:
roll_skew = series.rolling(window=rw_size).skew()
df_ews["Skewness"] = roll_skew
# Compute Kurtosis and add to DataFrame
if "kurt" in indicators:
roll_kurt = series.rolling(window=rw_size).kurt()
df_ews["Kurtosis"] = roll_kurt
# ------------Compute Kendall tau coefficients----------------#
""" In this section we compute the kendall correlation coefficients for each EWS
with respect to time. Values close to one indicate high correlation (i.e. EWS
increasing with time), values close to zero indicate no significant correlation,
and values close to negative one indicate high negative correlation (i.e. EWS
decreasing with time)."""
# Put time values as their own series for correlation computation
time_vals = pd.Series(df_ews.index, index=df_ews.index)
# List of EWS that can be used for Kendall tau computation
ktau_metrics = [
"Variance",
"Standard deviation",
"Skewness",
"Kurtosis",
"Coefficient of variation",
"Smax",
"Smax/Var",
"Smax/Mean",
] + ["Lag-" + str(i) + " AC" for i in lag_times]
# Find intersection with this list and EWS computed
ews_list = df_ews.columns.values.tolist()
ktau_metrics = list(set(ews_list) & set(ktau_metrics))
# Find Kendall tau for each EWS and store in a DataFrame
dic_ktau = {
x: df_ews[x].corr(time_vals, method="kendall") for x in ktau_metrics
} # temporary dictionary
df_ktau = pd.DataFrame(
dic_ktau, index=[0]
) # DataFrame (easier for concatenation purposes)
# -------------Organise final output and return--------------#
# Output a dictionary containing the EWS DataFrame and the Kendall tau values
output_dic = {"EWS metrics": df_ews, "Kendall tau": df_ktau}
return output_dic
process = ArmaProcess.from_estimation(result)
# run simulations on best fitted ARIMA process and get values
kendall_tau = []
for i in range(n_simulations):
ts = process.generate_sample(len(eval_series))
kendall_tau.append(compute_indicators(pd.Series(ts))["Kendall tau"])
surrogates_kendall_tau_df = pd.concat(kendall_tau)
surrogates_kendall_tau_df["true_data"] = False
# get results for true data
data_kendall_tau_df = compute_indicators(eval_series)["Kendall tau"]
data_kendall_tau_df["true_data"] = True
# return dataframe with both surrogates and true data
kendall_tau_df = pd.concat([data_kendall_tau_df, surrogates_kendall_tau_df])
return kendall_tau_df
def mean_annual_ts(x, resolution=12):
"""
Calculate the mean annual time series from a time series. Also fills in missing values
by linear interpolation. NB: fails if there is a missing value at the start or end.
Parameters
----------
x : Time series
Time series to calculate mean annual time series for
resolution : float
Number of values each year in a time series (12 is monthly for example)
Returns
----------
ndarray
Array of length equal to resolution that is the mean annual time series
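Examples
--------
Illustrative sketch with three years of synthetic monthly values:

>>> import numpy as np
>>> x = np.tile(np.sin(2 * np.pi * np.arange(12) / 12), 3)
>>> cycle = mean_annual_ts(x, resolution=12)  # array of 12 mean values, one per month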
"""
missing_inds = np.where(np.isnan(x))[0]
if len(missing_inds) > 0:
for i in range(len(missing_inds)):
    x[missing_inds[i]] = np.mean([x[missing_inds[i] - 1], x[missing_inds[i] + 1]])
mean_cycle = np.repeat(np.nan, resolution, axis=0)
for i in range(resolution):
mean_cycle[i] = np.nanmean(x[np.linspace(start=i, stop=len(x) - 1, num=resolution, dtype=int)])
return mean_cycle
def decay_rate(x, resolution=12, method='basic'):
"""
Calculates the decay rate between the max and min values of a time series.
Parameters
----------
x : time series
Time series to calculate decay rate on. mean_annual_ts is calculated
on this series within this function so raw time series is expected.
resolution : int
Number of values each year in a time series (12 is monthly for example)
method : 'basic' (default) or 'adjusted'
A choice of whether to calculate the decay rate on the mean annual
time series calculated within the function ('basic'), or to first adjust
the time series so that its minimum value is 1, by subtracting the minimum
of the mean annual time series and adding 1 ('adjusted'; useful for offset50 values).
Returns
----------
float
The decay rate value
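Examples
--------
Illustrative sketch on a strictly positive synthetic monthly series:

>>> import numpy as np
>>> x = np.tile(np.cos(2 * np.pi * np.arange(12) / 12) + 2.0, 3)
>>> dr = decay_rate(x, resolution=12, method='basic')  # log(min/max) per time step, so negative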
"""
annual_cycle = mean_annual_ts(x, resolution)
if method == 'basic':
ts = annual_cycle
elif method == 'adjusted':
ts = annual_cycle - np.min(annual_cycle) + 1
else:
    raise ValueError("method must be either 'basic' or 'adjusted'")
max_ind = np.where(ts == np.max(ts))[0][0]
min_ind = np.where(ts == np.min(ts))[0][0]
if min_ind < max_ind:
# this ensures the length of time for decay is correct below
min_ind = min_ind + resolution
dr = np.log(np.min(ts) / np.max(ts)) / (min_ind - max_ind)
return dr
def exp_model_fit(x, resolution=12, method='basic'):
"""
Fits an exponential model from the maximum to the minimum of the
mean annual time series. A raw time series is expected as an input.
Parameters
----------
x : time series
Time series to fit the exponential model on. mean_annual_ts is calculated
on this series within this function so a raw time series is expected.
resolution : int
Number of values each year in a time series (12 is monthly for example)
method : 'basic' (default) or 'adjusted'
A choice of whether to fit the exponential model on the mean annual
time series calculated within the function ('basic'), or to first adjust
the time series so that its minimum value is 1, by subtracting the minimum
of the mean annual time series and adding 1 ('adjusted'; useful for offset50 values).
Returns
----------
ndarray
The coefficient values from the exponential model fit
"""
annual_cycle = mean_annual_ts(x, resolution)
if method == 'basic':
ts = annual_cycle
elif method == 'adjusted':
ts = annual_cycle - np.min(annual_cycle) + 1
else:
    raise ValueError("method must be either 'basic' or 'adjusted'")
max_ind = np.where(ts == np.max(ts))[0][0]
min_ind = np.where(ts == np.min(ts))[0][0]
# in most cases we find the minimum value is earlier in the year
# so the below crosses Dec/Jan if this is the case
# otherwise remains within a single year cycle
if min_ind < max_ind:
exp_ts = np.append(ts[max_ind:resolution], ts[0:min_ind])
else:
exp_ts = ts[max_ind:min_ind]
exp_mod = np.polyfit(np.log(exp_ts), np.linspace(start=0, stop=len(exp_ts) - 1, num=len(exp_ts), dtype=int), 1)
return exp_mod
def reverse_normalise_ts(x):
"""
Takes what is expected to be a mean annual time series (from mean_annual_ts), rotates it so that
the minimum value comes last, reverses it and then normalises it.
It is to be used within the cball_parfit function below.
Parameters
----------
x : time series
Time series to reverse and normalise. Assumed to be output from mean_annual_ts.
Returns
----------
ndarray
The reversed and normalised time series
"""
min_ind = np.where(x == np.min(x))[0][0]
arrangex = np.append(x[(min_ind + 1):len(x)], x[0:(min_ind + 1)])
revx = arrangex[::-1]
normx = (revx - np.min(revx)) / sum(revx - np.min(revx))
return normx
def cball(x=range(1, 13), alpha=1.5, n=150.0, xbar=8.0, sigma=2.0):
"""
Calculates the Crystal Ball pdf on the values 1 to 12 by default (i.e. monthly).
Default parameter values give a fit close to those we would expect from offset50
time series.
Parameters
----------
x : Time series
Index values going from 1 to the length of the annual time series
alpha, n, xbar, sigma : float
Model parameters used in the Crystal Ball pdf calculation.
Returns
----------
ndarray
The values of the Crystal Ball pdf for each index of x
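Examples
--------
Illustrative sketch using the default parameters over 12 monthly values:

>>> fx = cball()  # 12 pdf values peaking near x = xbar = 8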
"""
def erf(x):
output = 2 * norm.cdf(x * np.sqrt(2)) - 1
return output
def A(alpha, n):
output = ((n / np.abs(alpha)) ** n) * np.exp((-np.abs(alpha) ** 2) / 2)
return output
def B(alpha, n):
output = n / np.abs(alpha) - np.abs(alpha)
return output
def N(sigma, C, D):
output = 1 / (sigma * (C + D))
return output
def C(alpha, n):
output = (n / np.abs(alpha)) * (1 / (n - 1)) * np.exp((-np.abs(alpha) ** 2) / 2)
return output
def D(alpha):
output = np.sqrt(np.pi / 2) * (1 + erf(np.abs(alpha) / np.sqrt(2)))
return output
fx = np.repeat(np.nan, len(x), axis=0)
for i in range(len(x)):
if (((x[i] - xbar) / sigma) > -alpha):
fx[i] = N(sigma, C(alpha, n), D(alpha)) * np.exp((-(x[i] - xbar) ** 2) / (2 * sigma ** 2))
if (((x[i] - xbar) / sigma) <= -alpha):
fx[i] = N(sigma, C(alpha, n), D(alpha)) * A(alpha, n) * (B(alpha, n) - (x[i] - xbar) / sigma) ** (-n)
return fx
def err_func(params, ts):
"""
Calculates the difference between the cball function with supplied params
and a supplied time series of the same length.
err_func is used within the
cball_parfit function below, where the full time series needs to be supplied.
Parameters
----------
params : list
Parameters (alpha, n, xbar, sigma) used in the Crystal Ball pdf calculation.
ts : Time series
Time series to compare output of cball function to
Returns
----------
ndarray
Residuals/differences between the Crystal Ball pdf and the supplied time series
"""
model_output = cball(range(1, len(ts) + 1), params[0], params[1], params[2], params[3])
residuals = []
for i in range(0, len(ts)):
r = model_output[i] - ts[i]
residuals.append(r)
return residuals
def cball_parfit(p0, timeseries, plot_name='CB_fit.png', output_dir=''):
"""
Uses least squares regression to optimise the parameters in cball to fit the
timeseries supplied. The supplied time series should be the original series
as this function finds the mean annual ts and reverses and normalises it.
Parameters
----------
p0 : list
A list of initial parameter values (alpha, n, xbar, sigma) to use in the Crystal Ball calculation.
timeseries : Time series
Original time series to calculate mean annual time series on, reverse and normalise
and then use to optimise the parameters on
plot_name: string
Name for the data/fit comparison plot
output_dir : str
Directory to save the plots in.
Returns
----------
ndarray
A list of optimised parameters (alpha, n, xbar, sigma)
int
An indication that the optimisation worked (if the output is 1, 2, 3 or 4 then a solution was found)
float
The summed residuals from the best CB fit
"""
try:
    mean_ts = timeseries.groupby(timeseries.index.month).mean()
    if min(mean_ts) < 0 and max(mean_ts) < 0:
        mean_ts = mean_ts - min(mean_ts)
        p0 = [1.5, 150, 8.5, 1.1]
except AttributeError:
    raise RuntimeError('Input time series for CB fit must have a datetime index')
ts = reverse_normalise_ts(mean_ts)
residuals_min = 1
for i in range(5):
params, success = sco.leastsq(err_func, p0, args=ts)
residuals = sum(err_func(params, ts))
plt.plot(ts, 'k.', label='data')
plt.plot(cball(range(1, len(ts) + 1), params[0], params[1], params[2], params[3]), 'r',
label='Crystal ball fit', linewidth=1)
if residuals < residuals_min:
residuals_min = residuals
p0 = params
final_params = params
final_success = success
fig, ax = plt.subplots()
plt.plot(ts, 'k.', label='data')
plt.plot(cball(range(1, len(ts) + 1), final_params[0], final_params[1], final_params[2], final_params[3]), 'r', label='Crystal ball fit', linewidth=1)
plt.legend(loc='upper right')
ax.set_xticks(np.arange(len(ts)))
labels = ['Dec', 'Nov','Oct','Sep','Aug','Jul','Jun','May','Apr','Mar','Feb','Jan']
ax.set_xticklabels(labels)
# Rotate the tick labels and set their alignment.
plt.setp(ax.get_xticklabels(), rotation=45, ha="right",
rotation_mode="anchor")
plt.xlabel('Month (reversed)')
plt.ylabel('PDF')
plt.title('Crystal ball fit for ' + plot_name)
plt.savefig(os.path.join(output_dir, "fit_ts_CB_"+plot_name+".png"))
return final_params, final_success, residuals_min