Source code for stingray.utils

from __future__ import (absolute_import, unicode_literals, division,
                        print_function)
import sys
import collections

import warnings
import numpy as np
# If numba is installed, import jit. Otherwise, define an empty decorator with
# the same name.

try:
    from numba import jit
except:
[docs] def jit(fun): return fun
[docs]class UnrecognizedMethod(Exception): pass
[docs]def simon(message, **kwargs): """ The Statistical Interpretation MONitor. A warning system designed to always remind the user that Simon is watching him/her. Parameters ---------- message : string The message that is thrown kwargs : dict The rest of the arguments that are passed to warnings.warn """ warnings.warn("SIMON says: {0}".format(message), **kwargs)
[docs]def rebin_data(x, y, dx_new, method='sum'): """ Rebin some data to an arbitrary new data resolution. Either sum the data points in the new bins or average them. Parameters ---------- x: iterable The dependent variable with some resolution dx_old = x[1]-x[0] y: iterable The independent variable to be binned dx_new: float The new resolution of the dependent variable x method: {"sum" | "average" | "mean"}, optional, default "sum" The method to be used in binning. Either sum the samples y in each new bin of x, or take the arithmetic mean. Returns ------- xbin: numpy.ndarray The midpoints of the new bins in x ybin: numpy.ndarray The binned quantity y """ y = np.asarray(y) dx_old = x[1] - x[0] assert dx_new >= dx_old, "New frequency resolution must be larger than " \ "old frequency resolution." step_size = dx_new / dx_old output = [] for i in np.arange(0, y.shape[0], step_size): total = 0 int_i = int(i) prev_frac = int_i + 1 - i prev_bin = int_i total += prev_frac * y[prev_bin] if i + step_size < len(x): # Fractional part of next bin: next_frac = i + step_size - int(i + step_size) next_bin = int(i + step_size) total += next_frac * y[next_bin] total += sum(y[int(i+1):int(i+step_size)]) output.append(total) output = np.asarray(output) if method in ['mean', 'avg', 'average', 'arithmetic mean']: ybin = output / np.float(step_size) elif method == "sum": ybin = output else: raise UnrecognizedMethod("Method for summing or averaging not recognized. " "Please enter either 'sum' or 'mean'.") tseg = x[-1] - x[0] + dx_old if (tseg / dx_new % 1) > 0: ybin = ybin[:-1] xbin = np.arange(ybin.shape[0]) * dx_new + x[0] - dx_old + dx_new return xbin, ybin, step_size
[docs]def assign_value_if_none(value, default): return default if value is None else value
[docs]def look_for_array_in_array(array1, array2): return next((i for i in array1 if i in array2), None)
[docs]def is_string(s): # pragma : no cover """Portable function to answer this question.""" PY2 = sys.version_info[0] == 2 if PY2: return isinstance(s, basestring) # NOQA else: return isinstance(s, str) # NOQA
[docs]def is_iterable(stuff): """Test if stuff is an iterable.""" return isinstance(stuff, collections.Iterable)
[docs]def order_list_of_arrays(data, order): if hasattr(data, 'items'): data = dict([(key, value[order]) for key, value in data.items()]) elif is_iterable(data): data = [i[order] for i in data] else: data = None return data
[docs]def optimal_bin_time(fftlen, tbin): """Vary slightly the bin time to have a power of two number of bins. Given an FFT length and a proposed bin time, return a bin time slightly shorter than the original, that will produce a power-of-two number of FFT bins. """ return fftlen / (2 ** np.ceil(np.log2(fftlen / tbin)))
[docs]def contiguous_regions(condition): """Find contiguous True regions of the boolean array "condition". Return a 2D array where the first column is the start index of the region and the second column is the end index. Parameters ---------- condition : boolean array Returns ------- idx : [[i0_0, i0_1], [i1_0, i1_1], ...] A list of integer couples, with the start and end of each True blocks in the original array Notes ----- From http://stackoverflow.com/questions/4494404/find-large-number-of-consecutive-values-fulfilling-condition-in-a-numpy-array """ # NOQA # Find the indicies of changes in "condition" diff = np.diff(condition) idx, = diff.nonzero() # We need to start things after the change in "condition". Therefore, # we'll shift the index by 1 to the right. idx += 1 if condition[0]: # If the start of condition is True prepend a 0 idx = np.r_[0, idx] if condition[-1]: # If the end of condition is True, append the length of the array idx = np.r_[idx, condition.size] # Reshape the result into two columns idx.shape = (-1, 2) return idx