-
Notifications
You must be signed in to change notification settings - Fork 1.8k
/
rbm.py
88 lines (66 loc) · 3.13 KB
/
rbm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# coding:utf-8
import logging
import numpy as np
from scipy.special import expit
from mla.base import BaseEstimator
from mla.utils import batch_iterator
np.random.seed(9999)
sigmoid = expit
"""
References:
A Practical Guide to Training Restricted Boltzmann Machines https://www.cs.toronto.edu/~hinton/absps/guideTR.pdf
"""
class RBM(BaseEstimator):
y_required = False
def __init__(self, n_hidden=128, learning_rate=0.1, batch_size=10, max_epochs=100):
"""Bernoulli Restricted Boltzmann Machine (RBM)
Parameters
----------
n_hidden : int, default 128
The number of hidden units.
learning_rate : float, default 0.1
batch_size : int, default 10
max_epochs : int, default 100
"""
self.max_epochs = max_epochs
self.batch_size = batch_size
self.lr = learning_rate
self.n_hidden = n_hidden
def fit(self, X, y=None):
self.n_visible = X.shape[1]
self._init_weights()
self._setup_input(X, y)
self._train()
def _init_weights(self):
self.W = np.random.randn(self.n_visible, self.n_hidden) * 0.1
# Bias for visible and hidden units
self.bias_v = np.zeros(self.n_visible, dtype=np.float32)
self.bias_h = np.zeros(self.n_hidden, dtype=np.float32)
self.errors = []
def _train(self):
"""Use CD-1 training procedure, basically an exact inference for `positive_associations`,
followed by a "non burn-in" block Gibbs Sampling for the `negative_associations`."""
for i in range(self.max_epochs):
error = 0
for batch in batch_iterator(self.X, batch_size=self.batch_size):
positive_hidden = sigmoid(np.dot(batch, self.W) + self.bias_h)
hidden_states = self._sample(positive_hidden) # sample hidden state h1
positive_associations = np.dot(batch.T, positive_hidden)
negative_visible = sigmoid(np.dot(hidden_states, self.W.T) + self.bias_v)
negative_visible = self._sample(negative_visible) # use the sampled hidden state h1 to sample v1
negative_hidden = sigmoid(np.dot(negative_visible, self.W) + self.bias_h)
negative_associations = np.dot(negative_visible.T, negative_hidden)
lr = self.lr / float(batch.shape[0])
self.W += lr * ((positive_associations - negative_associations) / float(self.batch_size))
self.bias_h += lr * (negative_hidden.sum(axis=0) - negative_associations.sum(axis=0))
self.bias_v += lr * (np.asarray(batch.sum(axis=0)).squeeze() - negative_visible.sum(axis=0))
error += np.sum((batch - negative_visible) ** 2)
self.errors.append(error)
logging.info("Iteration %s, error %s" % (i, error))
logging.debug("Weights: %s" % self.W)
logging.debug("Hidden bias: %s" % self.bias_h)
logging.debug("Visible bias: %s" % self.bias_v)
def _sample(self, X):
return X > np.random.random_sample(size=X.shape)
def _predict(self, X=None):
return sigmoid(np.dot(X, self.W) + self.bias_h)