# detection.py
import numpy as np
from QhX.light_curve import get_lctiktok, get_lc22
# Make sure other necessary functions, such as hybrid2d, periods, and same_periods, are imported or defined.
from QhX.algorithms.wavelets.wwtz import *
from QhX.calculation import *
# Example ntau parameter
DEFAULT_NTAU = 80
# Example ngrid parameter
DEFAULT_NGRID = 800
# Example provided_minfq parameter
DEFAULT_PROVIDED_MINFQ = 2000
# Example provided_maxfq parameter
DEFAULT_PROVIDED_MAXFQ = 10
#from QhX.algorithms.wavelets.wwt import estimate_wavelet_periods
def process1tiktok(data_manager, set1, initial_period, damping_factor_amplitude, damping_factor_frequency, snr=None, inject_signal=False, ntau=None, ngrid=None, minfq=None, maxfq=None, parallel=False):
    """
    Processes the light curves of a single object retrieved with get_lctiktok
    (optionally injecting an oscillatory signal with damped amplitude and frequency)
    and detects periods common to band 0 and each of the other bands.

    Returns an np.ndarray of detected periods with their errors and significances,
    or None if the set ID is not found.
    """
    if set1 not in data_manager.fs_gp.groups:
        print(f"Set ID {set1} not found.")
        return None

    det_periods = []
    tt0, yy0, tt1, yy1, tt2, yy2, tt3, yy3, sampling0, sampling1, sampling2, sampling3, tik0, tik1, tik2, tik3 = get_lctiktok(data_manager, set1, initial_period, damping_factor_amplitude, damping_factor_frequency, snr, inject_signal)

    # Hybrid 2D wavelet analysis and period detection for each band
    wwz_matrx0, corr0, extent0 = hybrid2d(tt0, yy0, 120, 3200, minfq=1500., maxfq=10., parallel=parallel)
    peaks0, hh0, r_periods0, up0, low0 = periods(int(set1), corr0, 3200, plot=False)
    wwz_matrx1, corr1, extent1 = hybrid2d(tt1, yy1, 120, 3200, minfq=1500., maxfq=10., parallel=parallel)
    peaks1, hh1, r_periods1, up1, low1 = periods(int(set1), corr1, 3200, plot=False)
    wwz_matrx2, corr2, extent2 = hybrid2d(tt2, yy2, 120, 3200, minfq=1500., maxfq=10., parallel=parallel)
    peaks2, hh2, r_periods2, up2, low2 = periods(int(set1), corr2, 3200, plot=False)
    wwz_matrx3, corr3, extent3 = hybrid2d(tt3, yy3, 120, 3200, minfq=1500., maxfq=10., parallel=parallel)
    peaks3, hh3, r_periods3, up3, low3 = periods(int(set1), corr3, 3200, plot=False)

    # Compare band 0 with band 1 and keep the common periods
    r_periods01, u01, low01, sig01 = same_periods(r_periods0, r_periods1, up0, low0, up1, low1, peaks0, hh0, tt0, yy0, peaks1, hh1, tt1, yy1, ntau=ntau, ngrid=ngrid, minfq=minfq, maxfq=maxfq)
    print(set1)
    if r_periods01.size > 0 and u01.size > 0 and low01.size > 0 and sig01.size > 0:
        for j in range(len(r_periods01.ravel())):
            det_periods.append([int(set1), sampling0, sampling1, r_periods01[j], u01[j], low01[j], sig01[j], 12])
    elif r_periods01.size == 0:
        det_periods.append([int(set1), 0, 0, 0, 0, 0, 0, 12])

    # Compare band 0 with band 2
    r_periods02, u02, low02, sig02 = same_periods(r_periods0, r_periods2, up0, low0, up2, low2, peaks0, hh0, tt0, yy0, peaks2, hh2, tt2, yy2, ntau=ntau, ngrid=ngrid, minfq=minfq, maxfq=maxfq)
    if r_periods02.size > 0 and u02.size > 0 and low02.size > 0 and sig02.size > 0:
        for j in range(len(r_periods02.ravel())):
            det_periods.append([int(set1), sampling0, sampling2, r_periods02[j], u02[j], low02[j], sig02[j], 13])
    elif r_periods02.size == 0:
        det_periods.append([int(set1), 0, 0, 0, 0, 0, 0, 13])

    # Compare band 0 with band 3
    r_periods03, u03, low03, sig03 = same_periods(r_periods0, r_periods3, up0, low0, up3, low3, peaks0, hh0, tt0, yy0, peaks3, hh3, tt3, yy3, ntau=ntau, ngrid=ngrid, minfq=minfq, maxfq=maxfq)
    if r_periods03.size > 0 and u03.size > 0 and low03.size > 0 and sig03.size > 0:
        for j in range(len(r_periods03.ravel())):
            det_periods.append([int(set1), sampling0, sampling3, r_periods03[j], u03[j], low03[j], sig03[j], 14])
    elif r_periods03.size == 0:
        det_periods.append([int(set1), 0, 0, 0, 0, 0, 0, 14])

    return np.array(det_periods)
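
# Illustrative usage sketch (not part of the QhX API). It assumes a data manager
# object whose light curves are already loaded and grouped (so that
# `data_manager.fs_gp` exists) and a set ID present in those groups. The injected
# period, damping factors, and SNR below are arbitrary placeholders chosen only
# for illustration.
def _example_process1tiktok(data_manager, set_id):
    """Illustrative sketch: run process1tiktok with an injected signal."""
    detected = process1tiktok(
        data_manager, set_id,
        initial_period=500,             # placeholder injected period
        damping_factor_amplitude=0.1,   # placeholder amplitude damping
        damping_factor_frequency=0.1,   # placeholder frequency damping
        snr=2, inject_signal=True,
        ntau=DEFAULT_NTAU, ngrid=DEFAULT_NGRID,
        minfq=DEFAULT_PROVIDED_MINFQ, maxfq=DEFAULT_PROVIDED_MAXFQ,
    )
    if detected is not None:
        # Each row: [object id, sampling_i, sampling_j, period,
        #            upper error, lower error, significance, pair code]
        for row in detected:
            print(row)
    return detected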
def process1_new(data_manager, set1, ntau=None, ngrid=None, provided_minfq=None, provided_maxfq=None, include_errors=True, parallel=False):
    """
    Processes and analyzes the light curve data of a single object to detect common periods across different bands.

    The process involves:

    - Verifying the existence of the dataset.
    - Retrieving and processing light curve data for different bands.
    - Applying hybrid wavelet techniques to each band's light curve data.
    - Comparing the periods detected in different bands and keeping those that do not differ by more than 10% as common periods.
    - Compiling the results, including period values, errors, and significance, into a structured format.

    Parameters
    ----------
    data_manager : object
        Data manager holding the grouped light curve data (accessed via its ``fs_gp`` attribute).
    set1 : int
        Identifier of the dataset to be processed.
    ntau : int, optional
        Number of time delays in the wavelet analysis.
    ngrid : int, optional
        Number of grid points in the wavelet analysis.
    provided_minfq : float, optional
        Period corresponding to the minimum frequency for analysis; by default it is calculated from the data.
    provided_maxfq : float, optional
        Period corresponding to the maximum frequency for analysis; by default it is calculated from the data.
    include_errors : bool, optional
        Include magnitude errors in the analysis. Defaults to True.
    parallel : bool, optional
        Run the wavelet transform in parallel. Defaults to False.

    Returns
    -------
    list of dict
        Results of the analysis performed on the light curve data. Each dictionary contains:

        - objectid (int): Identifier of the object.
        - sampling_i (float): Mean sampling rate of the first band of the pair in which a common period is detected.
        - sampling_j (float): Mean sampling rate of the second band of the pair.
        - period (float): Detected common period between the two bands; NaN if no period is detected.
        - upper_error (float): Upper error of the detected period; NaN if no period is detected.
        - lower_error (float): Lower error of the detected period; NaN if no period is detected.
        - significance (float): Statistical significance of the detected period; NaN if no period is detected.
        - label (str): Label identifying the pair of bands in which the period was detected (e.g. '0-1', '1-2').
    """
    # Check if set1 exists
    if set1 not in data_manager.fs_gp.groups:
        print(f"Set ID {set1} not found.")
        return None

    # Retrieve light curves for the different bands
    light_curves_data = get_lc22(data_manager, set1, include_errors)
    if any(len(data) == 0 for data in light_curves_data if isinstance(data, np.ndarray)):
        print(f"Insufficient data for set ID {set1}.")
        return None

    # Unpack light curve data and sampling rates
    tt0, yy0, tt1, yy1, tt2, yy2, tt3, yy3, sampling0, sampling1, sampling2, sampling3 = light_curves_data
    results = []

    # Process each band's light curve with hybrid2d and collect the detected periods
    for tt, yy in [(tt0, yy0), (tt1, yy1), (tt2, yy2), (tt3, yy3)]:
        wwz_matrix, corr, extent = hybrid2d(tt, yy, ntau=ntau, ngrid=ngrid, minfq=provided_minfq, maxfq=provided_maxfq, parallel=parallel)
        peaks, hh, r_periods, up, low = periods(set1, corr, ngrid=ngrid, plot=False, minfq=provided_minfq, maxfq=provided_maxfq)
        results.append((r_periods, up, low, peaks, hh))

    # Sampling rates and labels for the bands
    sampling_rates = [sampling0, sampling1, sampling2, sampling3]
    light_curve_labels = ['0', '1', '2', '3']
    det_periods = []

    # Loop through all pairs of bands, avoiding redundant comparisons
    for i in range(len(results)):
        for j in range(i + 1, len(results)):  # i + 1 avoids redundant comparisons such as '0-1' vs '1-0'
            r_periods_i, up_i, low_i, peaks_i, hh_i = results[i]
            r_periods_j, up_j, low_j, peaks_j, hh_j = results[j]
            # Compare periods between the two bands and find the common ones
            r_periods_common, u_common, low_common, sig_common = same_periods(
                r_periods_i, r_periods_j, up_i, low_i, up_j, low_j, peaks_i, hh_i, tt0, yy0, peaks_j, hh_j, tt1, yy1,
                ntau=ntau, ngrid=ngrid, minfq=provided_minfq, maxfq=provided_maxfq
            )
            # Append the results for this pair of bands
            if len(r_periods_common) == 0:
                det_periods.append({
                    "objectid": set1,
                    "sampling_i": sampling_rates[i],
                    "sampling_j": sampling_rates[j],
                    "period": np.nan,
                    "upper_error": np.nan,
                    "lower_error": np.nan,
                    "significance": np.nan,
                    "label": f"{light_curve_labels[i]}-{light_curve_labels[j]}"
                })
            else:
                for k in range(len(r_periods_common)):
                    det_periods.append({
                        "objectid": set1,
                        "sampling_i": sampling_rates[i],
                        "sampling_j": sampling_rates[j],
                        "period": r_periods_common[k],
                        "upper_error": u_common[k],
                        "lower_error": low_common[k],
                        "significance": round(sig_common[k], 2),  # two decimal places for significance
                        "label": f"{light_curve_labels[i]}-{light_curve_labels[j]}"
                    })

    return det_periods
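
# Illustrative usage sketch (not part of the QhX API). A minimal sketch showing
# how the list of dictionaries returned by process1_new might be consumed. It
# assumes a prepared data manager and an existing set ID; the module-level
# DEFAULT_* constants are used purely as example analysis settings.
def _example_process1_new(data_manager, set_id):
    """Illustrative sketch: print the detected common periods per band pair."""
    rows = process1_new(
        data_manager, set_id,
        ntau=DEFAULT_NTAU, ngrid=DEFAULT_NGRID,
        provided_minfq=DEFAULT_PROVIDED_MINFQ, provided_maxfq=DEFAULT_PROVIDED_MAXFQ,
    )
    if rows is None:
        return None
    for row in rows:
        # 'period' is NaN when no common period was found for that band pair
        print(row["label"], row["period"], row["upper_error"], row["lower_error"], row["significance"])
    return rows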
def process1(data_manager, set1, ntau=None, ngrid=None, provided_minfq=None, provided_maxfq=None, include_errors=True, parallel=False):
    """
    Processes and analyzes the light curves of a single object to detect common periods across its different bands.

    Parameters
    ----------
    data_manager : object
        Data manager holding the grouped light curve data (accessed via its ``fs_gp`` attribute).
    set1 : int
        Identifier of the dataset to be processed.
    ntau : int, optional
        Number of time delays in the wavelet analysis.
    ngrid : int, optional
        Number of grid points in the wavelet analysis.
    provided_minfq : float, optional
        Period corresponding to the minimum frequency for analysis; by default it is calculated from the data.
    provided_maxfq : float, optional
        Period corresponding to the maximum frequency for analysis; by default it is calculated from the data.
    include_errors : bool, optional
        Include magnitude errors in the analysis. Defaults to True.
    parallel : bool, optional
        Run the wavelet transform in parallel. Defaults to False.

    Returns
    -------
    np.ndarray
        An array of common periods and related data across the light curves. For each pair of bands in which the detected periods agree to within 10%, it contains a row with the object identifier, the period, its upper and lower errors, its significance, and the band-pair label.

    Notes
    -----
    The function involves several steps:

    - Verifying the existence of the dataset.
    - Retrieving and processing light curve data from the different bands.
    - Applying hybrid wavelet techniques to each band's data.
    - Comparing the periods detected in different bands and keeping those that do not differ by more than 10% as common periods.
    - Compiling the results into a structured format, including periods, errors, and significance for the baseline comparison band.
    """
    if set1 not in data_manager.fs_gp.groups:
        print(f"Set ID {set1} not found.")
        return None

    light_curves_data = get_lc22(data_manager, set1, include_errors)
    # Check that each array in light_curves_data is non-empty
    if any(len(data) == 0 for data in light_curves_data if isinstance(data, np.ndarray)):
        print(f"Insufficient data for set ID {set1}.")
        return None

    tt0, yy0, tt1, yy1, tt2, yy2, tt3, yy3, sampling0, sampling1, sampling2, sampling3 = light_curves_data
    det_periods = []

    # Function to get or calculate minfq and maxfq
    # def get_or_estimate_freq(tt, known_minfq, known_maxfq):
    #     if known_minfq is None or known_maxfq is None:
    #         _, fmin, fmax = estimate_wavelet_periods(tt, ngrid)
    #         return 1/fmax, 1/fmin
    #     return known_minfq, known_maxfq

    # Analyze each light curve
    results = []
    for tt, yy in [(tt0, yy0), (tt1, yy1), (tt2, yy2), (tt3, yy3)]:
        # local_minfq, local_maxfq = get_or_estimate_freq(tt, provided_minfq, provided_maxfq)
        wwz_matrix, corr, extent = hybrid2d(tt, yy, ntau=ntau, ngrid=ngrid, minfq=provided_minfq, maxfq=provided_maxfq, parallel=parallel)
        peaks, hh, r_periods, up, low = periods(set1, corr, ngrid=ngrid, plot=False, minfq=provided_minfq, maxfq=provided_maxfq)
        results.append((r_periods, up, low, peaks, hh))

    # Light curve labels (for clarity)
    light_curve_labels = ['0', '1', '2', '3']

    # Compare periods across the light curves of the same object
    for i in range(len(results)):
        for j in range(i + 1, len(results)):
            r_periods_i, up_i, low_i, peaks_i, hh_i = results[i]
            r_periods_j, up_j, low_j, peaks_j, hh_j = results[j]
            r_periods_common, u_common, low_common, sig_common = same_periods(
                r_periods_i, r_periods_j, up_i, low_i, up_j, low_j, peaks_i, hh_i, tt0, yy0, peaks_j, hh_j, tt1, yy1,
                ntau=ntau, ngrid=ngrid, minfq=provided_minfq, maxfq=provided_maxfq
            )
            if len(r_periods_common) > 0:
                for k in range(len(r_periods_common)):
                    det_periods.append([
                        set1, r_periods_common[k], u_common[k], low_common[k], sig_common[k],
                        f"{light_curve_labels[i]}-{light_curve_labels[j]}"
                    ])

    return np.array(det_periods)
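
# Illustrative usage sketch (not part of the QhX API). process1 returns a numpy
# array with one row per common period ([set id, period, upper error, lower
# error, significance, band-pair label]), or an empty array when nothing is
# found. The sketch below simply runs the analysis and reports each row;
# data_manager and set_id are assumed to be prepared by the caller.
def _example_process1(data_manager, set_id):
    """Illustrative sketch: run process1 and report each detected period."""
    detected = process1(
        data_manager, set_id,
        ntau=DEFAULT_NTAU, ngrid=DEFAULT_NGRID,
        provided_minfq=DEFAULT_PROVIDED_MINFQ, provided_maxfq=DEFAULT_PROVIDED_MAXFQ,
    )
    if detected is None or len(detected) == 0:
        print(f"No common periods reported for set {set_id}.")
        return detected
    for obj_id, period, up_err, low_err, sig, label in detected:
        print(f"object {obj_id}, bands {label}: period {period} (+{up_err}/-{low_err}), significance {sig}")
    return detected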
def same_periods(r_periods0, r_periods1, up0, low0, up1, low1, peaks0, hh0, tt0, yy0, peaks1, hh1, tt1, yy1, ntau, ngrid, minfq, maxfq):
    """
    Analyzes and identifies common periods between two sets of light curve data,
    assessing their consistency (within a relative tolerance of 10%) and their
    statistical significance.
    """
    try:
        # Convert inputs to numpy arrays for efficient computation
        r_periods0, r_periods1 = np.array(r_periods0), np.array(r_periods1)
        up0, low0, up1, low1 = np.array(up0), np.array(low0), np.array(up1), np.array(low1)
    except Exception as e:
        raise ValueError(f"Error converting inputs to numpy arrays: {e}")

    number_of_lcs = 50  # Number of light curves used for the significance simulation

    # Helper to find common periods and calculate their significance
    def find_common_periods_and_significance(rp0, rp1, up, low, peaks, hh, tt, yy, ntau, ngrid, minfq, maxfq):
        # Find common periods using np.isclose, handling NaN values safely
        common_indices = np.where(np.isclose(rp0, rp1, rtol=1e-01, equal_nan=True))[0]
        r_periods = np.take(rp0, common_indices)
        up, low = np.take(up, common_indices), np.take(low, common_indices)
        sig = []
        if len(r_periods) > 0:
            for peak_of_interest in common_indices:
                try:
                    # Calculate significance using the 'signif_johnson' function
                    _, _, _, siger = signif_johnson(number_of_lcs, peak_of_interest, peaks, hh, tt, yy, ntau=ntau, ngrid=ngrid, f=2, peakHeight=0.6, minfq=minfq, maxfq=maxfq)
                    sig_value = 1. - siger if siger is not None else np.nan
                    sig.append(sig_value)
                except Exception as e:
                    print(f"Error in significance calculation: {e}")
                    sig.append(np.nan)  # Propagate NaN for a failed significance calculation
        return np.array(r_periods), np.array(up), np.array(low), np.array(sig)

    # Ensure both period arrays have the same length before comparison
    if len(r_periods0) == len(r_periods1):
        return find_common_periods_and_significance(r_periods0, r_periods1, up0, low0, peaks0, hh0, tt0, yy0, ntau, ngrid, minfq, maxfq)
    elif len(r_periods0) < len(r_periods1):
        return find_common_periods_and_significance(np.resize(r_periods0, len(r_periods1)), r_periods1, up1, low1, peaks1, hh1, tt1, yy1, ntau, ngrid, minfq, maxfq)
    else:
        return find_common_periods_and_significance(np.resize(r_periods1, len(r_periods0)), r_periods0, up0, low0, peaks0, hh0, tt0, yy0, ntau, ngrid, minfq, maxfq)
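
# Illustrative sketch of the matching criterion used above: same_periods treats
# two periods as "common" when they agree within a relative tolerance of 10%
# (np.isclose with rtol=1e-01). The standalone snippet below reproduces only
# that matching step on made-up period values, without the significance
# estimate, to show which entries would be kept.
def _example_matching_tolerance():
    """Illustrative sketch: which made-up periods agree to within 10%."""
    periods_band_a = np.array([100.0, 250.0, 500.0])
    periods_band_b = np.array([108.0, 400.0, 480.0])
    common_indices = np.where(np.isclose(periods_band_a, periods_band_b, rtol=1e-01, equal_nan=True))[0]
    # 100 vs 108 and 500 vs 480 agree within 10%; 250 vs 400 does not
    return np.take(periods_band_a, common_indices)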