Source code for ost.s1.burst_batch

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Batch processing routines for Sentinel-1 bursts

This module handles all the batch processing routines involved
in the full workflow from raw Sentinel-1 SLC imagery to
large-scale time-series and timescan mosaics.
"""

import os
import shutil
import json
import itertools
import logging
from pathlib import Path

import pandas as pd
from godale._concurrent import Executor

from ost.helpers import raster as ras, helpers as h
from ost.s1.burst_inventory import prepare_burst_inventory
from ost.s1.burst_to_ard import burst_to_ard
from ost.generic import ard_to_ts, ts_extent, ts_ls_mask, timescan, mosaic

# set up logger
logger = logging.getLogger(__name__)

# ---------------------------------------------------
# Global variable
PRODUCT_LIST = [
    "bs.HH",
    "bs.VV",
    "bs.HV",
    "bs.VH",
    "coh.VV",
    "coh.VH",
    "coh.HH",
    "coh.HV",
    "pol.Entropy",
    "pol.Anisotropy",
    "pol.Alpha",
]


[docs]def bursts_to_ards(burst_gdf, config_file): """Batch processing from single bursts to ARD format This function handles the burst processing based on a OST burst inventory file and an OST config file that contains all necessary information about the project (e.g. project directory) and processing steps applied for the ARD generation based on the JSON ARD-type templates. :param burst_gdf: an OST burst inventory :type burst_gdf: GeoDataFrame :param config_file: (str/Path) path to the project config file :param executor_type: executer type for parallel processing with godale, defaults to multiprocessing :param max_workers: number of parallel burst processing jobs to start :return: """ print("--------------------------------------------------------------") logger.info("Processing all single bursts to ARD") print("--------------------------------------------------------------") logger.info("Preparing the processing pipeline. This may take a moment.") proc_inventory = prepare_burst_inventory(burst_gdf, config_file) with open(config_file, "r") as file: config_dict = json.load(file) executor_type = config_dict["executor_type"] max_workers = config_dict["max_workers"] # we update max_workers in case we have less snap_cpu_parallelism # then cpus available if max_workers == 1 and config_dict["snap_cpu_parallelism"] < os.cpu_count(): max_workers = int(os.cpu_count() / config_dict["snap_cpu_parallelism"]) # now we run with godale, which works also with 1 worker out_dict = { "burst": [], "acq_date": [], "out_bs": [], "out_ls": [], "out_pol": [], "out_coh": [], "error": [], } executor = Executor(executor=executor_type, max_workers=max_workers) for task in executor.as_completed( func=burst_to_ard, iterable=proc_inventory.iterrows(), fargs=( [ str(config_file), ] ), ): burst, date, out_bs, out_ls, out_pol, out_coh, error = task.result() out_dict["burst"].append(burst) out_dict["acq_date"].append(date) out_dict["out_bs"].append(out_bs) out_dict["out_ls"].append(out_ls) out_dict["out_pol"].append(out_pol) out_dict["out_coh"].append(out_coh) out_dict["error"].append(error) return pd.DataFrame.from_dict(out_dict)
def _create_extents(burst_gdf, config_file): """Batch processing for multi-temporal Layover7Shadow mask This function handles the organization of the :param burst_gdf: :param config_file: :return: """ with open(config_file, "r") as file: config_dict = json.load(file) processing_dir = Path(config_dict["processing_dir"]) # create extent iterable iter_list = [] for burst in burst_gdf.bid.unique(): # get the burst directory burst_dir = processing_dir / burst list_of_extents = list(burst_dir.glob("*/*/*bounds.json")) # if extent does not already exist, add to iterable if not (burst_dir / f"{burst}.min_bounds.json").exists(): iter_list.append(list_of_extents) # now we run with godale, which works also with 1 worker executor = Executor(executor=config_dict["executor_type"], max_workers=os.cpu_count()) out_dict = {"burst": [], "list_of_scenes": [], "extent": []} for task in executor.as_completed( func=ts_extent.mt_extent, iterable=iter_list, fargs=( [ str(config_file), ] ), ): track, list_of_scenes, extent = task.result() out_dict["burst"].append(track) out_dict["list_of_scenes"].append(list_of_scenes) out_dict["extent"].append(extent) def _create_extents_old(burst_gdf, config_file): """Batch processing for multi-temporal Layover7Shadow mask This function handles the organization of the :param burst_gdf: :param config_file: :return: """ with open(config_file, "r") as file: config_dict = json.load(file) processing_dir = Path(config_dict["processing_dir"]) # create extent iterable iter_list = [] for burst in burst_gdf.bid.unique(): # get the burst directory burst_dir = processing_dir / burst # get common burst extent list_of_bursts = list(burst_dir.glob("**/*img")) list_of_bursts = [str(x) for x in list_of_bursts if "layover" not in str(x)] # if the file does not already exist, add to iterable extent = burst_dir / f"{burst}.extent.gpkg" if not extent.exists(): iter_list.append(list_of_bursts) # now we run with godale, which works also with 1 worker executor = Executor(executor=config_dict["executor_type"], max_workers=config_dict["max_workers"]) for task in executor.as_completed( func=ts_extent.mt_extent, iterable=iter_list, fargs=( [ str(config_file), ] ), ): task.result() def _create_mt_ls_mask(burst_gdf, config_file): """Helper function to union the Layover/Shadow masks of a Time-series This function creates a :param inventory_df: :param config_file: :return: """ # read config file with open(config_file, "r") as file: config_dict = json.load(file) processing_dir = config_dict["processing_dir"] # create layover iter_list = [] for burst in burst_gdf.bid.unique(): # *** # get the burst directory burst_dir = Path(processing_dir) / burst # get common burst extent list_of_masks = list(burst_dir.glob("*/*/*_ls_mask.json")) if not (burst_dir / f"{burst}.ls_mask.json").exists(): iter_list.append(list_of_masks) # now we run with godale, which works also with 1 worker executor = Executor(executor=config_dict["executor_type"], max_workers=os.cpu_count()) for task in executor.as_completed(func=ts_ls_mask.mt_layover, iterable=iter_list): task.result() def _create_mt_ls_mask_old(burst_gdf, config_file): """Batch processing for multi-temporal Layover/Shadow mask This function handles the organization of the :param burst_gdf: :param config_file: :return: """ # read config file with open(config_file, "r") as file: config_dict = json.load(file) processing_dir = config_dict["processing_dir"] # create layover iter_list = [] for burst in burst_gdf.bid.unique(): # *** # get the burst directory burst_dir = Path(processing_dir) / burst # get layover scenes list_of_scenes = list(burst_dir.glob("20*/*data*/*img")) list_of_layover = [str(x) for x in list_of_scenes if "layover" in str(x)] # we need to redefine the namespace of the already created extents extent = burst_dir / f"{burst}.extent.gpkg" if not extent.exists(): raise FileNotFoundError(f"Extent file for burst {burst} not found.") # layover/shadow mask out_ls = burst_dir / f"{burst}.ls_mask.tif" # if the file does not already exists, then put into list to process if not out_ls.exists(): iter_list.append(list_of_layover) # now we run with godale, which works also with 1 worker executor = Executor(executor=config_dict["executor_type"], max_workers=config_dict["max_workers"]) for task in executor.as_completed( func=ts_ls_mask.mt_layover, iterable=iter_list, fargs=( [ str(config_file), ] ), ): task.result() def _create_timeseries(burst_gdf, config_file): # we need a # dict_of_product_types = {'bs': 'Gamma0', 'coh': 'coh', 'pol': 'pol'} list_of_product_types = { ("bs", "Gamma0"), ("bs", "Sigma0"), ("coh", "coh"), ("pol", "pol"), } pols = ["VV", "VH", "HH", "HV", "Alpha", "Entropy", "Anisotropy"] # read config file with open(config_file, "r") as file: config_dict = json.load(file) processing_dir = config_dict["processing_dir"] # create iterable iter_list = [] for burst in burst_gdf.bid.unique(): burst_dir = Path(processing_dir) / burst # for pr, pol in itertools.product(dict_of_product_types.items(), pols): for pr, pol in itertools.product(list_of_product_types, pols): # unpack items product, product_name = list(pr) # take care of H-A-Alpha naming for file search if pol in ["Alpha", "Entropy", "Anisotropy"] and product == "pol": list_of_files = sorted(list(burst_dir.glob(f"20*/*data*/*{pol}*img"))) else: # see if there is actually any imagery for this # combination of product and polarisation list_of_files = sorted(list(burst_dir.glob(f"20*/*data*/*{product_name}*{pol}*img"))) if len(list_of_files) <= 1: continue # create list of dims if polarisation is present list_of_dims = sorted(str(dim) for dim in list(burst_dir.glob(f"20*/*{product}*dim"))) iter_list.append([list_of_dims, burst, product, pol]) # now we run with godale, which works also with 1 worker executor = Executor(executor=config_dict["executor_type"], max_workers=config_dict["max_workers"]) out_dict = { "burst": [], "list_of_dims": [], "out_files": [], "out_vrt": [], "product": [], "error": [], } for task in executor.as_completed( func=ard_to_ts.gd_ard_to_ts, iterable=iter_list, fargs=( [ str(config_file), ] ), ): burst, list_of_dims, out_files, out_vrt, product, error = task.result() out_dict["burst"].append(burst) out_dict["list_of_dims"].append(list_of_dims) out_dict["out_files"].append(out_files) out_dict["out_vrt"].append(out_vrt) out_dict["product"].append(product) out_dict["error"].append(error) return pd.DataFrame.from_dict(out_dict)
[docs]def ards_to_timeseries(burst_gdf, config_file): print("--------------------------------------------------------------") logger.info("Processing all burst ARDs time-series") print("--------------------------------------------------------------") # load ard parameters with open(config_file, "r") as ard_file: ard_params = json.load(ard_file)["processing"] ard = ard_params["single_ARD"] ard_mt = ard_params["time-series_ARD"] # create all extents _create_extents(burst_gdf, config_file) # update extents in case of ls_mask if ard["create_ls_mask"] or ard_mt["apply_ls_mask"]: _create_mt_ls_mask(burst_gdf, config_file) # finally create time-series df = _create_timeseries(burst_gdf, config_file) return df
# -------------------- # timescan part # --------------------
[docs]def timeseries_to_timescan(burst_gdf, config_file): """Function to create a timescan out of a OST timeseries.""" print("--------------------------------------------------------------") logger.info("Processing all burst ARDs time-series to ARD timescans") print("--------------------------------------------------------------") # ------------------------------------- # 1 load project config with open(config_file, "r") as ard_file: config_dict = json.load(ard_file) processing_dir = config_dict["processing_dir"] ard = config_dict["processing"]["single_ARD"] ard_mt = config_dict["processing"]["time-series_ARD"] ard_tscan = config_dict["processing"]["time-scan_ARD"] # get the db scaling right to_db = True if ard["to_db"] or ard_mt["to_db"] else False # get datatype right dtype_conversion = True if ard_mt["dtype_output"] != "float32" else False # ------------------------------------- # 2 create iterable for parallel processing iter_list, vrt_iter_list = [], [] for burst in burst_gdf.bid.unique(): # get relevant directories burst_dir = Path(processing_dir) / burst timescan_dir = burst_dir / "Timescan" timescan_dir.mkdir(parents=True, exist_ok=True) for product in PRODUCT_LIST: # check if already processed if (timescan_dir / f".{product}.processed").exists(): logger.debug(f"Timescans for burst {burst} already processed.") continue # get respective timeseries timeseries = burst_dir / "Timeseries" / f"Timeseries.{product}.vrt" # che if this timsereis exists ( since we go through all products if not timeseries.exists(): continue # datelist for harmonics scenelist = list(burst_dir.glob(f"Timeseries/*{product}*tif")) datelist = [file.name.split(".")[1][:6] for file in sorted(scenelist)] # define timescan prefix timescan_prefix = timescan_dir / product # get rescaling and db right (backscatter vs. coh/pol) if "bs." in str(timescan_prefix): to_power, rescale = to_db, dtype_conversion else: to_power, rescale = False, False iter_list.append( [ timeseries, timescan_prefix, ard_tscan["metrics"], rescale, to_power, ard_tscan["remove_outliers"], datelist, ] ) vrt_iter_list.append(timescan_dir) # now we run with godale, which works also with 1 worker executor = Executor(executor=config_dict["executor_type"], max_workers=config_dict["max_workers"]) # run timescan creation out_dict = {"burst": [], "prefix": [], "metrics": [], "error": []} for task in executor.as_completed(func=timescan.gd_mt_metrics, iterable=iter_list): burst, prefix, metrics, error = task.result() out_dict["burst"].append(burst) out_dict["prefix"].append(prefix) out_dict["metrics"].append(metrics) out_dict["error"].append(error) df = pd.DataFrame.from_dict(out_dict) # run vrt creation for task in executor.as_completed( func=ras.create_tscan_vrt, iterable=vrt_iter_list, fargs=( [ str(config_file), ] ), ): task.result() return df
[docs]def mosaic_timeseries(burst_inventory, config_file): print(" -----------------------------------------------------------------") logger.info("Mosaicking time-series layers.") print(" -----------------------------------------------------------------") # ------------------------------------- # 1 load project config with open(config_file, "r") as ard_file: config_dict = json.load(ard_file) processing_dir = Path(config_dict["processing_dir"]) # create output folder ts_dir = processing_dir / "Mosaic" / "Timeseries" ts_dir.mkdir(parents=True, exist_ok=True) temp_mosaic = processing_dir / "Mosaic" / "temp" temp_mosaic.mkdir(parents=True, exist_ok=True) # ------------------------------------- # 2 create iterable # loop through each product iter_list, vrt_iter_list = [], [] for product in PRODUCT_LIST: for track in burst_inventory.Track.unique(): dates = [ date[2:] for date in sorted(burst_inventory.Date[burst_inventory.Track == track].unique()) ] for i, date in enumerate(dates): if "coh" in product: # we do the try, since for the last date # there is no dates[i+1] for coherence try: temp_acq = temp_mosaic / f"{i}.{date}.{dates[i + 1]}.{track}.{product}.tif" except IndexError: temp_acq = None else: temp_acq = temp_mosaic / f"{i}.{date}.{track}.{product}.tif" if temp_acq: iter_list.append([track, date, product, temp_acq, config_file]) # now we run with godale, which works also with 1 worker executor = Executor(executor=config_dict["executor_type"], max_workers=config_dict["max_workers"]) # run vrt creation for task in executor.as_completed(func=mosaic.gd_mosaic_slc_acquisition, iterable=iter_list): task.result() # mosaic the acquisitions iter_list, vrt_iter_list = [], [] for product in PRODUCT_LIST: outfiles = [] for i in range(len(dates)): list_of_files = list(temp_mosaic.glob(f"{i}.*{product}.tif")) if not list_of_files: continue datelist = [] for file in list_of_files: if "coh" in product: datelist.append(f"{file.name.split('.')[2]}_{file.name.split('.')[1]}") else: datelist.append(file.name.split(".")[1]) # get start and endate of mosaic start, end = sorted(datelist)[0], sorted(datelist)[-1] list_of_files = " ".join([str(file) for file in list_of_files]) # create namespace for output file if start == end: outfile = ts_dir / f"{i + 1:02d}.{start}.{product}.tif" # with the above operation, the list automatically # turns into string, so we can call directly list_of_files shutil.move(list_of_files, outfile) outfiles.append(outfile) continue else: outfile = ts_dir / f"{i + 1:02d}.{start}-{end}.{product}.tif" # create namespace for check_file check_file = outfile.parent / f".{outfile.name[:-4]}.processed" if check_file.exists(): logger.info(f"Mosaic layer {outfile} already processed.") continue # append to list of outfile for vrt creation outfiles.append(outfile) iter_list.append([list_of_files, outfile, config_file]) vrt_iter_list.append([ts_dir, product, outfiles]) # now we run with godale, which works also with 1 worker executor = Executor(executor=config_dict["executor_type"], max_workers=config_dict["max_workers"]) # run mosaicking for task in executor.as_completed(func=mosaic.gd_mosaic, iterable=iter_list): task.result() # run mosaicking vrts for task in executor.as_completed(func=mosaic.create_timeseries_mosaic_vrt, iterable=vrt_iter_list): task.result() # remove temp folder h.remove_folder_content(temp_mosaic)
[docs]def mosaic_timescan(burst_inventory, config_file): """ :param burst_inventory: :param config_file: :return: """ print(" -----------------------------------------------------------------") logger.info("Mosaicking time-scan layers.") print(" -----------------------------------------------------------------") # ------------------------------------- # 1 load project config with open(config_file, "r") as ard_file: config_dict = json.load(ard_file) processing_dir = Path(config_dict["processing_dir"]) metrics = config_dict["processing"]["time-scan_ARD"]["metrics"] if "harmonics" in metrics: metrics.remove("harmonics") metrics.extend(["amplitude", "phase", "residuals", "model_mean"]) if "percentiles" in metrics: metrics.remove("percentiles") metrics.extend(["p95", "p5"]) # create output folder ts_dir = processing_dir / "Mosaic" / "Timescan" ts_dir.mkdir(parents=True, exist_ok=True) temp_mosaic = processing_dir / "Mosaic" / "temp" temp_mosaic.mkdir(parents=True, exist_ok=True) # ------------------------------------- # 2 create iterable # loop through each product iter_list = [] for product, metric in itertools.product(PRODUCT_LIST, metrics): for track in burst_inventory.Track.unique(): filelist = list(processing_dir.glob(f"[A,D]{track}_IW*/Timescan/*{product}.{metric}.tif")) if not len(filelist) >= 1: continue temp_acq = temp_mosaic / f"{track}.{product}.{metric}.tif" if temp_acq: iter_list.append([track, metric, product, temp_acq, config_file]) # now we run with godale, which works also with 1 worker executor = Executor(executor=config_dict["executor_type"], max_workers=config_dict["max_workers"]) # run vrt creation for task in executor.as_completed(func=mosaic.gd_mosaic_slc_acquisition, iterable=iter_list): task.result() iter_list = [] for product, metric in itertools.product(PRODUCT_LIST, metrics): list_of_files = list(temp_mosaic.glob(f"*{product}.{metric}.tif")) if not list_of_files: continue # turn to OTB readable format list_of_files = " ".join([str(file) for file in list_of_files]) # create namespace for outfile outfile = ts_dir / f"{product}.{metric}.tif" check_file = outfile.parent / f".{outfile.name[:-4]}.processed" if check_file.exists(): logger.info(f"Mosaic layer {outfile.name} already processed.") continue logger.info(f"Mosaicking layer {outfile.name}.") iter_list.append([list_of_files, outfile, config_file]) # now we run with godale, which works also with 1 worker executor = Executor(executor=config_dict["executor_type"], max_workers=config_dict["max_workers"]) # run mosaicking for task in executor.as_completed(func=mosaic.gd_mosaic, iterable=iter_list): task.result() ras.create_tscan_vrt(ts_dir, config_file) # remove temp folder h.remove_folder_content(temp_mosaic)
[docs]def mosaic_timescan_old(config_file): print(" -----------------------------------------------------------------") logger.info("Mosaicking time-scan layers.") print(" -----------------------------------------------------------------") with open(config_file, "r") as ard_file: config_dict = json.load(ard_file) processing_dir = Path(config_dict["processing_dir"]) metrics = config_dict["processing"]["time-scan_ARD"]["metrics"] if "harmonics" in metrics: metrics.remove("harmonics") metrics.extend(["amplitude", "phase", "residuals"]) if "percentiles" in metrics: metrics.remove("percentiles") metrics.extend(["p95", "p5"]) tscan_dir = processing_dir / "Mosaic" / "Timescan" tscan_dir.mkdir(parents=True, exist_ok=True) iter_list = [] for product, metric in itertools.product(PRODUCT_LIST, metrics): filelist = list(processing_dir.glob(f"*/Timescan/*{product}.{metric}.tif")) if not len(filelist) >= 1: continue filelist = " ".join([str(file) for file in filelist]) outfile = tscan_dir / f"{product}.{metric}.tif" check_file = outfile.parent / f".{outfile.name[:-4]}.processed" if check_file.exists(): logger.info(f"Mosaic layer {outfile.name} already processed.") continue logger.info(f"Mosaicking layer {outfile.name}.") iter_list.append([filelist, outfile, config_file]) # now we run with godale, which works also with 1 worker executor = Executor(executor=config_dict["executor_type"], max_workers=config_dict["max_workers"]) # run mosaicking for task in executor.as_completed(func=mosaic.gd_mosaic, iterable=iter_list): task.result() ras.create_tscan_vrt(tscan_dir, config_file)