Sharing a GitHub repo with a lot of TensorFlow paper-implementation code

https://github.com/tensorflow/tensor2tensor/blob/master/tensor2tensor/layers/common_layers.py

 

tensorflow/tensor2tensor: Library of deep learning models and datasets designed to make deep learning more accessible and accelerate ML research. (github.com)

https://towardsdatascience.com/extending-pytorch-with-custom-activation-functions-2d8b065ef2fa

 

Extending PyTorch with Custom Activation Functions: A Tutorial for PyTorch and Deep Learning Beginners (towardsdatascience.com)

As a TensorFlow user, I sometimes have to implement things myself, and I stumbled on this while looking for activation functions.

 

I haven't actually run it, so I can't vouch that everything works, but it implements a lot of papers, so I'm sharing it.

It looks like a package made by Google!

Anyway, if there's something you need, it seems like a good place to grab it from.
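Note that the snippets below are copied straight from common_layers.py: they assume numpy (np), TensorFlow 1.x (tf), and tensorflow_probability (tfp) are imported, and they call helper functions defined in the same file, such as shape_list, cast_like, to_float, is_xla_compiled, and layers().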

## brelu, belu, gelu, nac, nalu, lrelu

def brelu(x):
  """Bipolar ReLU as in https://arxiv.org/abs/1709.04054."""
  x_shape = shape_list(x)
  x1, x2 = tf.split(tf.reshape(x, x_shape[:-1] + [-1, 2]), 2, axis=-1)
  y1 = tf.nn.relu(x1)
  y2 = -tf.nn.relu(-x2)
  return tf.reshape(tf.concat([y1, y2], axis=-1), x_shape)


def belu(x):
  """Bipolar ELU as in https://arxiv.org/abs/1709.04054."""
  x_shape = shape_list(x)
  x1, x2 = tf.split(tf.reshape(x, x_shape[:-1] + [-1, 2]), 2, axis=-1)
  y1 = tf.nn.elu(x1)
  y2 = -tf.nn.elu(-x2)
  return tf.reshape(tf.concat([y1, y2], axis=-1), x_shape)


def gelu(x):
  """Gaussian Error Linear Unit.
  This is a smoother version of the RELU.
  Original paper: https://arxiv.org/abs/1606.08415
  Args:
    x: float Tensor to perform activation.
  Returns:
    x with the GELU activation applied.
  """
  cdf = 0.5 * (1.0 + tf.tanh(
      (np.sqrt(2 / np.pi) * (x + 0.044715 * tf.pow(x, 3)))))
  return x * cdf


def nac(x, depth, name=None, reuse=None):
  """NAC as in https://arxiv.org/abs/1808.00508."""
  with tf.variable_scope(name, default_name="nac", values=[x], reuse=reuse):
    x_shape = shape_list(x)
    w = tf.get_variable("w", [x_shape[-1], depth])
    m = tf.get_variable("m", [x_shape[-1], depth])
    w = tf.tanh(w) * tf.nn.sigmoid(m)
    x_flat = tf.reshape(x, [-1, x_shape[-1]])
    res_flat = tf.matmul(x_flat, w)
    return tf.reshape(res_flat, x_shape[:-1] + [depth])


def nalu(x, depth, epsilon=1e-30, name=None, reuse=None):
  """NALU as in https://arxiv.org/abs/1808.00508."""
  with tf.variable_scope(name, default_name="nalu", values=[x], reuse=reuse):
    x_shape = shape_list(x)
    x_flat = tf.reshape(x, [-1, x_shape[-1]])
    gw = tf.get_variable("w", [x_shape[-1], depth])
    g = tf.nn.sigmoid(tf.matmul(x_flat, gw))
    g = tf.reshape(g, x_shape[:-1] + [depth])
    a = nac(x, depth, name="nac_lin")
    log_x = tf.log(tf.abs(x) + epsilon)
    m = nac(log_x, depth, name="nac_log")
    return g * a + (1 - g) * tf.exp(m)
    
def lrelu(input_, leak=0.2, name="lrelu"):
  """Leaky ReLU."""
  return tf.maximum(input_, leak * input_, name=name)
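Just to check the call signatures, here is a minimal sketch of my own (not from the repo) that runs a few of these functions under TF 1.x graph mode, assuming the tensor2tensor package is installed. The shapes are arbitrary, but note that brelu/belu need an even last dimension because they split the channels in half, and nac/nalu create trainable variables.

import numpy as np
import tensorflow as tf
from tensor2tensor.layers import common_layers

# Dummy input: 4 examples with 8 features (an even last dimension,
# since brelu/belu split the channels into two halves).
x = tf.placeholder(tf.float32, [4, 8])

y_gelu = common_layers.gelu(x)            # smooth ReLU variant
y_brelu = common_layers.brelu(x)          # bipolar ReLU
y_nalu = common_layers.nalu(x, depth=4)   # trainable NAC/NALU layer -> [4, 4]

with tf.Session() as sess:
  sess.run(tf.global_variables_initializer())  # nalu creates variables
  out = sess.run([y_gelu, y_brelu, y_nalu],
                 feed_dict={x: np.random.randn(4, 8).astype(np.float32)})
  print([o.shape for o in out])  # [(4, 8), (4, 8), (4, 4)]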

## layer norm / group norm / noam_norm / l2_norm / spectral norm / instance norm

def layer_norm_vars(filters):
  """Create Variables for layer norm."""
  scale = tf.get_variable(
      "layer_norm_scale", [filters], initializer=tf.ones_initializer())
  bias = tf.get_variable(
      "layer_norm_bias", [filters], initializer=tf.zeros_initializer())
  return scale, bias


def layer_norm_compute(x, epsilon, scale, bias, layer_collection=None):
  """Layer norm raw computation."""

  # Save these before they get converted to tensors by the casting below.
  # (In the full common_layers.py, `params` and `layer_collection` are used
  # to register this op for K-FAC; that block is omitted in this excerpt.)
  params = (scale, bias)

  epsilon, scale, bias = [cast_like(t, x) for t in [epsilon, scale, bias]]
  mean = tf.reduce_mean(x, axis=[-1], keepdims=True)
  variance = tf.reduce_mean(
      tf.squared_difference(x, mean), axis=[-1], keepdims=True)
  norm_x = (x - mean) * tf.rsqrt(variance + epsilon)
  return norm_x * scale + bias


def layer_norm(x,
               filters=None,
               epsilon=1e-6,
               name=None,
               reuse=None,
               layer_collection=None):
  """Layer normalize the tensor x, averaging over the last dimension."""
  if filters is None:
    filters = shape_list(x)[-1]
  with tf.variable_scope(
      name, default_name="layer_norm", values=[x], reuse=reuse):
    scale, bias = layer_norm_vars(filters)
    return layer_norm_compute(x, epsilon, scale, bias,
                              layer_collection=layer_collection)


def group_norm(x, filters=None, num_groups=8, epsilon=1e-5):
  """Group normalization as in https://arxiv.org/abs/1803.08494."""
  x_shape = shape_list(x)
  if filters is None:
    filters = x_shape[-1]
  assert len(x_shape) == 4
  assert filters % num_groups == 0
  # Prepare variables.
  scale = tf.get_variable(
      "group_norm_scale", [filters], initializer=tf.ones_initializer())
  bias = tf.get_variable(
      "group_norm_bias", [filters], initializer=tf.zeros_initializer())
  epsilon, scale, bias = [cast_like(t, x) for t in [epsilon, scale, bias]]
  # Reshape and compute group norm.
  x = tf.reshape(x, x_shape[:-1] + [num_groups, filters // num_groups])
  # Calculate mean and variance on heights, width, channels (not groups).
  mean, variance = tf.nn.moments(x, [1, 2, 4], keep_dims=True)
  norm_x = (x - mean) * tf.rsqrt(variance + epsilon)
  return tf.reshape(norm_x, x_shape) * scale + bias


def noam_norm(x, epsilon=1.0, name=None):
  """One version of layer normalization."""
  with tf.name_scope(name, default_name="noam_norm", values=[x]):
    shape = x.get_shape()
    ndims = len(shape)
    return (tf.nn.l2_normalize(x, ndims - 1, epsilon=epsilon) * tf.sqrt(
        to_float(shape[-1])))


def l2_norm(x, filters=None, epsilon=1e-6, name=None, reuse=None):
  """Layer normalization with l2 norm."""
  if filters is None:
    filters = shape_list(x)[-1]
  with tf.variable_scope(name, default_name="l2_norm", values=[x], reuse=reuse):
    scale = tf.get_variable(
        "l2_norm_scale", [filters], initializer=tf.ones_initializer())
    bias = tf.get_variable(
        "l2_norm_bias", [filters], initializer=tf.zeros_initializer())
    epsilon, scale, bias = [cast_like(t, x) for t in [epsilon, scale, bias]]
    mean = tf.reduce_mean(x, axis=[-1], keepdims=True)
    l2norm = tf.reduce_sum(
        tf.squared_difference(x, mean), axis=[-1], keepdims=True)
    norm_x = (x - mean) * tf.rsqrt(l2norm + epsilon)
    return norm_x * scale + bias


def apply_spectral_norm(x):
  """Normalizes x using the spectral norm.
  The implementation follows Algorithm 1 of
  https://arxiv.org/abs/1802.05957. If x is not a 2-D Tensor, then it is
  reshaped such that the number of channels (last-dimension) is the same.
  Args:
    x: Tensor with the last dimension equal to the number of filters.
  Returns:
    x: Tensor with the same shape as x normalized by the spectral norm.
    assign_op: Op to be run after every step to update the vector "u".
  """
  weights_shape = shape_list(x)
  other, num_filters = tf.reduce_prod(weights_shape[:-1]), weights_shape[-1]

  # Reshape into a 2-D matrix with outer size num_filters.
  weights_2d = tf.reshape(x, (other, num_filters))

  # v = Wu / ||W u||
  with tf.variable_scope("u", reuse=tf.AUTO_REUSE):
    u = tf.get_variable(
        "u", [num_filters, 1],
        initializer=tf.truncated_normal_initializer(),
        trainable=False)
  v = tf.nn.l2_normalize(tf.matmul(weights_2d, u))

  # u_new = vW / ||v W||
  u_new = tf.nn.l2_normalize(tf.matmul(tf.transpose(v), weights_2d))

  # s = v*W*u
  spectral_norm = tf.squeeze(
      tf.matmul(tf.transpose(v), tf.matmul(weights_2d, tf.transpose(u_new))))

  # set u equal to u_new in the next iteration.
  assign_op = tf.assign(u, tf.transpose(u_new))
  return tf.divide(x, spectral_norm), assign_op
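apply_spectral_norm also returns an assign op that refreshes the power-iteration vector u, so it has to run every training step. A rough sketch of how I would wire it up (the weight shape and training op below are hypothetical, not from the repo):

import tensorflow as tf
from tensor2tensor.layers import common_layers

# Hypothetical weight whose largest singular value we want to divide out.
w = tf.get_variable("disc_w", [3, 3, 64, 128])
w_sn, update_u = common_layers.apply_spectral_norm(w)

# One power-iteration step happens only when the assign op runs,
# so group it with the (hypothetical) training op:
# train_op = tf.group(train_op, update_u)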

def instance_norm(x):
  """Instance normalization layer."""
  with tf.variable_scope("instance_norm"):
    epsilon = 1e-5
    mean, var = tf.nn.moments(x, [1, 2], keep_dims=True)
    scale = tf.get_variable(
        "scale", [x.get_shape()[-1]],
        initializer=tf.truncated_normal_initializer(mean=1.0, stddev=0.02))
    offset = tf.get_variable(
        "offset", [x.get_shape()[-1]], initializer=tf.constant_initializer(0.0))
    out = scale * tf.div(x - mean, tf.sqrt(var + epsilon)) + offset

    return out

def apply_norm(x, norm_type, depth, epsilon, layer_collection=None):
  """Apply Normalization."""
  if layer_collection is not None:
    assert norm_type == "layer"
  if norm_type == "layer":
    return layer_norm(
        x, filters=depth, epsilon=epsilon, layer_collection=layer_collection)
  if norm_type == "group":
    return group_norm(x, filters=depth, epsilon=epsilon)
  if norm_type == "batch":
    return layers().BatchNormalization(epsilon=epsilon)(x)
  if norm_type == "noam":
    return noam_norm(x, epsilon)
  if norm_type == "l2":
    return l2_norm(x, filters=depth, epsilon=epsilon)
  if norm_type == "none":
    return x
  raise ValueError("Parameter norm_type must be one of: 'layer', 'group', "
                   "'batch', 'noam', 'l2', 'none'.")
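apply_norm is just a dispatcher over the normalizers above, so switching norm_type only changes which branch runs. A rough usage sketch of my own, under the same assumptions as before (TF 1.x, tensor2tensor installed); group_norm asserts a 4-D [batch, height, width, channels] input, while layer_norm only cares about the last dimension.

import tensorflow as tf
from tensor2tensor.layers import common_layers

x = tf.random_normal([2, 16, 16, 32])  # NHWC feature map

# Layer norm over the channel dimension.
ln = common_layers.apply_norm(x, norm_type="layer", depth=32, epsilon=1e-6)

# Group norm: needs a 4-D tensor and depth divisible by num_groups (default 8).
gn = common_layers.apply_norm(x, norm_type="group", depth=32, epsilon=1e-5)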

## Loss inspired by the sliced WGAN paper: https://arxiv.org/abs/1804.01947.

https://github.com/skolouri/swae/blob/master/MNIST_SlicedWassersteinAutoEncoder_Ring.ipynb

 

skolouri/swae: Implementation of the Sliced Wasserstein Autoencoders (github.com)

def sliced_gan_loss(input1,
                    input2,
                    discriminator,
                    num_vecs,
                    do_random_vecs=True,
                    do_tanh=True,
                    return_logits=False):
  """Loss inspired by the sliced WGAN paper: https://arxiv.org/abs/1804.01947.
  Puts input1 and input2 through the provided discriminator to get logits.
  Then, computes num_vecs random projections of the logits, sorts them on
  the batch dimension and returns the L2 loss between the sorted vectors.
  See the above-mentioned paper for the reasoning behind it.
  Args:
    input1: first discriminator inputs.
    input2: second discriminator inputs.
    discriminator: inputs -> logits function.
    num_vecs: how many random vectors to use for projections.
    do_random_vecs: whether to use random vectors or just tanh of the logits.
    do_tanh: if true (default) we'll also just use tanh of the logits.
    return_logits: Whether or not to return the logits.
  Returns:
    The generator loss, i.e., the sliced approximation of the distance between
    the projected distributions (warning: discriminator should maximize it).
  """
  with tf.variable_scope("sliced_gan"):
    with tf.variable_scope("discriminator"):
      logits1 = discriminator(input1)
    with tf.variable_scope("discriminator", reuse=True):
      logits2 = discriminator(input2)

    if do_random_vecs:
      random_vecs = tf.nn.l2_normalize(
          tf.random_uniform([shape_list(logits1)[-1], num_vecs]), axis=0)

    def get_sorted_projections(x):
      """Make projections of x and sort them on the batch dimension."""
      x = tf.reshape(x, [-1, shape_list(x)[-1]])
      batch_size = shape_list(x)[0]
      if do_random_vecs and do_tanh:
        n = tf.nn.l2_normalize(x, axis=1)
        proj = tf.concat([tf.matmul(n, random_vecs), tf.tanh(n)], axis=1)
      elif do_random_vecs:
        n = tf.nn.l2_normalize(x, axis=1)
        proj = tf.matmul(n, random_vecs)
      else:
        proj = tf.tanh(x)
      proj = tf.transpose(proj, [1, 0])  # [num_vecs, batch] after this.

      if is_xla_compiled():
        proj_dtype = proj.dtype
        proj = tf.cast(proj, tf.bfloat16)

        # Currently TPU only supports 1-D top_k calls.
        map_fn = lambda x: tf.nn.top_k(x, k=batch_size, sorted=True)[0]
        values = tf.map_fn(map_fn, proj)

        values = tf.cast(values, proj_dtype)
      else:
        values, _ = tf.nn.top_k(proj, k=batch_size, sorted=True)

      return values

    proj1 = get_sorted_projections(logits1)
    proj2 = get_sorted_projections(logits2)
    dist = tf.reduce_mean(tf.squared_difference(proj1, proj2))
    if return_logits:
      return dist, logits1, logits2
    return dist
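The discriminator argument is just an inputs -> logits callable, so even a single dense layer is enough for a smoke test. A hedged sketch; toy_discriminator below is my own hypothetical example, not something from the repo.

import tensorflow as tf
from tensor2tensor.layers import common_layers

def toy_discriminator(inp):
  """Hypothetical one-layer discriminator: inputs -> logits."""
  return tf.layers.dense(inp, 16, name="disc_dense")

real = tf.random_normal([64, 10])
fake = tf.random_normal([64, 10])

# Sliced approximation of the distance between the two projected logit
# distributions; the generator minimizes it, the discriminator should
# maximize it (per the docstring above).
dist = common_layers.sliced_gan_loss(real, fake, toy_discriminator, num_vecs=32)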

## KL Divergence

def kl_divergence(mu, log_var, mu_p=0.0, log_var_p=0.0):
  """KL divergence of diagonal gaussian N(mu,exp(log_var)) and N(0,1).
  Args:
    mu: mu parameter of the distribution.
    log_var: log(var) parameter of the distribution.
    mu_p: optional mu from a learned prior distribution
    log_var_p: optional log(var) from a learned prior distribution
  Returns:
    the KL loss.
  """

  batch_size = shape_list(mu)[0]
  prior_distribution = tfp.distributions.Normal(
      mu_p, tf.exp(tf.multiply(0.5, log_var_p)))
  posterior_distribution = tfp.distributions.Normal(
      mu, tf.exp(tf.multiply(0.5, log_var)))
  kld = tfp.distributions.kl_divergence(posterior_distribution,
                                        prior_distribution)
  return tf.reduce_sum(kld) / to_float(batch_size)
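For a VAE-style model this gives the KL term directly from the encoder outputs: mu and log_var are [batch, latent_dim] tensors and the default prior is N(0, 1). A small sketch under the same assumptions as above (tensorflow_probability must be installed):

import tensorflow as tf
from tensor2tensor.layers import common_layers

mu = tf.random_normal([32, 20])       # encoder means, [batch, latent_dim]
log_var = tf.random_normal([32, 20])  # encoder log-variances

# KL(N(mu, exp(log_var)) || N(0, 1)), summed over latent dims
# and averaged over the batch.
kl_loss = common_layers.kl_divergence(mu, log_var)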

 

There is a lot more material in there, so take a look and use whatever you need!

- End -
