Source code for ost.generic.common_wrappers

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
from retrying import retry

from ost.helpers import helpers as h
from ost.helpers.settings import GPT_FILE, OST_ROOT
from ost.helpers.errors import GPTRuntimeError, NotValidFileError


logger = logging.getLogger(__name__)


[docs]@retry(stop_max_attempt_number=3, wait_fixed=1) def speckle_filter(infile, outfile, logfile, config_dict): """Wrapper function around SNAP's Speckle Filter function This function takes OST imported Sentinel-1 product and applies the Speckle Filter as defind within the config dictionary. :param infile: :param outfile: :param logfile: :param config_dict: :return: """ # get relevant config parameters cpus = config_dict["snap_cpu_parallelism"] speckle_dict = config_dict["processing"]["single_ARD"]["speckle_filter"] logger.debug("Applying speckle filtering.") # construct command string command = ( f"{GPT_FILE} Speckle-Filter -x -q {2*cpus} " f"-PestimateENL='{speckle_dict['estimate_ENL']}' " f"-PanSize='{speckle_dict['pan_size']}' " f"-PdampingFactor='{speckle_dict['damping']}' " f"-Penl='{speckle_dict['ENL']}' " f"-Pfilter='{speckle_dict['filter']}' " f"-PfilterSizeX='{speckle_dict['filter_x_size']}' " f"-PfilterSizeY='{speckle_dict['filter_y_size']}' " f"-PnumLooksStr='{speckle_dict['num_of_looks']}' " f"-PsigmaStr='{speckle_dict['sigma']}' " f"-PtargetWindowSizeStr=\"{speckle_dict['target_window_size']}\" " f"-PwindowSize=\"{speckle_dict['window_size']}\" " f"-t '{str(outfile)}' '{str(infile)}' " ) # run command and get return code return_code = h.run_command(command, logfile) # handle errors and logs if return_code == 0: logger.debug("Successfully applied speckle filtering.") else: raise GPTRuntimeError( f"Speckle filtering exited with error {return_code}. " f"See {logfile} for Snap's error message." ) # do check routine return_code = h.check_out_dimap(outfile) if return_code == 0: return str(outfile.with_suffix(".dim")) else: raise NotValidFileError(f"Product did not pass file check: {return_code}")
[docs]@retry(stop_max_attempt_number=3, wait_fixed=1) def linear_to_db(infile, outfile, logfile, config_dict): """Wrapper function around SNAP's linear to db routine This function takes an OST calibrated Sentinel-1 product and converts it to dB. :param infile: :param outfile: :param logfile: :param config_dict: :return: """ # get relevant config parameters cpus = config_dict["snap_cpu_parallelism"] logger.debug("Converting calibrated power image to dB scale.") # construct command string command = f"{GPT_FILE} LinearToFromdB -x -q {2*cpus} " f"-t '{str(outfile)}' {str(infile)}" # run command and get return code return_code = h.run_command(command, logfile) # handle errors and logs if return_code == 0: logger.debug("Succesfully converted product to dB-scale.") else: raise GPTRuntimeError( f"dB Scaling exited with error {return_code}. " f"See {logfile} for Snap's error message." ) # do check routine return_code = h.check_out_dimap(outfile) if return_code == 0: return str(outfile.with_suffix(".dim")) else: raise NotValidFileError(f"Product did not pass file check: {return_code}")
[docs]@retry(stop_max_attempt_number=3, wait_fixed=1) def terrain_flattening(infile, outfile, logfile, config_dict): """Wrapper function to Snap's Terrain Flattening routine :param infile: :param outfile: :param logfile: :param config_dict: :return: """ # get relevant config parameters cpus = config_dict["snap_cpu_parallelism"] dem_dict = config_dict["processing"]["single_ARD"]["dem"] logger.debug("Applying terrain flattening to calibrated product.") command = ( f"{GPT_FILE} Terrain-Flattening -x -q {2*cpus} " f"-PdemName='{dem_dict['dem_name']}' " f"-PdemResamplingMethod='{dem_dict['dem_resampling']}' " f"-PexternalDEMFile='{dem_dict['dem_file']}' " f"-PexternalDEMNoDataValue={dem_dict['dem_nodata']} " f"-t '{str(outfile)}' '{str(infile)}'" ) # run command and get return code return_code = h.run_command(command, logfile) # handle errors and logs if return_code == 0: logger.debug("Succesfully terrain flattened product") else: raise GPTRuntimeError( f"Terrain Flattening exited with error {return_code}. " f"See {logfile} for Snap's error message." ) # do check routine return_code = h.check_out_dimap(outfile) if return_code == 0: return str(outfile.with_suffix(".dim")) else: raise NotValidFileError(f"Product did not pass file check: {return_code}")
[docs]@retry(stop_max_attempt_number=3, wait_fixed=1) def terrain_correction(infile, outfile, logfile, config_dict): """Wrapper function around Snap's terrain or ellipsoid correction Based on the configuration parameters either the Range-Doppler terrain correction or an Ellisoid correction is applied for geocoding a calibrated Sentinel-1 product. :param infile: :param outfile: :param logfile: :param config_dict: :return: """ # get relevant config parameters ard = config_dict["processing"]["single_ARD"] dem_dict = ard["dem"] cpus = config_dict["snap_cpu_parallelism"] # auto projections of snap if 42001 <= dem_dict["out_projection"] <= 97002: projection = f"AUTO:{dem_dict['out_projection']}" # epsg codes elif int(dem_dict["out_projection"]) == 4326: projection = "WGS84(DD)" else: projection = f"EPSG:{dem_dict['out_projection']}" # logger.debug("Geocoding product.") if ard["geocoding"] == "terrain": command = ( f"{GPT_FILE} Terrain-Correction -x -q {2*cpus} " f"-PdemName='{dem_dict['dem_name']}' " f"-PdemResamplingMethod='{dem_dict['dem_resampling']}' " f"-PexternalDEMFile='{dem_dict['dem_file']}' " f"-PexternalDEMNoDataValue={dem_dict['dem_nodata']} " f"-PexternalDEMApplyEGM=" f"'{str(dem_dict['egm_correction']).lower()}' " f"-PimgResamplingMethod='{dem_dict['image_resampling']}' " f"-PpixelSpacingInMeter={ard['resolution']} " f"-PalignToStandardGrid=true " f"-PmapProjection='{projection}' " f"-t '{str(outfile)}' '{str(infile)}' " ) elif ard["geocoding"] == "ellipsoid": command = ( f"{GPT_FILE} Ellipsoid-Correction-RD -x -q {2*cpus} " f"-PdemName='{dem_dict['dem_name']}' " f"-PdemResamplingMethod='{dem_dict['dem_resampling']}' " f"-PexternalDEMFile='{dem_dict['dem_file']}' " f"-PexternalDEMNoDataValue={dem_dict['dem_nodata']} " f"-PexternalDEMApplyEGM=" f"'{str(dem_dict['egm_correction']).lower()}' " f"-PimgResamplingMethod='{dem_dict['image_resampling']}' " f"-PpixelSpacingInMeter={ard['resolution']} " f"-PalignToStandardGrid=true " f"-PmapProjection='{projection}' " f"-t '{str(outfile)}' '{str(infile)}' " ) else: raise ValueError("Geocoding method should be either 'terrain' or 'ellipsoid'.") # run command and get return code return_code = h.run_command(command, logfile) # handle errors and logs if return_code == 0: logger.debug("Succesfully geocoded product") else: raise GPTRuntimeError( f"Geocoding exited with error {return_code}. " f"See {logfile} for Snap's error message." ) # do check routine return_code = h.check_out_dimap(outfile) if return_code == 0: return str(outfile.with_suffix(".dim")) else: raise NotValidFileError(f"Product did not pass file check: {return_code}")
[docs]@retry(stop_max_attempt_number=3, wait_fixed=1) def ls_mask(infile, outfile, logfile, config_dict): """Wrapper function of a Snap graph for Layover/Shadow mask creation :param infile: :param outfile: :param logfile: :param config_dict: :return: """ # get relevant config parameters ard = config_dict["processing"]["single_ARD"] dem_dict = ard["dem"] cpus = config_dict["snap_cpu_parallelism"] # auto projections of snap if 42001 <= dem_dict["out_projection"] <= 97002: projection = f"AUTO:{dem_dict['out_projection']}" # epsg codes elif int(dem_dict["out_projection"]) == 4326: projection = "WGS84(DD)" else: projection = f"EPSG:{dem_dict['out_projection']}" logger.debug("Creating the Layover/Shadow mask") # get path to workflow xml graph = OST_ROOT / "graphs/S1_GRD2ARD/3_LSmap.xml" command = ( f"{GPT_FILE} {graph} -x -q {2 * cpus} " f"-Pinput='{str(infile)}' " f'-Presol={ard["resolution"]} ' f'-Pdem=\'{dem_dict["dem_name"]}\' ' f'-Pdem_file=\'{dem_dict["dem_file"]}\' ' f'-Pdem_nodata=\'{dem_dict["dem_nodata"]}\' ' f'-Pdem_resampling=\'{dem_dict["dem_resampling"]}\' ' f'-Pimage_resampling=\'{dem_dict["image_resampling"]}\' ' f'-Pegm_correction=\'{str(dem_dict["egm_correction"]).lower()}\' ' f"-Pprojection='{projection}' " f"-Poutput='{str(outfile)}'" ) # run command and get return code return_code = h.run_command(command, logfile) # handle errors and logs if return_code == 0: logger.debug("Successfully created a Layover/Shadow mask") else: raise GPTRuntimeError( f"Layover/Shadow mask creation exited with error {return_code}. " f"See {logfile} for Snap's error message." ) # do check routine return_code = h.check_out_dimap(outfile, test_stats=False) if return_code == 0: return str(outfile.with_suffix(".dim")) else: raise NotValidFileError(f"Product did not pass file check: {return_code}")
[docs]@retry(stop_max_attempt_number=3, wait_fixed=1) def create_stack( file_list, out_stack, logfile, config_dict, polarisation=None, pattern=None, ): """ :param file_list: :param out_stack: :param logfile: :param config_dict: :param polarisation: :param pattern: :return: """ # get relevant config parameters cpus = config_dict["snap_cpu_parallelism"] logger.debug("Creating multi-temporal stack.") if pattern: graph = OST_ROOT / "graphs/S1_TS/1_BS_Stacking_HAalpha.xml" command = ( f"{GPT_FILE} {graph} -x -q {2*cpus} " f"-Pfilelist={file_list} " f"-PbandPattern='{pattern}.*' " f"-Poutput={out_stack}" ) else: graph = OST_ROOT / "graphs/S1_TS/1_BS_Stacking.xml" command = ( f"{GPT_FILE} {graph} -x -q {2*cpus} " f"-Pfilelist={file_list} " f"-Ppol={polarisation} " f"-Poutput={out_stack}" ) return_code = h.run_command(command, logfile) if return_code == 0: logger.debug("Successfully created multi-temporal stack") else: raise GPTRuntimeError( f"Multi-temporal stack creation exited with error {return_code}. " f"See {logfile} for Snap's error message." ) # do check routine return_msg = h.check_out_dimap(out_stack) if return_msg == 0: logger.debug("Product passed validity check.") else: raise NotValidFileError(f"Product did not pass file check: {return_msg}")
[docs]@retry(stop_max_attempt_number=3, wait_fixed=1) def mt_speckle_filter(in_stack, out_stack, logfile, config_dict): """ :param in_stack: :param out_stack: :param logfile: :param config_dict: :return: """ # get relevant config parameters cpus = config_dict["snap_cpu_parallelism"] speckle_dict = config_dict["processing"]["time-series_ARD"]["mt_speckle_filter"] # debug message logger.debug("Applying multi-temporal speckle filtering.") # construct command string command = ( f"{GPT_FILE} Multi-Temporal-Speckle-Filter -x -q {2*cpus} " f"-PestimateENL='{speckle_dict['estimate_ENL']}' " f"-PanSize='{speckle_dict['pan_size']}' " f"-PdampingFactor='{speckle_dict['damping']}' " f"-Penl='{speckle_dict['ENL']}' " f"-Pfilter='{speckle_dict['filter']}' " f"-PfilterSizeX='{speckle_dict['filter_x_size']}' " f"-PfilterSizeY='{speckle_dict['filter_y_size']}' " f"-PnumLooksStr='{speckle_dict['num_of_looks']}' " f"-PsigmaStr='{speckle_dict['sigma']}' " f"-PtargetWindowSizeStr=\"{speckle_dict['target_window_size']}\" " f"-PwindowSize=\"{speckle_dict['window_size']}\" " f"-t '{out_stack}' '{in_stack}' " ) return_code = h.run_command(command, logfile) if return_code == 0: logger.debug("Successfully applied multi-temporal speckle filtering") else: raise GPTRuntimeError( f"Multi-temporal Spackle Filter exited with error {return_code}. " f"See {logfile} for Snap's error message." ) # do check routine return_code = h.check_out_dimap(out_stack) if return_code == 0: return str(out_stack.with_suffix(".dim")) else: raise NotValidFileError(f"Product did not pass file check: {return_code}")