Source code for ost.helpers.vector

import os
import json
from pathlib import Path

import pyproj
from pyproj.crs import ProjectedCRS

# temporary, we need to set minimum version of pyproj
try:
    from pyproj.crs.coordinate_operation import AzimuthalEquidistantConversion
except ImportError:
    from pyproj.crs.coordinate_operation import (
        AzumuthalEquidistantConversion as AzimuthalEquidistantConversion,
    )

import geopandas as gpd
import logging
from osgeo import osr
from osgeo import ogr

from shapely.ops import transform
from shapely.wkt import loads
from shapely.geometry import Point, Polygon, mapping, shape
from shapely.errors import WKTReadingError
from fiona import collection
from fiona.crs import from_epsg

logger = logging.getLogger(__name__)


[docs]def aoi_to_wkt(aoi): """Helper function to transform various AOI formats into WKT This function is used to import an AOI definition into an OST project. The AOIs definition can be from difffrent sources, i.e. an ISO3 country code (that calls GeoPandas low-resolution country boundaries), a WKT string, :param aoi: AOI , which can be an ISO3 country code, a WKT String or a path to a shapefile, a GeoPackage or a GeoJSON file :type aoi: str/Path :return: AOI as WKT string :rtype: WKT string """ # see if aoi alread in wkt try: # let's check if it is a shapely readable WKT loads(str(aoi)) aoi_wkt = aoi except WKTReadingError: # see if aoi is an ISO3 country code try: # let's check if it is a shapely readable WKT world = gpd.read_file(gpd.datasets.get_path("naturalearth_lowres")) aoi_wkt = world["geometry"][world["iso_a3"] == aoi].values[0].wkt except IndexError: # see if it is a Geovector file if Path(aoi).exists(): try: gdf = gpd.GeoDataFrame.from_file(aoi) if gdf.crs != "epsg:4326": try: gdf = gdf.geometry.to_crs("epsg:4326") except Exception: raise ValueError("No valid OST AOI definition.") # return AOI as single vector object aoi_wkt = str(gdf.geometry.unary_union) except Exception: # give up raise ValueError("No valid OST AOI definition.") else: # give up raise ValueError("No valid OST AOI definition.") return aoi_wkt
[docs]def get_epsg(prjfile): """ :param prjfile: :return: """ prj_file = open(prjfile, "r") prj_txt = prj_file.read() srs = osr.SpatialReference() srs.ImportFromESRI([prj_txt]) srs.AutoIdentifyEPSG() # return EPSG code return srs.GetAuthorityCode(None)
[docs]def get_proj4(prjfile): """Get the proj4 string from a projection file of a shapefile :param prjfile: :return: """ prj_file = open(prjfile, "r") prj_string = prj_file.read() # Lambert error if '"Lambert_Conformal_Conic"' in prj_string: print( " ERROR: It seems you used an ESRI generated shapefile" " with Lambert Conformal Conic projection. " ) print(" This one is not compatible with Open Standard OGR/GDAL" " tools used here. ") print(" Reproject your shapefile to a standard Lat/Long projection" " and try again") exit(1) srs = osr.SpatialReference() srs.ImportFromESRI([prj_string]) return srs.ExportToProj4()
[docs]def epsg_to_wkt_projection(epsg_code): """ :param epsg_code: :return: """ spatial_ref = osr.SpatialReference() spatial_ref.ImportFromEPSG(epsg_code) return spatial_ref.ExpotToWkt()
[docs]def reproject_geometry(geom, inproj4, out_epsg): """Reproject a wkt geometry based on EPSG code :param geom: an ogr geom object :param inproj4: a proj4 string :param out_epsg: the EPSG code to which the geometry should transformed :return: the transformed geometry (ogr-geometry object) """ geom = ogr.CreateGeometryFromWkt(geom) # input SpatialReference spatial_ref_in = osr.SpatialReference() spatial_ref_in.ImportFromProj4(inproj4) # output SpatialReference spatial_ref_out = osr.SpatialReference() spatial_ref_out.ImportFromEPSG(int(out_epsg)) # create the CoordinateTransformation coord_transform = osr.CoordinateTransformation(spatial_ref_in, spatial_ref_out) try: geom.Transform(coord_transform) except Exception: raise RuntimeError(" ERROR: Not able to transform the geometry") return geom
[docs]def geodesic_point_buffer(lon, lat, meters, envelope=False): """ :param lat: :param lon: :param meters: :param envelope: :return: """ proj_crs = ProjectedCRS(conversion=AzimuthalEquidistantConversion(float(lat), float(lon))) proj_wgs84 = pyproj.Proj("EPSG:4326") Trans = pyproj.Transformer.from_proj(proj_crs, proj_wgs84, always_xy=True).transform buf = Point(0, 0).buffer(meters) # distance in metres if envelope is True: geom = Polygon(transform(Trans, buf).exterior.coords[:]).envelope else: geom = Polygon(transform(Trans, buf).exterior.coords[:]) return geom.wkt
def latlon_to_wkt(lat, lon, buffer_degree=None, buffer_meter=None, envelope=False): """A helper function to create a WKT representation of Lat/Lon pair This function takes lat and lon values and returns the WKT Point representation by default. A buffer can be set in meters, which returns a WKT POLYGON. If envelope is set to True, the circular buffer will be squared by the extent buffer radius. :param lat: :type lat: str :param lon: :type lon: str :param buffer_degree: :type buffer_degree: float, optional :param buffer_meter: :type buffer_meter: float, optional :param envelope: :type envelope: bool, optional :return: WKT string :rtype: str """ if buffer_degree is None and buffer_meter is None: aoi_wkt = f"POINT ({lon} {lat})" elif buffer_degree: aoi_geom = loads(f"POINT ({lon} {lat})").buffer(buffer_degree) if envelope: aoi_geom = aoi_geom.envelope aoi_wkt = aoi_geom.wkt elif buffer_meter: aoi_wkt = geodesic_point_buffer(lon, lat, buffer_meter, envelope) return aoi_wkt
[docs]def wkt_manipulations(wkt, buffer=None, convex=False, envelope=False): """ :param wkt: :param buffer: :param convex: :param envelope: :return: """ geom = ogr.CreateGeometryFromWkt(wkt) if buffer: geom = geom.Buffer(buffer) if convex: geom = geom.ConvexHull() if envelope: geom = geom.GetEnvelope() geom = ogr.CreateGeometryFromWkt( f"POLYGON ((" f"{geom[1]} {geom[3]}, " f"{geom[0]} {geom[3]}, " f"{geom[0]} {geom[2]}, " f"{geom[1]} {geom[2]}, " f"{geom[1]} {geom[3]}, " f"{geom[1]} {geom[3]}" f"))" ) return geom.ExportToWkt()
[docs]def shp_to_wkt(shapefile, buffer=None, convex=False, envelope=False): """A helper function to translate a shapefile into WKT :param shapefile: :param buffer: :param convex: :param envelope: :return: """ # get filepaths and proj4 string shpfile = os.path.abspath(shapefile) prjfile = shpfile[:-4] + ".prj" proj4 = get_proj4(prjfile) lyr_name = os.path.basename(shapefile)[:-4] shp = ogr.Open(os.path.abspath(shapefile)) lyr = shp.GetLayerByName(lyr_name) geom = ogr.Geometry(ogr.wkbGeometryCollection) for feat in lyr: geom.AddGeometry(feat.GetGeometryRef()) wkt = geom.ExportToWkt() if proj4 != "+proj=longlat +datum=WGS84 +no_defs": logger.info("Reprojecting AOI file to Lat/Long (WGS84)") wkt = reproject_geometry(wkt, proj4, 4326).ExportToWkt() # do manipulations if needed wkt = wkt_manipulations(wkt, buffer=buffer, convex=convex, envelope=envelope) return wkt
[docs]def latlon_to_shp(lon, lat, shapefile): """ :param lon: :param lat: :param shapefile: :return: """ shapefile = str(shapefile) schema = {"geometry": "Point", "properties": {"id": "str"}} wkt = loads("POINT ({} {})".format(lon, lat)) with collection(shapefile, "w", crs=from_epsg(4326), driver="ESRI Shapefile", schema=schema) as output: output.write({"geometry": mapping(wkt), "properties": {"id": "1"}})
[docs]def wkt_to_gdf(wkt): """ :param wkt: :return: """ # load wkt geometry = loads(wkt) # point wkt if geometry.geom_type == "Point": data = {"id": ["1"], "geometry": loads(wkt).buffer(0.05).envelope} gdf = gpd.GeoDataFrame(data) # polygon wkt elif geometry.geom_type == "Polygon": data = {"id": ["1"], "geometry": loads(wkt)} gdf = gpd.GeoDataFrame(data, crs="epsg:4326") # geometry collection of single multiploygon elif ( geometry.geom_type == "GeometryCollection" and len(geometry) == 1 and "MULTIPOLYGON" in str(geometry) ): data = {"id": ["1"], "geometry": geometry} gdf = gpd.GeoDataFrame(data, crs="epsg:4326") ids, feats = [], [] for i, feat in enumerate(gdf.geometry.values[0]): ids.append(i) feats.append(feat) gdf = gpd.GeoDataFrame({"id": ids, "geometry": feats}, geometry="geometry", crs=gdf.crs) # geometry collection of single polygon elif geometry.geom_type == "GeometryCollection" and len(geometry) == 1: data = {"id": ["1"], "geometry": geometry} gdf = gpd.GeoDataFrame(data, crs="epsg:4326") # everything else else: i, ids, geoms = 1, [], [] for geom in geometry: ids.append(i) geoms.append(geom) i += 1 gdf = gpd.GeoDataFrame({"id": ids, "geometry": geoms}, crs="epsg:4326") return gdf
[docs]def gdf_to_json_geometry(gdf): """Function to parse features from GeoDataFrame in such a manner that rasterio wants them""" geojson = json.loads(gdf.to_json()) return [feature["geometry"] for feature in geojson["features"] if feature["geometry"]]
[docs]def exterior(infile, outfile, buffer=None): """Creates an exterior vector of an input vector :param infile: :param outfile: :param buffer: :return: """ gdf = gpd.read_file(infile, crs="epsg:4326") gdf.geometry = gdf.geometry.apply(lambda row: Polygon(row.exterior)) gdf_clean = gdf[gdf.geometry.area >= 1.0e-6] if buffer: gdf_clean.geometry = gdf_clean.geometry.buffer(buffer) # a negative buffer might polygons make disappear, so let's clean them gdf_clean = gdf_clean[~gdf_clean.geometry.is_empty] gdf_clean.to_file(outfile, driver="GPKG")
[docs]def difference(infile1, infile2, outfile): import warnings warnings.filterwarnings("ignore", "Geometry is in a geographic CRS", UserWarning) gdf1 = gpd.read_file(infile1) gdf2 = gpd.read_file(infile2) gdf3 = gpd.overlay(gdf1, gdf2, how="difference") # remove slivers and artifacts gdf3 = gdf3.buffer(0) buffer = 0.00001 gdf3 = gdf3.buffer(-buffer, 1, join_style=2).buffer(buffer, 1, join_style=2) gdf3.to_file(outfile, driver="GeoJSON")
[docs]def set_subset(aoi, inventory_df): # WKT aoi to shapely geom aoi = loads(aoi) # burst_inventory case if "bid" in inventory_df.columns: for burst in inventory_df.bid.unique(): burst_geom = inventory_df.geometry[inventory_df.bid == burst].unary_union subset = True if aoi.within(burst_geom) else False if not subset: return subset # grd inventory case else: for track in inventory_df.relativeorbit.unique(): track_geom = inventory_df.geometry[inventory_df.relativeorbit == track].unary_union subset = True if aoi.within(track_geom) else False if not subset: return subset # return if true return subset
[docs]def buffer_shape(infile, outfile, buffer=None): with collection(infile, "r") as in_shape: schema = {"geometry": "Polygon", "properties": {"id": "int"}} crs = in_shape.crs with collection(outfile, "w", "ESRI Shapefile", schema, crs=crs) as output: for i, point in enumerate(in_shape): output.write( { "properties": {"id": i}, "geometry": mapping(shape(point["geometry"]).buffer(buffer)), } )
[docs]def plot_inventory(aoi, inventory_df, transparency=0.05, annotate=False): import matplotlib.pyplot as plt # load world borders for background world = gpd.read_file(gpd.datasets.get_path("naturalearth_lowres")) # do the import of aoi as gdf aoi_gdf = wkt_to_gdf(aoi) # get bounds of AOI bounds = inventory_df.geometry.bounds # get world map as base base = world.plot(color="lightgrey", edgecolor="white") # plot aoi aoi_gdf.plot(ax=base, color="None", edgecolor="black") # plot footprints inventory_df.plot(ax=base, alpha=transparency) # set bounds plt.xlim([bounds.minx.min() - 2, bounds.maxx.max() + 2]) plt.ylim([bounds.miny.min() - 2, bounds.maxy.max() + 2]) plt.grid(color="grey", linestyle="-", linewidth=0.2) if annotate: import math for idx, row in inventory_df.iterrows(): # print([row['geometry'].bounds[0],row['geometry'].bounds[3]]) coord = [row["geometry"].centroid.x, row["geometry"].centroid.y] x1, y2, x2, y1 = row["geometry"].bounds angle = math.degrees(math.atan2((y2 - y1), (x2 - x1))) plt.annotate( s=row["bid"], xy=coord, rotation=angle + 5, size=10, color="red", horizontalalignment="center", )