
# (extraction residue, kept as a comment: 176 lines, 5.3 KiB)

"""
Implements rainflow cycle counting algorithm for fatigue analysis
according to section 5.4.4 in ASTM E1049-85 (2011).
"""
from __future__ import division

from collections import deque, defaultdict
import math

# Prefer the standard-library importlib.metadata and fall back to the
# importlib_metadata backport when the former cannot be imported.
try:
    from importlib import metadata as _importlib_metadata
except ImportError:
    import importlib_metadata as _importlib_metadata

# __version__ = _importlib_metadata.version("rainflow")
def _get_round_function(ndigits=None):
    """Return a one-argument function that rounds to ``ndigits`` digits.

    Parameters
    ----------
    ndigits : int, optional
        Number of digits handed to the built-in ``round``. When it is None
        the returned function is the identity and values pass through
        unchanged.

    Returns
    -------
    callable
        Function taking a single number and returning it, rounded when
        ``ndigits`` was given.
    """
    if ndigits is None:
        # No rounding requested: return the value untouched instead of
        # calling round(x, None), which would turn floats into integers.
        def func(x):
            return x
    else:
        def func(x):
            return round(x, ndigits)
    return func
def reversals(series):
    """Iterate reversal points in the series.

    A reversal point is a point in the series at which the first derivative
    changes sign. Reversal is undefined at the first (last) point because the
    derivative before (after) this point is undefined. The first and the last
    points are treated as reversals.

    Runs of repeated values are skipped, so a plateau produces at most one
    reversal; its reported index is that of the last point of the plateau.

    Parameters
    ----------
    series : iterable sequence of numbers

    Yields
    ------
    Reversal points as tuples (index, value).

    Nothing is yielded when the series has fewer than two points.
    """
    series = iter(series)

    # None doubles as the "iterator exhausted" sentinel for the first two
    # points, so a series shorter than two points produces no output.
    x_last, x = next(series, None), next(series, None)
    if x_last is None or x is None:
        return

    d_last = (x - x_last)

    yield 0, x_last
    # NOTE(review): index stays None when the series has exactly two points,
    # in which case the closing yield below is skipped and only the first
    # point is reported -- confirm whether that is intended.
    index = None
    for index, x_next in enumerate(series, start=1):
        if x_next == x:
            # Flat segment: keep the previous slope and wait for a change.
            continue
        d_next = x_next - x
        if d_last * d_next < 0:
            # Slope changed sign, so x is a peak or a valley.
            yield index, x
        x_last, x = x, x_next
        d_last = d_next

    if index is not None:
        # The last point is always reported as a reversal.
        yield index + 1, x_next
def extract_cycles(series):
    """Iterate cycles in the series.

    The reversal points of the series are pushed onto a working deque one at
    a time; whenever the most recent range is at least as large as the one
    before it, the earlier range is emitted as a cycle or a half cycle and
    its points are discarded.

    Parameters
    ----------
    series : iterable sequence of numbers

    Yields
    ------
    cycle : tuple
        Each tuple contains (range, mean, count, start index, end index).
        Count equals to 1.0 for full cycles and 0.5 for half cycles.
    """
    points = deque()

    def format_output(point1, point2, count):
        # Each point is an (index, value) pair as produced by reversals().
        i1, x1 = point1
        i2, x2 = point2
        rng = abs(x1 - x2)
        mean = 0.5 * (x1 + x2)
        return rng, mean, count, i1, i2

    for point in reversals(series):
        points.append(point)

        while len(points) >= 3:
            # Form ranges X and Y from the three most recent points
            x1, x2, x3 = points[-3][1], points[-2][1], points[-1][1]
            X = abs(x3 - x2)
            Y = abs(x2 - x1)

            if X < Y:
                # Read the next point
                break
            elif len(points) == 3:
                # Y contains the starting point
                # Count Y as one-half cycle and discard the first point
                yield format_output(points[0], points[1], 0.5)
                points.popleft()
            else:
                # Count Y as one cycle and discard the peak and the valley of Y
                yield format_output(points[-3], points[-2], 1.0)
                last = points.pop()
                points.pop()
                points.pop()
                points.append(last)

    # Count the remaining ranges as one-half cycles
    while len(points) > 1:
        yield format_output(points[0], points[1], 0.5)
        points.popleft()
def count_cycles(series, ndigits=None, nbins=None, binsize=None):
    """Count cycles in the series.

    Parameters
    ----------
    series : iterable sequence of numbers
    ndigits : int, optional
        Round cycle magnitudes to the given number of digits before counting.
        Use a negative value to round to tens, hundreds, etc.
    nbins : int, optional
        Specifies the number of cycle-counting bins.
    binsize : number, optional
        Specifies the width of each cycle-counting bin

    Arguments ndigits, nbins and binsize are mutually exclusive.

    Returns
    -------
    A sorted list containing pairs of range and cycle count.
    The counts may not be whole numbers because the rainflow counting
    algorithm may produce half-cycles. If binning is used then ranges
    correspond to the right (high) edge of a bin.

    Raises
    ------
    ValueError
        If more than one of ndigits, nbins and binsize is given.
    """
    if sum(value is not None for value in (ndigits, nbins, binsize)) > 1:
        raise ValueError(
            "Arguments ndigits, nbins and binsize are mutually exclusive"
        )

    if nbins is not None:
        # The series is scanned for its extremes and then again for cycle
        # extraction, so a one-shot iterator has to be materialised first.
        if iter(series) is series:
            series = list(series)
        binsize = (max(series) - min(series)) / nbins

    counts = defaultdict(float)
    cycles = (
        (rng, count)
        for rng, mean, count, i_start, i_end in extract_cycles(series)
    )

    if binsize is not None:
        nmax = 0
        for rng, count in cycles:
            quotient = rng / binsize
            n = int(math.ceil(quotient))  # using int for Python 2 compatibility

            if nbins and n > nbins:
                # Due to floating point accuracy we may get n > nbins,
                # in which case we move rng to the preceding bin.
                if (quotient % 1) > 1e-6:
                    raise Exception("Unexpected error")
                n = n - 1

            counts[n * binsize] += count
            nmax = max(n, nmax)

        # Report empty bins below the highest populated one with zero counts.
        for i in range(1, nmax):
            counts.setdefault(i * binsize, 0.0)

    elif ndigits is not None:
        round_ = _get_round_function(ndigits)
        for rng, count in cycles:
            counts[round_(rng)] += count

    else:
        for rng, count in cycles:
            counts[rng] += count

    return sorted(counts.items())