First commit
mlp_network.py (new file, 118 lines)
@@ -0,0 +1,118 @@
import numpy as np
from recordclass import recordclass
from typing import NamedTuple, List, Callable, Generator
from import_data import train_x_y, test_x_y
import sys


class LossFun(NamedTuple):
    # A loss function paired with its derivative w.r.t. the predictions.
    exec: Callable[[np.ndarray, np.ndarray], float]
    deriv: Callable[[np.ndarray, np.ndarray], np.ndarray]


def sum_squares_loss_func(predicted: np.ndarray, gold: np.ndarray) -> float:
    return np.sum((predicted - gold) ** 2)


def sum_squares_loss_derivative(predicted: np.ndarray, gold: np.ndarray) -> np.ndarray:
    return 2 * (predicted - gold)


sum_squares_loss = LossFun(sum_squares_loss_func, sum_squares_loss_derivative)
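

def _spot_check_sum_squares():
    # Hand-worked example, not from the source: predicted [0.8, 0.2]
    # against one-hot gold [1, 0] gives loss (-0.2)**2 + 0.2**2 = 0.08
    # and derivative 2 * (predicted - gold) = [-0.4, 0.4].
    p, g = np.array([0.8, 0.2]), np.array([1.0, 0.0])
    print(sum_squares_loss.exec(p, g))   # 0.08
    print(sum_squares_loss.deriv(p, g))  # [-0.4  0.4]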


def sigmoid(x: np.ndarray) -> np.ndarray:
    # Piecewise logistic: each branch keeps its exponent non-positive on
    # the inputs it is selected for, so the chosen values never overflow.
    return np.where(x >= 0,
                    1 / (1 + np.exp(-x)),
                    np.exp(x) / (1 + np.exp(x)))


def sigmoid_deriv(x: np.ndarray) -> np.ndarray:
    s = sigmoid(x)
    return s * (1 - s)


def softmax(x: np.ndarray) -> np.ndarray:
    # Normalise by the sum of the exponentials, shifting by the maximum
    # first so the exponents stay small.
    shifted = np.exp(x - np.max(x))
    return shifted / np.sum(shifted)


class FFNeuralNetwork:
    # Mutable per-layer record: the weight matrix plus caches of the last
    # activations and the last pre-activation ("linear") values.
    Layer = recordclass("Layer", "weights last_out last_linear")

    def __init__(self, structure: List[int], loss_fun: LossFun, learn_rate: float = 0.001):
        self.learn_rate = learn_rate
        self.loss_fun: LossFun = loss_fun
        self.layers: List[FFNeuralNetwork.Layer] = []
        for i, layer_size in enumerate(structure[1:]):
            self.layers.append(
                FFNeuralNetwork.Layer(
                    # Small random weights: an all-zero start would give
                    # every unit in a layer identical gradients forever.
                    weights=np.random.randn(structure[i], layer_size) * 0.01,
                    last_out=np.zeros(layer_size),
                    last_linear=np.zeros(layer_size)))
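
    # With structure [28**2, 100, 10] this builds two layers with weight
    # shapes (784, 100) and (100, 10): layer i maps structure[i] inputs
    # to structure[i + 1] outputs.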

    def feed_forward(self, datum: List[float]) -> np.ndarray:
        out = np.asarray(datum)
        for i, layer in enumerate(self.layers):
            layer.last_out = sigmoid(self.linear_forward(i, out))
            out = layer.last_out
        # `out` is already the sigmoid of the last linear layer.
        return out

    def linear_forward(self, layer_index: int, last_output: np.ndarray) -> np.ndarray:
        layer = self.layers[layer_index]
        result = np.dot(layer.weights.T, last_output)  # + layer.bias
        layer.last_linear = result
        return result
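
    # Forward pass per layer i: z_i = weights_i.T @ a_{i-1} and
    # a_i = sigmoid(z_i), with a_{-1} the raw input datum (no bias
    # term yet, per the comment above).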

    def predict(self, datum: List[float]):
        return np.argmax(self.feed_forward(datum))

    def calculate_loss(self, input_data: List[float], golden: List[int]):
        return self.loss_fun.exec(self.feed_forward(input_data), np.array(golden))

    def back_prop(self, input_data: List[float], golden: int, output: np.ndarray):
        # One-hot encode the gold label.
        golden = np.array([1 if i == golden else 0 for i in range(len(output))])

        # Output layer: delta = dL/dout * sigmoid'(z).
        layer = self.layers[-1]
        delta = self.loss_fun.deriv(output, golden) * sigmoid_deriv(layer.last_linear)
        dloss_dweights = np.outer(self.layers[-2].last_out, delta)
        # Carry the error through the pre-update weights, then descend.
        delta = np.dot(layer.weights, delta)
        layer.weights -= self.learn_rate * dloss_dweights

        delta = self.backprop_middle_layers(delta)

        # First layer: its "previous activations" are the raw inputs.
        layer = self.layers[0]
        delta = delta * sigmoid_deriv(layer.last_linear)
        layer.weights -= self.learn_rate * np.outer(np.asarray(input_data), delta)
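
    # Chain rule used above and below: delta_i = (W_{i+1} @ delta_{i+1})
    # * sigmoid'(z_i), and dL/dW_i = outer(a_{i-1}, delta_i).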

    def backprop_middle_layers(self, delta: np.ndarray) -> np.ndarray:
        # `delta` arrives as dL/d(activation) of the next layer down;
        # walk the hidden layers between the last and the first.
        for i in range(len(self.layers) - 2, 0, -1):
            layer = self.layers[i]
            delta = delta * sigmoid_deriv(layer.last_linear)
            dloss_dweights = np.outer(self.layers[i - 1].last_out, delta)
            delta = np.dot(layer.weights, delta)
            layer.weights -= self.learn_rate * dloss_dweights
        return delta

    def train(self, input_data: Callable[[], Generator], epochs: int = 4):
        # `input_data` is a zero-argument callable returning a fresh
        # (datum, label) generator, so it can be re-iterated per epoch.
        for epoch in range(epochs):
            print(f"Training epoch: {epoch + 1}")
            for datum, label in input_data():
                self.back_prop(datum, label, self.feed_forward(datum))
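

def finite_difference_gradient_check(eps: float = 1e-5):
    # Minimal sanity-check sketch, not part of the source: numerically
    # probe dL/dw for one weight of a tiny network, the usual way to
    # validate back_prop. The 2-3-2 structure, inputs and gold target
    # are arbitrary illustrative choices.
    net = FFNeuralNetwork([2, 3, 2], sum_squares_loss)
    x, gold = [0.5, -0.3], [1, 0]
    base = net.calculate_loss(x, gold)
    net.layers[0].weights[0, 0] += eps
    bumped = net.calculate_loss(x, gold)
    net.layers[0].weights[0, 0] -= eps
    print("numerical dL/dw[0,0] ~", (bumped - base) / eps)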


def train_and_test_neural_network():
    model = FFNeuralNetwork([28**2, 100, 10], sum_squares_loss, 0.0001)
    training_data_gen = train_x_y(1000)
    test_data = test_x_y(10)()
    model.train(training_data_gen, 5)
    for test_datum, label in test_data:
        print(model.feed_forward(test_datum), label)
    # Show the full first-layer weight matrix rather than a truncated view.
    np.set_printoptions(threshold=sys.maxsize)
    print(model.layers[0].weights)
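

def _toy_demo():
    # Minimal sketch for exercising the network without import_data: the
    # generator, layer sizes and seed below are invented for illustration.
    def toy_x_y(n: int):
        def gen():
            rng = np.random.default_rng(0)
            for _ in range(n):
                label = int(rng.integers(0, 2))
                centre = 0.25 if label == 0 else 0.75
                yield rng.normal(centre, 0.1, size=4), label
        return gen

    model = FFNeuralNetwork([4, 8, 2], sum_squares_loss, 0.01)
    model.train(toy_x_y(500), epochs=3)
    print(model.predict(np.full(4, 0.75)))  # expect class 1 most runs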


if __name__ == "__main__":
    train_and_test_neural_network()