import csv
import json
import numpy as np
from scipy import interpolate, stats
import math
from scipy.optimize import curve_fit
from datetime import datetime
from decimal import Decimal
from sys import stdout
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
sh = logging.StreamHandler(stdout)
formatter = logging.Formatter('[%(levelname)s] %(asctime)s %(message)s')
sh.setFormatter(formatter)
logger.addHandler(sh)
[docs]def flatten(t):
return [item for sublist in t for item in sublist]
[docs]def get_logger():
return logger
[docs]def date_to_int(d):
return int(datetime.strftime(d, '%Y%m%d'))
[docs]def int_to_date(d):
return datetime.strptime(str(d), '%Y%m%d').date()
[docs]def doubling_time(case_data):
days = len(case_data)
Td = days / np.log2(case_data[-1] / case_data[0])
return Td
[docs]def json_handler(o):
if isinstance(o, datetime):
return o.isoformat()
elif isinstance(o, Decimal):
return float(o)
else:
logger.warning('Unknown Type in json_handler: %s', str(o))
return str(o)
[docs]def csv_to_json(infile, outfile):
csvFile = open(infile, 'r')
j = dict()
csvReader = csv.DictReader(csvFile)
for line in csvReader:
state = line['State']
county = line['County']
pop = int(line['Population'])
if state not in j:
j[state] = {}
j[state][county] = pop
else:
j[state][county] = pop
jsonFile = open(outfile, 'w')
jsonFile.write(json.dumps(j))
[docs]def interp_mat(M, old_domain, new_domain, rows, cols):
lons = np.linspace(old_domain[0], old_domain[1], M.shape[1])
lats = np.linspace(old_domain[3], old_domain[2], M.shape[0])
f = interpolate.interp2d(lons, lats, M, kind='cubic')
lons_new = np.linspace(new_domain[0], new_domain[1], cols)
lats_new = np.linspace(new_domain[3], new_domain[2], rows)
return f(lons_new, lats_new)
[docs]def laplacian(M, dx, dy):
"""Laplacian operator for use in spatial models"""
rows, cols = M.shape
L = np.zeros((rows, cols))
for i in range(rows):
for j in range(cols):
# interior
if 1 <= i <= (rows - 2) and 1 <= j <= (cols - 2):
L[i, j] = (M[i - 1, j] + M[i + 1, j] - 2 * M[i, j])
L[i, j] /= (dy * dy)
L[i, j] += (M[i, j - 1] + M[i, j + 1] - 2 * M[i, j])
L[i, j] /= (dx * dx)
# sides
if i == 0 and 1 <= j <= (cols - 2):
L[i, j] = (M[i + 1, j] - 2 * M[i, j]) / (dy * dy)
L[i, j] += (M[i, j - 1] + M[i, j + 1] - 2 * M[i, j])
L[i, j] /= (dx * dx)
if i == rows - 1 and 1 <= j <= (cols - 2):
L[i, j] = (M[i - 1, j] - 2 * M[i, j]) / (dy * dy)
L[i, j] += (M[i, j - 1] + M[i, j + 1] - 2 * M[i, j])
L[i, j] /= (dx * dx)
if j == 0 and 1 <= i <= (rows - 2):
L[i, j] = (M[i - 1, j] + M[i + 1, j] - 2 * M[i, j]) / (dy * dy)
L[i, j] += (M[i, j + 1] - 2 * M[i, j]) / (dx * dx)
if j == cols - 1 and 1 <= i <= (rows - 2):
L[i, j] = (M[i - 1, j] + M[i + 1, j] - 2 * M[i, j]) / (dy * dy)
L[i, j] += (M[i, j - 1] - 2 * M[i, j]) / (dx * dx)
# corners
if i == 0 and j == 0:
L[i, j] = (M[i + 1, j] - 2 * M[i, j]) / (dy * dy)
L[i, j] += (M[i, j + 1] - 2 * M[i, j]) / (dx * dx)
if i == 0 and j == cols - 1:
L[i, j] = (M[i + 1, j] - 2 * M[i, j]) / (dy * dy)
L[i, j] += (M[i, j - 1] - 2 * M[i, j]) / (dx * dx)
if i == rows - 1 and j == 0:
L[i, j] = (M[i - 1, j] - 2 * M[i, j]) / (dy * dy)
L[i, j] += (M[i, j + 1] - 2 * M[i, j]) / (dx * dx)
if i == rows - 1 and j == cols - 1:
L[i, j] = (M[i - 1, j] - 2 * M[i, j]) / (dy * dy)
L[i, j] += (M[i, j - 1] - 2 * M[i, j]) / (dx * dx)
return L
[docs]def vec_diff(v):
tmp = [v[0] - (v[1] - v[0])] + list(v)
diff = [tmp[i + 1] - tmp[i] for i in range(len(v))]
return diff
[docs]def get_beta_ramp(p):
beta_array = []
for i in range(p.n_days):
if i < p.Sd_delay:
tmp = p.beta
elif p.Sd_delay <= i < p.Sd_delay + p.n_ramp:
tmp = p.beta * (1 - p.Sd * (float((i - p.Sd_delay) / p.n_ramp)))
else:
tmp = p.beta * (1 - p.Sd)
beta_array.append(tmp)
return beta_array
[docs]def get_seasonal_beta_variation(date, peak_date, amp):
"""Use a cosine function to simulate seasonal variation"""
var = amp * math.cos(2 * math.pi * (date - peak_date).days / 365.0)
return var
[docs]def cost_func(v0, v1):
avg_step = 3
tmp0 = [np.mean(v0[i: i + avg_step])
for i in range(len(v0) - avg_step + 1)]
tmp1 = v1[: len(tmp0)]
min_len = min((len(tmp0), len(tmp1)))
return np.sqrt(np.sum([((tmp1[i] - tmp0[i]))**2
for i in range(min_len)])) / (min_len)
[docs]def grad_descent(p0, J0, J1, dp, alpha):
"""Gradient descent for parameter optimization"""
new_p = {}
for k in p0:
new_p[k] = p0[k] - alpha[k] * (J1[k] - J0[k]) / dp[k]
return new_p
[docs]def exp_func(t, a, b):
return a * np.exp(b * t)
[docs]def beta_interp(t, beta, Sd, t0, n_ramp):
if t < t0:
return beta
elif t0 <= t <= t0 + n_ramp:
return (1 - (t - t0) * Sd / (n_ramp)) * beta
else:
return (1 - Sd) * beta
[docs]def exp_decay_func(t, a, beta, Sd, t0, n_ramp):
return [a * np.exp(i * beta_interp(i, beta, Sd, t0, n_ramp)) for i in t]
[docs]def exp_lin_func(t, a, b, c):
return a * np.exp(b * t) + c * t
[docs]def logistic_func(t, a, b, c, d, e):
return a / (d * np.exp(-b * (t - e)) + c)
[docs]class initial_values:
pass
[docs]def cases_to_beta(data, params):
popt, _ = curve_fit(exp_func, list(range(len(data))), data)
params.R0 = (popt[1] / params.gamma + 1)
params.beta = params.R0 * params.gamma
return params
[docs]def doubling_trend_to_Sd(Td_array):
days = list(range(len(Td_array)))
r = stats.linregress(days, Td_array)
Sd = r.slope / (1 + r.slope)
return Sd