Sharing a GitHub repo full of TensorFlow paper-implementation code
2019. 6. 30. 15:56 · 분석 Python/Tensorflow
https://github.com/tensorflow/tensor2tensor/blob/master/tensor2tensor/layers/common_layers.py
https://towardsdatascience.com/extending-pytorch-with-custom-activation-functions-2d8b065ef2fa
As a TensorFlow user, there are times when I have to implement something myself, and I stumbled on this repo while searching for activation functions.
I haven't actually tested whether everything runs, but it implements a lot of papers, so I'm sharing it.
It appears to be a package made by Google!
In any case, if there is something you need, it looks like a good place to grab it from.
## brelu / belu / gelu / nac / nalu / lrelu
```python
# Excerpted from tensor2tensor/layers/common_layers.py (TF 1.x API).
# These snippets assume `import numpy as np` and `import tensorflow as tf`;
# helpers such as shape_list, cast_like, to_float, is_xla_compiled and
# layers() are defined elsewhere in that same file.
import numpy as np
import tensorflow as tf


def brelu(x):
  """Bipolar ReLU as in https://arxiv.org/abs/1709.04054."""
  x_shape = shape_list(x)
  x1, x2 = tf.split(tf.reshape(x, x_shape[:-1] + [-1, 2]), 2, axis=-1)
  y1 = tf.nn.relu(x1)
  y2 = -tf.nn.relu(-x2)
  return tf.reshape(tf.concat([y1, y2], axis=-1), x_shape)


def belu(x):
  """Bipolar ELU as in https://arxiv.org/abs/1709.04054."""
  x_shape = shape_list(x)
  x1, x2 = tf.split(tf.reshape(x, x_shape[:-1] + [-1, 2]), 2, axis=-1)
  y1 = tf.nn.elu(x1)
  y2 = -tf.nn.elu(-x2)
  return tf.reshape(tf.concat([y1, y2], axis=-1), x_shape)


def gelu(x):
  """Gaussian Error Linear Unit.

  This is a smoother version of the RELU.
  Original paper: https://arxiv.org/abs/1606.08415

  Args:
    x: float Tensor to perform activation.

  Returns:
    x with the GELU activation applied.
  """
  cdf = 0.5 * (1.0 + tf.tanh(
      (np.sqrt(2 / np.pi) * (x + 0.044715 * tf.pow(x, 3)))))
  return x * cdf


def nac(x, depth, name=None, reuse=None):
  """NAC as in https://arxiv.org/abs/1808.00508."""
  with tf.variable_scope(name, default_name="nac", values=[x], reuse=reuse):
    x_shape = shape_list(x)
    w = tf.get_variable("w", [x_shape[-1], depth])
    m = tf.get_variable("m", [x_shape[-1], depth])
    w = tf.tanh(w) * tf.nn.sigmoid(m)
    x_flat = tf.reshape(x, [-1, x_shape[-1]])
    res_flat = tf.matmul(x_flat, w)
    return tf.reshape(res_flat, x_shape[:-1] + [depth])


def nalu(x, depth, epsilon=1e-30, name=None, reuse=None):
  """NALU as in https://arxiv.org/abs/1808.00508."""
  with tf.variable_scope(name, default_name="nalu", values=[x], reuse=reuse):
    x_shape = shape_list(x)
    x_flat = tf.reshape(x, [-1, x_shape[-1]])
    gw = tf.get_variable("w", [x_shape[-1], depth])
    g = tf.nn.sigmoid(tf.matmul(x_flat, gw))
    g = tf.reshape(g, x_shape[:-1] + [depth])
    a = nac(x, depth, name="nac_lin")
    log_x = tf.log(tf.abs(x) + epsilon)
    m = nac(log_x, depth, name="nac_log")
    return g * a + (1 - g) * tf.exp(m)


def lrelu(input_, leak=0.2, name="lrelu"):
  """Leaky ReLU."""
  return tf.maximum(input_, leak * input_, name=name)
```
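For a quick sanity check, here is a minimal usage sketch of my own (not from the repo), assuming TF 1.x graph mode and that the functions above are in scope. `gelu` and `lrelu` only need NumPy and TensorFlow; `brelu`/`belu`/`nac`/`nalu` additionally need `shape_list` from `tensor2tensor.layers.common_layers`.

```python
# Hypothetical usage sketch (not part of common_layers.py): apply gelu/lrelu
# to a placeholder in TF 1.x graph mode.
import numpy as np
import tensorflow as tf

x = tf.placeholder(tf.float32, [None, 128])
gelu_out = gelu(x)              # smooth ReLU approximation
lrelu_out = lrelu(x, leak=0.2)  # leaky ReLU with slope 0.2 for x < 0

with tf.Session() as sess:
    feed = {x: np.random.randn(4, 128).astype(np.float32)}
    g, l = sess.run([gelu_out, lrelu_out], feed_dict=feed)
    print(g.shape, l.shape)  # (4, 128) (4, 128)
```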
## layer norm / group norm / noam_norm / l2_norm / spectral norm / instance norm
```python
def layer_norm_vars(filters):
  """Create Variables for layer norm."""
  scale = tf.get_variable(
      "layer_norm_scale", [filters], initializer=tf.ones_initializer())
  bias = tf.get_variable(
      "layer_norm_bias", [filters], initializer=tf.zeros_initializer())
  return scale, bias


def layer_norm_compute(x, epsilon, scale, bias, layer_collection=None):
  """Layer norm raw computation."""
  # Save these before they get converted to tensors by the casting below
  params = (scale, bias)

  epsilon, scale, bias = [cast_like(t, x) for t in [epsilon, scale, bias]]
  mean = tf.reduce_mean(x, axis=[-1], keepdims=True)
  variance = tf.reduce_mean(
      tf.squared_difference(x, mean), axis=[-1], keepdims=True)
  norm_x = (x - mean) * tf.rsqrt(variance + epsilon)

  output = norm_x * scale + bias
  return output


def layer_norm(x,
               filters=None,
               epsilon=1e-6,
               name=None,
               reuse=None,
               layer_collection=None):
  """Layer normalize the tensor x, averaging over the last dimension."""
  if filters is None:
    filters = shape_list(x)[-1]
  with tf.variable_scope(
      name, default_name="layer_norm", values=[x], reuse=reuse):
    scale, bias = layer_norm_vars(filters)
    return layer_norm_compute(x, epsilon, scale, bias,
                              layer_collection=layer_collection)


def group_norm(x, filters=None, num_groups=8, epsilon=1e-5):
  """Group normalization as in https://arxiv.org/abs/1803.08494."""
  x_shape = shape_list(x)
  if filters is None:
    filters = x_shape[-1]
  assert len(x_shape) == 4
  assert filters % num_groups == 0
  # Prepare variables.
  scale = tf.get_variable(
      "group_norm_scale", [filters], initializer=tf.ones_initializer())
  bias = tf.get_variable(
      "group_norm_bias", [filters], initializer=tf.zeros_initializer())
  epsilon, scale, bias = [cast_like(t, x) for t in [epsilon, scale, bias]]
  # Reshape and compute group norm.
  x = tf.reshape(x, x_shape[:-1] + [num_groups, filters // num_groups])
  # Calculate mean and variance on heights, width, channels (not groups).
  mean, variance = tf.nn.moments(x, [1, 2, 4], keep_dims=True)
  norm_x = (x - mean) * tf.rsqrt(variance + epsilon)
  return tf.reshape(norm_x, x_shape) * scale + bias


def noam_norm(x, epsilon=1.0, name=None):
  """One version of layer normalization."""
  with tf.name_scope(name, default_name="noam_norm", values=[x]):
    shape = x.get_shape()
    ndims = len(shape)
    return (tf.nn.l2_normalize(x, ndims - 1, epsilon=epsilon) * tf.sqrt(
        to_float(shape[-1])))


def l2_norm(x, filters=None, epsilon=1e-6, name=None, reuse=None):
  """Layer normalization with l2 norm."""
  if filters is None:
    filters = shape_list(x)[-1]
  with tf.variable_scope(name, default_name="l2_norm", values=[x], reuse=reuse):
    scale = tf.get_variable(
        "l2_norm_scale", [filters], initializer=tf.ones_initializer())
    bias = tf.get_variable(
        "l2_norm_bias", [filters], initializer=tf.zeros_initializer())
    epsilon, scale, bias = [cast_like(t, x) for t in [epsilon, scale, bias]]
    mean = tf.reduce_mean(x, axis=[-1], keepdims=True)
    l2norm = tf.reduce_sum(
        tf.squared_difference(x, mean), axis=[-1], keepdims=True)
    norm_x = (x - mean) * tf.rsqrt(l2norm + epsilon)
    return norm_x * scale + bias


def apply_spectral_norm(x):
  """Normalizes x using the spectral norm.

  The implementation follows Algorithm 1 of
  https://arxiv.org/abs/1802.05957. If x is not a 2-D Tensor, then it is
  reshaped such that the number of channels (last-dimension) is the same.

  Args:
    x: Tensor with the last dimension equal to the number of filters.

  Returns:
    x: Tensor with the same shape as x normalized by the spectral norm.
    assign_op: Op to be run after every step to update the vector "u".
  """
  weights_shape = shape_list(x)
  other, num_filters = tf.reduce_prod(weights_shape[:-1]), weights_shape[-1]

  # Reshape into a 2-D matrix with outer size num_filters.
  weights_2d = tf.reshape(x, (other, num_filters))

  # v = Wu / ||W u||
  with tf.variable_scope("u", reuse=tf.AUTO_REUSE):
    u = tf.get_variable(
        "u", [num_filters, 1],
        initializer=tf.truncated_normal_initializer(),
        trainable=False)
  v = tf.nn.l2_normalize(tf.matmul(weights_2d, u))

  # u_new = vW / ||v W||
  u_new = tf.nn.l2_normalize(tf.matmul(tf.transpose(v), weights_2d))

  # s = v*W*u
  spectral_norm = tf.squeeze(
      tf.matmul(tf.transpose(v), tf.matmul(weights_2d, tf.transpose(u_new))))

  # set u equal to u_new in the next iteration.
  assign_op = tf.assign(u, tf.transpose(u_new))
  return tf.divide(x, spectral_norm), assign_op


def instance_norm(x):
  """Instance normalization layer."""
  with tf.variable_scope("instance_norm"):
    epsilon = 1e-5
    mean, var = tf.nn.moments(x, [1, 2], keep_dims=True)
    scale = tf.get_variable(
        "scale", [x.get_shape()[-1]],
        initializer=tf.truncated_normal_initializer(mean=1.0, stddev=0.02))
    offset = tf.get_variable(
        "offset", [x.get_shape()[-1]], initializer=tf.constant_initializer(0.0))
    out = scale * tf.div(x - mean, tf.sqrt(var + epsilon)) + offset
    return out


def apply_norm(x, norm_type, depth, epsilon, layer_collection=None):
  """Apply Normalization."""
  if layer_collection is not None:
    assert norm_type == "layer"
  if norm_type == "layer":
    return layer_norm(
        x, filters=depth, epsilon=epsilon, layer_collection=layer_collection)
  if norm_type == "group":
    return group_norm(x, filters=depth, epsilon=epsilon)
  if norm_type == "batch":
    # layers() is a tensor2tensor helper that returns the Keras layers module.
    return layers().BatchNormalization(epsilon=epsilon)(x)
  if norm_type == "noam":
    return noam_norm(x, epsilon)
  if norm_type == "l2":
    return l2_norm(x, filters=depth, epsilon=epsilon)
  if norm_type == "none":
    return x
  raise ValueError("Parameter normalizer_fn must be one of: 'layer', 'group', "
                   "'batch', 'noam', 'l2', 'none'.")
```
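As a rough usage sketch of my own (under the assumption that `shape_list` and `cast_like` from `common_layers.py` are in scope, TF 1.x graph mode), `layer_norm` and the `apply_norm` dispatcher can be called like this:

```python
# Hypothetical sketch: layer-normalize the output of a dense layer.
import numpy as np
import tensorflow as tf

inputs = tf.placeholder(tf.float32, [None, 10, 64])
hidden = tf.layers.dense(inputs, 64, activation=tf.nn.relu)

with tf.variable_scope("encoder"):
    normed = layer_norm(hidden)  # creates trainable scale/bias of size 64
    # Equivalent call through the dispatcher:
    # normed = apply_norm(hidden, "layer", depth=64, epsilon=1e-6)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    out = sess.run(normed, feed_dict={inputs: np.random.randn(2, 10, 64)})
    print(out.shape)  # (2, 10, 64)
```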
## Loss inspired by the sliced WGAN paper: https://arxiv.org/abs/1804.01947
Reference notebook: https://github.com/skolouri/swae/blob/master/MNIST_SlicedWassersteinAutoEncoder_Ring.ipynb
```python
def sliced_gan_loss(input1,
                    input2,
                    discriminator,
                    num_vecs,
                    do_random_vecs=True,
                    do_tanh=True,
                    return_logits=False):
  """Loss inspired by the sliced WGAN paper: https://arxiv.org/abs/1804.01947.

  Puts input1 and input2 through the provided discriminator to get logits.
  Then, computes num_vecs random projections of the logits, sorts them on
  the batch dimension and returns the L2 loss between the sorted vectors.
  See the above-mentioned paper for the reasoning behind it.

  Args:
    input1: first discriminator inputs.
    input2: second discriminator inputs.
    discriminator: inputs -> logits function.
    num_vecs: how many random vectors to use for projections.
    do_random_vecs: whether to use random vectors or just tanh of the logits.
    do_tanh: if true (default) we'll also just use tanh of the logits.
    return_logits: Whether or not to return the logits.

  Returns:
    The generator loss, i.e., the sliced approximation of the distance between
    the projected distributions (warning: discriminator should maximize it).
  """
  with tf.variable_scope("sliced_gan"):
    with tf.variable_scope("discriminator"):
      logits1 = discriminator(input1)
    with tf.variable_scope("discriminator", reuse=True):
      logits2 = discriminator(input2)

    if do_random_vecs:
      random_vecs = tf.nn.l2_normalize(
          tf.random_uniform([shape_list(logits1)[-1], num_vecs]), axis=0)

    def get_sorted_projections(x):
      """Make projections of x and sort them on the batch dimension."""
      x = tf.reshape(x, [-1, shape_list(x)[-1]])
      batch_size = shape_list(x)[0]
      if do_random_vecs and do_tanh:
        n = tf.nn.l2_normalize(x, axis=1)
        proj = tf.concat([tf.matmul(n, random_vecs), tf.tanh(n)], axis=1)
      elif do_random_vecs:
        n = tf.nn.l2_normalize(x, axis=1)
        proj = tf.matmul(n, random_vecs)
      else:
        proj = tf.tanh(x)
      proj = tf.transpose(proj, [1, 0])  # [num_vecs, batch] after this.

      if is_xla_compiled():
        proj_dtype = proj.dtype
        proj = tf.cast(proj, tf.bfloat16)

        # Currently TPU only supports 1-D top_k calls.
        map_fn = lambda x: tf.nn.top_k(x, k=batch_size, sorted=True)[0]
        values = tf.map_fn(map_fn, proj)

        values = tf.cast(values, proj_dtype)
      else:
        values, _ = tf.nn.top_k(proj, k=batch_size, sorted=True)

      return values

    proj1 = get_sorted_projections(logits1)
    proj2 = get_sorted_projections(logits2)
    dist = tf.reduce_mean(tf.squared_difference(proj1, proj2))
    if return_logits:
      return dist, logits1, logits2
    return dist
```
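Below is a rough wiring sketch of my own (not from the repo) with a toy dense discriminator; `toy_discriminator` is hypothetical, and `shape_list` / `is_xla_compiled` from `common_layers.py` are assumed to be in scope.

```python
# Hypothetical sketch: sliced loss between real and generated feature batches.
import tensorflow as tf

def toy_discriminator(h):
    # Simple 2-layer MLP producing logits; its variables live under the
    # "sliced_gan/discriminator" scope opened inside sliced_gan_loss,
    # so the reuse=True call for the second batch shares them.
    h = tf.layers.dense(h, 128, activation=tf.nn.relu, name="fc1")
    return tf.layers.dense(h, 64, name="fc2")

real = tf.random_normal([32, 100])
fake = tf.random_normal([32, 100])

dist = sliced_gan_loss(real, fake, toy_discriminator,
                       num_vecs=16, do_random_vecs=True, do_tanh=True)
# The generator minimizes `dist`; the discriminator maximizes it (see docstring).
```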
## KL Divergence
```python
def kl_divergence(mu, log_var, mu_p=0.0, log_var_p=0.0):
  """KL divergence of diagonal gaussian N(mu,exp(log_var)) and N(0,1).

  Args:
    mu: mu parameter of the distribution.
    log_var: log(var) parameter of the distribution.
    mu_p: optional mu from a learned prior distribution
    log_var_p: optional log(var) from a learned prior distribution

  Returns:
    the KL loss.
  """
  batch_size = shape_list(mu)[0]
  prior_distribution = tfp.distributions.Normal(
      mu_p, tf.exp(tf.multiply(0.5, log_var_p)))
  posterior_distribution = tfp.distributions.Normal(
      mu, tf.exp(tf.multiply(0.5, log_var)))
  kld = tfp.distributions.kl_divergence(posterior_distribution,
                                        prior_distribution)
  return tf.reduce_sum(kld) / to_float(batch_size)
```
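And a short sketch of my own showing the KL term in a VAE-style objective, assuming `shape_list` / `to_float` from `common_layers.py` and `import tensorflow_probability as tfp` at module level; the layer names and the 0.1 weight are illustrative only.

```python
# Hypothetical sketch: KL term of an encoder against a standard normal prior.
import tensorflow as tf

features = tf.random_normal([16, 256])
mu = tf.layers.dense(features, 32, name="mu")
log_var = tf.layers.dense(features, 32, name="log_var")

kl_loss = kl_divergence(mu, log_var)      # KL against N(0, 1)
recon_loss = tf.constant(0.0)             # placeholder for a real reconstruction term
total_loss = recon_loss + 0.1 * kl_loss   # weighted ELBO-style objective
```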
There is a lot more material in that file, so take a look and use whatever you need!
- End -