Skip to content

Shift and add #849

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/changes/849.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
implementation of the shift-and-add technique for QPOs and other varying power spectral features

86 changes: 80 additions & 6 deletions stingray/crossspectrum.py
Original file line number Diff line number Diff line change
Expand Up @@ -2132,7 +2132,22 @@ class DynamicalCrossspectrum(AveragedCrossspectrum):
The number of averaged powers in each spectral bin (initially 1, it changes after rebinning).
"""

def __init__(self, data1, data2, segment_size, norm="frac", gti=None, sample_time=None):
def __init__(
self, data1=None, data2=None, segment_size=None, norm="frac", gti=None, sample_time=None
):
self.segment_size = segment_size
self.sample_time = sample_time
self.gti = gti
self.norm = norm

if segment_size is None and data1 is None and data2 is None:
self._initialize_empty()
self.dyn_ps = None
return

if segment_size is None or data1 is None or data2 is None:
raise TypeError("data1, data2, and segment_size must all be specified")

if isinstance(data1, EventList) and sample_time is None:
raise ValueError("To pass input event lists, please specify sample_time")
elif isinstance(data1, Lightcurve):
Expand All @@ -2145,11 +2160,6 @@ def __init__(self, data1, data2, segment_size, norm="frac", gti=None, sample_tim
if segment_size < 2 * sample_time:
raise ValueError("Length of the segment is too short to form a light curve!")

self.segment_size = segment_size
self.sample_time = sample_time
self.gti = gti
self.norm = norm

self._make_matrix(data1, data2)

def _make_matrix(self, data1, data2):
Expand Down Expand Up @@ -2370,6 +2380,70 @@ def trace_maximum(self, min_freq=None, max_freq=None):

return np.array(max_positions)

def shift_and_add(self, f0_list, nbins=100, output_obj_type=AveragedCrossspectrum, rebin=None):
"""Shift and add the dynamical power spectrum.

This is the basic operation for the shift-and-add operation used to track
kHz QPOs in X-ray binaries (e.g. Méndez et al. 1998, ApJ, 494, 65).

Parameters
----------
freqs : np.array
Array of frequencies, the same for all powers. Must be sorted and on a uniform
grid.
power_list : list of np.array
List of power spectra. Each power spectrum must have the same length
as the frequency array.
f0_list : list of float
List of central frequencies

Other parameters
----------------
nbins : int, default 100
Number of bins to extract
rebin : int, default None
Rebin the final spectrum by this factor. At the moment, the rebinning
is linear.
Returns
-------
output: :class:`AveragedPowerspectrum` or :class:`AveragedCrossspectrum`
The final averaged power spectrum.

Examples
--------
>>> power_list = [[2, 5, 2, 2, 2], [1, 1, 5, 1, 1], [3, 3, 3, 5, 3]]
>>> power_list = np.array(power_list).T
>>> freqs = np.arange(5) * 0.1
>>> f0_list = [0.1, 0.2, 0.3, 0.4]
>>> dps = DynamicalCrossspectrum()
>>> dps.dyn_ps = power_list
>>> dps.freq = freqs
>>> dps.df = 0.1
>>> dps.m = 1
>>> output = dps.shift_and_add(f0_list, nbins=5)
>>> assert np.array_equal(output.m, [2, 3, 3, 3, 2])
>>> assert np.array_equal(output.power, [2. , 2. , 5. , 2. , 1.5])
>>> assert np.allclose(output.freq, [0.05, 0.15, 0.25, 0.35, 0.45])
"""
from .fourier import shift_and_add

final_freqs, final_powers, count = shift_and_add(
self.freq, self.dyn_ps.T, f0_list, nbins=nbins, M=self.m, df=self.df, rebin=rebin
)

output = output_obj_type()
good = ~np.isnan(final_powers)

output.freq = final_freqs[good]
output.power = final_powers[good]
output.m = count[good]
output.df = self.df
output.norm = self.norm
output.gti = self.gti
output.segment_size = self.segment_size

return output

def power_colors(
self,
freq_edges=[1 / 256, 1 / 32, 0.25, 2.0, 16.0],
Expand Down
135 changes: 135 additions & 0 deletions stingray/fourier.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
show_progress,
sum_if_not_none_or_initialize,
fix_segment_size_to_integer_samples,
rebin_data,
)


Expand Down Expand Up @@ -1358,6 +1359,140 @@ def fun(times, gti, segment_size):
yield cts


def center_pds_on_f(freqs, powers, f0, nbins=100):
"""Extract a slice of PDS around a given frequency.

This function extracts a slice of the power spectrum around a given frequency.
The slice has a length of ``nbins``. If the slice goes beyond the edges of the
power spectrum, the missing values are filled with NaNs.

Parameters
----------
freqs : np.array
Array of frequencies, the same for all powers
powers : np.array
Array of powers
f0 : float
Central frequency

Other parameters
----------------
nbins : int, default 100
Number of bins to extract

Examples
--------
>>> freqs = np.arange(1, 100) * 0.1
>>> powers = 10 / freqs
>>> f0 = 0.3
>>> p = center_pds_on_f(freqs, powers, f0)
>>> assert np.isnan(p[0])
>>> assert not np.any(np.isnan(p[48:]))
"""
powers = np.asarray(powers)
chunk = np.zeros(nbins) + np.nan
fchunk = np.zeros(nbins)

start_f_idx = np.searchsorted(freqs, f0)

minbin = start_f_idx - nbins // 2
maxbin = minbin + nbins

if minbin < 0:
chunk[-minbin : min(nbins, powers.size - minbin)] = powers[: minbin + nbins]
elif maxbin > powers.size:
chunk[: nbins - (maxbin - powers.size)] = powers[minbin:]
else:
chunk[:] = powers[minbin:maxbin]

return chunk


def shift_and_add(freqs, power_list, f0_list, nbins=100, rebin=None, df=None, M=None):
"""Add a list of power spectra, centered on different frequencies.

This is the basic operation for the shift-and-add operation used to track
kHz QPOs in X-ray binaries (e.g. Méndez et al. 1998, ApJ, 494, 65).

Parameters
----------
freqs : np.array
Array of frequencies, the same for all powers. Must be sorted and on a uniform
grid.
power_list : list of np.array
List of power spectra. Each power spectrum must have the same length
as the frequency array.
f0_list : list of float
List of central frequencies

Other parameters
----------------
nbins : int, default 100
Number of bins to extract
rebin : int, default None
Rebin the final spectrum by this factor. At the moment, the rebinning
is linear.
df : float, default None
Frequency resolution of the power spectra. If not given, it is calculated
from the input frequencies.
M : int or list of int, default None
Number of segments used to calculate each power spectrum. If a list is
given, it must have the same length as the power list.

Returns
-------
f : np.array
Array of output frequencies
p : np.array
Array of output powers
n : np.array
Number of contributing power spectra at each frequency

Examples
--------
>>> power_list = [[2, 5, 2, 2, 2], [1, 1, 5, 1, 1], [3, 3, 3, 5, 3]]
>>> freqs = np.arange(5) * 0.1
>>> f0_list = [0.1, 0.2, 0.3, 0.4]
>>> f, p, n = shift_and_add(freqs, power_list, f0_list, nbins=5)
>>> assert np.array_equal(n, [2, 3, 3, 3, 2])
>>> assert np.array_equal(p, [2. , 2. , 5. , 2. , 1.5])
>>> assert np.allclose(f, [0.05, 0.15, 0.25, 0.35, 0.45])
"""
final_powers = np.zeros(nbins)
freqs = np.asarray(freqs)

mid_idx = np.searchsorted(freqs, np.mean(f0_list))
if M is None:
M = 1
if not isinstance(M, Iterable):
M = np.ones(len(power_list)) * M

count = np.zeros(nbins)
for f0, powers, m in zip(f0_list, power_list, M):
idx = np.searchsorted(freqs, f0_list)

powers = np.asarray(powers) * m
new_power = center_pds_on_f(freqs, powers, f0, nbins=nbins)
bad = np.isnan(new_power)
new_power[bad] = 0.0
final_powers += new_power
count += np.array(~bad, dtype=int) * m

if df is None:
df = freqs[1] - freqs[0]

final_freqs = np.arange(-nbins // 2, nbins // 2 + 1)[:nbins] * df
final_freqs = final_freqs - (final_freqs[0] + final_freqs[-1]) / 2 + np.mean(f0_list)
final_powers = final_powers / count

if rebin is not None:
_, count, _, _ = rebin_data(final_freqs, count, rebin * df)
final_freqs, final_powers, _, _ = rebin_data(final_freqs, final_powers, rebin * df)
final_powers = final_powers / rebin

return final_freqs, final_powers, count


def avg_pds_from_iterable(
flux_iterable, dt, norm="frac", use_common_mean=True, silent=False, return_subcs=False
):
Expand Down
3 changes: 1 addition & 2 deletions stingray/lightcurve.py
Original file line number Diff line number Diff line change
Expand Up @@ -1383,8 +1383,7 @@ def analyze_lc_chunks(self, segment_size, func, fraction_step=1, **kwargs):
def to_lightkurve(self):
"""
Returns a `lightkurve.LightCurve` object.
This feature requires `Lightkurve
<https://docs.lightkurve.org/>`_ to be installed
This feature requires ``Lightkurve`` to be installed
(e.g. ``pip install lightkurve``). An `ImportError` will
be raised if this package is not available.

Expand Down
25 changes: 19 additions & 6 deletions stingray/powerspectrum.py
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,20 @@ class DynamicalPowerspectrum(DynamicalCrossspectrum):
The number of averaged cross spectra.
"""

def __init__(self, lc, segment_size, norm="frac", gti=None, sample_time=None):
def __init__(self, lc=None, segment_size=None, norm="frac", gti=None, sample_time=None):
self.segment_size = segment_size
self.sample_time = sample_time
self.gti = gti
self.norm = norm

if segment_size is None and lc is None:
self._initialize_empty()
self.dyn_ps = None
return

if segment_size is None or lc is None:
raise TypeError("lc and segment_size must all be specified")

if isinstance(lc, EventList) and sample_time is None:
raise ValueError("To pass an input event lists, please specify sample_time")
elif isinstance(lc, Lightcurve):
Expand All @@ -1033,13 +1046,13 @@ def __init__(self, lc, segment_size, norm="frac", gti=None, sample_time=None):
if segment_size < 2 * sample_time:
raise ValueError("Length of the segment is too short to form a light curve!")

self.segment_size = segment_size
self.sample_time = sample_time
self.gti = gti
self.norm = norm

self._make_matrix(lc)

def shift_and_add(self, f0_list, nbins=100, rebin=None):
return super().shift_and_add(
f0_list, nbins=nbins, output_obj_type=AveragedPowerspectrum, rebin=rebin
)

def _make_matrix(self, lc):
"""
Create a matrix of powers for each time step and each frequency step.
Expand Down
19 changes: 19 additions & 0 deletions stingray/tests/test_crossspectrum.py
Original file line number Diff line number Diff line change
Expand Up @@ -1430,6 +1430,10 @@ def setup_class(cls):
test_counts = [2, 3, 1, 3, 1, 5, 2, 1, 4, 2, 2, 2, 3, 4, 1, 7]
cls.lc_test = Lightcurve(test_times, test_counts)

def test_bad_args(self):
with pytest.raises(TypeError, match=".must all be specified"):
_ = DynamicalCrossspectrum(1)

def test_with_short_seg_size(self):
with pytest.raises(ValueError):
dps = DynamicalCrossspectrum(self.lc, self.lc, segment_size=0)
Expand Down Expand Up @@ -1660,3 +1664,18 @@ def test_rebin_frequency_mean_method(self, method):
assert np.allclose(new_dps.freq, rebin_freq)
assert np.allclose(new_dps.dyn_ps, rebin_dps, atol=0.00001)
assert np.isclose(new_dps.df, df_new)

def test_shift_and_add(self):
power_list = [[2, 5, 2, 2, 2], [1, 1, 5, 1, 1], [3, 3, 3, 5, 3]]
power_list = np.array(power_list).T
freqs = np.arange(5) * 0.1
f0_list = [0.1, 0.2, 0.3, 0.4]
dps = DynamicalCrossspectrum()
dps.dyn_ps = power_list
dps.freq = freqs
dps.df = 0.1
dps.m = 1
output = dps.shift_and_add(f0_list, nbins=5)
assert np.array_equal(output.m, [2, 3, 3, 3, 2])
assert np.array_equal(output.power, [2.0, 2.0, 5.0, 2.0, 1.5])
assert np.allclose(output.freq, [0.05, 0.15, 0.25, 0.35, 0.45])
26 changes: 26 additions & 0 deletions stingray/tests/test_fourier.py
Original file line number Diff line number Diff line change
Expand Up @@ -938,3 +938,29 @@ def test_deprecation_rms_calculation(self):
M=M,
)
assert np.isclose(rms, rms_from_unnorm, atol=3 * rmse_from_unnorm)


@pytest.mark.parametrize("ntimes", [100, 1000])
def test_shift_and_add_orbit(ntimes):
# This time correct for orbital motion
from stingray.fourier import shift_and_add

fmid = 0.7
freqs = np.linspace(0.699, 0.701, 1001)
porb = 2.52 * 86400
asini = 22.5
t0 = porb / 2
times = np.linspace(0, porb, ntimes + 1)[:-1]
power_list = np.zeros((times.size, freqs.size))
omega = 2 * np.pi / porb
orbit_freqs = fmid * (1 - asini * omega * np.cos(omega * (times - t0)))

idx = np.searchsorted(freqs, orbit_freqs)
for i_t, power in zip(idx, power_list):
power[i_t] = 1

f, p, n = shift_and_add(freqs, power_list, orbit_freqs, nbins=5)
# If we corrected well, the power should be the average of all max powers in the
# original series
assert np.max(p) == 1
assert np.max(n) == times.size
Loading
Loading