Download Datasets

NOTE: This notebook is used only in automation

While running our application on a monthly basis, we would prefer not to re-run all of the data preprocessing each month. Instead, we pre-process the data only once, in the month it is pulled, and then store the interim datasets in Ceph. This notebook is responsible for downloading those pre-processed datasets from remote storage into our application for use by the analysis notebooks.
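For orientation, the remote layout this notebook assumes (inferred from the code below, not separately documented) is roughly:

&lt;S3_BUCKET&gt;/&lt;S3_PROJECT_KEY&gt;/interim/text/...
&lt;S3_BUCKET&gt;/&lt;S3_PROJECT_KEY&gt;/interim/metadata/...

Each object is mirrored locally under &lt;LOCAL_DATA_PATH&gt;/interim/&lt;dataset&gt;/ with its original file name.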

[ ]
import os
from concurrent import futures
from pathlib import Path

import boto3
from dotenv import load_dotenv

load_dotenv("../../.env")
[ ]
# Get environment variables for data access and management

BASE_PATH = os.getenv("LOCAL_DATA_PATH", "../../data")
S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL", "https://s3.upshift.redhat.com")
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
S3_BUCKET = os.getenv("S3_BUCKET", "DH-PLAYPEN")
S3_PROJECT_KEY = os.getenv("S3_PROJECT_KEY", "mcliffor/fedora_devel_mail")
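For reference, the ../../.env file loaded above supplies these same variables. A hypothetical example, using the code's defaults and placeholders rather than real credentials:

LOCAL_DATA_PATH=../../data
S3_ENDPOINT_URL=https://s3.upshift.redhat.com
AWS_ACCESS_KEY_ID=&lt;your-access-key&gt;
AWS_SECRET_ACCESS_KEY=&lt;your-secret-key&gt;
S3_BUCKET=DH-PLAYPEN
S3_PROJECT_KEY=mcliffor/fedora_devel_mail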
[ ]
# Connect to our S3 instance

s3 = boto3.client(
    "s3",
    endpoint_url=S3_ENDPOINT_URL,
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)
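Because boto3 clients are lazy, a bad endpoint or bad credentials only surface on the first real request. A minimal optional sanity check, assuming the bucket and prefix above are readable, fails fast instead:

# Cheap connectivity/credentials check: requesting a single key is enough.
s3.list_objects_v2(Bucket=S3_BUCKET, Prefix=S3_PROJECT_KEY, MaxKeys=1)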
[ ]
# Define a function for collecting one of the "interim" datasets


def download_dataset(dataset):
    """Yield (function, *args) download tasks for each object in the dataset's prefix."""
    dataset_base_path = Path(f"{BASE_PATH}/interim/{dataset}")
    dataset_base_path.mkdir(parents=True, exist_ok=True)

    # NOTE: list_objects_v2 returns at most 1,000 keys per call; a paginator
    # would be needed if a dataset ever grows beyond that.
    for chunk in s3.list_objects_v2(
        Bucket=S3_BUCKET, Prefix=f"{S3_PROJECT_KEY}/interim/{dataset}"
    ).get("Contents", ()):
        target = f"{dataset_base_path}/{Path(chunk['Key']).name}"
        # The actual download happens later, when the task is submitted.
        print(f"Queueing download: {chunk['Key']} to {target}")
        yield (s3.download_file, S3_BUCKET, chunk["Key"], target)
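Since download_dataset only yields (callable, *args) tuples rather than performing the downloads itself, the tasks can also be run serially; a minimal sketch for a single dataset:

# Run one dataset's downloads sequentially instead of in a thread pool.
for fn, *args in download_dataset("text"):
    fn(*args)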
[ ]
# Use `download_dataset` to get all DATASETS into our application.

DATASETS = ("text", "metadata")


with futures.ThreadPoolExecutor(max_workers=20) as e:
    tasks = [
        e.submit(*task)
        for dataset in DATASETS
        for task in download_dataset(dataset)
    ]
    # Raise any download errors instead of silently discarding them.
    for task in futures.as_completed(tasks):
        task.result()

print("Done")