from collections.abc import Iterable
from typing import Self
from dataclasses import dataclass, field
import numpy as np
import pandas as pd
import linecache
from datetime import datetime, timedelta
import warnings
import matplotlib.pyplot as plt
from .utils import ratio_v_u, _make_df, time_integral_v_u, integral_v_u
from .functions import get_fit_R2, smoothing
from .defaults import *
from .classes import EffectiveDelayedParams
from .constants import BASE_DATE
import logging
logger = logging.getLogger(__name__)
__all__ = [
"CountRate",
"CountRates"]
[docs]
@dataclass(slots=True)
class CountRate:
"""
``nerea.CountRate``
===================
Class storing and processing count rate data acquired as
a function of time.
Attributes
----------
**data**: ``pd.DataFrame``
the count rate as a function of time data.
**start_time**: ``datetime``
acquisition start time.
**campaign_id**: ``str``
metadatada for experimental campaign identification.
**experiment_id**: ``str``
metadata for experiment identification
**detector_id**: ``int|str``
metadata for detector identification
**deposit_id**: ``str``
metadata for deposit identification
**timebase**: ``float``, optional
acquisition timebase in seconds. Default is ``1.0``.
_dead_time_corrected: ``bool``, optional
flag labelling whether the count rates have been
corrected for dead time. Handled internally.
Default is ``False``.
_vlines: ``Iterable[datetime]``, optional
lines to draw plotting. Handled internally.
Default is ``[]``."""
data: pd.DataFrame
start_time: datetime
campaign_id: str
experiment_id: str
detector_id: str
deposit_id: str
timebase: float = 1. ## in seconds
_dead_time_corrected: bool = False
_vlines: Iterable[datetime] = field(default_factory=lambda: [])
def _linear_fit(self,
preprocessing: str='log',
nonzero: bool=True
) -> tuple[pd.Series, np.ndarray, np.ndarray, dict]:
"""
`nerea.CountRate._linear_fit`
-----------------------------
Linearly fits monitor data after preprocessing.
Parameters
----------
**preprocessing** : ``str``, optional
``numpy`` function to apply to ``self.data`` prior to
linear fitting. Default is ``'log'``.
**nonzero** : ``bool``, optional
queries non-zero values in ``self.data``.
Default is ``True``.
Returns
-------
tuple[pd.Series, np.ndarray, np.ndarray, dict]
fit output:
- The dependent data
- Parameter values minimizing RMSE
- Parameter covariance
- information"""
from scipy.optimize import curve_fit
def linear_fit(x, a, b):
return x / a + b # Linear fit function (a = T)
if nonzero:
data = self.data[self.data.value != 0]
if data.shape != self.data.shape:
str1 = "Removing 0 counts from Count Rate to enable period log fit. "
str2 = f"Removed {(self.data.shape[0] - data.shape[0])} rows."
warnings.warn(str1 + str2)
if preprocessing is not None:
y = getattr(np, preprocessing)(data.value) # apply preprocessing
else:
y = data.value
x = (data.Time - self.start_time).dt.total_seconds() # x must be in seconds from 0
popt, pcov, out, _, _ = curve_fit(linear_fit, x, y,
full_output=True,
absolute_sigma=False) # sigma scaled to math sample variance
return y, popt, pcov, out
[docs]
def average(self, start_time: datetime, duration: float) -> pd.DataFrame:
"""
`nerea.CountRate.average()`
---------------------------
Calculates the average value and uncertainty of
time series data within a specified duration.
Parameters
----------
**start_time** : ``datetime.datetime``
The starting time for the data to be analyzed.
**duration** : ``float``
The length of time in seconds for which
the average is calculated.
Returns
-------
``pd.DataFrame``
data frame containing average `'value'` and `'uncertainty'` columns.
Notes
-----
- uncertainty computed assuming Poisson distribution: 1/sqrt(`value`)
Examples
--------
>>> from datetime import datetime
>>> data = pd.DataFrame({'Time': pd.date_range('2021-01-01', periods=100, freq='S'),
'value': np.random.rand(100)})
>>> pm = CountRate(data=data, start_time=datetime(2021, 1, 1),
campaign_id='C1', experiment_id='E1', detector_id='M1')
>>> avg_df = pm.average(datetime(2021, 1, 1, 0, 0, 30), 10)
>>> print(avg_df)"""
# end_time should be 1 timebase after the real end time to use
end_time = start_time + timedelta(seconds=duration + self.timebase)
series = self.cut(start_time, end_time).data
if series.empty:
raise ValueError("No count rate data in the requested interval.")
v, u = time_integral_v_u(series)
relative = True if v != 0 else False
# Time Normalization of the Average
# If the RR time binning is not 1 s, there is a chance the query truncated
# the time series so that the difference between first and last time in
# `series` are closer than duration. Hence I define delta.
# iloc[-1] explained by the use of time_integral_v_u(): we stop at the
# beginning of the next step-post time stamp.
delta = (series.Time.iloc[-1] - series.Time.iloc[0]).total_seconds()
return _make_df(v / delta, u / delta, relative)
[docs]
def smooth(self, **kwargs) -> Self:
"""
`nerea.CountRate.smooth()`
--------------------------
Smooths the count rate data to ease feature recognition.
Parameters
----------
**kwargs
Argumnents for ``nerea.functions.smoothing()``
- **renormalize** (``bool``): Whether to renormalize the data.
- **smoothing_method** (``str``): The mehtod to implement for smoothing.
- arguments for the chosen ``nerea.functions.smoothing``.
Returns
-------
``pd.DataFrame``
data frame with time and counts columns.
Notes
-----
Allowed methods are
- ``'moving_average'`` (requires ``window``)
- ``'ewm'``
- ``'savgol_filter'`` (requires ``window_length``, ``polyorder``)
- ``'fit'``(requires ``ch_before_max``, ``order``)"""
if kwargs.get('window') or kwargs.get('window_lenght'):
w = kwargs['window'] if kwargs['smoothing_method'] == 'moving_average' else kwargs['window_length']
if w < self.timebase: ## if nor window nor window_length are passed w is False
raise ValueError("Smoothing window length should be larger than the Count Rate timebase.")
else:
out = pd.DataFrame({"Time": self.data["Time"],
"value": smoothing(self.data["value"], **kwargs)})
return self.__class__(
data=out,
start_time=self.start_time,
campaign_id=self.campaign_id,
experiment_id=self.experiment_id,
detector_id=self.detector_id,
deposit_id=self.deposit_id,
timebase=self.timebase,
_dead_time_corrected=self._dead_time_corrected
)
[docs]
def integrate(self, timebase: int, start_time: datetime | None = None) -> pd.DataFrame:
"""
`nerea.CountRate.integrate()`
-----------------------------
Integrates data over a specified timebase starting
from a given start time.
Parameters
----------
**timebase** : ``int``
The interval of time in seconds over which to calculate the average.
This interval is used to group the data for averaging.
**start_time** : ``datetime``, optional
The starting time for the integration process. Default is ``self.start_time``.
Returns
-------
``pd.DataFrame``
data frame containing average `'value'` and `'uncertainty'` columns.
Notes
-----
- uncertainty computed assuming Poisson distribution: 1/sqrt(`value`)
Examples
--------
>>> from datetime import datetime
>>> data = pd.DataFrame({'Time': pd.date_range('2021-01-01', periods=100, freq='S'),
'value': np.random.rand(100)})
>>> pm = CountRate(data=data, start_time=datetime(2021, 1, 1),
campaign_id='C1', experiment_id='E1', detector_id='M1')
>>> integrated_df = pm.integrate(10)
>>> print(integrated_df)"""
start_time_ = self.start_time if start_time is None else start_time
out = []
while start_time_ < self.data.Time.max():
out.append(self.average(start_time_, timebase))
start_time_ = start_time_ + timedelta(seconds=timebase)
return pd.concat(out, ignore_index=True)
[docs]
def plateau(self, sigma: int=2, timebase: int=10) -> pd.DataFrame:
"""
`nerea.CountRate.plateau()`
---------------------------
The plateau with the largest integral counts in the detector.
Parameters
----------
**sigma** : ``int``, optional
the amount of standard deviations to consider for the
uncertainty on the plateau.
Defaults to ``2``.
**timebase** : ``int``, optional
the time base for integration in plateau search in seconds.
Defaults to ``10`` s.
Returns
-------
``pd.DataFrame``
with ``'Time'`` and ``'value'`` columns."""
time = self.data.Time.min()
plateau_start_time, plateau_end_time = self.data.Time.min(), self.data.Time.min()
sum, max = 1, 0
while time < self.data.Time.max():
# compute the integral over timebase
time_plus_timedelta = time + timedelta(seconds=timebase)
series = self.data.query("Time > @time and Time <= @time_plus_timedelta").value
local_sum, _ = integral_v_u(series)
# check if new plateau starts
same_plateau = np.isclose(local_sum, sum,
atol=(np.sqrt(local_sum) + np.sqrt(sum)) * sigma)
# local_plateau = self.data.query("Time > @plateau_start_time and Time <= @time_plus_timedelta")
if same_plateau:
plateau_end_time = time_plus_timedelta
else:
if self.data.query("Time > @plateau_start_time and Time <= @plateau_end_time").value.sum() > max:
max = self.data.query("Time > @plateau_start_time and Time <= @plateau_end_time").value.sum()
max_plateau_start_time = plateau_start_time
max_plateau_end_time = plateau_end_time
plateau_start_time = time_plus_timedelta
# update iteration variables (next timebin next sum)
time = time_plus_timedelta
sum = local_sum
if plateau_end_time == self.data.Time.min():
raise Exception(f"No plateau found in for detector {self.detector_id} in experiment {self.experiment_id}.")
return self.data.query("Time > @max_plateau_start_time and Time <= @max_plateau_end_time")
[docs]
def per_unit_power(self, monitor: Self, **kwargs) -> pd.DataFrame:
"""
`nerea.CountRate.per_unit_power()`
----------------------------------
Normalizes the count rate to a power monitor.
Parameters
----------
**monitor** : ``nerea.CountRate``
The power monitor for the count rate normalization.
**kwargs
arguments for ``self.plateau()``.
- **sigma** (``int``): standard deviations for plateau finding.
- **timebase** (``int``): integration timebase in seconds.
Returns
-------
``pd.DataFrame``
with ``'value'`` and ``'uncertainty'`` columns."""
plateau = self.plateau(**kwargs)
duration = (plateau.Time.max() - plateau.Time.min()).seconds
normalization = monitor.average(plateau.Time.min(), duration)
return _make_df(*ratio_v_u(_make_df(*integral_v_u(plateau.value)), normalization))
[docs]
def per_unit_time_power(self, monitor: Self, *args, **kwargs) -> pd.DataFrame:
"""
`nerea.CountRate.per_unit_time_power()`
---------------------------------------
Normalizes the count rate to a power monitor and gives
the conunt rate per unit power.
Parameters
----------
monitor : CountRate
The power monitor for the count rate normalization.
Parameters
----------
**monitor** : ``nerea.CountRate``
The power monitor for the count rate normalization.
**kwargs
arguments for ``self.plateau()``.
- **sigma** (``int``): standard deviations for plateau finding.
- **timebase** (``int``): integration timebase in seconds.
Returns
-------
``pd.DataFrame``
with ``'value'`` and ``'uncertainty'`` columns."""
plateau = self.plateau(*args, **kwargs)
duration = (plateau.Time.max() - plateau.Time.min()).seconds
unit_p = self.per_unit_power(monitor, *args, **kwargs)
return _make_df(unit_p.value / duration, unit_p.uncertainty / duration)
[docs]
def dead_time_corrected(self, tau_p: float = 88e-9, tau_np: float = 108e-9) -> Self:
"""
`nerea.CountRate.dead_time_corrected()`
---------------------------------------
Apply dead time correction to the count rate data.
Parameters
----------
**tau_p** : ``float``, optional
paralizable dead time constant.
Defaults to ``88e-9``.
**tau_np** : ``float``, optional
non-paralizable dead time constant.
Defaults to ``108e-9``.
Returns
-------
``nerea.CountRate``
instance with dead time corrected data."""
if self._dead_time_corrected:
pm = self.data.copy()
else:
from scipy import optimize
def dead_time_correction(n, m, tp, tnp):
# Equation for dead time correction
return n / ((1 - tp / tnp) * n * tp + np.exp(tp * n)) - m
if self._dead_time_corrected:
logger.info("Dead time correction already applied to this detector.")
pm = self.data.copy()
pm["value"] = pm.value.apply(lambda x:
optimize.newton(lambda n:
dead_time_correction(n, x, tau_p, tau_np),
x))
return self.__class__(pm,
self.start_time,
self.campaign_id,
self.experiment_id,
self.detector_id,
self.deposit_id,
self.timebase,
_dead_time_corrected=True)
[docs]
def cut(self, start: datetime, end: datetime) -> Self:
"""
`nerea.CountRate.cut()`
-----------------------
Cuts count rate data from a set start to an end.
Parameters
----------
**start** : ``datetime``
start time of the new `nerea.CountRate`.
**end** : ``datetime``
end time of the new `nerea.CountRate`.
Returns
-------
``nerea.CountRate``
instance with truncated data.
Notes
-----
Left boundary included, right boundary excluded."""
data = self.data.query("Time >= @start and Time < @end")
return self.__class__(data,
start_time=data.Time.min(),
campaign_id=self.campaign_id,
experiment_id=self.experiment_id,
detector_id=self.detector_id,
deposit_id=self.deposit_id,
timebase=self.timebase,
_dead_time_corrected=self._dead_time_corrected
)
[docs]
def get_asymptotic_period(self,
scan_dt: float=0.,
scan_dt0: float=20.,
scan_tol: float=1e-2,
log: bool=True) -> pd.DataFrame:
"""
`nerea.CountRate.get_asymptotic_period()`
-----------------------------------------
Calculats the reactor period from a CountRate instance.
Parameters
----------
**scan_dt** : ``float``, optional
time delta to evaluate asymptotic convergence.
In seconds. Default is ``0.`` for no scan.
**scan_dt0** : ``float``, optional
seconds to skip from last: buffer time to have
a reasonable period estimate to converge to.
In seconds. Default is ``20.``.
**scan_tol** : ``float``, optional
tolerance to evaluate asymptotic convergence.
Relative. Default is ``1e-2``.
**log** : ``bool``, optional
flag to log fit R^2. Default is ``True``.
Returns
-------
``pd.DataFrame``
with reactor asymptotic period value and uncertainty.
Note
----
The scan is performed backwards, starting from the
later time in ``self`` - ``scan_dt0``. time spacing is
defined by ``scan_dt`` and tolerance by ``scan_tol``."""
if scan_dt == 0:
fitted_data, popt, pcov, out = self._linear_fit()
period = _make_df(popt[0], np.sqrt(pcov[0, 0]))
r2 = get_fit_R2(fitted_data, out['fvec'])
if log:
logger.info(f"Reactor period fit R^2 = {r2:.4f}")
else:
# starting from end
t0 = self.data.Time.max() - timedelta(seconds=scan_dt0)
full_dt = (t0 - self.start_time).total_seconds()
nbins = int(np.floor(full_dt / scan_dt))
old_period = 0
for i in np.linspace(scan_dt, full_dt, nbins):
ts = t0 - timedelta(seconds = i)
# period from ts to end of cut data
cr = self.cut(ts, self.data.Time.max())
period = cr.get_asymptotic_period(log=False
).value.values[0]
if old_period == 0:
old_period = period
else:
if (period / old_period - 1) <= scan_tol:
old_period = period
else:
break
period = self.cut(ts + timedelta(seconds=scan_dt),
self.data.Time.max()
).get_asymptotic_period(log=log)
return period
[docs]
def get_reactivity(self,
delayed_data: EffectiveDelayedParams,
ap_kwargs: dict={}) -> pd.DataFrame:
"""
`nerea.CountRate.get_reactivity()`
----------------------------------
Calculates the reactor reactivity based on the Count Rate-estimated
reactor period and on effective nuclear data computed by Serpent.
Parameters
----------
**delayed_data** : ``nerea.EffectiveDelayedParams``
effective delayed neutron paramters to use for
reactivity calculation.
Returns
-------
``pd.DataFrame``
data frame with ``'value'`` and ``'uncertainty'`` columns."""
ap_kw = DEFAULT_AP_KWARGS | ap_kwargs
bi = delayed_data.beta_i
li = delayed_data.lambda_i
# compute reactivity
assert(all(self.data.value.values != np.nan))
assert(all(self.data.value.values != 0))
T = self.get_asymptotic_period(**ap_kw)
rho = np.sum(bi.value / (1 + li.value * T.value))
# variance portions
VAR_PORT_T = np.sum((-bi.value * li.value / (1 + li.value * T.value)**2 * T.uncertainty) **2)
VAR_PORT_B = np.sum((1 / (1 + li.value * T.value) * bi.uncertainty) **2)
VAR_PORT_L = np.sum((-bi.value * T.value / (1 + li.value * T.value)**2 * li.uncertainty) **2)
return _make_df(rho, np.sqrt(VAR_PORT_T + VAR_PORT_B + VAR_PORT_L)).assign(VAR_PORT_T=VAR_PORT_T,
VAR_PORT_B=VAR_PORT_B,
VAR_PORT_L=VAR_PORT_L)
[docs]
def plot(self,
start_time: datetime=None,
duration: int=None,
experiment_time: bool=False,
ax: plt.Axes=None,
c: str='k',
**kwargs) -> plt.Axes:
"""
`nerea.CountRate.plot()`
------------------------
Plot data in this CountRate instance.
Parameters
----------
**start_time** : ``datetime.datetime``, optional
The time the count rate is considered from.
Default is ``None`` for first acquisition time.
**duration** : ``int``, optional
The time-span the count rate is considered for.
Default is ``None`` for until last acquisition time.
**ax** : ``plt.Axes``, optional
The ax where the plot is drawn.
Defauls is ``None`` for a new axes.
**c** : ``str``, optional
The color of the plotted seriese.
Default is ``'k'``.
**kwargs
Additional arguments for ``pd.DataFrame.plot()``
Returns
-------
``plt.Axes``
with the plotted data."""
start_time_ = start_time if start_time is not None else self.start_time
duration_ = duration if duration is not None else (
self.data.Time.max() - start_time_).total_seconds()
if not experiment_time:
ax = self.data.plot(x="Time", y='value', ax=ax, color=c, kind='scatter', **kwargs)
# vspans and vlines plotted only when x is real time
ax.axvspan(self.start_time, start_time_, alpha=0.5, color='gray')
ax.axvspan(start_time_ + timedelta(seconds=duration_),
self.data.Time.max(), alpha=0.5, color='gray',
label='Ingored')
for v in self._vlines:
ax.axvline(v, ls='--', c='gray', label="File joining")
else:
data = self.data.copy()
data.Time = range(data.Time.shape[0])
ax = data.plot(x="Time", y='value', ax=ax, color=c, kind='scatter', **kwargs)
ax.ticklabel_format(axis='y', style='sci', scilimits=(0,0))
ax.set_ylabel("Power monitor count rate [1/s]")
ax.tick_params(axis='y', left=False, labelleft=False, right=True, labelright=True)
ax.yaxis.set_label_position("right")
t = ax.yaxis.get_offset_text()
t.set_x(1.01)
ax.legend()
return ax
@classmethod
def _from_formatted_ads(cls, file: str, **kwargs) -> Self:
"""
`nerea.CountRate._from_formatted_ads()`
---------------------------------------
Method to create a ``nerea.CountRate`` instance
from an ASCII file generated by ADS DAQ.
Parameters
----------
**file** : ``str``
Path to the ASCII file.
**kwargs
Additional arguments for class creation
**detector_id** (``int|str``): metadata for detector identification
**deposit_id** (``str``): metadata for detector deposit.
Returns
-------
``nerea.CountRate``
A new ``nerea.CountRate`` instance.
Note
----
- infers ``campaign_id`` and ``experiment_id`` from file name.
- file format: ``'CAMP_EXP.ads'``.
- ``detector_id`` kwarg is also used to select the detector to read."""
detector = kwargs.pop('detector_id', None)
if detector is None:
raise ValueError("`'detector'` kwarg required to read CountRate.")
start_time = datetime.strptime(linecache.getline(file, 1), "%d-%m-%Y %H:%M:%S\n")
read = pd.read_csv(file, sep='\t', skiprows=[0,1], decimal=',')
read["Time"] = read["Time"].apply(lambda x: start_time + timedelta(seconds=x))
metadata = file.split('\\')[-1].split('.')[0]
campaign_id, experiment_id = metadata.split('_')
return cls(read[["Time", f"Det {detector}"]].rename(columns={f"Det {detector}": "value"}),
start_time=start_time,
campaign_id=campaign_id,
experiment_id=experiment_id,
detector_id=f"Det {detector}",
timebase=(read['Time'][1] - read['Time'][0]).total_seconds(),
**kwargs)
@classmethod
def _from_phspa(cls, file: str, **kwargs) -> Self:
"""
`nerea.CountRate._from_phspa()`
-------------------------------
Method to create a ``nerea.CountRate`` instance
from an ASCII file generated by PHSPA DAQ.
Parameters
----------
**file** : ``str``
Path to the ASCII file.
**kwargs
additional arguments for class creation
**detector_id** (``int``): metadata for detector identification
**deposit_id** (``str``): metadata for detector deposit
**campaign_id** (``str``): metadata for experimental campaign identification
**experiment_id** (``str``): metadata for experiment identification.
Returns
-------
``nerea.CountRate``
A new ``nerea.CountRate`` instance."""
detector = kwargs.pop('detector', None)
if detector is None:
raise ValueError("`'detector'` kwarg required to read CountRate.")
data = pd.read_csv(file, sep="\t", skiprows=18, decimal=',').iloc[:,:-1]
data.columns = ["Time", "value"]
data.Time = data.Time.apply(lambda x: BASE_DATE + timedelta(days=x))
warnings.warn("Average timebase considered for PHSPA acquisitions.")
timebase = data.Time.diff().dt.total_seconds().mean()
return cls(data,
start_time=data.Time.min(),
detector_id=detector,
timebase=timebase,
**kwargs)
@classmethod
def _from_formatted_phspa(cls, file: str, **kwargs) -> Self:
"""
`nerea.CountRate._from_formatted_phspa()`
-----------------------------------------
Method to create a ``nerea.CountRate`` instance
from an ASCII file generated by PHSPA DAQ.
Parameters
----------
**file** : ``str``
Path to the ASCII file.
**kwargs
additional arguments for class creation
**deposit_id** (``str``): metadata for detector deposit
**campaign_id** (``str``): metadata for experimental campaign identification
**experiment_id** (``str``): metadata for experiment identification.
Returns
-------
``nerea.CountRate``
A new ``nerea.CountRate`` instance.
Note
----
- infers ``detector_id``, ``campaign_id`` and ``experiment_id`` from file name.
- file format: ``'CAMP_EXP_DET.log'``."""
metadata = file.split('\\')[-1].split('.')[0]
campaign_id, experiment_id, det = metadata.split('_')
return cls._from_phspa(file,
detector=det,
campaign_id=campaign_id,
experiment_id=experiment_id,
**kwargs)
@classmethod
def _from_formatted_br1(cls, file: str, **kwargs) -> Self:
"""
`nerea.CountRate._from_formatted_br1()`
---------------------------------------
Method to create a ``nerea.CountRate`` instance
from an ASCII file generated by the NBS chamber
DAQ at BR1.
Parameters
----------
**file** : ``str``
Path to the ASCII file.
**kwargs
additional arguments for class creation
**experiment_id** (``str``): metadata for experiment identification.
Returns
-------
``nerea.CountRate``
A new ``nerea.CountRate`` instance.
Note
----
- sets ``detector_id`` to ``'NBS'``, ``deposit_id`` to ``'U235'`` and ``campaign_id`` to `'CAL'`."""
read = pd.read_csv(file, sep=';', header=None)[[0,4]]
read.columns = ["Time", "value"]
read["Time"] = pd.to_datetime(read["Time"])
campaign_id, detector_id, deposit_id = "CAL", "NBS", "U235"
warnings.warn("Average timebase considered for BR1 acquisitions.")
timebase = read.Time.diff().dt.total_seconds().mean()
return cls(read,
start_time=read.Time.iloc[0],
campaign_id=campaign_id,
detector_id=detector_id,
timebase=timebase,
deposit_id=deposit_id,
**kwargs)
@classmethod
def _from_formatted_vf(cls, file: str, **kwargs) -> Self:
"""
`nerea.CountRate._from_formatted_vf()`
--------------------------------------
Method to create a ``nerea.CountRate`` instance
from an ASCII file generated by the VENUS-F
monitoring system.
Parameters
----------
**file** : ``str``
Path to the ASCII file.
**kwargs
additional arguments for class creation
**deposit_id** (``str``): metadata for detector deposit
**detector_id** (``str``): metadata for detector identification
**experiment_id** (``str``): metadata for experiment identification.
Returns
-------
``nerea.CountRate``
A new ``nerea.CountRate`` instance.
Note
----
- infers ``campaign_id`` and ``experiment_id`` from file name.
- file format: ``'CAMP_EXP_DATE.vf'``
- reads experiment date from file name.
- DATE format: %Y-%m-%d."""
detector = kwargs.pop('detector_id', None)
if detector is None:
raise ValueError("`'detector_id'` kwarg required to read CountRate.")
data = pd.read_csv(file, encoding='unicode_escape', sep=r'\s+', index_col=False)
md = file.split('\\')[-1].split('.')[0]
cmp, exp, time = md.split('_')[0], md.split('_')[1], md.split('_')[2]
data["Time"] = pd.to_datetime(time + ' '+ data.time.astype(str),
format="%Y-%m-%d %H:%M:%S")
data["value"] = data[detector]
timebase = data.Time.iloc[1] - data.Time.iloc[1]
return cls(data[["Time", "value"]],
start_time=data.Time.iloc[0],
timebase=timebase,
campaign_id=cmp,
experiment_id=exp,
detector_id=detector,
**kwargs)
[docs]
@classmethod
def from_ascii(cls, file: str, filetype: str='infer', **kwargs) -> Self:
"""
`nerea.CountRate.from_ascii()`
------------------------------
Method to create a ``nerea.CountRate`` instance
from an ASCII file.
Parameters
----------
**file** : ``str``
Path to the ASCII file.
**filetype** : ``str``, optional
Type of ASCII file to process.
Default is ``'infer'`` to infer it from
file extension.
**kwargs
additional arguments for class creation
**deposit_id** (``str``): metadata for detector deposit
**detector_id** (``str``): metadata for detector identification
**experiment_id** (``str``): metadata for experiment identification
**campaign_id** (``str``): metadata for experimental campaign identification.
Returns
-------
``nerea.CountRate``
A new ``CountRate`` instance."""
ft = file.split('.')[-1] if filetype == 'infer' else filetype
match ft:
case 'ads':
out = cls._from_formatted_ads(file, **kwargs)
case 'phspa':
out = cls._from_phspa(file, **kwargs)
case 'log':
out = cls._from_formatted_phspa(file, **kwargs)
case 'br1':
out = cls._from_formatted_br1(file, **kwargs)
case 'vf':
out = cls._from_formatted_vf(file, **kwargs)
case _:
raise ValueError("ASCII file type processing not implemented")
return out
[docs]
@classmethod
def from_files(cls, files: Iterable[str], filetype: str='infer', **kwargs) -> Self:
"""
`nerea.CountRate.from_files()`
------------------------------
Method to create a ``nerea.CountRate`` instance
joing data from ASCII files of the same type.
Parameters
----------
**file** : ``str``
Path to the ASCII file.
**filetype** : ``str``, optional
Type of ASCII file to process.
Default is ``'infer'`` to infer it from
file extension.
**kwargs
additional arguments for class creation
**deposit_id** (``str``): metadata for detector deposit
**detector_id** (``str``): metadata for detector identification
**experiment_id** (``str``): metadata for experiment identification
**campaign_id** (``str``): metadata for experimental campaign identification.
Returns
-------
``nerea.CountRate``
A new ``CountRate`` instance."""
data = []
vlines = []
for i, f in enumerate(files):
rr = cls.from_ascii(f, filetype, **kwargs)
data.append(rr.data)
vlines.append(data[-1].Time.iloc[-1])
if i == 0:
_kwargs = {'campaign_id': rr.campaign_id,
'experiment_id': rr.experiment_id,
'detector_id': rr.detector_id,
'deposit_id': rr.deposit_id}
data = pd.concat(data, ignore_index=True)
timebase = data.Time.diff().dt.total_seconds().mean()
return cls(data,
start_time=data.Time.min(),
timebase=timebase,
_vlines=vlines,
**_kwargs)
[docs]
@dataclass(slots=True)
class CountRates:
"""
``nerea.CountRates``
=============================
Class storing and processing count rate data acquired as
a function of time. Stores data of many detectors/acquisitions.
Attributes
----------
**detectors** : ``dict[int, nerea.CountRate]``
Links detector id and its conunt rate.
``key`` is the id and ``value`` the count rate.
_enable_checks: ``bool``, optional
flag to enable consistency checks. Default is ``True``."""
detectors: dict[int | str, CountRate]
_enable_checks: bool = True
def __post_init__(self) -> None:
"""
Runs consistency checks.
"""
if self._enable_checks:
self._check_consistency()
def _check_consistency(self, time_tolerance: timedelta=timedelta(seconds=60),
timebase: int=100, sigma=1) -> None:
"""
`nerea.CountRates._check_consistency()`
---------------------------------------
Check the consistency of time and curve data with specified parameters.
Parameters
----------
**time_tolerance** : ``datetime.timedelta``, optional
Parameter for ``self._check_time_consistency``. Defaults to ``60`` seconds.
**timebase** : ``int``, optional
Parameter for ``self._check_curve_consistency``. Defaults to ``100``.
**sigma** : int, optional
Parameter for ``self._check_curve_consistency``. Defaults to ``1``.
Examples
--------
>>> data = pd.DataFrame({'Time': pd.date_range('2021-01-01', periods=100, freq='S'),
'value': np.random.rand(100)})
>>> pm1 = CountRate(data=data, start_time=datetime(2021, 1, 1),
campaign_id='C1', experiment_id='E1', detector_id='M1')
>>> pm2 = CountRate(data=data, start_time=datetime(2021, 1, 1),
campaign_id='C1', experiment_id='E1', detector_id='M2')
>>> pms = CountRates(detectors={1: pm1, 2: pm2})
>>> pms._check_consistency()"""
must = ['campaign_id', 'experiment_id']
for attr in must:
if not all([getattr(m, attr) == getattr(list(self.detectors.values())[0], attr)
for m in self.detectors.values()]):
raise Exception(f"Inconsistent {attr} among different CountRate instances.")
should = ['deposit_id']
for attr in should:
if not all([getattr(m, attr) == getattr(list(self.detectors.values())[0], attr)
for m in self.detectors.values()]):
warnings.warn(f"Inconsistent {attr} among different CountRate instances.")
self._check_time_consistency(time_tolerance)
self._check_curve_consistency(timebase, sigma)
def _check_time_consistency(self, time_tolerance: timedelta) -> None:
"""
`nerea.CountRates._check_time_consistency()`
--------------------------------------------
Check if the start times of power detectors are
consistent within a given time tolerance.
Parameters
----------
**time_tolerance** : ``datetime.timedelta``
The maximum allowable difference in time between
the start times of the power detectors in ``self.detectors``.
Examples
--------
>>> data = pd.DataFrame({'Time': pd.date_range('2021-01-01', periods=100, freq='S'),
'value': np.random.rand(100)})
>>> pm1 = CountRate(data=data, start_time=datetime(2021, 1, 1),
campaign_id='C1', experiment_id='E1', detector_id='M1')
>>> pm2 = CountRate(data=data, start_time=datetime(2021, 1, 1),
campaign_id='C1', experiment_id='E1', detector_id='M2')
>>> pms = CountRates(detectors={1: pm1, 2: pm2})
>>> pms._check_time_consistency(timedelta(seconds=60))"""
ref = list(self.detectors.values())[0].start_time
for monitor in self.detectors.values():
if not abs(monitor.start_time - ref) < time_tolerance:
warnings.warn(f"Power monitor start time difference > {time_tolerance}")
def _check_curve_consistency(self, timebase: int, sigma: int=1) -> None:
"""
`nerea.CountRates._check_curve_consistency()`
---------------------------------------------
Compare data from multiple detectors to check for consistency
within a sigma-uncertainty tolerance, based on a specified
timebase.
Parameters
----------
**timebase** : ``int``
The time interval in seconds for grouping the data.
This parameter determines how the data are aggregated
and compared between different detectors.
**sigma** : ``int``, optional
The uncertainty associated with the measurements. It
is used to calculate the tolerance for checking the
consistency between different power detectors. The
tolerance is computed as the average uncertainty on the
ratio of values between two detectors. Defaults to ``1``.
Examples
--------
>>> data = pd.DataFrame({'Time': pd.date_range('2021-01-01', periods=100, freq='S'),
'value': np.random.rand(100)})
>>> pm1 = CountRate(data=data, start_time=datetime(2021, 1, 1),
campaign_id='C1', experiment_id='E1', detector_id='M1')
>>> pm2 = CountRate(data=data, start_time=datetime(2021, 1, 1),
campaign_id='C1', experiment_id='E1', detector_id='M2')
>>> pms = CountRates(detectors={1: pm1, 2: pm2})
>>> pms._check_curve_consistency(10, 1)"""
ref = list(self.detectors.values())[0].data.groupby(
pd.Grouper(key="Time", freq=f'{timebase}s', closed='right'),
observed=False
).agg('sum').reset_index()
ref['uncertainty'] = np.sqrt(ref['value']) # absolute
ref['value'] = ref['value'] / timebase
for monitor in list(self.detectors.values())[1:]:
compute = monitor.data.groupby(pd.Grouper(key="Time", freq=f'{timebase}s', closed='right'),
observed=False
).agg('sum').reset_index()
compute['uncertainty'] = np.sqrt(compute['value']) # absolute
compute['value'] = compute['value'] / timebase
# filtering noise below 100 counts in time
start = max(ref.query('value >= 100').Time.min(), compute.query('value >= 100').Time.min())
end = min(ref.query('value >= 100').Time.max(), compute.query('value >= 100').Time.max())
qs = "Time >= @start and Time <= @end"
v, u = ratio_v_u(compute.query(qs), ref.query(qs))
# tolerance is scalar, therefore it is computed as average uncertainty on the ratio
tol = np.mean(sigma * u) # absolute
if not (np.isclose(v, np.roll(v, shift=1), atol=tol)).all():
warnings.warn(f"Power monitor {monitor.detector_id} inconsistent with {list(self.detectors.values())[0].detector_id}")
@property
def _first(self) -> CountRate:
"""
`nerea.CountRates._first()`
---------------------------
The first count rate in ``self.detectors``.
Returns
-------
``nerea.CountRate``
the first count rate."""
return list(self.detectors.values())[0]
@property
def campaign_id(self) -> str:
"""
`nerea.CountRates.campaign_id()`
--------------------------------
Campaign id of the first count rate in ``self.detectors``.
Returns
-------
``str``
the campaign id of the first detector."""
return self._first.campaign_id
@property
def experiment_id(self) -> str | int:
"""
`nerea.CountRates.experiment_id()`
----------------------------------
Experiment id of the first count rate in ``self.detectors``.
Returns
-------
``str``
the campaign id of the first detector."""
return self._first.experiment_id
@property
def deposit_id(self) -> str:
"""
`nerea.CountRates.deposit_id()`
-------------------------------
The deposit id of the first element of `self.detectors`.
Returns
-------
``str``
the deposit id of the first detector."""
return self._first.deposit_id
@property
def best(self) -> CountRate:
"""
`nerea.CountRates.best()`
-------------------------
Returns the count rate with the highest sum value.
Returns
-------
``nerea.CountRate``
Count rate with the highest integral count."""
max = list(self.detectors.values())[0].data.value.sum()
out = list(self.detectors.values())[0]
for monitor in list(self.detectors.values())[1:]:
if monitor.data.value.sum() > max:
out = monitor
return out
[docs]
def per_unit_power(self, monitor: int, **kwargs) -> dict[int, pd.DataFrame]:
"""
`nerea.CountRates.per_unit_power()`
-----------------------------------
Normalizes the raction rate to a power monitor.
Parameters
----------
**monitor** : ``int``
The ID of the count rate to be used as power
monitor for the count rate normalization.
**kwargs
arguments for ``CountRate.plateau()``.
- **sigma** (``int``): standard deviations for plateau finding.
- **timebase** (``int``): integration timebase in seconds.
Returns
-------
``dict[int, pd.DataFrame]``
with value and uncertainty of the normalized count rate
integrated over time. Keys are the detector IDs as in
self.detectors."""
out = {}
for key, detector in self.detectors.items():
if key != monitor:
out[key] = detector.per_unit_power(self.detectors[monitor], **kwargs)
return out
[docs]
def per_unit_time_power(self, monitor: int, **kwargs) -> dict[int, pd.DataFrame]:
"""
`nerea.CountRates.per_unit_time_power()`
----------------------------------------
Normalizes the raction rate to a power monitor and takes the average over time.
Parameters
----------
**monitor** : ``int``
The ID of the count rate to be used as power
monitor for the count rate normalization.
**kwargs
arguments for ``CountRate.plateau()``.
- **sigma** (``int``): standard deviations for plateau finding.
- **timebase** (``int``): integration timebase in seconds.
Returns
-------
``dict[int, pd.DataFrame]``
with value and uncertainty of the normalized count rate
averaged over time. Keys are the detector IDs as in
self.detectors."""
out = {}
for key, detector in self.detectors.items():
if key != monitor:
out[key] = detector.per_unit_time_power(self.detectors[monitor], **kwargs)
return out
[docs]
def cut(self, starts: list, ends: list):
dts = {}
for i, (j, d) in enumerate(self.detectors.items()):
dts[j] = d.cut(starts[i], ends[i])
return self.__class__(dts, self._enable_checks)
[docs]
@classmethod
def from_ascii(cls,
files: dict[str, tuple[Iterable[str]|Iterable[int]|None, Iterable[str]]],
filetypes: Iterable[str]='infer',
**kwargs) -> Self:
"""
`nerea.CountRates.from_ascii()`
-------------------------------
Creates an instance of ``nerea.CountRates`` using data extracted from an ASCII file.
The ASCII file should contain columns of data including timestamps and power readings.
The filename is supposed to be formatted as:
{Campaign}_{experiment} (ADS) or
{Campaign}_{experiment}_{detector} (PHSPA)
Parameters
----------
**files** : ``dict[str, tuple[Iterable[str]|Iterable[int]|None, Iterable[str]]]``
Maps each file to the detectors to read there and
the corresponding deposit id.
- key: ``str``
Path to the ASCII files containing the power monitor data.
- values: ``tuple``
first: ``Iterable[str]|Iterable[int]``
detector ids for ADS files
or ``None`` for PHSPA file (detector id inferred from filename)
second: ``Iterable[str]``
deposit ids
**filetype** : ``Iterable[str]``, optional
Type of ASCII file to process.
Default is ``'infer'`` to infer it from
file extension for each file.
**kwargs
additional arguments for class creation
**_enable_checks** (``bool``): enables consistency checks among detectors
Returns
-------
``nerea.CountRates``
initialized with the data from the ASCII file.
Note
----
- allows only for formatted source files.
- ADS files requires detectors to be passed as an iterable
in the same order as the ADS processed files."""
ft = ['infer'] * len(files) if filetypes == 'infer' else filetypes
out = {}
for i, (f, (dets, deps)) in enumerate(files.items()):
ft_ = f.split('.')[-1] if ft[i] == 'infer' else ft[i]
match ft_:
case 'ads':
for d, d_ in zip(dets, deps):
out[d] = CountRate.from_ascii(f,
filetype=ft_,
detector_id=d,
deposit_id=d_)
case 'phspa':
d = f.split('\\')[-1].split('.')[0].split('_')[-1]
d_ = deps[0]
out[d] = CountRate.from_ascii(f,
filetype=ft_,
detector_id=d,
deposit_id=d_)
case 'log':
d = f.split('\\')[-1].split('.')[0].split('_')[-1]
d_ = deps[0]
out[d] = CountRate.from_ascii(f,
filetype=ft_,
deposit_id=d_)
case 'vf':
for d, d_ in zip(dets, deps):
out[d] = CountRate.from_ascii(f,
filetype=ft_,
detector_id=d,
deposit_id=d_)
return cls(out, **kwargs)