PaddleDetection-MaskRcnn相关结构以及优化器
作者:互联网
2021SC@SDUSC
首先上接Head部分
modeling/mask.py、modeling/head/mask_head.py解读:
相关配置文件:
'''
Mask: #掩膜
mask_target_generator: #产生掩膜
name: MaskTargetGenerator #产生掩膜类名
mask_resolution: 28 #像素值
'''
掩膜类:
@register
class Mask(object):
    """Build mask-branch training targets from sampled RoIs and ground truth.

    The actual target-generation op is injected via config
    (MaskTargetGenerator with mask_resolution=28 in the yaml above).
    """
    # injected component: callable that produces the mask targets
    __inject__ = ['mask_target_generator']

    def __init__(self, mask_target_generator):
        super(Mask, self).__init__()
        self.mask_target_generator = mask_target_generator

    def __call__(self, inputs, rois, targets):
        # Returns the RoIs routed to the mask branch plus an int32 flag
        # tensor marking which sampled RoIs actually carry a mask target.
        mask_rois, rois_has_mask_int32 = self.generate_mask_target(inputs, rois,
                                                                   targets)
        return mask_rois, rois_has_mask_int32

    def generate_mask_target(self, inputs, rois, targets):
        """Run the injected target op; caches results for get_targets().

        Args:
            inputs: feed dict with 'im_info', 'gt_class', 'is_crowd',
                'gt_poly' entries.
            rois: (proposals, proposals_num) pair from the RPN/bbox stage.
            targets: dict holding 'labels_int32' per-RoI class labels.
        """
        labels_int32 = targets['labels_int32']
        proposals, proposals_num = rois
        # The op returns (rois, rois_num, has-mask flags, encoded mask ints);
        # the flags and mask ints are stashed on self for later retrieval.
        mask_rois, mask_rois_num, self.rois_has_mask_int32, self.mask_int32 = self.mask_target_generator(
            im_info=inputs['im_info'],
            gt_classes=inputs['gt_class'],
            is_crowd=inputs['is_crowd'],
            gt_segms=inputs['gt_poly'],
            rois=proposals,
            rois_num=proposals_num,
            labels_int32=labels_int32)
        self.mask_rois = (mask_rois, mask_rois_num)
        return self.mask_rois, self.rois_has_mask_int32

    def get_targets(self):
        # Only valid after generate_mask_target has run.
        return self.mask_int32
modeling/head/mask_head.py:
相关配置文件:
'''
MaskHead: #掩膜头
mask_feat: #掩膜特征
name: MaskFeat #掩膜特征名
num_convs: 4 #卷积个数
feat_in: 256 #输入特征维度
feat_out: 256 #输出特征维度
mask_roi_extractor: #ROI提取
name: RoIAlign #RoIAlign
resolution: 14 #像素值
sampling_ratio: 2 #采样率
share_bbox_feat: False #是否共享bbox特征
feat_in: 256 #特征维度
'''
掩膜特征类
@register
class MaskFeat(Layer):
    """RoI mask-feature extractor: RoIAlign (or shared bbox-head features),
    a stack of 3x3 conv+ReLU layers, then a stride-2 deconv upsample.

    Args:
        mask_roi_extractor: injected RoI feature extractor (e.g. RoIAlign).
        num_convs (int): number of 3x3 conv+ReLU layers before the deconv.
        feat_in (int): channels of the incoming RoI features.
        feat_out (int): channels produced by each conv and the deconv.
        mask_num_stages (int): number of cascade stages (one module each).
        share_bbox_feat (bool): reuse bbox-head RoI features instead of
            re-extracting them from the backbone features.
    """
    __inject__ = ['mask_roi_extractor']

    def __init__(self,
                 mask_roi_extractor=None,
                 num_convs=0,
                 feat_in=2048,
                 feat_out=256,
                 mask_num_stages=1,
                 share_bbox_feat=False):
        super(MaskFeat, self).__init__()
        self.num_convs = num_convs
        self.feat_in = feat_in
        self.feat_out = feat_out
        self.mask_roi_extractor = mask_roi_extractor
        self.mask_num_stages = mask_num_stages
        self.share_bbox_feat = share_bbox_feat
        self.upsample_module = []
        # Kaiming fan-in values: kernel area times output channels.
        fan_conv = feat_out * 3 * 3
        fan_deconv = feat_out * 2 * 2
        for i in range(self.mask_num_stages):
            name = 'stage_{}'.format(i)
            mask_conv = Sequential()
            for j in range(self.num_convs):
                conv_name = 'mask_inter_feat_{}'.format(j + 1)
                mask_conv.add_sublayer(
                    conv_name,
                    Conv2D(
                        in_channels=feat_in if j == 0 else feat_out,
                        out_channels=feat_out,
                        kernel_size=3,
                        padding=1,
                        weight_attr=ParamAttr(
                            initializer=KaimingNormal(fan_in=fan_conv)),
                        bias_attr=ParamAttr(
                            learning_rate=2., regularizer=L2Decay(0.))))
                mask_conv.add_sublayer(conv_name + 'act', ReLU())
            # BUGFIX: when num_convs > 0 the deconv consumes the conv stack's
            # output (feat_out channels); the original unconditionally used
            # feat_in, which only worked because the shipped config sets
            # feat_in == feat_out (256). This is behavior-identical for all
            # previously-working configurations.
            deconv_in = self.feat_in if self.num_convs == 0 else self.feat_out
            mask_conv.add_sublayer(
                'conv5_mask',
                Conv2DTranspose(
                    in_channels=deconv_in,
                    out_channels=self.feat_out,
                    kernel_size=2,
                    stride=2,
                    weight_attr=ParamAttr(
                        initializer=KaimingNormal(fan_in=fan_deconv)),
                    bias_attr=ParamAttr(
                        learning_rate=2., regularizer=L2Decay(0.))))
            mask_conv.add_sublayer('conv5_mask' + 'act', ReLU())
            upsample = self.add_sublayer(name, mask_conv)
            self.upsample_module.append(upsample)

    def forward(self,
                body_feats,
                bboxes,
                bbox_feat,
                mask_index,
                spatial_scale,
                stage=0,
                bbox_head_feat_func=None,
                mode='train'):
        """Extract and upsample mask features for the given RoIs.

        Args:
            body_feats: backbone/FPN feature maps.
            bboxes: RoIs to pool features for.
            bbox_feat: bbox-head RoI features (used when share_bbox_feat).
            mask_index: indices into bbox_feat selecting RoIs with masks.
            spatial_scale: feature-map scale(s) for the RoI extractor.
            stage (int): which cascade stage's module to apply.
            bbox_head_feat_func: bbox-head feature transform, applied at
                inference when features are shared.
            mode (str): 'train' or 'infer'.

        Returns:
            Upsampled mask feature tensor for the selected stage.
        """
        if self.share_bbox_feat and mask_index is not None:
            # Training with shared features: pick out the bbox-head RoI
            # features belonging to RoIs that have mask targets.
            rois_feat = paddle.gather(bbox_feat, mask_index)
        else:
            rois_feat = self.mask_roi_extractor(body_feats, bboxes,
                                                spatial_scale)
        if self.share_bbox_feat and bbox_head_feat_func is not None and mode == 'infer':
            # Inference with shared features: apply the bbox head's feature
            # transform so the mask branch sees the same representation.
            rois_feat = bbox_head_feat_func(rois_feat)
        # conv stack + deconv upsample for the selected cascade stage
        mask_feat = self.upsample_module[stage](rois_feat)
        return mask_feat
掩膜头类
@register
class MaskHead(Layer):
    """Mask R-CNN mask head: takes per-RoI mask features from the injected
    MaskFeat module and predicts one mask logit map per class via 1x1 conv."""
    # values shared with other config sections
    __shared__ = ['num_classes', 'mask_num_stages']
    # injected MaskFeat instance (see config above)
    __inject__ = ['mask_feat']

    def __init__(self,
                 mask_feat,
                 feat_in=256,
                 num_classes=81,
                 mask_num_stages=1):
        super(MaskHead, self).__init__()
        self.mask_feat = mask_feat
        self.feat_in = feat_in
        self.num_classes = num_classes
        self.mask_num_stages = mask_num_stages
        self.mask_fcn_logits = []
        # One 1x1 classifier conv per cascade stage; add_sublayer registers
        # the conv so its parameters are tracked by the Layer.
        for i in range(self.mask_num_stages):
            name = 'mask_fcn_logits_{}'.format(i)
            self.mask_fcn_logits.append(
                self.add_sublayer(
                    name,
                    Conv2D(
                        in_channels=self.feat_in,
                        out_channels=self.num_classes,
                        kernel_size=1,
                        weight_attr=ParamAttr(initializer=KaimingNormal(
                            fan_in=self.num_classes)),
                        bias_attr=ParamAttr(
                            learning_rate=2., regularizer=L2Decay(0.0)))))

    def forward_train(self,
                      body_feats,
                      bboxes,
                      bbox_feat,
                      mask_index,
                      spatial_scale,
                      stage=0):
        """Training path: return raw mask logits for the sampled RoIs."""
        # per-RoI mask features
        mask_feat = self.mask_feat(
            body_feats,
            bboxes,
            bbox_feat,
            mask_index,
            spatial_scale,
            stage,
            mode='train')
        # per-class mask logits
        mask_head_out = self.mask_fcn_logits[stage](mask_feat)
        return mask_head_out

    def forward_test(self,
                     scale_factor,
                     body_feats,
                     bboxes,
                     bbox_feat,
                     mask_index,
                     spatial_scale,
                     stage=0,
                     bbox_head_feat_func=None):
        """Inference path: rescale detected boxes, extract mask features and
        return per-pixel sigmoid mask probabilities."""
        bbox, bbox_num = bboxes
        if bbox.shape[0] == 0:
            # No detections survived NMS: return a -1-filled placeholder
            # (shape [1, 6] — presumably mirrors the detection-row layout;
            # TODO confirm against the consumer).
            mask_head_out = paddle.full([1, 6], -1)
        else:
            # Expand the per-image scale factor so each box row gets its own
            # image's scale (bbox_num[idx] boxes belong to image idx).
            scale_factor_list = []
            for idx in range(bbox_num.shape[0]):
                num = bbox_num[idx]
                scale = scale_factor[idx, 0]
                ones = paddle.ones(num)
                scale_expand = ones * scale
                scale_factor_list.append(scale_expand)
            scale_factor_list = paddle.cast(
                paddle.concat(scale_factor_list), 'float32')
            scale_factor_list = paddle.reshape(scale_factor_list, shape=[-1, 1])
            # Columns 2: of bbox look like coordinates (cols 0-1 presumably
            # class id and score — verify against the bbox decoder's layout).
            scaled_bbox = paddle.multiply(bbox[:, 2:], scale_factor_list)
            scaled_bboxes = (scaled_bbox, bbox_num)
            mask_feat = self.mask_feat(
                body_feats,
                scaled_bboxes,
                bbox_feat,
                mask_index,
                spatial_scale,
                stage,
                bbox_head_feat_func,
                mode='infer')
            mask_logit = self.mask_fcn_logits[stage](mask_feat)
            # probabilities, not logits, at inference time
            mask_head_out = F.sigmoid(mask_logit)
        return mask_head_out

    def forward(self,
                inputs,
                body_feats,
                bboxes,
                bbox_feat,
                mask_index,
                spatial_scale,
                bbox_head_feat_func=None,
                stage=0):
        """Dispatch to the train or test path based on inputs['mode']."""
        if inputs['mode'] == 'train':
            mask_head_out = self.forward_train(body_feats, bboxes, bbox_feat,
                                               mask_index, spatial_scale, stage)
        else:
            scale_factor = inputs['scale_factor']
            mask_head_out = self.forward_test(
                scale_factor, body_feats, bboxes, bbox_feat, mask_index,
                spatial_scale, stage, bbox_head_feat_func)
        return mask_head_out

    def get_loss(self, mask_head_out, mask_target):
        """Per-pixel sigmoid cross-entropy mask loss; -1 labels are ignored."""
        mask_logits = paddle.flatten(mask_head_out, start_axis=1, stop_axis=-1)
        mask_label = paddle.cast(x=mask_target, dtype='float32')
        # Targets are fixed supervision — no gradient flows through them.
        mask_label.stop_gradient = True
        loss_mask = ops.sigmoid_cross_entropy_with_logits(
            input=mask_logits,
            label=mask_label,
            ignore_index=-1,
            normalize=True)
        loss_mask = paddle.sum(loss_mask)
        return {'loss_mask': loss_mask}
然后是MaskRcnn的Post_process部分
post_process.py源码解析:
在yaml的配置文件:/configs/_base_/models/mask_rcnn_r50_fpn.yml
'''
BBoxPostProcess: #BBox后处理
decode: #解码
name: RCNNBox # RCNNBox类名
num_classes: 81 #物体类别+背景类
batch_size: 1 #输入batch数
nms: #非极大值抑制
name: MultiClassNMS #非极大值抑制类名
keep_top_k: 100 #最多框数
score_threshold: 0.05 #置信度阈值
nms_threshold: 0.5 #iou阈值
MaskPostProcess: #掩膜后处理
mask_resolution: 28 #掩膜像素值
'''
相关引用库:
import numpy as np
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
from ppdet.core.workspace import register
from ppdet.py_op.post_process import mask_post_process
from . import ops
bbox后处理类
@register
class BBoxPostProcess(object):
    """Decode raw bbox-head predictions into boxes and suppress overlaps.

    Both stages are injected via config: `decode` (e.g. RCNNBox) and `nms`
    (e.g. MultiClassNMS).
    """
    __inject__ = ['decode', 'nms']

    def __init__(self, decode=None, nms=None):
        super(BBoxPostProcess, self).__init__()
        self.decode = decode
        self.nms = nms

    def __call__(self, head_out, rois, im_shape, scale_factor=None):
        # First decode deltas/scores into candidate boxes, then run NMS.
        # The third NMS output (kept indices) is unused downstream.
        decoded_boxes, cls_scores = self.decode(head_out, rois, im_shape,
                                                scale_factor)
        kept_boxes, kept_num, _ = self.nms(decoded_boxes, cls_scores)
        return kept_boxes, kept_num
掩膜后处理类
@register
class MaskPostProcess(object):
    """Paste predicted mask probabilities back onto image coordinates.

    Args:
        mask_resolution (int): side length of the square mask predictions
            (28 in the yaml above); shared with other config sections.
        binary_thresh (float): probability threshold for binarizing masks.
    """
    __shared__ = ['mask_resolution']

    def __init__(self, mask_resolution=28, binary_thresh=0.5):
        super(MaskPostProcess, self).__init__()
        self.mask_resolution = mask_resolution
        self.binary_thresh = binary_thresh

    def __call__(self, bboxes, mask_head_out, im_shape, scale_factor=None):
        # TODO: modify related ops for deploying
        # BUGFIX: materialize a tuple instead of a generator expression — a
        # generator is exhausted after a single pass, so any second iteration
        # inside mask_post_process would silently see an empty sequence.
        bboxes_np = tuple(i.numpy() for i in bboxes)
        mask = mask_post_process(bboxes_np,
                                 mask_head_out.numpy(),
                                 im_shape.numpy(), scale_factor[:, 0].numpy(),
                                 self.mask_resolution, self.binary_thresh)
        return {'mask': mask}
优化器部分对应源码解析:
ppdet/optimizer.py源码解析:
在yaml的配置文件:./_base_/optimizers/rcnn_1x.yml
'''
#./_base_/optimizers/rcnn_1x.yml
epoch: 12
LearningRate: #学习率类名
# 初始学习率, 一般情况下8卡gpu,batch size为2时设置为0.02
# 可以根据具体情况,按比例调整
# 比如说4卡V100,bs=2时,设置为0.01
base_lr: 0.01 #学习率
schedulers: #实例化优化器策略
- !PiecewiseDecay #分段式衰减
gamma: 0.1 #衰减系数
milestones: [8, 11] #衰减点[列表]
- !LinearWarmup #学习率从较小的初始值线性增加到预设值,预热结束后交由上面的衰减策略继续调度。
start_factor: 0.3333333333333333 #初始值
steps: 500 #线性增长步长
OptimizerBuilder: #构建优化器
optimizer: #优化器
momentum: 0.9 #动量系数
type: Momentum #类型
regularizer: #正则初始化
factor: 0.0001 #正则系数
type: L2 #L2正则
'''
相关引用库:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import math
import paddle
import paddle.nn as nn
import paddle.optimizer as optimizer
import paddle.regularizer as regularizer
from paddle import cos
from ppdet.core.workspace import register, serializable
__all__ = ['LearningRate', 'OptimizerBuilder']
from ppdet.utils.logger import setup_logger
logger = setup_logger(__name__)
分段式衰减模块(调整学习率)
@serializable
class PiecewiseDecay(object):
    """
    Multi step learning rate decay

    Args:
        gamma (float | list): decay factor
        milestones (list): steps at which to decay learning rate
    """

    def __init__(self, gamma=[0.1, 0.01], milestones=[8, 11]):
        super(PiecewiseDecay, self).__init__()
        # A scalar gamma is expanded into a per-milestone geometric sequence:
        # gamma, gamma/10, gamma/100, ...
        if type(gamma) is not list:
            self.gamma = [gamma / 10**k for k in range(len(milestones))]
        else:
            self.gamma = gamma
        self.milestones = milestones

    def __call__(self,
                 base_lr=None,
                 boundary=None,
                 value=None,
                 step_per_epoch=None):
        # Convert epoch milestones into absolute iteration boundaries and
        # append the decayed lr values; both lists are extended in place on
        # top of what the warmup scheduler already filled in.
        if boundary is not None:
            boundary.extend([int(step_per_epoch) * m for m in self.milestones])
        if value is not None:
            value.extend([base_lr * g for g in self.gamma])
        return optimizer.lr.PiecewiseDecay(boundary, value)
线性预热模块(调整学习率)
@serializable
class LinearWarmup(object):
    """
    Warm up learning rate linearly

    Args:
        steps (int): warm up steps
        start_factor (float): initial learning rate factor
    """

    def __init__(self, steps=500, start_factor=1. / 3):
        super(LinearWarmup, self).__init__()
        self.steps = steps
        self.start_factor = start_factor

    def __call__(self, base_lr):
        # Interpolate the lr factor from start_factor up to 1.0 across the
        # warmup steps. Boundaries cover every step except step 0, so
        # len(boundary) == len(value) - 1, as PiecewiseDecay expects.
        value = []
        for step_id in range(self.steps + 1):
            alpha = step_id / self.steps
            value.append(base_lr * (self.start_factor * (1 - alpha) + alpha))
        boundary = list(range(1, self.steps + 1))
        return boundary, value
学习率优化模块(将上面两种学习率优化方法调入)
@register
class LearningRate(object):
    """
    Learning Rate configuration

    Args:
        base_lr (float): base learning rate
        schedulers (list): learning rate schedulers
    """
    __category__ = 'optim'

    def __init__(self,
                 base_lr=0.01,
                 schedulers=[PiecewiseDecay(), LinearWarmup()]):
        super(LearningRate, self).__init__()
        self.base_lr = base_lr
        self.schedulers = schedulers

    def __call__(self, step_per_epoch):
        # TODO: split warmup & decay
        # The warmup scheduler (index 1) seeds the boundary/value lists; the
        # decay scheduler (index 0) then appends the milestone drops and
        # wraps everything in a paddle PiecewiseDecay schedule.
        warm_boundary, warm_value = self.schedulers[1](self.base_lr)
        return self.schedulers[0](self.base_lr, warm_boundary, warm_value,
                                  step_per_epoch)
优化器模块(学习率优化模块调入)
@register
class OptimizerBuilder():
    """
    Build optimizer handles

    Args:
        regularizer (object): an `Regularizer` instance
        optimizer (object): an `Optimizer` instance
    """
    __category__ = 'optim'

    def __init__(self,
                 clip_grad_by_norm=None,
                 regularizer={'type': 'L2',
                              'factor': .0001},
                 optimizer={'type': 'Momentum',
                            'momentum': .9}):
        self.clip_grad_by_norm = clip_grad_by_norm
        self.regularizer = regularizer
        self.optimizer = optimizer

    def __call__(self, learning_rate, params=None):
        # Optional global-norm gradient clipping.
        grad_clip = None
        if self.clip_grad_by_norm is not None:
            grad_clip = nn.GradientClipByGlobalNorm(
                clip_norm=self.clip_grad_by_norm)
        # Resolve the weight-decay class by name, e.g. 'L2' -> L2Decay.
        regularization = None
        if self.regularizer:
            decay_cls = getattr(regularizer,
                                self.regularizer['type'] + 'Decay')
            regularization = decay_cls(self.regularizer['factor'])
        # Look up the optimizer class by its 'type' key and forward every
        # remaining config entry (momentum, etc.) as keyword arguments.
        optim_args = self.optimizer.copy()
        optim_cls = getattr(optimizer, optim_args.pop('type'))
        return optim_cls(learning_rate=learning_rate,
                         parameters=params,
                         weight_decay=regularization,
                         grad_clip=grad_clip,
                         **optim_args)
由此整个Mask RCNN完整的pipeline就是这样通过yaml文件构建好了,后面根据train、test、val的具体应用情况来拔插相应的模块。
标签:__,feat,self,mask,MaskRcnn,PaddleDetection,num,bbox,优化 来源: https://blog.csdn.net/qq_45684033/article/details/122149350