Source code for causaldag.utils.scores.gaussian_bge_score

import numpy as np
import numba
from scipy.special import loggamma
import math
import ipdb

@numba.jit
def numba_inv(A):
    return np.linalg.inv(A)


[docs]def local_gaussian_bge_score( node, parents, suffstat, alpha_mu=None, alpha_w=None, inverse_scale_matrix=None, parameter_mean=None, is_diagonal=True ): """ Compute the BGE score of a node given its parents. Parameters ---------- node: TODO - describe. parents: TODO - describe. suffstat: dictionary containing: * ``n`` -- number of samples * ``S`` -- sample covariance matrix * ``mu`` -- sample mean alpha_mu: TODO - describe. Default is the number of variables. alpha_w: TODO - describe. Default is the (number of variables) + alpha_mu + 1 inverse_scale_matrix: TODO - describe. Default is the identity matrix. parameter_mean: TODO - describe. Default is the zero vector. is_diagonal: TODO - describe. Returns ------- float BGE score. """ if not is_diagonal: raise NotImplementedError("BGE score not implemented for non-diagonal matrix.") k = len(parents) n = suffstat["n"] # number of samples S = suffstat["S"] # sample covariance matrix sample_mean = suffstat["mu"] p = S.shape[0] # number of variables if inverse_scale_matrix is None: inverse_scale_matrix = np.eye(p) if parameter_mean is None: parameter_mean = np.zeros(p) if alpha_mu is None: alpha_mu = p if alpha_w is None: alpha_w = p + alpha_mu + 1 print("alpha_w new", alpha_w) # === first term first_term = .5 * np.log(alpha_mu / (n + alpha_mu)) # === second term: ratio of gamma functions second_term = loggamma((n + alpha_w - p + k + 1)/2) second_term -= loggamma((alpha_w - p + k + 1)/2) second_term -= n/2 * np.log(math.pi) # === third term: ratio of determinants mean_diff = parameter_mean - sample_mean # R = inverse_scale_matrix + (n-1) * S + (n * alpha_w) / (n + alpha_w) * np.outer(mean_diff, mean_diff) R = inverse_scale_matrix + (n-1) * S + (n * alpha_mu) / (n + alpha_mu) * np.outer(mean_diff, mean_diff) Q = [node, *parents] P = list(parents) third_term = (alpha_w - p + k + 1)/2 * np.sum(np.log(np.diagonal(inverse_scale_matrix)[Q])) third_term += (n + alpha_w - p + k)/2 * np.log(np.linalg.det(R[np.ix_(P, P)])) third_term -= (alpha_w - p + k)/2 * np.sum(np.log(np.diagonal(inverse_scale_matrix)[P])) third_term -= (n + alpha_w - p + k + 1)/2 * np.log(np.linalg.det(R[np.ix_(Q, Q)])) return first_term + second_term + third_term
if __name__ == '__main__': from causaldag.rand import rand_weights, directed_erdos from causaldag.utils.ci_tests import partial_correlation_suffstat from sympy import gamma import scipy as sp from scipy import stats from scipy.linalg import ldl d = directed_erdos(10, .5) g = rand_weights(d) samples = g.sample(100) suffstat = partial_correlation_suffstat(samples) score = local_gaussian_bge_score(5, d.parents_of(5), suffstat) def integral_marginal_gaussian_bge(data): def log_c(a, b): c_value = 0 for i in range(a): c_value += np.log(float(gamma((b - i)/2))) return c_value N_mu = 3 n = 3 m = len(data) mu_0 = [0, 0, 0] T_0 = np.eye(3) N_w = 7 # T_0 = np.array([[2.25,-0.512,-0.512], [-0.512,2.25,0.512], [0.512,0.512,2.25]]) sample_variance = np.sum([np.dot(np.transpose([x - np.average(data, 0)]), [x - np.average(data, 0)]) for x in data], 0) print("sample variance", sample_variance) T_m = T_0 + sample_variance + ((N_w*m)/(N_w + m)) * np.outer((np.array(mu_0) - np.average(data, 0)), (np.array(mu_0) - np.average(data, 0))) d_x1 = [0] d_x2 = [0, 1] d_x3 = [0, 1, 2] ell_1 = len(d_x1) ell_2 = len(d_x2) ell_3 = len(d_x3) bge_coefficient_1 = np.log(np.pi) * (-ell_1*m/2) + np.log(N_mu/(N_mu + m)) * (ell_1/2) + log_c(ell_1, m + N_w - n + ell_1) - log_c(ell_1, N_w - n + ell_1) bge_coefficient_2 = np.log(np.pi) * (-ell_2*m/2) + np.log(N_mu/(N_mu + m)) * (ell_2/2) + log_c(ell_2, m + N_w - n + ell_2) - log_c(ell_2, N_w - n + ell_2) bge_coefficient_3 = np.log(np.pi) * (-ell_3*m/2) + np.log(N_mu/(N_mu + m)) * (ell_3/2) + log_c(ell_3, m + N_w - n + ell_3) - log_c(ell_3, N_w - n + ell_3) bge_d_x1 = bge_coefficient_1 + np.log(np.abs(np.linalg.det(T_0[d_x1, :][:, d_x1]))) * ((N_w - n + ell_1)/2) + np.log(np.abs(np.linalg.det(T_m[d_x1, :][:, d_x1]))) * ((-N_w-m+n-ell_1)/2) bge_d_x2 = bge_coefficient_2 + np.log(np.abs(np.linalg.det(T_0[d_x2, :][:, d_x2]))) * ((N_w - n + ell_2)/2) + np.log(np.abs(np.linalg.det(T_m[d_x2, :][:, d_x2]))) * ((-N_w-m+n-ell_2)/2) bge_d_x3 = bge_coefficient_3 + np.log(np.abs(np.linalg.det(T_0[d_x3, :][:, d_x3]))) * ((N_w - n + ell_3)/2) + np.log(np.abs(np.linalg.det(T_m[d_x3, :][:, d_x3]))) * ((-N_w-m+n-ell_3)/2) marginal_1 = bge_d_x1 marginal_2 = bge_d_x2 - bge_d_x1 marginal_3 = bge_d_x3 - bge_d_x2 print("old marginals", marginal_1, marginal_2, marginal_3) print("old total", marginal_1 + marginal_2 + marginal_3) total_log_marginal_likelihood = bge_coefficient_3 + np.log(np.abs(np.linalg.det(T_0))) * (N_w/2) + np.log(np.abs(np.linalg.det(T_m))) * ((-N_w-m)/2) return total_log_marginal_likelihood gaussian_data = np.array([[0.2, 0.2, 0.2], [0.2, 0.2, 0.2]]) print("old result", integral_marginal_gaussian_bge(gaussian_data)) suffstat = partial_correlation_suffstat(gaussian_data, invert=False) s1 = local_gaussian_bge_score(0, set(), suffstat) s2 = local_gaussian_bge_score(1, {0}, suffstat) s3 = local_gaussian_bge_score(2, {0, 1}, suffstat) print("new result node 0", s1) print("new result node 1", s2) print("new result node 2", s3) print("total:", s1+s2+s3)