import sys
from typing import Callable, Generator, List, NamedTuple, Optional, Tuple

import numpy as np
from recordclass import recordclass

from import_data import train_x_y, test_x_y


class LossFun(NamedTuple):
    """Bundle a loss function with its derivative w.r.t. the prediction."""

    # exec(predicted, gold) -> scalar loss value.
    exec: Callable[[np.ndarray, np.ndarray], float]
    # deriv(predicted, gold) -> gradient of the loss w.r.t. ``predicted``.
    deriv: Callable[[np.ndarray, np.ndarray], np.ndarray]


def sum_squares_loss_func(predicted: np.ndarray, gold: np.ndarray) -> float:
    """Return the sum of squared differences between *predicted* and *gold*."""
    diff = np.asarray(predicted, dtype=float) - np.asarray(gold, dtype=float)
    # np.sum reduces over every axis, so the result is always a scalar.
    return float(np.sum(diff ** 2))


def sum_squares_loss_derivative(predicted: np.ndarray,
                                gold: np.ndarray) -> np.ndarray:
    """Return the gradient of the sum-of-squares loss w.r.t. *predicted*."""
    return 2 * (np.asarray(predicted, dtype=float)
                - np.asarray(gold, dtype=float))


sum_squares_loss = LossFun(sum_squares_loss_func, sum_squares_loss_derivative)


def sigmoid(x: np.ndarray) -> np.ndarray:
    """Return the element-wise logistic function ``1 / (1 + exp(-x))``.

    Only ``exp(-|x|)`` is ever evaluated, so no branch overflows (and no
    RuntimeWarning is emitted) for inputs of large magnitude.
    """
    x = np.asarray(x, dtype=float)
    decay = np.exp(-np.abs(x))  # always within (0, 1]
    # x >= 0: 1 / (1 + exp(-x));  x < 0: exp(x) / (1 + exp(x)).
    return np.where(x >= 0, 1 / (1 + decay), decay / (1 + decay))


def sigmoid_deriv(x: np.ndarray) -> np.ndarray:
    """Return the element-wise derivative of :func:`sigmoid` at *x*."""
    activation = sigmoid(x)
    return activation * (1 - activation)


def softmax(x: np.ndarray) -> np.ndarray:
    """Return the softmax of *x*, normalised along its last axis.

    The maximum is subtracted before exponentiating; this leaves the result
    unchanged mathematically but keeps ``exp`` from overflowing.
    """
    x = np.asarray(x, dtype=float)
    exps = np.exp(x - np.max(x, axis=-1, keepdims=True))
    return exps / np.sum(exps, axis=-1, keepdims=True)


class FFNeuralNetwork:
    """Fully connected feed-forward network with sigmoid activations.

    The network has no bias terms. Each layer caches its most recent linear
    output and activation so that :meth:`back_prop` can reuse them; therefore
    :meth:`feed_forward` must be called on a sample before back-propagating it.
    """

    # Mutable record: weight matrix of shape (inputs, outputs) plus the
    # activation and pre-activation produced by the latest forward pass.
    Layer = recordclass("Layer", "weights last_out last_linear")

    def __init__(self, structure: List[int], loss_fun: LossFun,
                 learn_rate: float = 0.001, seed: Optional[int] = None):
        """Build the layers described by *structure*.

        Args:
            structure: Layer sizes, input size first, output size last.
            loss_fun: Loss function and derivative used for training.
            learn_rate: Step size of each gradient-descent update.
            seed: Optional seed for the weight initialisation, for
                reproducible runs.

        Raises:
            ValueError: If *structure* holds fewer than two sizes.
        """
        if len(structure) < 2:
            raise ValueError(
                "structure needs at least an input and an output size")
        self.learn_rate = learn_rate
        self.loss_fun: LossFun = loss_fun
        self.layers: List[FFNeuralNetwork.Layer] = []
        rng = np.random.default_rng(seed)
        for fan_in, fan_out in zip(structure, structure[1:]):
            # Glorot/Xavier uniform initialisation. All-zero weights would
            # give every unit of a layer identical gradients forever, so the
            # hidden layers could never learn distinct features.
            limit = np.sqrt(6.0 / (fan_in + fan_out))
            self.layers.append(
                FFNeuralNetwork.Layer(
                    weights=rng.uniform(-limit, limit,
                                        size=(fan_in, fan_out)),
                    last_out=np.zeros(fan_out),
                    last_linear=np.zeros(fan_out)))

    def feed_forward(self, datum: List[float]) -> np.ndarray:
        """Run *datum* through every layer and return the final activation.

        Side effect: refreshes ``last_linear`` and ``last_out`` on each layer.
        """
        out = np.asarray(datum, dtype=float)
        for i, layer in enumerate(self.layers):
            layer.last_out = sigmoid(self.linear_forward(i, out))
            out = layer.last_out
        # The last layer's activation already went through the sigmoid;
        # squashing it a second time would disagree with the derivative
        # used in back_prop.
        return out

    def linear_forward(self, layer_index: int,
                       last_output: np.ndarray) -> np.ndarray:
        """Return ``weights.T @ last_output`` for one layer and cache it."""
        layer = self.layers[layer_index]
        result = np.dot(layer.weights.T, last_output)
        layer.last_linear = result
        return result

    def predict(self, datum: List[float]):
        """Return the index of the strongest output unit for *datum*."""
        return np.argmax(self.feed_forward(datum))

    def calculate_loss(self, input_data: List[float],
                       golden: List[int]) -> float:
        """Return the loss of the prediction for *input_data* vs *golden*.

        *golden* is the target vector itself (not a class index).
        """
        return self.loss_fun.exec(self.feed_forward(input_data),
                                  np.array(golden))

    def _descend_layer(self, layer, layer_input: np.ndarray,
                       dloss_dout: np.ndarray) -> np.ndarray:
        """Apply one gradient-descent step to *layer*.

        Args:
            layer: Layer whose weights are updated in place.
            layer_input: Vector that was fed into the layer.
            dloss_dout: Gradient of the loss w.r.t. the layer's activation.

        Returns:
            Gradient of the loss w.r.t. *layer_input*.
        """
        delta = dloss_dout * sigmoid_deriv(layer.last_linear)
        # Must be computed before the update so it reflects the weights
        # that actually produced the forward pass.
        dloss_dinput = np.dot(layer.weights, delta)
        layer.weights -= self.learn_rate * np.outer(layer_input, delta)
        return dloss_dinput

    def back_prop(self, input_data: List[int], golden: int,
                  output: np.ndarray) -> None:
        """Update all weights by one gradient-descent step for one sample.

        Args:
            input_data: The sample most recently passed to feed_forward.
            golden: Index of the correct output unit. An index outside the
                output range yields an all-zero target.
            output: The activation feed_forward returned for *input_data*.
        """
        output = np.asarray(output, dtype=float)
        sample = np.asarray(input_data, dtype=float)
        one_hot = (np.arange(len(output)) == golden).astype(float)
        dloss_dout = self.loss_fun.deriv(output, one_hot)

        if len(self.layers) == 1:
            # A single layer is fed directly by the sample.
            self._descend_layer(self.layers[0], sample, dloss_dout)
            return

        dloss_dprev = self._descend_layer(
            self.layers[-1], self.layers[-2].last_out, dloss_dout)
        if len(self.layers) > 2:
            dloss_dprev = self.backprop_middle_layers(dloss_dprev)
        self._descend_layer(self.layers[0], sample, dloss_dprev)

    def backprop_middle_layers(self, dloss_dprev: np.ndarray) -> np.ndarray:
        """Back-propagate through every layer except the first and the last.

        Args:
            dloss_dprev: Gradient of the loss w.r.t. the activation of the
                layer that feeds the output layer.

        Returns:
            Gradient of the loss w.r.t. the first layer's activation.
        """
        for index in range(len(self.layers) - 2, 0, -1):
            dloss_dprev = self._descend_layer(
                self.layers[index],
                self.layers[index - 1].last_out,
                dloss_dprev)
        return dloss_dprev

    def train(self, input_data: Callable[[], Generator],
              epochs: int = 4) -> None:
        """Train for *epochs* passes over the samples.

        Args:
            input_data: Zero-argument callable returning a fresh iterable of
                ``(datum, label)`` pairs; it is called once per epoch.
            epochs: Number of passes over the data.
        """
        for epoch in range(epochs):
            print(f"Training epoch: {epoch + 1}")
            for datum, label in input_data():
                self.back_prop(datum, label, self.feed_forward(datum))


def train_and_test_neural_network():
    """Train a 784-100-10 network, then print test outputs and weights."""
    model = FFNeuralNetwork([28**2, 100, 10], sum_squares_loss, 0.0001)
    # NOTE(review): presumably the arguments are sample counts — confirm
    # against import_data.
    training_data_gen = train_x_y(1000)
    test_data = test_x_y(10)()
    model.train(training_data_gen, 5)
    for test_datum, label in test_data:
        print(model.feed_forward(test_datum), label)
    # Print the full first-layer weight matrix without truncation.
    np.set_printoptions(threshold=sys.maxsize)
    print(model.layers[0].weights)


if __name__ == "__main__":
    train_and_test_neural_network()