import pandas as pd
import numpy as np
import math
import datetime
import random


class BatProfile:
    """Build synthetic daily SOC / temperature profiles from logged battery data.

    The methods are meant to be called in this order, each one storing its
    result on the instance for the next step:

    1. ``init_intervals``            -> ``self.intervals``
    2. ``preprocess_soc_intervals``  -> ``self.soc_sequence_list`` and
                                        ``self.soc_time_sequence_list``
    3. ``temp_sequence_generation``  -> ``self.temp_sequence_list`` and
                                        ``self.temp_time_sequence_list``
    4. ``process_soc_time``          -> pads ``self.soc_sequence_list``
    5. ``generate_day_values``       -> one synthetic day
    """

    # column names read from the CSV
    SOC_COLUMN = 'PACK1_CRIDATA_SOC'
    TIME_COLUMN = 'time'

    # SOC value treated as "full"; segments are extended / padded to it
    FULL_SOC = 85
    # resting temperature used between events
    BASELINE_TEMP = 24
    # synthetic samples are spaced one minute apart
    STEP_SECONDS = 60
    SECONDS_PER_DAY = 60 * 60 * 24

    # attributes
    # (class-level defaults are kept for backward compatibility; the methods
    # always rebind these on the instance and never mutate them in place)
    intervals = None
    df = None
    soc_sequence_list = []
    soc_time_sequence_list = []
    temp_sequence_list = []
    temp_time_sequence_list = []

    # initialize dataframe
    def __init__(self, file_path="path/EP_Battery.Thing_HMD8310.csv"):
        """Load the SOC log and keep the samples from 2023-01-01 (UTC) onwards.

        Args:
            file_path: CSV file containing the ``PACK1_CRIDATA_SOC`` and
                ``time`` columns. Defaults to the path that used to be
                hard-coded here.
        """
        fields = [self.SOC_COLUMN, self.TIME_COLUMN]
        df = pd.read_csv(file_path, skipinitialspace=True, usecols=fields)
        df[self.TIME_COLUMN] = pd.to_datetime(df[self.TIME_COLUMN])

        # keep data from the start of 2023 onwards
        # NOTE(review): the comparison against a UTC-localized timestamp
        # presumably relies on the CSV timestamps carrying a timezone - confirm.
        threshold_date = pd.to_datetime('2023-01-01').tz_localize('UTC')
        df = df[df[self.TIME_COLUMN] >= threshold_date].reset_index(drop=True)

        # readings of 86 are folded into 85
        df[self.SOC_COLUMN] = df[self.SOC_COLUMN].replace(86, 85)
        self.df = df

        # per-instance state, so instances never share the class-level lists
        self.intervals = None
        self.soc_sequence_list = []
        self.soc_time_sequence_list = []
        self.temp_sequence_list = []
        self.temp_time_sequence_list = []

    # helpers
    @staticmethod
    def _find_intervals_below_threshold(data, threshold):
        """Return inclusive ``(start, end)`` positions of runs below ``threshold``.

        The boolean mask is padded with ``False`` on both sides so that runs
        touching the first or last sample produce exactly one start and one
        end edge (a wrap-around shift would double-count them).
        """
        below = np.asarray(np.asarray(data) < threshold, dtype=bool)
        padded = np.concatenate(([False], below, [False])).astype(np.int8)
        edges = np.diff(padded)
        starts = np.flatnonzero(edges == 1)
        ends = np.flatnonzero(edges == -1) - 1
        return [(int(start), int(end)) for start, end in zip(starts, ends)]

    @staticmethod
    def _trim_constant_tail(values):
        """Drop the constant tail of ``values``, keeping its first element.

        If the array never changes (or has fewer than two elements) at most
        the first element is returned.
        """
        changes = np.flatnonzero(np.diff(values))
        if changes.size == 0:
            return values[:1]
        return values[:changes[-1] + 2]

    @staticmethod
    def _extend_segment(soc_sequence, time_sequence, full_soc=85,
                        num_points=5, num_extrapolate=100, step_seconds=60):
        """Extend one SOC segment linearly on both sides up to ``full_soc``.

        Args:
            soc_sequence: SOC samples of the segment.
            time_sequence: pandas Series of timestamps, same length.
            full_soc: ceiling applied to the extrapolated values.
            num_points: number of edge samples used for each linear fit.
            num_extrapolate: maximum number of samples added per side.
            step_seconds: spacing assigned to the added samples.

        Returns:
            ``(soc, times)``: a float array and a list of integer seconds
            starting at 0.
        """
        soc = np.asarray(soc_sequence, dtype=float)
        if soc.size == 0:
            return soc, []

        start_time = time_sequence.iloc[0]
        rel_times = [int((stamp - start_time).total_seconds())
                     for stamp in time_sequence]

        n_fit = min(num_points, soc.size)
        if n_fit >= 2:
            # leading side: first sample sits at x = 0, extrapolate x = -1, -2, ...
            head_fit = np.polyfit(np.arange(n_fit), soc[:n_fit], 1)
            head = np.polyval(head_fit, np.arange(-1, -num_extrapolate - 1, -1))
            head = BatProfile._trim_constant_tail(np.clip(head, None, full_soc))

            # trailing side: last sample sits at x = 0, extrapolate x = 1, 2, ...
            tail_fit = np.polyfit(np.arange(-n_fit + 1, 1), soc[-n_fit:], 1)
            tail = np.polyval(tail_fit, np.arange(1, num_extrapolate + 1))
            tail = BatProfile._trim_constant_tail(np.clip(tail, None, full_soc))
        else:
            # a single sample cannot define a slope; leave the segment as is
            head = np.empty(0)
            tail = np.empty(0)

        # head was generated walking backwards in time, so reverse it
        extended_soc = np.concatenate((head[::-1], soc, tail))

        lead = len(head)
        times = [step_seconds * k for k in range(lead)]
        times += [moment + step_seconds * lead for moment in rel_times]
        last = times[-1]
        times += [last + step_seconds * k for k in range(1, len(tail) + 1)]
        return extended_soc, times

    @staticmethod
    def _gen_temp_sequence(soc_time_sequence, baseline_temp=24.0, max_temp=30.0,
                           temp_rate=4 / 60, cooldown_points=99,
                           step_seconds=60):
        """Generate a warm-up / cool-down temperature curve for one event.

        The temperature rises by ``temp_rate`` per sample for the length of
        the SOC event (capped at ``max_temp``), then falls at the same rate
        until it is back at ``baseline_temp``.

        Returns:
            ``(temps, times)``: a float array and the matching list of
            integer seconds (``step_seconds`` apart, starting at 0).
        """
        num_gen = len(soc_time_sequence)
        warmup = baseline_temp + temp_rate * np.arange(1, num_gen)
        warmup = np.clip(warmup, None, max_temp)

        # the cool-down starts from wherever the warm-up ended
        peak = warmup[-1] if warmup.size else baseline_temp
        cooldown = peak - temp_rate * np.arange(1, cooldown_points + 1)
        cooldown = np.clip(cooldown, baseline_temp, None)
        # keep the curve up to the first sample that is back at baseline
        cooldown = BatProfile._trim_constant_tail(cooldown)

        temps = np.concatenate((warmup, cooldown))
        times = [step_seconds * k for k in range(len(temps))]
        return temps, times

    @staticmethod
    def _sample_intervals(time_sequence_list, num_candidates, num_discharges,
                          total_day_time, max_iterations=1000):
        """Pick events at random and place them in the day without overlap.

        Each event gets up to ``max_iterations`` random placement attempts;
        events that cannot be placed (or are longer than the day) are left
        out. The result is ordered by start time.

        Returns:
            ``(selections, time_intervals)``: two lists of equal length,
            event indices and their ``(start, end)`` seconds.
        """
        # sample with repeats from the list of discharge samples
        chosen = np.random.choice(range(num_candidates), num_discharges,
                                  replace=True)

        def is_overlap(first, second):
            a, b = first
            c, d = second
            return not (b <= c or d <= a)

        placed = []  # (interval, selection) pairs, kept together on purpose
        for selection in chosen:
            times = time_sequence_list[selection]
            duration = int(times[-1] - times[0])
            latest_start = total_day_time - duration
            if latest_start < 0:
                continue  # event does not fit into one day
            for _ in range(max_iterations):
                start = random.randint(0, latest_start)
                candidate = (start, start + duration)
                if any(is_overlap(candidate, taken) for taken, _ in placed):
                    continue
                placed.append((candidate, int(selection)))
                break

        placed.sort(key=lambda item: item[0][0])
        selections = [selection for _, selection in placed]
        time_intervals = [interval for interval, _ in placed]
        return selections, time_intervals

    @staticmethod
    def _assemble_day(selections, time_intervals, value_sequence_list,
                      time_sequence_list, fill_value, total_day_time):
        """Stitch the selected events into one day-long value/time series.

        The series starts at t = 0 and ends at ``total_day_time`` with
        ``fill_value``; each event is shifted to its assigned start time.
        """
        parts = [np.array([fill_value])]
        day_times = [0]
        for selection, (start_time, _) in zip(selections, time_intervals):
            parts.append(np.asarray(value_sequence_list[selection]))
            day_times.extend(start_time + moment
                             for moment in time_sequence_list[selection])
        parts.append(np.array([fill_value]))
        day_times.append(total_day_time)
        return np.concatenate(parts), day_times

    # methods
    # obtain clean intervals from data
    def init_intervals(self):
        """Find usable discharge/charge intervals and store them in ``self.intervals``.

        An interval is a run of samples with SOC below 80 that
        lasts more than 10 minutes and less than 3 hours, has its minimum
        SOC strictly between 5 and 75, is not flat, and contains both a
        falling and a rising part. Intervals are inclusive ``(start, end)``
        row positions in ``self.df``.
        """
        df = self.df
        soc = df[self.SOC_COLUMN]
        timestamps = df[self.TIME_COLUMN]

        bounding_threshold = 80
        min_duration = datetime.timedelta(minutes=10)
        max_duration = datetime.timedelta(hours=3)

        def duration_ok(start, end):
            span = timestamps.iloc[end] - timestamps.iloc[start]
            return min_duration < span < max_duration

        def depth_ok(start, end):
            lowest = soc.iloc[start:end + 1].min()
            return 5 < lowest < 75

        def not_flat(start, end):
            interior = soc.iloc[start + 1:end].to_numpy(dtype=float)
            if interior.size < 2:
                return False  # too short to show any slope
            return bool(np.any(np.gradient(interior) != 0))

        def is_valley(start, end):
            window = soc.iloc[start:end].to_numpy(dtype=float)
            if window.size < 2:
                return False
            gradient = np.gradient(window)
            return bool(np.any(gradient < 0) and np.any(gradient > 0))

        candidates = self._find_intervals_below_threshold(soc, bounding_threshold)
        self.intervals = [
            (start, end) for start, end in candidates
            if duration_ok(start, end)
            and depth_ok(start, end)
            and not_flat(start, end)
            and is_valley(start, end)
        ]

    # method to ensure that each interval soc begins and ends at 85
    # this means extending both the soc and time sequences
    def preprocess_soc_intervals(self):
        """Extend every interval so its SOC curve starts and ends at the full level.

        Reads ``self.intervals`` and ``self.df``; writes
        ``self.soc_sequence_list`` (float arrays) and
        ``self.soc_time_sequence_list`` (lists of seconds starting at 0).
        """
        df = self.df
        soc_sequence_list = []
        soc_time_sequence_list = []
        for start, end in self.intervals:
            # NOTE(review): the slice stops before `end`, although intervals
            # are inclusive in init_intervals - confirm whether the last
            # sample is meant to be dropped.
            raw_soc = df[self.SOC_COLUMN].iloc[start:end].reset_index(drop=True)
            raw_time = df[self.TIME_COLUMN].iloc[start:end].reset_index(drop=True)
            soc_sequence, time_sequence = self._extend_segment(
                raw_soc, raw_time,
                full_soc=self.FULL_SOC, step_seconds=self.STEP_SECONDS)
            soc_sequence_list.append(soc_sequence)
            soc_time_sequence_list.append(time_sequence)

        # save into instance
        self.soc_sequence_list = soc_sequence_list
        self.soc_time_sequence_list = soc_time_sequence_list

    # method to generate temperature sequence for each of the intervals
    def temp_sequence_generation(self):
        """Generate one temperature curve per SOC sequence.

        Reads ``self.soc_time_sequence_list``; writes
        ``self.temp_sequence_list`` and ``self.temp_time_sequence_list``.
        """
        temp_sequence_list = []
        temp_time_sequence_list = []
        for soc_time_sequence in self.soc_time_sequence_list:
            temp_sequence, time_sequence = self._gen_temp_sequence(
                soc_time_sequence,
                baseline_temp=float(self.BASELINE_TEMP),
                step_seconds=self.STEP_SECONDS)
            temp_sequence_list.append(temp_sequence)
            temp_time_sequence_list.append(time_sequence)

        self.temp_sequence_list = temp_sequence_list
        self.temp_time_sequence_list = temp_time_sequence_list

    # there is a mismatch in number of values between soc and temp
    # we will pad soc sequence to match that of temp
    def process_soc_time(self):
        """Pad each SOC sequence with the full level to the temperature length.

        Raises:
            ValueError: if a temperature sequence is shorter than its SOC
                sequence (negative padding length).
        """
        temp_time_sequence_list = self.temp_time_sequence_list
        padded_list = []
        for index, soc_sequence in enumerate(self.soc_sequence_list):
            missing = len(temp_time_sequence_list[index]) - len(soc_sequence)
            padding = np.full(missing, self.FULL_SOC)
            padded_list.append(np.concatenate((soc_sequence, padding)))
        self.soc_sequence_list = padded_list

    # generate day values
    def generate_day_values(self, num_discharges):
        """Assemble one synthetic day from randomly chosen events.

        Args:
            num_discharges: number of events to draw (with replacement).
                Events that cannot be placed without overlap are omitted.

        Returns:
            ``(soc_day_sequence, temp_day_sequence, time_day_sequence)``;
            the two value arrays share ``time_day_sequence``, which runs
            from 0 to 86400 seconds.

        Raises:
            ValueError: if no sequences have been generated yet.
        """
        soc_sequence_list = self.soc_sequence_list
        temp_sequence_list = self.temp_sequence_list
        temp_time_sequence_list = self.temp_time_sequence_list

        selections, time_intervals = self._sample_intervals(
            temp_time_sequence_list, len(soc_sequence_list), num_discharges,
            self.SECONDS_PER_DAY)

        soc_day_sequence, _ = self._assemble_day(
            selections, time_intervals, soc_sequence_list,
            temp_time_sequence_list, self.FULL_SOC, self.SECONDS_PER_DAY)
        temp_day_sequence, time_day_sequence = self._assemble_day(
            selections, time_intervals, temp_sequence_list,
            temp_time_sequence_list, self.BASELINE_TEMP, self.SECONDS_PER_DAY)
        return soc_day_sequence, temp_day_sequence, time_day_sequence