Source code for ost.s1.s1scene

#! /usr/bin/env python
# -*- coding: utf-8 -*-

"""Class for handling a single Sentinel-1 product

This class, initialized by a valid Sentinel-1 scene identifier,
extracts basic metadata information from the scene ID itself,
as well as more detailed  and allows for retrieving OST relevant
paths.

For GRD products it is possible to pre-process the respective scene
based on a ARD product type.
"""

# ------ bug of rasterio --------
import os

if "GDAL_DATA" in list(os.environ.keys()):
    del os.environ["GDAL_DATA"]
if "PROJ_LIB" in list(os.environ.keys()):
    del os.environ["PROJ_LIB"]
# ------ bug of rasterio --------

import sys
import json
import logging
import zipfile
import fnmatch
import xml.dom.minidom
import xml.etree.ElementTree as eTree
import urllib.request
import urllib.parse
from urllib.error import URLError
from pathlib import Path

import requests
import pandas as pd
import geopandas as gpd

from ost.helpers import scihub, peps, onda, asf, raster as ras, helpers as h
from ost.helpers.settings import APIHUB_BASEURL, OST_ROOT
from ost.helpers.settings import set_log_level, check_ard_parameters
from ost.s1.grd_to_ard import grd_to_ard, ard_to_rgb

logger = logging.getLogger(__name__)

CONNECTION_ERROR = "We failed to connect to the server. Reason: "
CONNECTION_ERROR_2 = "The server couldn't fulfill the request. Error code: "


[docs]class Sentinel1Scene: """ TBD """ scene_id = None "TBD" mission_id = None "TBD" mode_beam = None "TBD" product_type = None "TBD" resolution_class = None "TBD" proc_level = None "TBD" pol_mode = None "TBD" start_date = None "TBD" start_time = None "TBD" stop_date = None "TBD" stop_time = None "TBD" abs_orbit = None "TBD" data_take_id = None "TBD" unique_id = None "TBD" year = None "TBD" month = None "TBD" day = None "TBD" onda_class = None "TBD" orbit_offset = None "TBD" satellite = None "TBD" rel_orbit = None "TBD" acq_mode = None "TBD" p_type = None "TBD" product_dl_path = None "TBD" ard_dimap = None "TBD" ard_rgb = None "TBD" rgb_thumbnail = None "TBD" config_dict = None "TBD" config_file = None "TBD" def __init__(self, scene_id, ard_type="OST_GTC", log_level=logging.INFO): # set log level set_log_level(log_level) # get metadata from scene identifier self.scene_id = scene_id self.mission_id = scene_id[0:3] self.mode_beam = scene_id[4:6] self.product_type = scene_id[7:10] self.resolution_class = scene_id[10] self.proc_level = scene_id[12] self.pol_mode = scene_id[14:16] self.start_date = scene_id[17:25] self.start_time = scene_id[26:32] self.stop_date = scene_id[33:41] self.stop_time = scene_id[42:48] self.abs_orbit = scene_id[49:55] self.data_take_id = scene_id[57:62] self.unique_id = scene_id[63:] self.year = scene_id[17:21] self.month = scene_id[21:23] self.day = scene_id[23:25] self.onda_class = scene_id[4:14] # Calculate the relative orbit out of absolute orbit # (from Peter Meadows (ESA) @ # http://forum.step.esa.int/t/sentinel-1-relative-orbit-from-filename/7042) if self.mission_id == "S1A": self.orbit_offset = 73 self.satellite = "Sentinel-1A" elif self.mission_id == "S1B": self.orbit_offset = 27 self.satellite = "Sentinel-1B" self.rel_orbit = ((int(self.abs_orbit) - int(self.orbit_offset)) % 175) + 1 # get acquisition mode if self.mode_beam == "IW": self.acq_mode = "Interferometric Wide Swath" elif self.mode_beam.startswith("S"): self.acq_mode = "Stripmap" elif self.mode_beam == "EW": self.acq_mode = "Extra-Wide swath" elif self.mode_beam == "WV": self.acq_mode = "Wave" # get product type if self.product_type == "GRD": self.p_type = "Ground Range Detected (GRD)" elif self.product_type == "SLC": self.p_type = "Single-Look Complex (SLC)" elif self.product_type == "OCN": self.p_type = "Ocean" elif self.product_type == "RAW": self.p_type = "Raw Data (RAW)" # set initial product paths to None self.product_dl_path = None self.ard_dimap = None self.ard_rgb = None self.rgb_thumbnail = None # set initial ARD parameters to ard_type self.get_ard_parameters(ard_type) self.config_dict = dict(processing=self.ard_parameters) self.config_file = None
[docs] def info(self): # actual print function print(" -------------------------------------------------") print(" Scene Information:") print(" Scene Identifier: " + str(self.scene_id)) print(" Satellite: " + str(self.satellite)) print(" Acquisition Mode: " + str(self.acq_mode)) print(" Processing Level: " + str(self.proc_level)) print(" Product Type: " + str(self.p_type)) print(" Acquisition Date: " + str(self.start_date)) print(" Start Time: " + str(self.start_time)) print(" Stop Time: " + str(self.stop_time)) print(" Absolute Orbit: " + str(self.abs_orbit)) print(" Relative Orbit: " + str(self.rel_orbit)) print(" -------------------------------------------------")
[docs] def info_dict(self): # create info dictionary necessary for tests inf_dict = {} inf_dict.update( Scene_Identifier=str(self.scene_id), Satellite=str(self.satellite), Acquisition_Mode=str(self.acq_mode), Processing_Level=str(self.proc_level), Product_Type=str(self.p_type), Acquisition_Date=str(self.start_date), Start_Time=str(self.start_time), Stop_Time=str(self.stop_time), Absolute_Orbit=str(self.abs_orbit), Relative_Orbit=str(self.rel_orbit), ) return inf_dict
[docs] def download(self, download_dir, mirror=None): if not mirror: logger.info("One or more of your scenes need to be downloaded.") print(" Select the server from where you want to download:") print(" (1) Copernicus Apihub (ESA, rolling archive)") print(" (2) Alaska Satellite Facility (NASA, full archive)") print(" (3) PEPS (CNES, 1 year rolling archive)") print(" (4) ONDA DIAS (ONDA DIAS full archive for" " SLC - or GRD from 30 June 2019)") # print(' (5) Alaska Satellite Facility (using WGET' # ' - unstable - use only if 2 fails)') mirror = input(" Type 1, 2, 3, or 4: ") from ost.s1 import download if isinstance(download_dir, str): download_dir = Path(download_dir) if mirror == "1": uname, pword = scihub.ask_credentials() opener = scihub.connect(uname=uname, pword=pword) df = pd.DataFrame({"identifier": [self.scene_id], "uuid": [self.scihub_uuid(opener)]}) elif mirror == "2": uname, pword = asf.ask_credentials() df = pd.DataFrame({"identifier": [self.scene_id]}) elif mirror == "3": uname, pword = peps.ask_credentials() df = pd.DataFrame( { "identifier": [self.scene_id], "uuid": [self.peps_uuid(uname=uname, pword=pword)], } ) elif mirror == "4": uname, pword = onda.ask_credentials() opener = onda.connect(uname=uname, pword=pword) df = pd.DataFrame({"identifier": [self.scene_id], "uuid": [self.ondadias_uuid(opener)]}) else: raise ValueError("You entered the wrong mirror.") # else: # ASF # df = pd.DataFrame({'identifier': [self.scene_id]}) # download.download_sentinel1(df, download_dir, mirror) # return download.download_sentinel1(df, download_dir, mirror, uname=uname, pword=pword) # delete credentials del uname, pword
# location of file (including diases)
[docs] def download_path(self, download_dir, mkdir=False): if isinstance(download_dir, str): download_dir = Path(download_dir) download_path = ( Path(download_dir) / "SAR" / f"{self.product_type}" / f"{self.year}" / f"{self.month}" / f"{self.day}" ) # make dir if not existent if mkdir: download_path.mkdir(parents=True, exist_ok=True) # get file_path file_path = download_path / f"{self.scene_id}.zip" self.product_dl_path = file_path return file_path
def _creodias_path(self, data_mount): if isinstance(data_mount, str): data_mount = Path(data_mount) path = ( Path(data_mount) / "Sentinel-1" / "SAR" / f"{self.product_type}" / f"{self.year}" / f"{self.month}" / f"{self.day}" / f"{self.scene_id}.SAFE" ) return path def _onda_path(self, data_mount): path = ( Path(data_mount) / "S1" / "LEVEL-1" / f"{self.onda_class}" / f"{self.year}" / f"{self.month}" / f"{self.day}" / f"{self.scene_id}.zip" / f"{self.scene_id}.SAFE" ) return path
[docs] def get_path(self, download_dir=None, data_mount=None): path = None if download_dir: # convert download_dir to Path object if isinstance(download_dir, str): download_dir = Path(download_dir) # construct download path self.download_path(download_dir=download_dir, mkdir=False) # check if scene is succesfully downloaded if self.product_dl_path.with_suffix(".downloaded").exists(): path = self.product_dl_path if data_mount and not path: # covert data_mount to Path object if isinstance(data_mount, str): data_mount = Path(data_mount) # check creodias folder structure if (self._creodias_path(data_mount) / "manifest.safe").exists(): path = self._creodias_path(data_mount) # check for ondadias folder structure elif self._onda_path(data_mount).exists(): path = self._onda_path(data_mount) return path
# scihub related
[docs] def scihub_uuid(self, opener): # construct the basic the url base_url = "https://apihub.copernicus.eu/apihub/odata/v1/Products?$filter=" # request action = urllib.parse.quote(f"Name eq '{self.scene_id}'") # construct the download url url = base_url + action try: # get the request req = opener.open(url) except URLError as error: if hasattr(error, "reason"): logger.info(f"{CONNECTION_ERROR}{error.reason}") sys.exit() elif hasattr(error, "code"): logger.info(f"{CONNECTION_ERROR_2}{error.reason}") sys.exit() else: # write the request to to the response variable # (i.e. the xml coming back from scihub) response = req.read().decode("utf-8") # return uuid from response return response.split("Products('")[1].split("')")[0]
[docs] def scihub_url(self, opener): # return the full url return f"{APIHUB_BASEURL}('{self.scihub_uuid(opener)}')/$value"
[docs] def scihub_md5(self, opener): # return the md5 checksum return f"{APIHUB_BASEURL}('{self.scihub_uuid(opener)}')" f"/Checksum/Value/$value"
[docs] def scihub_online_status(self, opener): # get url for product url = f"{APIHUB_BASEURL}('{self.scihub_uuid(opener)}')/Online/$value" # check if something is coming back from our request try: # get the request req = opener.open(url) except URLError as error: if hasattr(error, "reason"): logger.info(f"{CONNECTION_ERROR}{error.reason}") sys.exit() elif hasattr(error, "code"): logger.info(f"{CONNECTION_ERROR_2}{error.reason}") sys.exit() else: # write the request to to the response variable # (i.e. the xml coming back from scihub) response = req.read().decode("utf-8") if response == "true": response = True elif response == "false": response = False else: raise TypeError("Wrong response type.") return response
[docs] def scihub_trigger_production(self, opener): # get uuid and construct url for scihub's apihub uuid = self.scihub_uuid(opener) url = f"{APIHUB_BASEURL}('{uuid}')/$value" try: # get the request req = opener.open(url) except URLError as error: if hasattr(error, "reason"): logger.info(f"{CONNECTION_ERROR}{error.reason}") sys.exit() elif hasattr(error, "code"): logger.info(f"{CONNECTION_ERROR_2}{error.reason}") sys.exit() else: # write the request to to the response variable # (i.e. the xml coming back from scihub) code = req.getcode() if code == 202: logging.info(f"Production of {self.scene_id} successfully requested.") return code
def _scihub_annotation_url(self, opener): """Retrieve the urls for the product annotation files :param opener: :type opener: :return: the urls for the product annotation files :rtype: list """ # get uuid for product uuid = self.scihub_uuid(opener) logger.info(f"Retrieving URLs of annotation files for S1 product: " f"{self.scene_id}.") scihub_url = "https://apihub.copernicus.eu/apihub/odata/v1/Products" anno_path = f"('{uuid}')/Nodes('{self.scene_id}.SAFE')" f"/Nodes('annotation')/Nodes" # construct anno url path url = scihub_url + anno_path # try to retrieve try: # get the request req = opener.open(url) except URLError as error: if hasattr(error, "reason"): logger.info(f"{CONNECTION_ERROR}{error.reason}") sys.exit() elif hasattr(error, "code"): logger.info(f"{CONNECTION_ERROR_2}{error.reason}") sys.exit() else: # read out the response response = req.read().decode("utf-8") # parse the response dom = xml.dom.minidom.parseString(response) # loop through each entry (with all metadata) url_list = [] for node in dom.getElementsByTagName("entry"): download_url = node.getElementsByTagName("id")[0].firstChild.nodeValue # if we find an xml we append to the list of relevant files if download_url[-6:-2] == ".xml": url_list.append(f"{download_url}/$value") return url_list
[docs] def scihub_annotation_get(self, uname=None, pword=None): """ :param uname: :param pword: :return: """ from ost.s1.burst_inventory import burst_extract # create emtpy geodataframe to be filled with burst infos column_names = [ "SceneID", "Track", "Date", "SwathID", "AnxTime", "BurstNr", "geometry", ] gdf_final = gpd.GeoDataFrame(columns=column_names) # get connected to scihub opener = scihub.connect(uname, pword) anno_list = self._scihub_annotation_url(opener) for url in anno_list: try: # get the request req = opener.open(url) except URLError as error: if hasattr(error, "reason"): logger.info(f"{CONNECTION_ERROR}{error.reason}") sys.exit() elif hasattr(error, "code"): logger.info(f"{CONNECTION_ERROR_2}{error.reason}") sys.exit() else: # write the request to to the response variable # (i.e. the xml coming back from scihub) response = req.read().decode("utf-8") et_root = eTree.fromstring(response) # parse the xml page from the response gdf = burst_extract(self.scene_id, self.rel_orbit, self.start_date, eTree.parse(et_root)) gdf_final = pd.concat([gdf_final, gdf]) return gdf_final.drop_duplicates(["AnxTime"], keep="first")
[docs] def zip_annotation_get(self, download_dir, data_mount=None): """ :param download_dir: :param data_mount: :return: """ from ost.s1.burst_inventory import burst_extract column_names = [ "SceneID", "Track", "Date", "SwathID", "AnxTime", "BurstNr", "geometry", ] # crs for empty dataframe crs = "epsg:4326" gdf_final = gpd.GeoDataFrame(columns=column_names, crs=crs) file = self.get_path(download_dir, data_mount) # extract info from archive archive = zipfile.ZipFile(file, "r") namelist = archive.namelist() anno_files = fnmatch.filter(namelist, "*/annotation/s*.xml") # loop through xml annotation files for anno_file in anno_files: anno_string = archive.open(anno_file) gdf = burst_extract(self.scene_id, self.rel_orbit, self.start_date, eTree.parse(anno_string)) gdf_final = pd.concat([gdf_final, gdf]) return gdf_final.drop_duplicates(["AnxTime"], keep="first")
[docs] def safe_annotation_get(self, download_dir, data_mount=None): from ost.s1.burst_inventory import burst_extract column_names = [ "SceneID", "Track", "Date", "SwathID", "AnxTime", "BurstNr", "geometry", ] gdf_final = gpd.GeoDataFrame(columns=column_names) file_path = self.get_path(download_dir=download_dir, data_mount=data_mount) for anno_file in list(file_path.glob("annotation/*xml")): # parse the xml page from the response gdf = burst_extract(self.scene_id, self.rel_orbit, self.start_date, eTree.parse(anno_file)) gdf_final = pd.concat([gdf_final, gdf]) return gdf_final.drop_duplicates(["AnxTime"], keep="first")
# onda dias uuid extractor
[docs] def ondadias_uuid(self, opener): # construct the basic the url base_url = "https://catalogue.onda-dias.eu/dias-catalogue/Products?$search=" # construct the download url action = '"' + self.scene_id + '.zip"' url = base_url + action try: # get the request req = opener.open(url) except URLError as error: if hasattr(error, "reason"): logger.info(f"{CONNECTION_ERROR}{error.reason}") sys.exit() elif hasattr(error, "code"): logger.info(f"{CONNECTION_ERROR_2}{error.reason}") sys.exit() else: # write the request to to the response variable # (i.e. the xml coming back from onda dias) response = req.read().decode("utf-8") # parse the uuid from the response (a messy pseudo xml) uuid = response.split('":"')[3].split('","')[0] return uuid
# other data providers
[docs] def asf_url(self): """Constructor for ASF download URL :return: string of the ASF download url :rtype: str """ # base url of ASF asf_url = "https://datapool.asf.alaska.edu" # get ASF style mission id if self.mission_id == "S1A": mission = "SA" elif self.mission_id == "S1B": mission = "SB" else: raise ValueError("Wrong mission id.") # get relevant product type in ASF style if self.product_type == "SLC": product_type = self.product_type elif self.product_type == "GRD": product_type = f"GRD_{self.resolution_class}{self.pol_mode[0]}" else: raise ValueError("Wrong product type.") return f"{asf_url}/{product_type}/{mission}/{self.scene_id}.zip"
[docs] def peps_uuid(self, uname, pword): """Retrieval of the PEPS UUID from the Peps server :param uname: username for CNES' PEPS server :param pword: password for CNES' PEPS server :return: """ # construct product url url = f"https://peps.cnes.fr/resto/api/collections/S1/search.json?q=" f"{self.scene_id}" # get response response = requests.get(url, stream=True, auth=(uname, pword)) # check response if response.status_code == 401: raise ValueError(" ERROR: Username/Password are incorrect.") elif response.status_code != 200: response.raise_for_status() data = json.loads(response.text) peps_uuid = data["features"][0]["id"] download_url = data["features"][0]["properties"]["services"]["download"]["url"] return peps_uuid, download_url
[docs] def peps_online_status(self, uname, pword): """Check if product is online at CNES' Peps server. :param uname: CNES' Peps username :param pword: CNES' Peps password :return: """ _, url = self.peps_uuid(uname, pword) # define url response = requests.get(url, stream=True, auth=(uname, pword)) status = response.status_code # check response if status == 401: raise ValueError(" ERROR: Username/Password are incorrect.") elif status == 404: raise ValueError(" ERROR: File not found.") elif status == 200: status = "online" elif status == 202: status = "onTape" else: response.raise_for_status() return status, url
[docs] def get_ard_parameters(self, ard_type): # find respective template for selected ARD type template_file = ( OST_ROOT / "graphs" / "ard_json" / f"{self.product_type.lower()}.{ard_type.replace('-', '_').lower()}.json" ) # open and load parameters with open(template_file, "r") as ard_file: self.ard_parameters = json.load(ard_file)["processing"]
[docs] def update_ard_parameters(self, ard_type=None): """ :param ard_type: :return: """ # if a ard type is selected, load if ard_type: self.get_ard_parameters(ard_type) # check for correctness of ard parameters check_ard_parameters(self.ard_parameters) # re-create project dict with update ard parameters self.config_dict.update(processing=self.ard_parameters) # dump to json file if self.config_file: with open(self.config_file, "w") as outfile: json.dump(self.config_dict, outfile, indent=4)
[docs] def set_external_dem(self, dem_file, ellipsoid_correction=True): """ :param dem_file: :param ellipsoid_correction: :return: """ import rasterio # check if file exists if not Path(dem_file).exists(): raise FileNotFoundError(f"No file found at {dem_file}.") # get no data value with rasterio.open(dem_file) as file: dem_nodata = file.nodata # get resampling img_res = self.ard_parameters["single_ARD"]["dem"]["image_resampling"] dem_res = self.ard_parameters["single_ARD"]["dem"]["dem_resampling"] # update ard parameters dem_dict = dict( { "dem_name": "External DEM", "dem_file": dem_file, "dem_nodata": dem_nodata, "dem_resampling": dem_res, "image_resampling": img_res, "egm_correction": ellipsoid_correction, "out_projection": "WGS84(DD)", } ) # update ard_parameters self.ard_parameters["single_ARD"]["dem"] = dem_dict
[docs] def create_ard(self, infile, out_dir, subset=None, overwrite=False): """ :param infile: :param out_dir: :param subset: :param overwrite: :return: """ if self.product_type != "GRD": raise ValueError( "The create_ard method for single products is currently " "only available for GRD products" ) if isinstance(infile, str): infile = Path(infile) if isinstance(out_dir, str): out_dir = Path(out_dir) # set config param necessary for processing self.config_dict["processing_dir"] = str(out_dir) self.config_dict["temp_dir"] = str(out_dir / "temp") self.config_dict["snap_cpu_parallelism"] = os.cpu_count() self.config_dict["subset"] = False if subset: self.config_dict["subset"] = True self.config_dict["aoi"] = subset # create directories out_dir.mkdir(parents=True, exist_ok=True) (out_dir / "temp").mkdir(parents=True, exist_ok=True) if overwrite: file_dir = out_dir / f"{self.rel_orbit}" / f"{self.start_date}" if (file_dir / ".processed").exists(): (file_dir / ".processed").unlink() # -------------------------------------------- # 2 Check if within SRTM coverage # set ellipsoid correction and force GTC production # when outside SRTM center_lat = self._get_center_lat(infile) if float(center_lat) > 59 or float(center_lat) < -59: if "SRTM" in self.ard_parameters["single_ARD"]["dem"]["dem_name"]: logger.info( "Scene is outside SRTM coverage. Snap will therefore use " "the Copernicus 30m Global DEM. " ) self.ard_parameters["single_ARD"]["dem"]["dem_name"] = "Copernicus 30m Global DEM" if self.ard_parameters["single_ARD"]["dem"]["out_projection"] == 4326: logger.info( "The scene's location is towards the poles. " "Consider to use a stereographic projection." ) epsg = input( "Type an alternative EPSG code for the projection of the " "output data or just press enter for keeping Lat/Lon " "coordinate system (e.g. 3413 for NSIDC Sea Ice Polar " "Stereographic North projection, or 3976 for " "NSIDC Sea Ice Polar Stereographic South projection" ) if not epsg: epsg = 4326 self.ard_parameters["single_ARD"]["dem"]["out_projection"] = int(epsg) # -------------------------------------------- # 3 Check ard parameters in case they have been updated, # and write them to json file # set config file to output directory self.config_file = out_dir / "processing.json" # write ard parameters, and check if they are correct self.update_ard_parameters() # -------------------------------------------- # 4 set resolution to degree # self.ard_parameters['resolution'] = h.resolution_in_degree( # self.center_lat, self.ard_parameters['resolution']) # -------------------------------------------- # 5 run the burst to ard batch routine filelist, out_bs, out_ls, error = grd_to_ard( [infile], self.config_file, ) # print error if any if error: logger.info(error) else: # remove temp folder tmp_folder = out_dir / "temp" h.remove_folder_content(tmp_folder) tmp_folder.rmdir() self.ard_dimap = out_bs
[docs] def create_rgb(self, outfile, driver="GTiff"): # invert ot db from create_ard workflow for rgb creation # (otherwise we do it double) if self.ard_parameters["single_ARD"]["to_db"]: to_db = False else: to_db = True ard_to_rgb(self.ard_dimap, outfile, driver, to_db) self.ard_rgb = outfile
[docs] def create_rgb_thumbnail(self, outfile, shrink_factor=25): # invert ot db from create_ard workflow for rgb creation # (otherwise we do it double) if self.ard_parameters["single_ARD"]["to_db"]: to_db = False else: to_db = True self.rgb_thumbnail = outfile driver = "JPEG" ard_to_rgb(self.ard_dimap, self.rgb_thumbnail, driver, to_db, shrink_factor)
[docs] def visualise_rgb(self, shrink_factor=25): ras.visualise_rgb(self.ard_rgb, shrink_factor)
# other functions def _get_center_lat(self, scene_path=None): if scene_path.suffix == ".zip": zip_archive = zipfile.ZipFile(str(scene_path)) manifest = zip_archive.read(f"{self.scene_id}.SAFE/manifest.safe") elif scene_path.suffix == ".SAFE": with (scene_path / "manifest.safe").open("rb") as file: manifest = file.read() else: raise ValueError("Invalid file.") root = eTree.fromstring(manifest) coordinates = None for child in root: metadata = child.findall("metadataObject") for meta in metadata: for wrap in meta.findall("metadataWrap"): for data in wrap.findall("xmlData"): for frame_set in data.findall("{http://www.esa.int/safe/sentinel-1.0}" "frameSet"): for frame in frame_set.findall("{http://www.esa.int/safe/sentinel-1.0}" "frame"): for footprint in frame.findall( "{http://www.esa.int/" "safe/sentinel-1.0}footPrint" ): for coords in footprint.findall( "{http://www.opengis.net/gml}" "coordinates" ): coordinates = coords.text.split(" ") if coordinates: sums, i = 0, 0 for i, coords in enumerate(coordinates): sums = sums + float(coords.split(",")[0]) return sums / (i + 1) else: raise RuntimeError("Could not find any coordinates within the metadata file")