utils
1from datetime import datetime, time, timedelta 2import json 3import random 4import numpy as np 5import pandas as pd 6import ast 7import scipy 8import calendar 9from numpy.random import SeedSequence, default_rng 10from scipy.stats import ( 11 poisson, bernoulli, triang, erlang, weibull_min, exponweib, 12 betabinom, pearson3, cauchy, chi2, expon, exponpow, 13 gamma, lognorm, norm, powerlaw, rayleigh, uniform 14) 15import logging 16 17class Utils: 18 19 RESULTS_FOLDER = 'data' 20 ALL_RESULTS_CSV = f'{RESULTS_FOLDER}/all_results.csv' 21 RUN_RESULTS_CSV = f'{RESULTS_FOLDER}/run_results.csv' 22 HISTORICAL_FOLDER = 'historical_data' 23 DISTRIBUTION_FOLDER = 'distribution_data' 24 25 # External file containing details of resources 26 # hours of operation and servicing schedules 27 HEMS_ROTA_DEFAULT = pd.read_csv('actual_data/HEMS_ROTA_DEFAULT.csv') 28 HEMS_ROTA = pd.read_csv('actual_data/HEMS_ROTA.csv') 29 30 SERVICING_SCHEDULES_BY_MODEL = pd.read_csv('actual_data/service_schedules_by_model.csv') 31 32 TIME_TYPES = ["call start", "mobile", "at scene", "leaving scene", "at hospital", "handover", "clear", "stand down"] 33 34 35 def __init__(self, master_seed=SeedSequence(42), print_debug_messages=False): 36 37 # Record the primary master seed sequence passed in to the sequence 38 self.master_seed_sequence = master_seed 39 40 ############################### 41 # Set up remaining attributes # 42 ############################### 43 self.print_debug_messages = print_debug_messages 44 45 # Load in mean inter_arrival_times 46 self.hourly_arrival_by_qtr_probs_df = pd.read_csv('distribution_data/hourly_arrival_by_qtr_probs.csv') 47 self.hour_by_ampds_df = pd.read_csv('distribution_data/hour_by_ampds_card_probs.csv') 48 self.sex_by_ampds_df = pd.read_csv('distribution_data/sex_by_ampds_card_probs.csv') 49 self.care_cat_by_ampds_df = pd.read_csv('distribution_data/enhanced_or_critical_care_by_ampds_card_probs.csv') 50 # self.callsign_by_care_category_df = pd.read_csv('distribution_data/callsign_group_by_care_category_probs.csv') 51 self.callsign_group_df = pd.read_csv('distribution_data/callsign_group_probs.csv') 52 # New addition without stratification by month 53 self.vehicle_type_df = pd.read_csv('distribution_data/vehicle_type_probs.csv') 54 self.vehicle_type_quarter_df = pd.read_csv('distribution_data/vehicle_type_by_quarter_probs.csv') 55 56 # ========= ARCHIVED CODE ================== ## 57 # self.vehicle_type_by_month_df = pd.read_csv('distribution_data/vehicle_type_by_month_probs.csv') 58 # self.inter_arrival_rate_df = pd.read_csv('distribution_data/inter_arrival_times.csv') 59 # self.callsign_by_ampds_and_hour_df = pd.read_csv('distribution_data/callsign_group_by_ampds_card_and_hour_probs.csv') 60 # self.callsign_by_ampds_df = pd.read_csv('distribution_data/callsign_group_by_ampds_card_probs.csv') 61 # self.hems_result_by_callsign_group_and_vehicle_type_df = pd.read_csv('distribution_data/hems_result_by_callsign_group_and_vehicle_type_probs.csv') 62 # self.hems_result_by_care_category_and_helicopter_benefit_df = pd.read_csv('distribution_data/hems_result_by_care_cat_and_helicopter_benefit_probs.csv') 63 # self.pt_outcome_by_hems_result_and_care_category_df = pd.read_csv('distribution_data/pt_outcome_by_hems_result_and_care_category_probs.csv') 64 # ========= END ARCHIVED CODE ============== # 65 66 # NEW PATIENT OUTCOME AND HEMS RESULT DATA FRAMES 67 self.patient_outcome_by_care_category_and_quarter_probs_df = pd.read_csv('distribution_data/patient_outcome_by_care_category_and_quarter_probs.csv') 68 # self.hems_results_by_patient_outcome_and_quarter_and_vehicle_type_and_callsign_group_probs_df = pd.read_csv('distribution_data/hems_results_by_patient_outcome_and_quarter_and_vehicle_type_and_callsign_group_probs.csv') 69 self.hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs_df = pd.read_csv('distribution_data/hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs.csv') 70 71 72 # Import maximum call duration times 73 self.min_max_values_df = pd.read_csv('actual_data/upper_allowable_time_bounds.csv') 74 75 # Load ad hoc unavailability probability table 76 try: 77 # This file may not exist depending on when utility is called e.g. fitting 78 self.ad_hoc_probs = pd.read_csv("distribution_data/ad_hoc_unavailability.csv") 79 # Remove ad-hoc probs for any times outside of historical 80 except: 81 self.debug('ad_hoc spreadsheet does not exist yet') 82 83 # Read in age distribution data into a dictionary 84 age_data = [] 85 with open("distribution_data/age_distributions.txt", "r") as inFile: 86 age_data = ast.literal_eval(inFile.read()) 87 inFile.close() 88 self.age_distr = age_data 89 90 # Read in activity time distribution data into a dictionary 91 activity_time_data = [] 92 with open("distribution_data/activity_time_distributions.txt", "r") as inFile: 93 activity_time_data = ast.literal_eval(inFile.read()) 94 inFile.close() 95 self.activity_time_distr = activity_time_data 96 97 # Read in incident per day distribution data into a dictionary 98 inc_per_day_data = [] 99 with open("distribution_data/inc_per_day_distributions.txt", "r") as inFile: 100 inc_per_day_data = ast.literal_eval(inFile.read()) 101 inFile.close() 102 self.inc_per_day_distr = inc_per_day_data 103 104 inc_per_day_per_qtr_data = [] 105 with open("distribution_data/inc_per_day_qtr_distributions.txt", "r") as inFile: 106 inc_per_day_per_qtr_data = ast.literal_eval(inFile.read()) 107 inFile.close() 108 self.inc_per_day_per_qtr_distr = inc_per_day_per_qtr_data 109 110 # Incident per day samples 111 with open('distribution_data/inc_per_day_samples.json', 'r') as f: 112 self.incident_per_day_samples = json.load(f) 113 114 # Turn the min-max activity times data into a format that supports easier/faster 115 # lookups 116 self.min_max_cache = { 117 row['time']: (row['min_value_mins'], row['max_value_mins']) 118 for _, row in self.min_max_values_df.iterrows() 119 } 120 121 def setup_seeds(self): 122 ######################################################### 123 # Control generation of required random number streams # 124 ######################################################### 125 126 # Spawn substreams for major simulation modules 127 module_keys = [ 128 "activity_times", 129 "ampds_code_selection", 130 "care_category_selection", 131 "callsign_group_selection", 132 "vehicle_type_selection", 133 "hems_result_by_callsign_group_and_vehicle_type_selection", 134 "hems_result_by_care_category_and_helicopter_benefit_selection", 135 "hems_results_by_patient_outcome_and_quarter_and_vehicle_type_and_callsign_group_selection", 136 "pt_outcome_selection", 137 "sex_selection", 138 "age_sampling", 139 "calls_per_day", 140 "calls_per_hour", 141 "predetermine_call_arrival", 142 "call_iat", 143 "helicopter_benefit_from_reg", 144 "hems_case", 145 "ad_hoc_reason_selection", 146 "know_heli_benefit", 147 "helicopter_benefit_from_cc", 148 "helicopter_benefit_from_ec", 149 "know_cc_ec_benefit", 150 ] 151 # Efficiently spawn substreams 152 spawned = self.master_seed_sequence.spawn(len(module_keys)) 153 154 # Map substreams and RNGs to keys 155 self.seed_substreams = dict(zip(module_keys, spawned)) 156 self.rngs = {key: default_rng(ss) for key, ss in self.seed_substreams.items()} 157 158 self.build_seeded_distributions(self.master_seed_sequence) 159 160 161 def debug(self, message: str): 162 if self.print_debug_messages: 163 logging.debug(message) 164 165 def current_time() -> str: 166 """ 167 Return the current time as a string in the format hh:mm:ss 168 """ 169 now = datetime.now() 170 return now.strftime("%H:%M:%S") 171 172 def date_time_of_call(self, start_dt: str, elapsed_time: int) -> list[int, int, str, int, pd.Timestamp]: 173 """ 174 Calculate a range of time-based parameters given a specific date-time 175 176 **Returns:** 177 list( 178 `dow` : int 179 `current_hour` : int 180 `weekday` : str 181 `current_month` : int 182 `current_quarter` : int 183 `current_dt` : datetime 184 ) 185 186 """ 187 # Elapsed_time = time in minutes since simulation started 188 189 start_dt = pd.to_datetime(start_dt) 190 191 current_dt = start_dt + pd.Timedelta(elapsed_time, unit='min') 192 193 dow = current_dt.strftime('%a') 194 # 0 = Monday, 6 = Sunday 195 weekday = "weekday" if current_dt.dayofweek < 5 else "weekend" 196 197 current_hour = current_dt.hour 198 199 current_month = current_dt.month 200 201 current_quarter = current_dt.quarter 202 203 return [dow, current_hour, weekday, current_month, current_quarter, current_dt] 204 205 # SR 2025-04-17: Commenting out as I believe this is no longer used due to changes 206 # in the way inter-arrival times for calls are managed 207 # def inter_arrival_rate(self, hour: int, quarter: int) -> float: 208 # """ 209 # This function will return the current mean inter_arrival rate in minutes 210 # for the provided hour of day and yearly quarter 211 212 # NOTE: not used with NSPPThinning 213 # """ 214 215 # #print(f"IA with values hour {hour} and quarter {quarter}") 216 # df = self.inter_arrival_rate_df 217 # mean_ia = df[(df['hour'] == hour) & (df['quarter'] == quarter)]['mean_iat'] 218 219 # # Currently have issue in that if hour and quarter not in data e.g. 0200 in quarter 3 220 # # then iloc value broken. Set default to 120 in that case. 221 222 # return 120 if len(mean_ia) == 0 else mean_ia.iloc[0] 223 # #return mean_ia.iloc[0] 224 225 226 def ampds_code_selection(self, hour: int) -> int: 227 """ 228 This function will allocate and return an AMPDS card category 229 based on the hour of day 230 """ 231 232 df = self.hour_by_ampds_df[self.hour_by_ampds_df['hour'] == hour] 233 234 return pd.Series.sample(df['ampds_card'], weights = df['proportion'], 235 random_state=self.rngs["ampds_code_selection"]).iloc[0] 236 237 238 def is_time_in_range(self, current: int, start: int, end: int) -> bool: 239 """ 240 Function to check if a given time is within a range of start and end times on a 24-hour clock. 241 242 Parameters: 243 - current (datetime.time): The time to check. 244 - start (datetime.time): The start time. 245 - end (datetime.time): The end time. 246 247 """ 248 249 current = time(current, 0) 250 start = time(start, 0) 251 end = time(end, 0) 252 253 if start <= end: 254 # Range does not cross midnight 255 return start <= current < end 256 else: 257 # Range crosses midnight 258 return current >= start or current < end 259 260 def care_category_selection(self, ampds_card: str) -> str: 261 """ 262 This function will allocate and return an care category 263 based on AMPDS card 264 """ 265 266 #print(f"Callsign group selection with {hour} and {ampds_card}") 267 268 df = self.care_cat_by_ampds_df[ 269 (self.care_cat_by_ampds_df['ampds_card'] == ampds_card) 270 ] 271 272 return pd.Series.sample(df['care_category'], weights = df['proportion'], 273 random_state=self.rngs["care_category_selection"]).iloc[0] 274 275 # def LEGACY_callsign_group_selection(self, ampds_card: str) -> int: 276 # """ 277 # This function will allocate and return an callsign group 278 # based on AMPDS card 279 # """ 280 281 # #print(f"Callsign group selection with {hour} and {ampds_card}") 282 283 # df = self.callsign_by_ampds_df[ 284 # (self.callsign_by_ampds_df['ampds_card'] == ampds_card) 285 # ] 286 287 # return pd.Series.sample(df['callsign_group'], weights = df['proportion'], 288 # random_state=self.rngs["callsign_group_selection"]).iloc[0] 289 290 # def callsign_group_selection(self, care_category: str) -> int: 291 # """ 292 # This function will allocate and return an callsign group 293 # based on the care category 294 # """ 295 296 # #print(f"Callsign group selection with {hour} and {ampds_card}") 297 298 # df = self.callsign_by_care_category_df[ 299 # (self.callsign_by_care_category_df['care_category'] == care_category) 300 # ] 301 302 # return pd.Series.sample(df['callsign_group'], weights = df['proportion'], 303 # random_state=self.rngs["callsign_group_selection"]).iloc[0] 304 305 def callsign_group_selection(self) -> int: 306 """ 307 This function will allocate and return an callsign group 308 based on the care category 309 """ 310 311 #print(f"Callsign group selection with {hour} and {ampds_card}") 312 313 return pd.Series.sample(self.callsign_group_df['callsign_group'], weights = self.callsign_group_df['proportion'], 314 random_state=self.rngs["callsign_group_selection"]).iloc[0] 315 316 # def vehicle_type_selection(self, callsign_group: str) -> int: 317 # """ 318 # This function will allocate and return a vehicle type 319 # based callsign group 320 # """ 321 322 # df = self.vehicle_type_df[ 323 # (self.vehicle_type_df['callsign_group'] == int(callsign_group)) # Cater for Other 324 # ] 325 326 # return pd.Series.sample(df['vehicle_type'], weights = df['proportion'], 327 # random_state=self.rngs["vehicle_type_selection"]).iloc[0] 328 329 def vehicle_type_selection_qtr(self, callsign_group: str, qtr: int) -> int: 330 """ 331 This function will allocate and return a vehicle type 332 based callsign group 333 """ 334 335 df = self.vehicle_type_quarter_df[ 336 (self.vehicle_type_quarter_df['callsign_group'] == int(callsign_group)) & # Cater for Other 337 (self.vehicle_type_quarter_df['quarter'] == int(qtr)) 338 ] 339 340 return pd.Series.sample(df['vehicle_type'], weights = df['proportion'], 341 random_state=self.rngs["vehicle_type_selection"]).iloc[0] 342 343 344 # def hems_result_by_callsign_group_and_vehicle_type_selection(self, callsign_group: str, vehicle_type: str) -> str: 345 # """ 346 # This function will allocate a HEMS result based on callsign group and vehicle type 347 # """ 348 349 # df = self.hems_result_by_callsign_group_and_vehicle_type_df[ 350 # (self.hems_result_by_callsign_group_and_vehicle_type_df['callsign_group'] == int(callsign_group)) & 351 # (self.hems_result_by_callsign_group_and_vehicle_type_df['vehicle_type'] == vehicle_type) 352 # ] 353 354 # return pd.Series.sample(df['hems_result'], weights = df['proportion'], 355 # random_state=self.rngs["hems_result_by_callsign_group_and_vehicle_type_selection"]).iloc[0] 356 357 # def hems_result_by_care_category_and_helicopter_benefit_selection(self, care_category: str, helicopter_benefit: str) -> str: 358 # """ 359 # This function will allocate a HEMS result based on care category and helicopter benefit 360 # """ 361 362 # df = self.hems_result_by_care_category_and_helicopter_benefit_df[ 363 # (self.hems_result_by_care_category_and_helicopter_benefit_df['care_cat'] == care_category) & 364 # (self.hems_result_by_care_category_and_helicopter_benefit_df['helicopter_benefit'] == helicopter_benefit) 365 # ] 366 367 # return pd.Series.sample(df['hems_result'], weights = df['proportion'], 368 # random_state=self.rngs["hems_result_by_care_category_and_helicopter_benefit_selection"]).iloc[0] 369 370 def hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs(self, pt_outcome: str, quarter: int, vehicle_type: str, callsign_group: int, hour: int): 371 """ 372 This function will allocate a HEMS result based on patient outcome, yearly quarter and HEMS deets. 373 """ 374 375 if (hour >= 7) and (hour <= 18): 376 time_of_day = 'day' 377 else: 378 time_of_day = 'night' 379 380 self.debug(f"Hour is {hour}: for hems_result sampling, this is {time_of_day}") 381 382 #(self.hems_results_by_patient_outcome_and_quarter_and_vehicle_type_and_callsign_group_probs_df.head()) 383 384 df = self.hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs_df[ 385 (self.hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs_df['pt_outcome'] == pt_outcome) & 386 (self.hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs_df['quarter'] == quarter) & 387 (self.hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs_df['vehicle_type'] == vehicle_type) & 388 (self.hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs_df['callsign_group'] == callsign_group) & 389 (self.hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs_df['time_of_day'] == time_of_day) 390 ] 391 392 #print(df.head()) 393 394 return pd.Series.sample(df['hems_result'], weights = df['proportion'], 395 random_state=self.rngs["hems_results_by_patient_outcome_and_quarter_and_vehicle_type_and_callsign_group_selection"]).iloc[0] 396 397 def pt_outcome_selection(self, care_category: str, quarter: int) -> int: 398 """ 399 This function will allocate and return an patient outcome 400 based on the HEMS result 401 """ 402 403 df = self.patient_outcome_by_care_category_and_quarter_probs_df[ 404 (self.patient_outcome_by_care_category_and_quarter_probs_df['care_category'] == care_category) & 405 (self.patient_outcome_by_care_category_and_quarter_probs_df['quarter'] == quarter) 406 ] 407 408 #print(df) 409 return pd.Series.sample(df['pt_outcome'], weights = df['proportion'], 410 random_state=self.rngs["pt_outcome_selection"]).iloc[0] 411 412 # TODO: RANDOM SEED SETTING 413 def sex_selection(self, ampds_card: int) -> str: 414 """ 415 This function will allocate and return the patient sex 416 based on allocated AMPDS card category 417 """ 418 419 prob_female = self.sex_by_ampds_df[self.sex_by_ampds_df['ampds_card'] == ampds_card]['proportion'] 420 421 return 'Female' if (self.rngs["sex_selection"].uniform(0, 1) < prob_female.iloc[0]) else 'Male' 422 423 # TODO: RANDOM SEED SETTING 424 def age_sampling(self, ampds_card: int, max_age: int) -> float: 425 """ 426 This function will return the patient's age based 427 on sampling from the distribution that matches the allocated AMPDS card 428 """ 429 430 distribution = {} 431 432 #print(self.age_distr) 433 434 for i in self.age_distr: 435 #print(i) 436 if i['ampds_card'] == str(ampds_card): 437 #print('Match') 438 distribution = i['best_fit'] 439 440 #print(f"Getting age for {ampds_card}") 441 #print(distribution) 442 443 age = 100000 444 while age > max_age: 445 age = self.sample_from_distribution(distribution, rng=self.rngs["age_sampling"]) 446 447 return age 448 449 def activity_time(self, vehicle_type: str, time_type: str) -> float: 450 """ 451 This function will return a dictionary containing 452 the distribution and parameters for the distribution 453 that match the provided HEMS vehicle type and time type 454 455 """ 456 457 dist = self.activity_time_distr.get((vehicle_type, time_type)) 458 # print(dist) 459 460 if dist is None: 461 raise ValueError(f"No distribution found for ({vehicle_type}, {time_type})") 462 463 try: 464 min_time, max_time = self.min_max_cache[time_type] 465 # print(f"Min time {time_type}: {min_time}") 466 # print(f"Max time {time_type}: {max_time}") 467 except KeyError: 468 raise ValueError(f"Min/max bounds not found for time_type='{time_type}'") 469 470 sampled_time = -1 471 while not (min_time <= sampled_time <= max_time): 472 sampled_time = dist.sample() 473 474 # print(sampled_time) 475 476 return sampled_time 477 478 # TODO: RANDOM SEED SETTING 479 def inc_per_day(self, quarter: int, quarter_or_season: str = 'season') -> float: 480 """ 481 This function will return the number of incidents for a given 482 day 483 484 """ 485 486 season = 'summer' if quarter in [2, 3] else 'winter' 487 488 distribution = {} 489 max_n = 0 490 min_n = 0 491 492 distr_data = self.inc_per_day_distr if quarter_or_season == 'season' else self.inc_per_day_per_qtr_distr 493 494 for i in distr_data: 495 #print(i) 496 if(quarter_or_season == 'season'): 497 if (i['season'] == season): 498 #print('Match') 499 distribution = i['best_fit'] 500 max_n = i['max_n_per_day'] 501 min_n = i['min_n_per_day'] 502 else: 503 if (i['quarter'] == quarter): 504 #print('Match') 505 distribution = i['best_fit'] 506 max_n = i['max_n_per_day'] 507 min_n = i['min_n_per_day'] 508 509 sampled_inc_per_day = -1 510 511 while not (sampled_inc_per_day >= min_n and sampled_inc_per_day <= max_n): 512 sampled_inc_per_day = self.sample_from_distribution(distribution, rng=self.rngs["calls_per_day"]) 513 514 return sampled_inc_per_day 515 516 def inc_per_day_samples(self, quarter: int, quarter_or_season: str = 'season') -> float: 517 """ 518 This function will return a dictionary containing 519 the distribution and parameters for the distribution 520 that match the provided HEMS vehicle type and time type 521 522 """ 523 524 season = 'summer' if quarter in [2, 3] else 'winter' 525 526 if quarter_or_season == 'season': 527 return self.rngs["calls_per_day"].choice(self.incident_per_day_samples[season]) 528 else: 529 return self.rngs["calls_per_day"].choice(self.incident_per_day_samples[f"Q{quarter}"]) 530 531 532 def sample_from_distribution(self, distr: dict, rng: np.random.Generator) -> float: 533 """ 534 Sample a single float value from a seeded scipy distribution. 535 536 Parameters 537 ---------- 538 distr : dict 539 A dictionary with one key (the distribution name) and value as the parameters. 540 rng : np.random.Generator 541 A seeded RNG from the simulation's RNG stream pool. 542 543 Returns 544 ------- 545 float 546 A positive sampled value from the specified distribution. 547 """ 548 if len(distr) != 1: 549 raise ValueError("Expected one distribution name in distr dictionary") 550 551 dist_name, params = list(distr.items())[0] 552 sci_distr = getattr(scipy.stats, dist_name) 553 554 while True: 555 sampled_value = np.floor(sci_distr.rvs(random_state=rng, **params)) 556 if sampled_value > 0: 557 return sampled_value 558 559 def get_nth_weekday(self, year: int, month: int, weekday: int, n: int): 560 561 """ 562 Calculate date of the nth occurrence of a weekday in a given month and year. 563 """ 564 565 first_day = datetime(year, month, 1) 566 first_weekday = first_day.weekday() 567 days_until_weekday = (weekday - first_weekday + 7) % 7 568 first_occurrence = first_day + timedelta(days=days_until_weekday) 569 570 return first_occurrence + timedelta(weeks = n - 1) 571 572 def get_last_weekday(self, year, month, weekday): 573 """ 574 Return the date of the last occurrence of a weekday in a given month and year. 575 """ 576 577 last_day = datetime(year, month, calendar.monthrange(year, month)[1]) 578 last_weekday = last_day.weekday() 579 days_since_weekday = (last_weekday - weekday + 7) % 7 580 581 return last_day - timedelta(days = days_since_weekday) 582 583 def calculate_term_holidays(self, year): 584 585 holidays = [] 586 587 # Jan to Easter 588 start_date = datetime(year, 1, 1) 589 590 first_mon_of_year = self.get_nth_weekday(year, 1, calendar.MONDAY, 1) 591 592 jan_term_start = first_mon_of_year 593 594 if start_date.weekday() in [0, 6]: 595 #print(f"1st jan is {start_date.weekday()}") 596 # 1st Jan is a a weekday 597 jan_term_start += timedelta(days = 1) 598 599 #print(f"Year {year} - start_date: {start_date} and term_start is {jan_term_start}") 600 601 holidays.append({'year': int(year), 'start_date': start_date, 'end_date' : jan_term_start - timedelta(days = 1)}) 602 603 # Spring half-term 604 605 spring_half_term_start = self.get_nth_weekday(year, 2, calendar.MONDAY, 2) 606 spring_half_term_end = spring_half_term_start + timedelta(days = 4) 607 608 holidays.append({'year': int(year), 'start_date': spring_half_term_start, 'end_date' : spring_half_term_end}) 609 610 # Easter hols 611 612 # Calculate Good Friday 613 easter_sunday = self.calculate_easter(year) 614 615 #print(f"Easter Sunday is {easter_sunday}") 616 good_friday = easter_sunday - timedelta(days = 2) 617 618 # If ES is in March, 1st two weeks of Apri 619 # Otherwise, Monday of ES week + 1 week 620 start_date = easter_sunday - timedelta(days = 6) 621 end_date = start_date + timedelta(days = 13) 622 623 if easter_sunday.month == 3 or (easter_sunday.month == 4 and easter_sunday >= self.get_nth_weekday(year, 4, calendar.SUNDAY, 2)): 624 start_date = self.get_nth_weekday(year, 4, calendar.MONDAY, 1) 625 if easter_sunday.month == 4 and easter_sunday >= self.get_nth_weekday(year, 4, calendar.SUNDAY, 2): 626 # Will also likely be a late Easter Monday 627 end_date = start_date + timedelta(days = 14) 628 else: 629 end_date = start_date + timedelta(days = 13) 630 631 holidays.append({ 632 'year': int(year), 633 'start_date': start_date, 634 'end_date' : end_date 635 }) 636 637 # Summer half-term 638 639 summer_half_term_start = self.get_last_weekday(year, 5, calendar.MONDAY) 640 summer_half_term_end = summer_half_term_start + timedelta(days = 6) 641 642 holidays.append({ 643 'year': int(year), 644 'start_date': summer_half_term_start, 645 'end_date' : summer_half_term_end 646 }) 647 648 # Summer Holidays 649 650 summer_start = self.get_last_weekday(year, 7, calendar.MONDAY) 651 summer_end = self.get_nth_weekday(year, 9, calendar.MONDAY, 1) 652 653 holidays.append({ 654 'year': int(year), 655 'start_date': summer_start, 656 'end_date' : summer_end 657 }) 658 659 # Autumn Term 660 661 autumn_half_term_start = summer_end + timedelta(weeks = 8) 662 663 if summer_end.day >= 4: 664 autumn_half_term_start = summer_end + timedelta(weeks = 7) 665 666 autumn_half_term_end = autumn_half_term_start + timedelta(days = 6) 667 668 holidays.append({ 669 'year': int(year), 670 'start_date': autumn_half_term_start, 671 'end_date' : autumn_half_term_end 672 }) 673 674 # Christmas Hols 675 676 start_date = self.get_last_weekday(year, 12, calendar.MONDAY) - timedelta(days = 7) 677 678 holidays.append({ 679 'year': int(year), 680 'start_date': start_date, 681 'end_date' : datetime(year, 12, 31) 682 }) 683 684 return pd.DataFrame(holidays) 685 686 def calculate_easter(self, year): 687 688 """ 689 Calculate the date of Easter Sunday for a given year using the Anonymous Gregorian algorithm. 690 Converted to Python from this SO answer: https://stackoverflow.com/a/49558298/3650230 691 Really interesting rabbit hole to go down about this in the whole thread: https://stackoverflow.com/questions/2192533/function-to-return-date-of-easter-for-the-given-year 692 """ 693 694 a = year % 19 695 b = year // 100 696 c = year % 100 697 d = b // 4 698 e = b % 4 699 f = (b + 8) // 25 700 g = (b - f + 1) // 3 701 h = (19 * a + b - d - g + 15) % 30 702 k = c % 4 703 i = (c - k) // 4 704 l = (32 + 2 * e + 2 * i - h - k) % 7 705 m = (a + 11 * h + 22 * l) // 451 706 month = (h + l - 7 * m + 114) // 31 707 day = ((h + l - 7 * m + 114) % 31) + 1 708 709 return datetime(year, month, day) 710 711 def years_between(self, start_date: datetime, end_date: datetime) -> list[int]: 712 return list(range(start_date.year, end_date.year + 1)) 713 714 def biased_mean(series: pd.Series, bias: float = .5) -> float: 715 """ 716 717 Compute a weighted mean, favoring the larger value since demand 718 likely to only increase with time 719 720 """ 721 722 if len(series) == 1: 723 return series.iloc[0] # Return the only value if there's just one 724 725 sorted_vals = np.sort(series) # Ensure values are sorted 726 weights = np.linspace(1, bias * 2, len(series)) # Increasing weights with larger values 727 return np.average(sorted_vals, weights=weights) 728 729 730 def build_seeded_distributions(self, seed_seq): 731 """ 732 Build a dictionary of seeded distributions keyed by (vehicle_type, time_type) 733 734 Returns 735 ------- 736 dict of (vehicle_type, time_type) -> SeededDistribution 737 738 e.g. {('helicopter', 'time_allocation'): <utils.SeededDistribution object at 0x000001521627C750>, 739 ('car', 'time_allocation'): <utils.SeededDistribution object at 0x000001521627D410>, 740 ('helicopter', 'time_mobile'): <utils.SeededDistribution object at 0x0000015216267E90>} 741 """ 742 n = len(self.activity_time_distr) 743 rngs = [default_rng(s) for s in seed_seq.spawn(n)] 744 745 dist_map = { 746 'poisson': poisson, 747 'bernoulli': bernoulli, 748 'triang': triang, 749 'erlang': erlang, 750 'weibull_min': weibull_min, 751 'expon_weib': exponweib, 752 'betabinom': betabinom, 753 'pearson3': pearson3, 754 'cauchy': cauchy, 755 'chi2': chi2, 756 'expon': expon, 757 'exponential': expon, # alias for consistency 758 'exponpow': exponpow, 759 'gamma': gamma, 760 'lognorm': lognorm, 761 'norm': norm, 762 'normal': norm, # alias for consistency 763 'powerlaw': powerlaw, 764 'rayleigh': rayleigh, 765 'uniform': uniform 766 } 767 768 seeded_distributions = {} 769 770 # print(activity_time_distr) 771 # print(len(self.activity_time_distr)) 772 i = 0 773 for entry, rng in zip(self.activity_time_distr, rngs): 774 i += 1 775 vt = entry['vehicle_type'] 776 tt = entry['time_type'] 777 best_fit = entry['best_fit'] 778 779 # dist_name = best_fit['dist'].lower() 780 dist_name = list(best_fit.keys())[0] 781 dist_cls = dist_map.get(dist_name) 782 self.debug(f"{i}/{len(self.activity_time_distr)} for {vt} {tt} set up {dist_name} {dist_cls} with params: {best_fit[dist_name]}") 783 784 if dist_cls is None: 785 raise ValueError(f"Unsupported distribution type: {dist_name}") 786 787 params = best_fit[dist_name] 788 789 seeded_distributions[(vt, tt)] = SeededDistribution(dist_cls, rng, **params) 790 791 self.activity_time_distr = seeded_distributions 792 793 return seeded_distributions 794 795 def sample_ad_hoc_reason(self, hour: int, quarter: int, registration: str) -> bool: 796 """ 797 Sample from ad hoc unavailability probability table based on time bin and quarter. 798 Returns a reason string (e.g. 'available', 'crew', 'weather', etc.). 799 """ 800 # Determine time bin 801 if 0 <= hour <= 5: 802 bin_label = '00-05' 803 elif 6 <= hour <= 11: 804 bin_label = '06-11' 805 elif 12 <= hour <= 17: 806 bin_label = '12-17' 807 else: 808 bin_label = '18-23' 809 810 # Main subset for registration, bin, and quarter 811 subset = self.ad_hoc_probs[ 812 (self.ad_hoc_probs['registration'] == registration) & 813 (self.ad_hoc_probs['six_hour_bin'] == bin_label) & 814 (self.ad_hoc_probs['quarter'] == quarter) 815 ] 816 817 if subset.empty: 818 self.debug(f"[AD HOC] No data for {registration} in bin {bin_label}, Q{quarter}. Falling back to all registrations.") 819 subset = self.ad_hoc_probs[ 820 (self.ad_hoc_probs['six_hour_bin'] == bin_label) & 821 (self.ad_hoc_probs['quarter'] == quarter) 822 ] 823 if subset.empty: 824 self.debug(f"[AD HOC] No data at all for bin {bin_label}, Q{quarter}. Defaulting to 'available'.") 825 return 'available' 826 827 filled_subset = subset.copy() 828 829 for idx, row in filled_subset.iterrows(): 830 if pd.isna(row['probability']): 831 reason = row['reason'] 832 # Try registration-wide average across all bins in this quarter 833 reg_avg = self.ad_hoc_probs[ 834 (self.ad_hoc_probs['registration'] == row['registration']) & 835 (self.ad_hoc_probs['quarter'] == row['quarter']) & 836 (self.ad_hoc_probs['reason'] == reason) & 837 (self.ad_hoc_probs['probability'].notna()) 838 ]['probability'].mean() 839 840 if pd.isna(reg_avg): 841 # Fall back: average for this bin/quarter across all registrations 842 fallback_avg = self.ad_hoc_probs[ 843 (self.ad_hoc_probs['six_hour_bin'] == row['six_hour_bin']) & 844 (self.ad_hoc_probs['quarter'] == row['quarter']) & 845 (self.ad_hoc_probs['reason'] == reason) & 846 (self.ad_hoc_probs['probability'].notna()) 847 ]['probability'].mean() 848 849 if pd.notna(fallback_avg): 850 self.debug(f"[AD HOC] Missing prob for {registration}, {bin_label}, Q{quarter}, reason '{reason}'. Using fallback avg across registrations: {fallback_avg:.4f}") 851 filled_subset.at[idx, 'probability'] = fallback_avg 852 else: 853 self.debug(f"[AD HOC] Missing prob for {registration}, {bin_label}, Q{quarter}, reason '{reason}'. No fallback avg found. Defaulting to 'available'.") 854 return 'available' 855 else: 856 self.debug(f"[AD HOC] Missing prob for {registration}, {bin_label}, Q{quarter}, reason '{reason}'. Using reg-wide avg: {reg_avg:.4f}") 857 filled_subset.at[idx, 'probability'] = reg_avg 858 859 if filled_subset['probability'].isna().any(): 860 self.debug(f"[AD HOC] Still missing probabilities after imputation for {registration}, {bin_label}, Q{quarter}. Defaulting to 'available'.") 861 return 'available' 862 863 total_prob = filled_subset['probability'].sum() 864 if total_prob == 0 or pd.isna(total_prob): 865 self.debug(f"[AD HOC] Total probability is zero or NaN for {registration}, {bin_label}, Q{quarter}. Defaulting to 'available'.") 866 return 'available' 867 868 norm_probs = filled_subset['probability'] / total_prob 869 870 sampled_reason = self.rngs["ad_hoc_reason_selection"].choice( 871 filled_subset['reason'].tolist(), 872 p=norm_probs.tolist() 873 ) 874 875 return sampled_reason 876 877class SeededDistribution: 878 def __init__(self, dist, rng, **kwargs): 879 self.dist = dist(**kwargs) 880 self.rng = rng 881 882 def sample(self, size=None): 883 return self.dist.rvs(size=size, random_state=self.rng)
18class Utils: 19 20 RESULTS_FOLDER = 'data' 21 ALL_RESULTS_CSV = f'{RESULTS_FOLDER}/all_results.csv' 22 RUN_RESULTS_CSV = f'{RESULTS_FOLDER}/run_results.csv' 23 HISTORICAL_FOLDER = 'historical_data' 24 DISTRIBUTION_FOLDER = 'distribution_data' 25 26 # External file containing details of resources 27 # hours of operation and servicing schedules 28 HEMS_ROTA_DEFAULT = pd.read_csv('actual_data/HEMS_ROTA_DEFAULT.csv') 29 HEMS_ROTA = pd.read_csv('actual_data/HEMS_ROTA.csv') 30 31 SERVICING_SCHEDULES_BY_MODEL = pd.read_csv('actual_data/service_schedules_by_model.csv') 32 33 TIME_TYPES = ["call start", "mobile", "at scene", "leaving scene", "at hospital", "handover", "clear", "stand down"] 34 35 36 def __init__(self, master_seed=SeedSequence(42), print_debug_messages=False): 37 38 # Record the primary master seed sequence passed in to the sequence 39 self.master_seed_sequence = master_seed 40 41 ############################### 42 # Set up remaining attributes # 43 ############################### 44 self.print_debug_messages = print_debug_messages 45 46 # Load in mean inter_arrival_times 47 self.hourly_arrival_by_qtr_probs_df = pd.read_csv('distribution_data/hourly_arrival_by_qtr_probs.csv') 48 self.hour_by_ampds_df = pd.read_csv('distribution_data/hour_by_ampds_card_probs.csv') 49 self.sex_by_ampds_df = pd.read_csv('distribution_data/sex_by_ampds_card_probs.csv') 50 self.care_cat_by_ampds_df = pd.read_csv('distribution_data/enhanced_or_critical_care_by_ampds_card_probs.csv') 51 # self.callsign_by_care_category_df = pd.read_csv('distribution_data/callsign_group_by_care_category_probs.csv') 52 self.callsign_group_df = pd.read_csv('distribution_data/callsign_group_probs.csv') 53 # New addition without stratification by month 54 self.vehicle_type_df = pd.read_csv('distribution_data/vehicle_type_probs.csv') 55 self.vehicle_type_quarter_df = pd.read_csv('distribution_data/vehicle_type_by_quarter_probs.csv') 56 57 # ========= ARCHIVED CODE ================== ## 58 # self.vehicle_type_by_month_df = pd.read_csv('distribution_data/vehicle_type_by_month_probs.csv') 59 # self.inter_arrival_rate_df = pd.read_csv('distribution_data/inter_arrival_times.csv') 60 # self.callsign_by_ampds_and_hour_df = pd.read_csv('distribution_data/callsign_group_by_ampds_card_and_hour_probs.csv') 61 # self.callsign_by_ampds_df = pd.read_csv('distribution_data/callsign_group_by_ampds_card_probs.csv') 62 # self.hems_result_by_callsign_group_and_vehicle_type_df = pd.read_csv('distribution_data/hems_result_by_callsign_group_and_vehicle_type_probs.csv') 63 # self.hems_result_by_care_category_and_helicopter_benefit_df = pd.read_csv('distribution_data/hems_result_by_care_cat_and_helicopter_benefit_probs.csv') 64 # self.pt_outcome_by_hems_result_and_care_category_df = pd.read_csv('distribution_data/pt_outcome_by_hems_result_and_care_category_probs.csv') 65 # ========= END ARCHIVED CODE ============== # 66 67 # NEW PATIENT OUTCOME AND HEMS RESULT DATA FRAMES 68 self.patient_outcome_by_care_category_and_quarter_probs_df = pd.read_csv('distribution_data/patient_outcome_by_care_category_and_quarter_probs.csv') 69 # self.hems_results_by_patient_outcome_and_quarter_and_vehicle_type_and_callsign_group_probs_df = pd.read_csv('distribution_data/hems_results_by_patient_outcome_and_quarter_and_vehicle_type_and_callsign_group_probs.csv') 70 self.hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs_df = pd.read_csv('distribution_data/hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs.csv') 71 72 73 # Import maximum call duration times 74 self.min_max_values_df = pd.read_csv('actual_data/upper_allowable_time_bounds.csv') 75 76 # Load ad hoc unavailability probability table 77 try: 78 # This file may not exist depending on when utility is called e.g. fitting 79 self.ad_hoc_probs = pd.read_csv("distribution_data/ad_hoc_unavailability.csv") 80 # Remove ad-hoc probs for any times outside of historical 81 except: 82 self.debug('ad_hoc spreadsheet does not exist yet') 83 84 # Read in age distribution data into a dictionary 85 age_data = [] 86 with open("distribution_data/age_distributions.txt", "r") as inFile: 87 age_data = ast.literal_eval(inFile.read()) 88 inFile.close() 89 self.age_distr = age_data 90 91 # Read in activity time distribution data into a dictionary 92 activity_time_data = [] 93 with open("distribution_data/activity_time_distributions.txt", "r") as inFile: 94 activity_time_data = ast.literal_eval(inFile.read()) 95 inFile.close() 96 self.activity_time_distr = activity_time_data 97 98 # Read in incident per day distribution data into a dictionary 99 inc_per_day_data = [] 100 with open("distribution_data/inc_per_day_distributions.txt", "r") as inFile: 101 inc_per_day_data = ast.literal_eval(inFile.read()) 102 inFile.close() 103 self.inc_per_day_distr = inc_per_day_data 104 105 inc_per_day_per_qtr_data = [] 106 with open("distribution_data/inc_per_day_qtr_distributions.txt", "r") as inFile: 107 inc_per_day_per_qtr_data = ast.literal_eval(inFile.read()) 108 inFile.close() 109 self.inc_per_day_per_qtr_distr = inc_per_day_per_qtr_data 110 111 # Incident per day samples 112 with open('distribution_data/inc_per_day_samples.json', 'r') as f: 113 self.incident_per_day_samples = json.load(f) 114 115 # Turn the min-max activity times data into a format that supports easier/faster 116 # lookups 117 self.min_max_cache = { 118 row['time']: (row['min_value_mins'], row['max_value_mins']) 119 for _, row in self.min_max_values_df.iterrows() 120 } 121 122 def setup_seeds(self): 123 ######################################################### 124 # Control generation of required random number streams # 125 ######################################################### 126 127 # Spawn substreams for major simulation modules 128 module_keys = [ 129 "activity_times", 130 "ampds_code_selection", 131 "care_category_selection", 132 "callsign_group_selection", 133 "vehicle_type_selection", 134 "hems_result_by_callsign_group_and_vehicle_type_selection", 135 "hems_result_by_care_category_and_helicopter_benefit_selection", 136 "hems_results_by_patient_outcome_and_quarter_and_vehicle_type_and_callsign_group_selection", 137 "pt_outcome_selection", 138 "sex_selection", 139 "age_sampling", 140 "calls_per_day", 141 "calls_per_hour", 142 "predetermine_call_arrival", 143 "call_iat", 144 "helicopter_benefit_from_reg", 145 "hems_case", 146 "ad_hoc_reason_selection", 147 "know_heli_benefit", 148 "helicopter_benefit_from_cc", 149 "helicopter_benefit_from_ec", 150 "know_cc_ec_benefit", 151 ] 152 # Efficiently spawn substreams 153 spawned = self.master_seed_sequence.spawn(len(module_keys)) 154 155 # Map substreams and RNGs to keys 156 self.seed_substreams = dict(zip(module_keys, spawned)) 157 self.rngs = {key: default_rng(ss) for key, ss in self.seed_substreams.items()} 158 159 self.build_seeded_distributions(self.master_seed_sequence) 160 161 162 def debug(self, message: str): 163 if self.print_debug_messages: 164 logging.debug(message) 165 166 def current_time() -> str: 167 """ 168 Return the current time as a string in the format hh:mm:ss 169 """ 170 now = datetime.now() 171 return now.strftime("%H:%M:%S") 172 173 def date_time_of_call(self, start_dt: str, elapsed_time: int) -> list[int, int, str, int, pd.Timestamp]: 174 """ 175 Calculate a range of time-based parameters given a specific date-time 176 177 **Returns:** 178 list( 179 `dow` : int 180 `current_hour` : int 181 `weekday` : str 182 `current_month` : int 183 `current_quarter` : int 184 `current_dt` : datetime 185 ) 186 187 """ 188 # Elapsed_time = time in minutes since simulation started 189 190 start_dt = pd.to_datetime(start_dt) 191 192 current_dt = start_dt + pd.Timedelta(elapsed_time, unit='min') 193 194 dow = current_dt.strftime('%a') 195 # 0 = Monday, 6 = Sunday 196 weekday = "weekday" if current_dt.dayofweek < 5 else "weekend" 197 198 current_hour = current_dt.hour 199 200 current_month = current_dt.month 201 202 current_quarter = current_dt.quarter 203 204 return [dow, current_hour, weekday, current_month, current_quarter, current_dt] 205 206 # SR 2025-04-17: Commenting out as I believe this is no longer used due to changes 207 # in the way inter-arrival times for calls are managed 208 # def inter_arrival_rate(self, hour: int, quarter: int) -> float: 209 # """ 210 # This function will return the current mean inter_arrival rate in minutes 211 # for the provided hour of day and yearly quarter 212 213 # NOTE: not used with NSPPThinning 214 # """ 215 216 # #print(f"IA with values hour {hour} and quarter {quarter}") 217 # df = self.inter_arrival_rate_df 218 # mean_ia = df[(df['hour'] == hour) & (df['quarter'] == quarter)]['mean_iat'] 219 220 # # Currently have issue in that if hour and quarter not in data e.g. 0200 in quarter 3 221 # # then iloc value broken. Set default to 120 in that case. 222 223 # return 120 if len(mean_ia) == 0 else mean_ia.iloc[0] 224 # #return mean_ia.iloc[0] 225 226 227 def ampds_code_selection(self, hour: int) -> int: 228 """ 229 This function will allocate and return an AMPDS card category 230 based on the hour of day 231 """ 232 233 df = self.hour_by_ampds_df[self.hour_by_ampds_df['hour'] == hour] 234 235 return pd.Series.sample(df['ampds_card'], weights = df['proportion'], 236 random_state=self.rngs["ampds_code_selection"]).iloc[0] 237 238 239 def is_time_in_range(self, current: int, start: int, end: int) -> bool: 240 """ 241 Function to check if a given time is within a range of start and end times on a 24-hour clock. 242 243 Parameters: 244 - current (datetime.time): The time to check. 245 - start (datetime.time): The start time. 246 - end (datetime.time): The end time. 247 248 """ 249 250 current = time(current, 0) 251 start = time(start, 0) 252 end = time(end, 0) 253 254 if start <= end: 255 # Range does not cross midnight 256 return start <= current < end 257 else: 258 # Range crosses midnight 259 return current >= start or current < end 260 261 def care_category_selection(self, ampds_card: str) -> str: 262 """ 263 This function will allocate and return an care category 264 based on AMPDS card 265 """ 266 267 #print(f"Callsign group selection with {hour} and {ampds_card}") 268 269 df = self.care_cat_by_ampds_df[ 270 (self.care_cat_by_ampds_df['ampds_card'] == ampds_card) 271 ] 272 273 return pd.Series.sample(df['care_category'], weights = df['proportion'], 274 random_state=self.rngs["care_category_selection"]).iloc[0] 275 276 # def LEGACY_callsign_group_selection(self, ampds_card: str) -> int: 277 # """ 278 # This function will allocate and return an callsign group 279 # based on AMPDS card 280 # """ 281 282 # #print(f"Callsign group selection with {hour} and {ampds_card}") 283 284 # df = self.callsign_by_ampds_df[ 285 # (self.callsign_by_ampds_df['ampds_card'] == ampds_card) 286 # ] 287 288 # return pd.Series.sample(df['callsign_group'], weights = df['proportion'], 289 # random_state=self.rngs["callsign_group_selection"]).iloc[0] 290 291 # def callsign_group_selection(self, care_category: str) -> int: 292 # """ 293 # This function will allocate and return an callsign group 294 # based on the care category 295 # """ 296 297 # #print(f"Callsign group selection with {hour} and {ampds_card}") 298 299 # df = self.callsign_by_care_category_df[ 300 # (self.callsign_by_care_category_df['care_category'] == care_category) 301 # ] 302 303 # return pd.Series.sample(df['callsign_group'], weights = df['proportion'], 304 # random_state=self.rngs["callsign_group_selection"]).iloc[0] 305 306 def callsign_group_selection(self) -> int: 307 """ 308 This function will allocate and return an callsign group 309 based on the care category 310 """ 311 312 #print(f"Callsign group selection with {hour} and {ampds_card}") 313 314 return pd.Series.sample(self.callsign_group_df['callsign_group'], weights = self.callsign_group_df['proportion'], 315 random_state=self.rngs["callsign_group_selection"]).iloc[0] 316 317 # def vehicle_type_selection(self, callsign_group: str) -> int: 318 # """ 319 # This function will allocate and return a vehicle type 320 # based callsign group 321 # """ 322 323 # df = self.vehicle_type_df[ 324 # (self.vehicle_type_df['callsign_group'] == int(callsign_group)) # Cater for Other 325 # ] 326 327 # return pd.Series.sample(df['vehicle_type'], weights = df['proportion'], 328 # random_state=self.rngs["vehicle_type_selection"]).iloc[0] 329 330 def vehicle_type_selection_qtr(self, callsign_group: str, qtr: int) -> int: 331 """ 332 This function will allocate and return a vehicle type 333 based callsign group 334 """ 335 336 df = self.vehicle_type_quarter_df[ 337 (self.vehicle_type_quarter_df['callsign_group'] == int(callsign_group)) & # Cater for Other 338 (self.vehicle_type_quarter_df['quarter'] == int(qtr)) 339 ] 340 341 return pd.Series.sample(df['vehicle_type'], weights = df['proportion'], 342 random_state=self.rngs["vehicle_type_selection"]).iloc[0] 343 344 345 # def hems_result_by_callsign_group_and_vehicle_type_selection(self, callsign_group: str, vehicle_type: str) -> str: 346 # """ 347 # This function will allocate a HEMS result based on callsign group and vehicle type 348 # """ 349 350 # df = self.hems_result_by_callsign_group_and_vehicle_type_df[ 351 # (self.hems_result_by_callsign_group_and_vehicle_type_df['callsign_group'] == int(callsign_group)) & 352 # (self.hems_result_by_callsign_group_and_vehicle_type_df['vehicle_type'] == vehicle_type) 353 # ] 354 355 # return pd.Series.sample(df['hems_result'], weights = df['proportion'], 356 # random_state=self.rngs["hems_result_by_callsign_group_and_vehicle_type_selection"]).iloc[0] 357 358 # def hems_result_by_care_category_and_helicopter_benefit_selection(self, care_category: str, helicopter_benefit: str) -> str: 359 # """ 360 # This function will allocate a HEMS result based on care category and helicopter benefit 361 # """ 362 363 # df = self.hems_result_by_care_category_and_helicopter_benefit_df[ 364 # (self.hems_result_by_care_category_and_helicopter_benefit_df['care_cat'] == care_category) & 365 # (self.hems_result_by_care_category_and_helicopter_benefit_df['helicopter_benefit'] == helicopter_benefit) 366 # ] 367 368 # return pd.Series.sample(df['hems_result'], weights = df['proportion'], 369 # random_state=self.rngs["hems_result_by_care_category_and_helicopter_benefit_selection"]).iloc[0] 370 371 def hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs(self, pt_outcome: str, quarter: int, vehicle_type: str, callsign_group: int, hour: int): 372 """ 373 This function will allocate a HEMS result based on patient outcome, yearly quarter and HEMS deets. 374 """ 375 376 if (hour >= 7) and (hour <= 18): 377 time_of_day = 'day' 378 else: 379 time_of_day = 'night' 380 381 self.debug(f"Hour is {hour}: for hems_result sampling, this is {time_of_day}") 382 383 #(self.hems_results_by_patient_outcome_and_quarter_and_vehicle_type_and_callsign_group_probs_df.head()) 384 385 df = self.hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs_df[ 386 (self.hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs_df['pt_outcome'] == pt_outcome) & 387 (self.hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs_df['quarter'] == quarter) & 388 (self.hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs_df['vehicle_type'] == vehicle_type) & 389 (self.hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs_df['callsign_group'] == callsign_group) & 390 (self.hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs_df['time_of_day'] == time_of_day) 391 ] 392 393 #print(df.head()) 394 395 return pd.Series.sample(df['hems_result'], weights = df['proportion'], 396 random_state=self.rngs["hems_results_by_patient_outcome_and_quarter_and_vehicle_type_and_callsign_group_selection"]).iloc[0] 397 398 def pt_outcome_selection(self, care_category: str, quarter: int) -> int: 399 """ 400 This function will allocate and return an patient outcome 401 based on the HEMS result 402 """ 403 404 df = self.patient_outcome_by_care_category_and_quarter_probs_df[ 405 (self.patient_outcome_by_care_category_and_quarter_probs_df['care_category'] == care_category) & 406 (self.patient_outcome_by_care_category_and_quarter_probs_df['quarter'] == quarter) 407 ] 408 409 #print(df) 410 return pd.Series.sample(df['pt_outcome'], weights = df['proportion'], 411 random_state=self.rngs["pt_outcome_selection"]).iloc[0] 412 413 # TODO: RANDOM SEED SETTING 414 def sex_selection(self, ampds_card: int) -> str: 415 """ 416 This function will allocate and return the patient sex 417 based on allocated AMPDS card category 418 """ 419 420 prob_female = self.sex_by_ampds_df[self.sex_by_ampds_df['ampds_card'] == ampds_card]['proportion'] 421 422 return 'Female' if (self.rngs["sex_selection"].uniform(0, 1) < prob_female.iloc[0]) else 'Male' 423 424 # TODO: RANDOM SEED SETTING 425 def age_sampling(self, ampds_card: int, max_age: int) -> float: 426 """ 427 This function will return the patient's age based 428 on sampling from the distribution that matches the allocated AMPDS card 429 """ 430 431 distribution = {} 432 433 #print(self.age_distr) 434 435 for i in self.age_distr: 436 #print(i) 437 if i['ampds_card'] == str(ampds_card): 438 #print('Match') 439 distribution = i['best_fit'] 440 441 #print(f"Getting age for {ampds_card}") 442 #print(distribution) 443 444 age = 100000 445 while age > max_age: 446 age = self.sample_from_distribution(distribution, rng=self.rngs["age_sampling"]) 447 448 return age 449 450 def activity_time(self, vehicle_type: str, time_type: str) -> float: 451 """ 452 This function will return a dictionary containing 453 the distribution and parameters for the distribution 454 that match the provided HEMS vehicle type and time type 455 456 """ 457 458 dist = self.activity_time_distr.get((vehicle_type, time_type)) 459 # print(dist) 460 461 if dist is None: 462 raise ValueError(f"No distribution found for ({vehicle_type}, {time_type})") 463 464 try: 465 min_time, max_time = self.min_max_cache[time_type] 466 # print(f"Min time {time_type}: {min_time}") 467 # print(f"Max time {time_type}: {max_time}") 468 except KeyError: 469 raise ValueError(f"Min/max bounds not found for time_type='{time_type}'") 470 471 sampled_time = -1 472 while not (min_time <= sampled_time <= max_time): 473 sampled_time = dist.sample() 474 475 # print(sampled_time) 476 477 return sampled_time 478 479 # TODO: RANDOM SEED SETTING 480 def inc_per_day(self, quarter: int, quarter_or_season: str = 'season') -> float: 481 """ 482 This function will return the number of incidents for a given 483 day 484 485 """ 486 487 season = 'summer' if quarter in [2, 3] else 'winter' 488 489 distribution = {} 490 max_n = 0 491 min_n = 0 492 493 distr_data = self.inc_per_day_distr if quarter_or_season == 'season' else self.inc_per_day_per_qtr_distr 494 495 for i in distr_data: 496 #print(i) 497 if(quarter_or_season == 'season'): 498 if (i['season'] == season): 499 #print('Match') 500 distribution = i['best_fit'] 501 max_n = i['max_n_per_day'] 502 min_n = i['min_n_per_day'] 503 else: 504 if (i['quarter'] == quarter): 505 #print('Match') 506 distribution = i['best_fit'] 507 max_n = i['max_n_per_day'] 508 min_n = i['min_n_per_day'] 509 510 sampled_inc_per_day = -1 511 512 while not (sampled_inc_per_day >= min_n and sampled_inc_per_day <= max_n): 513 sampled_inc_per_day = self.sample_from_distribution(distribution, rng=self.rngs["calls_per_day"]) 514 515 return sampled_inc_per_day 516 517 def inc_per_day_samples(self, quarter: int, quarter_or_season: str = 'season') -> float: 518 """ 519 This function will return a dictionary containing 520 the distribution and parameters for the distribution 521 that match the provided HEMS vehicle type and time type 522 523 """ 524 525 season = 'summer' if quarter in [2, 3] else 'winter' 526 527 if quarter_or_season == 'season': 528 return self.rngs["calls_per_day"].choice(self.incident_per_day_samples[season]) 529 else: 530 return self.rngs["calls_per_day"].choice(self.incident_per_day_samples[f"Q{quarter}"]) 531 532 533 def sample_from_distribution(self, distr: dict, rng: np.random.Generator) -> float: 534 """ 535 Sample a single float value from a seeded scipy distribution. 536 537 Parameters 538 ---------- 539 distr : dict 540 A dictionary with one key (the distribution name) and value as the parameters. 541 rng : np.random.Generator 542 A seeded RNG from the simulation's RNG stream pool. 543 544 Returns 545 ------- 546 float 547 A positive sampled value from the specified distribution. 548 """ 549 if len(distr) != 1: 550 raise ValueError("Expected one distribution name in distr dictionary") 551 552 dist_name, params = list(distr.items())[0] 553 sci_distr = getattr(scipy.stats, dist_name) 554 555 while True: 556 sampled_value = np.floor(sci_distr.rvs(random_state=rng, **params)) 557 if sampled_value > 0: 558 return sampled_value 559 560 def get_nth_weekday(self, year: int, month: int, weekday: int, n: int): 561 562 """ 563 Calculate date of the nth occurrence of a weekday in a given month and year. 564 """ 565 566 first_day = datetime(year, month, 1) 567 first_weekday = first_day.weekday() 568 days_until_weekday = (weekday - first_weekday + 7) % 7 569 first_occurrence = first_day + timedelta(days=days_until_weekday) 570 571 return first_occurrence + timedelta(weeks = n - 1) 572 573 def get_last_weekday(self, year, month, weekday): 574 """ 575 Return the date of the last occurrence of a weekday in a given month and year. 576 """ 577 578 last_day = datetime(year, month, calendar.monthrange(year, month)[1]) 579 last_weekday = last_day.weekday() 580 days_since_weekday = (last_weekday - weekday + 7) % 7 581 582 return last_day - timedelta(days = days_since_weekday) 583 584 def calculate_term_holidays(self, year): 585 586 holidays = [] 587 588 # Jan to Easter 589 start_date = datetime(year, 1, 1) 590 591 first_mon_of_year = self.get_nth_weekday(year, 1, calendar.MONDAY, 1) 592 593 jan_term_start = first_mon_of_year 594 595 if start_date.weekday() in [0, 6]: 596 #print(f"1st jan is {start_date.weekday()}") 597 # 1st Jan is a a weekday 598 jan_term_start += timedelta(days = 1) 599 600 #print(f"Year {year} - start_date: {start_date} and term_start is {jan_term_start}") 601 602 holidays.append({'year': int(year), 'start_date': start_date, 'end_date' : jan_term_start - timedelta(days = 1)}) 603 604 # Spring half-term 605 606 spring_half_term_start = self.get_nth_weekday(year, 2, calendar.MONDAY, 2) 607 spring_half_term_end = spring_half_term_start + timedelta(days = 4) 608 609 holidays.append({'year': int(year), 'start_date': spring_half_term_start, 'end_date' : spring_half_term_end}) 610 611 # Easter hols 612 613 # Calculate Good Friday 614 easter_sunday = self.calculate_easter(year) 615 616 #print(f"Easter Sunday is {easter_sunday}") 617 good_friday = easter_sunday - timedelta(days = 2) 618 619 # If ES is in March, 1st two weeks of Apri 620 # Otherwise, Monday of ES week + 1 week 621 start_date = easter_sunday - timedelta(days = 6) 622 end_date = start_date + timedelta(days = 13) 623 624 if easter_sunday.month == 3 or (easter_sunday.month == 4 and easter_sunday >= self.get_nth_weekday(year, 4, calendar.SUNDAY, 2)): 625 start_date = self.get_nth_weekday(year, 4, calendar.MONDAY, 1) 626 if easter_sunday.month == 4 and easter_sunday >= self.get_nth_weekday(year, 4, calendar.SUNDAY, 2): 627 # Will also likely be a late Easter Monday 628 end_date = start_date + timedelta(days = 14) 629 else: 630 end_date = start_date + timedelta(days = 13) 631 632 holidays.append({ 633 'year': int(year), 634 'start_date': start_date, 635 'end_date' : end_date 636 }) 637 638 # Summer half-term 639 640 summer_half_term_start = self.get_last_weekday(year, 5, calendar.MONDAY) 641 summer_half_term_end = summer_half_term_start + timedelta(days = 6) 642 643 holidays.append({ 644 'year': int(year), 645 'start_date': summer_half_term_start, 646 'end_date' : summer_half_term_end 647 }) 648 649 # Summer Holidays 650 651 summer_start = self.get_last_weekday(year, 7, calendar.MONDAY) 652 summer_end = self.get_nth_weekday(year, 9, calendar.MONDAY, 1) 653 654 holidays.append({ 655 'year': int(year), 656 'start_date': summer_start, 657 'end_date' : summer_end 658 }) 659 660 # Autumn Term 661 662 autumn_half_term_start = summer_end + timedelta(weeks = 8) 663 664 if summer_end.day >= 4: 665 autumn_half_term_start = summer_end + timedelta(weeks = 7) 666 667 autumn_half_term_end = autumn_half_term_start + timedelta(days = 6) 668 669 holidays.append({ 670 'year': int(year), 671 'start_date': autumn_half_term_start, 672 'end_date' : autumn_half_term_end 673 }) 674 675 # Christmas Hols 676 677 start_date = self.get_last_weekday(year, 12, calendar.MONDAY) - timedelta(days = 7) 678 679 holidays.append({ 680 'year': int(year), 681 'start_date': start_date, 682 'end_date' : datetime(year, 12, 31) 683 }) 684 685 return pd.DataFrame(holidays) 686 687 def calculate_easter(self, year): 688 689 """ 690 Calculate the date of Easter Sunday for a given year using the Anonymous Gregorian algorithm. 691 Converted to Python from this SO answer: https://stackoverflow.com/a/49558298/3650230 692 Really interesting rabbit hole to go down about this in the whole thread: https://stackoverflow.com/questions/2192533/function-to-return-date-of-easter-for-the-given-year 693 """ 694 695 a = year % 19 696 b = year // 100 697 c = year % 100 698 d = b // 4 699 e = b % 4 700 f = (b + 8) // 25 701 g = (b - f + 1) // 3 702 h = (19 * a + b - d - g + 15) % 30 703 k = c % 4 704 i = (c - k) // 4 705 l = (32 + 2 * e + 2 * i - h - k) % 7 706 m = (a + 11 * h + 22 * l) // 451 707 month = (h + l - 7 * m + 114) // 31 708 day = ((h + l - 7 * m + 114) % 31) + 1 709 710 return datetime(year, month, day) 711 712 def years_between(self, start_date: datetime, end_date: datetime) -> list[int]: 713 return list(range(start_date.year, end_date.year + 1)) 714 715 def biased_mean(series: pd.Series, bias: float = .5) -> float: 716 """ 717 718 Compute a weighted mean, favoring the larger value since demand 719 likely to only increase with time 720 721 """ 722 723 if len(series) == 1: 724 return series.iloc[0] # Return the only value if there's just one 725 726 sorted_vals = np.sort(series) # Ensure values are sorted 727 weights = np.linspace(1, bias * 2, len(series)) # Increasing weights with larger values 728 return np.average(sorted_vals, weights=weights) 729 730 731 def build_seeded_distributions(self, seed_seq): 732 """ 733 Build a dictionary of seeded distributions keyed by (vehicle_type, time_type) 734 735 Returns 736 ------- 737 dict of (vehicle_type, time_type) -> SeededDistribution 738 739 e.g. {('helicopter', 'time_allocation'): <utils.SeededDistribution object at 0x000001521627C750>, 740 ('car', 'time_allocation'): <utils.SeededDistribution object at 0x000001521627D410>, 741 ('helicopter', 'time_mobile'): <utils.SeededDistribution object at 0x0000015216267E90>} 742 """ 743 n = len(self.activity_time_distr) 744 rngs = [default_rng(s) for s in seed_seq.spawn(n)] 745 746 dist_map = { 747 'poisson': poisson, 748 'bernoulli': bernoulli, 749 'triang': triang, 750 'erlang': erlang, 751 'weibull_min': weibull_min, 752 'expon_weib': exponweib, 753 'betabinom': betabinom, 754 'pearson3': pearson3, 755 'cauchy': cauchy, 756 'chi2': chi2, 757 'expon': expon, 758 'exponential': expon, # alias for consistency 759 'exponpow': exponpow, 760 'gamma': gamma, 761 'lognorm': lognorm, 762 'norm': norm, 763 'normal': norm, # alias for consistency 764 'powerlaw': powerlaw, 765 'rayleigh': rayleigh, 766 'uniform': uniform 767 } 768 769 seeded_distributions = {} 770 771 # print(activity_time_distr) 772 # print(len(self.activity_time_distr)) 773 i = 0 774 for entry, rng in zip(self.activity_time_distr, rngs): 775 i += 1 776 vt = entry['vehicle_type'] 777 tt = entry['time_type'] 778 best_fit = entry['best_fit'] 779 780 # dist_name = best_fit['dist'].lower() 781 dist_name = list(best_fit.keys())[0] 782 dist_cls = dist_map.get(dist_name) 783 self.debug(f"{i}/{len(self.activity_time_distr)} for {vt} {tt} set up {dist_name} {dist_cls} with params: {best_fit[dist_name]}") 784 785 if dist_cls is None: 786 raise ValueError(f"Unsupported distribution type: {dist_name}") 787 788 params = best_fit[dist_name] 789 790 seeded_distributions[(vt, tt)] = SeededDistribution(dist_cls, rng, **params) 791 792 self.activity_time_distr = seeded_distributions 793 794 return seeded_distributions 795 796 def sample_ad_hoc_reason(self, hour: int, quarter: int, registration: str) -> bool: 797 """ 798 Sample from ad hoc unavailability probability table based on time bin and quarter. 799 Returns a reason string (e.g. 'available', 'crew', 'weather', etc.). 800 """ 801 # Determine time bin 802 if 0 <= hour <= 5: 803 bin_label = '00-05' 804 elif 6 <= hour <= 11: 805 bin_label = '06-11' 806 elif 12 <= hour <= 17: 807 bin_label = '12-17' 808 else: 809 bin_label = '18-23' 810 811 # Main subset for registration, bin, and quarter 812 subset = self.ad_hoc_probs[ 813 (self.ad_hoc_probs['registration'] == registration) & 814 (self.ad_hoc_probs['six_hour_bin'] == bin_label) & 815 (self.ad_hoc_probs['quarter'] == quarter) 816 ] 817 818 if subset.empty: 819 self.debug(f"[AD HOC] No data for {registration} in bin {bin_label}, Q{quarter}. Falling back to all registrations.") 820 subset = self.ad_hoc_probs[ 821 (self.ad_hoc_probs['six_hour_bin'] == bin_label) & 822 (self.ad_hoc_probs['quarter'] == quarter) 823 ] 824 if subset.empty: 825 self.debug(f"[AD HOC] No data at all for bin {bin_label}, Q{quarter}. Defaulting to 'available'.") 826 return 'available' 827 828 filled_subset = subset.copy() 829 830 for idx, row in filled_subset.iterrows(): 831 if pd.isna(row['probability']): 832 reason = row['reason'] 833 # Try registration-wide average across all bins in this quarter 834 reg_avg = self.ad_hoc_probs[ 835 (self.ad_hoc_probs['registration'] == row['registration']) & 836 (self.ad_hoc_probs['quarter'] == row['quarter']) & 837 (self.ad_hoc_probs['reason'] == reason) & 838 (self.ad_hoc_probs['probability'].notna()) 839 ]['probability'].mean() 840 841 if pd.isna(reg_avg): 842 # Fall back: average for this bin/quarter across all registrations 843 fallback_avg = self.ad_hoc_probs[ 844 (self.ad_hoc_probs['six_hour_bin'] == row['six_hour_bin']) & 845 (self.ad_hoc_probs['quarter'] == row['quarter']) & 846 (self.ad_hoc_probs['reason'] == reason) & 847 (self.ad_hoc_probs['probability'].notna()) 848 ]['probability'].mean() 849 850 if pd.notna(fallback_avg): 851 self.debug(f"[AD HOC] Missing prob for {registration}, {bin_label}, Q{quarter}, reason '{reason}'. Using fallback avg across registrations: {fallback_avg:.4f}") 852 filled_subset.at[idx, 'probability'] = fallback_avg 853 else: 854 self.debug(f"[AD HOC] Missing prob for {registration}, {bin_label}, Q{quarter}, reason '{reason}'. No fallback avg found. Defaulting to 'available'.") 855 return 'available' 856 else: 857 self.debug(f"[AD HOC] Missing prob for {registration}, {bin_label}, Q{quarter}, reason '{reason}'. Using reg-wide avg: {reg_avg:.4f}") 858 filled_subset.at[idx, 'probability'] = reg_avg 859 860 if filled_subset['probability'].isna().any(): 861 self.debug(f"[AD HOC] Still missing probabilities after imputation for {registration}, {bin_label}, Q{quarter}. Defaulting to 'available'.") 862 return 'available' 863 864 total_prob = filled_subset['probability'].sum() 865 if total_prob == 0 or pd.isna(total_prob): 866 self.debug(f"[AD HOC] Total probability is zero or NaN for {registration}, {bin_label}, Q{quarter}. Defaulting to 'available'.") 867 return 'available' 868 869 norm_probs = filled_subset['probability'] / total_prob 870 871 sampled_reason = self.rngs["ad_hoc_reason_selection"].choice( 872 filled_subset['reason'].tolist(), 873 p=norm_probs.tolist() 874 ) 875 876 return sampled_reason
36 def __init__(self, master_seed=SeedSequence(42), print_debug_messages=False): 37 38 # Record the primary master seed sequence passed in to the sequence 39 self.master_seed_sequence = master_seed 40 41 ############################### 42 # Set up remaining attributes # 43 ############################### 44 self.print_debug_messages = print_debug_messages 45 46 # Load in mean inter_arrival_times 47 self.hourly_arrival_by_qtr_probs_df = pd.read_csv('distribution_data/hourly_arrival_by_qtr_probs.csv') 48 self.hour_by_ampds_df = pd.read_csv('distribution_data/hour_by_ampds_card_probs.csv') 49 self.sex_by_ampds_df = pd.read_csv('distribution_data/sex_by_ampds_card_probs.csv') 50 self.care_cat_by_ampds_df = pd.read_csv('distribution_data/enhanced_or_critical_care_by_ampds_card_probs.csv') 51 # self.callsign_by_care_category_df = pd.read_csv('distribution_data/callsign_group_by_care_category_probs.csv') 52 self.callsign_group_df = pd.read_csv('distribution_data/callsign_group_probs.csv') 53 # New addition without stratification by month 54 self.vehicle_type_df = pd.read_csv('distribution_data/vehicle_type_probs.csv') 55 self.vehicle_type_quarter_df = pd.read_csv('distribution_data/vehicle_type_by_quarter_probs.csv') 56 57 # ========= ARCHIVED CODE ================== ## 58 # self.vehicle_type_by_month_df = pd.read_csv('distribution_data/vehicle_type_by_month_probs.csv') 59 # self.inter_arrival_rate_df = pd.read_csv('distribution_data/inter_arrival_times.csv') 60 # self.callsign_by_ampds_and_hour_df = pd.read_csv('distribution_data/callsign_group_by_ampds_card_and_hour_probs.csv') 61 # self.callsign_by_ampds_df = pd.read_csv('distribution_data/callsign_group_by_ampds_card_probs.csv') 62 # self.hems_result_by_callsign_group_and_vehicle_type_df = pd.read_csv('distribution_data/hems_result_by_callsign_group_and_vehicle_type_probs.csv') 63 # self.hems_result_by_care_category_and_helicopter_benefit_df = pd.read_csv('distribution_data/hems_result_by_care_cat_and_helicopter_benefit_probs.csv') 64 # self.pt_outcome_by_hems_result_and_care_category_df = pd.read_csv('distribution_data/pt_outcome_by_hems_result_and_care_category_probs.csv') 65 # ========= END ARCHIVED CODE ============== # 66 67 # NEW PATIENT OUTCOME AND HEMS RESULT DATA FRAMES 68 self.patient_outcome_by_care_category_and_quarter_probs_df = pd.read_csv('distribution_data/patient_outcome_by_care_category_and_quarter_probs.csv') 69 # self.hems_results_by_patient_outcome_and_quarter_and_vehicle_type_and_callsign_group_probs_df = pd.read_csv('distribution_data/hems_results_by_patient_outcome_and_quarter_and_vehicle_type_and_callsign_group_probs.csv') 70 self.hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs_df = pd.read_csv('distribution_data/hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs.csv') 71 72 73 # Import maximum call duration times 74 self.min_max_values_df = pd.read_csv('actual_data/upper_allowable_time_bounds.csv') 75 76 # Load ad hoc unavailability probability table 77 try: 78 # This file may not exist depending on when utility is called e.g. fitting 79 self.ad_hoc_probs = pd.read_csv("distribution_data/ad_hoc_unavailability.csv") 80 # Remove ad-hoc probs for any times outside of historical 81 except: 82 self.debug('ad_hoc spreadsheet does not exist yet') 83 84 # Read in age distribution data into a dictionary 85 age_data = [] 86 with open("distribution_data/age_distributions.txt", "r") as inFile: 87 age_data = ast.literal_eval(inFile.read()) 88 inFile.close() 89 self.age_distr = age_data 90 91 # Read in activity time distribution data into a dictionary 92 activity_time_data = [] 93 with open("distribution_data/activity_time_distributions.txt", "r") as inFile: 94 activity_time_data = ast.literal_eval(inFile.read()) 95 inFile.close() 96 self.activity_time_distr = activity_time_data 97 98 # Read in incident per day distribution data into a dictionary 99 inc_per_day_data = [] 100 with open("distribution_data/inc_per_day_distributions.txt", "r") as inFile: 101 inc_per_day_data = ast.literal_eval(inFile.read()) 102 inFile.close() 103 self.inc_per_day_distr = inc_per_day_data 104 105 inc_per_day_per_qtr_data = [] 106 with open("distribution_data/inc_per_day_qtr_distributions.txt", "r") as inFile: 107 inc_per_day_per_qtr_data = ast.literal_eval(inFile.read()) 108 inFile.close() 109 self.inc_per_day_per_qtr_distr = inc_per_day_per_qtr_data 110 111 # Incident per day samples 112 with open('distribution_data/inc_per_day_samples.json', 'r') as f: 113 self.incident_per_day_samples = json.load(f) 114 115 # Turn the min-max activity times data into a format that supports easier/faster 116 # lookups 117 self.min_max_cache = { 118 row['time']: (row['min_value_mins'], row['max_value_mins']) 119 for _, row in self.min_max_values_df.iterrows() 120 }
122 def setup_seeds(self): 123 ######################################################### 124 # Control generation of required random number streams # 125 ######################################################### 126 127 # Spawn substreams for major simulation modules 128 module_keys = [ 129 "activity_times", 130 "ampds_code_selection", 131 "care_category_selection", 132 "callsign_group_selection", 133 "vehicle_type_selection", 134 "hems_result_by_callsign_group_and_vehicle_type_selection", 135 "hems_result_by_care_category_and_helicopter_benefit_selection", 136 "hems_results_by_patient_outcome_and_quarter_and_vehicle_type_and_callsign_group_selection", 137 "pt_outcome_selection", 138 "sex_selection", 139 "age_sampling", 140 "calls_per_day", 141 "calls_per_hour", 142 "predetermine_call_arrival", 143 "call_iat", 144 "helicopter_benefit_from_reg", 145 "hems_case", 146 "ad_hoc_reason_selection", 147 "know_heli_benefit", 148 "helicopter_benefit_from_cc", 149 "helicopter_benefit_from_ec", 150 "know_cc_ec_benefit", 151 ] 152 # Efficiently spawn substreams 153 spawned = self.master_seed_sequence.spawn(len(module_keys)) 154 155 # Map substreams and RNGs to keys 156 self.seed_substreams = dict(zip(module_keys, spawned)) 157 self.rngs = {key: default_rng(ss) for key, ss in self.seed_substreams.items()} 158 159 self.build_seeded_distributions(self.master_seed_sequence)
166 def current_time() -> str: 167 """ 168 Return the current time as a string in the format hh:mm:ss 169 """ 170 now = datetime.now() 171 return now.strftime("%H:%M:%S")
Return the current time as a string in the format hh:mm:ss
173 def date_time_of_call(self, start_dt: str, elapsed_time: int) -> list[int, int, str, int, pd.Timestamp]: 174 """ 175 Calculate a range of time-based parameters given a specific date-time 176 177 **Returns:** 178 list( 179 `dow` : int 180 `current_hour` : int 181 `weekday` : str 182 `current_month` : int 183 `current_quarter` : int 184 `current_dt` : datetime 185 ) 186 187 """ 188 # Elapsed_time = time in minutes since simulation started 189 190 start_dt = pd.to_datetime(start_dt) 191 192 current_dt = start_dt + pd.Timedelta(elapsed_time, unit='min') 193 194 dow = current_dt.strftime('%a') 195 # 0 = Monday, 6 = Sunday 196 weekday = "weekday" if current_dt.dayofweek < 5 else "weekend" 197 198 current_hour = current_dt.hour 199 200 current_month = current_dt.month 201 202 current_quarter = current_dt.quarter 203 204 return [dow, current_hour, weekday, current_month, current_quarter, current_dt]
Calculate a range of time-based parameters given a specific date-time
Returns:
list(
dow
: int
current_hour
: int
weekday
: str
current_month
: int
current_quarter
: int
current_dt
: datetime
)
227 def ampds_code_selection(self, hour: int) -> int: 228 """ 229 This function will allocate and return an AMPDS card category 230 based on the hour of day 231 """ 232 233 df = self.hour_by_ampds_df[self.hour_by_ampds_df['hour'] == hour] 234 235 return pd.Series.sample(df['ampds_card'], weights = df['proportion'], 236 random_state=self.rngs["ampds_code_selection"]).iloc[0]
This function will allocate and return an AMPDS card category based on the hour of day
239 def is_time_in_range(self, current: int, start: int, end: int) -> bool: 240 """ 241 Function to check if a given time is within a range of start and end times on a 24-hour clock. 242 243 Parameters: 244 - current (datetime.time): The time to check. 245 - start (datetime.time): The start time. 246 - end (datetime.time): The end time. 247 248 """ 249 250 current = time(current, 0) 251 start = time(start, 0) 252 end = time(end, 0) 253 254 if start <= end: 255 # Range does not cross midnight 256 return start <= current < end 257 else: 258 # Range crosses midnight 259 return current >= start or current < end
Function to check if a given time is within a range of start and end times on a 24-hour clock.
Parameters:
- current (datetime.time): The time to check.
- start (datetime.time): The start time.
- end (datetime.time): The end time.
261 def care_category_selection(self, ampds_card: str) -> str: 262 """ 263 This function will allocate and return an care category 264 based on AMPDS card 265 """ 266 267 #print(f"Callsign group selection with {hour} and {ampds_card}") 268 269 df = self.care_cat_by_ampds_df[ 270 (self.care_cat_by_ampds_df['ampds_card'] == ampds_card) 271 ] 272 273 return pd.Series.sample(df['care_category'], weights = df['proportion'], 274 random_state=self.rngs["care_category_selection"]).iloc[0]
This function will allocate and return an care category based on AMPDS card
306 def callsign_group_selection(self) -> int: 307 """ 308 This function will allocate and return an callsign group 309 based on the care category 310 """ 311 312 #print(f"Callsign group selection with {hour} and {ampds_card}") 313 314 return pd.Series.sample(self.callsign_group_df['callsign_group'], weights = self.callsign_group_df['proportion'], 315 random_state=self.rngs["callsign_group_selection"]).iloc[0]
This function will allocate and return an callsign group based on the care category
330 def vehicle_type_selection_qtr(self, callsign_group: str, qtr: int) -> int: 331 """ 332 This function will allocate and return a vehicle type 333 based callsign group 334 """ 335 336 df = self.vehicle_type_quarter_df[ 337 (self.vehicle_type_quarter_df['callsign_group'] == int(callsign_group)) & # Cater for Other 338 (self.vehicle_type_quarter_df['quarter'] == int(qtr)) 339 ] 340 341 return pd.Series.sample(df['vehicle_type'], weights = df['proportion'], 342 random_state=self.rngs["vehicle_type_selection"]).iloc[0]
This function will allocate and return a vehicle type based callsign group
371 def hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs(self, pt_outcome: str, quarter: int, vehicle_type: str, callsign_group: int, hour: int): 372 """ 373 This function will allocate a HEMS result based on patient outcome, yearly quarter and HEMS deets. 374 """ 375 376 if (hour >= 7) and (hour <= 18): 377 time_of_day = 'day' 378 else: 379 time_of_day = 'night' 380 381 self.debug(f"Hour is {hour}: for hems_result sampling, this is {time_of_day}") 382 383 #(self.hems_results_by_patient_outcome_and_quarter_and_vehicle_type_and_callsign_group_probs_df.head()) 384 385 df = self.hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs_df[ 386 (self.hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs_df['pt_outcome'] == pt_outcome) & 387 (self.hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs_df['quarter'] == quarter) & 388 (self.hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs_df['vehicle_type'] == vehicle_type) & 389 (self.hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs_df['callsign_group'] == callsign_group) & 390 (self.hems_results_by_patient_outcome_and_time_of_day_and_quarter_and_vehicle_type_and_callsign_group_probs_df['time_of_day'] == time_of_day) 391 ] 392 393 #print(df.head()) 394 395 return pd.Series.sample(df['hems_result'], weights = df['proportion'], 396 random_state=self.rngs["hems_results_by_patient_outcome_and_quarter_and_vehicle_type_and_callsign_group_selection"]).iloc[0]
This function will allocate a HEMS result based on patient outcome, yearly quarter and HEMS deets.
398 def pt_outcome_selection(self, care_category: str, quarter: int) -> int: 399 """ 400 This function will allocate and return an patient outcome 401 based on the HEMS result 402 """ 403 404 df = self.patient_outcome_by_care_category_and_quarter_probs_df[ 405 (self.patient_outcome_by_care_category_and_quarter_probs_df['care_category'] == care_category) & 406 (self.patient_outcome_by_care_category_and_quarter_probs_df['quarter'] == quarter) 407 ] 408 409 #print(df) 410 return pd.Series.sample(df['pt_outcome'], weights = df['proportion'], 411 random_state=self.rngs["pt_outcome_selection"]).iloc[0]
This function will allocate and return an patient outcome based on the HEMS result
414 def sex_selection(self, ampds_card: int) -> str: 415 """ 416 This function will allocate and return the patient sex 417 based on allocated AMPDS card category 418 """ 419 420 prob_female = self.sex_by_ampds_df[self.sex_by_ampds_df['ampds_card'] == ampds_card]['proportion'] 421 422 return 'Female' if (self.rngs["sex_selection"].uniform(0, 1) < prob_female.iloc[0]) else 'Male'
This function will allocate and return the patient sex based on allocated AMPDS card category
425 def age_sampling(self, ampds_card: int, max_age: int) -> float: 426 """ 427 This function will return the patient's age based 428 on sampling from the distribution that matches the allocated AMPDS card 429 """ 430 431 distribution = {} 432 433 #print(self.age_distr) 434 435 for i in self.age_distr: 436 #print(i) 437 if i['ampds_card'] == str(ampds_card): 438 #print('Match') 439 distribution = i['best_fit'] 440 441 #print(f"Getting age for {ampds_card}") 442 #print(distribution) 443 444 age = 100000 445 while age > max_age: 446 age = self.sample_from_distribution(distribution, rng=self.rngs["age_sampling"]) 447 448 return age
This function will return the patient's age based on sampling from the distribution that matches the allocated AMPDS card
450 def activity_time(self, vehicle_type: str, time_type: str) -> float: 451 """ 452 This function will return a dictionary containing 453 the distribution and parameters for the distribution 454 that match the provided HEMS vehicle type and time type 455 456 """ 457 458 dist = self.activity_time_distr.get((vehicle_type, time_type)) 459 # print(dist) 460 461 if dist is None: 462 raise ValueError(f"No distribution found for ({vehicle_type}, {time_type})") 463 464 try: 465 min_time, max_time = self.min_max_cache[time_type] 466 # print(f"Min time {time_type}: {min_time}") 467 # print(f"Max time {time_type}: {max_time}") 468 except KeyError: 469 raise ValueError(f"Min/max bounds not found for time_type='{time_type}'") 470 471 sampled_time = -1 472 while not (min_time <= sampled_time <= max_time): 473 sampled_time = dist.sample() 474 475 # print(sampled_time) 476 477 return sampled_time
This function will return a dictionary containing the distribution and parameters for the distribution that match the provided HEMS vehicle type and time type
480 def inc_per_day(self, quarter: int, quarter_or_season: str = 'season') -> float: 481 """ 482 This function will return the number of incidents for a given 483 day 484 485 """ 486 487 season = 'summer' if quarter in [2, 3] else 'winter' 488 489 distribution = {} 490 max_n = 0 491 min_n = 0 492 493 distr_data = self.inc_per_day_distr if quarter_or_season == 'season' else self.inc_per_day_per_qtr_distr 494 495 for i in distr_data: 496 #print(i) 497 if(quarter_or_season == 'season'): 498 if (i['season'] == season): 499 #print('Match') 500 distribution = i['best_fit'] 501 max_n = i['max_n_per_day'] 502 min_n = i['min_n_per_day'] 503 else: 504 if (i['quarter'] == quarter): 505 #print('Match') 506 distribution = i['best_fit'] 507 max_n = i['max_n_per_day'] 508 min_n = i['min_n_per_day'] 509 510 sampled_inc_per_day = -1 511 512 while not (sampled_inc_per_day >= min_n and sampled_inc_per_day <= max_n): 513 sampled_inc_per_day = self.sample_from_distribution(distribution, rng=self.rngs["calls_per_day"]) 514 515 return sampled_inc_per_day
This function will return the number of incidents for a given day
517 def inc_per_day_samples(self, quarter: int, quarter_or_season: str = 'season') -> float: 518 """ 519 This function will return a dictionary containing 520 the distribution and parameters for the distribution 521 that match the provided HEMS vehicle type and time type 522 523 """ 524 525 season = 'summer' if quarter in [2, 3] else 'winter' 526 527 if quarter_or_season == 'season': 528 return self.rngs["calls_per_day"].choice(self.incident_per_day_samples[season]) 529 else: 530 return self.rngs["calls_per_day"].choice(self.incident_per_day_samples[f"Q{quarter}"])
This function will return a dictionary containing the distribution and parameters for the distribution that match the provided HEMS vehicle type and time type
533 def sample_from_distribution(self, distr: dict, rng: np.random.Generator) -> float: 534 """ 535 Sample a single float value from a seeded scipy distribution. 536 537 Parameters 538 ---------- 539 distr : dict 540 A dictionary with one key (the distribution name) and value as the parameters. 541 rng : np.random.Generator 542 A seeded RNG from the simulation's RNG stream pool. 543 544 Returns 545 ------- 546 float 547 A positive sampled value from the specified distribution. 548 """ 549 if len(distr) != 1: 550 raise ValueError("Expected one distribution name in distr dictionary") 551 552 dist_name, params = list(distr.items())[0] 553 sci_distr = getattr(scipy.stats, dist_name) 554 555 while True: 556 sampled_value = np.floor(sci_distr.rvs(random_state=rng, **params)) 557 if sampled_value > 0: 558 return sampled_value
Sample a single float value from a seeded scipy distribution.
Parameters
distr : dict A dictionary with one key (the distribution name) and value as the parameters. rng : np.random.Generator A seeded RNG from the simulation's RNG stream pool.
Returns
float A positive sampled value from the specified distribution.
560 def get_nth_weekday(self, year: int, month: int, weekday: int, n: int): 561 562 """ 563 Calculate date of the nth occurrence of a weekday in a given month and year. 564 """ 565 566 first_day = datetime(year, month, 1) 567 first_weekday = first_day.weekday() 568 days_until_weekday = (weekday - first_weekday + 7) % 7 569 first_occurrence = first_day + timedelta(days=days_until_weekday) 570 571 return first_occurrence + timedelta(weeks = n - 1)
Calculate date of the nth occurrence of a weekday in a given month and year.
573 def get_last_weekday(self, year, month, weekday): 574 """ 575 Return the date of the last occurrence of a weekday in a given month and year. 576 """ 577 578 last_day = datetime(year, month, calendar.monthrange(year, month)[1]) 579 last_weekday = last_day.weekday() 580 days_since_weekday = (last_weekday - weekday + 7) % 7 581 582 return last_day - timedelta(days = days_since_weekday)
Return the date of the last occurrence of a weekday in a given month and year.
584 def calculate_term_holidays(self, year): 585 586 holidays = [] 587 588 # Jan to Easter 589 start_date = datetime(year, 1, 1) 590 591 first_mon_of_year = self.get_nth_weekday(year, 1, calendar.MONDAY, 1) 592 593 jan_term_start = first_mon_of_year 594 595 if start_date.weekday() in [0, 6]: 596 #print(f"1st jan is {start_date.weekday()}") 597 # 1st Jan is a a weekday 598 jan_term_start += timedelta(days = 1) 599 600 #print(f"Year {year} - start_date: {start_date} and term_start is {jan_term_start}") 601 602 holidays.append({'year': int(year), 'start_date': start_date, 'end_date' : jan_term_start - timedelta(days = 1)}) 603 604 # Spring half-term 605 606 spring_half_term_start = self.get_nth_weekday(year, 2, calendar.MONDAY, 2) 607 spring_half_term_end = spring_half_term_start + timedelta(days = 4) 608 609 holidays.append({'year': int(year), 'start_date': spring_half_term_start, 'end_date' : spring_half_term_end}) 610 611 # Easter hols 612 613 # Calculate Good Friday 614 easter_sunday = self.calculate_easter(year) 615 616 #print(f"Easter Sunday is {easter_sunday}") 617 good_friday = easter_sunday - timedelta(days = 2) 618 619 # If ES is in March, 1st two weeks of Apri 620 # Otherwise, Monday of ES week + 1 week 621 start_date = easter_sunday - timedelta(days = 6) 622 end_date = start_date + timedelta(days = 13) 623 624 if easter_sunday.month == 3 or (easter_sunday.month == 4 and easter_sunday >= self.get_nth_weekday(year, 4, calendar.SUNDAY, 2)): 625 start_date = self.get_nth_weekday(year, 4, calendar.MONDAY, 1) 626 if easter_sunday.month == 4 and easter_sunday >= self.get_nth_weekday(year, 4, calendar.SUNDAY, 2): 627 # Will also likely be a late Easter Monday 628 end_date = start_date + timedelta(days = 14) 629 else: 630 end_date = start_date + timedelta(days = 13) 631 632 holidays.append({ 633 'year': int(year), 634 'start_date': start_date, 635 'end_date' : end_date 636 }) 637 638 # Summer half-term 639 640 summer_half_term_start = self.get_last_weekday(year, 5, calendar.MONDAY) 641 summer_half_term_end = summer_half_term_start + timedelta(days = 6) 642 643 holidays.append({ 644 'year': int(year), 645 'start_date': summer_half_term_start, 646 'end_date' : summer_half_term_end 647 }) 648 649 # Summer Holidays 650 651 summer_start = self.get_last_weekday(year, 7, calendar.MONDAY) 652 summer_end = self.get_nth_weekday(year, 9, calendar.MONDAY, 1) 653 654 holidays.append({ 655 'year': int(year), 656 'start_date': summer_start, 657 'end_date' : summer_end 658 }) 659 660 # Autumn Term 661 662 autumn_half_term_start = summer_end + timedelta(weeks = 8) 663 664 if summer_end.day >= 4: 665 autumn_half_term_start = summer_end + timedelta(weeks = 7) 666 667 autumn_half_term_end = autumn_half_term_start + timedelta(days = 6) 668 669 holidays.append({ 670 'year': int(year), 671 'start_date': autumn_half_term_start, 672 'end_date' : autumn_half_term_end 673 }) 674 675 # Christmas Hols 676 677 start_date = self.get_last_weekday(year, 12, calendar.MONDAY) - timedelta(days = 7) 678 679 holidays.append({ 680 'year': int(year), 681 'start_date': start_date, 682 'end_date' : datetime(year, 12, 31) 683 }) 684 685 return pd.DataFrame(holidays)
687 def calculate_easter(self, year): 688 689 """ 690 Calculate the date of Easter Sunday for a given year using the Anonymous Gregorian algorithm. 691 Converted to Python from this SO answer: https://stackoverflow.com/a/49558298/3650230 692 Really interesting rabbit hole to go down about this in the whole thread: https://stackoverflow.com/questions/2192533/function-to-return-date-of-easter-for-the-given-year 693 """ 694 695 a = year % 19 696 b = year // 100 697 c = year % 100 698 d = b // 4 699 e = b % 4 700 f = (b + 8) // 25 701 g = (b - f + 1) // 3 702 h = (19 * a + b - d - g + 15) % 30 703 k = c % 4 704 i = (c - k) // 4 705 l = (32 + 2 * e + 2 * i - h - k) % 7 706 m = (a + 11 * h + 22 * l) // 451 707 month = (h + l - 7 * m + 114) // 31 708 day = ((h + l - 7 * m + 114) % 31) + 1 709 710 return datetime(year, month, day)
Calculate the date of Easter Sunday for a given year using the Anonymous Gregorian algorithm. Converted to Python from this SO answer: https://stackoverflow.com/a/49558298/3650230 Really interesting rabbit hole to go down about this in the whole thread: https://stackoverflow.com/questions/2192533/function-to-return-date-of-easter-for-the-given-year
715 def biased_mean(series: pd.Series, bias: float = .5) -> float: 716 """ 717 718 Compute a weighted mean, favoring the larger value since demand 719 likely to only increase with time 720 721 """ 722 723 if len(series) == 1: 724 return series.iloc[0] # Return the only value if there's just one 725 726 sorted_vals = np.sort(series) # Ensure values are sorted 727 weights = np.linspace(1, bias * 2, len(series)) # Increasing weights with larger values 728 return np.average(sorted_vals, weights=weights)
Compute a weighted mean, favoring the larger value since demand likely to only increase with time
731 def build_seeded_distributions(self, seed_seq): 732 """ 733 Build a dictionary of seeded distributions keyed by (vehicle_type, time_type) 734 735 Returns 736 ------- 737 dict of (vehicle_type, time_type) -> SeededDistribution 738 739 e.g. {('helicopter', 'time_allocation'): <utils.SeededDistribution object at 0x000001521627C750>, 740 ('car', 'time_allocation'): <utils.SeededDistribution object at 0x000001521627D410>, 741 ('helicopter', 'time_mobile'): <utils.SeededDistribution object at 0x0000015216267E90>} 742 """ 743 n = len(self.activity_time_distr) 744 rngs = [default_rng(s) for s in seed_seq.spawn(n)] 745 746 dist_map = { 747 'poisson': poisson, 748 'bernoulli': bernoulli, 749 'triang': triang, 750 'erlang': erlang, 751 'weibull_min': weibull_min, 752 'expon_weib': exponweib, 753 'betabinom': betabinom, 754 'pearson3': pearson3, 755 'cauchy': cauchy, 756 'chi2': chi2, 757 'expon': expon, 758 'exponential': expon, # alias for consistency 759 'exponpow': exponpow, 760 'gamma': gamma, 761 'lognorm': lognorm, 762 'norm': norm, 763 'normal': norm, # alias for consistency 764 'powerlaw': powerlaw, 765 'rayleigh': rayleigh, 766 'uniform': uniform 767 } 768 769 seeded_distributions = {} 770 771 # print(activity_time_distr) 772 # print(len(self.activity_time_distr)) 773 i = 0 774 for entry, rng in zip(self.activity_time_distr, rngs): 775 i += 1 776 vt = entry['vehicle_type'] 777 tt = entry['time_type'] 778 best_fit = entry['best_fit'] 779 780 # dist_name = best_fit['dist'].lower() 781 dist_name = list(best_fit.keys())[0] 782 dist_cls = dist_map.get(dist_name) 783 self.debug(f"{i}/{len(self.activity_time_distr)} for {vt} {tt} set up {dist_name} {dist_cls} with params: {best_fit[dist_name]}") 784 785 if dist_cls is None: 786 raise ValueError(f"Unsupported distribution type: {dist_name}") 787 788 params = best_fit[dist_name] 789 790 seeded_distributions[(vt, tt)] = SeededDistribution(dist_cls, rng, **params) 791 792 self.activity_time_distr = seeded_distributions 793 794 return seeded_distributions
Build a dictionary of seeded distributions keyed by (vehicle_type, time_type)
Returns
dict of (vehicle_type, time_type) -> SeededDistribution
e.g. {('helicopter', 'time_allocation'): <utils.SeededDistribution object at 0x000001521627C750>, ('car', 'time_allocation'): <utils.SeededDistribution object at 0x000001521627D410>, ('helicopter', 'time_mobile'): <utils.SeededDistribution object at 0x0000015216267E90>}
796 def sample_ad_hoc_reason(self, hour: int, quarter: int, registration: str) -> bool: 797 """ 798 Sample from ad hoc unavailability probability table based on time bin and quarter. 799 Returns a reason string (e.g. 'available', 'crew', 'weather', etc.). 800 """ 801 # Determine time bin 802 if 0 <= hour <= 5: 803 bin_label = '00-05' 804 elif 6 <= hour <= 11: 805 bin_label = '06-11' 806 elif 12 <= hour <= 17: 807 bin_label = '12-17' 808 else: 809 bin_label = '18-23' 810 811 # Main subset for registration, bin, and quarter 812 subset = self.ad_hoc_probs[ 813 (self.ad_hoc_probs['registration'] == registration) & 814 (self.ad_hoc_probs['six_hour_bin'] == bin_label) & 815 (self.ad_hoc_probs['quarter'] == quarter) 816 ] 817 818 if subset.empty: 819 self.debug(f"[AD HOC] No data for {registration} in bin {bin_label}, Q{quarter}. Falling back to all registrations.") 820 subset = self.ad_hoc_probs[ 821 (self.ad_hoc_probs['six_hour_bin'] == bin_label) & 822 (self.ad_hoc_probs['quarter'] == quarter) 823 ] 824 if subset.empty: 825 self.debug(f"[AD HOC] No data at all for bin {bin_label}, Q{quarter}. Defaulting to 'available'.") 826 return 'available' 827 828 filled_subset = subset.copy() 829 830 for idx, row in filled_subset.iterrows(): 831 if pd.isna(row['probability']): 832 reason = row['reason'] 833 # Try registration-wide average across all bins in this quarter 834 reg_avg = self.ad_hoc_probs[ 835 (self.ad_hoc_probs['registration'] == row['registration']) & 836 (self.ad_hoc_probs['quarter'] == row['quarter']) & 837 (self.ad_hoc_probs['reason'] == reason) & 838 (self.ad_hoc_probs['probability'].notna()) 839 ]['probability'].mean() 840 841 if pd.isna(reg_avg): 842 # Fall back: average for this bin/quarter across all registrations 843 fallback_avg = self.ad_hoc_probs[ 844 (self.ad_hoc_probs['six_hour_bin'] == row['six_hour_bin']) & 845 (self.ad_hoc_probs['quarter'] == row['quarter']) & 846 (self.ad_hoc_probs['reason'] == reason) & 847 (self.ad_hoc_probs['probability'].notna()) 848 ]['probability'].mean() 849 850 if pd.notna(fallback_avg): 851 self.debug(f"[AD HOC] Missing prob for {registration}, {bin_label}, Q{quarter}, reason '{reason}'. Using fallback avg across registrations: {fallback_avg:.4f}") 852 filled_subset.at[idx, 'probability'] = fallback_avg 853 else: 854 self.debug(f"[AD HOC] Missing prob for {registration}, {bin_label}, Q{quarter}, reason '{reason}'. No fallback avg found. Defaulting to 'available'.") 855 return 'available' 856 else: 857 self.debug(f"[AD HOC] Missing prob for {registration}, {bin_label}, Q{quarter}, reason '{reason}'. Using reg-wide avg: {reg_avg:.4f}") 858 filled_subset.at[idx, 'probability'] = reg_avg 859 860 if filled_subset['probability'].isna().any(): 861 self.debug(f"[AD HOC] Still missing probabilities after imputation for {registration}, {bin_label}, Q{quarter}. Defaulting to 'available'.") 862 return 'available' 863 864 total_prob = filled_subset['probability'].sum() 865 if total_prob == 0 or pd.isna(total_prob): 866 self.debug(f"[AD HOC] Total probability is zero or NaN for {registration}, {bin_label}, Q{quarter}. Defaulting to 'available'.") 867 return 'available' 868 869 norm_probs = filled_subset['probability'] / total_prob 870 871 sampled_reason = self.rngs["ad_hoc_reason_selection"].choice( 872 filled_subset['reason'].tolist(), 873 p=norm_probs.tolist() 874 ) 875 876 return sampled_reason
Sample from ad hoc unavailability probability table based on time bin and quarter. Returns a reason string (e.g. 'available', 'crew', 'weather', etc.).
878class SeededDistribution: 879 def __init__(self, dist, rng, **kwargs): 880 self.dist = dist(**kwargs) 881 self.rng = rng 882 883 def sample(self, size=None): 884 return self.dist.rvs(size=size, random_state=self.rng)