# -*- coding: utf-8 -*-
# import stdlib modules
import logging
import warnings
from pathlib import Path
from datetime import datetime
from datetime import timedelta
from calendar import isleap
import rasterio
import numpy as np
from scipy import stats
from retrying import retry
from ost.helpers import raster as ras
from ost.helpers import helpers as h
logger = logging.getLogger(__name__)
[docs]def remove_outliers(arrayin, stddev=2, z_threshold=None):
warnings.filterwarnings("ignore", "invalid value", RuntimeWarning)
if z_threshold:
z_score = np.abs(stats.zscore(arrayin))
array_out = np.ma.MaskedArray(arrayin, mask=z_score > z_threshold)
else:
# calculate percentiles
perc95 = np.percentile(arrayin, 95, axis=0)
perc5 = np.percentile(arrayin, 5, axis=0)
# we mask out the percentile outliers for std dev calculation
masked_array = np.ma.MaskedArray(arrayin, mask=np.logical_or(arrayin > perc95, arrayin < perc5))
# we calculate new std and mean
masked_std = np.std(masked_array, axis=0)
masked_mean = np.mean(masked_array, axis=0)
# we mask based on mean +- x * stddev
array_out = np.ma.MaskedArray(
arrayin,
mask=np.logical_or(
arrayin > masked_mean + masked_std * stddev,
arrayin < masked_mean - masked_std * stddev,
),
)
return array_out
[docs]def date_as_float(date):
size_of_day = 1.0 / 366.0
size_of_second = size_of_day / (24.0 * 60.0 * 60.0)
days_from_jan1 = date - datetime(date.year, 1, 1)
if not isleap(date.year) and days_from_jan1.days >= 31 + 28:
days_from_jan1 += timedelta(1)
return date.year + days_from_jan1.days * size_of_day + days_from_jan1.seconds * size_of_second
[docs]def difference_in_years(start, end):
return date_as_float(end) - date_as_float(start)
[docs]def deseasonalize(stack):
percentiles = np.percentile(stack, 95, axis=[1, 2])
deseasoned = np.subtract(percentiles[:, np.newaxis], stack.reshape(stack.shape[0], -1))
return deseasoned.reshape(stack.shape)
def _zvalue_from_index(arr, ind):
"""work around the limitation of np.choose() by employing np.take()
arr has to be a 3D array
ind has to be a 2D array containing values for z-indicies to take from arr
See: http://stackoverflow.com/a/32091712/4169585
This is faster and more memory efficient than using
the ogrid based solution with fancy indexing.
"""
# get number of columns and rows
_, cols, rows = arr.shape
# get linear indices and extract elements with np.take()
idx = cols * rows * ind + np.arange(cols * rows).reshape((cols, rows))
return np.take(arr, idx)
[docs]def nan_percentile(arr, q):
# taken from:
# https://krstn.eu/np.nanpercentile()-there-has-to-be-a-faster-way/
# valid (non NaN) observations along the first axis
valid_obs = np.sum(np.isfinite(arr), axis=0)
# replace NaN with maximum
max_val = np.nanmax(arr)
arr[np.isnan(arr)] = max_val
# sort - former NaNs will move to the end
arr = np.sort(arr, axis=0)
# loop over requested quantiles
if type(q) is list:
qs = []
qs.extend(q)
else:
qs = [q]
result = []
for i in range(len(qs)):
quant = qs[i]
# desired position as well as floor and ceiling of it
k_arr = (valid_obs - 1) * (quant / 100.0)
f_arr = np.floor(k_arr).astype(np.int32)
c_arr = np.ceil(k_arr).astype(np.int32)
fc_equal_k_mask = f_arr == c_arr
# linear interpolation (like numpy percentile)
# takes the fractional part of desired position
floor_val = _zvalue_from_index(arr=arr, ind=f_arr) * (c_arr - k_arr)
ceil_val = _zvalue_from_index(arr=arr, ind=c_arr) * (k_arr - f_arr)
quant_arr = floor_val + ceil_val
quant_arr[fc_equal_k_mask] = _zvalue_from_index(arr=arr, ind=k_arr.astype(np.int32))[fc_equal_k_mask]
result.append(quant_arr)
return result
[docs]@retry(stop_max_attempt_number=3, wait_fixed=1)
def mt_metrics(stack, out_prefix, metrics, rescale_to_datatype, to_power, outlier_removal, datelist):
"""
:param stack:
:param out_prefix:
:param metrics:
:param rescale_to_datatype:
:param to_power:
:param outlier_removal:
:param datelist:
:return:
"""
logger.info(
f"Creating timescan layers ({metrics}) of track/burst "
f"{out_prefix.parent.parent.name} for {out_prefix.name}"
)
warnings.filterwarnings("ignore", r"All-NaN (slice|axis) encountered")
warnings.filterwarnings("ignore", r"Mean of empty slice")
warnings.filterwarnings("ignore", r"Degrees of freedom", RuntimeWarning)
if "harmonics" in metrics:
logger.info("Calculating harmonics")
if not datelist:
raise RuntimeWarning("Harmonics need the datelist. " "Harmonics will not be calculated")
else:
metrics.remove("harmonics")
metrics.extend(["amplitude", "phase", "residuals", "trend", "model_mean"])
if "percentiles" in metrics:
metrics.remove("percentiles")
metrics.extend(["p95", "p5"])
with rasterio.open(stack) as src:
# get metadata
meta = src.profile
# update driver and reduced band count
meta.update({"driver": "GTiff"})
meta.update({"count": 1})
# write all different output files into a dictionary
metric_dict = {}
for metric in metrics:
filename = f"{out_prefix}.{metric}.tif"
metric_dict[metric] = rasterio.open(filename, "w", **meta)
# scaling factors in case we have to rescale to integer
minimums = {
"avg": int(-30),
"max": int(-30),
"min": int(-30),
"median": -30,
"p5": -30,
"p95": -30,
"std": 0.00001,
"cov": 0.00001,
"amplitude": -5,
"phase": -np.pi,
"residuals": -10,
"trend": -5,
"model_mean": -30,
}
maximums = {
"avg": 5,
"max": 5,
"min": 5,
"median": 5,
"p5": 5,
"p95": 5,
"std": 0.2,
"cov": 1,
"amplitude": 5,
"phase": np.pi,
"residuals": 10,
"trend": 5,
"model_mean": 5,
}
if "amplitude" in metrics:
# construct independent variables
dates, sines, cosines, intercept = [], [], [], []
two_pi = np.multiply(2, np.pi)
for date in sorted(datelist):
delta = difference_in_years(
datetime.strptime("700101", "%y%m%d"),
datetime.strptime(date, "%y%m%d"),
)
dates.append(delta)
sines.append(np.sin(np.multiply(two_pi, delta)))
cosines.append(np.cos(np.multiply(two_pi, delta)))
intercept.append(1)
x_array = np.array([dates, cosines, sines, intercept])
# loop through blocks
for _, window in src.block_windows(1):
# read array with all bands
stack = src.read(range(1, src.count + 1), window=window)
# rescale to float
if rescale_to_datatype is True and meta["dtype"] != "float32":
stack = ras.rescale_to_float(stack, meta["dtype"])
# transform to power
if to_power is True:
stack = np.power(10, np.divide(stack, 10))
# outlier removal (only applies if there are more than 5 bands)
if outlier_removal is True and src.count >= 5:
stack = remove_outliers(stack)
# get stats
arr = {
"p95": (nan_percentile(stack, [95, 5]) if "p95" in metrics else (False, False))[0],
"p5": (nan_percentile(stack, [95, 5]) if "p95" in metrics else (False, False))[1],
"median": (np.nanmedian(stack, axis=0) if "median" in metrics else False),
"avg": (np.nanmean(stack, axis=0) if "avg" in metrics else False),
"max": (np.nanmax(stack, axis=0) if "max" in metrics else False),
"min": (np.nanmin(stack, axis=0) if "min" in metrics else False),
"std": (np.nanstd(stack, axis=0) if "std" in metrics else False),
# 'cov': (stats.variation(stack, axis=0, nan_policy='omit')
"cov": (
np.divide(np.nanstd(stack, axis=0), np.nanmean(stack, axis=0))
if "cov" in metrics
else False
),
}
if "amplitude" in metrics:
stack_size = (stack.shape[1], stack.shape[2])
if to_power is True:
y = ras.convert_to_db(stack).reshape(stack.shape[0], -1)
else:
y = stack.reshape(stack.shape[0], -1)
x, residuals, _, _ = np.linalg.lstsq(x_array.T, y, rcond=-1)
arr["amplitude"] = np.hypot(x[1], x[2]).reshape(stack_size)
arr["phase"] = np.arctan2(x[2], x[1]).reshape(stack_size)
arr["trend"] = x[0].reshape(stack_size)
arr["model_mean"] = x[3].reshape(stack_size)
arr["residuals"] = np.sqrt(np.divide(residuals, stack.shape[0])).reshape(stack_size)
# the metrics to be re-turned to dB, in case to_power is True
metrics_to_convert = ["avg", "min", "max", "p95", "p5", "median"]
# do the back conversions and write to disk loop
for metric in metrics:
if to_power is True and metric in metrics_to_convert:
arr[metric] = ras.convert_to_db(arr[metric])
if (rescale_to_datatype is True and meta["dtype"] != "float32") or (
metric in ["cov", "phase"] and meta["dtype"] != "float32"
):
arr[metric] = ras.scale_to_int(
arr[metric], minimums[metric], maximums[metric], meta["dtype"]
)
# write to dest
metric_dict[metric].write(
np.nan_to_num(arr[metric]).astype(meta["dtype"]),
window=window,
indexes=1,
)
metric_dict[metric].update_tags(1, BAND_NAME=f"{Path(out_prefix).name}_{metric}")
metric_dict[metric].set_band_description(1, f"{Path(out_prefix).name}_{metric}")
# close the output files
for metric in metrics:
# close rio opening
metric_dict[metric].close()
# construct filename
filename = f"{str(out_prefix)}.{metric}.tif"
return_code = h.check_out_tiff(filename)
if return_code != 0:
for metric_ in metrics:
# remove all files and return
filename = f"{str(out_prefix)}.{metric_}.tif"
Path(filename).unlink()
if Path(f"{filename}.xml").exists():
Path(f"{filename}.xml").unlink()
return None, None, None, return_code
# write out that it's been processed
dirname = out_prefix.parent
check_file = dirname / f".{out_prefix.name}.processed"
with open(str(check_file), "w") as file:
file.write("passed all tests \n")
target = out_prefix.parent.parent.name
return target, out_prefix.name, metrics, None
[docs]def gd_mt_metrics(list_of_args):
stack, out_prefix, metrics, rescale_to_datatype = list_of_args[:4]
to_power, outlier_removal, datelist = list_of_args[4:]
return mt_metrics(
stack,
out_prefix,
metrics,
rescale_to_datatype,
to_power,
outlier_removal,
datelist,
)