Event-Extraction/models/Zero-Shot Transfer Learning for Event Extraction/layers.py
2020-10-04 21:48:22 +08:00

985 lines
37 KiB
Python

import numpy
import theano
import theano.tensor as T
from theano.tensor.nnet import conv
import theano.sandbox.neighbours as TSN
class LogisticRegressionMulti(object):
"""Multi-class Logistic Regression Class
The logistic regression is fully described by a weight matrix :math:`W`
and bias vector :math:`b`. Classification is done by projecting data
points onto a set of hyperplanes, the distance to which is used to
determine a class membership probability.
"""
def __init__(self, input, n_in, n_out):
""" Initialize the parameters of the logistic regression
:type input: theano.tensor.TensorType
:param input: symbolic variable that describes the input of the
architecture (one minibatch)
:type n_in: int
:param n_in: number of input units, the dimension of the space in
which the datapoints lie
:type n_out: int
:param n_out: number of output units, the dimension of the space in
which the labels lie
"""
# start-snippet-1
# initialize with 0 the weights W as a matrix of shape (n_in, n_out)
self.W = theano.shared(
value=numpy.zeros(
(n_in, n_out),
dtype=theano.config.floatX
),
name='W',
borrow=True
)
# initialize the biases b as a vector of n_out 0s
self.b = theano.shared(
value=numpy.zeros(
(n_out,),
dtype=theano.config.floatX
),
name='b',
borrow=True
)
# symbolic expression for computing the matrix of class-membership
# probabilities
# Where:
# W is a matrix where column-k represent the separation hyperplane for
# class-k
# x is a matrix where row-j represents input training sample-j
# b is a vector where element-k represent the free parameter of
# hyperplane-k
self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)
# symbolic description of how to compute prediction as class whose
# probability is maximal
self.y_pred = T.argmax(self.p_y_given_x, axis=1)
# end-snippet-1
# parameters of the model
self.params = [self.W, self.b]
# keep track of model input
self.input = input
def negative_log_likelihood(self, y):
"""Return the mean of the negative log-likelihood of the prediction
of this model under a given target distribution.
.. math::
\frac{1}{|\mathcal{D}|} \mathcal{L} (\theta=\{W,b\}, \mathcal{D}) =
\frac{1}{|\mathcal{D}|} \sum_{i=0}^{|\mathcal{D}|}
\log(P(Y=y^{(i)}|x^{(i)}, W,b)) \\
\ell (\theta=\{W,b\}, \mathcal{D})
:type y: theano.tensor.TensorType
:param y: corresponds to a vector that gives for each example the
correct label
Note: we use the mean instead of the sum so that
the learning rate is less dependent on the batch size
"""
# start-snippet-2
# y.shape[0] is (symbolically) the number of rows in y, i.e.,
# number of examples (call it n) in the minibatch
# T.arange(y.shape[0]) is a symbolic vector which will contain
# [0,1,2,... n-1] T.log(self.p_y_given_x) is a matrix of
# Log-Probabilities (call it LP) with one row per example and
# one column per class LP[T.arange(y.shape[0]),y] is a vector
# v containing [LP[0,y[0]], LP[1,y[1]], LP[2,y[2]], ...,
# LP[n-1,y[n-1]]] and T.mean(LP[T.arange(y.shape[0]),y]) is
# the mean (across minibatch examples) of the elements in v,
# i.e., the mean log-likelihood across the minibatch.
return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])
# end-snippet-2
def errors(self, y):
"""Return a float representing the number of errors in the minibatch
over the total number of examples of the minibatch ; zero one
loss over the size of the minibatch
:type y: theano.tensor.TensorType
:param y: corresponds to a vector that gives for each example the
correct label
"""
# check if y has same dimension of y_pred
if y.ndim != self.y_pred.ndim:
raise TypeError(
'y should have the same shape as self.y_pred',
('y', y.type, 'y_pred', self.y_pred.type)
)
# check if y is of the correct datatype
if y.dtype.startswith('int'):
# the T.neq operator returns a vector of 0s and 1s, where 1
# represents a mistake in prediction
return T.mean(T.neq(self.y_pred, y))
else:
raise NotImplementedError()
class LogisticRegression(object):
"""Multi-class Logistic Regression Class
The logistic regression is fully described by a weight matrix :math:`W`
and bias vector :math:`b`. Classification is done by projecting data
points onto a set of hyperplanes, the distance to which is used to
determine a class membership probability.
"""
def __init__(self, input, n_in, n_out, W = None, b = None):
""" Initialize the parameters of the logistic regression
:type input: theano.tensor.TensorType
:param input: symbolic variable that describes the input of the
architecture (one minibatch)
:type n_in: int
:param n_in: number of input units, the dimension of the space in
which the datapoints lie
:type n_out: int
:param n_out: number of output units, the dimension of the space in
which the labels lie
"""
if W == None:
# initialize with 0 the weights W as a matrix of shape (n_in, n_out)
self.W = theano.shared(value=numpy.zeros((n_in, n_out),
dtype=theano.config.floatX),
name='softmax_W', borrow=True)
else:
self.W = W
if b == None:
# initialize the baises b as a vector of n_out 0s
self.b = theano.shared(value=numpy.zeros((n_out,),
dtype=theano.config.floatX),
name='softmax_b', borrow=True)
else:
self.b = b
# compute vector of class-membership probabilities in symbolic form
#self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b + 1e-7)
self.p_y_given_x = T.nnet.softmax(input.dot(self.W) + self.b)
# compute prediction as class whose probability is maximal in
# symbolic form
self.y_pred = T.argmax(self.p_y_given_x, axis=1)
# parameters of the model
self.params = [self.W, self.b]
def negative_log_likelihood(self, y):
"""Return the mean of the negative log-likelihood of the prediction
of this model under a given target distribution.
.. math::
\frac{1}{|\mathcal{D}|} \mathcal{L} (\theta=\{W,b\}, \mathcal{D}) =
\frac{1}{|\mathcal{D}|} \sum_{i=0}^{|\mathcal{D}|} \log(P(Y=y^{(i)}|x^{(i)}, W,b)) \\
\ell (\theta=\{W,b\}, \mathcal{D})
:type y: theano.tensor.TensorType
:param y: corresponds to a vector that gives for each example the
correct label
Note: we use the mean instead of the sum so that
the learning rate is less dependent on the batch size
"""
# y.shape[0] is (symbolically) the number of rows in y, i.e.,
# number of examples (call it n) in the minibatch
# T.arange(y.shape[0]) is a symbolic vector which will contain
# [0,1,2,... n-1] T.log(self.p_y_given_x) is a matrix of
# Log-Probabilities (call it LP) with one row per example and
# one column per class LP[T.arange(y.shape[0]),y] is a vector
# v containing [LP[0,y[0]], LP[1,y[1]], LP[2,y[2]], ...,
# LP[n-1,y[n-1]]] and T.mean(LP[T.arange(y.shape[0]),y]) is
# the mean (across minibatch examples) of the elements in v,
# i.e., the mean log-likelihood across the minibatch.
return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])
def results(self, y):
"""Return a float representing the number of errors in the minibatch
over the total number of examples of the minibatch ; zero one
loss over the size of the minibatch
:type y: theano.tensor.TensorType
:param y: corresponds to a vector that gives for each example the
correct label
"""
#self.all_p_y_given_x = self.in2.dot(self.W) + self.b
# check if y has same dimension of y_pred
if y.ndim != self.y_pred.ndim:
raise TypeError('y should have the same shape as self.y_pred',
('y', y.type, 'y_pred', self.y_pred.type))
# check if y is of the correct datatype
if y.dtype.startswith('int'):
# the T.neq operator returns a vector of 0s and 1s, where 1
# represents a mistake in prediction
return [T.argmax(self.p_y_given_x, axis=1), T.max(self.p_y_given_x, axis=1), self.p_y_given_x]
#return self.p_y_given_x
else:
raise NotImplementedError()
class HiddenLayer(object):
def __init__(self, rng, input, n_in, n_out, W=None, b=None,
activation=T.tanh, name=""):
"""
Typical hidden layer of a MLP: units are fully-connected and have
sigmoidal activation function. Weight matrix W is of shape (n_in,n_out)
and the bias vector b is of shape (n_out,).
NOTE : The nonlinearity used here is tanh
Hidden unit activation is given by: tanh(dot(input,W) + b)
:type rng: numpy.random.RandomState
:param rng: a random number generator used to initialize weights
:type input: theano.tensor.dmatrix
:param input: a symbolic tensor of shape (n_examples, n_in)
:type n_in: int
:param n_in: dimensionality of input
:type n_out: int
:param n_out: number of hidden units
:type activation: theano.Op or function
:param activation: Non linearity to be applied in the hidden
layer
"""
self.input = input
# `W` is initialized with `W_values` which is uniformely sampled
# from sqrt(-6./(n_in+n_hidden)) and sqrt(6./(n_in+n_hidden))
# for tanh activation function
# the output of uniform if converted using asarray to dtype
# theano.config.floatX so that the code is runable on GPU
# Note : optimal initialization of weights is dependent on the
# activation function used (among other things).
# For example, results presented in [Xavier10] suggest that you
# should use 4 times larger initial weights for sigmoid
# compared to tanh
# We have no info for other function, so we use the same as
# tanh.
if name != "":
prefix = name
else:
prefix = "mlp_"
if W is None:
W_values = numpy.asarray(rng.uniform(
low=-numpy.sqrt(6. / (n_in + n_out)),
high=numpy.sqrt(6. / (n_in + n_out)),
size=(n_in, n_out)), dtype=theano.config.floatX)
if activation == theano.tensor.nnet.sigmoid:
W_values *= 4
W = theano.shared(value=W_values, name=prefix+'W', borrow=True)
if b is None:
b_values = numpy.zeros((n_out,), dtype=theano.config.floatX)
b = theano.shared(value=b_values, name=prefix+'b', borrow=True)
self.W = W
self.b = b
lin_output = T.dot(input, self.W) + self.b
self.output = (lin_output if activation is None
else activation(lin_output))
# parameters of the model
self.params = [self.W, self.b]
class LeNetConvLayer(object):
"""Pool Layer of a convolutional network """
def kmaxPooling(self, conv_out, k):
neighborsForPooling = TSN.images2neibs(ten4=conv_out, neib_shape=(1,conv_out.shape[3]), mode='ignore_borders')
self.neighbors = neighborsForPooling
neighborsArgSorted = T.argsort(neighborsForPooling, axis=1)
self.neighborsArgSorted = neighborsArgSorted
kNeighborsArg = neighborsArgSorted[:,-k:]
self.neigborsSorted = kNeighborsArg
kNeighborsArgSorted = T.sort(kNeighborsArg, axis=1)
ii = T.repeat(T.arange(neighborsForPooling.shape[0]), k)
jj = kNeighborsArgSorted.flatten()
self.ii = ii
self.jj = jj
pooledkmaxTmp = neighborsForPooling[ii, jj]
self.pooled = pooledkmaxTmp
# reshape pooled_out
new_shape = T.cast(T.join(0, conv_out.shape[:-2],
T.as_tensor([conv_out.shape[2]]),
T.as_tensor([k])),
'int64')
pooledkmax = T.reshape(pooledkmaxTmp, new_shape, ndim=4)
return pooledkmax
def convStep(self, curInput, curFilter):
return conv.conv2d(input=curInput, filters=curFilter,
filter_shape=self.filter_shape,
image_shape=None)
def __init__(self, rng, W, b, input, filter_shape, image_shape):
"""
Allocate a LeNetConvPoolLayer with shared variable internal parameters.
:type rng: numpy.random.RandomState
:param rng: a random number generator used to initialize weights
:type W: theano.matrix
:param W: the weight matrix used for convolution
:type b: theano vector
:param b: the bias used for convolution
:type input: theano.tensor.dtensor4
:param input: symbolic image tensor, of shape image_shape
:type filter_shape: tuple or list of length 4
:param filter_shape: (number of filters, num input feature maps,
filter height,filter width)
:type image_shape: tuple or list of length 4
:param image_shape: (batch size, num input feature maps,
image height, image width)
:type poolsize: tuple or list of length 2
:param poolsize: the downsampling (pooling) factor (#rows,#cols)
"""
assert image_shape[1] == filter_shape[1]
self.input = input
self.W = W
self.b = b
self.filter_shape = filter_shape
# convolve input feature maps with filters
conv_out = self.convStep(self.input, self.W)
conv_with_bias = T.tanh(conv_out+self.b.dimshuffle('x', 0, 'x', 'x'))
self.output = conv_with_bias
# k = 3
# self.pooledkmax = self.kmaxPooling(conv_out, k)
# add the bias term. Since the bias is a vector (1D array), we first
# reshape it to a tensor of shape (1,n_filters,1,1). Each bias will
# thus be broadcasted across mini-batches and feature map
# width & height
#self.output = T.tanh(self.pooledkmax + self.b.dimshuffle('x', 0, 'x', 'x'))
# store parameters of this layer
self.params = [self.W, self.b]
class LeNetConvLayerWithStrides(object):
def convStep(self, curInput, curFilter):
return conv.conv2d(input=curInput, filters=curFilter,
filter_shape=self.filter_shape, subsample=(2, 2),
image_shape=None)
def __init__(self, rng, W, b, input, filter_shape, image_shape):
"""
Allocate a LeNetConvPoolLayer with shared variable internal parameters.
:type rng: numpy.random.RandomState
:param rng: a random number generator used to initialize weights
:type W: theano.matrix
:param W: the weight matrix used for convolution
:type b: theano vector
:param b: the bias used for convolution
:type input: theano.tensor.dtensor4
:param input: symbolic image tensor, of shape image_shape
:type filter_shape: tuple or list of length 4
:param filter_shape: (number of filters, num input feature maps,
filter height,filter width)
:type image_shape: tuple or list of length 4
:param image_shape: (batch size, num input feature maps,
image height, image width)
:type poolsize: tuple or list of length 2
:param poolsize: the downsampling (pooling) factor (#rows,#cols)
"""
assert image_shape[1] == filter_shape[1]
self.input = input
self.W = W
self.b = b
self.filter_shape = filter_shape
# convolve input feature maps with filters
conv_out = self.convStep(self.input, self.W)
conv_with_bias = T.tanh(conv_out+self.b.dimshuffle('x', 0, 'x', 'x'))
self.output = conv_with_bias
#k = poolsize[1]
#self.pooledkmax = self.kmaxPooling(conv_out, k)
# add the bias term. Since the bias is a vector (1D array), we first
# reshape it to a tensor of shape (1,n_filters,1,1). Each bias will
# thus be broadcasted across mini-batches and feature map
# width & height
#self.output = T.tanh(self.pooledkmax + self.b.dimshuffle('x', 0, 'x', 'x'))
# store parameters of this layer
self.params = [self.W, self.b]
class LeNetConvPoolLayer(object):
"""Pool Layer of a convolutional network """
def kmaxPooling(self, conv_out, k):
neighborsForPooling = TSN.images2neibs(ten4=conv_out, neib_shape=(1,conv_out.shape[3]), mode='ignore_borders')
self.neighbors = neighborsForPooling
neighborsArgSorted = T.argsort(neighborsForPooling, axis=1)
self.neighborsArgSorted = neighborsArgSorted
kNeighborsArg = neighborsArgSorted[:,-k:]
self.neigborsSorted = kNeighborsArg
kNeighborsArgSorted = T.sort(kNeighborsArg, axis=1)
ii = T.repeat(T.arange(neighborsForPooling.shape[0]), k)
jj = kNeighborsArgSorted.flatten()
self.ii = ii
self.jj = jj
pooledkmaxTmp = neighborsForPooling[ii, jj]
self.pooled = pooledkmaxTmp
# reshape pooled_out
new_shape = T.cast(T.join(0, conv_out.shape[:-2],
T.as_tensor([conv_out.shape[2]]),
T.as_tensor([k])),
'int64')
pooledkmax = T.reshape(pooledkmaxTmp, new_shape, ndim=4)
return pooledkmax
def convStep(self, curInput, curFilter):
return conv.conv2d(input=curInput, filters=curFilter,
filter_shape=self.filter_shape,
image_shape=None)
def __init__(self, rng, W, b, input, filter_shape, image_shape, poolsize=(2, 2)):
"""
Allocate a LeNetConvPoolLayer with shared variable internal parameters.
:type rng: numpy.random.RandomState
:param rng: a random number generator used to initialize weights
:type W: theano.matrix
:param W: the weight matrix used for convolution
:type b: theano vector
:param b: the bias used for convolution
:type input: theano.tensor.dtensor4
:param input: symbolic image tensor, of shape image_shape
:type filter_shape: tuple or list of length 4
:param filter_shape: (number of filters, num input feature maps,
filter height,filter width)
:type image_shape: tuple or list of length 4
:param image_shape: (batch size, num input feature maps,
image height, image width)
:type poolsize: tuple or list of length 2
:param poolsize: the downsampling (pooling) factor (#rows,#cols)
"""
assert image_shape[1] == filter_shape[1]
self.input = input
self.W = W
self.b = b
self.filter_shape = filter_shape
# convolve input feature maps with filters
conv_out = self.convStep(self.input, self.W)
k = poolsize[1]
self.pooledkmax = self.kmaxPooling(conv_out, k)
# add the bias term. Since the bias is a vector (1D array), we first
# reshape it to a tensor of shape (1,n_filters,1,1). Each bias will
# thus be broadcasted across mini-batches and feature map
# width & height
self.output = T.tanh(self.pooledkmax + self.b.dimshuffle('x', 0, 'x', 'x'))
# store parameters of this layer
self.params = [self.W, self.b]
class LeNetConvPoolLayerWithStrides(object):
"""Pool Layer of a convolutional network """
def kmaxPooling(self, conv_out, k):
neighborsForPooling = TSN.images2neibs(ten4=conv_out, neib_shape=(1,conv_out.shape[3]), mode='ignore_borders')
self.neighbors = neighborsForPooling
neighborsArgSorted = T.argsort(neighborsForPooling, axis=1)
self.neighborsArgSorted = neighborsArgSorted
kNeighborsArg = neighborsArgSorted[:,-k:]
self.neigborsSorted = kNeighborsArg
kNeighborsArgSorted = T.sort(kNeighborsArg, axis=1)
ii = T.repeat(T.arange(neighborsForPooling.shape[0]), k)
jj = kNeighborsArgSorted.flatten()
self.ii = ii
self.jj = jj
pooledkmaxTmp = neighborsForPooling[ii, jj]
self.pooled = pooledkmaxTmp
# reshape pooled_out
new_shape = T.cast(T.join(0, conv_out.shape[:-2],
T.as_tensor([conv_out.shape[2]]),
T.as_tensor([k])),
'int64')
pooledkmax = T.reshape(pooledkmaxTmp, new_shape, ndim=4)
return pooledkmax
def convStep(self, curInput, curFilter):
return conv.conv2d(input=curInput, filters=curFilter,
filter_shape=self.filter_shape, subsample=(2, 2),
image_shape=None)
def __init__(self, rng, W, b, input, filter_shape, image_shape, poolsize=(2, 2)):
"""
Allocate a LeNetConvPoolLayer with shared variable internal parameters.
:type rng: numpy.random.RandomState
:param rng: a random number generator used to initialize weights
:type W: theano.matrix
:param W: the weight matrix used for convolution
:type b: theano vector
:param b: the bias used for convolution
:type input: theano.tensor.dtensor4
:param input: symbolic image tensor, of shape image_shape
:type filter_shape: tuple or list of length 4
:param filter_shape: (number of filters, num input feature maps,
filter height,filter width)
:type image_shape: tuple or list of length 4
:param image_shape: (batch size, num input feature maps,
image height, image width)
:type poolsize: tuple or list of length 2
:param poolsize: the downsampling (pooling) factor (#rows,#cols)
"""
assert image_shape[1] == filter_shape[1]
self.input = input
self.W = W
self.b = b
self.filter_shape = filter_shape
# convolve input feature maps with filters
self.conv_out = self.convStep(self.input, self.W)
k = poolsize[1]
self.pooledkmax = self.kmaxPooling(self.conv_out, k)
# add the bias term. Since the bias is a vector (1D array), we first
# reshape it to a tensor of shape (1,n_filters,1,1). Each bias will
# thus be broadcasted across mini-batches and feature map
# width & height
self.output = T.tanh(self.pooledkmax + self.b.dimshuffle('x', 0, 'x', 'x'))
# store parameters of this layer
self.params = [self.W, self.b]
class KMax_Pooling(object):
def __init__(self, conv_out, k):
self.conv_out = conv_out
self.k = k;
self.output = self.kmaxPooling(conv_out, k)
def kmaxPooling(self, conv_out, k):
neighborsForPooling = TSN.images2neibs(ten4=conv_out, neib_shape=(1,conv_out.shape[3]), mode='ignore_borders')
self.neighbors = neighborsForPooling
neighborsArgSorted = T.argsort(neighborsForPooling, axis=1)
self.neighborsArgSorted = neighborsArgSorted
kNeighborsArg = neighborsArgSorted[:,-k:]
self.neigborsSorted = kNeighborsArg
kNeighborsArgSorted = T.sort(kNeighborsArg, axis=1)
ii = T.repeat(T.arange(neighborsForPooling.shape[0]), k)
jj = kNeighborsArgSorted.flatten()
self.ii = ii
self.jj = jj
pooledkmaxTmp = neighborsForPooling[ii, jj]
self.pooled = pooledkmaxTmp
# reshape pooled_out
new_shape = T.cast(T.join(0, conv_out.shape[:-2],
T.as_tensor([conv_out.shape[2]]),
T.as_tensor([k])),
'int64')
pooledkmax = T.reshape(pooledkmaxTmp, new_shape, ndim=4)
return pooledkmax
class Max_Pooling(object):
"""The input is output of Conv: a tensor. The output here should also be tensor"""
def __init__(self, rng, input): # length_l, length_r: valid lengths after conv
#input_l_matrix = input.reshape((input.shape[2], input.shape[3]))
#input_l_matrix = debug_print(input_l_matrix[:, left_l:(input_l_matrix.shape[1] - right_l)],
#'input_l_matrix')
self.output_maxpooling = T.max(input, axis=3)
class ComposeLayerMatrix(object):
def __init__(self, input, input_binary_relation, rel_w, rel_vec_size):
self.input = input
self.input_binary_relation = input_binary_relation
self.rel_w = rel_w
input_binary_relation_1 = T.transpose(input_binary_relation, [0,1,3,2]) # 100*1*5*26
input_binary_relation_2 = input_binary_relation_1.reshape(
(input_binary_relation_1.shape[0]*input_binary_relation_1.shape[1]*input_binary_relation_1.shape[2],
input_binary_relation_1.shape[3])) #500*26
r = input_binary_relation_2.dot(rel_w) # 500 * 160000
input_1 = T.transpose(input, [0, 1, 3, 2]) # 100*1*5*400
input_2 = input_1.reshape((input_1.shape[0]*input_1.shape[1]*input_1.shape[2], input_1.shape[3])) # 500*400
input_3 = input_2.reshape((input_2.shape[0], input_2.shape[1], 1)) # 500*400*1
input_4 = T.repeat(input_3, rel_vec_size, axis=2) # 500*400*400
input_5 = T.transpose(input_4, [0,2,1]) # 500*400*400
input_6 = input_5.reshape((input_5.shape[0], input_5.shape[1]*input_5.shape[2])) # 500*160000
r1 = input_6*r # 500*160000
r2 = r1.reshape((r1.shape[0], rel_vec_size, rel_vec_size))
r3 = T.mean(r2, axis=2)
r4 = r3.reshape((input_1.shape[0], input_1.shape[1], input_1.shape[2], r3.shape[1]))
input_update = T.transpose(r4, [0,1,3,2])
self.output = input_update
# parameters of the model
self.params = [self.rel_w]
class ComposeLayerFactor(object):
def __init__(self, input, input_binary_relation, rel_w, rel_vec_size):
self.input = input
self.input_binary_relation = input_binary_relation
self.rel_w = rel_w # relationsize * 400
input_binary_relation_1 = T.transpose(input_binary_relation, [0,1,3,2]) # 100*1*5*26
input_binary_relation_1 = input_binary_relation_1.reshape(
(input_binary_relation_1.shape[0]*input_binary_relation_1.shape[1]*input_binary_relation_1.shape[2],
input_binary_relation_1.shape[3])) # 500*26
r = input_binary_relation_1.dot(rel_w) # 500 * 400
input_1 = T.transpose(input, [0, 1, 3, 2]) # 100*1*5*400
input_2 = input_1.reshape((input_1.shape[0]*input_1.shape[1]*input_1.shape[2], input_1.shape[3])) # 500*400
r1 = input_2*r # 500*400
r4 = r1.reshape((input_1.shape[0], input_1.shape[1], input_1.shape[2], r1.shape[1]))
input_update = T.transpose(r4, [0,1,3,2])
self.output = input_update
self.params = [self.rel_w]
class ComposeLayerTensor(object):
def __init__(self, input, tensor):
self.input = input
self.tensor = tensor # 400*400*400
r = self.tensor.reshape((self.tensor.shape[0]*self.tensor.shape[1], self.tensor.shape[2]))
input_1 = T.transpose(input, [0, 1, 3, 2]) # 100*1*5*400
input_2 = input_1.reshape((input_1.shape[0]*input_1.shape[1]*input_1.shape[2], input_1.shape[3])) # 500*400
input_3 = input_2.reshape((input_2.shape[0], input_2.shape[1], 1))
input_4 = input_2.reshape((input_2.shape[0], 1, input_2.shape[1]))
input_dot = T.batched_dot(input_3, input_4)
input_dot = input_dot.reshape((input_dot.shape[0], input_dot.shape[1]*input_dot.shape[2]))
r1 = T.dot(input_dot, r) # 500*400
r4 = r1.reshape((input_1.shape[0], input_1.shape[1], input_1.shape[2], r1.shape[1]))
input_update = T.transpose(r4, [0,1,3,2])
self.output = input_update
self.params = [self.tensor]
class MaxRankingMarginCosine1(object):
def __init__(self, rng, input, input_label, true_label, n_in, margin, batch_size, type_size, train_type_signal, pos_neg_flag):
self.input = input
self.input_label = input_label
self.true_label = true_label
W_values = numpy.asarray(rng.uniform(
low=-numpy.sqrt(6. / (n_in + n_in)),
high=numpy.sqrt(6. / (n_in + n_in)),
size=(n_in, n_in)), dtype=theano.config.floatX)
W = theano.shared(value=W_values, borrow=True)
self.W = W
self.margin = margin
self.params = [self.W]
sim = self.cosine(input, input_label)
true_sim = sim*true_label
neg_sim = sim ## batchsize*labelsize
self.true_sim = true_sim
self.neg_sim = neg_sim
true_sim = T.sum(true_sim, axis=1)
true_sim = true_sim.reshape((batch_size, 1))
true_sim = T.repeat(true_sim, type_size, axis=1) ## batchsize*labelsize
train_type_signal = train_type_signal.reshape((train_type_signal.shape[0], 1))
train_type_signal = T.repeat(train_type_signal, batch_size, axis=1)
train_type_signal = T.transpose(train_type_signal, [1,0]) ## batchsize*labelsize
sim_norm = sim*train_type_signal
max_sim = T.max(sim, axis=1) ## batchsize*1
max_sim = max_sim.reshape((max_sim.shape[0], 1))
max_sim = T.repeat(max_sim, type_size, axis=1) ## batchsize*labelsize
max_sim = max_sim*train_type_signal
pos_neg_flag_norm = T.repeat(pos_neg_flag, type_size, axis=1) ## pos == 1, neg == 0
pos_neg_flag_norm_1 = 1-pos_neg_flag_norm
f1 = T.maximum(0, margin-max_sim+sim_norm) ## num = #of train types
loss1 = T.mean(T.sum(f1*pos_neg_flag_norm_1))
f = T.maximum(0, margin-true_sim+neg_sim)
self.pos_neg_flag_norm = pos_neg_flag_norm
self.pos_neg_flag_norm_1 = pos_neg_flag_norm_1
self.max_sim = max_sim
self.sim_norm = sim_norm
loss = T.mean(T.sum(T.max((T.maximum(0, margin-true_sim+neg_sim))*pos_neg_flag_norm, axis=1))) + \
T.mean(T.sum(T.max(T.maximum(0, margin-max_sim+sim_norm)*pos_neg_flag_norm_1, axis=1)))
self.p_y_given_x = sim
self.y_pred = T.argmax(self.p_y_given_x, axis=1)
self.p = T.max(self.p_y_given_x, axis=1)
self.loss = loss
self.loss1 = loss1
def results(self, y):
"""Return a float representing the number of errors in the minibatch
over the total number of examples of the minibatch ; zero one
loss over the size of the minibatch
:type y: theano.tensor.TensorType
:param y: corresponds to a vector that gives for each example the
correct label
"""
#self.all_p_y_given_x = self.in2.dot(self.W) + self.b
# check if y has same dimension of y_pred
if y.dtype.startswith('int'):
# the T.neq operator returns a vector of 0s and 1s, where 1
# represents a mistake in prediction
return [T.argmax(self.p_y_given_x, axis=1)+1, T.max(self.p_y_given_x, axis=1), self.p_y_given_x]
#return self.p_y_given_x
else:
raise NotImplementedError()
raise NotImplementedError()
def cosine(self, input_1, input_2):
input_2_t = input_2.T ## n_in * labelsize
multi = input_1.dot(input_2_t) ## batchsize*labelsize
l1 = T.sqrt(T.sum(T.sqr(input_1), axis=1)) ## batchsize*1
l2 = T.sqrt(T.sum(T.sqr(input_2), axis=1)) ## labelsize*1
l1 = l1.reshape((input_1.shape[0], 1))
l1 = T.repeat(l1, input_2.shape[0], axis=1)
l2 = l2.reshape((input_2.shape[0], 1))
l2 = T.repeat(l2, input_1.shape[0], axis=1)
l2 = T.transpose(l2, [1,0])
sim_matrix = multi/(l1*l2)
return sim_matrix
class MaxRankingMarginCosine1Arg1(object):
def __init__(self, rng, input, input_label, true_label, n_in, n_in2, margin, batch_size, type_size,
train_type_signal, pos_neg_flag, limited_role):
self.input = input
self.input_label = input_label
self.true_label = true_label
W_values = numpy.asarray(rng.uniform(
# low=-numpy.sqrt(6. / (n_in * n_in2)),
# high=numpy.sqrt(6. / (n_in * n_in2)),
low=-numpy.sqrt(numpy.sqrt(6./(n_in * n_in2))),
high=numpy.sqrt(numpy.sqrt(6./(n_in * n_in2))),
size=(n_in, n_in2)), dtype=theano.config.floatX)
W = theano.shared(value=W_values, borrow=True)
self.W = W
self.margin = margin
self.params = [self.W]
input_1 = input.dot(self.W)
self.input_1 = input_1
sim_0 = self.cosine(input, input_label)
sim = sim_0*limited_role
true_sim = sim*true_label
neg_sim = sim ## batchsize*labelsize
self.true_sim = true_sim
self.neg_sim = neg_sim
true_sim = T.sum(true_sim, axis=1)
true_sim = true_sim.reshape((batch_size, 1))
true_sim = T.repeat(true_sim, type_size, axis=1) ## batchsize*labelsize
train_type_signal = train_type_signal.reshape((train_type_signal.shape[0], 1))
train_type_signal = T.repeat(train_type_signal, batch_size, axis=1)
train_type_signal = T.transpose(train_type_signal, [1,0]) ## batchsize*labelsize
sim_norm = sim*train_type_signal
max_sim = T.max(sim, axis=1) ## batchsize*1
max_sim = max_sim.reshape((max_sim.shape[0], 1))
max_sim = T.repeat(max_sim, type_size, axis=1) ## batchsize*labelsize
max_sim = max_sim*train_type_signal
pos_neg_flag_norm = T.repeat(pos_neg_flag, type_size, axis=1) ## pos == 1, neg == 0
pos_neg_flag_norm_1 = 1-pos_neg_flag_norm
f1 = T.maximum(0, margin-max_sim+sim_norm) ## num = #of train types
loss1 = T.mean(T.sum(f1*pos_neg_flag_norm_1))
f = T.maximum(0, margin-true_sim+neg_sim)
## loss = T.mean(T.sum(f*posNegFlagNorm)) + 0.5*loss1
self.pos_neg_flag_norm = pos_neg_flag_norm
self.pos_neg_flag_norm_1 = pos_neg_flag_norm_1
self.max_sim = max_sim
self.sim_norm = sim_norm
## loss1 = T.mean(T.sum(T.max(T.maximum(0, margin-maxSimRepeatNorm+simNorm)*posNegFlagNorm1, axis=1)))
loss = T.mean(T.sum(T.max((T.maximum(0, margin-true_sim+neg_sim))*pos_neg_flag_norm, axis=1))) + \
T.mean(T.sum(T.max(T.maximum(0, margin-max_sim+sim_norm)*pos_neg_flag_norm_1, axis=1)))
## loss = T.maximum(0, margin-trueSim3+negSim)
self.p_y_given_x = sim
self.y_pred = T.argmax(self.p_y_given_x, axis=1)
self.p = T.max(self.p_y_given_x, axis=1)
self.loss = loss
self.loss1 = loss1
def results(self, y):
"""Return a float representing the number of errors in the minibatch
over the total number of examples of the minibatch ; zero one
loss over the size of the minibatch
:type y: theano.tensor.TensorType
:param y: corresponds to a vector that gives for each example the
correct label
"""
#self.all_p_y_given_x = self.in2.dot(self.W) + self.b
# check if y has same dimension of y_pred
if y.dtype.startswith('int'):
# the T.neq operator returns a vector of 0s and 1s, where 1
# represents a mistake in prediction
return [T.argmax(self.p_y_given_x, axis=1)+1, T.max(self.p_y_given_x, axis=1), self.p_y_given_x]
#return self.p_y_given_x
else:
raise NotImplementedError()
raise NotImplementedError()
def cosine(self, input_1, input_2):
input_2_t = input_2.T ## n_in * labelsize
multi = input_1.dot(input_2_t) ## batchsize*labelsize
l1 = T.sqrt(T.sum(T.sqr(input_1), axis=1)) ## batchsize*1
l2 = T.sqrt(T.sum(T.sqr(input_2), axis=1)) ## labelsize*1
l1 = l1.reshape((input_1.shape[0], 1))
l1 = T.repeat(l1, input_2.shape[0], axis=1)
l2 = l2.reshape((input_2.shape[0], 1))
l2 = T.repeat(l2, input_1.shape[0], axis=1)
l2 = T.transpose(l2, [1,0])
sim_matrix = multi/(l1*l2)
return sim_matrix