import pandas as pd
import logging
from typing import Optional
class DataManager:
"""
A class for managing and processing astronomical data sets.
This class provides methods to load and process forced source data and object data,
specifically focusing on time-domain objects and quasars.
Attributes
----------
fs_df : pd.DataFrame or None
DataFrame containing forced source data.
fs_gp : pd.core.groupby.DataFrameGroupBy or None
GroupBy object with forced source data grouped by object ID.
object_df : pd.DataFrame or None
DataFrame containing object data.
td_objects : pd.DataFrame or None
DataFrame containing time-domain objects.
"""
def __init__(self):
"""
Initializes the DataManager with empty data attributes.
"""
self.fs_df = None
self.fs_gp = None
self.object_df = None
self.td_objects = None
def load_fs_df(self, path_source: str) -> Optional[pd.DataFrame]:
"""
Load forced source data from a file.
Parameters
----------
path_source : str
The path to the source data file.
Returns
-------
pd.DataFrame or None
The loaded DataFrame or None in case of an error.
Examples
--------
>>> dm = DataManager()
>>> fs_df = dm.load_fs_df('path_to_fs_df.parquet')
"""
try:
self.fs_df = pd.read_parquet(path_source)
logging.info("Forced source data loaded successfully.")
return self.fs_df
except Exception as e:
logging.error(f"Error loading fs_df: {e}")
return None
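# Note: pd.read_parquet requires a parquet engine (pyarrow or fastparquet)
# to be installed; if none is available, the resulting ImportError is caught
# and logged by the except block above.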
def group_fs_df(self) -> Optional[pd.core.groupby.DataFrameGroupBy]:
"""
Group forced source data by object ID.
Returns
-------
pd.core.groupby.DataFrameGroupBy or None
The grouped DataFrame or None if fs_df is not available.
Examples
--------
>>> dm = DataManager()
>>> fs_df = dm.load_fs_df('path_to_fs_df.parquet')
>>> fs_gp = dm.group_fs_df()
"""
if self.fs_df is None:
    logging.warning("fs_df is not available for grouping.")
    return None
if self.fs_gp is None:
    self.fs_gp = self.fs_df.groupby('objectId')
    logging.info("Forced source data grouped successfully.")
return self.fs_gp
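# Once grouped, a single object's light curve can be pulled out of fs_gp via
# fs_gp.get_group(some_object_id) for a given object ID; this is how
# get_qso() below retrieves per-object light curves.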
def load_object_df(self, path_obj: str) -> Optional[pd.DataFrame]:
"""
Load object data and filter for time-domain objects.
Parameters
----------
path_obj : str
The path to the object data file.
Returns
-------
pd.DataFrame or None
The filtered DataFrame or None in case of an error.
Examples
--------
>>> dm = DataManager()
>>> td_objects = dm.load_object_df('path_to_object_df.parquet')
"""
try:
self.object_df = pd.read_parquet(path_obj)
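# Columns whose names contain 'Periodic' are treated as the time-domain
# (light-curve) feature columns; objects with none of these populated are
# dropped, so td_objects keeps only time-domain candidates.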
lc_cols = [col for col in self.object_df.columns if 'Periodic' in col]
self.td_objects = self.object_df.dropna(subset=lc_cols, how='all').copy()
logging.info("Object data loaded and processed successfully.")
return self.td_objects
except Exception as e:
logging.error(f"Error loading object_df: {e}")
return None
def get_qso(self, object_ids: list, min_points: int = 100) -> list:
"""
Get QSOs whose u, g, r and i light curves each contain at least 'min_points' points.
Parameters
----------
object_ids : list
List of object IDs to check.
min_points : int, optional
Minimum number of points required in each light curve (default is 100).
Returns
-------
list
List of QSO IDs that meet the criteria.
Examples
--------
>>> dm = DataManager()
>>> fs_df = dm.load_fs_df('path_to_fs_df.parquet')
>>> fs_gp = dm.group_fs_df()
>>> td_objects = dm.load_object_df('path_to_object_df.parquet')
>>> object_ids = ['id1', 'id2', 'id3']
>>> quasar_ids = dm.get_qso(object_ids)
"""
valid_qsos = []
if self.fs_gp is None:
    logging.warning("Forced source data has not been grouped; call group_fs_df() first.")
    return valid_qsos
for obj_id in object_ids:
    if obj_id in self.fs_gp.groups:
        demo_lc = self.fs_gp.get_group(obj_id)
        # Filters 1-4 are assumed to correspond to the u, g, r and i bands.
        if all(len(demo_lc[demo_lc['filter'] == f].dropna()) >= min_points for f in range(1, 5)):
            valid_qsos.append(obj_id)
return valid_qsos
# Initialize logging
logging.basicConfig(level=logging.INFO)
# Example usage:
# dm = DataManager()
# dm.load_fs_df('path_to_fs_df.parquet')
# dm.group_fs_df()
# dm.load_object_df('path_to_object_df.parquet')
# object_ids = ['id1', 'id2', ...] # Example object IDs
# quasar_ids = dm.get_qso(object_ids)
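# A minimal, runnable version of the example usage above. The parquet paths
# and the object IDs are placeholders, not real files or IDs.
if __name__ == "__main__":
    dm = DataManager()
    dm.load_fs_df('path_to_fs_df.parquet')
    dm.group_fs_df()
    dm.load_object_df('path_to_object_df.parquet')
    if dm.fs_gp is not None:
        object_ids = ['id1', 'id2', 'id3']  # placeholder object IDs
        quasar_ids = dm.get_qso(object_ids, min_points=100)
        logging.info("Found %d QSOs with well-sampled light curves.", len(quasar_ids))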