DCGAN-使用paddlepaddle2.0实现
作者:互联网
DCGAN-使用paddlepaddle2.0实现
简介
- DCGAN(深度卷积生成对抗网络),它由一个生成模型和一个判别模型组成,生成模型用于生成图片,辨别模型用于辨别生成的图片的真伪,不断地生成与判别,网络逐渐可以生成较为逼真的图片。
- 预览效果如下
目录结构
-- root
-- data
-- imgs
-- models
-- output
config.py
Dataset.py
main.py
model64.py
model96.py
tools.py
unzip.py
目录说明
- root是工作目录
- data存放数据集目录,数据集目录下存放数据集的压缩文件
- imgs存放解压后的数据集
- models下存放不同模型的中间参数
- output存放训练过程中的预览图片
- 其余带后缀的都不为目录
我的代码
1.) 解压操作
- 对应为 unzip.py ,用于把数据集压缩包解压到目的目录。
'''解压zip文件到目的目录'''
import zipfile
import os
# zip_src: 需要解压的文件路径
# dst_dir: 解压后文件存放路径
def unzip_file(zip_src, dst_dir):
    """Extract the zip archive at `zip_src` into directory `dst_dir`.

    If `zip_src` is not a valid zip file, print a warning and do nothing.
    """
    if zipfile.is_zipfile(zip_src):
        # Bug fix: the original never closed the ZipFile handle; the `with`
        # statement guarantees release.  extractall() replaces the manual
        # per-member extract loop with the equivalent library call.
        with zipfile.ZipFile(zip_src, 'r') as fz:
            fz.extractall(dst_dir)
    else:
        print('This is not a zip file !!!')
2.) 配置文件
- 对应为 config.py ,可以根据个人需要进行修改
import os
class Config:
    """Hyper-parameters and filesystem layout for DCGAN training."""

    # ---- model / optimizer ----
    img_size = 96
    lr = 0.0002
    z_dim = 100        # length of the generator's input noise vector
    # Adam betas (despite the original comment, these feed optim.Adam, not BCELoss)
    beta1 = 0.5
    beta2 = 0.999

    # ---- training schedule ----
    g_every = 4        # train the generator every 4th batch
    d_every = 2        # train the discriminator every 2nd batch
    epoch = 100        # number of passes over the dataset
    batch_size = 24
    test = False       # when True, the dataset is truncated for quick runs

    # ---- paths ----
    imgs_path = os.getcwd() + "/imgs/faces/"                   # unpacked dataset
    output = os.getcwd()
    output_path = output + '/output/'                          # preview images
    checkpoints_path = output + f'/models/models_{img_size}/'  # checkpoint dirs
3.) 数据集
- 对应 Dataset.py,主要便于异步读取数据,便于main.py里的Dataloader
import os
import paddle
from paddle.io import Dataset
from PIL import Image
import paddle.vision.transforms as T
import cv2
from config import Config
opt=Config()
class DataGenerater(Dataset):
    """Map-style dataset over the face-image directory, for paddle.io.DataLoader."""

    def __init__(self, opt=opt):
        # opt: Config instance; the module-level `opt` is the default.
        super(DataGenerater, self).__init__()
        self.dir = opt.imgs_path
        # In test mode only the first 100 files are used, for quick runs.
        self.datalist = os.listdir(self.dir) if opt.test==False else os.listdir(self.dir)[:100]
        self.batch_size=opt.batch_size
        # Probe the first image just to record the raw (pre-transform) size.
        img=Image.open(self.dir+self.datalist[0])
        self.image_size = img.size
        img.close()
        # Resize the short side to img_size, center-crop to a square, to tensor.
        self.transform=T.Compose([
            T.Resize(opt.img_size),
            T.CenterCrop(opt.img_size),
            T.ToTensor(),
        ])
        self.num_path_dict={}

    def __getitem__(self, idx):
        """Return one (transformed image, idx) pair."""
        path=self.dir + self.datalist[idx]
        # NOTE(review): cv2.imread returns a BGR ndarray (and None on failure),
        # while the size probe above uses PIL (RGB); generated colors may be
        # channel-swapped relative to the originals — confirm downstream.
        img=cv2.imread(path)
        if self.transform:
            img=self.transform(img)
        self.num_path_dict[idx]=path  # remember idx -> file path for get_img_path
        return (img, idx)

    def get_img_path(self, idx):
        # Look up the file path of a sample previously fetched via __getitem__.
        # Raises KeyError if idx has not been fetched yet.
        return self.num_path_dict[idx]

    def __len__(self):
        # Total number of samples in the dataset.
        return len(self.datalist)
4.) 生成器和判别器
-
生成器(Generator)与判别器(Discriminator)可能比较难理解,建议参考 吴恩达的课。通过学习,你可以比看书更容易理解卷积,池化的过程,还有一些专业性的概念;
-
然后我有陆续构建了训练图片大小为96 x 96, 128 x 128, 256 x 256的模型,但效果不是很好,就不放出来了;
-
首先导入必要的库
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
from tools import conv_initializer,bn_initializer
- 接着定义判别器
class Discriminator(nn.Layer):
    """DCGAN discriminator: maps N x C x 64 x 64 images to N x 1 x 1 x 1 real/fake scores."""

    def __init__(self, channels_img, features_d):
        # channels_img: number of input image channels (3 for RGB).
        # features_d: base channel width; deeper layers use multiples of it.
        super(Discriminator, self).__init__()
        # Input : N x C x 64 x 64; each stride-2 conv halves the spatial size.
        self.disc=nn.Sequential(
            nn.Conv2D( # 32 x 32
                channels_img, features_d, kernel_size=4, stride=2, padding=1,
                weight_attr=paddle.ParamAttr(initializer=conv_initializer())
            ),
            nn.LeakyReLU(0.2),
            self._block(features_d , features_d*2 , 4, 2, 1), # 16 x 16
            self._block(features_d*2 , features_d*4 , 4, 2, 1), # 8 x 8
            self._block(features_d*4 , features_d*8 , 4, 2, 1), # 4 x 4
            nn.Conv2D( features_d*8, 1, kernel_size=4, stride=2, padding=0,# 1 x 1
                weight_attr=paddle.ParamAttr(initializer=conv_initializer() )
            ),
            nn.Sigmoid(),  # squash to (0, 1) so the output feeds BCELoss directly
        )

    def _block(self, in_channels, out_channels, kernel_size, stride, padding):
        # One downsampling unit: Conv2D + LeakyReLU.
        # (No BatchNorm here, unlike the generator's _block.)
        return nn.Sequential(
            nn.Conv2D(
                in_channels, out_channels, kernel_size, stride, padding, bias_attr=False,
                weight_attr=paddle.ParamAttr(initializer=conv_initializer() )
            ),
            nn.LeakyReLU(0.2),
        )

    def forward(self, input):
        # input: batch of images, N x channels_img x 64 x 64.
        return self.disc(input)
- 然后是生成器
class Generator(nn.Layer):
    """DCGAN generator: maps N x z_dim x 1 x 1 noise to N x channels_img x 64 x 64 images."""

    def __init__(self, z_dim, channels_img, features_g):
        # z_dim: noise vector length; channels_img: output image channels;
        # features_g: base channel width, scaled down as resolution grows.
        super(Generator, self).__init__()
        self.gen=nn.Sequential(
            # Input: N x z_dim x 1 x 1
            # (Size comments corrected: (in-1)*stride - 2*pad + kernel.)
            self._block(z_dim , features_g*16 , 4, 1, 0), # N x f_g*16 x 4 x 4
            self._block(features_g*16 , features_g*8 , 4, 2, 1), # N x f_g*8 x 8 x 8
            self._block(features_g*8 , features_g*4 , 4, 2, 1), # N x f_g*4 x 16 x 16
            self._block(features_g*4 , features_g*2 , 4, 2, 1), # N x f_g*2 x 32 x 32
            nn.Conv2DTranspose( # N x channels_img x 64 x 64
                features_g*2, channels_img, kernel_size=4, stride=2, padding=1, bias_attr=False,
                weight_attr=paddle.ParamAttr(initializer=conv_initializer() )
            ),
            nn.Tanh() # [-1, 1]
        )

    def _block(self, in_channels, out_channels, kernel_size, stride, padding):
        # One upsampling unit: ConvTranspose + BatchNorm + ReLU.
        return nn.Sequential(
            nn.Conv2DTranspose(
                in_channels, out_channels, kernel_size, stride, padding, bias_attr=False,
                weight_attr=paddle.ParamAttr(initializer=conv_initializer() )
            ),
            nn.BatchNorm2D(
                out_channels,
                weight_attr=paddle.ParamAttr(initializer=bn_initializer() ) ,
                momentum=0.8
            ),
            nn.ReLU(),
        )

    def forward(self, input):
        # input: noise batch, N x z_dim x 1 x 1.
        return self.gen(input)
- 最后测试代码,检测输出形状是否出差错
def test():
    """Smoke-test Discriminator/Generator output shapes on 64 x 64 inputs."""
    N, C, H, W = 8, 3, 64, 64
    z_dim = 100
    X = paddle.randn((N, C, H, W))
    disc = Discriminator(C, N)
    print("1:", disc(X).shape)
    # Discriminator maps N x C x 64 x 64 -> N x 1 x 1 x 1 probabilities.
    assert disc(X).shape == [N, 1, 1, 1]
    z = paddle.randn((N, z_dim, 1, 1))
    gen = Generator(z_dim, C, N)
    print("2:", gen(z).shape)
    # Bug fix: the generator's shape was printed but never checked;
    # it must reconstruct full-size images, N x C x 64 x 64.
    assert gen(z).shape == [N, C, H, W]

test()
5.) main函数
- 导入必要的库
import os
import paddle
import paddle.nn as nn
import paddle.fluid as fluid
import paddle.optimizer as optim
import paddle.vision.transforms as T
import cv2
from tqdm import tqdm
import matplotlib
matplotlib.use('Agg')
# %matplotlib inline
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np
import time
from tools import tensor_to_img
from Dataset import DataGenerater
from unzip import unzip_file
from config import Config
import warnings
import math
import random
- 初始化配置,解压数据集
# Build the runtime configuration and import the model matching img_size.
opt = Config()
if opt.img_size == 64:
    from model64 import Generator, Discriminator
elif opt.img_size == 96:
    from model96 import Generator, Discriminator
elif opt.img_size == 256:
    from model256 import Generator, Discriminator

# Unpack the dataset archive on first run only.
if not os.path.exists(opt.imgs_path):
    print("开始解压")
    unzip_file('data/data_test/test_faces.zip', './imgs')
    print("解压完成")

# Bug fix: the original only created opt.checkpoints_path when models/ itself
# was missing, so an existing models/ with a missing per-size subdirectory was
# never created.  makedirs(exist_ok=True) creates both levels idempotently.
os.makedirs(opt.checkpoints_path, exist_ok=True)

warnings.filterwarnings('ignore')
paddle.disable_static()  # run in dynamic-graph (imperative) mode
use_gpu = paddle.is_compiled_with_cuda()
place = paddle.fluid.CUDAPlace(0) if use_gpu else paddle.fluid.CPUPlace()
- 主模块,包括了读入已经训练的模型,训练,展示,与保存模型
if __name__=="__main__":
    # ---- pull hyper-parameters from the config ----
    batch_size=opt.batch_size
    lr=opt.lr
    z_dim = opt.z_dim
    beta1,beta2=opt.beta1,opt.beta2
    losses =[[],[]]  # losses[0]: discriminator history, losses[1]: generator history
    # Constant BCELoss targets: 1.0 = real, 0.0 = fake, one per batch element.
    real_label = paddle.full( (opt.batch_size,1,1,1), 1., dtype='float32')
    fake_label = paddle.full( (opt.batch_size,1,1,1), 0., dtype='float32')
    X = 20  # preview window size (only used in the debug print below)
    # Sub-windows per preview row: sqrt(batch_size) rounded, or floor+1 when
    # batch_size is not a perfect square (also only used in the debug print).
    num=math.sqrt(batch_size)
    x=round(num) if math.fabs( math.floor(num)**2-batch_size )<1e-6 else math.floor(num)+1
    print("start training: ")
    print("---------------------------------")
    print("num = ",num)
    with paddle.fluid.dygraph.guard(place):
        # BCE loss over the sigmoid scores produced by the discriminator.
        loss = nn.BCELoss()
        netD = Discriminator(channels_img=3, features_d=10)
        netG = Generator(z_dim=z_dim, channels_img=3, features_g=10)
        optimizerD = optim.Adam(parameters=netD.parameters(), learning_rate=lr, beta1=beta1, beta2=beta2)
        optimizerG = optim.Adam(parameters=netG.parameters(), learning_rate=lr, beta1=beta1, beta2=beta2)
        if not os.path.exists( opt.checkpoints_path ):
            os.mkdir( opt.checkpoints_path )
        if not os.path.exists( opt.output_path ):
            os.mkdir( opt.output_path)
        # Resume from checkpoint directory model_<img_size>_9/ when present.
        last = opt.img_size
        order_name = 9
        model_path = opt.checkpoints_path+ f"model_{last}_{order_name}/"
        print("model path:", model_path)
        if os.path.exists(model_path):
            print("model exists")
            # Restore discriminator weights and its optimizer state.
            netD_dict, optD_dict = paddle.load(model_path+"netD.pdparams" ), \
                paddle.load(model_path+"adamD.pdopt" )
            netD.set_state_dict( netD_dict )
            optimizerD.set_state_dict( optD_dict )
            print(" Model D suc")
            # Restore generator weights and its optimizer state.
            netG_dict, optG_dict = paddle.load(model_path+"netG.pdparams" ), \
                paddle.load(model_path+"adamG.pdopt" )
            netG.set_state_dict( netG_dict )
            optimizerG.set_state_dict( optG_dict )
            print(" Model G suc")
        plt.ion()  # interactive mode so preview figures refresh during training
        train_dataset = DataGenerater(opt=opt)
        train_loader = paddle.io.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
        print("X, x = ",X,x)
        count=0  # running index of saved preview images
        print("all imgs len:", len(train_dataset))
        for pass_id in range(opt.epoch):
            # NOTE(review): "epotch" typo lives in a runtime string; left unchanged.
            print(f"epotch {pass_id}: ", end=" " )
            for batch_id, (data, labels) in enumerate( tqdm(train_loader) ):
                # ---- discriminator step (every d_every batches) ----
                if batch_id % opt.d_every==0:
                    optimizerD.clear_grad()
                    # Real batch should be classified as 1.
                    output = netD(data)
                    errD_real = loss(output, real_label)
                    errD_real.backward()
                    optimizerD.step()
                    optimizerD.clear_grad()
                    # Fake batch (detached so no gradient reaches G) should be 0.
                    noise = paddle.randn([batch_size, z_dim, 1, 1],'float32')
                    fake = netG(noise)
                    output = netD(fake.detach())
                    errD_fake = loss(output, fake_label)
                    errD_fake.backward()
                    optimizerD.step()
                    optimizerD.clear_grad()
                    errD = errD_real + errD_fake
                    losses[0].append(errD.numpy()[0])
                # ---- generator step (every g_every batches) ----
                if batch_id % opt.g_every==0:
                    optimizerG.clear_grad()
                    noise = paddle.randn([batch_size, z_dim, 1, 1] , 'float32')
                    fake = netG(noise)
                    output = netD(fake)
                    # G is rewarded when D labels its fakes as real (target 1).
                    errG = loss(output, real_label)
                    errG.backward()
                    optimizerG.step()
                    optimizerG.clear_grad()
                    losses[1].append(errG.numpy()[0])
                # ---- every 50 batches: save a preview of 4 generated images ----
                if batch_id % 50 == 0:
                    generated_image = netG(noise).numpy()
                    imgs=np.split(generated_image, generated_image.shape[0], 0)
                    plt.figure(figsize=(16, 4))
                    for i, ele in enumerate(imgs):
                        if i==4:
                            break
                        temp_img=ele.squeeze(0)
                        temp_img=tensor_to_img(temp_img)
                        plt.subplot(1, 4, i+1)
                        plt.axis('off')  # hide axes on the preview tiles
                        plt.imshow(temp_img)
                    plt.savefig(opt.output_path+f"{pass_id}_{count}.jpg")
                    count+=1
                    plt.pause(1e-10)
            # ---- checkpoint every 2 epochs ----
            if pass_id % 2==0:
                # Continue numbering after the resumed checkpoint (order_name).
                order = order_name+ 1+ pass_id//2
                model_path = opt.checkpoints_path + f"model_{opt.img_size}_{order}/"
                if not os.path.exists(model_path):
                    os.mkdir(model_path)
                netD_path, optimD_path = model_path+"netD.pdparams", model_path+"adamD.pdopt"
                netD_dict, optD_dict = netD.state_dict(), optimizerD.state_dict()
                paddle.save(netD_dict , netD_path )
                paddle.save(optD_dict , optimD_path)
                netG_path, optimG_path = model_path+"netG.pdparams", model_path+"adamG.pdopt"
                netG_dict, optG_dict = netG.state_dict(), optimizerG.state_dict()
                paddle.save(netG_dict , netG_path )
                paddle.save(optG_dict , optimG_path)
其他
-
开始看视频讲解算法的时候觉得只要知道了原理,写代码应该会很容易,但事实恰恰相反,由于从视频中学到的只是个大概,具体的实施过程还需要关注很多细节,比如硬件与软件配置,还要去了解怎么获取数据,怎么读取写入数据,怎么分析数据。比如写一个DataLoader,我试图用自己的方法重写一个,确实有些成效,但是数据载入存取效率低下,即便试图通过多线程与多进程改进载入数据的速度,但还是会出现许多bug;限于自己的知识储备,只好去借他山之石了。
-
接下来抽时间看下各种算法。
2021-04-10 更新,感谢观看-
标签:opt,nn,实现,self,paddle,DCGAN,paddlepaddle2.0,import,size 来源: https://blog.csdn.net/qq_43063807/article/details/118768846