Fetching OpenAQ Air Quality Data
Programmatically fetch and parse an air-quality dataset from OpenAQ.
To access the data from the Amazon S3 service, we will use the boto3
and botocore
libraries.
! pip3 install -q boto3 botocore
import pandas as pd
import numpy as np
import os
from tqdm.notebook import tqdm
# for using Amazon S3
import boto3
import botocore
import multiprocessing
from IPython.display import clear_output
import json
import gzip
class OpenAQData:
    """Download and parse the OpenAQ air-quality dataset from a public S3 bucket."""

    def __init__(
        self, download_path, bucket_name="openaq-fetches", prefix="realtime-gzipped/"
    ):
        """
        Initialize class object.

        Parameters
        ----------
        download_path : str
            The path for the download directory.
        bucket_name : str, optional
            The S3 bucket name containing the dataset (the default is "openaq-fetches").
        prefix : str, optional
            Directory of the required data in the bucket (the default is "realtime-gzipped/").
        """
        self.download_path = download_path
        self.bucket_name = bucket_name
        self.prefix = prefix

    @staticmethod
    def _anonymous_s3_client():
        """Return an S3 client configured for unsigned (anonymous) access."""
        return boto3.client(
            "s3", config=botocore.config.Config(signature_version=botocore.UNSIGNED)
        )

    def download_data_date_specific(self, curr_date):
        """
        Downloads air quality data for a specific date.

        Expects ``self.verbose`` to have been set (done by ``download_data``).

        Parameters
        ----------
        curr_date : pandas date-time
            The date-time for which data needs to be downloaded.
            From the date-time, the corresponding date is extracted.

        Returns
        ----------
        None
        """
        try:
            if self.verbose:
                print(f"Downloading: {curr_date}")
            # Extract the date from the "YYYY-MM-DD HH:MM:SS" string.
            curr_date_str = str(curr_date).split()[0]
            data_date_prefix = self.prefix + curr_date_str
            # Created per call on purpose: this method runs inside worker
            # processes of a multiprocessing.Pool, and clients are not
            # safely shareable across processes.
            s3 = self._anonymous_s3_client()
            curr_date_data_dict = s3.list_objects(
                Bucket=self.bucket_name, Prefix=data_date_prefix
            )
            # If the current date data is not present in the bucket data.
            if "Contents" not in curr_date_data_dict:
                print(f"---------- Unable to fetch Date: {curr_date}----------")
                # Bug fix: previously execution fell through to the loop
                # below and raised KeyError (silently swallowed by except).
                return
            for file_obj in curr_date_data_dict["Contents"]:
                file_obj_name = file_obj["Key"]
                file_download_path = os.path.join(self.download_path, file_obj_name)
                file_download_dir = os.path.dirname(file_download_path)
                # exist_ok avoids a race between concurrent worker processes.
                os.makedirs(file_download_dir, exist_ok=True)
                # Reuse the one client (the original re-created it per file).
                s3.download_file(self.bucket_name, file_obj_name, file_download_path)
            if self.verbose:
                print(f"Completed: {curr_date}")
        except Exception as err:
            # Keep the worker alive: report and let other dates proceed.
            print(f"Error: {err} for {curr_date} ")

    def download_data(
        self, start_date, end_date, num_multiprocess_pool=5, verbose=False
    ):
        """
        Downloads air quality data from start_date to end_date (both inclusive).

        Stores the downloaded data at the self.download_path + "realtime-gzipped" directory.

        Parameters
        ----------
        start_date : str
            Starting date, Format 'yyyy-mm-dd' (inclusive)
        end_date : str
            Ending date, Format 'yyyy-mm-dd' (inclusive)
        num_multiprocess_pool : int, optional
            Number of worker processes for downloading (the default is 5).
        verbose : bool, optional
            The default is False.

        Returns
        ----------
        None
        """
        self.verbose = verbose
        print("---------- Downloading:----------")
        date_range = pd.date_range(start=start_date, end=end_date)
        # Context manager guarantees pool teardown even if iteration fails
        # (original called close() without join()).
        with multiprocessing.Pool(num_multiprocess_pool) as pool:
            for _ in tqdm(
                pool.imap(self.download_data_date_specific, date_range),
                total=len(date_range),
            ):
                pass
        print("------------Checking Missing Files ... ----------------")
        (
            missing_dates_after_download,
            missing_files_after_download,
        ) = self.find_missing_data(start_date, end_date)
        print(
            f"---------- Missing Dates : {len(missing_dates_after_download)} Dates Missing----------"
        )
        if missing_dates_after_download:
            print("Missing Dates are ::\n", *missing_dates_after_download)
        print(
            f"---------- Missing Files : {len(missing_files_after_download)} files Missing----------"
        )
        if missing_files_after_download:
            print("Missing Files are ::\n", *missing_files_after_download)

    def find_missing_data(self, start_date, end_date):
        """
        Checks and returns all the missing data in the date range from start_date to end_date (both inclusive).

        Parameters
        ----------
        start_date : str
            Starting date, Format 'yyyy/mm/dd' (inclusive)
        end_date : str
            Ending date, Format 'yyyy/mm/dd' (inclusive)

        Returns
        ----------
        tuple (list[str], list[str])
            Dates with no objects in the bucket at all, and names of
            individual files present in the bucket but missing locally.
        """
        dates_missing_list = []
        files_missing_list = []
        s3 = self._anonymous_s3_client()
        for curr_date in tqdm(pd.date_range(start=start_date, end=end_date)):
            curr_date_str = str(curr_date).split()[0]
            data_date_prefix = self.prefix + curr_date_str
            curr_date_data_dict = s3.list_objects(
                Bucket=self.bucket_name, Prefix=data_date_prefix
            )
            # No "Contents" key => nothing in the bucket for this date.
            if "Contents" not in curr_date_data_dict:
                dates_missing_list.append(curr_date_str)
                continue
            for file_obj in curr_date_data_dict["Contents"]:
                file_obj_name = file_obj["Key"]
                file_download_path = os.path.join(self.download_path, file_obj_name)
                if not os.path.exists(file_download_path):
                    files_missing_list.append(file_obj_name)
        return dates_missing_list, files_missing_list

    def parse_downloaded_data_folder_wise_required_cities(self, args):
        """
        Parses downloaded data for the specific folder.

        Only parses if the data is of date between self.start_date and self.end_date.
        Filters the data only if city is in the self.required_cities list.
        The parsed data is stored in the "parsed" directory in self.download_path.

        Parameters
        ----------
        args : tuple
            (root_path, dirnames, filenames) as yielded by os.walk:
            root_path: The current directory path under consideration
            dirnames: List of directory names directly in root_path
            filenames: List of filenames directly in root_path

        Returns
        ----------
        None
        """
        try:
            root_path, dirnames, filenames = args
            if not filenames:
                return
            # Data directories are named "YYYY-MM-DD"; anything else (e.g. the
            # top-level prefix directory itself) is simply skipped.
            try:
                curr_date = pd.to_datetime(os.path.basename(root_path))
            except (ValueError, TypeError):
                return
            if curr_date > self.end_date or curr_date < self.start_date:
                return
            if self.verbose:
                print(f"Parsing: {os.path.basename(root_path)}")
            dataframes_list = []
            for filename in filenames:
                if ".ndjson.gz" in filename:
                    file_path = os.path.join(root_path, filename)
                    json_objs = []
                    with gzip.open(file_path, "rb") as jl_file:
                        for line in jl_file:
                            # Skip individual malformed JSON lines instead of
                            # discarding the whole file.
                            try:
                                json_objs.append(json.loads(line))
                            except (json.JSONDecodeError, UnicodeDecodeError):
                                pass
                    df = pd.json_normalize(json_objs)
                    df = df[df["city"].isin(self.required_cities)]
                    dataframes_list.append(df)
            # Bug fix: pd.concat raises ValueError on an empty list; nothing
            # to write for this directory in that case.
            if not dataframes_list:
                return
            combined_df = pd.concat(dataframes_list)
            combined_df.to_csv(
                os.path.join(
                    self.download_path,
                    "parsed",
                    os.path.basename(root_path) + ".csv.gz",
                ),
                index=False,
                compression="gzip",
            )
            if self.verbose:
                print(f"Completed Parsing: {os.path.basename(root_path)}")
        except Exception as err:
            # Keep the worker alive: report and let other directories proceed.
            print(f"Error: {err} in {root_path}")

    def parse_downloaded_data_required_cities(
        self,
        start_date,
        end_date,
        num_multiprocess_pool=5,
        required_cities=None,
        verbose=False,
    ):
        """
        Parses downloaded data.

        Only parses if the data is of date between start_date and end_date (both inclusive).
        Filters the data only if city is in the required_cities list.
        The parsed data is stored in the "parsed" directory in self.download_path.

        Parameters
        ----------
        start_date : str
            In format 'yyyy/mm/dd'.
        end_date : str
            In format 'yyyy/mm/dd'
        num_multiprocess_pool : int, optional
            The number of pool processes in multi-processing. (the default is 5)
        required_cities : list[str], optional
            The parsed data would contain data for only the cities in
            required_cities. (Case-sensitive) (The default is None, meaning ["Delhi"])
        verbose : bool, optional
            The default is False.

        Returns
        ----------
        None
        """
        self.start_date = pd.to_datetime(start_date)
        self.end_date = pd.to_datetime(end_date)
        # None-sentinel avoids the shared-mutable-default-argument pitfall
        # while keeping the original ["Delhi"] default behavior.
        self.required_cities = ["Delhi"] if required_cities is None else required_cities
        self.verbose = verbose
        os.makedirs(os.path.join(self.download_path, "parsed"), exist_ok=True)
        print("---------- Parsing:----------")
        # NOTE(review): os.walk yields one entry per directory, which only
        # approximates the number of dates — the tqdm total is approximate.
        n_total_iterations = len(pd.date_range(start=start_date, end=end_date))
        with multiprocessing.Pool(num_multiprocess_pool) as pool:
            for _ in tqdm(
                pool.imap(
                    self.parse_downloaded_data_folder_wise_required_cities,
                    os.walk(os.path.join(self.download_path, self.prefix)),
                ),
                total=n_total_iterations,
            ):
                pass
        print("----------:Parsing Completed----------")

    def find_missing_parsed_files(self, start_date, end_date):
        """
        Checks and returns all the missing parsed data dates in the date range from start_date to end_date (both inclusive).

        Parameters
        ----------
        start_date : str
            Starting date, Format 'yyyy/mm/dd' (inclusive)
        end_date : str
            Ending date, Format 'yyyy/mm/dd' (inclusive)

        Returns
        ----------
        list[str]
            List of the dates ('yyyy-mm-dd') with missing parsed data
        """
        parsed_dir = os.path.join(self.download_path, "parsed")
        parsed_files_list = [
            filename for filename in os.listdir(parsed_dir) if ".csv.gz" in filename
        ]
        missing_dates_list = list(pd.date_range(start_date, end_date))
        for parsed_filename in parsed_files_list:
            # Parsed filenames look like "YYYY-MM-DD.csv.gz"; the date is the stem.
            file_date = pd.to_datetime(parsed_filename.split(".")[0])
            if file_date in missing_dates_list:
                missing_dates_list.remove(file_date)
        # Render the remaining timestamps back to plain "YYYY-MM-DD" strings.
        return [str(date_time).split()[0] for date_time in missing_dates_list]
We will start with the air quality data for "Delhi" for the month of February 2021.
class Config:
    """Notebook-wide settings for downloading and parsing the OpenAQ data."""

    ## Download ##
    # Inclusive date range to fetch (start_date through end_date).
    start_date = "2021/02/01"
    end_date = "2021/02/28"
    # Number of simultaneous worker processes used by the multiprocessing pools.
    num_multiprocess_pool = 64
    # Directory where raw and parsed data are written.
    download_path = "./data/"
    # Whether workers print per-date progress messages.
    verbose = False

    ## Parsing ##
    # Keep only rows whose city is in this list (case-sensitive).
    required_cities = [
        "Delhi",
    ]
First, we start by downloading the data.
The dataset is in the form of ndjson files (JSON Lines format).
In our code, we use the multiprocessing
library to parallelise both the downloading and the parsing.
Once downloading is complete, a check is run to list all of the missing files, if any.
# Build the downloader against the configured data directory, then fetch the
# whole configured date range using the configured pool size.
open_aq = OpenAQData(Config.download_path)
open_aq.download_data(
    Config.start_date,
    Config.end_date,
    num_multiprocess_pool=Config.num_multiprocess_pool,
    verbose=Config.verbose,
)
In the above cell, we downloaded data for February 2021.
As we can see in the output, for the month of February data is missing for two dates:
- 20 February 2021
- 28 February 2021
# Re-run the missing-data check over the configured date range.
# find_missing_data returns two lists: dates with no objects in the bucket
# at all, and individual bucket files absent from the local download dir.
missing_dates_list, missing_files_list = open_aq.find_missing_data(
    start_date=Config.start_date, end_date=Config.end_date
)
# Bare expressions: in a notebook these display the lists as cell output.
missing_dates_list
missing_files_list
For a single date, the data is distributed across multiple ndjson (JSON Lines) format files.
We now parse the overall dataset and store the data in CSV
format.
Also, as the whole dataset is very large, we will only store results for the specific required cities, such as Delhi here.
# Parse the raw ndjson.gz downloads into per-date CSVs, keeping only the
# configured cities.
open_aq.parse_downloaded_data_required_cities(
    Config.start_date,
    Config.end_date,
    num_multiprocess_pool=Config.num_multiprocess_pool,
    required_cities=Config.required_cities,
    verbose=Config.verbose,
)
# List any dates in the range whose parsed CSV is absent
# (bare expression: displayed as notebook cell output).
open_aq.find_missing_parsed_files(
    start_date=Config.start_date, end_date=Config.end_date
)
As seen earlier, after parsing, the data is still missing for the following dates in February 2021:
- 20 February 2021
- 28 February 2021
So the dataset is now ready for us to do our investigations.