其他分享
首页 > 其他分享> > DCGAN,WGAN,SGAN核心代码

DCGAN,WGAN,SGAN核心代码

作者:互联网

SGAN:用自己的图片
# 导入需要的包
from PIL import Image  # Image 用于读取影像
from skimage import io  # io也可用于读取影响,效果比Image读取的更好一些

import tensorflow as tf  # 用于构建神经网络模型
import matplotlib.pyplot as plt  # 用于绘制生成影像的结果
import numpy as np  # 读取影像
import os  # 文件夹操作
import time  # 计时
from keras.layers import Input, Dense, Reshape, Flatten, Dropout
from keras.layers import BatchNormalization, Activation, ZeroPadding2D
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import UpSampling2D, Conv2D
from keras.models import Sequential, Model
from keras.optimizers import Adam

import matplotlib.pyplot as plt
import matplotlib
matplotlib.use('Agg')
import sys

import numpy as np

class GAN():
    def __init__(self):
        self.img_rows = 64
        self.img_cols = 64
        self.channels = 3
        self.img_shape = (self.img_rows, self.img_cols, self.channels)
        self.latent_dim = 100

        optimizer = Adam(0.0002, 0.5)

        # Build and compile the discriminator
        self.discriminator = self.build_discriminator()
        self.discriminator.compile(loss='binary_crossentropy',
            optimizer=optimizer,
            metrics=['accuracy'])

        # Build the generator
        self.generator = self.build_generator()

        # The generator takes noise as input and generates imgs
        z = Input(shape=(self.latent_dim,))
        img = self.generator(z)

        # For the combined model we will only train the generator
        self.discriminator.trainable = False

        # The discriminator takes generated images as input and determines validity
        validity = self.discriminator(img)

        # The combined model  (stacked generator and discriminator)
        # Trains the generator to fool the discriminator
        self.combined = Model(z, validity)
        self.combined.compile(loss='binary_crossentropy', optimizer=optimizer)


    def build_generator(self):

        model = Sequential()

        model.add(Dense(256, input_dim=self.latent_dim))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))

        model.add(Dense(512))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))

        model.add(Dense(1024))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Dense(512))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))

        model.add(Dense(1024))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Dense(512))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))

        model.add(Dense(1024))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))


        model.add(Dense(np.prod(self.img_shape), activation='tanh'))
        model.add(Reshape(self.img_shape))

        model.summary()

        noise = Input(shape=(self.latent_dim,))
        img = model(noise)

        return Model(noise, img)

    def build_discriminator(self):

        model = Sequential()

        model.add(Flatten(input_shape=self.img_shape))

        model.add(Dense(512))
        model.add(LeakyReLU(alpha=0.2))

        model.add(Dense(256))
        model.add(LeakyReLU(alpha=0.2))

        model.add(Dense(512))
        model.add(LeakyReLU(alpha=0.2))

        model.add(Dense(256))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dense(512))
        model.add(LeakyReLU(alpha=0.2))

        model.add(Dense(256))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dense(1, activation='sigmoid'))
        model.summary()

        img = Input(shape=self.img_shape)
        validity = model(img)

        return Model(img, validity)

    def train(self, epochs, batch_size=128, sample_interval=50):

        # # Load the dataset
        # path = "./mnist.npz"
        # f = np.load(path)
        # X_train, y_train = f["x_train"], f["y_train"]
        #
        # # Rescale -1 to 1
        # X_train = X_train / 127.5 - 1.
        # print("X_train.shape:", X_train.shape)# X_train.shape: (60000, 28, 28)
        # X_train = np.expand_dims(X_train, axis=3)# expand_X_train.shape: (60000, 28, 28, 1)
        # print("expand_X_train.shape:", X_train.shape)
        input_dir = "./papa_image"
        images = os.listdir(input_dir)
        image_len = len(images)

        # 设置一个空data,用于存放数据
        data = np.empty((image_len, self.img_rows, self.img_rows, self.channels), dtype="float32")

        # 逐个图像读取
        for i in range(image_len):
            # 如果导入的是skimage.io,则读取影像应该写为img = io.imread(input_dir + images[i])
            img = Image.open(input_dir + "/" + images[i])  # 打开图像
            img = img.resize((self.img_rows, self.img_rows))  # 将256*256变成64*64
            arr = np.asarray(img, dtype="float32")  # 将格式改为np.array
            data[i, :, :, :] = arr  # 将其放入data中

        X_train = data
        X_train = X_train / 127.5 - 1
        # Adversarial ground truths
        valid = np.ones((batch_size, 1))
        fake = np.zeros((batch_size, 1))

        for epoch in range(epochs):

            # ---------------------
            #  Train Discriminator
            # ---------------------

            # Select a random batch of images
            idx = np.random.randint(0, X_train.shape[0], batch_size)
            imgs = X_train[idx]

            noise = np.random.normal(0, 1, (batch_size, self.latent_dim))

            # Generate a batch of new images
            gen_imgs = self.generator.predict(noise)

            # Train the discriminator
            d_loss_real = self.discriminator.train_on_batch(imgs, valid)
            d_loss_fake = self.discriminator.train_on_batch(gen_imgs, fake)
            d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

            # ---------------------
            #  Train Generator
            # ---------------------

            noise = np.random.normal(0, 1, (batch_size, self.latent_dim))

            # Train the generator (to have the discriminator label samples as valid)
            g_loss = self.combined.train_on_batch(noise, valid)

            # Plot the progress
            print ("%d [D loss: %f, acc.: %.2f%%] [G loss: %f]" % (epoch, d_loss[0], 100*d_loss[1], g_loss))

            # If at save interval => save generated image samples
            if epoch % sample_interval == 0:
                self.sample_images(epoch)

    def sample_images(self, epoch):
        r, c = 5, 5
        noise = np.random.normal(0, 1, (r * c, self.latent_dim))
        gen_imgs = self.generator.predict(noise)

        # Rescale images 0 - 1
        gen_imgs = 0.5 * gen_imgs + 0.5
        print("gen_imgs.shape:", gen_imgs.shape)# gen_imgs.shape: (25, 28, 28, 3)
        images = gen_imgs
        # 将一个batch_size的所有图像进行保存。
        batch_size = len(images)
        n = np.int(np.sqrt(batch_size))

        # 读取图像大小,并生成掩模canvas
        image_size = np.shape(images)[2]
        n_channel = np.shape(images)[3]
        # images = np.reshape(images[batch_size - 1, image_size, image_size, n_channel])
        canvas = np.empty((n * image_size, n * image_size, n_channel))

        # 为每个掩模赋值
        for i in range(n):
            for j in range(n):
                canvas[i * image_size:(i + 1) * image_size, j * image_size:(j + 1) * image_size, :] = images[
                    n * i + j].reshape(image_size, image_size, n_channel)

        # 绘制结果,并设置坐标轴
        plt.figure(figsize=(5, 5))
        plt.imshow(canvas, cmap="gray")
        label = "Epoch: {0}".format(epoch + 1)
        # 保存绘制的结果
        plt.savefig("images/%d.png" % epoch)
        plt.close()





if __name__ == '__main__':
    gan = GAN()
    gan.train(epochs=30000, batch_size=32, sample_interval=200)
   SGAN:	手写字体生成。
   from keras.layers import Input, Dense, Reshape, Flatten, Dropout
from keras.layers import BatchNormalization, Activation, ZeroPadding2D
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import UpSampling2D, Conv2D
from keras.models import Sequential, Model
from keras.optimizers import Adam

import matplotlib.pyplot as plt
import matplotlib
matplotlib.use('Agg')
import sys

import numpy as np

class GAN():
    def __init__(self):
        self.img_rows = 28
        self.img_cols = 28
        self.channels = 1
        self.img_shape = (self.img_rows, self.img_cols, self.channels)
        self.latent_dim = 100

        optimizer = Adam(0.0002, 0.5)

        # Build and compile the discriminator
        self.discriminator = self.build_discriminator()
        self.discriminator.compile(loss='binary_crossentropy',
            optimizer=optimizer,
            metrics=['accuracy'])

        # Build the generator
        self.generator = self.build_generator()

        # The generator takes noise as input and generates imgs
        z = Input(shape=(self.latent_dim,))
        img = self.generator(z)

        # For the combined model we will only train the generator
        self.discriminator.trainable = False

        # The discriminator takes generated images as input and determines validity
        validity = self.discriminator(img)

        # The combined model  (stacked generator and discriminator)
        # Trains the generator to fool the discriminator
        self.combined = Model(z, validity)
        self.combined.compile(loss='binary_crossentropy', optimizer=optimizer)


    def build_generator(self):

        model = Sequential()

        model.add(Dense(256, input_dim=self.latent_dim))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Dense(512))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Dense(1024))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Dense(np.prod(self.img_shape), activation='tanh'))
        model.add(Reshape(self.img_shape))

        model.summary()

        noise = Input(shape=(self.latent_dim,))
        img = model(noise)

        return Model(noise, img)

    def build_discriminator(self):

        model = Sequential()

        model.add(Flatten(input_shape=self.img_shape))
        model.add(Dense(512))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dense(256))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dense(1, activation='sigmoid'))
        model.summary()

        img = Input(shape=self.img_shape)
        validity = model(img)

        return Model(img, validity)

    def train(self, epochs, batch_size=128, sample_interval=50):

        # Load the dataset
        path = "./mnist.npz"
        f = np.load(path)
        X_train, y_train = f["x_train"], f["y_train"]

        # Rescale -1 to 1
        X_train = X_train / 127.5 - 1.
        print("X_train.shape:", X_train.shape)# X_train.shape: (60000, 28, 28)
        X_train = np.expand_dims(X_train, axis=3)# expand_X_train.shape: (60000, 28, 28, 1)
        print("expand_X_train.shape:", X_train.shape)

        # Adversarial ground truths
        valid = np.ones((batch_size, 1))
        fake = np.zeros((batch_size, 1))

        for epoch in range(epochs):

            # ---------------------
            #  Train Discriminator
            # ---------------------

            # Select a random batch of images
            idx = np.random.randint(0, X_train.shape[0], batch_size)
            imgs = X_train[idx]

            noise = np.random.normal(0, 1, (batch_size, self.latent_dim))

            # Generate a batch of new images
            gen_imgs = self.generator.predict(noise)

            # Train the discriminator
            d_loss_real = self.discriminator.train_on_batch(imgs, valid)
            d_loss_fake = self.discriminator.train_on_batch(gen_imgs, fake)
            d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

            # ---------------------
            #  Train Generator
            # ---------------------

            noise = np.random.normal(0, 1, (batch_size, self.latent_dim))

            # Train the generator (to have the discriminator label samples as valid)
            g_loss = self.combined.train_on_batch(noise, valid)

            # Plot the progress
            print ("%d [D loss: %f, acc.: %.2f%%] [G loss: %f]" % (epoch, d_loss[0], 100*d_loss[1], g_loss))

            # If at save interval => save generated image samples
            if epoch % sample_interval == 0:
                self.sample_images(epoch)

    def sample_images(self, epoch):
        r, c = 5, 5
        noise = np.random.normal(0, 1, (r * c, self.latent_dim))
        gen_imgs = self.generator.predict(noise)

        # Rescale images 0 - 1
        gen_imgs = 0.5 * gen_imgs + 0.5

        fig, axs = plt.subplots(r, c)
        cnt = 0
        for i in range(r):
            for j in range(c):
                axs[i,j].imshow(gen_imgs[cnt, :,:,0], cmap='gray')
                axs[i,j].axis('off')
                cnt += 1
        fig.savefig("images/%d.png" % epoch)
        plt.close()


if __name__ == '__main__':
    gan = GAN()
    gan.train(epochs=30000, batch_size=32, sample_interval=200)
DCGAN:自己图片。
# 导入需要的包
from PIL import Image  # Image 用于读取影像
from skimage import io  # io也可用于读取影响,效果比Image读取的更好一些

import tensorflow as tf  # 用于构建神经网络模型
import matplotlib.pyplot as plt  # 用于绘制生成影像的结果
import numpy as np  # 读取影像
import os  # 文件夹操作
import time  # 计时

# 设置相关参数
is_training = True
input_dir = "./data/papa_image/"  # 原始数据的文件夹路径

# 设置超参数 hyper parameters
batch_size = 64
image_width = 64
image_height = 64
image_channel = 3
data_shape = [64, 64, 3]
data_length = 64 * 64 * 3

z_dim = 100
learning_rate = 0.00005
beta1 = 0.5
epoch = 500
# 读取数据的函数
def prepare_data(input_dir):
    '''
    函数功能:通过输入图像的路径,读取训练数据
    :参数 input_dir: 图像数据所在的根目录,即"./face"
    :参数 floder: 图像数据所在的子目录, 即"./face/A"
    :return: 返回读取好的训练数据
    '''

    # 遍历图像路径,并获取图像数量
    images = os.listdir(input_dir)
    image_len = len(images)

    # 设置一个空data,用于存放数据
    data = np.empty((image_len, image_width, image_height, image_channel), dtype="float32")

    # 逐个图像读取
    for i in range(image_len):
        # 如果导入的是skimage.io,则读取影像应该写为img = io.imread(input_dir + images[i])
        img = Image.open(input_dir  + "/" + images[i])  # 打开图像
        img = img.resize((image_width, image_height))  # 将256*256变成64*64
        arr = np.asarray(img, dtype="float32")  # 将格式改为np.array
        data[i, :, :, :] = arr  # 将其放入data中

    sess = tf.Session()
    sess.run(tf.initialize_all_variables())
    data = tf.reshape(data, [-1, image_width, image_height, image_channel])
    train_data = data * 1.0 / 127.5 - 1.0  # 对data进行正则化
    train_data = tf.reshape(train_data, [-1, data_length])  # 将其拉伸成一维向量
    train_set = sess.run(train_data)
    sess.close()
    return train_set


# 定义生成器
def Generator(z, is_training, reuse):
    '''
    函数功能:输入噪声z,生成图像gen_img
    :param z:即输入数据,一般为噪声
    :param is_training:是否为训练环节
    :return: 返回生成影像gen_img
    '''

    # 图像的channel维度变化为1->1024->512->256->128->3
    depths = [1024, 512, 256, 128] + [data_shape[2]]

    with tf.variable_scope("Generator", reuse=reuse):
        # 第一层全连接层
        with tf.variable_scope("g_fc1", reuse=reuse):
            output = tf.layers.dense(z, depths[0] * 4 * 4, trainable=is_training)
            output = tf.reshape(output, [batch_size, 4, 4, depths[0]])
            output = tf.nn.relu(tf.layers.batch_normalization(output, training=is_training))

        # 第二层反卷积层1024
        with tf.variable_scope("g_dc1", reuse=reuse):
            output = tf.layers.conv2d_transpose(output, depths[1], [5, 5], strides=(2, 2),
                                                padding="SAME", trainable=is_training)
            output = tf.nn.relu(tf.layers.batch_normalization(output, training=is_training))

        # 第三层反卷积层512
        with tf.variable_scope("g_dc2", reuse=reuse):
            output = tf.layers.conv2d_transpose(output, depths[2], [5, 5], strides=(2, 2),
                                                padding="SAME", trainable=is_training)
            output = tf.nn.relu(tf.layers.batch_normalization(output, training=is_training))

        # 第四层反卷积层256
        with tf.variable_scope("g_dc3", reuse=reuse):
            output = tf.layers.conv2d_transpose(output, depths[3], [5, 5], strides=(2, 2),
                                                padding="SAME", trainable=is_training)
            output = tf.nn.relu(tf.layers.batch_normalization(output, training=is_training))

        # 第五层反卷积层128
        with tf.variable_scope("g_dc4", reuse=reuse):
            output = tf.layers.conv2d_transpose(output, depths[4], [5, 5], strides=(2, 2),
                                                padding="SAME", trainable=is_training)
            gen_img = tf.nn.tanh(output)

    return gen_img


# 定义判别器
def Discriminator(x, is_training, reuse):
    '''
    函数功能:判别输入的图像是真或假
    :param x: 输入数据
    :param is_training: 是否为训练环节
    :return: 判别结果
    '''

    # channel维度变化为:3->64->128->256->512
    depths = [data_shape[2]] + [64, 128, 256, 512]

    with tf.variable_scope("Discriminator", reuse=reuse):
        # 第一层卷积层,注意用的是leaky_relu函数
        with tf.variable_scope("d_cv1", reuse=reuse):
            output = tf.layers.conv2d(x, depths[1], [5, 5], strides=(2, 2),
                                      padding="SAME", trainable=is_training)
            output = tf.nn.leaky_relu(tf.layers.batch_normalization(output, training=is_training))

        # 第二层卷积层,注意用的是leaky_relu函数
        with tf.variable_scope("d_cv2", reuse=reuse):
            output = tf.layers.conv2d(output, depths[2], [5, 5], strides=(2, 2),
                                      padding="SAME", trainable=is_training)
            output = tf.nn.leaky_relu(tf.layers.batch_normalization(output, training=is_training))

        # 第三层卷积层,注意用的是leaky_relu函数
        with tf.variable_scope("d_cv3", reuse=reuse):
            output = tf.layers.conv2d(output, depths[3], [5, 5], strides=(2, 2),
                                      padding="SAME", trainable=is_training)
            output = tf.nn.leaky_relu(tf.layers.batch_normalization(output, training=is_training))

        # 第四层卷积层,注意用的是leaky_relu函数
        with tf.variable_scope("d_cv4", reuse=reuse):
            output = tf.layers.conv2d(output, depths[4], [5, 5], strides=(2, 2),
                                      padding="SAME", trainable=is_training)
            output = tf.nn.leaky_relu(tf.layers.batch_normalization(output, training=is_training))

        # 第五层全链接层
        with tf.variable_scope("d_fc1", reuse=reuse):
            output = tf.layers.flatten(output)
            disc_img = tf.layers.dense(output, 1, trainable=is_training)

    return disc_img


def plot_and_save(order, images):
    '''
    函数功能:绘制生成器的结果,并保存
    :param order:
    :param images:
    :return:
    '''

    # 将一个batch_size的所有图像进行保存
    batch_size = len(images)
    n = np.int(np.sqrt(batch_size))

    # 读取图像大小,并生成掩模canvas
    image_size = np.shape(images)[2]
    n_channel = np.shape(images)[3]
    images = np.reshape(images, [-1, image_size, image_size, n_channel])
    canvas = np.empty((n * image_size, n * image_size, image_channel))

    # 为每个掩模赋值
    for i in range(n):
        for j in range(n):
            canvas[i * image_size:(i + 1) * image_size, j * image_size:(j + 1) * image_size, :] = images[
                n * i + j].reshape(64, 64, 3)

    # 绘制结果,并设置坐标轴
    plt.figure(figsize=(8, 8))
    plt.imshow(canvas, cmap="gray")
    label = "Epoch: {0}".format(order + 1)
    plt.xlabel(label)

    # 为每个文件命名
    if type(order) is str:
        file_name = order
    else:
        file_name = "face_gen" + str(order)

    # 保存绘制的结果
    plt.savefig(file_name)
    print(os.getcwd())
    print("Image saved in file: ", file_name)
    plt.close()


# 定义训练过程
def training():
    '''
    函数功能:实现DCGAN的训练过程
    '''
    # 准备数据。这里输入根目录,以A的影像为例进行图像生成
    data = prepare_data(input_dir)

    # 构建网络结构,这是程序的核心部分---------------------------------------------
    x = tf.placeholder(tf.float32, shape=[None, data_length], name="Input_data")
    x_img = tf.reshape(x, [-1] + data_shape)
    z = tf.placeholder(tf.float32, shape=[None, z_dim], name="latent_var")

    G = Generator(z, is_training=True, reuse=False)
    D_fake_logits = Discriminator(G, is_training=True, reuse=False)
    D_true_logits = Discriminator(x_img, is_training=True, reuse=True)

    # 定义生成器的损失函数G_loss
    G_loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(
        logits=D_fake_logits, labels=tf.ones_like(D_fake_logits)))

    # 定义判别器的损失函数D_loss
    D_loss_1 = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(
        logits=D_true_logits, labels=tf.ones_like(D_true_logits)))
        
    D_loss_2 = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(
        logits=D_fake_logits, labels=tf.zeros_like(D_fake_logits)))
    D_loss = D_loss_1 + D_loss_2

    # 定义方差
    total_vars = tf.trainable_variables()
    d_vars = [var for var in total_vars if "d_" in var.name]
    g_vars = [var for var in total_vars if "g_" in var.name]

    # 定义优化方式
    with tf.control_dependencies(tf.get_collection(tf.GraphKeys.UPDATE_OPS)):
        g_optimization = tf.train.AdamOptimizer(learning_rate=learning_rate,
                                                beta1=beta1).minimize(G_loss, var_list=g_vars)
        d_optimization = tf.train.AdamOptimizer(learning_rate=learning_rate,
                                                beta1=beta1).minimize(D_loss, var_list=d_vars)
    print("we successfully make the network")

    start_time = time.time()
    sess = tf.Session()
    sess.run(tf.initialize_all_variables())

    for i in range(epoch):
        total_batch = int(len(data)/batch_size)
        d_value = 0
        g_value = 0
        for j in range(total_batch):
            batch_xs = data[j * batch_size: j*batch_size + batch_size]
            # 判别器
            z_sampled1 = np.random.uniform(low=-1.0, high=1.0, size=[batch_size, z_dim])
            Op_d, d_ = sess.run([d_optimization, D_loss], feed_dict={x:batch_xs, z:z_sampled1})
            # 生成器
            z_sampled2 = np.random.uniform(low=-1.0, high=1.0, size=[batch_size, z_dim])
            Op_g, g_ = sess.run([g_optimization, G_loss], feed_dict={x: batch_xs, z:z_sampled2})
            # 尝试保存生成图像
            images_generated = sess.run(G, feed_dict={z:z_sampled2})
            d_value += d_/total_batch
            g_value += g_/total_batch
            plot_and_save(i, images_generated)

            # 输出时间和损失函数loss
            hour = int((time.time() - start_time) / 3600)
            min = int(((time.time() - start_time) - 3600 * hour) / 60)
            sec = int((time.time() - start_time) - 3600 * hour - 60 * min)
            print("Time: ", hour, "h", min, "min", sec, "sec", "   Epoch: ",
                  i, "G_loss: ", g_value, "D_loss: ", d_value)



if __name__ == "__main__":
    training()
   WGAN:
   from skimage import io, transform  # 用于读取影像
import tensorflow as tf  # 构造网络
import numpy as np
import matplotlib.pyplot as plt  # 绘制结果并保存
import os  # 创建文件夹
from keras.models import Model

image_width = 128  # 图像宽128像素
image_height = 128  # 图像高128像素
image_channel = 3  # 图像的通道数为3

input_dir = "./data/trainB/"
output_dir = "./data/result/"
batch_size = 64
z_dim = 128
lr_gen = 5e-5  # 生成器的学习率
lr_dis = 5e-5  # 判别器的学习率
epoch = 1000


# 读取数据的函数,参照之间的DCGAN代码,这里做的改进在于读取数据的库使用的是skimage而非PIL
def process_data():
    '''
    函数功能:读取路径下的所有图像,返回读取的图像数据集train_set和图像个数image_len
    '''
    images = os.listdir(input_dir)
    image_len = len(images)

    data = np.empty((image_len, image_width, image_height, image_channel), dtype="float32")

    for i in range(image_len):
        # 利用skimage.io.image函数读取图像。如果用PIL.Image读取则会报错
        img = io.imread(input_dir + images[i])
        print(img.shape)
        # 将所有图像resize成128*128
        img = transform.resize(img, (image_width, image_height))
        arr = (np.asarray(img, dtype="float32"))
        # 这里暂时不要对图像进行归一化处理,否则结果全是噪声
        data[i, :, :, :] = arr

    with tf.Session() as sess:
        sess.run(tf.initialize_all_variables())
        data = tf.reshape(data, [-1, image_width, image_height, image_channel])
        train_set = sess.run(data)

    return train_set, image_len

def leaky_relu(x, n, leak=0.2):
    return tf.maximum(x, leak * x, name=n)

def generator(input, random_dim, is_train, reuse=False):
    print("generator--is_train:", is_train)#generator--is_train: Tensor("input/is_train:0", dtype=bool)
    with tf.variable_scope('generator') as scope:
        if reuse:
            scope.reuse_variables()
        w1 = tf.get_variable('w1', shape=[random_dim, 4 * 4 * 512], dtype=tf.float32,
                             initializer=tf.truncated_normal_initializer(stddev=0.02))
        b1 = tf.get_variable('b1', shape=[512 * 4 * 4], dtype=tf.float32,
                             initializer=tf.constant_initializer(0.0))
        flat_conv1 = tf.add(tf.matmul(input, w1), b1, name='flat_conv1')
        # print("flat_conv1:", flat_conv1) flat_conv1: Tensor(" shape=(?, 8192), dtype=float32)

        # 4*4*512                            # -1 指的是随便
        conv1 = tf.reshape(flat_conv1, shape=[-1, 4, 4, 512], name='conv1')
        # print("conv1:", conv1)conv1: Tensor("generator/conv1:0", shape=(?, 4, 4, 512), dtype=float32)
        bn1 = tf.contrib.layers.batch_norm(conv1, is_training=is_train, epsilon=1e-5, decay=0.9,
                                           updates_collections=None, scope='bn1')
        act1 = tf.nn.relu(bn1, name='act1')

        # 8*8*256
        conv2 = tf.layers.conv2d_transpose(act1, 256, kernel_size=[5, 5], strides=[2, 2], padding="SAME",
                                           kernel_initializer=tf.truncated_normal_initializer(stddev=0.02),
                                           name='conv2')
        bn2 = tf.contrib.layers.batch_norm(conv2, is_training=is_train, epsilon=1e-5, decay=0.9,
                                           updates_collections=None, scope='bn2')
        act2 = tf.nn.relu(bn2, name='act2')

        # 16*16*128
        conv3 = tf.layers.conv2d_transpose(act2, 128, kernel_size=[5, 5], strides=[2, 2], padding="SAME",
                                           kernel_initializer=tf.truncated_normal_initializer(stddev=0.02),
                                           name='conv3')
        bn3 = tf.contrib.layers.batch_norm(conv3, is_training=is_train, epsilon=1e-5, decay=0.9,
                                           updates_collections=None, scope='bn3')
        act3 = tf.nn.relu(bn3, name='act3')

        # 32*32*64
        conv4 = tf.layers.conv2d_transpose(act3, 64, kernel_size=[5, 5], strides=[2, 2], padding="SAME",
                                           kernel_initializer=tf.truncated_normal_initializer(stddev=0.02),
                                           name='conv4')
        bn4 = tf.contrib.layers.batch_norm(conv4, is_training=is_train, epsilon=1e-5, decay=0.9,
                                           updates_collections=None, scope='bn4')
        act4 = tf.nn.relu(bn4, name='act4')

        # 64*64*32
        conv5 = tf.layers.conv2d_transpose(act4, 32, kernel_size=[5, 5], strides=[2, 2], padding="SAME",
                                           kernel_initializer=tf.truncated_normal_initializer(stddev=0.02),
                                           name='conv5')
        bn5 = tf.contrib.layers.batch_norm(conv5, is_training=is_train, epsilon=1e-5, decay=0.9,
                                           updates_collections=None, scope='bn5')
        act5 = tf.nn.relu(bn5, name='act5')

        # 128*128*3
        conv6 = tf.layers.conv2d_transpose(act5, image_channel, kernel_size=[5, 5], strides=[2, 2], padding="SAME",
                                           kernel_initializer=tf.truncated_normal_initializer(stddev=0.02),
                                           name='conv6')

        act6 = tf.nn.tanh(conv6, name='act6')
        # print("act6:", act6) # act6: Tensor("generator/act6:0", shape=(?, 128, 128, 3), dtype=float32)
        return act6


def discriminator(input, is_train, reuse=False):
    # print("is_train:", is_train)
    with tf.variable_scope('discriminator') as scope:
        if reuse:
            scope.reuse_variables()

        # 64*64*64
        conv1 = tf.layers.conv2d(input, 64, kernel_size=[5, 5], strides=[2, 2], padding="SAME",
                                 kernel_initializer=tf.truncated_normal_initializer(stddev=0.02),
                                 name='conv1')
        act1 = leaky_relu(conv1, n='act1')

        # 32*32*128
        conv2 = tf.layers.conv2d(act1, 128, kernel_size=[5, 5], strides=[2, 2], padding="SAME",
                                 kernel_initializer=tf.truncated_normal_initializer(stddev=0.02),
                                 name='conv2')
        bn2 = tf.contrib.layers.batch_norm(conv2, is_training=is_train, epsilon=1e-5, decay=0.9,
                                           updates_collections=None, scope='bn2')
        act2 = leaky_relu(bn2, n='act2')

        # 16*16*256
        conv3 = tf.layers.conv2d(act2, 256, kernel_size=[5, 5], strides=[2, 2], padding="SAME",
                                 kernel_initializer=tf.truncated_normal_initializer(stddev=0.02),
                                 name='conv3')
        bn3 = tf.contrib.layers.batch_norm(conv3, is_training=is_train, epsilon=1e-5, decay=0.9,
                                           updates_collections=None, scope='bn3')
        act3 = leaky_relu(bn3, n='act3')

        # 8*8*512
        conv4 = tf.layers.conv2d(act3, 512, kernel_size=[5, 5], strides=[2, 2], padding="SAME",
                                 kernel_initializer=tf.truncated_normal_initializer(stddev=0.02),
                                 name='conv4')
        bn4 = tf.contrib.layers.batch_norm(conv4, is_training=is_train, epsilon=1e-5, decay=0.9,
                                           updates_collections=None,
                                           scope='bn4')
        act4 = leaky_relu(bn4, n='act4')
        print("act4", act4)
        # start from act4
        dim = int(np.prod(act4.get_shape()[1:]))
        print("dim:", dim)
        fc1 = tf.reshape(act4, shape=[-1, dim], name='fc1')
        w2 = tf.get_variable('w2', shape=[fc1.shape[-1], 1], dtype=tf.float32,
                             initializer=tf.truncated_normal_initializer(stddev=0.02))
        b2 = tf.get_variable('b2', shape=[1], dtype=tf.float32,
                             initializer=tf.constant_initializer(0.0))
        # wgan不适用sigmoid
        logits = tf.add(tf.matmul(fc1, w2), b2, name='logits')
        print("logits:", logits)
        return logits


def plot_and_save(num, images):
    print("----",num, images.shape)# ---- 0 (64, 128, 128, 3)
    batch_size = len(images)
    n = np.int(np.sqrt(batch_size))

    image_size = np.shape(images)[2]
    n_channel = np.shape(images)[3]
    images = np.reshape(images, [-1, image_size, image_size, n_channel])
    canvas = np.empty((n * image_size, n * image_size, image_channel))

    for i in range(n):
        for j in range(n):
            canvas[i * image_size:(i + 1) * image_size, j * image_size:(j + 1) * image_size, :] = images[
                n * i + j].reshape(128, 128, 3)

    plt.figure(figsize=(8, 8))
    plt.imshow(canvas, cmap="gray")
    label = "Epoch: {0}".format(num + 1)
    plt.xlabel(label)

    if type(num) is str:
        file_name = num
    else:
        file_name = "pikachu_gen" + str(num)

    plt.savefig(file_name)
    print(output_dir)
    print("Image saved in file: ", file_name)
    plt.close()


def train():
    # 构建模型
    with tf.variable_scope("input"):
        # 模型中的输入数据
        real_image = tf.placeholder(tf.float32, shape=[None, image_height, image_width, image_channel], name="real_image")
        random_input = tf.placeholder(tf.float32, shape=[None, z_dim], name="rand_input")
        is_train = tf.placeholder(tf.bool, name="is_train")
    # print("real_image:", real_image) #  Tensor("input/real_image:0", shape=(?, 128, 128, 3), dtype=float32)
    # # 定义WGAN
    # print("is_train:", is_train)
    """
    real_image: Tensor("input/real_image:0", shape=(?, 128, 128, 3), dtype=float32)
    random_input: Tensor("input/rand_input:0", shape=(?, 128), dtype=float32)
    fake_image: Tensor("generator/act6:0", shape=(?, 128, 128, 3), dtype=float32)
    is_train: Tensor("input/is_train:0", dtype=bool)
    """
    fake_image = generator(random_input, z_dim, is_train)
    real_result = discriminator(real_image, is_train)
    print("--real_result:", real_image)
    fake_result = discriminator(fake_image, is_train, reuse=True)

    # 定义损失函数, 这是WGAN的改进所在
    d_loss = tf.reduce_mean(fake_result) - tf.reduce_mean(real_result)
    g_loss = -tf.reduce_mean(fake_result)

    # 定义方差
    t_vars = tf.trainable_variables()

    d_vars = [var
                 for var in t_vars
                     if 'discriminator' in var.name]
    g_vars = [var
                 for var in t_vars
                      if 'generator' in var.name]

    # 定义优化器,这里使用RMSProp
    trainer_d = tf.train.RMSPropOptimizer(learning_rate=0.0002).minimize(d_loss, var_list=d_vars)
    trainer_g = tf.train.RMSPropOptimizer(learning_rate=0.0002).minimize(g_loss, var_list=g_vars)
    # 权重裁剪至[-0.01, 0.01]
    d_clip = [v.assign(tf.clip_by_value(v, -0.01, 0.01)) for v in d_vars]
    # 模型构建完毕
    # 读取数据
    image_batch, samples_num = process_data()
    # print(image_batch.shape, samples_num)(200, 128, 128, 3) 200
    # 数据读取完毕
    batch_num = int(samples_num / batch_size)
    total_batch = 0
    # 创建会话并且初始化
    sess = tf.Session()
    sess.run(tf.global_variables_initializer())
    sess.run(tf.local_variables_initializer())
    print('total training sample num:%d' % samples_num)
    print('batch size: %d, batch num per epoch: %d, epoch num: %d' % (batch_size, batch_num, epoch))
    print('start training...')
    # 逐个epoch进行训练


    for i in range(epoch):
        # 逐个batch进行训练
        for j in range(batch_num):
            # 每次训练d_iters次判别器,训练g_iters次生成器
            d_iters = 5
            g_iters = 1
            # 随机噪声作为输入数据
            train_noise = np.random.uniform(-1.0, 1.0, size=[batch_size, z_dim]).astype(np.float32)
            # 每次训练判别器
            for k in range(d_iters):
                # 拿出batch_size张图像进行训练
                train_image = image_batch[j*batch_size:j*batch_size + batch_size]
                # 权值截断
                sess.run(d_clip)
                # 更新discriminator
                _, dLoss = sess.run([trainer_d, d_loss],
                                    feed_dict={random_input: train_noise, real_image: train_image, is_train: True})
            # 更新generator
            for k in range(g_iters):
                _, gLoss = sess.run([trainer_g, g_loss],
                                    feed_dict={random_input:train_noise, is_train:True})
            # 打印generator和discriminator的loss值
            print("train:[%d/%d], d_loss:%f, g_loss:%f" % (i, j, dLoss, gLoss))

        # 把训练10个epoch进行一次保持结果
        if i % 10 == 0:
            # 判断保存结果的文件夹是否存在,若不存在,则创建
            if not os.path.exists(output_dir):
                os.makedirs(output_dir)
            # 随机生成噪声作为输入
            sample_noise = np.random.uniform(-1.0, 1.0, size=[batch_size, z_dim]).astype(np.float32)
            # 根据generator生成结果
            # fake_image = generator(random_input, z_dim, is_train)
            imgtest = sess.run(fake_image, feed_dict={random_input:sample_noise, is_train:False})
            # imgtest的格式转换
            imgtest.astype(np.uint8)
            # 保存结果
            plot_and_save(i, imgtest)
            print("train:[%d], d_loss%f, g_loss:%f" % (i, dLoss, gLoss))





if __name__ == "__main__":
    train()

gz153016 发布了481 篇原创文章 · 获赞 236 · 访问量 50万+ 他的留言板 关注

标签:size,image,batch,train,DCGAN,tf,SGAN,self,WGAN
来源: https://blog.csdn.net/gz153016/article/details/104413369