Source code for covid.models

"""Compartmental models"""
from abc import ABC, abstractmethod
from scipy.optimize import curve_fit
import numpy as np

from covid.utilities import laplacian, flatten, get_logger, vec_diff

logger = get_logger()


[docs]def ramp_function(constant, amplitude, ramp_rate, shift, t_step): """Function to ramp model parameters over time""" return constant + amplitude / (1 + np.exp(-ramp_rate * (t_step + shift)))
[docs]class CompartmentalModel(ABC): """Base class for compartmental models from which specific models like SIR can be derived """ def __init__(self, compartment_names, parameters=None): self.compartment_number = len(compartment_names) self.transfer_matrix = np.zeros((self.compartment_number, self.compartment_number), dtype=object) self.compartment_names = {compartment_names[i]: i for i in range(self.compartment_number)} self.compartment_index = {i: compartment_names[i] for i in range(self.compartment_number)} self.compartment_series = {} self.initial_values = None self.fit_compartments = list(self.compartment_names.keys()) self.fit_derivatives = True self.parameter_index = 0 self.model_params = [] self.params = {} self.epsilon = 1.0e-3 self.n_substeps = 20 self.dt = 1.0 / self.n_substeps self.start_step = 0 self.define_parameters() if parameters is not None: for k, v in parameters.items(): self.params[k] = v self.define_transfer_matrix()
[docs] @abstractmethod def define_parameters(self): """Define compartmental model parameters"""
[docs] @abstractmethod def define_transfer_matrix(self): """Define transfer matrix used in simulating model"""
[docs] @abstractmethod def update_transfer_matrix(self, last_compartments, step_number): """Update transfer matrix values to calculate next time step"""
[docs] def get_parameters_from_data(self, data): """Get model parameters from data""" self.initialize(initial_values={compartment: data[compartment][0] for compartment in data}) input = flatten([list(data[name]) for name in self.fit_compartments]) if self.fit_derivatives: input += flatten([vec_diff(list(data[name])) for name in self.fit_compartments]) lower_bounds = {} for k in self.model_params: if 'ramp' in str(k) or 'adjustment' in str(k): lower_bounds[k] = -np.inf else: lower_bounds[k] = 0.0 upper_bounds = {k: np.inf for k in self.model_params} initial_parameters = {k: self.params[k] for k in self.model_params} return self.fit_parameters(input, lower_bounds, upper_bounds, initial_parameters)
[docs] def update_parameters(self, params): """Update model parameters from dict""" for k, v in params.items(): self.params[k] = v self.define_transfer_matrix()
[docs] def transfer_value(self, to_compartment, from_compartment): """Get transfer matrix entry""" return self.transfer_matrix[self.compartment_names[to_compartment], self.compartment_names[from_compartment]]
[docs] def update_transfer_value(self, value, to_compartment, from_compartment): """Update transfer matrix entry""" self.transfer_matrix[self.compartment_names[to_compartment], self.compartment_names[from_compartment]] = value
[docs] def last_compartment_value(self, compartment): """Get last entry in timeseries""" return self.compartment_series[compartment][-1]
[docs] def fit_parameters(self, data, lower_bounds, upper_bounds, initial_parameters): """Determine model parameters from data fit""" self.parameter_index = {k: v for k, v in enumerate(initial_parameters.keys())} lower_bounds = [lower_bounds[k] for k in self.parameter_index.values()] upper_bounds = [upper_bounds[k] for k in self.parameter_index.values()] initial_parameters = [initial_parameters[k] for k in self.parameter_index.values()] self.start_step = 0 n_days = len(data) // len(self.fit_compartments) if self.fit_derivatives: n_days //= 2 days = list(range(n_days)) popt, _ = curve_fit(self.model_fit_function, days, data, method='trf', bounds=(lower_bounds, upper_bounds), p0=initial_parameters) for k, v in self.parameter_index.items(): self.params[v] = popt[k] self.start_step = n_days return self.params
[docs] def model_fit_function(self, t, *args): """Model function providing fit interface""" for i, v in enumerate(args): self.params[self.parameter_index[i]] = v results = self.run_model(len(t) - 1) output = flatten([list(results[name]) for name in self.fit_compartments]) if self.fit_derivatives: output += flatten([vec_diff(list(results[name])) for name in self.fit_compartments]) return output
[docs] def set_initial_values(self, initial_values): """Set initial values for model compartments""" self.initial_values = initial_values
[docs] def initialize(self, initial_values=None): """Initialize each model compartment""" if initial_values is not None: self.set_initial_values(initial_values) for name in self.compartment_names: self.compartment_series[name] = [self.initial_values[name]]
[docs] def model_eqns(self, last_compartments, step_number): """Model equation definition used in rk step""" self.update_transfer_matrix(last_compartments, step_number) next_compartments = [] for i in range(self.compartment_number): entry = 0 for j in range(self.compartment_number): entry += self.transfer_matrix[i, j] * last_compartments[j] next_compartments.append(entry * self.dt) return next_compartments
[docs] def rk_step(self, last_compartments, step_number): """Runge-Kutta step for model simulation""" rk_step = [0.5, 0.5, 1, 0] tmp = list(last_compartments) compartments_k = [[] for i in range(self.compartment_number)] for t in range(4): tmp = self.model_eqns(tmp, step_number) for i in range(self.compartment_number): compartments_k[i].append(tmp[i]) tmp = np.multiply(tmp, rk_step[t]) + last_compartments stencil = [1, 2, 2, 1] next_compartments = (last_compartments + 1.0 / 6.0 * np.tensordot(compartments_k, stencil, axes=(1, 0))) next_compartments[next_compartments < 0] = 0.0 return next_compartments
[docs] def run_model(self, n_days=100): """Run model simulation for n_days""" self.initialize() self.define_transfer_matrix() last_compartments = [self.compartment_series[name][0] for name in self.compartment_names] for day in range(n_days): for _ in range(self.n_substeps): last_compartments = self.rk_step(last_compartments, day) for i, name in enumerate(self.compartment_names): self.compartment_series[name].append(last_compartments[i]) return self.compartment_series
[docs]class SIR(CompartmentalModel): """Susceptible, Infected, Recovered Compartmental Model""" def __init__(self, parameters=None): super().__init__(['S', 'I', 'R'], parameters) self.model_params = ['recovery_rate', 'infection_rate', 'infection_rate_adjustment', 'infection_rate_ramp']
[docs] def define_parameters(self): self.params['recovery_rate'] = 1.0 / 14.0 self.params['infection_rate'] = 3.0 / 14.0 self.params['infection_rate_ramp'] = 0.0 self.params['infection_rate_adjustment'] = 0.0
[docs] def define_transfer_matrix(self): self.params['R0'] = (self.params['infection_rate'] / self.params['recovery_rate']) self.update_transfer_value(-self.params['recovery_rate'], 'I', 'I') self.update_transfer_value(self.params['recovery_rate'], 'R', 'I')
[docs] def update_transfer_matrix(self, last_compartments, step_number): self.params['N'] = sum(last_compartments) infection_rate_ramped = ramp_function( self.params['infection_rate'], self.params['infection_rate_adjustment'], self.params['infection_rate_ramp'], self.start_step, step_number) infection_rate_ramped *= last_compartments[ self.compartment_names['I']] / self.params['N'] self.update_transfer_value(-infection_rate_ramped, 'S', 'S') self.update_transfer_value(infection_rate_ramped, 'I', 'S')
[docs]class SIRV(CompartmentalModel): """Susceptible, Infected, Recovered, Vaccinated Model""" def __init__(self, parameters=None): super().__init__(['S', 'I', 'R', 'V'], parameters) self.model_params = ['recovery_rate', 'infection_rate', 'vaccination_rate', 'recovery_rate_adjustment', 'infection_rate_adjustment', 'vaccination_rate_adjustment', 'infection_rate_ramp', 'vaccination_rate_ramp', 'recovery_rate_ramp']
[docs] def define_parameters(self): self.params['recovery_rate'] = 1.0 / 14.0 self.params['infection_rate'] = 3.0 / 14.0 self.params['vaccination_rate'] = 1.0 / 7.0 self.params['recovery_rate_adjustment'] = 0.0 self.params['infection_rate_adjustment'] = 0.0 self.params['vaccination_rate_adjustment'] = 0.0 self.params['infection_rate_ramp'] = 0.0 self.params['vaccination_rate_ramp'] = 0.0 self.params['recovery_rate_ramp'] = 0.0
[docs] def define_transfer_matrix(self): self.params['R0'] = (self.params['infection_rate'] / self.params['recovery_rate'])
[docs] def update_transfer_matrix(self, last_compartments, step_number): self.params['N'] = sum(last_compartments) recovery_rate_ramped = ramp_function( self.params['recovery_rate'], self.params['recovery_rate_adjustment'], self.params['recovery_rate_ramp'], self.start_step, step_number) vaccination_rate_ramped = ramp_function( self.params['vaccination_rate'], self.params['vaccination_rate_adjustment'], self.params['vaccination_rate_ramp'], self.start_step, step_number) infection_rate_ramped = ramp_function( self.params['infection_rate'], self.params['infection_rate_adjustment'], self.params['infection_rate_ramp'], self.start_step, step_number) infection_rate_ramped *= last_compartments[ self.compartment_names['I']] / self.params['N'] self.update_transfer_value(-recovery_rate_ramped, 'I', 'I') self.update_transfer_value(recovery_rate_ramped, 'R', 'I') self.update_transfer_value(vaccination_rate_ramped, 'V', 'S') self.update_transfer_value( -infection_rate_ramped - vaccination_rate_ramped, 'S', 'S') self.update_transfer_value(infection_rate_ramped, 'I', 'S')
[docs]class SIRVD(CompartmentalModel): """Susceptible, Infected, Recovered, Vaccinated, Died Model""" def __init__(self, parameters=None): super().__init__(['S', 'I', 'R', 'V', 'D'], parameters) self.model_params = ['recovery_rate', 'infection_rate', 'vaccination_rate', 'death_rate', # 'recovery_rate_adjustment', 'infection_rate_adjustment', 'vaccination_rate_adjustment', 'death_rate_adjustment', 'infection_rate_ramp', 'vaccination_rate_ramp', # 'recovery_rate_ramp', 'death_rate_ramp', ]
[docs] def define_parameters(self): self.params['recovery_rate'] = 1.0 / 14.0 self.params['infection_rate'] = 3.0 / 14.0 self.params['vaccination_rate'] = 1.0 / 7.0 self.params['death_rate'] = 0.02 self.params['recovery_rate_adjustment'] = 0.0 self.params['infection_rate_adjustment'] = 0.0 self.params['vaccination_rate_adjustment'] = 0.0 self.params['death_rate_adjustment'] = 0.0 self.params['infection_rate_ramp'] = 0.0 self.params['vaccination_rate_ramp'] = 0.0 self.params['recovery_rate_ramp'] = 0.0 self.params['death_rate_ramp'] = 0.0
[docs] def define_transfer_matrix(self): self.params['R0'] = (self.params['infection_rate'] / self.params['recovery_rate'])
[docs] def update_transfer_matrix(self, last_compartments, step_number): self.params['N'] = sum(last_compartments) recovery_rate_ramped = ramp_function( self.params['recovery_rate'], self.params['recovery_rate_adjustment'], self.params['recovery_rate_ramp'], self.start_step, step_number) vaccination_rate_ramped = ramp_function( self.params['vaccination_rate'], self.params['vaccination_rate_adjustment'], self.params['vaccination_rate_ramp'], self.start_step, step_number) death_rate_ramped = ramp_function( self.params['death_rate'], self.params['death_rate_adjustment'], self.params['death_rate_ramp'], self.start_step, step_number) infection_rate_ramped = ramp_function( self.params['infection_rate'], self.params['infection_rate_adjustment'], self.params['infection_rate_ramp'], self.start_step, step_number) infection_rate_ramped *= last_compartments[ self.compartment_names['I']] / self.params['N'] self.update_transfer_value( -recovery_rate_ramped - death_rate_ramped, 'I', 'I') self.update_transfer_value(recovery_rate_ramped, 'R', 'I') self.update_transfer_value(death_rate_ramped, 'D', 'I') self.update_transfer_value(vaccination_rate_ramped, 'V', 'S') self.update_transfer_value( -infection_rate_ramped - vaccination_rate_ramped, 'S', 'S') self.update_transfer_value(infection_rate_ramped, 'I', 'S')
[docs]class SIRD(CompartmentalModel): """Susceptible, Infected, Recovered, Died Model""" def __init__(self, parameters=None): super().__init__(['S', 'I', 'R', 'D'], parameters) self.model_params = ['recovery_rate', 'infection_rate', 'death_rate']
[docs] def define_parameters(self): self.params['recovery_rate'] = 1.0 / 14.0 self.params['infection_rate'] = 3.0 / 14.0 self.params['death_rate'] = 1.0 / 14.0
[docs] def define_transfer_matrix(self): self.params['R0'] = (self.params['infection_rate'] / self.params['recovery_rate']) self.update_transfer_value( -self.params['recovery_rate'] - self.params['death_rate'], 'I', 'I') self.update_transfer_value(self.params['recovery_rate'], 'R', 'I') self.update_transfer_value(self.params['death_rate'], 'D', 'I')
[docs] def update_transfer_matrix(self, last_compartments): self.params['N'] = sum(last_compartments) alpha = (self.params['infection_rate'] * last_compartments[self.compartment_names['I']] / self.params['N']) self.update_transfer_value(-alpha, 'S', 'S') self.update_transfer_value(alpha, 'I', 'S')
[docs]class Spatial_SIR(CompartmentalModel): """Susceptible, Infected, Recovered Model with spatial variation""" def __init__(self, parameters=None): super().__init__(['S', 'I', 'R'], parameters) self.model_params = ['recovery_rate', 'infection_rate']
[docs] def define_parameters(self): self.params['recovery_rate'] = 1.0 / 14.0 self.params['infection_rate'] = 3.0 / 14.0 self.params['grid_size_x'] = 20 self.params['grid_size_y'] = 20 self.params['dx'] = 1 self.params['dy'] = 1 self.params['nx'] = int(self.params['grid_size_x'] / self.params['dx']) self.params['ny'] = int(self.params['grid_size_y'] / self.params['dy']) self.params['r0'] = 1.0
[docs] def define_transfer_matrix(self): self.params['R0'] = (self.params['infection_rate'] / self.params['recovery_rate']) self.update_transfer_value(-self.params['recovery_rate'], 'I', 'I') self.update_transfer_value(self.params['recovery_rate'], 'R', 'I')
[docs] def update_transfer_matrix(self, last_compartments): self.params['N'] = sum(last_compartments) alpha = (self.params['infection_rate'] / self.params['N'] * (last_compartments[self.compartment_names['I']] + self.params['r0']**2 / 8.0 * laplacian(last_compartments[self.compartment_names['I']], self.params['dx'], self.params['dy']))) self.update_transfer_value(-alpha, 'S', 'S') self.update_transfer_value(alpha, 'I', 'S')
[docs] def example(self): """Spatial SIR example""" S = np.zeros((self.params['nx'], self.params['ny'])) I = np.zeros((self.params['nx'], self.params['ny'])) # noqa: E741 R = np.zeros((self.params['nx'], self.params['ny'])) S[:, :] = 10 midx = self.params['nx'] // 2 midy = self.params['ny'] // 2 I[midx - 2: midx + 2, midy - 2: midy + 2] = 3 self.compartment_series['S'] = [S] self.compartment_series['I'] = [I] self.compartment_series['R'] = [R]
[docs] def initialize(self, initial_values=None): if initial_values is None: self.example()
[docs]class SEIR(CompartmentalModel): """Susceptible, Exposed, Infected, Recovered Model""" def __init__(self, parameters=None): super().__init__(['S', 'E', 'I', 'R'], parameters) self.model_params = ['recovery_rate', 'infection_rate', 'latency_rate']
[docs] def define_parameters(self): self.params['recovery_rate'] = 1.0 / 14.0 self.params['infection_rate'] = 3.0 / 14.0 self.params['latency_rate'] = 1.0 / 7.0
[docs] def define_transfer_matrix(self): self.params['R0'] = (self.params['infection_rate'] / self.params['recovery_rate']) self.update_transfer_value(-self.params['latency_rate'], 'E', 'E') self.update_transfer_value(self.params['latency_rate'], 'I', 'E') self.update_transfer_value(-self.params['recovery_rate'], 'I', 'I') self.update_transfer_value(self.params['recovery_rate'], 'R', 'I')
[docs] def update_transfer_matrix(self, last_compartments): self.params['N'] = sum(last_compartments) alpha = (self.params['infection_rate'] * last_compartments[self.compartment_names['I']] / self.params['N']) self.update_transfer_value(-alpha, 'S', 'S') self.update_transfer_value(alpha, 'E', 'S')
[docs]class SEAIQHRD(CompartmentalModel): """Susceptible, Exposed, Asymptomatic, Infected, Quarantined, Hospitalized, Recovered, Died Model""" def __init__(self, parameters=None): super().__init__(['S', 'E', 'A', 'I', 'Q', 'H', 'R', 'D'], parameters) self.model_params = ['infection_rate', 'E_to_A_rate', 'A_to_I_rate', 'A_to_R_rate', 'I_to_Q_rate', 'I_to_H_rate', 'I_to_R_rate', 'I_to_D_rate', 'Q_to_H_rate', 'Q_to_R_rate', 'Q_to_D_rate', 'H_to_R_rate', 'H_to_D_rate']
[docs] def define_parameters(self): self.params['infection_rate'] = 3.0 / 14.0 self.params['E_to_A_rate'] = 1.0 / 4.0 self.params['A_to_I_rate'] = 1.0 / 7.0 self.params['A_to_R_rate'] = 1.0 / 3.0 self.params['I_to_Q_rate'] = 1.0 / 2.0 self.params['I_to_H_rate'] = 1.0 / 3.0 self.params['I_to_R_rate'] = 1.0 / 3.0 self.params['I_to_D_rate'] = 1.0 / 3.0 self.params['Q_to_H_rate'] = 1.0 / 5.0 self.params['Q_to_R_rate'] = 1.0 / 3.0 self.params['Q_to_D_rate'] = 1.0 / 3.0 self.params['H_to_R_rate'] = 1.0 / 3.0 self.params['H_to_D_rate'] = 1.0 / 3.0
[docs] def define_transfer_matrix(self): self.params['recovery_rate'] = np.sum([self.params['A_to_R_rate'], self.params['I_to_R_rate'], self.params['Q_to_R_rate'], self.params['H_to_R_rate']]) self.params['R0'] = (self.params['infection_rate'] / self.params['recovery_rate']) self.params['E_decay_rate'] = self.params['E_to_A_rate'] self.params['A_decay_rate'] = np.sum([self.params['A_to_I_rate'], self.params['A_to_R_rate']]) self.params['H_decay_rate'] = np.sum([self.params['H_to_R_rate'], self.params['H_to_D_rate']]) self.params['Q_decay_rate'] = np.sum([self.params['Q_to_H_rate'], self.params['Q_to_R_rate'], self.params['Q_to_D_rate']]) self.params['I_decay_rate'] = np.sum([self.params['I_to_Q_rate'], self.params['I_to_H_rate'], self.params['I_to_R_rate'], self.params['I_to_D_rate']]) self.update_transfer_value(-self.params['A_decay_rate'], 'A', 'A') self.update_transfer_value(-self.params['E_decay_rate'], 'E', 'E') self.update_transfer_value(-self.params['I_decay_rate'], 'I', 'I') self.update_transfer_value(-self.params['Q_decay_rate'], 'Q', 'Q') self.update_transfer_value(-self.params['H_decay_rate'], 'H', 'H') self.update_transfer_value(self.params['E_to_A_rate'], 'A', 'E') self.update_transfer_value(self.params['A_to_I_rate'], 'I', 'A') self.update_transfer_value(self.params['A_to_R_rate'], 'R', 'A') self.update_transfer_value(self.params['I_to_Q_rate'], 'Q', 'I') self.update_transfer_value(self.params['I_to_H_rate'], 'H', 'I') self.update_transfer_value(self.params['I_to_D_rate'], 'D', 'I') self.update_transfer_value(self.params['I_to_R_rate'], 'R', 'I') self.update_transfer_value(self.params['Q_to_H_rate'], 'H', 'Q') self.update_transfer_value(self.params['Q_to_R_rate'], 'R', 'Q') self.update_transfer_value(self.params['Q_to_D_rate'], 'D', 'Q') self.update_transfer_value(self.params['H_to_R_rate'], 'R', 'H') self.update_transfer_value(self.params['H_to_D_rate'], 'D', 'H')
[docs] def update_transfer_matrix(self, last_compartments): self.params['N'] = sum(last_compartments) div = self.params['N'] div -= last_compartments[self.compartment_names['Q']] div -= last_compartments[self.compartment_names['H']] div -= last_compartments[self.compartment_names['D']] alpha = (self.params['infection_rate'] * (last_compartments[self.compartment_names['I']] + last_compartments[self.compartment_names['A']]) / div) self.update_transfer_value(-alpha, 'S', 'S') self.update_transfer_value(alpha, 'E', 'S')