# -*- coding: utf-8 -*-
"""
Modified from sample code:
Convolutional Neural Networks for Sentence Classification
http://arxiv.org/pdf/1408.5882v2.pdf

Much of the code is modified from
- deeplearning.net (for ConvNet classes)
- https://github.com/mdenil/dropout (for dropout)
- https://groups.google.com/forum/#!topic/pylearn-dev/3QbKtCumAW4 (for Adadelta)
"""

import numpy
import theano.tensor.shared_randomstreams
import theano
import theano.tensor as T
from theano.tensor.signal import downsample
from theano.tensor.nnet import conv


def ReLU(x):
    y = T.maximum(0.0, x)
    return(y)

def Sigmoid(x):
    y = T.nnet.sigmoid(x)
    return(y)

def Tanh(x):
    y = T.tanh(x)
    return(y)

def Iden(x):
    y = x
    return(y)


class HiddenLayer(object):
    """
    Class for HiddenLayer
    """
    def __init__(self, rng, input, n_in, n_out, activation, W=None, b=None,
                 use_bias=False):

        self.input = input
        self.activation = activation

        if W is None:
            # ReLU layers get a small Gaussian initialization; everything else
            # gets the uniform range sqrt(6 / (n_in + n_out)). getattr is used
            # so that activations without a func_name attribute (e.g. T.tanh,
            # which is an op rather than a Python function) fall through to the
            # uniform branch instead of raising an AttributeError.
            if getattr(activation, "func_name", None) == "ReLU":
                W_values = numpy.asarray(0.01 * rng.standard_normal(size=(n_in, n_out)),
                                         dtype=theano.config.floatX)
            else:
                W_values = numpy.asarray(rng.uniform(low=-numpy.sqrt(6. / (n_in + n_out)),
                                                     high=numpy.sqrt(6. / (n_in + n_out)),
                                                     size=(n_in, n_out)),
                                         dtype=theano.config.floatX)
            W = theano.shared(value=W_values, name='W')
        if b is None:
            b_values = numpy.zeros((n_out,), dtype=theano.config.floatX)
            b = theano.shared(value=b_values, name='b')

        self.W = W
        self.b = b

        if use_bias:
            lin_output = T.dot(input, self.W) + self.b
        else:
            lin_output = T.dot(input, self.W)

        self.output = (lin_output if activation is None else activation(lin_output))

        # parameters of the model
        if use_bias:
            self.params = [self.W, self.b]
        else:
            self.params = [self.W]

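
# Illustrative usage sketch for HiddenLayer (added for exposition; not part of
# the original pipeline, and the sizes below are hypothetical). It is wrapped
# in a function so that importing this module does not trigger compilation.
def _demo_hidden_layer():
    rng = numpy.random.RandomState(1234)
    x = T.matrix('x')
    layer = HiddenLayer(rng, input=x, n_in=4, n_out=3,
                        activation=ReLU, use_bias=True)
    f = theano.function([x], layer.output)
    data = numpy.asarray(rng.randn(2, 4), dtype=theano.config.floatX)
    return f(data)   # shape (2, 3); entries are non-negative because of ReLU
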
def _dropout_from_layer(rng, layer, p):
    """p is the probability of dropping a unit
    """
    srng = theano.tensor.shared_randomstreams.RandomStreams(rng.randint(999999))
    # p=1-p because 1's indicate keep and p is prob of dropping
    mask = srng.binomial(n=1, p=1-p, size=layer.shape)
    # The cast is important because
    # int * float32 = float64 which pulls things off the gpu
    output = layer * T.cast(mask, theano.config.floatX)
    return output

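
# Illustrative sketch of _dropout_from_layer (added for exposition; names and
# sizes are hypothetical). With p=0.5, each entry of the input is zeroed with
# probability 0.5 and kept unchanged otherwise.
def _demo_dropout_from_layer():
    rng = numpy.random.RandomState(1234)
    x = T.matrix('x')
    dropped = _dropout_from_layer(rng, x, p=0.5)
    f = theano.function([x], dropped)
    ones = numpy.ones((3, 4), dtype=theano.config.floatX)
    return f(ones)   # roughly half of the 12 entries come back as 0.0
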
class DropoutHiddenLayer(HiddenLayer):
    def __init__(self, rng, input, n_in, n_out,
                 activation, dropout_rate, use_bias, W=None, b=None):
        super(DropoutHiddenLayer, self).__init__(
                rng=rng, input=input, n_in=n_in, n_out=n_out, W=W, b=b,
                activation=activation, use_bias=use_bias)

        self.output = _dropout_from_layer(rng, self.output, p=dropout_rate)

class MLPDropout(object):
    """A multilayer perceptron with dropout"""
    def __init__(self, rng, input, layer_sizes, dropout_rates, activations, use_bias=True):

        #rectified_linear_activation = lambda x: T.maximum(0.0, x)

        # Set up all the hidden layers
        self.weight_matrix_sizes = zip(layer_sizes, layer_sizes[1:])
        self.layers = []
        self.dropout_layers = []
        self.activations = activations
        next_layer_input = input
        #first_layer = True
        # dropout the input
        next_dropout_layer_input = _dropout_from_layer(rng, input, p=dropout_rates[0])
        layer_counter = 0
        for n_in, n_out in self.weight_matrix_sizes[:-1]:
            next_dropout_layer = DropoutHiddenLayer(rng=rng,
                    input=next_dropout_layer_input,
                    activation=activations[layer_counter],
                    n_in=n_in, n_out=n_out, use_bias=use_bias,
                    dropout_rate=dropout_rates[layer_counter])
            self.dropout_layers.append(next_dropout_layer)
            next_dropout_layer_input = next_dropout_layer.output

            # Reuse the parameters from the dropout layer here, in a different
            # path through the graph.
            next_layer = HiddenLayer(rng=rng,
                    input=next_layer_input,
                    activation=activations[layer_counter],
                    # scale the weight matrix W with (1-p)
                    W=next_dropout_layer.W * (1 - dropout_rates[layer_counter]),
                    b=next_dropout_layer.b,
                    n_in=n_in, n_out=n_out,
                    use_bias=use_bias)
            self.layers.append(next_layer)
            next_layer_input = next_layer.output
            #first_layer = False
            layer_counter += 1

        # Set up the output layer
        n_in, n_out = self.weight_matrix_sizes[-1]
        dropout_output_layer = LogisticRegression(
                input=next_dropout_layer_input,
                n_in=n_in, n_out=n_out)
        self.dropout_layers.append(dropout_output_layer)

        # Again, reuse parameters in the dropout output.
        output_layer = LogisticRegression(
            input=next_layer_input,
            # scale the weight matrix W with (1-p)
            W=dropout_output_layer.W * (1 - dropout_rates[-1]),
            b=dropout_output_layer.b,
            n_in=n_in, n_out=n_out)
        self.layers.append(output_layer)

        # Use the negative log likelihood of the logistic regression layer as
        # the objective.
        self.dropout_negative_log_likelihood = self.dropout_layers[-1].negative_log_likelihood
        self.dropout_errors = self.dropout_layers[-1].errors

        self.negative_log_likelihood = self.layers[-1].negative_log_likelihood
        self.errors = self.layers[-1].errors
        self.testlabel = self.layers[-1].testlabel
        self.testprobs = self.layers[-1].testprobs
        # Grab all the parameters together.
        self.params = [param for layer in self.dropout_layers for param in layer.params]

    def predict(self, new_data):
        next_layer_input = new_data
        for i, layer in enumerate(self.layers):
            if i < len(self.layers) - 1:
                next_layer_input = self.activations[i](T.dot(next_layer_input, layer.W) + layer.b)
            else:
                p_y_given_x = T.nnet.softmax(T.dot(next_layer_input, layer.W) + layer.b)
        y_pred = T.argmax(p_y_given_x, axis=1)
        return y_pred

    def predict_p(self, new_data):
        next_layer_input = new_data
        for i, layer in enumerate(self.layers):
            if i < len(self.layers) - 1:
                next_layer_input = self.activations[i](T.dot(next_layer_input, layer.W) + layer.b)
            else:
                p_y_given_x = T.nnet.softmax(T.dot(next_layer_input, layer.W) + layer.b)
        return p_y_given_x

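
# Illustrative usage sketch for MLPDropout (added for exposition; the layer
# sizes, dropout rates and batch size are hypothetical). layer_sizes=[300, 100, 2]
# builds one 100-unit ReLU hidden layer on 300-dimensional inputs and a 2-class
# softmax output; dropout_rates[0] is applied to the input, and dropout_rates[-1]
# scales the output-layer weights in the non-dropout (test-time) path.
def _demo_mlp_dropout():
    rng = numpy.random.RandomState(3435)
    x = T.matrix('x')
    clf = MLPDropout(rng, input=x,
                     layer_sizes=[300, 100, 2],
                     dropout_rates=[0.5, 0.5],
                     activations=[ReLU],
                     use_bias=True)
    predict = theano.function([x], clf.predict(x))
    predict_p = theano.function([x], clf.predict_p(x))
    data = numpy.asarray(rng.randn(8, 300), dtype=theano.config.floatX)
    return predict(data), predict_p(data)   # class ids (8,), probabilities (8, 2)
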
class MLP(object):
    """Multi-Layer Perceptron Class

    A multilayer perceptron is a feedforward artificial neural network model
    that has one layer or more of hidden units and nonlinear activations.
    Intermediate layers usually have as activation function tanh or the
    sigmoid function (defined here by a ``HiddenLayer`` class) while the
    top layer is a softmax layer (defined here by a ``LogisticRegression``
    class).
    """

    def __init__(self, rng, input, n_in, n_hidden, n_out):
        """Initialize the parameters for the multilayer perceptron

        :type rng: numpy.random.RandomState
        :param rng: a random number generator used to initialize weights

        :type input: theano.tensor.TensorType
        :param input: symbolic variable that describes the input of the
                      architecture (one minibatch)

        :type n_in: int
        :param n_in: number of input units, the dimension of the space in
                     which the datapoints lie

        :type n_hidden: int
        :param n_hidden: number of hidden units

        :type n_out: int
        :param n_out: number of output units, the dimension of the space in
                      which the labels lie

        """

        # Since we are dealing with a one hidden layer MLP, this will translate
        # into a HiddenLayer with a tanh activation function connected to the
        # LogisticRegression layer; the activation function can be replaced by
        # sigmoid or any other nonlinear function
        self.hiddenLayer = HiddenLayer(rng=rng, input=input,
                                       n_in=n_in, n_out=n_hidden,
                                       activation=T.tanh)

        # The logistic regression layer gets as input the hidden units
        # of the hidden layer
        self.logRegressionLayer = LogisticRegression(
            input=self.hiddenLayer.output,
            n_in=n_hidden,
            n_out=n_out)

        # L1 norm ; one regularization option is to enforce L1 norm to
        # be small

        # negative log likelihood of the MLP is given by the negative
        # log likelihood of the output of the model, computed in the
        # logistic regression layer
        self.negative_log_likelihood = self.logRegressionLayer.negative_log_likelihood
        # same holds for the function computing the number of errors
        self.errors = self.logRegressionLayer.errors

        # the parameters of the model are the parameters of the two layers it
        # is made out of
        self.params = self.hiddenLayer.params + self.logRegressionLayer.params

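
# Illustrative usage sketch for MLP (added for exposition; the sizes are
# hypothetical): a single tanh hidden layer with 50 units on 300-dimensional
# inputs and a 2-class softmax output, with the mean negative log-likelihood
# compiled as a Theano function.
def _demo_mlp():
    rng = numpy.random.RandomState(3435)
    x = T.matrix('x')
    y = T.ivector('y')
    mlp = MLP(rng, input=x, n_in=300, n_hidden=50, n_out=2)
    cost = theano.function([x, y], mlp.negative_log_likelihood(y))
    data = numpy.asarray(rng.randn(8, 300), dtype=theano.config.floatX)
    labels = numpy.asarray([0, 1, 0, 1, 0, 1, 0, 1], dtype='int32')
    return cost(data, labels)   # with zero-initialized softmax weights: log(2)
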
class LogisticRegression(object):
    """Multi-class Logistic Regression Class

    The logistic regression is fully described by a weight matrix :math:`W`
    and bias vector :math:`b`. Classification is done by projecting data
    points onto a set of hyperplanes, the distance to which is used to
    determine a class membership probability.
    """

    def __init__(self, input, n_in, n_out, W=None, b=None):
        """ Initialize the parameters of the logistic regression

        :type input: theano.tensor.TensorType
        :param input: symbolic variable that describes the input of the
                      architecture (one minibatch)

        :type n_in: int
        :param n_in: number of input units, the dimension of the space in
                     which the datapoints lie

        :type n_out: int
        :param n_out: number of output units, the dimension of the space in
                      which the labels lie

        """

        # initialize with 0 the weights W as a matrix of shape (n_in, n_out)
        if W is None:
            self.W = theano.shared(
                    value=numpy.zeros((n_in, n_out), dtype=theano.config.floatX),
                    name='W')
        else:
            self.W = W

        # initialize the biases b as a vector of n_out 0s
        if b is None:
            self.b = theano.shared(
                    value=numpy.zeros((n_out,), dtype=theano.config.floatX),
                    name='b')
        else:
            self.b = b

        # compute vector of class-membership probabilities in symbolic form
        self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)

        # compute prediction as class whose probability is maximal in
        # symbolic form
        self.y_pred = T.argmax(self.p_y_given_x, axis=1)

        # parameters of the model
        self.params = [self.W, self.b]

    def negative_log_likelihood(self, y):
        """Return the mean of the negative log-likelihood of the prediction
        of this model under a given target distribution.

        .. math::

            \frac{1}{|\mathcal{D}|} \mathcal{L} (\theta=\{W,b\}, \mathcal{D}) =
            \frac{1}{|\mathcal{D}|} \sum_{i=0}^{|\mathcal{D}|} \log(P(Y=y^{(i)}|x^{(i)}, W,b)) \\
                \ell (\theta=\{W,b\}, \mathcal{D})

        :type y: theano.tensor.TensorType
        :param y: corresponds to a vector that gives for each example the
                  correct label

        Note: we use the mean instead of the sum so that
              the learning rate is less dependent on the batch size
        """
        # y.shape[0] is (symbolically) the number of rows in y, i.e. the
        # number of examples (call it n) in the minibatch.
        # T.arange(y.shape[0]) is a symbolic vector which will contain
        # [0, 1, 2, ..., n-1].
        # T.log(self.p_y_given_x) is a matrix of log-probabilities (call it
        # LP) with one row per example and one column per class.
        # LP[T.arange(y.shape[0]), y] is a vector v containing
        # [LP[0, y[0]], LP[1, y[1]], ..., LP[n-1, y[n-1]]], and
        # T.mean(LP[T.arange(y.shape[0]), y]) is the mean (across minibatch
        # examples) of the elements in v, i.e., the mean log-likelihood
        # across the minibatch.
        return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])

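
    # Worked illustration of the indexing above (added for exposition; plain
    # numpy with hypothetical values):
    #
    #     probs = numpy.array([[0.7, 0.3],
    #                          [0.2, 0.8],
    #                          [0.5, 0.5]])
    #     y = numpy.array([0, 1, 1])
    #     LP = numpy.log(probs)
    #     LP[numpy.arange(3), y]            # [log 0.7, log 0.8, log 0.5]
    #     -LP[numpy.arange(3), y].mean()    # ~ 0.424, as returned above
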
    def errors(self, y):
        """Return a float representing the fraction of errors in the
        minibatch; zero-one loss averaged over the size of the minibatch

        :type y: theano.tensor.TensorType
        :param y: corresponds to a vector that gives for each example the
                  correct label
        """
        # check if y has same dimension of y_pred
        if y.ndim != self.y_pred.ndim:
            raise TypeError('y should have the same shape as self.y_pred',
                ('y', y.type, 'y_pred', self.y_pred.type))
        # check if y is of the correct datatype
        if y.dtype.startswith('int'):
            # the T.neq operator returns a vector of 0s and 1s, where 1
            # represents a mistake in prediction
            return T.mean(T.neq(self.y_pred, y))
        else:
            raise NotImplementedError()

    def testlabel(self):
        return self.y_pred

    def testprobs(self):
        return self.p_y_given_x

class LeNetConvPoolLayer(object):
    """Pool Layer of a convolutional network """

    def __init__(self, rng, input, filter_shape, image_shape, poolsize=(2, 2), non_linear="tanh"):
        """
        Allocate a LeNetConvPoolLayer with shared variable internal parameters.

        :type rng: numpy.random.RandomState
        :param rng: a random number generator used to initialize weights

        :type input: theano.tensor.dtensor4
        :param input: symbolic image tensor, of shape image_shape

        :type filter_shape: tuple or list of length 4
        :param filter_shape: (number of filters, num input feature maps,
                              filter height, filter width)

        :type image_shape: tuple or list of length 4
        :param image_shape: (batch size, num input feature maps,
                             image height, image width)

        :type poolsize: tuple or list of length 2
        :param poolsize: the downsampling (pooling) factor (#rows, #cols)
        """

        assert image_shape[1] == filter_shape[1]
        self.input = input
        self.filter_shape = filter_shape
        self.image_shape = image_shape
        self.poolsize = poolsize
        self.non_linear = non_linear
        # there are "num input feature maps * filter height * filter width"
        # inputs to each hidden unit
        # (numpy.prod returns the product of the array elements)
        fan_in = numpy.prod(filter_shape[1:])
        # each unit in the lower layer receives a gradient from:
        # "num output feature maps * filter height * filter width" /
        #   pooling size
        fan_out = (filter_shape[0] * numpy.prod(filter_shape[2:]) / numpy.prod(poolsize))
        # initialize weights with random weights; self.W holds the filters
        if self.non_linear == "none" or self.non_linear == "relu":
            self.W = theano.shared(numpy.asarray(rng.uniform(low=-0.01, high=0.01, size=filter_shape),
                                                 dtype=theano.config.floatX), borrow=True, name="W_conv")
        else:
            W_bound = numpy.sqrt(6. / (fan_in + fan_out))
            self.W = theano.shared(numpy.asarray(rng.uniform(low=-W_bound, high=W_bound, size=filter_shape),
                                                 dtype=theano.config.floatX), borrow=True, name="W_conv")
        b_values = numpy.zeros((filter_shape[0],), dtype=theano.config.floatX)
        self.b = theano.shared(value=b_values, borrow=True, name="b_conv")

        # convolve input feature maps with filters using Theano's convolution
        # routine; the result is a tensor of filtered images with shape
        # (number of images, number of filters, output height, output width)
        conv_out = conv.conv2d(input=input, filters=self.W, filter_shape=self.filter_shape, image_shape=self.image_shape)
        if self.non_linear == "tanh":
            conv_out_tanh = T.tanh(conv_out + self.b.dimshuffle('x', 0, 'x', 'x'))
            # max-pooling
            self.output = downsample.max_pool_2d(input=conv_out_tanh, ds=self.poolsize, ignore_border=True)
        elif self.non_linear == "relu":
            conv_out_relu = ReLU(conv_out + self.b.dimshuffle('x', 0, 'x', 'x'))
            self.output = downsample.max_pool_2d(input=conv_out_relu, ds=self.poolsize, ignore_border=True)
        else:
            pooled_out = downsample.max_pool_2d(input=conv_out, ds=self.poolsize, ignore_border=True)
            self.output = pooled_out + self.b.dimshuffle('x', 0, 'x', 'x')
        self.params = [self.W, self.b]

    def predict(self, new_data, batch_size):
        """
        predict for new data
        """
        img_shape = (batch_size, 1, self.image_shape[2], self.image_shape[3])
        conv_out = conv.conv2d(input=new_data, filters=self.W, filter_shape=self.filter_shape, image_shape=img_shape)
        if self.non_linear == "tanh":
            conv_out_tanh = T.tanh(conv_out + self.b.dimshuffle('x', 0, 'x', 'x'))
            output = downsample.max_pool_2d(input=conv_out_tanh, ds=self.poolsize, ignore_border=True)
        elif self.non_linear == "relu":
            conv_out_relu = ReLU(conv_out + self.b.dimshuffle('x', 0, 'x', 'x'))
            output = downsample.max_pool_2d(input=conv_out_relu, ds=self.poolsize, ignore_border=True)
        else:
            pooled_out = downsample.max_pool_2d(input=conv_out, ds=self.poolsize, ignore_border=True)
            output = pooled_out + self.b.dimshuffle('x', 0, 'x', 'x')
        return output
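
# Illustrative usage sketch for LeNetConvPoolLayer (added for exposition; the
# shapes are hypothetical): a batch of 50 "sentence images" of 64 word rows by
# 300 embedding columns, convolved with 100 filters of height 5 spanning the
# full embedding width, then max-pooled over the 60 remaining rows so each
# filter yields a single feature per sentence.
def _demo_lenet_conv_pool_layer():
    rng = numpy.random.RandomState(3435)
    x = T.tensor4('x')
    layer = LeNetConvPoolLayer(rng, input=x,
                               filter_shape=(100, 1, 5, 300),
                               image_shape=(50, 1, 64, 300),
                               poolsize=(64 - 5 + 1, 1),
                               non_linear="relu")
    f = theano.function([x], layer.output)
    data = numpy.asarray(rng.randn(50, 1, 64, 300), dtype=theano.config.floatX)
    return f(data)   # shape (50, 100, 1, 1)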