Feat: implemented battery profile simulation via BatProfile class

This commit is contained in:
Richard Wong 2024-01-15 10:37:12 +09:00
parent 626ca54c9e
commit 698fcd2874
Signed by: richard
GPG Key ID: 5BD36BA2E9EE33D0
4 changed files with 619 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
python/__pycache__/
python/functions/__pycache__/

View File

@ -1,3 +1,10 @@
### Profile Simulation with BLAST
This project is forked from ![BLAST](https://github.com/NREL/BLAST-Lite) with
an implementation of a custom profile simulator. Refer to
`simulation_with_custom_profile.ipynb` for the usage of the custom profile. The
custom profile is implemented in `battery_profile.py`.
### BLAST-Lite
Battery Lifetime Analysis and Simulation Toolsuite (BLAST) provides a library of battery lifetime and degradation models for various commercial lithium-ion batteries from recent years. Degradation models are indentified from publically available lab-based aging data using NREL's battery life model identification toolkit. The battery life models predicted the expected lifetime of batteries used in mobile or stationary applications as functions of their temperature and use (state-of-charge, depth-of-discharge, and charge/discharge rates). Model implementation is in both Python and MATLAB programming languages. The MATLAB code also provides example applications (stationary storage and EV), climate data, and simple thermal management options. For more information on battery health diagnostics, prediction, and optimization, see [NREL's Battery Lifespan](https://www.nrel.gov/transportation/battery-lifespan.html) webpage.

298
python/battery_profile.py Normal file
View File

@ -0,0 +1,298 @@
import pandas as pd
import numpy as np
import math
import datetime
import random
class BatProfile:
# attributes
intervals = None
df = None
soc_sequence_list = []
soc_time_sequence_list = []
temp_sequence_list = []
temp_time_sequence_list = []
# initialize dataframe
def __init__(self):
# process dataframe
# file_path = "/home/richard/Projects/06_research/battery_degradation_study/battery-anomaly-detection/ISS_data/EP_Battery.Thing_HMD8310.csv"
file_path = "/home/richard/Projects/06_research/battery_degradation_study/BLAST-Lite/data/EP_Battery.Thing_HMD8310.csv"
fields = ['PACK1_CRIDATA_SOC', 'time']
df = pd.read_csv(file_path, skipinitialspace=True, usecols=fields)
df['time'] = pd.to_datetime(df['time'])
# filter only 2023 data
threshold_date = pd.to_datetime('2023-01-01').tz_localize('UTC')
df = df[df['time'] >= threshold_date].reset_index(drop = True)
df[fields[0]] = df[fields[0]].replace(86, 85)
self.df = df
# methods
# obtain clean intervals from data
def init_intervals(self):
# this dataset contains the full data for all 7 packs
df = self.df
fields = ['PACK1_CRIDATA_SOC', 'time']
def find_intervals_below_threshold(data, threshold):
below_threshold = data < threshold
# shift series by one, then take bitwise AND, only start points will be 1
starts = np.where(below_threshold & ~np.roll(below_threshold, 1))[0]
ends = np.where(below_threshold & ~np.roll(below_threshold, -1))[0]
if below_threshold[0]: # case when first value already is in interval
starts = np.insert(starts, 0, 0)
if below_threshold[len(below_threshold)-1]: # case when last value is also in interval
ends = np.append(ends, len(data) - 1)
intervals = list(zip(starts, ends))
return intervals
# intervals time length
def filter_time_length(intervals):
return [(x,y) for x,y in intervals if ((df[fields[1]][y] - df[fields[1]][x]) > datetime.timedelta(minutes=10) and
(df[fields[1]][y] - df[fields[1]][x]) < datetime.timedelta(hours=3))]
# intervals depth
def interval_depth(intervals):
# Find the minimum value within the specified range
return [(x,y) for x,y in intervals if (np.min(df[fields[0]][x:y+1]) < 75) and (np.min(df[fields[0]][x:y+1]) > 5)]
def has_only_horizontal_line(series, start_index, end_index):
interval = series[start_index+1:end_index]
gradient_series = np.gradient(interval)
return all(value == 0 for value in gradient_series)
def filter_only_horizontal(intervals):
return [(x,y) for x,y in intervals if not has_only_horizontal_line(df['PACK1_CRIDATA_SOC'], x, y)]
def is_valley(time_series):
gradient = np.gradient(time_series)
has_negative_value = np.any(gradient < 0)
has_positive_value = np.any(gradient > 0)
return has_negative_value and has_positive_value
def filter_valley(intervals):
return [(x,y) for x,y in intervals if is_valley(df['PACK1_CRIDATA_SOC'][x:y])]
bounding_threshold = 80
intervals = find_intervals_below_threshold(df[fields[0]], bounding_threshold)
intervals = filter_time_length(intervals)
intervals = interval_depth(intervals)
intervals = filter_only_horizontal(intervals)
intervals = filter_valley(intervals)
self.intervals = intervals
# method to ensure that each interval soc begins and ends at 85
# this means extending both the soc and time sequences
def preprocess_soc_intervals(self):
intervals = self.intervals
df = self.df
raw_time_list = [df["time"][start:end].reset_index(drop = True) for start,end in intervals]
raw_soc_list = [df['PACK1_CRIDATA_SOC'][start:end].reset_index(drop = True) for start,end in intervals]
def extend_segment(discharge_soc_list, discharge_time_list, index):
soc_sequence = discharge_soc_list[index]
discharge_time_sequence_datetime = discharge_time_list[index]
start_time = discharge_time_sequence_datetime[0]
discharge_time_sequence = [ int((time - start_time).total_seconds()) for time in discharge_time_sequence_datetime ]
num_points = 5
num_extrapolate = 100
# extend the series in the beginning to 85%
coefficients = np.polyfit(np.arange(num_points), soc_sequence[:num_points], 1)
extended_points = np.polyval(coefficients, np.arange(-1, -num_extrapolate-1, -1))
extended_points = np.clip(extended_points, None, 85) # ensure values reach 85, but not over
# Find the index where differences start repeating
repeating_index = np.where(np.diff(extended_points) != 0)[0][-1] + 2
# Truncate the array to remove repeating values at the end
extended_points = extended_points[:repeating_index]
extended_len_1 = len(extended_points)
extended_soc_sequence = np.concatenate((extended_points[::-1], soc_sequence))
discharge_time_sequence = [ 60 * time for time in range(-extended_len_1, 0)] + discharge_time_sequence
# extend the series after the end to 85%
coefficients = np.polyfit(np.arange(-num_points, 0), soc_sequence[-num_points:], 1)
extended_points = np.polyval(coefficients, np.arange(1, num_extrapolate+1))
extended_points = np.clip(extended_points, None, 85) # ensure values reach 85, but not over
# Find the index where differences start repeating
repeating_index = np.where(np.diff(extended_points) != 0)[0][-1] + 2
# Truncate the array to remove repeating values at the end
extended_points = extended_points[:repeating_index]
extended_len = len(extended_points)
extended_soc_sequence = np.concatenate((extended_soc_sequence, extended_points,))
end_time = discharge_time_sequence[-1]
discharge_time_sequence = discharge_time_sequence + [ end_time + 60 * time for time in range(1,extended_len+1)]
# reset index to start at 0
discharge_time_sequence = [ time + 60 * extended_len_1 for time in discharge_time_sequence ]
# return the modified soc and time series
return extended_soc_sequence, discharge_time_sequence
# process intervals to start and end at 85
soc_time_sequence_list = []
soc_sequence_list = []
for index in range(len(intervals)):
soc_sequence, time_sequence = extend_segment(raw_soc_list, raw_time_list, index)
soc_sequence_list.append(soc_sequence)
soc_time_sequence_list.append(time_sequence)
# save into class variable
self.soc_sequence_list = soc_sequence_list
self.soc_time_sequence_list = soc_time_sequence_list
# method to generate temperature sequence for each of the intervals
def temp_sequence_generation(self):
intervals = self.intervals
soc_time_sequence_list = self.soc_time_sequence_list
def gen_temp_sequence(soc_time_sequence, start_temp):
baseline_temp = 24.0
max_temp = 30.0
temp_rate = 4 / 60
# we will increase the temperature at a rate of 4/60 degrees per minutes
# we will then clip at 30
# we use the whole soc discharge+charge interval as the entire warm-up period
num_gen = len(soc_time_sequence)
x = np.linspace(0, 10, 20)
known_gradient = temp_rate
y = baseline_temp + known_gradient * x
# Use polyfit with deg=1 (linear polynomial) and known gradient as the weight for the first coefficient
coefficients = np.polyfit(x, y, deg=1, w=[known_gradient] * len(x))
warmup_temp = np.polyval(coefficients, np.arange(1, num_gen))
warmup_temp = np.clip(warmup_temp, None, max_temp)
# we then use the last temperature as the start of the cooldown phase
start_temp = warmup_temp[-1]
x = np.linspace(0, 10, 20)
known_gradient = temp_rate # Replace this with your known gradient
y = start_temp - known_gradient * x
coefficients = np.polyfit(x, y, deg=1, w=[known_gradient] * len(x))
cooldown_temp = np.polyval(coefficients, np.arange(1, 100))
cooldown_temp = np.clip(cooldown_temp, baseline_temp, None)
# find where there is no change
# take difference in consecutive elements
# note that numpy returns a tuple of elements, we only need the first
# take the last element of the array
repeating_index = np.where(np.diff(cooldown_temp) != 0)[0][-1]
# Truncate the array to remove repeating values at the end
cooldown_temp = cooldown_temp[:repeating_index]
temp_sequence = np.concatenate((warmup_temp, cooldown_temp))
temp_time_sequence = [ 60 * time for time in range(0,len(temp_sequence))]
return temp_sequence, temp_time_sequence
temp_time_sequence_list = []
temp_sequence_list = []
baseline_temp = 24
for index in range(len(intervals)):
temp_sequence, time_sequence = gen_temp_sequence(soc_time_sequence_list[index], baseline_temp)
temp_sequence_list.append(temp_sequence)
temp_time_sequence_list.append(time_sequence)
self.temp_sequence_list = temp_sequence_list
self.temp_time_sequence_list = temp_time_sequence_list
# there is a mismatch in number of values between soc and temp
# we will pad soc sequence to match that of temp
def process_soc_time(self):
# we will pad the end soc values with 85
# ensure that the number of values matches that of temperature sequence
def extend_soc_time(soc_sequence_list, temp_time_sequence_list, index):
previous_soc_count = len(soc_sequence_list[index])
new_soc_count = len(temp_time_sequence_list[index])
num_to_generate = new_soc_count - previous_soc_count
padding = np.repeat(85, num_to_generate)
extended_soc_sequence = np.concatenate((soc_sequence_list[index], padding))
return extended_soc_sequence
temp_time_sequence_list = self.temp_time_sequence_list
soc_sequence_list = self.soc_sequence_list
soc_sequence_list = [ extend_soc_time(soc_sequence_list, temp_time_sequence_list, i) for i in range(len(soc_sequence_list))]
self.soc_sequence_list = soc_sequence_list
# generate day values
def generate_day_values(self, num_discharges):
soc_sequence_list = self.soc_sequence_list
temp_sequence_list = self.temp_sequence_list
temp_time_sequence_list = self.temp_time_sequence_list
# function to give which intervals to include
# and where in the day to insert these intervals
def sample_intervals(time_sequence_list, num_discharges):
# sample with repeats from the list of discharge samples
selections = np.random.choice(range(len(soc_sequence_list)), num_discharges, replace=True)
# create soc, temp and time lists
time_list = []
for index in selections:
time_list.append(time_sequence_list[index])
total_day_time = 60 * 60 * 24 # in seconds
# function to check for overlap
def is_overlap(range1, range2):
a, b = range1
c, d = range2
return not (b <= c or d <= a)
event_duration_list = [ time_sequence[-1] - time_sequence[0] for time_sequence in time_list]
time_intervals = []
iterations = 0
max_iterations = 1000 # to ensure that it ends even if candidate cannot be found
for event_duration in event_duration_list:
while iterations < max_iterations:
iterations += 1
random_start_time = random.randint(0, total_day_time - event_duration)
proposed_range = (random_start_time, random_start_time + event_duration)
if any(is_overlap(proposed_range, time_interval) for time_interval in time_intervals):
continue
else:
time_intervals.append(proposed_range)
break
sorted_order = sorted(range(len(time_intervals)), key=lambda i: time_intervals[i][0])
selections = [ selections[i] for i in sorted_order ]
time_intervals = [ time_intervals[i] for i in sorted_order ]
return selections, time_intervals
# generate day soc values
def gen_day_soc(selections, time_intervals, soc_sequence_list, time_sequence_list):
# prepare the start of each sequence
soc_day_sequence = np.array([85])
time_day_sequence = [0]
# add each segment of interest
for index in range(len(selections)):
soc_day_sequence = np.concatenate((soc_day_sequence, soc_sequence_list[selections[index]]))
start_time = time_intervals[index][0]
time_day_sequence = time_day_sequence + [ start_time + time for time in time_sequence_list[selections[index]]]
# finishing touch
soc_day_sequence = np.concatenate((soc_day_sequence, np.array([85])))
total_day_time = 60 * 60 * 24
time_day_sequence = time_day_sequence + [total_day_time]
return soc_day_sequence, time_day_sequence
def gen_day_temp(selections, time_intervals, temp_sequence_list, time_sequence_list):
baseline_temp = 24
# prepare the start of each sequence
temp_day_sequence = np.array([baseline_temp])
time_day_sequence = [0]
# add each segment of interest
for index in range(len(selections)):
temp_day_sequence = np.concatenate((temp_day_sequence, temp_sequence_list[selections[index]]))
start_time = time_intervals[index][0]
time_day_sequence = time_day_sequence + [ start_time + time for time in time_sequence_list[selections[index]]]
# finishing touch
temp_day_sequence = np.concatenate((temp_day_sequence, np.array([baseline_temp])))
total_day_time = 60 * 60 * 24
time_day_sequence = time_day_sequence + [total_day_time]
return temp_day_sequence, time_day_sequence
selections, time_intervals = sample_intervals(temp_time_sequence_list, num_discharges)
soc_day_sequence, _ = gen_day_soc(selections, time_intervals, soc_sequence_list, temp_time_sequence_list)
temp_day_sequence, time_day_sequence = gen_day_temp(selections, time_intervals, temp_sequence_list, temp_time_sequence_list)
return soc_day_sequence, temp_day_sequence, time_day_sequence

File diff suppressed because one or more lines are too long