Source code for ost.s1.burst_to_ard

#! /usr/bin/env python
# -*- coding: utf-8 -*-

import json
import logging
from pathlib import Path
from tempfile import TemporaryDirectory

import numpy as np
import rasterio

from ost.helpers import helpers as h
from ost.s1 import slc_wrappers as slc
from ost.generic import common_wrappers as common
from ost.helpers import raster as ras
from ost.helpers.errors import GPTRuntimeError, NotValidFileError

logger = logging.getLogger(__name__)


[docs]def create_polarimetric_layers(import_file, out_dir, burst_prefix, config_dict): """Pipeline for Dual-polarimetric decomposition :param import_file: :param out_dir: :param burst_prefix: :param config_dict: :return: """ # temp dir for intermediate files with TemporaryDirectory(prefix=f"{config_dict['temp_dir']}/") as temp: temp = Path(temp) # ------------------------------------------------------- # 1 Polarimetric Decomposition # create namespace for temporary decomposed product out_haa = temp / f"{burst_prefix}_h" # create namespace for decompose log haa_log = out_dir / f"{burst_prefix}_haa.err_log" # run polarimetric decomposition try: slc.ha_alpha(import_file, out_haa, haa_log, config_dict) except (GPTRuntimeError, NotValidFileError) as error: logger.info(error) return None, error # ------------------------------------------------------- # 2 Geocoding # create namespace for temporary geocoded product out_htc = temp / f"{burst_prefix}_pol" # create namespace for geocoding log haa_tc_log = out_dir / f"{burst_prefix}_haa_tc.err_log" # run geocoding try: common.terrain_correction(out_haa.with_suffix(".dim"), out_htc, haa_tc_log, config_dict) except (GPTRuntimeError, NotValidFileError) as error: logger.info(error) return None, error # set nans to 0 (issue from SNAP for polarimetric layers) for infile in list(out_htc.with_suffix(".data").glob("*.img")): with rasterio.open(str(infile), "r") as src: meta = src.meta.copy() array = src.read() array[np.isnan(array)] = 0 with rasterio.open(str(infile), "w", **meta) as dest: dest.write(array) # --------------------------------------------------------------------- # 5 Create an outline ras.image_bounds(out_htc.with_suffix(".data")) # move to final destination ard = config_dict["processing"]["single_ARD"] h.move_dimap(out_htc, out_dir / f"{burst_prefix}_pol", ard["to_tif"]) # write out check file for tracking that it is processed with (out_dir / ".pol.processed").open("w+") as file: file.write("passed all tests \n") dim_file = out_dir / f"{burst_prefix}_pol.dim" return (str(dim_file), None)
[docs]def create_backscatter_layers(import_file, out_dir, burst_prefix, config_dict): """Pipeline for backscatter processing :param import_file: :param out_dir: :param burst_prefix: :param config_dict: :return: """ # get relevant config parameters ard = config_dict["processing"]["single_ARD"] # temp dir for intermediate files with TemporaryDirectory(prefix=f"{config_dict['temp_dir']}/") as temp: temp = Path(temp) # --------------------------------------------------------------------- # 1 Calibration # create namespace for temporary calibrated product out_cal = temp / f"{burst_prefix}_cal" # create namespace for calibrate log cal_log = out_dir / f"{burst_prefix}_cal.err_log" # run calibration on imported scene try: slc.calibration(import_file, out_cal, cal_log, config_dict) except (GPTRuntimeError, NotValidFileError) as error: logger.info(error) return None, None, error # --------------------------------------------------------------------- # 2 Speckle filtering if ard["remove_speckle"]: # create namespace for temporary speckle filtered product speckle_import = temp / f"{burst_prefix}_speckle_import" # create namespace for speckle filter log speckle_log = out_dir / f"{burst_prefix}_speckle.err_log" # run speckle filter on calibrated input try: common.speckle_filter( out_cal.with_suffix(".dim"), speckle_import, speckle_log, config_dict, ) except (GPTRuntimeError, NotValidFileError) as error: logger.info(error) return None, None, error # remove input h.delete_dimap(out_cal) # reset master_import for following routine out_cal = speckle_import # --------------------------------------------------------------------- # 3 dB scaling if ard["to_db"]: # create namespace for temporary db scaled product out_db = temp / f"{burst_prefix}_cal_db" # create namespace for db scaling log db_log = out_dir / f"{burst_prefix}_cal_db.err_log" # run db scaling on calibrated/speckle filtered input try: common.linear_to_db(out_cal.with_suffix(".dim"), out_db, db_log, config_dict) except (GPTRuntimeError, NotValidFileError) as error: logger.info(error) return None, None, error # remove tmp files h.delete_dimap(out_cal) # set out_cal to out_db for further processing out_cal = out_db # --------------------------------------------------------------------- # 4 Geocoding # create namespace for temporary geocoded product out_tc = temp / f"{burst_prefix}_bs" # create namespace for geocoding log tc_log = out_dir / f"{burst_prefix}_bs_tc.err_log" # run terrain correction on calibrated/speckle filtered/db input try: common.terrain_correction(out_cal.with_suffix(".dim"), out_tc, tc_log, config_dict) except (GPTRuntimeError, NotValidFileError) as error: logger.info(error) return None, None, error # --------------------------------------------------------------------- # 5 Create an outline ras.image_bounds(out_tc.with_suffix(".data")) # --------------------------------------------------------------------- # 6 Layover/Shadow mask out_ls = None # set to none for final return statement if ard["create_ls_mask"] is True: # create namespace for temporary ls mask product ls_mask = temp / f"{burst_prefix}_ls_mask" # create namespace for ls mask log logfile = out_dir / f"{burst_prefix}.ls_mask.errLog" # run ls mask routine try: common.ls_mask(out_cal.with_suffix(".dim"), ls_mask, logfile, config_dict) except (GPTRuntimeError, NotValidFileError) as error: logger.info(error) return None, None, error # polygonize ls_raster = list(ls_mask.with_suffix(".data").glob("*img"))[0] ras.polygonize_ls(ls_raster, ls_mask.with_suffix(".json")) out_ls = out_tc.with_suffix(".data").joinpath(ls_mask.name).with_suffix(".json") # move to product folder ls_mask.with_suffix(".json").rename(out_ls) # move final backscatter product to actual output directory h.move_dimap(out_tc, out_dir / f"{burst_prefix}_bs", ard["to_tif"]) # write out check file for tracking that it is processed with (out_dir / ".bs.processed").open("w+") as file: file.write("passed all tests \n") return ( str((out_dir / f"{burst_prefix}_bs").with_suffix(".dim")), str(out_ls), None, )
[docs]def create_coherence_layers(master_import, slave_import, out_dir, master_prefix, config_dict): """ Pipeline for Dual-polarimetric decomposition :param master_import: :param slave_import: :param out_dir: :param master_prefix: :param config_dict: :return: """ # get relevant config parameters ard = config_dict["processing"]["single_ARD"] with TemporaryDirectory(prefix=f"{config_dict['temp_dir']}/") as temp: temp = Path(temp) # --------------------------------------------------------------- # 1 Co-registration # create namespace for temporary co-registered stack out_coreg = temp / f"{master_prefix}_coreg" # create namespace for co-registration log coreg_log = out_dir / f"{master_prefix}_coreg.err_log" # run co-registration try: slc.coreg(master_import, slave_import, out_coreg, coreg_log, config_dict) except (GPTRuntimeError, NotValidFileError) as error: logger.info(error) h.delete_dimap(out_coreg) # remove imports h.delete_dimap(master_import) return None, error # remove imports h.delete_dimap(master_import) h.delete_dimap(slave_import) # --------------------------------------------------------------- # 2 Coherence calculation # create namespace for temporary coherence product out_coh = temp / f"{master_prefix}_coherence" # create namespace for coherence log coh_log = out_dir / f"{master_prefix}_coh.err_log" # run coherence estimation try: slc.coherence(out_coreg.with_suffix(".dim"), out_coh, coh_log, config_dict) except (GPTRuntimeError, NotValidFileError) as error: logger.info(error) return None, error # remove coreg tmp files h.delete_dimap(out_coreg) # --------------------------------------------------------------- # 3 Geocoding # create namespace for temporary geocoded roduct out_tc = temp / f"{master_prefix}_coh" # create namespace for geocoded log tc_log = out_dir / f"{master_prefix}_coh_tc.err_log" # run geocoding try: common.terrain_correction(out_coh.with_suffix(".dim"), out_tc, tc_log, config_dict) except (GPTRuntimeError, NotValidFileError) as error: logger.info(error) return None, error # --------------------------------------------------------------- # 4 Checks and Clean-up # remove tmp files h.delete_dimap(out_coh) # --------------------------------------------------------------------- # 5 Create an outline ras.image_bounds(out_tc.with_suffix(".data")) # move to final destination h.move_dimap(out_tc, out_dir / f"{master_prefix}_coh", ard["to_tif"]) # write out check file for tracking that it is processed with (out_dir / ".coh.processed").open("w+") as file: file.write("passed all tests \n") dim_file = out_dir / f"{master_prefix}_coh.dim" return (str(dim_file), None)
[docs]def burst_to_ard(burst, config_file): # this is a godale thing if isinstance(burst, tuple): i, burst = burst # load relevant config parameters with open(config_file, "r") as file: config_dict = json.load(file) ard = config_dict["processing"]["single_ARD"] temp_dir = Path(config_dict["temp_dir"]) # creation of out_directory out_dir = Path(burst.out_directory) out_dir.mkdir(parents=True, exist_ok=True) # existence of processed files pol_file = (out_dir / ".pol.processed").exists() bs_file = (out_dir / ".bs.processed").exists() coh_file = (out_dir / ".coh.processed").exists() # set all return values initially to None out_bs, out_ls, out_pol, out_coh = None, None, None, None # check if we need to produce coherence if ard["coherence"]: # we check if there is actually a slave file or # if it is the end of the time-series coherence = True if burst.slave_file else False else: coherence = False # get info on master from GeoSeries master_prefix = burst["master_prefix"] master_file = burst["file_location"] master_burst_nr = burst["BurstNr"] swath = burst["SwathID"] logger.info(f"Processing burst {burst.bid} acquired at {burst.Date}") # check if somethings already processed if ( (ard["H-A-Alpha"] and not pol_file) or (ard["backscatter"] and not bs_file) or (coherence and not coh_file) ): # --------------------------------------------------------------------- # 1 Master import # create namespace for master import master_import = temp_dir / f"{master_prefix}_import" if not master_import.with_suffix(".dim").exists(): # create namespace for log file import_log = out_dir / f"{master_prefix}_import.err_log" # run import try: slc.burst_import( master_file, master_import, import_log, swath, master_burst_nr, config_dict, ) except (GPTRuntimeError, NotValidFileError) as error: if master_import.with_suffix(".dim").exists(): h.delete_dimap(master_import) logger.info(error) return burst.bid, burst.Date, None, None, None, None, error # --------------------------------------------------------------------- # 2 Product Generation if ard["H-A-Alpha"] and not pol_file: out_pol, error = create_polarimetric_layers( master_import.with_suffix(".dim"), out_dir, master_prefix, config_dict ) elif ard["H-A-Alpha"] and pol_file: # construct namespace for existing pol layer out_pol = str(out_dir / f"{master_prefix}_pol.dim") if ard["backscatter"] and not bs_file: out_bs, out_ls, error = create_backscatter_layers( master_import.with_suffix(".dim"), out_dir, master_prefix, config_dict ) elif ard["backscatter"] and bs_file: out_bs = str(out_dir / f"{master_prefix}_bs.dim") if ard["create_ls_mask"] and bs_file: out_ls = str(out_dir / f"{master_prefix}_LS.dim") if coherence and not coh_file: # get info on slave from GeoSeries slave_prefix = burst["slave_prefix"] slave_file = burst["slave_file"] slave_burst_nr = burst["slave_burst_nr"] with TemporaryDirectory(prefix=f"{str(temp_dir)}/") as temp: # convert temp to Path object temp = Path(temp) # import slave burst slave_import = temp / f"{slave_prefix}_import" import_log = out_dir / f"{slave_prefix}_import.err_log" try: slc.burst_import( slave_file, slave_import, import_log, swath, slave_burst_nr, config_dict, ) except (GPTRuntimeError, NotValidFileError) as error: if slave_import.with_suffix(".dim").exists(): h.delete_dimap(slave_import) logger.info(error) return burst.bid, burst.Date, None, None, None, None, error out_coh, error = create_coherence_layers( master_import.with_suffix(".dim"), slave_import.with_suffix(".dim"), out_dir, master_prefix, config_dict, ) # remove master import h.delete_dimap(master_import) elif coherence and coh_file: out_coh = str(out_dir / f"{master_prefix}_coh.dim") # remove master import h.delete_dimap(master_import) else: # remove master import h.delete_dimap(master_import) # in case everything has been already processed, # we re-construct the out names for proper return value else: if ard["H-A-Alpha"] and pol_file: out_pol = str(out_dir / f"{master_prefix}_pol.dim") if ard["backscatter"] and bs_file: out_bs = str(out_dir / f"{master_prefix}_bs.dim") if ard["create_ls_mask"] and bs_file: out_ls = str(out_dir / f"{master_prefix}_LS.dim") if coherence and coh_file: out_coh = str(out_dir / f"{master_prefix}_coh.dim") return burst.bid, burst.Date, out_bs, out_ls, out_pol, out_coh, None
if __name__ == "__main__": import argparse # write a description descript = """This is a command line client for the creation of Sentinel-1 ARD data from Level 1 SLC bursts. """ epilog = """ Example: to do """ # create a parser parser = argparse.ArgumentParser(description=descript, epilog=epilog) # search parameters parser.add_argument( "-b", "--burst", help=" (str) path to OST burst inventory file for" " one burst", required=True, ) parser.add_argument( "-c", "--config_file", help=" (str) path to OST project configuration file", required=True, ) args = parser.parse_args() # execute processing burst_to_ard(args.burst_inventory, args.config_file)