Source code for ost.generic.ard_to_ts

# -*- coding: utf-8 -*-

import json
import logging
from pathlib import Path
from datetime import datetime as dt
from tempfile import TemporaryDirectory

from retrying import retry
from osgeo import gdal

from ost.generic.common_wrappers import create_stack, mt_speckle_filter
from ost.helpers import raster as ras, helpers as h
from ost.helpers.errors import GPTRuntimeError, NotValidFileError

logger = logging.getLogger(__name__)

SNAP_DATEFORMAT = "%d%b%Y"


[docs]@retry(stop_max_attempt_number=3, wait_fixed=1) def ard_to_ts(list_of_files, burst, product, pol, config_file): # ------------------------------------------- # 1 unpack list of args # convert list of files readable for snap list_of_files = f"'{','.join(str(x) for x in list_of_files)}'" # ------------------------------------------- # 2 read config file with open(config_file, "r") as file: config_dict = json.load(file) processing_dir = Path(config_dict["processing_dir"]) ard = config_dict["processing"]["single_ARD"] ard_mt = config_dict["processing"]["time-series_ARD"] # ------------------------------------------- # 3 get namespace of directories and check if already processed # get the burst directory burst_dir = processing_dir / burst # get timeseries directory and create if non existent out_dir = burst_dir / "Timeseries" Path.mkdir(out_dir, parents=True, exist_ok=True) # in case some processing has been done before, check if already processed check_file = out_dir / f".{product}.{pol}.processed" if Path.exists(check_file): logger.info(f"Timeseries of {burst} for {product} in {pol} " f"polarisation already processed.") out_files = "already_processed" out_vrt = "already_processed" return (burst, list_of_files, out_files, out_vrt, f"{product}.{pol}", None) # ------------------------------------------- # 4 adjust processing parameters according to config # get the db scaling right to_db = ard["to_db"] if to_db or product != "bs": to_db = False logger.debug(f"Not converting to dB for {product}") else: to_db = ard_mt["to_db"] logger.debug(f"Converting to dB for {product}") if ard_mt["apply_ls_mask"]: extent = burst_dir / f"{burst}.valid.json" else: extent = burst_dir / f"{burst}.min_bounds.json" # ------------------------------------------- # 5 SNAP processing with TemporaryDirectory(prefix=f"{config_dict['temp_dir']}/") as temp: # turn to Path object temp = Path(temp) # create namespaces temp_stack = temp / f"{burst}_{product}_{pol}" out_stack = temp / f"{burst}_{product}_{pol}_mt" stack_log = out_dir / f"{burst}_{product}_{pol}_stack.err_log" # run stacking routine if pol in ["Alpha", "Anisotropy", "Entropy"]: logger.info( f"Creating multi-temporal stack of images of burst/track " f"{burst} for the {pol} band of the polarimetric " f"H-A-Alpha decomposition." ) try: create_stack(list_of_files, temp_stack, stack_log, config_dict, pattern=pol) except (GPTRuntimeError, NotValidFileError) as error: logger.info(error) return None, None, None, None, None, error else: logger.info( f"Creating multi-temporal stack of images of burst/track " f"{burst} for {product} product in {pol} polarization." ) try: create_stack(list_of_files, temp_stack, stack_log, config_dict, polarisation=pol) except (GPTRuntimeError, NotValidFileError) as error: logger.info(error) return None, None, None, None, None, error # run mt speckle filter if ard_mt["remove_mt_speckle"] is True: speckle_log = out_dir / f"{burst}_{product}_{pol}_mt_speckle.err_log" logger.debug("Applying multi-temporal speckle filter") try: mt_speckle_filter(temp_stack.with_suffix(".dim"), out_stack, speckle_log, config_dict) except (GPTRuntimeError, NotValidFileError) as error: logger.info(error) return None, None, None, None, None, error # remove tmp files h.delete_dimap(temp_stack) else: out_stack = temp_stack # ----------------------------------------------- # 6 Conversion to GeoTiff # min max dict for stretching in case of 16 or 8 bit datatype mm_dict = { "bs": {"min": -30, "max": 5}, "coh": {"min": 0.000001, "max": 1}, "Alpha": {"min": 0.000001, "max": 90}, "Anisotropy": {"min": 0.000001, "max": 1}, "Entropy": {"min": 0.000001, "max": 1}, } stretch = pol if pol in ["Alpha", "Anisotropy", "Entropy"] else product if product == "coh": # get slave and master dates from file names and sort them mst_dates = sorted( [ dt.strptime(file.name.split("_")[3].split(".")[0], SNAP_DATEFORMAT) for file in list(out_stack.with_suffix(".data").glob("*.img")) ] ) slv_dates = sorted( [ dt.strptime(file.name.split("_")[4].split(".")[0], SNAP_DATEFORMAT) for file in list(out_stack.with_suffix(".data").glob("*.img")) ] ) # write them back to string for following loop mst_dates = [dt.strftime(ts, SNAP_DATEFORMAT) for ts in mst_dates] slv_dates = [dt.strftime(ts, SNAP_DATEFORMAT) for ts in slv_dates] out_files = [] for i, (mst, slv) in enumerate(zip(mst_dates, slv_dates)): # re-construct namespace for input file infile = list(out_stack.with_suffix(".data").glob(f"*{pol}*{mst}_{slv}*img"))[0] # rename dates to YYYYMMDD format mst = dt.strftime(dt.strptime(mst, SNAP_DATEFORMAT), "%y%m%d") slv = dt.strftime(dt.strptime(slv, SNAP_DATEFORMAT), "%y%m%d") # create namespace for output file with renamed dates outfile = out_dir / f"{i+1:02d}.{mst}.{slv}.{product}.{pol}.tif" # fill internal values if any # with rasterio.open(str(infile), 'r') as src: # meta = src.meta.copy() # filled = ras.fill_internal_nans(src.read()) # with rasterio.open(str(infile), 'w', **meta) as dest: # dest.write(filled) # print('filled') # produce final outputfile, # including dtype conversion and ls mask ras.mask_by_shape( infile, outfile, extent, to_db=to_db, datatype=ard_mt["dtype_output"], min_value=mm_dict[stretch]["min"], max_value=mm_dict[stretch]["max"], ndv=0.0, description=True, ) # add ot a list for subsequent vrt creation out_files.append(str(outfile)) else: # get the dates of the files dates = sorted( [ dt.strptime(file.name.split("_")[-1][:-4], SNAP_DATEFORMAT) for file in list(out_stack.with_suffix(".data").glob("*.img")) ] ) # write them back to string for following loop dates = [dt.strftime(ts, "%d%b%Y") for ts in dates] out_files = [] for i, date in enumerate(dates): # re-construct namespace for input file infile = list(out_stack.with_suffix(".data").glob(f"*{pol}*{date}*img"))[0] # restructure date to YYMMDD date = dt.strftime(dt.strptime(date, SNAP_DATEFORMAT), "%y%m%d") # create namespace for output file outfile = out_dir / f"{i+1:02d}.{date}.{product}.{pol}.tif" # fill internal nodata # if ard['image_type'] == 'SLC': # with rasterio.open(str(infile), 'r') as src: # meta = src.meta.copy() # filled = ras.fill_internal_nans(src.read()) # with rasterio.open(str(infile), 'w', **meta) as dest: # dest.write(filled) # print('filledbs') # run conversion routine ras.mask_by_shape( infile, outfile, extent, to_db=to_db, datatype=ard_mt["dtype_output"], min_value=mm_dict[stretch]["min"], max_value=mm_dict[stretch]["max"], ndv=0.0, ) # add ot a list for subsequent vrt creation out_files.append(str(outfile)) # ----------------------------------------------- # 7 Filechecks for file in out_files: return_code = h.check_out_tiff(file) if return_code != 0: for file_ in out_files: Path(file_).unlink() if Path(f"{file}.xml").exists(): Path(f"{file}.xml").unlink() return (burst, list_of_files, None, None, f"{product}.{pol}", return_code) # write file, so we know this ts has been successfully processed with open(str(check_file), "w") as file: file.write("passed all tests \n") # ----------------------------------------------- # 8 Create vrts vrt_options = gdal.BuildVRTOptions(srcNodata=0, separate=True) out_vrt = str(out_dir / f"Timeseries.{product}.{pol}.vrt") gdal.BuildVRT(out_vrt, out_files, options=vrt_options) return burst, list_of_files, out_files, out_vrt, f"{product}.{pol}", None
[docs]def gd_ard_to_ts(list_of_args, config_file): list_of_files, burst, product, pol = list_of_args return ard_to_ts(list_of_files, burst, product, pol, config_file)