import numpy as np
import tensorflow as tf
from sklearn import cross_validation
from sklearn import datasets
from sklearn.svm import SVR
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.metrics import r2_score
from math import sqrt
import random
# Silence TensorFlow's INFO/WARNING chatter (contrib.learn fit() is very verbose).
tf.logging.set_verbosity(tf.logging.ERROR)
def test_svm(x_train, x_test, y_train, y_test):
    """Fit a linear-kernel SVR on the training split and print R2/RMSE on the test split."""
    model = SVR(kernel='linear')
    model.fit(x_train, y_train)
    predictions = model.predict(x_test)
    rmse = sqrt(mean_squared_error(y_test, predictions))
    r2 = r2_score(y_test, predictions)
    print('SVM: R2: {0:f}, RMSE:{1:f}'.format(r2, rmse))
def test_rf(x_train, x_test, y_train, y_test):
    """Fit a default RandomForestRegressor and print R2/RMSE on the test split."""
    forest = RandomForestRegressor()
    forest.fit(x_train, y_train)
    predictions = forest.predict(x_test)
    rmse = sqrt(mean_squared_error(y_test, predictions))
    r2 = r2_score(y_test, predictions)
    print(' RF: R2: {0:f}, RMSE:{1:f}'.format(r2, rmse))
def test_tf_learn_dnn(x_train, x_test, y_train, y_test, steps=10000, hidden=None):
    """Fit a tf.contrib.learn DNNRegressor and print R2/RMSE on the test split.

    Args:
        x_train, x_test: feature arrays with 13 columns (see real_valued_column dimension).
        y_train, y_test: regression targets.
        steps: number of training steps passed to fit().
        hidden: hidden-layer sizes; defaults to [20, 20]. (None sentinel instead of a
            mutable list default, which would be shared across calls.)
    """
    if hidden is None:
        hidden = [20, 20]
    feature_columns = [tf.contrib.layers.real_valued_column("", dimension=13)]
    tfl = tf.contrib.learn.DNNRegressor(hidden_units=hidden, feature_columns=feature_columns)
    tfl.fit(x=x_train, y=y_train, steps=steps)
    y_pred = tfl.predict(x_test)
    r2 = r2_score(y_test, y_pred)
    rmse = sqrt(mean_squared_error(y_test, y_pred))
    print('TFL: R2: {0:f}, RMSE:{1:f}'.format(r2, rmse))
def inference(x):
    """Build a 13 -> 20 -> 20 -> 1 feedforward regression network.

    Args:
        x: float32 placeholder/tensor of shape [None, 13].

    Returns:
        Output tensor of shape [None, 1] (linear final layer, no activation).
    """
    hidden1 = 20
    hidden2 = 20
    with tf.name_scope("l1"):
        w1 = tf.Variable(tf.truncated_normal([13, hidden1]), name="w1")
        b1 = tf.Variable(tf.zeros([hidden1]), name="b1")
        h1 = tf.nn.relu(tf.matmul(x, w1) + b1)
        #h1 = tf.nn.dropout(h1, 0.9)
    with tf.name_scope("l2"):
        # Fixed graph name: this variable was mislabeled "w1", colliding with layer 1.
        w2 = tf.Variable(tf.truncated_normal([hidden1, hidden2]), name="w2")
        b2 = tf.Variable(tf.zeros([hidden2]), name="b2")
        h2 = tf.nn.relu(tf.matmul(h1, w2) + b2)
        #h2 = tf.nn.dropout(h2, 0.9)
    with tf.name_scope("l3"):
        # Fixed graph name: this variable was mislabeled "w2".
        w3 = tf.Variable(tf.truncated_normal([hidden2, 1]), name="w3")
        b3 = tf.Variable(tf.zeros([1]), name="b3")
        y = tf.matmul(h2, w3) + b3
    return y
def loss(model, y_):
    """Mean squared error between model predictions and targets."""
    residual = tf.sub(model, y_)
    return tf.reduce_mean(tf.square(residual))
def training(loss, rate):
    """Return an Adagrad train op minimizing `loss` with learning rate `rate`."""
    optimizer = tf.train.AdagradOptimizer(rate)
    return optimizer.minimize(loss)
def test_my_dnn(x_train, x_test, y_train, y_test, batch_size=32, epoch=10000, shuffle=True):
    """Train the hand-rolled DNN (inference/loss/training) and print R2/RMSE.

    Args:
        x_train, x_test: float feature arrays with 13 columns.
        y_train, y_test: regression targets.
        batch_size: mini-batch size.
        epoch: number of passes over the training data.
        shuffle: reshuffle the training rows before each epoch.
    """
    max_size = x_train.shape[0]
    n = max_size - batch_size
    idx = list(range(max_size))
    x = tf.placeholder(tf.float32, shape=[None, 13])
    # Column vector so it matches the [None, 1] output of inference(); the original
    # flat [None] placeholder broadcast to a [batch, batch] matrix inside loss(),
    # producing a wrong training objective.
    y_ = tf.placeholder(tf.float32, shape=[None, 1])
    model = inference(x)
    loss_value = loss(model, y_)
    train_op = training(loss_value, 0.1)
    init = tf.initialize_all_variables()
    with tf.Session() as sess:  # context manager releases the session's resources
        sess.run(init)
        for e in range(epoch):
            if shuffle:
                random.shuffle(idx)
                x_train = x_train[idx]
                y_train = y_train[idx]
            # Integer division: Python 3 range() rejects the float that / produces.
            for i in range(n // batch_size):
                batch = batch_size * i
                x_train_b = x_train[batch:batch + batch_size]
                y_train_b = y_train[batch:batch + batch_size].reshape(-1, 1)
                _, l = sess.run([train_op, loss_value], feed_dict={x: x_train_b, y_: y_train_b})
            #if e % 100 == 0:
            #    print(e, l)
        y_pred = sess.run(model, feed_dict={x: x_test})
    y_pred = y_pred.T[0]  # [N, 1] -> [N] to match y_test
    r2 = r2_score(y_test, y_pred)
    rmse = sqrt(mean_squared_error(y_test, y_pred))
    print(' MY R2: {0:f}, RMSE:{1:f}'.format(r2, rmse))
if __name__ == "__main__":
    # Boston housing dataset: 13 features, continuous price target.
    boston = datasets.load_boston()
    split = cross_validation.train_test_split(
        boston.data, boston.target, test_size=0.2, random_state=0)
    x_train, x_test, y_train, y_test = split
    test_rf(x_train, x_test, y_train, y_test)
    test_svm(x_train, x_test, y_train, y_test)
    test_tf_learn_dnn(x_train, x_test, y_train, y_test)
    test_my_dnn(x_train, x_test, y_train, y_test, shuffle=True, epoch=30000)