#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Functions for connecting and downloading from CNES Peps server
"""
import getpass
import urllib.request
import time
import multiprocessing
import logging
from pathlib import Path
import requests
import tqdm
from ost.helpers import helpers as h
logger = logging.getLogger(__name__)
def ask_credentials():
    """Interactive function asking the user for CNES' Peps credentials

    The username is read with ``input`` and the password with
    ``getpass.getpass``.

    :return: tuple of username and password
    :rtype: tuple
    """
    # CNES Peps account details (asked for at execution time)
    print(" If you do not have a CNES Peps user account" " go to: https://peps.cnes.fr/ and register")
    uname = input(" Your CNES Peps Username:")
    pword = getpass.getpass(" Your CNES Peps Password:")
    return uname, pword
def connect(uname=None, pword=None):
    """Generates an opener with HTTP basic authentication for CNES Peps

    Credentials that are missing (``None`` or empty) are asked for
    interactively on the console.

    :param uname: username of the CNES Peps account
    :type uname: str
    :param pword: password of the CNES Peps account
    :type pword: str
    :return: an urllib opener instance holding the credentials for
        https://peps.cnes.fr/
    :rtype: opener object
    """
    if not uname:
        print(" If you do not have a CNES Peps user account" " go to: https://peps.cnes.fr/ and register")
        uname = input(" Your CNES Peps Username:")

    if not pword:
        pword = getpass.getpass(" Your CNES Peps Password:")

    # register the credentials for every URL below the Peps base url
    # (realm None = use them whatever realm the server announces)
    base_url = "https://peps.cnes.fr/"
    manager = urllib.request.HTTPPasswordMgrWithDefaultRealm()
    manager.add_password(None, base_url, uname, pword)

    handler = urllib.request.HTTPBasicAuthHandler(manager)
    return urllib.request.build_opener(handler)
def check_connection(uname, pword):
    """Check if a connection with CNES Peps can be established

    :param uname: username of the CNES Peps account
    :type uname: str
    :param pword: password of the CNES Peps account
    :type pword: str
    :return: the HTTP status code of the response
    :rtype: int
    """
    # stream=True defers fetching the body; the context manager makes sure
    # the connection is released again once the status code has been read.
    # The timeout keeps the check from blocking forever on a dead server.
    with requests.get(
        "https://peps.cnes.fr/rocket/#/search?view=list&maxRecords=50",
        auth=(uname, pword),
        stream=True,
        timeout=60,
    ) as response:
        return response.status_code
def peps_download(argument_list):
    """Single scene download function for CNES Peps

    An existing partial file is continued via an HTTP ``Range`` request.
    After the transfer the file is checked with ``h.check_zipfile``; if the
    check fails the file is removed and downloaded again from scratch, for
    at most 10 attempts. On success a sibling file with the suffix
    ``.downloaded`` is written as a marker.

    :param argument_list:
        a list with 4 entries (this is used to enable parallel execution)
        argument_list[0]: product's url
        argument_list[1]: local path for the download
        argument_list[2]: username of the CNES Peps account
        argument_list[3]: password of the CNES Peps account
    :return: the expected file size in bytes if the local file already has
        (at least) that size and nothing was downloaded, otherwise None
    """
    url, filename, uname, pword = argument_list
    filename = Path(filename)
    auth = (uname, pword)

    # first request is only used to read the file size from the headers;
    # the streamed body is never consumed, so release the connection directly
    with requests.get(url, stream=True, auth=auth) as response:
        total_length = int(response.headers.get("content-length", 0))

    chunk_size = 1024
    max_attempts = 10

    # check if file is partially downloaded
    if filename.exists():
        first_byte = filename.stat().st_size
        if first_byte == total_length:
            logger.info(f"{filename.name} already downloaded.")
        else:
            logger.info(f"Continue downloading scene to: {filename.name}")
    else:
        logger.info(f"Downloading scene to: {filename.resolve()}")
        first_byte = 0

    if first_byte >= total_length:
        return total_length

    # explicit attempt counter: the result of the zip check cannot double
    # as a retry counter, so the number of re-downloads is bounded here
    for _attempt in range(max_attempts):

        while first_byte < total_length:
            # request only the bytes that are still missing
            header = {"Range": f"bytes={first_byte}-{total_length}"}

            with requests.get(url, headers=header, stream=True, auth=auth) as response:
                with open(filename, "ab") as file, tqdm.tqdm(
                    total=total_length,
                    initial=first_byte,
                    unit="B",
                    unit_scale=True,
                    desc=" INFO: Downloading: ",
                ) as pbar:
                    for chunk in response.iter_content(chunk_size):
                        if chunk:
                            file.write(chunk)
                            # the last chunk is usually shorter than chunk_size
                            pbar.update(len(chunk))

            # updated file size
            first_byte = filename.stat().st_size

        # zip file check (None means no problem was reported)
        logger.info(f"Checking the zip archive of {filename.name} for inconsistency")
        zip_test = h.check_zipfile(filename)

        if zip_test is None:
            logger.info(f"{filename} passed the zip test.")
            with open(filename.with_suffix(".downloaded"), "w") as file:
                file.write("successfully downloaded \n")
            return None

        # it did not pass the test: remove the file and start over
        logger.info(f"{filename.name} did not pass the zip test. " f"Re-downloading the full scene.")
        filename.unlink()
        first_byte = 0

    logger.warning(f"{filename.name} did not pass the zip test after {max_attempts} attempts. Giving up.")
    return None
def batch_download(inventory_df, download_dir, uname, pword, concurrent=10):
    """Download all scenes of an inventory from CNES Peps in parallel

    The storage status of every scene is queried via
    ``Sentinel1Scene.peps_online_status``. Scenes reported as ``online`` are
    downloaded with :func:`peps_download` in a process pool; the loop is
    repeated (waiting one minute when nothing is online) until every scene's
    download path exists.

    The columns ``pepsStatus`` and ``pepsUrl`` are added to the passed
    dataframe. An empty inventory returns immediately without any query.

    :param inventory_df: dataframe with an ``identifier`` column holding the
        scene identifiers
    :param download_dir: directory handed to ``Sentinel1Scene.download_path``
    :param uname: username of the CNES Peps account
    :type uname: str
    :param pword: password of the CNES Peps account
    :type pword: str
    :param concurrent: number of worker processes used for downloading
    :type concurrent: int
    :return: None
    """
    # nothing to download; the status query below cannot be unpacked into
    # two columns when there is not a single scene
    if inventory_df.empty:
        return

    from ost import Sentinel1Scene as S1Scene

    def query_status(frame):
        # one (status, url) pair per scene, transposed into two column tuples
        return zip(*[S1Scene(product).peps_online_status(uname, pword) for product in frame.identifier.tolist()])

    logger.info("Getting the storage status (online/onTape) of each scene.")
    logger.info("This may take a while.")

    # this function does not just check,
    # but it already triggers the production of the S1 scene
    inventory_df["pepsStatus"], inventory_df["pepsUrl"] = query_status(inventory_df)

    # as long as there are any scenes left for downloading, loop
    while len(inventory_df[inventory_df["pepsStatus"] != "downloaded"]) > 0:

        # exclude downloaded scenes; copy so the column assignments below
        # operate on an independent frame and not on a slice
        inventory_df = inventory_df[inventory_df["pepsStatus"] != "downloaded"].copy()

        # recheck for status
        inventory_df["pepsStatus"], inventory_df["pepsUrl"] = query_status(inventory_df)

        online = inventory_df[inventory_df["pepsStatus"] == "online"]

        # if all scenes to download are on tape, we wait for a minute
        if online.empty:
            logger.info("Imagery still on tape, we will wait for 1 minute " "and try again.")
            time.sleep(60)
            continue

        # argument lists for the parallelised download
        peps_list = [
            [
                row["pepsUrl"],
                S1Scene(row.identifier).download_path(download_dir, True),
                uname,
                pword,
            ]
            for _, row in online.iterrows()
        ]

        # parallelised download; the pool is shut down after each round
        # instead of leaking worker processes on every loop pass
        with multiprocessing.Pool(processes=concurrent) as pool:
            pool.map(peps_download, peps_list)

        # routine to check if the file has been downloaded
        for index, row in online.iterrows():
            download_path = S1Scene(row.identifier).download_path(download_dir)
            if download_path.exists():
                inventory_df.at[index, "pepsStatus"] = "downloaded"