[Paper][RL] [ToDo] Mutual Information State Intrinsic Control Review
This paper defines an intrinsic reward as a way to address the problem of depending on externally provided rewards.
Motivated by the concept of self-consciousness in psychology, the authors make the natural assumption that the agent knows what constitutes itself, and they propose a new intrinsic objective that encourages the agent to have maximum control over it.
They formalize this reward mathematically as the mutual information between the agent state and the surrounding state under the current agent policy.
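(My own summary of the objective, so the notation may differ from the paper's: writing $S^a$ for the agent state and $S^s$ for the surrounding state, the intrinsic objective is their mutual information, and in practice it is estimated with a MINE-style lower bound of the Donsker-Varadhan form computed by a small statistics network $T_\theta$; this is what the `state_mi` code further below implements.)

$$
I(S^s; S^a) \;=\; \mathbb{E}_{p(s^s,\,s^a)}\!\left[\log \frac{p(s^s, s^a)}{p(s^s)\,p(s^a)}\right]
\;\ge\; \mathbb{E}_{p(s^s,\,s^a)}\!\left[T_\theta(s^s, s^a)\right] - \log \mathbb{E}_{p(s^s)\,p(s^a)}\!\left[e^{T_\theta(s^s, s^a)}\right]
$$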
https://www.youtube.com/watch?v=AUCwc9RThpk&feature=youtu.be&ab_channel=RuiZhao
Code
Honestly, the method didn't fully click for me, so I'm planning to come back to it later...
The snippets below are from the official implementation: rollout generation, the DDPG update with the extra intrinsic-reward channels, and the discriminator network that estimates the mutual information.
# generate roll outs
o_new = np.empty((self.rollout_batch_size, self.dims['o']))
ag_new = np.empty((self.rollout_batch_size, self.dims['g']))
success = np.zeros(self.rollout_batch_size)

# compute new states and observations
for i in range(self.rollout_batch_size):
    try:
        # We fully ignore the reward here because it will have to be re-computed
        curr_o_new, _, _, info = self.envs[i].step(u[i])
        if 'is_success' in info:
            success[i] = info['is_success']
        o_new[i] = curr_o_new['observation']
        ag_new[i] = curr_o_new['achieved_goal']
        for idx, key in enumerate(self.info_keys):
            info_values[idx][t, i] = info[key]
    except MujocoException:
        # (except branch not shown in the excerpt) a failed simulation step
        # invalidates the batch, so the rollout is simply regenerated
        return self.generate_rollouts()

episode = dict(o=obs,
               z=zs,
               u=acts,
               g=goals,
               ag=achieved_goals)
for key, value in zip(self.info_keys, info_values):
    episode['info_{}'.format(key)] = value
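For context, the rollout worker packages a whole batch of parallel episodes into this `episode` dict. A toy sketch of the shapes involved (T, B and all dimensions are made-up values, and the exact time-axis convention is my assumption based on the HER-style buffer):

```python
import numpy as np

# hypothetical sizes: T timesteps, B parallel envs, toy state/skill/action/goal dims
T, B, dim_o, dim_z, dim_u, dim_g = 50, 2, 10, 5, 4, 3

episode = dict(
    o=np.zeros((T + 1, B, dim_o)),   # observations, assumed to include the final state (hence T + 1)
    z=np.zeros((T, B, dim_z)),       # one-hot skill/latent variable fed to the discriminator
    u=np.zeros((T, B, dim_u)),       # actions
    g=np.zeros((T, B, dim_g)),       # desired goals
    ag=np.zeros((T + 1, B, dim_g)),  # achieved goals
)
for key, value in episode.items():
    print(key, value.shape)
```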
def _grads(self):
    # run all loss/gradient tensors in a single session call;
    # note that `actor_loss` here is actually Q(s, pi(s)); the true policy loss
    # is its negative plus the action L2 penalty defined below
    critic_loss, actor_loss, Q_grad, pi_grad, neg_logp_pi, e_w = self.sess.run([
        self.Q_loss_tf,
        self.main.Q_pi_tf,
        self.Q_grad_tf,
        self.pi_grad_tf,
        self.main.neg_logp_pi_tf,
        self.e_w_tf,
    ])
    return critic_loss, actor_loss, Q_grad, pi_grad, neg_logp_pi, e_w
def train(self, t, stage=True):
    if not self.buffer.current_size == 0:
        if stage:
            # sample a batch from the replay buffer (with intrinsic rewards) and stage it
            self.stage_batch(ir=True, t=t)
        critic_loss, actor_loss, Q_grad, pi_grad, neg_logp_pi, e_w = self._grads()
        self._update(Q_grad, pi_grad)
        # log the clipped, entropy-based intrinsic reward term
        self.et_r_history.extend(((np.clip((self.et_r_scale * neg_logp_pi), *(-1, 0))) * e_w).tolist())
        return critic_loss, actor_loss
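The term appended to `et_r_history` above is the SAC entropy term: the negative log-probability of the sampled action, scaled by `et_r_scale`, clipped into [-1, 0], and gated by the weight `e_w`. A minimal numpy illustration with made-up values:

```python
import numpy as np

neg_logp_pi = np.array([-2.3, -0.4, 0.1])  # hypothetical per-sample values from the policy head
et_r_scale = 0.02                          # hypothetical scale
e_w = 1.0                                  # gate for the entropy reward channel

et_r = np.clip(et_r_scale * neg_logp_pi, -1, 0) * e_w
print(et_r)  # -> roughly [-0.046, -0.008, 0.0]
```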
# intrinsic reward (ir) network for mutual information
with tf.variable_scope('ir') as vs:
    if reuse:
        vs.reuse_variables()
    self.main_ir = self.create_discriminator(batch_tf, net_type='ir', **self.__dict__)
    vs.reuse_variables()

# loss functions
# 1) mutual-information estimator: gradients of the (negative) DV bound over 'ir/state_mi'
mi_grads_tf = tf.gradients(tf.reduce_mean(self.main_ir.mi_tf), self._vars('ir/state_mi'))
assert len(self._vars('ir/state_mi')) == len(mi_grads_tf)
self.mi_grads_vars_tf = zip(mi_grads_tf, self._vars('ir/state_mi'))
self.mi_grad_tf = flatten_grads(grads=mi_grads_tf, var_list=self._vars('ir/state_mi'))
self.mi_adam = MpiAdam(self._vars('ir/state_mi'), scale_grad_by_procs=False)

# 2) skill discriminator: gradients of the cross-entropy loss over 'ir/skill_ds'
sk_grads_tf = tf.gradients(tf.reduce_mean(self.main_ir.sk_tf), self._vars('ir/skill_ds'))
assert len(self._vars('ir/skill_ds')) == len(sk_grads_tf)
self.sk_grads_vars_tf = zip(sk_grads_tf, self._vars('ir/skill_ds'))
self.sk_grad_tf = flatten_grads(grads=sk_grads_tf, var_list=self._vars('ir/skill_ds'))
self.sk_adam = MpiAdam(self._vars('ir/skill_ds'), scale_grad_by_procs=False)
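`flatten_grads` and `MpiAdam` come from the baselines codebase; the point of the snippet above is that the MI estimator (`ir/state_mi`) and the skill discriminator (`ir/skill_ds`) each get their own flat gradient and their own optimizer over their own variable scope. A rough single-process numpy sketch of the flatten/unflatten pattern (my illustration, not the baselines API):

```python
import numpy as np

# hypothetical per-variable gradients for one scope (e.g. 'ir/state_mi')
grads = [np.ones((2, 3)), np.full((4,), 0.5)]
shapes = [g.shape for g in grads]

# flatten into a single vector (roughly what flatten_grads produces) so it can be
# averaged across MPI workers and fed to a single Adam state
flat_grad = np.concatenate([g.ravel() for g in grads])

# ...and split back into per-variable arrays when applying the update
splits = np.cumsum([int(np.prod(s)) for s in shapes])[:-1]
unflattened = [a.reshape(s) for a, s in zip(np.split(flat_grad, splits), shapes)]
print(flat_grad.shape, [u.shape for u in unflattened])  # (10,) [(2, 3), (4,)]
```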
target_Q_pi_tf = self.target.Q_pi_tf
clip_range = (-self.clip_return, self.clip_return if self.clip_pos_returns else np.inf)
self.e_w_tf = batch_tf['e_w']
if not self.sac:
    # without SAC, the entropy term is switched off
    self.main.neg_logp_pi_tf = tf.zeros(1)

# Bellman target: extrinsic reward + MI reward + skill reward + entropy term + bootstrapped Q,
# with each channel scaled, clipped and gated by its own weight
target_tf = tf.clip_by_value(
    self.r_scale * batch_tf['r'] * batch_tf['r_w']
    + (tf.clip_by_value(self.mi_r_scale * batch_tf['m'], *(0, 1))
       - (1 if not self.mi_r_scale == 0 else 0)) * batch_tf['m_w']
    + tf.clip_by_value(self.sk_r_scale * batch_tf['s'], *(-1, 0)) * batch_tf['s_w']
    + tf.clip_by_value(self.et_r_scale * self.main.neg_logp_pi_tf, *(-1, 0)) * self.e_w_tf
    + self.gamma * target_Q_pi_tf,
    *clip_range)

# TD error and (importance-weighted) critic loss
self.td_error_tf = tf.stop_gradient(target_tf) - self.main.Q_tf
self.errors_tf = tf.square(self.td_error_tf)
self.errors_tf = tf.reduce_mean(batch_tf['w'] * self.errors_tf)
self.Q_loss_tf = tf.reduce_mean(self.errors_tf)

# actor loss: maximize Q(s, pi(s)) with an L2 penalty on the (scaled) actions
self.pi_loss_tf = -tf.reduce_mean(self.main.Q_pi_tf)
self.pi_loss_tf += self.action_l2 * tf.reduce_mean(tf.square(self.main.pi_tf / self.max_u))

Q_grads_tf = tf.gradients(self.Q_loss_tf, self._vars('main/Q'))
pi_grads_tf = tf.gradients(self.pi_loss_tf, self._vars('main/pi'))
assert len(self._vars('main/Q')) == len(Q_grads_tf)
assert len(self._vars('main/pi')) == len(pi_grads_tf)
self.Q_grads_vars_tf = zip(Q_grads_tf, self._vars('main/Q'))
self.pi_grads_vars_tf = zip(pi_grads_tf, self._vars('main/pi'))
self.Q_grad_tf = flatten_grads(grads=Q_grads_tf, var_list=self._vars('main/Q'))
self.pi_grad_tf = flatten_grads(grads=pi_grads_tf, var_list=self._vars('main/pi'))
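The `target_tf` expression packs several reward channels into one Bellman target. To make the arithmetic easier to follow, here is the same computation as a plain numpy sketch with made-up scalar values (the `*_w` gates decide which channels are active; names mirror the tensors, numbers are hypothetical):

```python
import numpy as np

gamma, clip_return, clip_pos_returns = 0.98, 50.0, True
r_scale, mi_r_scale, sk_r_scale, et_r_scale = 1.0, 5000.0, 1.0, 0.02  # hypothetical scales
r, m, s, neg_logp_pi, target_Q_pi = -1.0, 0.0001, -0.7, -2.3, -10.0   # one made-up transition
r_w, m_w, s_w, e_w = 0.0, 1.0, 0.0, 0.0                               # only the MI channel is on

clip_range = (-clip_return, clip_return if clip_pos_returns else np.inf)
target = np.clip(
    r_scale * r * r_w
    + (np.clip(mi_r_scale * m, 0, 1) - (1 if mi_r_scale != 0 else 0)) * m_w
    + np.clip(sk_r_scale * s, -1, 0) * s_w
    + np.clip(et_r_scale * neg_logp_pi, -1, 0) * e_w
    + gamma * target_Q_pi,
    *clip_range)
print(target)  # -> about -10.3
```

Note how the MI channel is first clipped into [0, 1] and then shifted by -1 whenever `mi_r_scale` is non-zero, so, like the skill and entropy channels, it ends up contributing a value in [-1, 0].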
class Discriminator:
    @store_args
    def __init__(self, inputs_tf, dimo, dimz, dimg, dimu, max_u, o_stats, g_stats, hidden, layers, env_name, **kwargs):
        """The discriminator network and related training code.

        Args:
            inputs_tf (dict of tensors): all necessary inputs for the network: the
                observation (o), the goal (g), and the action (u)
            dimo (int): the dimension of the observations
            dimz (int): the dimension of the latent skill variable z
            dimg (int): the dimension of the goals
            dimu (int): the dimension of the actions
            max_u (float): the maximum magnitude of actions; action outputs will be scaled
                accordingly
            o_stats (baselines.her.Normalizer): normalizer for observations
            g_stats (baselines.her.Normalizer): normalizer for goals
            hidden (int): number of hidden units that should be used in hidden layers
            layers (int): number of hidden layers
        """
        self.o_tf = tf.placeholder(tf.float32, shape=(None, self.dimo))
        self.z_tf = tf.placeholder(tf.float32, shape=(None, self.dimz))
        self.g_tf = tf.placeholder(tf.float32, shape=(None, self.dimg))
        # note: self.o_tau_tf (a batch of state trajectories, rank 3) is defined elsewhere
        # in the original class; only its use is shown in this excerpt
        obs_tau_excludes_goal, obs_tau_achieved_goal = split_observation_tf(self.env_name, self.o_tau_tf)
        obs_excludes_goal, obs_achieved_goal = split_observation_tf(self.env_name, self.o_tf)
        # Discriminator networks
        with tf.variable_scope('state_mi'):
            # Mutual Information Neural Estimation
            # shuffle and concatenate
            x_in = obs_tau_excludes_goal
            y_in = obs_tau_achieved_goal
            y_in_tran = tf.transpose(y_in, perm=[1, 0, 2])
            y_shuffle_tran = tf.random_shuffle(y_in_tran)
            y_shuffle = tf.transpose(y_shuffle_tran, perm=[1, 0, 2])
            x_conc = tf.concat([x_in, x_in], axis=-2)
            y_conc = tf.concat([y_in, y_shuffle], axis=-2)
            # propagate the forward pass
            layerx = tf_layers.linear(x_conc, int(self.hidden / 2))
            layery = tf_layers.linear(y_conc, int(self.hidden / 2))
            layer2 = tf.nn.relu(layerx + layery)
            output = tf_layers.linear(layer2, 1)
            output = tf.nn.tanh(output)
            # split in T_xy and T_x_y predictions
            N_samples = tf.shape(x_in)[-2]
            T_xy = output[:, :N_samples, :]
            T_x_y = output[:, N_samples:, :]
            # compute the negative loss (maximise loss == minimise -loss)
            mean_exp_T_x_y = tf.reduce_mean(tf.math.exp(T_x_y), axis=-2)
            neg_loss = -(tf.reduce_mean(T_xy, axis=-2) - tf.math.log(mean_exp_T_x_y))
            neg_loss = tf.check_numerics(neg_loss, 'check_numerics caught bad neg_loss')
            self.mi_tf = neg_loss

        with tf.variable_scope('skill_ds'):
            self.logits_tf = nn(obs_achieved_goal, [int(self.hidden / 2)] * self.layers + [self.dimz])
            self.sk_tf = tf.nn.softmax_cross_entropy_with_logits(labels=self.z_tf, logits=self.logits_tf)
            self.sk_r_tf = -1 * self.sk_tf
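The `state_mi` scope is essentially MINE: the statistics network is evaluated on jointly sampled (agent-state, surrounding-state) pairs and on pairs whose second element has been shuffled to imitate the product of marginals, and the Donsker-Varadhan bound E[T_joint] - log E[exp(T_shuffled)] is maximized (the `mi_adam` update earlier). The `skill_ds` scope looks like a DIAYN-style skill discriminator: its negative cross-entropy `sk_r_tf` is, up to a constant, log q(z|s) and is used as the skill reward. Below is a self-contained numpy sketch of the DV estimate on toy data, with a fixed statistics function standing in for the trained network:

```python
import numpy as np

rng = np.random.default_rng(0)

def dv_bound(t_joint, t_marginal):
    """Donsker-Varadhan lower bound on I(X; Y) from statistics-network outputs."""
    return t_joint.mean() - np.log(np.exp(t_marginal).mean())

def T(a, b):
    # a fixed "statistics network"; in the repo this is the small trained tanh network above
    return np.tanh(a * b)

# toy correlated data standing in for the agent state (x) and the surrounding state (y)
n = 1000
x = rng.normal(size=n)
y = x + 0.1 * rng.normal(size=n)   # strongly dependent on x
y_shuffled = rng.permutation(y)    # shuffling breaks the pairing -> approximate marginal samples

print(dv_bound(T(x, y), T(x, y_shuffled)))                   # clearly positive: dependence is detected
print(dv_bound(T(x, rng.permutation(y)), T(x, y_shuffled)))  # independent pairing: much lower (can be negative for a fixed T)
```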
Official implementation: https://github.com/ruizhaogit/music