Download Datasets
NOTE: This notebook is used only in automated runs.
Our application runs on a monthly basis, and we would prefer not to re-run all of the data preprocessing each month. Instead, we preprocess the data only once, in the month it is pulled, and store the interim dataset in Ceph. This notebook is responsible for downloading those pre-processed datasets from remote storage into our application for use by the analysis notebooks.
[ ]
import os
from concurrent import futures
from pathlib import Path

import boto3
from dotenv import load_dotenv

load_dotenv("../../.env")
[ ]
# Get environment variables for data access and management
BASE_PATH = os.getenv("LOCAL_DATA_PATH", "../../data")
S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL", "https://s3.upshift.redhat.com")
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
S3_BUCKET = os.getenv("S3_BUCKET", "DH-PLAYPEN")
S3_PROJECT_KEY = os.getenv("S3_PROJECT_KEY", "mcliffor/fedora_devel_mail")
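AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY have no defaults above, so it can help to fail fast if they were not supplied via the environment or the `.env` file. A minimal guard (our addition, not part of the original notebook):
[ ]
# Fail fast if the S3 credentials were not provided (our addition, illustrative)
missing = [
    name
    for name, value in {
        "AWS_ACCESS_KEY_ID": AWS_ACCESS_KEY_ID,
        "AWS_SECRET_ACCESS_KEY": AWS_SECRET_ACCESS_KEY,
    }.items()
    if not value
]
if missing:
    raise EnvironmentError(f"Missing required environment variables: {missing}")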
[ ]
# Connect to our S3 instance
s3 = boto3.client(
    "s3",
    endpoint_url=S3_ENDPOINT_URL,
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)
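Before queuing downloads, a quick smoke test can confirm that the endpoint and credentials work. `head_bucket` is a standard boto3 S3 client call; this check is our addition, not part of the original notebook:
[ ]
# Optional: verify the bucket is reachable with these credentials
# (raises botocore.exceptions.ClientError on auth or endpoint problems)
s3.head_bucket(Bucket=S3_BUCKET)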
[ ]
# Define a generator that yields the download tasks for one "interim" dataset.
# Each task is a (callable, *args) tuple ready to be submitted to an executor.
def download_dataset(dataset):
    dataset_base_path = Path(f"{BASE_PATH}/interim/{dataset}")
    dataset_base_path.mkdir(parents=True, exist_ok=True)
    # Note: list_objects_v2 returns at most 1000 keys per call, which is
    # sufficient for datasets of this size (no pagination needed here).
    for chunk in s3.list_objects_v2(
        Bucket=S3_BUCKET, Prefix=f"{S3_PROJECT_KEY}/interim/{dataset}"
    ).get("Contents", ()):
        target = f"{dataset_base_path}/{Path(chunk['Key']).name}"
        print(f"Downloading file: {chunk['Key']} to {target}")
        yield (s3.download_file, S3_BUCKET, chunk["Key"], target)
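The generator does not download anything itself; callers unpack each tuple and invoke it. For example, a single dataset could be fetched sequentially as sketched here (illustrative only; the cell below runs the same tasks through a thread pool instead):
[ ]
# Illustrative sequential usage of the task tuples (not part of the original notebook)
for fn, bucket, key, filename in download_dataset("text"):
    fn(bucket, key, filename)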
[ ]
# Use `download_dataset` to get all DATASETS into our application.
DATASETS = ("text", "metadata")
with futures.ThreadPoolExecutor(max_workers=20) as executor:
    tasks = [
        executor.submit(*task)
        for dataset in DATASETS
        for task in download_dataset(dataset)
    ]
    # Re-raise any exception from a worker thread; otherwise a failed
    # download would pass silently when the future is discarded.
    for task in futures.as_completed(tasks):
        task.result()
print("Done")