# BatterySimulatorBLAST/python/battery_profile.py
#
# (Listing metadata from the original source page: 299 lines, 15 KiB, Python)

import pandas as pd
import numpy as np
import math
import datetime
import random
class BatProfile:
    """Build synthetic daily SOC / temperature profiles from logged pack data.

    The methods are meant to be called in this order, each one consuming the
    attributes written by the previous one:

    1. ``init_intervals``            -> ``self.intervals``
    2. ``preprocess_soc_intervals``  -> ``self.soc_sequence_list``,
       ``self.soc_time_sequence_list``
    3. ``temp_sequence_generation``  -> ``self.temp_sequence_list``,
       ``self.temp_time_sequence_list``
    4. ``process_soc_time``          -> pads ``self.soc_sequence_list``
    5. ``generate_day_values``       -> one day of SOC / temperature / time
    """

    # attributes
    # Class-level defaults are kept for backward compatibility; __init__ gives
    # every instance its own lists so instances never share mutable state.
    intervals = None
    df = None
    soc_sequence_list = []
    soc_time_sequence_list = []
    temp_sequence_list = []
    temp_time_sequence_list = []

    # private constants shared by the methods below
    _SOC_FIELD = 'PACK1_CRIDATA_SOC'
    _TIME_FIELD = 'time'
    _DEFAULT_FILE_PATH = "/home/richard/Projects/06_research/battery_degradation_study/BLAST-Lite/data/EP_Battery.Thing_HMD8310.csv"
    _FULL_SOC = 85  # SOC value every segment is extended / padded to
    _STEP_SECONDS = 60  # spacing used for every generated time stamp
    _SECONDS_PER_DAY = 60 * 60 * 24

    # initialize dataframe
    def __init__(self, file_path=None):
        """Load the SOC log and keep only the rows used by the other methods.

        Args:
            file_path: CSV file containing at least the columns
                ``PACK1_CRIDATA_SOC`` and ``time``. When omitted, the
                original hard-coded location is used.
        """
        if file_path is None:
            file_path = self._DEFAULT_FILE_PATH
        fields = [self._SOC_FIELD, self._TIME_FIELD]
        df = pd.read_csv(file_path, skipinitialspace=True, usecols=fields)
        df[self._TIME_FIELD] = pd.to_datetime(df[self._TIME_FIELD])
        # keep only rows stamped on or after 2023-01-01 (UTC)
        threshold_date = pd.to_datetime('2023-01-01').tz_localize('UTC')
        df = df[df[self._TIME_FIELD] >= threshold_date].reset_index(drop=True)
        # readings of exactly 86 are replaced by 85
        df[self._SOC_FIELD] = df[self._SOC_FIELD].replace(86, 85)
        self.df = df
        # per-instance state (do not share the class-level lists)
        self.intervals = None
        self.soc_sequence_list = []
        self.soc_time_sequence_list = []
        self.temp_sequence_list = []
        self.temp_time_sequence_list = []

    # private helpers
    @staticmethod
    def _below_threshold_runs(values, threshold):
        """Return inclusive ``(start, end)`` positions of runs below threshold.

        The mask is padded with ``False`` on both sides so runs touching the
        first or last sample are detected without any wrap-around.
        """
        mask = np.asarray(values < threshold, dtype=bool)
        if mask.size == 0:
            return []
        padded = np.concatenate(([False], mask, [False])).astype(np.int8)
        edges = np.diff(padded)
        starts = np.flatnonzero(edges == 1)
        ends = np.flatnonzero(edges == -1) - 1
        return [(int(start), int(end)) for start, end in zip(starts, ends)]

    @staticmethod
    def _last_change_index(values):
        """Return the last index ``i`` with ``values[i + 1] != values[i]``.

        Returns ``-1`` when no two consecutive values differ (including the
        empty and single-element cases).
        """
        changes = np.flatnonzero(np.diff(values) != 0)
        return int(changes[-1]) if changes.size else -1

    @classmethod
    def _extend_segment(cls, soc_sequence, elapsed_seconds,
                        num_points=5, num_extrapolate=100):
        """Extend one SOC segment linearly on both sides until it hits 85.

        A straight line is fitted to the first (resp. last) ``num_points``
        samples and extrapolated for up to ``num_extrapolate`` steps; values
        are clipped at 85 and everything after the first clipped value is
        dropped. Generated samples are stamped 60 s apart.

        Args:
            soc_sequence: SOC samples of the segment (at least two).
            elapsed_seconds: list of int seconds since the segment start,
                one per SOC sample.

        Returns:
            ``(extended_soc, extended_seconds)`` where the time list starts
            at 0.

        Raises:
            ValueError: if the segment has fewer than two samples.
        """
        soc_sequence = np.asarray(soc_sequence, dtype=float)
        if soc_sequence.size < 2:
            raise ValueError("a segment needs at least two samples to be extended")
        # short segments are fitted on whatever samples they have
        fit_points = min(num_points, soc_sequence.size)
        step = cls._STEP_SECONDS

        # extend the series before the beginning up to 85%
        coefficients = np.polyfit(np.arange(fit_points), soc_sequence[:fit_points], 1)
        lead = np.polyval(coefficients, np.arange(-1, -num_extrapolate - 1, -1))
        lead = np.clip(lead, None, cls._FULL_SOC)
        # keep everything up to the first clipped value; if the very first
        # point is already clipped, exactly one point is kept
        lead = lead[:cls._last_change_index(lead) + 2]

        # extend the series after the end up to 85%
        # NOTE(review): the fit uses x = -n..-1 but extrapolation starts at
        # x = 1, so x = 0 is skipped -- kept as in the original, confirm intent.
        coefficients = np.polyfit(np.arange(-fit_points, 0), soc_sequence[-fit_points:], 1)
        tail = np.polyval(coefficients, np.arange(1, num_extrapolate + 1))
        tail = np.clip(tail, None, cls._FULL_SOC)
        tail = tail[:cls._last_change_index(tail) + 2]

        # lead is generated walking backwards in time, so reverse it
        extended_soc = np.concatenate((lead[::-1], soc_sequence, tail))

        # shift all times so that the extended sequence starts at 0
        offset = step * len(lead)
        end_time = elapsed_seconds[-1] + offset
        extended_seconds = (
            [step * i for i in range(len(lead))]
            + [second + offset for second in elapsed_seconds]
            + [end_time + step * i for i in range(1, len(tail) + 1)]
        )
        return extended_soc, extended_seconds

    @classmethod
    def _gen_temp_sequence(cls, num_points, baseline_temp, max_temp, temp_rate):
        """Return a warm-up ramp followed by a cool-down ramp.

        The warm-up has ``num_points - 1`` samples rising from the baseline
        by ``temp_rate`` per step and clipped at ``max_temp``. The cool-down
        starts from the last warm-up value, falls by ``temp_rate`` per step,
        is clipped at the baseline, and is cut at the last index where two
        consecutive values still differ (same cut rule as the original).
        """
        warmup = baseline_temp + temp_rate * np.arange(1, num_points)
        warmup = np.clip(warmup, None, max_temp)
        # an empty warm-up (num_points <= 1) simply starts cooling at baseline
        peak_temp = warmup[-1] if warmup.size else baseline_temp
        cooldown = peak_temp - temp_rate * np.arange(1, 100)
        cooldown = np.clip(cooldown, baseline_temp, None)
        cooldown = cooldown[:max(cls._last_change_index(cooldown), 0)]
        return np.concatenate((warmup, cooldown))

    @classmethod
    def _sample_intervals(cls, num_profiles, time_sequence_list, num_discharges,
                          max_iterations=1000):
        """Pick profiles (with repeats) and non-overlapping start times.

        Each selected profile gets up to ``max_iterations`` random placement
        attempts of its own. A profile that cannot be placed, or that is
        longer than one day, is left out.

        Returns:
            ``(selections, time_intervals)`` sorted by start time, where
            ``selections[i]`` is the profile index placed in
            ``time_intervals[i] == (start, end)`` (seconds of day).
        """
        # sample with repeats from the list of discharge samples
        selections = np.random.choice(range(num_profiles), num_discharges, replace=True)

        def is_overlap(range1, range2):
            a, b = range1
            c, d = range2
            return not (b <= c or d <= a)

        placed = []  # (interval, profile index) kept together so they cannot drift apart
        for selection in selections:
            time_sequence = time_sequence_list[selection]
            event_duration = time_sequence[-1] - time_sequence[0]
            latest_start = cls._SECONDS_PER_DAY - event_duration
            if latest_start < 0:
                continue  # cannot fit inside a single day
            for _attempt in range(max_iterations):
                start = random.randint(0, latest_start)
                proposed = (start, start + event_duration)
                if not any(is_overlap(proposed, taken) for taken, _sel in placed):
                    placed.append((proposed, int(selection)))
                    break
        placed.sort(key=lambda item: item[0][0])
        return [sel for _interval, sel in placed], [interval for interval, _sel in placed]

    @classmethod
    def _assemble_day(cls, selections, time_intervals, value_sequence_list,
                      time_sequence_list, boundary_value):
        """Concatenate the selected segments into one day-long sequence.

        A ``boundary_value`` sample is placed at second 0 and at second
        86400; each segment's time stamps are shifted by its start time.
        """
        value_parts = [np.array([boundary_value])]
        day_times = [0]
        for selection, (start_time, _end_time) in zip(selections, time_intervals):
            value_parts.append(value_sequence_list[selection])
            day_times.extend(start_time + time for time in time_sequence_list[selection])
        value_parts.append(np.array([boundary_value]))
        day_times.append(cls._SECONDS_PER_DAY)
        # single concatenate instead of growing the array segment by segment
        return np.concatenate(value_parts), day_times

    # methods
    # obtain clean intervals from data
    def init_intervals(self):
        """Find discharge "valleys" in ``self.df`` and store them.

        An interval is a maximal run of samples with SOC below 80. It is kept
        when all of the following hold:

        * it lasts more than 10 minutes and less than 3 hours;
        * its minimum SOC is strictly between 5 and 75;
        * the samples strictly between its end points are not all equal;
        * the gradient over ``[start, end)`` has both a negative and a
          positive value.

        Result: ``self.intervals`` is a list of inclusive ``(start, end)``
        row positions.
        """
        soc = self.df[self._SOC_FIELD].to_numpy()
        times = self.df[self._TIME_FIELD]
        bounding_threshold = 80
        min_duration = datetime.timedelta(minutes=10)
        max_duration = datetime.timedelta(hours=3)

        def duration_ok(start, end):
            span = times.iloc[end] - times.iloc[start]
            return span > min_duration and span < max_duration

        def depth_ok(start, end):
            lowest = np.min(soc[start:end + 1])
            return lowest < 75 and lowest > 5

        def is_flat(start, end):
            # fewer than two interior samples counts as flat
            return bool(np.all(np.diff(soc[start + 1:end]) == 0))

        def is_valley(start, end):
            window = soc[start:end]
            if window.size < 2:  # np.gradient needs at least two samples
                return False
            gradient = np.gradient(window)
            return bool(np.any(gradient < 0) and np.any(gradient > 0))

        self.intervals = [
            (start, end)
            for start, end in self._below_threshold_runs(soc, bounding_threshold)
            if duration_ok(start, end)
            and depth_ok(start, end)
            and not is_flat(start, end)
            and is_valley(start, end)
        ]

    # method to ensure that each interval soc begins and ends at 85
    # this means extending both the soc and time sequences
    def preprocess_soc_intervals(self):
        """Extend every interval so its SOC trace begins and ends at 85.

        Reads ``self.intervals`` and ``self.df``; writes
        ``self.soc_sequence_list`` (arrays) and
        ``self.soc_time_sequence_list`` (lists of int seconds starting at 0).
        Rows ``[start, end)`` of each interval are used.
        """
        soc = self.df[self._SOC_FIELD].to_numpy()
        times = self.df[self._TIME_FIELD]
        soc_sequence_list = []
        soc_time_sequence_list = []
        for start, end in self.intervals:
            segment_times = times.iloc[start:end]
            first_time = segment_times.iloc[0]
            elapsed_seconds = [int((time - first_time).total_seconds()) for time in segment_times]
            soc_sequence, time_sequence = self._extend_segment(soc[start:end], elapsed_seconds)
            soc_sequence_list.append(soc_sequence)
            soc_time_sequence_list.append(time_sequence)
        # save into instance attributes
        self.soc_sequence_list = soc_sequence_list
        self.soc_time_sequence_list = soc_time_sequence_list

    # method to generate temperature sequence for each of the intervals
    def temp_sequence_generation(self, baseline_temp=24.0, max_temp=30.0, temp_rate=4 / 60):
        """Generate one temperature trace per SOC time sequence.

        The whole SOC sequence is treated as the warm-up period; a cool-down
        back towards the baseline follows.

        Args:
            baseline_temp: temperature the ramps start from / fall back to.
            max_temp: upper clip applied to the warm-up.
            temp_rate: temperature change per step (steps are stamped 60 s
                apart).

        Writes ``self.temp_sequence_list`` and
        ``self.temp_time_sequence_list``.
        """
        temp_sequence_list = []
        temp_time_sequence_list = []
        for soc_time_sequence in self.soc_time_sequence_list:
            temp_sequence = self._gen_temp_sequence(
                len(soc_time_sequence), baseline_temp, max_temp, temp_rate
            )
            temp_sequence_list.append(temp_sequence)
            temp_time_sequence_list.append(
                [self._STEP_SECONDS * step for step in range(len(temp_sequence))]
            )
        self.temp_sequence_list = temp_sequence_list
        self.temp_time_sequence_list = temp_time_sequence_list

    # there is a mismatch in number of values between soc and temp
    # we will pad soc sequence to match that of temp
    def process_soc_time(self):
        """Pad each SOC sequence with trailing 85s to the temperature length.

        Raises:
            ValueError: if a SOC sequence is longer than its temperature
                sequence, which padding cannot fix.
        """
        temp_time_sequence_list = self.temp_time_sequence_list
        padded_list = []
        for index, soc_sequence in enumerate(self.soc_sequence_list):
            num_to_generate = len(temp_time_sequence_list[index]) - len(soc_sequence)
            if num_to_generate < 0:
                raise ValueError(
                    "SOC sequence %d has %d more samples than its temperature sequence"
                    % (index, -num_to_generate)
                )
            padding = np.repeat(self._FULL_SOC, num_to_generate)
            padded_list.append(np.concatenate((soc_sequence, padding)))
        self.soc_sequence_list = padded_list

    # generate day values
    def generate_day_values(self, num_discharges, baseline_temp=24):
        """Assemble one day of SOC and temperature from random segments.

        ``num_discharges`` segments are drawn with replacement and placed at
        random, non-overlapping times of day; segments that cannot be placed
        are left out.

        Args:
            num_discharges: number of segments to draw.
            baseline_temp: temperature used for the samples at the start and
                end of the day.

        Returns:
            ``(soc_day_sequence, temp_day_sequence, time_day_sequence)``; the
            first two are arrays, the last is a list of seconds running from
            0 to 86400. SOC uses the temperature time stamps, so
            ``process_soc_time`` must have been run first.
        """
        soc_sequence_list = self.soc_sequence_list
        temp_sequence_list = self.temp_sequence_list
        temp_time_sequence_list = self.temp_time_sequence_list
        selections, time_intervals = self._sample_intervals(
            len(soc_sequence_list), temp_time_sequence_list, num_discharges
        )
        soc_day_sequence, _ = self._assemble_day(
            selections, time_intervals, soc_sequence_list,
            temp_time_sequence_list, self._FULL_SOC
        )
        temp_day_sequence, time_day_sequence = self._assemble_day(
            selections, time_intervals, temp_sequence_list,
            temp_time_sequence_list, baseline_temp
        )
        return soc_day_sequence, temp_day_sequence, time_day_sequence