Source code for darkon.influence.influence

# Copyright 2017 Neosapience, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========================================================================
"""
References
----------
.. [1] Pang Wei Koh and Percy Liang, "Understanding Black-box Predictions via Influence Functions", ICML 2017

"""
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
from __future__ import unicode_literals

from .feeder import InfluenceFeeder  # noqa: ignore=F401
from ..log import logger

import numpy as np
import tensorflow as tf
from tensorflow.python.ops.gradients_impl import _hessian_vector_product

import os
import time
import hashlib
import json
from functools import wraps

# Toggle: when True, the iHVP recursion step and the influence dot product are
# evaluated as TensorFlow ops; when False, the equivalent NumPy code paths below are used.
_using_fully_tf = True


def _timing(f):
    @wraps(f)
    def wrap(*args, **kwargs):
        time1 = time.time()
        ret = f(*args, **kwargs)
        time2 = time.time()
        logger.debug('* %s function took [%.3fs]' % (f.__name__, time2-time1))
        return ret
    return wrap


class Influence:
    """ Influence Class

    Parameters
    ----------
    workspace : str
        Path for workspace directory
    feeder : InfluenceFeeder
        Dataset feeder
    loss_op_train : tf.Operation
        Tensor for the loss function used for training. It may include regularization.
    loss_op_test : tf.Operation
        Tensor for the loss function used for inference.
    x_placeholder : tf.Tensor
        Data placeholder tensor from tf.placeholder()
    y_placeholder : tf.Tensor
        Target placeholder tensor from tf.placeholder()
    test_feed_options : dict
        Optional feed options used when running the loss operation on the test set
    train_feed_options : dict
        Optional feed options used when running the loss operation on the training set
    trainable_variables : tuple or list
        Trainable variables to be used.
        If None, all trainable variables in the graph are used.
        Default: None
    """
    def __init__(self, workspace, feeder, loss_op_train, loss_op_test, x_placeholder, y_placeholder,
                 test_feed_options=None, train_feed_options=None, trainable_variables=None):
        self.workspace = workspace
        self.feeder = feeder
        self.x_placeholder = x_placeholder
        self.y_placeholder = y_placeholder
        self.test_feed_options = test_feed_options if test_feed_options else dict()
        self.train_feed_options = train_feed_options if train_feed_options else dict()

        if trainable_variables is None:
            trainable_variables = (
                tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES) +
                tf.get_collection(tf.GraphKeys.TRAINABLE_RESOURCE_VARIABLES))

        self.loss_op_train = loss_op_train
        self.grad_op_train = tf.gradients(loss_op_train, trainable_variables)
        self.grad_op_test = tf.gradients(loss_op_test, trainable_variables)

        self.v_cur_estimated = [tf.placeholder(tf.float32, shape=a.get_shape()) for a in trainable_variables]
        self.v_test_grad = [tf.placeholder(tf.float32, shape=a.get_shape()) for a in trainable_variables]
        self.v_ihvp = tf.placeholder(tf.float64, shape=[None])
        self.v_param_damping = tf.placeholder(tf.float32)
        self.v_param_scale = tf.placeholder(tf.float32)
        self.v_param_total_trainset = tf.placeholder(tf.float64)
        self.inverse_hvp = None
        self.trainable_variables = trainable_variables

        with tf.name_scope('darkon_ihvp'):
            self.hessian_vector_op = _hessian_vector_product(loss_op_train, trainable_variables, self.v_cur_estimated)
            self.estimation_op = [
                a + (b * self.v_param_damping) - (c / self.v_param_scale)
                for a, b, c in zip(self.v_test_grad, self.v_cur_estimated, self.hessian_vector_op)
            ]

        with tf.name_scope('darkon_grad_diff'):
            flatten_inverse_hvp = tf.reshape(self.v_ihvp, shape=(-1, 1))
            flatten_grads = tf.concat([tf.reshape(a, (-1,)) for a in self.grad_op_train], 0)
            flatten_grads = tf.reshape(flatten_grads, shape=(1, -1,))
            flatten_grads = tf.cast(flatten_grads, tf.float64)
            flatten_grads /= self.v_param_total_trainset
            self.grad_diff_op = tf.matmul(flatten_grads, flatten_inverse_hvp)

        self.ihvp_config = {
            'scale': 1e4,
            'damping': 0.01,
            'num_repeats': 1,
            'recursion_batch_size': 10,
            'recursion_depth': 10000
        }

        if not os.path.exists(self.workspace):
            os.makedirs(self.workspace)

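    # Construction sketch (hedged): `x`, `y`, `train_loss`, `test_loss`, and
    # `MyFeeder` below are hypothetical names for the caller's own graph and
    # InfluenceFeeder subclass, not part of this module.
    #
    #     feeder = MyFeeder()                                # implements InfluenceFeeder
    #     inspector = Influence(workspace='./influence-workspace',
    #                           feeder=feeder,
    #                           loss_op_train=train_loss,    # loss incl. regularization
    #                           loss_op_test=test_loss,      # plain inference loss
    #                           x_placeholder=x,
    #                           y_placeholder=y)
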
    @_timing
    def upweighting_influence(self, sess, test_indices, test_batch_size, approx_params,
                              train_indices, num_total_train_example, force_refresh=False):
        """ Calculate the influence scores of the given training samples on the test samples.

        A negative value indicates a harmful effect on the test loss.

        Parameters
        ----------
        sess : tf.Session
            TensorFlow session
        test_indices : list
            Test samples to be used. Influence on these samples is calculated.
        test_batch_size : int
            Batch size for test samples
        approx_params : dict
            Parameters for the inverse hessian-vector product approximation.
            Default: {'scale': 1e4, 'damping': 0.01, 'num_repeats': 1,
                      'recursion_batch_size': 10, 'recursion_depth': 10000}
        train_indices : list
            Indices of the training samples to be scored.
        num_total_train_example : int
            Total number of training samples used for training,
            which may differ from the size of train_indices
        force_refresh : bool
            If False, a cached inverse HVP is reused when the test samples and
            approximation parameters are unchanged.
            Default: False

        Returns
        -------
        numpy.ndarray
            Influence scores for the given training samples
        """
        self._prepare(sess, test_indices, test_batch_size, approx_params, force_refresh)

        self.feeder.reset()

        score = self._grad_diffs(sess, train_indices, num_total_train_example)
        logger.info('Multiplying by %s train examples' % score.size)
        return score

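    # Call sketch (hedged): scores the first 100 training samples against test
    # sample 0; `sess` is a session holding the trained weights, and the
    # approx_params values and 55000 training-set size are illustrative only.
    #
    #     scores = inspector.upweighting_influence(
    #         sess,
    #         test_indices=[0],
    #         test_batch_size=100,
    #         approx_params={'scale': 1e4, 'damping': 0.01, 'num_repeats': 1,
    #                        'recursion_batch_size': 10, 'recursion_depth': 1000},
    #         train_indices=list(range(100)),
    #         num_total_train_example=55000)
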
    @_timing
    def upweighting_influence_batch(self, sess, test_indices, test_batch_size, approx_params,
                                    train_batch_size, train_iterations, subsamples=-1, force_refresh=False):
        """ Iteratively calculate influence scores for training data sampled by the batch sampler.

        A negative value indicates a harmful effect on the test loss.

        Parameters
        ----------
        sess : tf.Session
            TensorFlow session
        test_indices : list
            Test samples to be used. Influence on these samples is calculated.
        test_batch_size : int
            Batch size for test samples
        approx_params : dict
            Parameters for the inverse hessian-vector product approximation.
            Default: {'scale': 1e4, 'damping': 0.01, 'num_repeats': 1,
                      'recursion_batch_size': 10, 'recursion_depth': 10000}
        train_batch_size : int
            Batch size of training samples
        train_iterations : int
            Number of iterations
        subsamples : int
            Number of training samples in a batch to be scored.
            If -1, all samples in the batch are scored (no subsampling).
            Default: -1
        force_refresh : bool
            If False, a cached inverse HVP is reused when the test samples and
            approximation parameters are unchanged.
            Default: False

        Returns
        -------
        numpy.ndarray
            Influence scores for the sampled training data
        """
        self._prepare(sess, test_indices, test_batch_size, approx_params, force_refresh)

        self.feeder.reset()

        score = self._grad_diffs_all(sess, train_batch_size, train_iterations, subsamples)
        logger.info('Multiplying by %s train examples' % score.size)
        return score

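    # Call sketch (hedged): scores `subsamples` samples from each of
    # `train_iterations` batches drawn through the feeder; all values are illustrative.
    #
    #     scores = inspector.upweighting_influence_batch(
    #         sess,
    #         test_indices=[0],
    #         test_batch_size=100,
    #         approx_params=None,            # keep the default ihvp_config
    #         train_batch_size=100,
    #         train_iterations=550,
    #         subsamples=10)
    #     suspicious = np.argsort(scores)[:10]   # most harmful samples first
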
    @_timing
    def _prepare(self, sess, test_indices, test_batch_size, approx_params, force_refresh):
        """ Calculate the inverse hessian-vector product, and save it in the workspace.

        Parameters
        ----------
        sess : tf.Session
            TensorFlow session
        test_indices : list
            Test samples to be used. Influence on these samples is calculated.
        test_batch_size : int
            Batch size for test samples
        force_refresh : bool
            If False, a cached inverse HVP is reused when the test samples and
            approximation parameters are unchanged.
            Default: False
        approx_params : dict
            Parameters for the inverse hessian-vector product approximation
        """
        # update ihvp approx params
        if approx_params is not None:
            for param_key in approx_params.keys():
                if param_key not in self.ihvp_config:
                    raise RuntimeError('unknown ihvp config param in approx_params')
            self.ihvp_config.update(approx_params)

        inv_hvp_path = self._path(self._approx_filename(sess, test_indices))
        if not os.path.exists(inv_hvp_path) or force_refresh:
            self.feeder.reset()
            test_grad_loss = self._get_test_grad_loss(sess, test_indices, test_batch_size)
            logger.info('Norm of test gradient: %s' %
                        np.linalg.norm(np.concatenate([a.reshape(-1) for a in test_grad_loss])))
            self.inverse_hvp = self._get_inverse_hvp_lissa(sess, test_grad_loss)
            np.savez(inv_hvp_path, inverse_hvp=self.inverse_hvp, encoding='bytes')
            logger.info('Saved inverse HVP to %s' % inv_hvp_path)
        else:
            self.inverse_hvp = np.load(inv_hvp_path, encoding='bytes')['inverse_hvp']
            logger.info('Loaded inverse HVP from %s' % inv_hvp_path)

    def _get_test_grad_loss(self, sess, test_indices, test_batch_size):
        if test_indices is not None:
            num_iter = int(np.ceil(len(test_indices) / test_batch_size))
            test_grad_loss = None
            for i in range(num_iter):
                start = i * test_batch_size
                end = int(min((i + 1) * test_batch_size, len(test_indices)))
                size = float(end - start)

                test_feed_dict = self._make_test_feed_dict(*self.feeder.test_indices(test_indices[start:end]))

                temp = sess.run(self.grad_op_test, feed_dict=test_feed_dict)
                temp = np.asarray(temp)
                temp *= size

                if test_grad_loss is None:
                    test_grad_loss = temp
                else:
                    test_grad_loss += temp

            test_grad_loss /= len(test_indices)
        else:
            raise RuntimeError('unsupported yet')
        return test_grad_loss

    def _approx_filename(self, sess, test_indices):
        sha = hashlib.sha1()
        # weights
        vs = sess.run(self.trainable_variables)
        for a in vs:
            sha.update(a.data)
        # test_indices
        np_test_indices = np.array(list(test_indices))
        sha.update(np_test_indices.data)
        # approx_params
        sha.update(json.dumps(self.ihvp_config, sort_keys=True).encode('utf-8'))
        return 'ihvp.' + sha.hexdigest() + '.npz'

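    # LiSSA recursion implemented below (see [1]): starting from the test
    # gradient v_0 = g_test, each step draws a small training batch and updates
    #     v_{j+1} = g_test + (1 - damping) * v_j - (H_batch v_j) / scale,
    # and the estimate of H^{-1} g_test is v_depth / scale, averaged over
    # `num_repeats` independent runs.
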
    def _get_inverse_hvp_lissa(self, sess, test_grad_loss):
        ihvp_config = self.ihvp_config
        print_iter = ihvp_config['recursion_depth'] / 10

        inverse_hvp = None
        for _ in range(ihvp_config['num_repeats']):
            cur_estimate = test_grad_loss
            # debug_diffs_estimation = []
            # prev_estimation_norm = np.linalg.norm(np.concatenate([a.reshape(-1) for a in cur_estimate]))

            for j in range(ihvp_config['recursion_depth']):
                train_batch_data, train_batch_label = self.feeder.train_batch(ihvp_config['recursion_batch_size'])
                feed_dict = self._make_train_feed_dict(train_batch_data, train_batch_label)
                feed_dict = self._update_feed_dict(feed_dict, cur_estimate, test_grad_loss)

                if _using_fully_tf:
                    feed_dict.update({
                        self.v_param_damping: 1 - self.ihvp_config['damping'],
                        self.v_param_scale: self.ihvp_config['scale']
                    })
                    cur_estimate = sess.run(self.estimation_op, feed_dict=feed_dict)
                else:
                    hessian_vector_val = sess.run(self.hessian_vector_op, feed_dict=feed_dict)
                    hessian_vector_val = np.array(hessian_vector_val)
                    cur_estimate = test_grad_loss + (1 - ihvp_config['damping']) * cur_estimate \
                        - hessian_vector_val / ihvp_config['scale']

                # curr_estimation_norm = np.linalg.norm(np.concatenate([a.reshape(-1) for a in cur_estimate]))
                # debug_diffs_estimation.append(curr_estimation_norm - prev_estimation_norm)
                # prev_estimation_norm = curr_estimation_norm

                if (j % print_iter == 0) or (j == ihvp_config['recursion_depth'] - 1):
                    logger.info("Recursion at depth %s: norm is %.8lf" %
                                (j, np.linalg.norm(np.concatenate([a.reshape(-1) for a in cur_estimate]))))

            if inverse_hvp is None:
                inverse_hvp = np.array(cur_estimate) / ihvp_config['scale']
            else:
                inverse_hvp += np.array(cur_estimate) / ihvp_config['scale']

            # np.savetxt(self._path('debug_diffs_estimation_{}.txt'.format(sample_idx)), debug_diffs_estimation)

        inverse_hvp /= ihvp_config['num_repeats']
        return inverse_hvp

    def _update_feed_dict(self, feed_dict, cur_estimated, test_grad_loss):
        for placeholder, var in zip(self.v_cur_estimated, cur_estimated):
            feed_dict[placeholder] = var
        for placeholder, var in zip(self.v_test_grad, test_grad_loss):
            feed_dict[placeholder] = var
        return feed_dict

    @_timing
    def _grad_diffs(self, sess, train_indices, num_total_train_example):
        inverse_hvp = np.concatenate([a.reshape(-1) for a in self.inverse_hvp])

        num_to_remove = len(train_indices)
        predicted_grad_diffs = np.zeros([num_to_remove])

        for counter, idx_to_remove in enumerate(train_indices):
            single_data, single_label = self.feeder.train_one(idx_to_remove)
            feed_dict = self._make_train_feed_dict([single_data], [single_label])
            predicted_grad_diffs[counter] = self._grad_diff(sess, feed_dict, num_total_train_example, inverse_hvp)

            if (counter % 1000) == 0:
                logger.info('counter: {} / {}'.format(counter, num_to_remove))

        return predicted_grad_diffs

    @_timing
    def _grad_diffs_all(self, sess, train_batch_size, num_iters, num_subsampling):
        num_total_train_example = num_iters * train_batch_size
        if num_subsampling > 0:
            num_diffs = num_iters * num_subsampling
        else:
            num_diffs = num_iters * train_batch_size

        inverse_hvp = np.concatenate([a.reshape(-1) for a in self.inverse_hvp])
        predicted_grad_diffs = np.zeros([num_diffs])

        counter = 0
        for it in range(num_iters):
            train_batch_data, train_batch_label = self.feeder.train_batch(train_batch_size)

            if num_subsampling > 0:
                for idx in range(num_subsampling):
                    feed_dict = self._make_train_feed_dict(train_batch_data[idx:idx + 1], train_batch_label[idx:idx + 1])
                    predicted_grad_diffs[counter] = self._grad_diff(sess, feed_dict, num_total_train_example, inverse_hvp)
                    counter += 1
            else:
                for single_data, single_label in zip(train_batch_data, train_batch_label):
                    feed_dict = self._make_train_feed_dict([single_data], [single_label])
                    predicted_grad_diffs[counter] = self._grad_diff(sess, feed_dict, num_total_train_example, inverse_hvp)
                    counter += 1

            if (it % 100) == 0:
                logger.info('iter: {}/{}'.format(it, num_iters))

        return predicted_grad_diffs

    def _grad_diff(self, sess, feed_dict, num_total_train_example, inverse_hvp):
        if _using_fully_tf:
            feed_dict.update({
                self.v_ihvp: inverse_hvp,
                self.v_param_total_trainset: num_total_train_example
            })
            return sess.run(self.grad_diff_op, feed_dict=feed_dict)
        else:
            train_grads = sess.run(self.grad_op_train, feed_dict=feed_dict)
            train_grads = np.concatenate([a.reshape(-1) for a in train_grads])
            train_grads /= num_total_train_example
            return np.dot(inverse_hvp, train_grads)

    def _make_test_feed_dict(self, xs, ys):
        ret = {
            self.x_placeholder: xs,
            self.y_placeholder: ys,
        }
        ret.update(self.test_feed_options)
        return ret

    def _make_train_feed_dict(self, xs, ys):
        ret = {
            self.x_placeholder: xs,
            self.y_placeholder: ys,
        }
        ret.update(self.train_feed_options)
        return ret

    def _path(self, *paths):
        return os.path.join(self.workspace, *paths)
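
# Note on the returned score (sketch, following [1]): for a single training
# sample z, `_grad_diff` computes roughly
#     score(z) = (1 / num_total_train_example) * grad_theta L_train(z) . H^{-1} grad_theta L_test,
# which approximates the change in the test loss if z were removed from the
# training set; hence negative scores mark samples whose presence hurts the
# test loss, matching the sign convention in the public method docstrings.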