For accessing the data from the Amazon S3 service, we will use boto3 and botocore libraries.

! pip3 install -q boto3 botocore
import pandas as pd
import numpy as np
import os
from tqdm.notebook import tqdm

# for using Amazon S3
import boto3
import botocore

import multiprocessing
from IPython.display import clear_output
import json
import gzip
class OpenAQData:
    """
    Download and parse OpenAQ air-quality data stored in a public S3 bucket.

    Raw files are mirrored below ``download_path`` using their S3 keys as
    relative paths. Parsed, city-filtered data is written as one gzipped CSV
    per day into ``download_path/parsed``.
    """

    def __init__(
        self, download_path, bucket_name="openaq-fetches", prefix="realtime-gzipped/"
    ):
        """
        Initialize class object.

        Parameters
        ----------
        download_path : str
            The path for the download directory.
        bucket_name : str
            The S3 bucket name containing the dataset (the default is "openaq-fetches")
        prefix : str
            Directory of the required data in the bucket. (the default is "realtime-gzipped/")
        """
        self.download_path = download_path
        self.bucket_name = bucket_name
        self.prefix = prefix
        # Defined up front so the per-date / per-folder workers can also be
        # called directly, before download_data() or the parse driver set it.
        self.verbose = False

    @staticmethod
    def _make_s3_client():
        """Return an S3 client that sends unsigned (anonymous) requests."""
        return boto3.client(
            "s3", config=botocore.config.Config(signature_version=botocore.UNSIGNED)
        )

    def _list_keys(self, s3, key_prefix):
        """
        Return every object key in the bucket that starts with key_prefix.

        A paginator is used because a single list_objects call returns at
        most 1000 keys; the result is an empty list when nothing matches.
        """
        keys = []
        paginator = s3.get_paginator("list_objects")
        for page in paginator.paginate(Bucket=self.bucket_name, Prefix=key_prefix):
            keys.extend(obj["Key"] for obj in page.get("Contents", []))
        return keys

    def download_data_date_specific(self, curr_date):
        """
        Downloads air quality data for specific date

        Errors are reported with print() rather than raised, so that one
        failing date does not abort a multi-date download.

        Parameters
        ----------
        curr_date : pandas date-time
            The date-time for which data needs to be downloaded
            From the date-time, the corresponding date is extracted.

        Returns
        ----------
        None
        """
        try:
            if self.verbose:
                print(f"Downloading: {curr_date}")
            # Extract the date from YYYY-MM-DD HH:MM:SS string
            curr_date_str = str(curr_date).split()[0]
            # One client is enough for the listing and all downloads of a date.
            s3 = self._make_s3_client()
            object_keys = self._list_keys(s3, self.prefix + curr_date_str)

            # If the current date data is not present in the bucket data
            if not object_keys:
                print(f"---------- Unable to fetch Date: {curr_date}----------")
                return

            for object_key in object_keys:
                file_download_path = os.path.join(self.download_path, object_key)
                os.makedirs(os.path.dirname(file_download_path), exist_ok=True)
                s3.download_file(self.bucket_name, object_key, file_download_path)
            if self.verbose:
                print(f"Completed: {curr_date}")
        except Exception as err:
            print(f"Error: {err} for {curr_date} ")

    def download_data(
        self, start_date, end_date, num_multiprocess_pool=5, verbose=False
    ):
        """
        Downloads air quality data from start_date to end_date (both inclusive).
        Files are stored below self.download_path, using their S3 key
        (which starts with self.prefix) as the relative path.
        After downloading, the missing dates and files are printed.

        Parameters
        ----------
        start_date : str
            Starting date (inclusive), in any format understood by
            pandas.date_range, e.g. 'yyyy-mm-dd' or 'yyyy/mm/dd'
        end_date : str
            Ending date (inclusive), same formats as start_date
        num_multiprocess_pool : int, optional
            The number of pool processes in multi-processing. (the default is 5)
        verbose : bool, optional
            The default is False.

        Returns
        ----------
        None
        """
        self.verbose = verbose
        print("---------- Downloading:----------")
        dates = pd.date_range(start=start_date, end=end_date)
        # The context manager guarantees the workers are cleaned up even if
        # iteration is interrupted; all results are consumed before exit.
        with multiprocessing.Pool(num_multiprocess_pool) as pool:
            for _ in tqdm(
                pool.imap(self.download_data_date_specific, dates),
                total=len(dates),
            ):
                pass

        print("------------Checking Missing Files ... ----------------")
        (
            missing_dates_after_download,
            missing_files_after_download,
        ) = self.find_missing_data(start_date, end_date)
        print(
            f"---------- Missing Dates : {len(missing_dates_after_download)} Dates Missing----------"
        )
        if missing_dates_after_download:
            print("Missing Dates are ::\n", *missing_dates_after_download)
        print(
            f"---------- Missing Files : {len(missing_files_after_download)} files Missing----------"
        )
        if missing_files_after_download:
            print("Missing Files are ::\n", *missing_files_after_download)

    def find_missing_data(self, start_date, end_date):
        """
        Checks and returns all the missing dates and missing data filenames
        in the date range from start_date to end_date (both inclusive).

        Parameters
        ----------
        start_date : str
            Starting date (inclusive), e.g. 'yyyy/mm/dd'
        end_date : str
            Ending date (inclusive), e.g. 'yyyy/mm/dd'

        Returns
        ----------
        tuple (list of str, list of str)
            First, the dates ('yyyy-mm-dd') for which the bucket lists no
            objects; second, the S3 keys that are listed in the bucket but
            not present below self.download_path.
        """
        dates_missing_list = []
        files_missing_list = []
        s3 = self._make_s3_client()
        for curr_date in tqdm(pd.date_range(start=start_date, end=end_date)):
            curr_date_str = str(curr_date).split()[0]
            object_keys = self._list_keys(s3, self.prefix + curr_date_str)

            if not object_keys:
                dates_missing_list.append(curr_date_str)
                continue

            files_missing_list.extend(
                object_key
                for object_key in object_keys
                if not os.path.exists(os.path.join(self.download_path, object_key))
            )

        return dates_missing_list, files_missing_list

    def parse_downloaded_data_folder_wise_required_cities(self, args):
        """
        Parses downloaded data for the specific folder.
        Only parses if the folder name is a date between self.start_date and
        self.end_date; folders whose name is not a date are skipped.
        Keeps only the rows whose "city" is in the self.required_cities list.
        The parsed data is stored in "parsed" directory in self.download_path
        as "<folder name>.csv.gz". Nothing is written when the folder holds
        no usable ".ndjson.gz" data.

        Errors are reported with print() rather than raised, so that one
        failing folder does not abort a multi-folder parse.

        Parameters
        ----------
        args : arguments
            root_path, dirnames, filenames
            root_path: The current directory path under consideration
            dirnames: List of directory names directly in root_path
            filenames: List of filenames directly in root_path

        Returns
        ----------
        None
        """
        # Pre-bound so the error message below can never hit an unbound name.
        root_path = None
        filename = None
        try:
            root_path, dirnames, filenames = args
            if not filenames:
                return
            folder_name = os.path.basename(root_path)
            try:
                curr_date = pd.to_datetime(folder_name)
            except (ValueError, TypeError, OverflowError):
                # Folder name is not a date, so it holds no per-day data.
                return
            # An empty folder name parses to NaT, which would pass both
            # comparisons below, so it has to be rejected explicitly.
            if pd.isna(curr_date):
                return
            if curr_date > self.end_date or curr_date < self.start_date:
                return
            if self.verbose:
                print(f"Parsing: {folder_name}")
            dataframes_list = []
            for filename in filenames:
                if ".ndjson.gz" not in filename:
                    continue
                file_path = os.path.join(root_path, filename)
                json_objs = []
                with gzip.open(file_path, "rb") as jl_file:
                    for line in jl_file:
                        try:
                            new_json_obj = json.loads(line)
                        except ValueError:
                            # Malformed line: skip it, keep the rest of the file.
                            continue
                        if isinstance(new_json_obj, dict):
                            json_objs.append(new_json_obj)
                df = pd.json_normalize(json_objs)
                # Files without any usable record have no "city" column.
                if "city" not in df.columns:
                    continue
                dataframes_list.append(df[df["city"].isin(self.required_cities)])
            if not dataframes_list:
                # pd.concat raises on an empty list; there is nothing to write.
                return
            parsed_dir = os.path.join(self.download_path, "parsed")
            os.makedirs(parsed_dir, exist_ok=True)
            combined_df = pd.concat(dataframes_list)
            combined_df.to_csv(
                os.path.join(parsed_dir, folder_name + ".csv.gz"),
                index=False,
                compression="gzip",
            )
            if self.verbose:
                print(f"Completed Parsing: {folder_name}")
        except Exception as err:
            print(f"Error: {err} for {filename} in {root_path}")

    def parse_downloaded_data_required_cities(
        self,
        start_date,
        end_date,
        num_multiprocess_pool=5,
        required_cities=["Delhi"],
        verbose=False,
    ):
        """
        Parses downloaded data.
        Only parses if the data is of date between start_date and end_date (both inclusive)
        Keeps only the rows whose city is in the required_cities list.
        The parsed data is stored in "parsed" directory in self.download_path

        Parameters
        ----------
        start_date : str
            Starting date (inclusive), e.g. 'yyyy/mm/dd'.
        end_date : str
            Ending date (inclusive), e.g. 'yyyy/mm/dd'
        num_multiprocess_pool : int, optional
            The number of pool processes in multi-processing. (the default is 5)
        required_cities : list[str] or str, optional
            The parsed data would contain data for only these cities.
            A single city name may be given as a plain string.
            (Case-sensitive) (The default is ["Delhi",])
        verbose : bool, optional
            The default is False.

        Returns
        ----------
        None
        """
        self.start_date = pd.to_datetime(start_date)
        self.end_date = pd.to_datetime(end_date)
        # Copy so the shared default list (or the caller's list) is never
        # aliased by the instance; wrap a bare string instead of splitting it.
        if isinstance(required_cities, str):
            self.required_cities = [required_cities]
        else:
            self.required_cities = list(required_cities)
        self.verbose = verbose
        os.makedirs(os.path.join(self.download_path, "parsed"), exist_ok=True)
        print("---------- Parsing:----------")
        # Materialised so the progress bar total matches the number of
        # folders actually handed to the workers.
        walk_entries = list(os.walk(os.path.join(self.download_path, self.prefix)))
        with multiprocessing.Pool(num_multiprocess_pool) as pool:
            for _ in tqdm(
                pool.imap(
                    self.parse_downloaded_data_folder_wise_required_cities,
                    walk_entries,
                ),
                total=len(walk_entries),
            ):
                pass
        print("----------:Parsing Completed----------")

    def find_missing_parsed_files(self, start_date, end_date):
        """
        Checks and returns all the missing parsed data dates in the date range from start_date to end_date (both inclusive).

        A date counts as parsed when "parsed/<date>.csv.gz" exists in
        self.download_path. If the "parsed" directory does not exist, every
        date in the range is reported as missing.

        Parameters
        ----------
        start_date : str
            Starting date (inclusive), e.g. 'yyyy/mm/dd'
        end_date : str
            Ending date (inclusive), e.g. 'yyyy/mm/dd'

        Returns
        ----------
        List (str)
            List of the dates ('yyyy-mm-dd') with missing parsed data,
            in chronological order
        """
        parsed_dir = os.path.join(self.download_path, "parsed")
        parsed_files_list = os.listdir(parsed_dir) if os.path.isdir(parsed_dir) else []

        parsed_dates = set()
        for parsed_filename in parsed_files_list:
            if not parsed_filename.endswith(".csv.gz"):
                continue
            try:
                parsed_dates.add(pd.to_datetime(parsed_filename.split(".")[0]))
            except (ValueError, TypeError, OverflowError):
                # Not named after a date, so it cannot cover any day.
                continue

        return [
            str(date_time).split()[0]
            for date_time in pd.date_range(start_date, end_date)
            if date_time not in parsed_dates
        ]

We will start with Air Quality data for "Delhi" for the month of February 2021

class Config:
    """Notebook-wide settings for the download and parsing steps."""

    # ---- Download settings ----

    # Inclusive date range to fetch
    start_date = "2021/02/01"
    end_date = "2021/02/28"
    # Directory the raw and parsed files are written to
    download_path = "./data/"
    # Number of worker processes used by the multiprocessing pools
    num_multiprocess_pool = 64
    # Set to True for per-date progress messages
    verbose = False

    # ---- Parsing settings ----

    # Cities to keep in the final parsed data
    required_cities = ["Delhi"]

Downloading Dataset

First we start with downloading the data.
The dataset is in the form of ndjson files (JSON Lines format). In our code, we use the multiprocessing library to parallelise both the downloading and the parsing process.

Once downloading is complete, a check is run to list all of the missing files, if any.

# Create the downloader rooted at the configured directory, then fetch every
# day of the configured date range in parallel.
open_aq = OpenAQData(Config.download_path)
open_aq.download_data(
    Config.start_date,
    Config.end_date,
    num_multiprocess_pool=Config.num_multiprocess_pool,
    verbose=Config.verbose,
)
---------- Downloading:----------
---------- Unable to fetch Date: 2021-02-28 00:00:00----------
Error: 'Contents' for 2021-02-28 00:00:00 
---------- Unable to fetch Date: 2021-02-20 00:00:00----------
Error: 'Contents' for 2021-02-20 00:00:00 
------------Checking Missing Files ... ----------------
---------- Missing Dates : 2 Dates Missing----------
Missing Dates are ::
 2021-02-20 2021-02-28
---------- Missing Files : 0 files Missing----------

In the above cell, we downloaded data for February 2021.

As we can see in the output, data for the month of February is missing for two dates:

  • 20 February 2021
  • 28 February 2021
# Re-run the completeness check on its own: the first result holds the dates
# with no data in the bucket, the second the files not yet on disk.
missing_dates_list, missing_files_list = open_aq.find_missing_data(
    Config.start_date, Config.end_date
)
missing_dates_list
['2021-02-20', '2021-02-28']
# Display the S3 keys that are listed in the bucket but not present locally.
missing_files_list
[]

Parsing downloaded data

For a single date, the data is distributed in multiple ndjson (json lines) format files.

We now parse the overall dataset and store the data in CSV format. Also, as the whole dataset is very large, we will only store results for specific required cities, such as Delhi here.

# Parse every downloaded day in the configured range, keeping only the
# configured cities; one gzipped CSV per day is written to "parsed".
open_aq.parse_downloaded_data_required_cities(
    Config.start_date,
    Config.end_date,
    num_multiprocess_pool=Config.num_multiprocess_pool,
    required_cities=Config.required_cities,
    verbose=Config.verbose,
)
---------- Parsing:----------
----------:Parsing Completed----------
# List the dates in the configured range that have no parsed CSV.
open_aq.find_missing_parsed_files(Config.start_date, Config.end_date)
['2021-02-20', '2021-02-28']

As seen earlier, after parsing, data for the month of February 2021 is still missing for the following dates:

  • 20 February
  • 28 February

So the dataset is now ready for us to do our investigations.