# Copyright 2017 Neosapience, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========================================================================
"""
References
----------
.. [1] Pang Wei Koh and Percy Liang, "Understanding Black-box Predictions via Influence Functions", ICML 2017.
"""
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
from __future__ import unicode_literals
from .feeder import InfluenceFeeder # noqa: ignore=F401
from ..log import logger
import numpy as np
import tensorflow as tf
from tensorflow.python.ops.gradients_impl import _hessian_vector_product
import os
import time
import hashlib
import json
from functools import wraps
_using_fully_tf = True
def _timing(f):
@wraps(f)
def wrap(*args, **kwargs):
time1 = time.time()
ret = f(*args, **kwargs)
time2 = time.time()
logger.debug('* %s function took [%.3fs]' % (f.__name__, time2-time1))
return ret
return wrap
class Influence:
""" Influence Class
Parameters
----------
workspace: str
Path for workspace directory
feeder : InfluenceFeeder
Dataset feeder
    loss_op_train : tf.Operation
        Tensor for the loss function used for training. It may include regularization terms.
    loss_op_test : tf.Operation
        Tensor for the loss function used for inference.
    x_placeholder : tf.Tensor
        Data placeholder
        Tensor from tf.placeholder()
    y_placeholder : tf.Tensor
        Target placeholder
        Tensor from tf.placeholder()
    test_feed_options : dict
        Optional feed_dict entries used when running the loss operation on the test set
    train_feed_options : dict
        Optional feed_dict entries used when running the loss operation on the training set
    trainable_variables : tuple or list
        Trainable variables to be used
        If None, all trainable variables in the default graph are used
        Default: None
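
    Examples
    --------
    A minimal usage sketch, assuming a trained graph with placeholders ``x`` and
    ``y``, a training/test loss pair, and a user-defined
    ``MyFeeder(InfluenceFeeder)`` implementation (all hypothetical names):

    >>> inspector = Influence(workspace='./influence-workspace',
    ...                       feeder=MyFeeder(),
    ...                       loss_op_train=train_loss_op,
    ...                       loss_op_test=test_loss_op,
    ...                       x_placeholder=x,
    ...                       y_placeholder=y)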
"""
def __init__(self, workspace, feeder, loss_op_train, loss_op_test, x_placeholder, y_placeholder,
test_feed_options=None, train_feed_options=None, trainable_variables=None):
self.workspace = workspace
self.feeder = feeder
self.x_placeholder = x_placeholder
self.y_placeholder = y_placeholder
self.test_feed_options = test_feed_options if test_feed_options else dict()
self.train_feed_options = train_feed_options if train_feed_options else dict()
if trainable_variables is None:
trainable_variables = (
tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES) +
tf.get_collection(tf.GraphKeys.TRAINABLE_RESOURCE_VARIABLES))
self.loss_op_train = loss_op_train
self.grad_op_train = tf.gradients(loss_op_train, trainable_variables)
self.grad_op_test = tf.gradients(loss_op_test, trainable_variables)
self.v_cur_estimated = [tf.placeholder(tf.float32, shape=a.get_shape()) for a in trainable_variables]
self.v_test_grad = [tf.placeholder(tf.float32, shape=a.get_shape()) for a in trainable_variables]
self.v_ihvp = tf.placeholder(tf.float64, shape=[None])
self.v_param_damping = tf.placeholder(tf.float32)
self.v_param_scale = tf.placeholder(tf.float32)
self.v_param_total_trainset = tf.placeholder(tf.float64)
self.inverse_hvp = None
self.trainable_variables = trainable_variables
with tf.name_scope('darkon_ihvp'):
self.hessian_vector_op = _hessian_vector_product(loss_op_train, trainable_variables, self.v_cur_estimated)
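            # One step of the LiSSA update, computed fully inside the graph:
            #   new_estimate = test_grad + (1 - damping) * cur_estimate - (H cur_estimate) / scale
            # Note: v_param_damping is fed the value (1 - damping), not damping itself.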
self.estimation_op = [
a + (b * self.v_param_damping) - (c / self.v_param_scale)
for a, b, c in zip(self.v_test_grad, self.v_cur_estimated, self.hessian_vector_op)
]
with tf.name_scope('darkon_grad_diff'):
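            # Influence score for one training point: (g_train / n)^T * ihvp,
            # computed as a single (1 x d) x (d x 1) matmul in float64.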
flatten_inverse_hvp = tf.reshape(self.v_ihvp, shape=(-1, 1))
flatten_grads = tf.concat([tf.reshape(a, (-1,)) for a in self.grad_op_train], 0)
flatten_grads = tf.reshape(flatten_grads, shape=(1, -1,))
flatten_grads = tf.cast(flatten_grads, tf.float64)
flatten_grads /= self.v_param_total_trainset
self.grad_diff_op = tf.matmul(flatten_grads, flatten_inverse_hvp)
self.ihvp_config = {
'scale': 1e4,
'damping': 0.01,
'num_repeats': 1,
'recursion_batch_size': 10,
'recursion_depth': 10000
}
if not os.path.exists(self.workspace):
os.makedirs(self.workspace)
    @_timing
def upweighting_influence(self, sess, test_indices, test_batch_size, approx_params,
train_indices, num_total_train_example, force_refresh=False):
""" Calculate influence score of given training samples that affect on the test samples
Negative value indicates bad effect on the test loss
Parameters
----------
sess: tf.Session
Tensorflow session
        test_indices: list
            Test samples to be used. Influence on these samples is calculated.
        test_batch_size: int
            Batch size for test samples
        approx_params: dict
            Parameters for inverse Hessian-vector product approximation
Default:
{'scale': 1e4,
'damping': 0.01,
'num_repeats': 1,
'recursion_batch_size': 10,
'recursion_depth': 10000}
        train_indices: list
            Indices of the training samples whose influence is calculated.
        num_total_train_example: int
            Number of total training samples used for training,
            which may differ from the size of train_indices
        force_refresh: bool
            If False, a cached inverse HVP is reused when the test samples
            and approximation parameters are unchanged.
            Default: False
Returns
-------
numpy.ndarray
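
        Examples
        --------
        A minimal sketch; ``sess`` is a ``tf.Session`` with the trained weights
        restored, and the parameter values below are illustrative only:

        >>> scores = inspector.upweighting_influence(
        ...     sess,
        ...     test_indices=[0, 1, 2],
        ...     test_batch_size=32,
        ...     approx_params={'scale': 1e4, 'damping': 0.01, 'num_repeats': 1,
        ...                    'recursion_batch_size': 10, 'recursion_depth': 1000},
        ...     train_indices=list(range(1000)),
        ...     num_total_train_example=50000)
        >>> worst = np.argsort(scores)[:10]  # most harmful training samples first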
"""
self._prepare(sess, test_indices, test_batch_size, approx_params, force_refresh)
self.feeder.reset()
score = self._grad_diffs(sess, train_indices, num_total_train_example)
logger.info('Multiplying by %s train examples' % score.size)
return score
    @_timing
def upweighting_influence_batch(self, sess, test_indices, test_batch_size, approx_params,
train_batch_size, train_iterations, subsamples=-1, force_refresh=False):
""" Iteratively calculate influence scores for training data sampled by batch sampler
Negative value indicates bad effect on the test loss
Parameters
----------
sess: tf.Session
Tensorflow session
        test_indices: list
            Test samples to be used. Influence on these samples is calculated.
        test_batch_size: int
            Batch size for test samples
        approx_params: dict
            Parameters for inverse Hessian-vector product approximation
Default:
{'scale': 1e4,
'damping': 0.01,
'num_repeats': 1,
'recursion_batch_size': 10,
'recursion_depth': 10000}
train_batch_size: int
Batch size of training samples
train_iterations: int
Number of iterations
subsamples: int
Number of training samples in a batch to be calculated.
If -1, all samples are calculated (no subsampling).
Default: -1
        force_refresh: bool
            If False, a cached inverse HVP is reused when the test samples
            and approximation parameters are unchanged.
            Default: False
Returns
-------
numpy.ndarray
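
        Examples
        --------
        A minimal sketch scoring 10 random subsamples from each of 100 feeder
        batches (illustrative values; ``sess`` holds the trained weights):

        >>> scores = inspector.upweighting_influence_batch(
        ...     sess,
        ...     test_indices=[0, 1, 2],
        ...     test_batch_size=32,
        ...     approx_params=None,  # keep the default ihvp_config
        ...     train_batch_size=100,
        ...     train_iterations=100,
        ...     subsamples=10)
        >>> scores.shape  # train_iterations * subsamples scores
        (1000,)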
"""
self._prepare(sess, test_indices, test_batch_size, approx_params, force_refresh)
self.feeder.reset()
score = self._grad_diffs_all(sess, train_batch_size, train_iterations, subsamples)
logger.info('Multiplying by %s train examples' % score.size)
return score
@_timing
def _prepare(self, sess, test_indices, test_batch_size, approx_params, force_refresh):
""" Calculate inverse hessian vector product, and save it in workspace
Parameters
----------
sess: tf.Session
Tensorflow session
test_indices: list
Test samples to be used. Influence on these samples are calculated.
        test_batch_size: int
            Batch size for test samples
        approx_params: dict
            Parameters for inverse Hessian-vector product approximation
        force_refresh: bool
            If False, a cached inverse HVP is reused when the test samples
            and approximation parameters are unchanged.
        """
# update ihvp approx params
if approx_params is not None:
for param_key in approx_params.keys():
if param_key not in self.ihvp_config:
                    raise RuntimeError('unknown ihvp config param in approx_params: %s' % param_key)
self.ihvp_config.update(approx_params)
inv_hvp_path = self._path(self._approx_filename(sess, test_indices))
if not os.path.exists(inv_hvp_path) or force_refresh:
self.feeder.reset()
test_grad_loss = self._get_test_grad_loss(sess, test_indices, test_batch_size)
logger.info('Norm of test gradient: %s' % np.linalg.norm(np.concatenate([a.reshape(-1) for a in test_grad_loss])))
self.inverse_hvp = self._get_inverse_hvp_lissa(sess, test_grad_loss)
            np.savez(inv_hvp_path, inverse_hvp=self.inverse_hvp)
logger.info('Saved inverse HVP to %s' % inv_hvp_path)
else:
self.inverse_hvp = np.load(inv_hvp_path, encoding='bytes')['inverse_hvp']
logger.info('Loaded inverse HVP from %s' % inv_hvp_path)
def _get_test_grad_loss(self, sess, test_indices, test_batch_size):
if test_indices is not None:
num_iter = int(np.ceil(len(test_indices) / test_batch_size))
test_grad_loss = None
for i in range(num_iter):
start = i * test_batch_size
end = int(min((i + 1) * test_batch_size, len(test_indices)))
size = float(end - start)
test_feed_dict = self._make_test_feed_dict(*self.feeder.test_indices(test_indices[start:end]))
temp = sess.run(self.grad_op_test, feed_dict=test_feed_dict)
temp = np.asarray(temp)
temp *= size
if test_grad_loss is None:
test_grad_loss = temp
else:
test_grad_loss += temp
test_grad_loss /= len(test_indices)
else:
            raise RuntimeError('test_indices=None is not supported yet')
return test_grad_loss
def _approx_filename(self, sess, test_indices):
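        # The cache file name is a SHA-1 digest over the current model weights,
        # the test indices, and the approximation parameters, so a stale
        # inverse HVP is never reused after any of them changes.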
sha = hashlib.sha1()
# weights
vs = sess.run(self.trainable_variables)
for a in vs:
sha.update(a.data)
# test_indices
np_test_indices = np.array(list(test_indices))
sha.update(np_test_indices.data)
# approx_params
sha.update(json.dumps(self.ihvp_config, sort_keys=True).encode('utf-8'))
return 'ihvp.' + sha.hexdigest() + '.npz'
def _get_inverse_hvp_lissa(self, sess, test_grad_loss):
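        # LiSSA recursion [1]: starting from h_0 = v (the averaged test gradient),
        # iterate
        #   h_j = v + (1 - damping) * h_{j-1} - (H h_{j-1}) / scale
        # whose fixed point is scale * (H + damping * scale * I)^{-1} v.
        # Dividing the final estimate by `scale` (and averaging over num_repeats
        # independent runs) therefore approximates H^{-1} v.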
ihvp_config = self.ihvp_config
        print_iter = max(ihvp_config['recursion_depth'] // 10, 1)
inverse_hvp = None
for _ in range(ihvp_config['num_repeats']):
cur_estimate = test_grad_loss
# debug_diffs_estimation = []
# prev_estimation_norm = np.linalg.norm(np.concatenate([a.reshape(-1) for a in cur_estimate]))
for j in range(ihvp_config['recursion_depth']):
train_batch_data, train_batch_label = self.feeder.train_batch(ihvp_config['recursion_batch_size'])
feed_dict = self._make_train_feed_dict(train_batch_data, train_batch_label)
feed_dict = self._update_feed_dict(feed_dict, cur_estimate, test_grad_loss)
if _using_fully_tf:
feed_dict.update({
self.v_param_damping: 1 - self.ihvp_config['damping'],
self.v_param_scale: self.ihvp_config['scale']
})
cur_estimate = sess.run(self.estimation_op, feed_dict=feed_dict)
else:
hessian_vector_val = sess.run(self.hessian_vector_op, feed_dict=feed_dict)
hessian_vector_val = np.array(hessian_vector_val)
cur_estimate = test_grad_loss + (1 - ihvp_config['damping']) * cur_estimate - hessian_vector_val / ihvp_config['scale']
# curr_estimation_norm = np.linalg.norm(np.concatenate([a.reshape(-1) for a in cur_estimate]))
# debug_diffs_estimation.append(curr_estimation_norm - prev_estimation_norm)
# prev_estimation_norm = curr_estimation_norm
if (j % print_iter == 0) or (j == ihvp_config['recursion_depth'] - 1):
logger.info("Recursion at depth %s: norm is %.8lf" %
(j, np.linalg.norm(np.concatenate([a.reshape(-1) for a in cur_estimate]))))
if inverse_hvp is None:
inverse_hvp = np.array(cur_estimate) / ihvp_config['scale']
else:
inverse_hvp += np.array(cur_estimate) / ihvp_config['scale']
# np.savetxt(self._path('debug_diffs_estimation_{}.txt'.format(sample_idx)), debug_diffs_estimation)
inverse_hvp /= ihvp_config['num_repeats']
return inverse_hvp
def _update_feed_dict(self, feed_dict, cur_estimated, test_grad_loss):
for placeholder, var in zip(self.v_cur_estimated, cur_estimated):
feed_dict[placeholder] = var
for placeholder, var in zip(self.v_test_grad, test_grad_loss):
feed_dict[placeholder] = var
return feed_dict
@_timing
def _grad_diffs(self, sess, train_indices, num_total_train_example):
inverse_hvp = np.concatenate([a.reshape(-1) for a in self.inverse_hvp])
num_to_remove = len(train_indices)
predicted_grad_diffs = np.zeros([num_to_remove])
for counter, idx_to_remove in enumerate(train_indices):
single_data, single_label = self.feeder.train_one(idx_to_remove)
feed_dict = self._make_train_feed_dict([single_data], [single_label])
predicted_grad_diffs[counter] = self._grad_diff(sess, feed_dict, num_total_train_example, inverse_hvp)
if (counter % 1000) == 0:
logger.info('counter: {} / {}'.format(counter, num_to_remove))
return predicted_grad_diffs
@_timing
def _grad_diffs_all(self, sess, train_batch_size, num_iters, num_subsampling):
num_total_train_example = num_iters * train_batch_size
if num_subsampling > 0:
num_diffs = num_iters * num_subsampling
else:
num_diffs = num_iters * train_batch_size
inverse_hvp = np.concatenate([a.reshape(-1) for a in self.inverse_hvp])
predicted_grad_diffs = np.zeros([num_diffs])
counter = 0
for it in range(num_iters):
train_batch_data, train_batch_label = self.feeder.train_batch(train_batch_size)
if num_subsampling > 0:
for idx in range(num_subsampling):
feed_dict = self._make_train_feed_dict(train_batch_data[idx:idx + 1], train_batch_label[idx:idx + 1])
predicted_grad_diffs[counter] = self._grad_diff(sess, feed_dict, num_total_train_example, inverse_hvp)
counter += 1
else:
for single_data, single_label in zip(train_batch_data, train_batch_label):
feed_dict = self._make_train_feed_dict([single_data], [single_label])
predicted_grad_diffs[counter] = self._grad_diff(sess, feed_dict, num_total_train_example, inverse_hvp)
counter += 1
if (it % 100) == 0:
logger.info('iter: {}/{}'.format(it, num_iters))
return predicted_grad_diffs
def _grad_diff(self, sess, feed_dict, num_total_train_example, inverse_hvp):
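        # Score for a single training point: (g_train / n)^T * ihvp, evaluated
        # either fully in-graph (grad_diff_op) or via the equivalent numpy
        # fallback below.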
if _using_fully_tf:
feed_dict.update({
self.v_ihvp: inverse_hvp,
self.v_param_total_trainset: num_total_train_example
})
return sess.run(self.grad_diff_op, feed_dict=feed_dict)
else:
train_grads = sess.run(self.grad_op_train, feed_dict=feed_dict)
train_grads = np.concatenate([a.reshape(-1) for a in train_grads])
train_grads /= num_total_train_example
return np.dot(inverse_hvp, train_grads)
def _make_test_feed_dict(self, xs, ys):
ret = {
self.x_placeholder: xs,
self.y_placeholder: ys,
}
ret.update(self.test_feed_options)
return ret
def _make_train_feed_dict(self, xs, ys):
ret = {
self.x_placeholder: xs,
self.y_placeholder: ys,
}
ret.update(self.train_feed_options)
return ret
def _path(self, *paths):
return os.path.join(self.workspace, *paths)