《神经网络与深度学习》中的Python 3.x 代码network3.py
expand_mnist.py代码:
点击下载:expand_mnist
“””expand_mnist.py
~~~~~~~~~~~~~~~~~~
Take the 50,000 MNIST training images, and create an expanded set of
250,000 images, by displacing each training image up, down, left and
right, by one pixel. Save the resulting file to
../data/mnist_expanded.pkl.gz.
Note that this program is memory intensive, and may not run on small
systems.
“””
from __future__ import print_function
#### Libraries
# Standard library
import _pickle as cPickle
# import cPickle
import gzip
import os.path
import random
# Third-party libraries
import numpy as np
print(“Expanding the MNIST training set”)
if os.path.exists(“./mnist_expanded.pkl.gz”):
print(“The expanded training set already exists. Exiting.”)
else:
f = gzip.open(“./mnist.pkl.gz”, ‘rb’)
training_data, validation_data, test_data = cPickle.load(f, encoding=’iso-8859-1′)
f.close()
expanded_training_pairs = []
j = 0 # counter
for x, y inzip(training_data[0], training_data[1]):
expanded_training_pairs.append((x, y))
image = np.reshape(x, (-1, 28))
j += 1
if j %1000==0: print(“Expanding image number”, j)
# iterate over data telling us the details of how to
# do the displacement
for d, axis, index_position, index in [
(1, 0, “first”, 0),
(-1, 0, “first”, 27),
(1, 1, “last”, 0),
(-1, 1, “last”, 27)]:
new_img = np.roll(image, d, axis)
if index_position ==”first”:
new_img[index, :] = np.zeros(28)
else:
new_img[:, index] = np.zeros(28)
expanded_training_pairs.append((np.reshape(new_img, 784), y))
random.shuffle(expanded_training_pairs)
expanded_training_data = [list(d) for d in zip(*expanded_training_pairs)]
print(“Saving expanded data. This may take a few minutes.”)
f = gzip.open(“./mnist_expanded.pkl.gz”, “w”)
cPickle.dump((expanded_training_data, validation_data, test_data), f)
f.close()
network3.py代码:
点击下载:network3
“””network3.py
~~~~~~~~~~~~~~
A Theano-based program for training and running simple neural
networks.
Supports several layer types (fully connected, convolutional, max
pooling, softmax), and activation functions (sigmoid, tanh, and
rectified linear units, with more easily added).
When run on a CPU, this program is much faster than network.py and
network2.py. However, unlike network.py and network2.py it can also
be run on a GPU, which makes it faster still.
Because the code is based on Theano, the code is different in many
ways from network.py and network2.py. However, where possible I have
tried to maintain consistency with the earlier programs. In
particular, the API is similar to network2.py. Note that I have
focused on making the code simple, easily readable, and easily
modifiable. It is not optimized, and omits many desirable features.
This program incorporates ideas from the Theano documentation on
convolutional neural nets (notably,
http://deeplearning.net/tutorial/lenet.html ), from Misha Denil’s
implementation of dropout (https://github.com/mdenil/dropout ), and
from Chris Olah (http://colah.github.io ).
Written for Theano 0.6 and 0.7, needs some changes for more recent
versions of Theano.
“””
#### Libraries
# Standard library
import _pickle as cPickle
# import cPickle
import gzip
# Third-party libraries
import numpy as np
import theano
import theano.tensor as T
from theano.tensor.nnet import conv
from theano.tensor.nnet import softmax
from theano.tensor import shared_randomstreams
from theano.tensor.signal.pool import pool_2d
# from theano.tensor.signal import downsample
# Activation functions for neurons
def linear(z): return z
def ReLU(z): return T.maximum(0.0, z)
from theano.tensor.nnet import sigmoid
from theano.tensor import tanh
#### Constants
GPU = True
if GPU:
print(“Trying to run under a GPU. If this is not desired, then modify “+\
“network3.py\nto set the GPU flag to False.”)
try: theano.config.device =’gpu’
except: pass# it’s already set
theano.config.floatX = ‘float32’
else:
print(“Running with a CPU. If this is not desired, then the modify “+\
“network3.py to set\nthe GPU flag to True.”)
#### Load the MNIST data
def load_data_shared(filename=”./mnist.pkl.gz”):
f = gzip.open(filename, ‘rb’)
training_data, validation_data, test_data = cPickle.load(f, encoding=’iso-8859-1′)
f.close()
# 减少数据量,以便测试时临时快速观察流程。
RealCount = 20
temp_data = []
count = 0
for x, y inzip(training_data[0], training_data[1]):
temp_data.append((x, y))
count += 1
if count >= RealCount :
break
training_data = [list(d) for d in zip(*temp_data)]
temp_data = []
count = 0
for x, y inzip(validation_data[0], validation_data[1]):
temp_data.append((x, y))
count += 1
if count >= RealCount :
break
validation_data = [list(d) for d in zip(*temp_data)]
temp_data = []
count = 0
for x, y inzip(test_data[0], test_data[1]):
temp_data.append((x, y))
count += 1
if count >= RealCount :
break
test_data = [list(d) for d in zip(*temp_data)]
defshared(data):
“””Place the data into shared variables. This allows Theano to copy
the data to the GPU, if one is available.
“””
shared_x = theano.shared(
np.asarray(data[0], dtype=theano.config.floatX), borrow=True)
shared_y = theano.shared(
np.asarray(data[1], dtype=theano.config.floatX), borrow=True)
return shared_x, T.cast(shared_y, “int32”)
return [shared(training_data), shared(validation_data), shared(test_data)]
#### Main class used to construct and train networks
class Network(object):
def__init__(self, layers, mini_batch_size):
“””Takes a list of `layers`, describing the network architecture, and
a value for the `mini_batch_size` to be used during training
by stochastic gradient descent.
“””
self.layers = layers
self.mini_batch_size = mini_batch_size
self.params = [param for layer inself.layers for param in layer.params]
self.x = T.matrix(“x”)
self.y = T.ivector(“y”)
init_layer = self.layers[0]
init_layer.set_inpt(self.x, self.x, self.mini_batch_size)
for j inrange(1, len(self.layers)):
prev_layer, layer = self.layers[j-1], self.layers[j]
layer.set_inpt(
prev_layer.output, prev_layer.output_dropout, self.mini_batch_size)
self.output =self.layers[-1].output
self.output_dropout =self.layers[-1].output_dropout
defSGD(self, training_data, epochs, mini_batch_size, eta,
validation_data, test_data, lmbda=0.0):
“””Train the network using mini-batch stochastic gradient descent.”””
training_x, training_y = training_data
validation_x, validation_y = validation_data
test_x, test_y = test_data
# compute number of minibatches for training, validation and testing
num_training_batches = int(size(training_data)/mini_batch_size)
num_validation_batches = int(size(validation_data)/mini_batch_size)
num_test_batches = int(size(test_data)/mini_batch_size)
# define the (regularized) cost function, symbolic gradients, and updates
l2_norm_squared = sum([(layer.w**2).sum() for layer in self.layers])
cost = self.layers[-1].cost(self)+\
0.5*lmbda*l2_norm_squared/num_training_batches
grads = T.grad(cost, self.params)
updates = [(param, param-eta*grad)
for param, grad inzip(self.params, grads)]
# define functions to train a mini-batch, and to compute the
# accuracy in validation and test mini-batches.
i = T.lscalar() # mini-batch index
train_mb = theano.function(
[i], cost, updates=updates,
givens={
self.x:
training_x[i*self.mini_batch_size: (i+1)*self.mini_batch_size],
self.y:
training_y[i*self.mini_batch_size: (i+1)*self.mini_batch_size]
})
validate_mb_accuracy = theano.function(
[i], self.layers[-1].accuracy(self.y),
givens={
self.x:
validation_x[i*self.mini_batch_size: (i+1)*self.mini_batch_size],
self.y:
validation_y[i*self.mini_batch_size: (i+1)*self.mini_batch_size]
})
test_mb_accuracy = theano.function(
[i], self.layers[-1].accuracy(self.y),
givens={
self.x:
test_x[i*self.mini_batch_size: (i+1)*self.mini_batch_size],
self.y:
test_y[i*self.mini_batch_size: (i+1)*self.mini_batch_size]
})
self.test_mb_predictions = theano.function(
[i], self.layers[-1].y_out,
givens={
self.x:
test_x[i*self.mini_batch_size: (i+1)*self.mini_batch_size]
})
# Do the actual training
best_validation_accuracy = 0.0
for epoch inrange(epochs):
for minibatch_index inrange(num_training_batches):
iteration = num_training_batches*epoch+minibatch_index
# if iteration % 1000 == 0:
if iteration %10==0:
print(“Training mini-batch number {0}”.format(iteration))
cost_ij = train_mb(minibatch_index)
if (iteration+1) % num_training_batches ==0:
validation_accuracy = np.mean(
[validate_mb_accuracy(j) for j in range(num_validation_batches)])
print(“Epoch {0}: validation accuracy {1:.2%}”.format(
epoch, validation_accuracy))
if validation_accuracy >= best_validation_accuracy:
print(“This is the best validation accuracy to date.”)
best_validation_accuracy = validation_accuracy
best_iteration = iteration
if test_data:
test_accuracy = np.mean(
[test_mb_accuracy(j) for j in range(num_test_batches)])
print(‘The corresponding test accuracy is {0:.2%}’.format(
test_accuracy))
print(“Finished training network.”)
print(“Best validation accuracy of {0:.2%} obtained at iteration {1}”.format(
best_validation_accuracy, best_iteration))
print(“Corresponding test accuracy of {0:.2%}”.format(test_accuracy))
#### Define layer types
class ConvPoolLayer(object):
“””Used to create a combination of a convolutional and a max-pooling
layer. A more sophisticated implementation would separate the
two, but for our purposes we’ll always use them together, and it
simplifies the code, so it makes sense to combine them.
“””
def__init__(self, filter_shape, image_shape, poolsize=(2, 2),
activation_fn=sigmoid):
“””`filter_shape` is a tuple of length 4, whose entries are the number
of filters, the number of input feature maps, the filter height, and the
filter width.
`image_shape` is a tuple of length 4, whose entries are the
mini-batch size, the number of input feature maps, the image
height, and the image width.
`poolsize` is a tuple of length 2, whose entries are the y and
x pooling sizes.
“””
self.filter_shape = filter_shape
self.image_shape = image_shape
self.poolsize = poolsize
self.activation_fn=activation_fn
# initialize weights and biases
n_out = (filter_shape[0]*np.prod(filter_shape[2:])/np.prod(poolsize))
self.w = theano.shared(
np.asarray(
np.random.normal(loc=0, scale=np.sqrt(1.0/n_out), size=filter_shape),
dtype=theano.config.floatX),
borrow=True)
self.b = theano.shared(
np.asarray(
np.random.normal(loc=0, scale=1.0, size=(filter_shape[0],)),
dtype=theano.config.floatX),
borrow=True)
self.params = [self.w, self.b]
defset_inpt(self, inpt, inpt_dropout, mini_batch_size):
self.inpt = inpt.reshape(self.image_shape)
conv_out = conv.conv2d(
input=self.inpt, filters=self.w, filter_shape=self.filter_shape,
image_shape=self.image_shape)
# pooled_out = downsample.max_pool_2d(
# input=conv_out, ds=self.poolsize, ignore_border=True)
pooled_out = pool_2d(
input=conv_out, ds=self.poolsize, ignore_border=True)
self.output =self.activation_fn(
pooled_out + self.b.dimshuffle(‘x’, 0, ‘x’, ‘x’))
self.output_dropout =self.output # no dropout in the convolutional layers
class FullyConnectedLayer(object):
def__init__(self, n_in, n_out, activation_fn=sigmoid, p_dropout=0.0):
self.n_in = n_in
self.n_out = n_out
self.activation_fn = activation_fn
self.p_dropout = p_dropout
# Initialize weights and biases
self.w = theano.shared(
np.asarray(
np.random.normal(
loc=0.0, scale=np.sqrt(1.0/n_out), size=(n_in, n_out)),
dtype=theano.config.floatX),
name=’w’, borrow=True)
self.b = theano.shared(
np.asarray(np.random.normal(loc=0.0, scale=1.0, size=(n_out,)),
dtype=theano.config.floatX),
name=’b’, borrow=True)
self.params = [self.w, self.b]
defset_inpt(self, inpt, inpt_dropout, mini_batch_size):
self.inpt = inpt.reshape((mini_batch_size, self.n_in))
self.output =self.activation_fn(
(1-self.p_dropout)*T.dot(self.inpt, self.w) + self.b)
self.y_out = T.argmax(self.output, axis=1)
self.inpt_dropout = dropout_layer(
inpt_dropout.reshape((mini_batch_size, self.n_in)), self.p_dropout)
self.output_dropout =self.activation_fn(
T.dot(self.inpt_dropout, self.w) + self.b)
defaccuracy(self, y):
“Return the accuracy for the mini-batch.”
return T.mean(T.eq(y, self.y_out))
class SoftmaxLayer(object):
def__init__(self, n_in, n_out, p_dropout=0.0):
self.n_in = n_in
self.n_out = n_out
self.p_dropout = p_dropout
# Initialize weights and biases
self.w = theano.shared(
np.zeros((n_in, n_out), dtype=theano.config.floatX),
name=’w’, borrow=True)
self.b = theano.shared(
np.zeros((n_out,), dtype=theano.config.floatX),
name=’b’, borrow=True)
self.params = [self.w, self.b]
defset_inpt(self, inpt, inpt_dropout, mini_batch_size):
self.inpt = inpt.reshape((mini_batch_size, self.n_in))
self.output = softmax((1-self.p_dropout)*T.dot(self.inpt, self.w) +self.b)
self.y_out = T.argmax(self.output, axis=1)
self.inpt_dropout = dropout_layer(
inpt_dropout.reshape((mini_batch_size, self.n_in)), self.p_dropout)
self.output_dropout = softmax(T.dot(self.inpt_dropout, self.w) +self.b)
defcost(self, net):
“Return the log-likelihood cost.”
return-T.mean(T.log(self.output_dropout)[T.arange(net.y.shape[0]), net.y])
defaccuracy(self, y):
“Return the accuracy for the mini-batch.”
return T.mean(T.eq(y, self.y_out))
#### Miscellanea
def size(data):
“Return the size of the dataset `data`.”
return data[0].get_value(borrow=True).shape[0]
def dropout_layer(layer, p_dropout):
srng = shared_randomstreams.RandomStreams(
np.random.RandomState(0).randint(999999))
mask = srng.binomial(n=1, p=1-p_dropout, size=layer.shape)
return layer*T.cast(mask, theano.config.floatX)
def test():
# expanded_training_data, _, _ = load_data_shared(“./mnist_expanded.pkl.gz”)
training_data, validation_data, test_data = load_data_shared()
mini_batch_size = 10
net = Network( [
ConvPoolLayer(image_shape=(mini_batch_size, 1, 28, 28), filter_shape=(20, 1, 5, 5), poolsize=(2, 2), activation_fn=ReLU),
ConvPoolLayer(image_shape=(mini_batch_size, 20, 12, 12), filter_shape=(40, 20, 5, 5), poolsize=(2, 2), activation_fn=ReLU),
FullyConnectedLayer(n_in=40*4*4, n_out=1000, activation_fn=ReLU, p_dropout=0.5),
FullyConnectedLayer(n_in=1000, n_out=1000, activation_fn=ReLU, p_dropout=0.5),
SoftmaxLayer(n_in=1000, n_out=10, p_dropout=0.5)
], mini_batch_size)
# net.SGD(expanded_training_data, 40, mini_batch_size, 0.03, validation_data, test_data)
# net.SGD(expanded_training_data, 3, mini_batch_size, 0.03, validation_data, test_data)
net.SGD(training_data, 3, mini_batch_size, 0.03, validation_data, test_data)
if __name__ == ‘__main__’:
test()