Original: Object Detection with YOLO, SSD, RetinaNet, Faster RCNN, Mask RCNN (2) - 2019.01.07

Author: 油腻小年轻 - Jianshu (简书)

Study notes, kept for reference.

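RetinaNet comes from the Facebook AI Research team and was published at ICCV 2017. The main contributors are Tsung-Yi Lin, Priya Goyal, Ross Girshick, Kaiming He, and Piotr Dollár.

At the time of publication it was among the strongest object detection networks in terms of the speed/accuracy/complexity trade-off. The two figures below show experimental results on the COCO test set:

image

image

Among one-stage networks, RetinaNet's accuracy is on par with two-stage networks, and it even surpasses some classic two-stage detectors.

Paper: Focal Loss for Dense Object Detection - ICCV2017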

1. Focal Loss

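Focal Loss paper notes and formula derivation - AIUAI

CaffeLoss - FocalLossLayer - AIUAI

In a one-stage network, the imbalance between positive and negative examples has a significant effect on accuracy. A simple example: when processing an image, a one-stage detector generates a large number of anchors (candidate boxes), of which only a few are positives (objects); most of them only cover background. When the loss is computed, it is dominated by the many negative (background) examples. Two-stage networks handle this better because they first run a binary classification over the anchors, which acts as an initial filter and reduces the imbalance between positive and negative samples.
RetinaNet largely removes the effect of this imbalance by changing the classification loss, which is computed as: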

$$ FL(p_t) = - \alpha _t (1 - p_t)^{\gamma} log(p_t) $$

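Let us start from the beginning. Binary classification usually uses the cross entropy (CE) loss; readers unfamiliar with entropy can consult an introduction to information theory. With $p$ the predicted probability of the class $y=1$, it is computed as: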

$$ CE(p, y) = \begin{cases} -log(p) &\text{if } y=1 \\ -log(1-p) &\text{otherwise} \end{cases} $$

For notational convenience, define $p_t$ as:

$$ p_t = \begin{cases} p &\text{if } y=1 \\ 1-p &\text{otherwise} \end{cases} $$

The cross entropy loss can then be written as:

$$ CE(p, y) = CE(p_t) = -log(p_t) $$

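A common way to address class imbalance is to add a weighting factor $\alpha \in [0, 1]$, with $\alpha_t$ defined analogously to $p_t$ ($\alpha$ for $y=1$, $1-\alpha$ otherwise):

$$ CE(p_t) = - \alpha _t log(p_t) $$

Focal Loss takes this weighted $CE(p_t)$ and adds one more modulating factor, $(1 - p_t)^{\gamma}$.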
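
To make the effect on an imbalanced batch concrete, here is a minimal NumPy sketch of the formulas above. The function names, the batch (one hard positive and 1000 easy negatives) and the default values are illustrative assumptions, not taken from the paper's code.

```python
import numpy as np

def cross_entropy(p, y):
    """Per-sample binary cross entropy; p is the predicted P(y=1)."""
    p = np.asarray(p, dtype=float)
    y = np.asarray(y)
    p_t = np.where(y == 1, p, 1.0 - p)
    return -np.log(p_t)

def focal_loss(p, y, alpha=0.25, gamma=2.0):
    """Per-sample binary focal loss: -alpha_t * (1 - p_t)^gamma * log(p_t)."""
    p = np.asarray(p, dtype=float)
    y = np.asarray(y)
    p_t = np.where(y == 1, p, 1.0 - p)
    alpha_t = np.where(y == 1, alpha, 1.0 - alpha)
    return -alpha_t * (1.0 - p_t) ** gamma * np.log(p_t)

# Hypothetical batch: one hard positive (p = 0.1) and 1000 easy negatives (p = 0.1).
p = np.concatenate([[0.1], np.full(1000, 0.1)])
y = np.concatenate([[1], np.zeros(1000, dtype=int)])

ce = cross_entropy(p, y)
fl = focal_loss(p, y)
ce_share = ce[0] / ce.sum()   # fraction of the total CE loss due to the positive
fl_share = fl[0] / fl.sum()   # same fraction under focal loss
print("positive's share of CE loss:", round(ce_share, 3))
print("positive's share of FL loss:", round(fl_share, 3))
```

In this toy batch the single positive contributes about 2% of the total cross entropy but about 37% of the total focal loss, which is the rebalancing the modulating factor is meant to achieve.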

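Why does one extra factor have such a large effect? An example makes it clear.

Assume $\alpha = 0.25, \gamma=2$ (for simplicity, this example applies the same weight 0.25 to both classes).

Take an easy example first, i.e. a foreground sample predicted with $p=0.9$ or a background sample predicted with $p=0.1$, so that $p_t=0.9$ in both cases:

CE(foreground) = -log(0.9) = 0.1054

CE(background) = -log(1 - 0.1) = 0.1054

FL(foreground) = -1 x 0.25 x (1 - 0.9)^2 x log(0.9) = 0.000263

FL(background) = -1 x 0.25 x (1 - (1 - 0.1))^2 x log(1 - 0.1) = 0.000263

The loss shrinks to 1/400 of the original: 0.1054/0.000263 ≈ 400.

Now take a hard example, i.e. a foreground sample predicted with $p=0.1$ or a background sample predicted with $p=0.9$, so that $p_t=0.1$:

CE(foreground) = -log(0.1) = 2.3026

CE(background) = -log(1 - 0.9) = 2.3026

FL(foreground) = -1 x 0.25 x (1 - 0.1)^2 x log(0.1) = 0.4663

FL(background) = -1 x 0.25 x (1 - (1 - 0.9))^2 x log(1 - 0.9) = 0.4663

The loss shrinks only to about 1/5 of the original: 2.3026/0.4663 ≈ 4.94.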
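
The two reduction factors can also be read off the formulas directly, without evaluating the logarithm. A short derivation, under the same simplifying assumption $\alpha_t = 0.25$ for both classes:

$$ \frac{CE(p_t)}{FL(p_t)} = \frac{-log(p_t)}{-\alpha _t (1 - p_t)^{\gamma} log(p_t)} = \frac{1}{\alpha _t (1 - p_t)^{\gamma}} $$

$$ p_t = 0.9: \ \frac{1}{0.25 \times 0.1^2} = 400, \qquad p_t = 0.1: \ \frac{1}{0.25 \times 0.9^2} \approx 4.94 $$

The better an example is already classified (the closer $p_t$ is to 1), the more strongly its loss is down-weighted.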

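The paper also experiments with the values of $\alpha$ and $\gamma$ and concludes that $\gamma=2$ (together with $\alpha=0.25$) works best.

The authors also ran a series of experiments and found that Focal Loss gives good results when applied to other one-stage networks as well.

2. Network architecture

Despite such strong performance, the network structure is very simple and can be summarized as: ResNet (backbone) + FPN + FCN.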

image

<Github - fizyr/keras-retinanet>

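To make the network details easier to follow, the walkthrough below uses the Keras version of RetinaNet. It is the clearest implementation the author of this post has come across so far. Its main contributor, Hans Gaiser, works at Fizyr, a company that does parcel sorting and handling, where the main technologies in use are deep learning and computer vision.

As the image passes through the ResNet backbone, the feature map size is halved after every res_block stage:

image

RetinaNet builds anchors on the last five levels, which we can name [C3, C4, C5, P6, P7]:

image

This is where FPN comes in. By combining feature information from multiple levels, the network handles small objects better; by fusing deep semantic information with shallow image detail (local features, object localization, etc.), its accuracy improves further.

image

C3, C4 and C5 each pass through convolution layers to produce P3, P4 and P5. Every pyramid level (P3-P7) is then connected to the output subnets, and all of their outputs are concatenated to give the final result:

image

The output of each level is as follows: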

image
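
As a rough illustration of how many anchors the five levels produce, the sketch below computes the feature map size of each level and the total anchor count. It assumes that level Pk has stride 2^k, that sizes are rounded up, and that there are 9 anchors per location; the 512x512 input size and the helper names are hypothetical.

```python
import math

def pyramid_shapes(image_hw, levels=(3, 4, 5, 6, 7)):
    """Feature map (height, width) per pyramid level, assuming stride 2**level."""
    h, w = image_hw
    return {lvl: (math.ceil(h / 2 ** lvl), math.ceil(w / 2 ** lvl)) for lvl in levels}

def total_anchors(image_hw, anchors_per_location=9):
    """Total number of anchors over all pyramid levels."""
    shapes = pyramid_shapes(image_hw)
    return sum(fh * fw * anchors_per_location for fh, fw in shapes.values())

shapes = pyramid_shapes((512, 512))
n = total_anchors((512, 512))
for lvl, (fh, fw) in shapes.items():
    print("P{}: {}x{} -> {} anchors".format(lvl, fh, fw, fh * fw * 9))
print("total anchors:", n)
```

Under these assumptions a 512x512 image yields 49104 anchors, so the concatenated regression output has 49104 rows of 4 values and the classification output has 49104 rows of num_classes scores.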

3. RetinaNet - Keras source code

3.1. backbone

The code supports resnet, mobilenet, vgg and densenet:

def backbone(backbone_name):
    """ 
    Returns a backbone object for the given backbone.
    """
    if 'resnet' in backbone_name:
        from .resnet import ResNetBackbone as b
    elif 'mobilenet' in backbone_name:
        from .mobilenet import MobileNetBackbone as b
    elif 'vgg' in backbone_name:
        from .vgg import VGGBackbone as b
    elif 'densenet' in backbone_name:
        from .densenet import DenseNetBackbone as b
    else:
        # report the requested name (the original formatted the function object itself)
        raise NotImplementedError('Backbone class for  \'{}\' not implemented.'.format(backbone_name))

    return b(backbone_name)

Taking the resnet backbone used in the paper as an example:

def retinanet(self, *args, **kwargs):
    """ 
    Returns a retinanet model using the correct backbone.
    """
    return resnet_retinanet(*args, backbone=self.backbone, **kwargs)

def resnet_retinanet(num_classes, 
                     backbone='resnet50', 
                     inputs=None, 
                     modifier=None, **kwargs):
    """ 
    Constructs a retinanet model using a resnet backbone.

    Args
        num_classes: Number of classes to predict.
        backbone: Which backbone to use (one of ('resnet50', 'resnet101', 'resnet152')).
        inputs: The inputs to the network (defaults to a Tensor of shape (None, None, 3)).
        modifier: A function handler which can modify the backbone before using it in retinanet (this can be used to freeze backbone layers for example).

    Returns
        RetinaNet model with a ResNet backbone.
    """
    # choose default input
    if inputs is None:
        if keras.backend.image_data_format() == 'channels_first':
            inputs = keras.layers.Input(shape=(3, None, None))
        else:
            inputs = keras.layers.Input(shape=(None, None, 3))

    # create the resnet backbone
    if backbone == 'resnet50':
        resnet = keras_resnet.models.ResNet50(
            inputs, include_top=False, freeze_bn=True)
    elif backbone == 'resnet101':
        resnet = keras_resnet.models.ResNet101(
            inputs, include_top=False, freeze_bn=True)
    elif backbone == 'resnet152':
        resnet = keras_resnet.models.ResNet152(
            inputs, include_top=False, freeze_bn=True)
    else:
        raise ValueError('Backbone (\'{}\') is invalid.'.format(backbone))

    # invoke modifier if given
    if modifier:
        resnet = modifier(resnet)

    # create the full model
    return retinanet.retinanet(inputs=inputs, 
                               num_classes=num_classes, 
                               backbone_layers=resnet.outputs[1:], **kwargs)

Next, how the model of the whole network is constructed:

def retinanet(
    inputs,
    backbone_layers,
    num_classes,
    num_anchors             = None,
    create_pyramid_features = __create_pyramid_features,
    submodels               = None,
    name                    = 'retinanet'
):
    """ 
    Construct a RetinaNet model on top of a backbone.

    This model is the minimum model necessary for training (with the unfortunate exception of anchors as output).

    Args
        inputs: keras.layers.Input (or list of) for the input to the model.
        num_classes : Number of classes to classify.
        num_anchors: Number of base anchors.
        create_pyramid_features : Functor for creating pyramid features given the features C3, C4, C5 from the backbone.
        submodels: Submodels to run on each feature map (default is regression and classification submodels).
        name: Name of the model.

    Returns
        A keras.models.Model which takes an image as input and outputs generated anchors and the result from each submodel on every pyramid level.

        The order of the outputs is as defined in submodels:
        [
        regression, classification, other[0], other[1], ...
        ]
   """
    if num_anchors is None:
        num_anchors = AnchorParameters.default.num_anchors()

    if submodels is None:
        submodels = default_submodels(num_classes, num_anchors)

    C3, C4, C5 = backbone_layers

    # compute pyramid features as per 
    #     https://arxiv.org/abs/1708.02002
    features = create_pyramid_features(C3, C4, C5)

    # for all pyramid levels, run available submodels
    pyramids = __build_pyramid(submodels, features)

    return keras.models.Model(
        inputs=inputs, 
        outputs=pyramids, 
        name=name)

The submodels are the output heads attached to every pyramid level; their results are used for classification and object localization:

def default_submodels(num_classes, num_anchors):
    """ 
    Create a list of default submodels used for object detection.

    The default submodels contains a regression submodel and a classification submodel.

    Args
        num_classes : Number of classes to use.
        num_anchors : Number of base anchors.

    Returns
        A list of tuple, where the first element is the name of the submodel and the second element is the submodel itself.
    """
    return [
        ('regression', default_regression_model(4, num_anchors)),
        ('classification', default_classification_model(num_classes, num_anchors))
    ]

The regression model:

def default_regression_model(num_values, 
                             num_anchors, 
                             pyramid_feature_size=256, 
                             regression_feature_size=256, 
                             name='regression_submodel'):
    """ 
    Creates the default regression submodel.

    Args
        num_values: Number of values to regress.
        num_anchors: Number of anchors to regress for each feature level.
        pyramid_feature_size: The number of filters to expect from the feature pyramid levels.
        regression_feature_size: The number of filters to use in the layers in the regression submodel.
        name: The name of the submodel.

    Returns
        A keras.models.Model that predicts regression values for each anchor.
    """
    # All new conv layers except the final one in the
    # RetinaNet (classification) subnets are initialized
    # with bias b = 0 and a Gaussian weight fill with stddev = 0.01.
    options = {
        'kernel_size': 3,
        'strides': 1,
        'padding': 'same',
        'kernel_initializer': keras.initializers.normal(mean=0.0, stddev=0.01, seed=None),
        'bias_initializer': 'zeros'
    }

    if keras.backend.image_data_format() == 'channels_first':
        inputs  = keras.layers.Input(
            shape=(pyramid_feature_size, None, None))
    else:
        inputs  = keras.layers.Input(
            shape=(None, None, pyramid_feature_size))
    outputs = inputs
    for i in range(4):
        outputs = keras.layers.Conv2D(
            filters=regression_feature_size,
            activation='relu',
            name='pyramid_regression_{}'.format(i),
            **options
        )(outputs)

    outputs = keras.layers.Conv2D(
        num_anchors * num_values, 
        name='pyramid_regression', **options)(outputs)
    if keras.backend.image_data_format() == 'channels_first':
        outputs = keras.layers.Permute(
            (2, 3, 1), 
            name='pyramid_regression_permute')(outputs)
    outputs = keras.layers.Reshape(
        (-1, num_values), 
        name='pyramid_regression_reshape')(outputs)

    return keras.models.Model(inputs=inputs, outputs=outputs, name=name)

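The pyramid output enters the regression head of the submodel: it passes through four 3×3 convolution layers with 256 filters and stride 1, then through a final 3×3 convolution with num_anchors × 4 filters, and is finally reshaped, giving w × h × num_anchors rows of 4 values each.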
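
The final reshape can be checked with plain NumPy. This is a small sketch with made-up sizes (a 4x5 feature map, channels last, batch of 1), not code from the repository; it shows that every spatial location contributes num_anchors consecutive rows of 4 values.

```python
import numpy as np

h, w, num_anchors, num_values = 4, 5, 9, 4

# Stand-in for the output of the last conv layer on one pyramid level.
conv_out = np.arange(1 * h * w * num_anchors * num_values, dtype=float)
conv_out = conv_out.reshape(1, h, w, num_anchors * num_values)

# Same effect as keras.layers.Reshape((-1, num_values)): the batch axis is kept.
boxes = conv_out.reshape(conv_out.shape[0], -1, num_values)

print(boxes.shape)  # one row of 4 regression values per anchor
```

Row 0 of the result holds the first anchor at location (0, 0), and row 9 holds the first anchor at location (0, 1).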

Now the classification model:

def default_classification_model(
    num_classes,
    num_anchors,
    pyramid_feature_size=256,
    prior_probability=0.01,
    classification_feature_size=256,
    name='classification_submodel'
):
    """ 
    Creates the default classification submodel.

    Args
        num_classes: Number of classes to predict a score for at each feature level.
        num_anchors: Number of anchors to predict classification scores for at each feature level.
        pyramid_feature_size: The number of filters to expect from the feature pyramid levels.
        classification_feature_size: The number of filters to use in the layers in the classification submodel.
        name: The name of the submodel.

    Returns
        A keras.models.Model that predicts classes for each anchor.
    """
    options = {
        'kernel_size' : 3,
        'strides'     : 1,
        'padding'     : 'same',
    }

    if keras.backend.image_data_format() == 'channels_first':
        inputs  = keras.layers.Input(
            shape=(pyramid_feature_size, None, None))
    else:
        inputs  = keras.layers.Input(
            shape=(None, None, pyramid_feature_size))
    outputs = inputs
    for i in range(4):
        outputs = keras.layers.Conv2D(
            filters=classification_feature_size,
            activation='relu',
            name='pyramid_classification_{}'.format(i),
            kernel_initializer=keras.initializers.normal(mean=0.0, stddev=0.01, seed=None),
            bias_initializer='zeros',
            **options
        )(outputs)

    outputs = keras.layers.Conv2D(
        filters=num_classes * num_anchors,
        kernel_initializer=keras.initializers.normal(
            mean=0.0, stddev=0.01, seed=None),
        bias_initializer=initializers.PriorProbability(
            probability=prior_probability),
        name='pyramid_classification',
        **options
    )(outputs)

    # reshape output and apply sigmoid
    if keras.backend.image_data_format() == 'channels_first':
        outputs = keras.layers.Permute(
            (2, 3, 1), 
            name='pyramid_classification_permute')(outputs)
    outputs = keras.layers.Reshape(
        (-1, num_classes), 
        name='pyramid_classification_reshape')(outputs)
    outputs = keras.layers.Activation(
        'sigmoid', 
        name='pyramid_classification_sigmoid')(outputs)

    return keras.models.Model(inputs=inputs, outputs=outputs, name=name)

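The pyramid output enters the classification head of the submodel: it first passes through four 3×3 convolution layers with 256 filters and stride 1, then through a 3×3, stride-1 convolution with num_classes (the number of object classes) × 9 (the number of anchors generated per location) filters. The result is reshaped into w × h × 9 rows of num_classes values each and finally passes through a sigmoid activation, giving the class scores of every anchor.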
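
The prior_probability=0.01 argument deserves a note. The Focal Loss paper initializes the bias of the last classification convolution to b = -log((1 - π)/π), so that every anchor starts with a foreground score of about π. The sketch below checks this in plain Python; it assumes initializers.PriorProbability implements that same formula, and the helper names are made up for illustration.

```python
import math

def prior_probability_bias(pi=0.01):
    """Bias b such that sigmoid(b) == pi (formula from the Focal Loss paper)."""
    return -math.log((1.0 - pi) / pi)

def sigmoid(x):
    return 1.0 / (1.0 + math.exp(-x))

b = prior_probability_bias(0.01)
print("bias:", b)
print("initial foreground score:", sigmoid(b))
```

Starting from a low foreground score keeps the many background anchors from producing a large, destabilizing loss in the first training iterations.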

Now the construction of the pyramid:

def __create_pyramid_features(C3, C4, C5, feature_size=256):
    """ 
    Creates the FPN layers on top of the backbone features.

    Args
        C3: Feature stage C3 from the backbone.
        C4: Feature stage C4 from the backbone.
        C5: Feature stage C5 from the backbone.
        feature_size : The feature size to use for the resulting feature levels.

    Returns
        A list of feature levels [P3, P4, P5, P6, P7].
    """
    # upsample C5 to get P5 from the FPN paper
    P5= keras.layers.Conv2D(feature_size, 
                            kernel_size=1, 
                            strides=1, 
                            padding='same', 
                            name='C5_reduced')(C5)
    P5_upsampled = layers.UpsampleLike(name='P5_upsampled')([P5, C4])
    P5 = keras.layers.Conv2D(feature_size, 
                             kernel_size=3, 
                             strides=1, 
                             padding='same', 
                             name='P5')(P5)

    # add P5 elementwise to C4
    P4= keras.layers.Conv2D(feature_size, 
                            kernel_size=1,
                            strides=1, 
                            padding='same', 
                            name='C4_reduced')(C4)
    P4 = keras.layers.Add(name='P4_merged')([P5_upsampled, P4])
    P4_upsampled = layers.UpsampleLike(name='P4_upsampled')([P4, C3])
    P4 = keras.layers.Conv2D(feature_size, 
                             kernel_size=3, 
                             strides=1, 
                             padding='same', 
                             name='P4')(P4)

    # add P4 elementwise to C3
    P3 = keras.layers.Conv2D(feature_size, 
                             kernel_size=1, 
                             strides=1, 
                             padding='same', 
                             name='C3_reduced')(C3)
    P3 = keras.layers.Add(name='P3_merged')([P4_upsampled, P3])
    P3 = keras.layers.Conv2D(feature_size, 
                             kernel_size=3, 
                             strides=1, 
                             padding='same', 
                             name='P3')(P3)

    # "P6 is obtained via a 3x3 stride-2 conv on C5"
    P6 = keras.layers.Conv2D(feature_size, 
                             kernel_size=3, 
                             strides=2, 
                             padding='same', 
                             name='P6')(C5)

    # "P7 is computed by applying ReLU followed by a 3x3 stride-2 conv on P6"
    P7 = keras.layers.Activation('relu', name='C6_relu')(P6)
    P7 = keras.layers.Conv2D(feature_size, 
                             kernel_size=3, 
                             strides=2, 
                             padding='same', 
                             name='P7')(P7)

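    return [P3, P4, P5, P6, P7]

  • C3, C4 and C5 each first pass through a 1×1 convolution with 256 filters and stride 1
  • The reduced C5 passes through a 3×3 convolution with 256 filters and stride 1 to give P5
  • The reduced C4 is added to the upsampled C5 branch, then passes through a 3×3 convolution with 256 filters and stride 1 to give P4
  • The reduced C3 is added to the upsampled C4 branch, then passes through a 3×3 convolution with 256 filters and stride 1 to give P3
  • P6 is obtained from C5 by a 3×3 stride-2 convolution, and P7 from P6 by a ReLU followed by a 3×3 stride-2 convolution

At this point we have [P3, P4, P5, P6, P7].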
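
The top-down pathway can be sketched in NumPy to make the shapes explicit. This is a simplified illustration, not the repository's code: the 3×3 smoothing convolutions and P6/P7 are left out, the upsampling is assumed to be nearest-neighbour, and the feature map sizes and random weights are made up.

```python
import numpy as np

def conv1x1(x, weight):
    """1x1 convolution on a channels-last map: (H, W, Cin) @ (Cin, Cout)."""
    return x @ weight

def upsample_like(source, target):
    """Nearest-neighbour resize of `source` to the spatial size of `target`."""
    th, tw = target.shape[:2]
    sh, sw = source.shape[:2]
    rows = np.arange(th) * sh // th
    cols = np.arange(tw) * sw // tw
    return source[rows][:, cols]

rng = np.random.default_rng(0)
feature_size = 256

# Hypothetical backbone outputs for a 64x64 input (strides 8, 16, 32).
C3 = rng.normal(size=(8, 8, 512))
C4 = rng.normal(size=(4, 4, 1024))
C5 = rng.normal(size=(2, 2, 2048))
w3, w4, w5 = (rng.normal(scale=0.01, size=(c.shape[-1], feature_size))
              for c in (C3, C4, C5))

P5 = conv1x1(C5, w5)                          # reduce channels
P4 = conv1x1(C4, w4) + upsample_like(P5, C4)  # lateral + top-down
P3 = conv1x1(C3, w3) + upsample_like(P4, C3)

print(P3.shape, P4.shape, P5.shape)
```

Every level ends up with the same channel count (256), which is what lets the same regression and classification heads be applied to all levels.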

The last step is to connect every pyramid level to the submodels:

def __build_pyramid(models, features):
    """ 
    Applies all submodels to each FPN level.

    Args
        models: List of submodels to run on each pyramid level (by default only regression, classification).
        features : The FPN features.

    Returns
        A list of tensors, one for each submodel.
    """
    return [__build_model_pyramid(n, m, features) for n, m in models]


def __build_model_pyramid(name, model, features):
    """ 
    Applies a single submodel to each FPN level.

    Args
        name: Name of the submodel.
        model: The submodel to evaluate.
        features: The FPN features.

    Returns
        A tensor containing the response from the submodel on the FPN features.
    """
    return keras.layers.Concatenate(axis=1, name=name)([model(f) for f in features])
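
The effect of concatenating on axis 1 can be seen with dummy arrays. In this sketch the level sizes, batch size and class count are invented for illustration; only the concatenation axis mirrors the code above.

```python
import numpy as np

batch, num_classes, anchors_per_location = 2, 3, 9
level_sizes = [(8, 8), (4, 4), (2, 2), (1, 1), (1, 1)]  # hypothetical P3..P7 sizes

# One (batch, anchors_on_level, num_classes) tensor per pyramid level.
per_level = [np.zeros((batch, fh * fw * anchors_per_location, num_classes))
             for fh, fw in level_sizes]

# axis=1 is the anchor axis, so the levels are stacked one after another.
classification = np.concatenate(per_level, axis=1)
print(classification.shape)
```

Because the anchors are later concatenated over the levels in the same order, row i of the submodel output corresponds to anchor i.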

3.2. predict

Having looked at the model structure used for training, now the model used for prediction:

# make prediction model
prediction_model = retinanet_bbox(
    model=model, anchor_params=anchor_params)

def retinanet_bbox(
    model = None,
    nms = True,
    class_specific_filter = True,
    name = 'retinanet-bbox',
    anchor_params = None,
    **kwargs
):
    """ 
    Construct a RetinaNet model on top of a backbone and 
    adds convenience functions to output boxes directly.

    This model uses the minimum retinanet model and appends a few layers to compute boxes within the graph.
    These layers include applying the regression values to the anchors and performing NMS.

    Args
        model: RetinaNet model to append bbox layers to. If None, it will create a RetinaNet model using **kwargs.
        nms : Whether to use non-maximum suppression for the filtering step.
        class_specific_filter: Whether to use class specific filtering or filter for the best scoring class only.
        name: Name of the model.
        anchor_params: Struct containing anchor parameters. If None, default values are used.
        **kwargs : Additional kwargs to pass to the minimal retinanet model.

    Returns
        A keras.models.Model which takes an image as input and outputs the detections on the image.

        The order is defined as follows:
        [
            boxes, scores, labels, other[0], other[1], ..
        ]
    """
    
    # if no anchor parameters are passed, use default values
    if anchor_params is None:
        anchor_params = AnchorParameters.default
    
    # create RetinaNet model
    if model is None:
        model = retinanet(num_anchors=anchor_params.num_anchors(), **kwargs)
    else:
        assert_training_model(model)
    
    # compute the anchors
    features = [model.get_layer(p_name).output for p_name in ['P3', 'P4', 'P5', 'P6', 'P7']]
    anchors  = __build_anchors(anchor_params, features)
    
    # we expect the anchors, regression and classification values as first output
    regression     = model.outputs[0]
    classification = model.outputs[1]
    
    # "other" can be any additional output from custom submodels, by default this will be []
    other = model.outputs[2:]
    
    # apply predicted regression to anchors
    boxes = layers.RegressBoxes(name='boxes')([anchors, regression])
    boxes = layers.ClipBoxes(name='clipped_boxes')([model.inputs[0], boxes])
    
    # filter detections (apply NMS / score threshold / select top-k)
    detections = layers.FilterDetections(
        nms                   = nms,
        class_specific_filter = class_specific_filter,
        name                  = 'filtered_detections'
    )([boxes, classification] + other)
    
    # construct the model
    return keras.models.Model(inputs=model.inputs, outputs=detections, name=name)

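The detailed steps are as follows:

[1] - Get the features, i.e. the outputs of [P3, P4, P5, P6, P7], and build anchors on each of these feature maps. The code that builds the anchors: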

def __build_anchors(anchor_parameters, features):
    """ 
    Builds anchors for the shape of the features from FPN.

    Args
        anchor_parameters : Parameters that determine how anchors are generated.
        features          : The FPN features.
    
    Returns
        A tensor containing the anchors for the FPN features.
    
        The shape is:
        (batch_size, num_anchors, 4)
    """
    anchors = [
        layers.Anchors(
            size=anchor_parameters.sizes[i],
            stride=anchor_parameters.strides[i],
            ratios=anchor_parameters.ratios,
            scales=anchor_parameters.scales,
            name='anchors_{}'.format(i)
        )(f) for i, f in enumerate(features)
    ]
    
    return keras.layers.Concatenate(axis=1, name='anchors')(anchors)

A look at the layers.Anchors class:

class Anchors(keras.layers.Layer):
    """ 
    Keras layer for generating anchors for a given shape.
    """

    def __init__(self, size, stride, ratios=None, scales=None, *args, **kwargs):
        """ Initializer for an Anchors layer.

        Args
            size: The base size of the anchors to generate.
            stride: The stride of the anchors to generate.
            ratios: The ratios of the anchors to generate (defaults to AnchorParameters.default.ratios).
            scales: The scales of the anchors to generate (defaults to AnchorParameters.default.scales).
        """
        self.size   = size
        self.stride = stride
        self.ratios = ratios
        self.scales = scales

        if ratios is None:
            self.ratios  = utils_anchors.AnchorParameters.default.ratios
        elif isinstance(ratios, list):
            self.ratios  = np.array(ratios)
        if scales is None:
            self.scales  = utils_anchors.AnchorParameters.default.scales
        elif isinstance(scales, list):
            self.scales  = np.array(scales)

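        # use the resolved attributes so that the ratios=None / scales=None defaults work
        self.num_anchors = len(self.ratios) * len(self.scales)
        self.anchors     = keras.backend.variable(utils_anchors.generate_anchors(
            base_size=size,
            ratios=self.ratios,
            scales=self.scales,
        ))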

        super(Anchors, self).__init__(*args, **kwargs)

    def call(self, inputs, **kwargs):
        features = inputs
        features_shape = keras.backend.shape(features)

        # generate proposals from bbox deltas and shifted anchors
        if keras.backend.image_data_format() == 'channels_first':
            anchors = backend.shift(features_shape[2:4], self.stride, self.anchors)
        else:
            anchors = backend.shift(features_shape[1:3], self.stride, self.anchors)
        anchors = keras.backend.tile(
            keras.backend.expand_dims(anchors, axis=0), (features_shape[0], 1, 1))

        return anchors

    def compute_output_shape(self, input_shape):
        if None not in input_shape[1:]:
            if keras.backend.image_data_format() == 'channels_first':
                total = np.prod(input_shape[2:4]) * self.num_anchors
            else:
                total = np.prod(input_shape[1:3]) * self.num_anchors

            return (input_shape[0], total, 4)
        else:
            return (input_shape[0], None, 4)

    def get_config(self):
        config = super(Anchors, self).get_config()
        config.update({
            'size'   : self.size,
            'stride' : self.stride,
            'ratios' : self.ratios.tolist(),
            'scales' : self.scales.tolist(),
        })

        return config
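
The Anchors layer relies on two helpers that are not shown here, generate_anchors and backend.shift. The NumPy sketch below illustrates the idea behind them under stated assumptions: it is not the library's implementation, the function names are made up, ratio is taken as height/width, and anchor centres are assumed to sit at (index + 0.5) × stride.

```python
import numpy as np

def base_anchors(size, ratios, scales):
    """Boxes (x1, y1, x2, y2) centred on the origin, one per (ratio, scale) pair."""
    boxes = []
    for ratio in ratios:            # ratio = height / width (assumption)
        for scale in scales:
            area = (size * scale) ** 2
            w = np.sqrt(area / ratio)
            h = w * ratio
            boxes.append([-w / 2, -h / 2, w / 2, h / 2])
    return np.array(boxes)

def shift_anchors(feature_hw, stride, anchors):
    """Copy the base anchors to every location of a feature map."""
    fh, fw = feature_hw
    xs = (np.arange(fw) + 0.5) * stride
    ys = (np.arange(fh) + 0.5) * stride
    cx, cy = np.meshgrid(xs, ys)    # x varies fastest after ravel()
    shifts = np.stack([cx.ravel(), cy.ravel(), cx.ravel(), cy.ravel()], axis=1)
    # (locations, 1, 4) + (1, num_anchors, 4) -> (locations * num_anchors, 4)
    return (shifts[:, None, :] + anchors[None, :, :]).reshape(-1, 4)

ratios = [0.5, 1.0, 2.0]
scales = [2 ** 0, 2 ** (1.0 / 3.0), 2 ** (2.0 / 3.0)]
base = base_anchors(32, ratios, scales)
all_anchors = shift_anchors((2, 3), 8, base)
print(base.shape, all_anchors.shape)
```

With 3 ratios and 3 scales there are 9 base anchors, so a 2x3 feature map gives 54 anchors; in the layer above the result is additionally tiled over the batch dimension.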

[2] - Apply the regression output of the pyramid levels to the anchors to obtain the box coordinates:

# apply predicted regression to anchors
boxes = layers.RegressBoxes(name='boxes')([anchors, regression])
boxes = layers.ClipBoxes(name='clipped_boxes')([model.inputs[0], boxes])

The repository authors wrote a class RegressBoxes, derived from keras.layers.Layer, to do this:

class RegressBoxes(keras.layers.Layer):
    """ 
    Keras layer for applying regression values to boxes.
    """

    def __init__(self, mean=None, std=None, *args, **kwargs):
        """ 
        Initializer for the RegressBoxes layer.

        Args
            mean: The mean value of the regression values which was used for normalization.
            std: The standard deviation of the regression values which was used for normalization.
        """
        if mean is None:
            mean = np.array([0, 0, 0, 0])
        if std is None:
            std = np.array([0.2, 0.2, 0.2, 0.2])

        if isinstance(mean, (list, tuple)):
            mean = np.array(mean)
        elif not isinstance(mean, np.ndarray):
            raise ValueError('Expected mean to be a np.ndarray, list or tuple. Received: {}'.format(type(mean)))

        if isinstance(std, (list, tuple)):
            std = np.array(std)
        elif not isinstance(std, np.ndarray):
            raise ValueError('Expected std to be a np.ndarray, list or tuple. Received: {}'.format(type(std)))

        self.mean = mean
        self.std  = std
        super(RegressBoxes, self).__init__(*args, **kwargs)

    def call(self, inputs, **kwargs):
        anchors, regression = inputs
        return backend.bbox_transform_inv(
            anchors, regression, mean=self.mean, std=self.std)

    def compute_output_shape(self, input_shape):
        return input_shape[0]

    def get_config(self):
        config = super(RegressBoxes, self).get_config()
        config.update({
            'mean': self.mean.tolist(),
            'std' : self.std.tolist(),
        })

        return config

The following code is the key to understanding how RetinaNet uses the regression output of the pyramid to produce the predicted boxes.

def bbox_transform_inv(boxes, deltas, mean=None, std=None):
    """ 
    Applies deltas (usually regression results) to boxes (usually anchors).

    Before applying the deltas to the boxes, 
      the normalization that was previously applied (in the generator) has to be removed.
    The mean and std are the mean and std as applied in the generator. 
    They are unnormalized in this function and then applied to the boxes.

    Args
        boxes : np.array of shape (B, N, 4), where B is the batch size, N the number of boxes and 4 values for (x1, y1, x2, y2).
        deltas: np.array of same shape as boxes. These deltas (d_x1, d_y1, d_x2, d_y2) are a factor of the width/height.
        mean: The mean value used when computing deltas (defaults to [0, 0, 0, 0]).
        std: The standard deviation used when computing deltas (defaults to [0.2, 0.2, 0.2, 0.2]).

    Returns
        A np.array of the same shape as boxes, but with deltas applied to each box.
        The mean and std are used during training to normalize the regression values (networks love normalization).
    """
    if mean is None:
        mean = [0, 0, 0, 0]
    if std is None:
        std = [0.2, 0.2, 0.2, 0.2]

    width  = boxes[:, :, 2] - boxes[:, :, 0]
    height = boxes[:, :, 3] - boxes[:, :, 1]

    x1 = boxes[:, :, 0] + (deltas[:, :, 0] * std[0] + mean[0]) * width
    y1 = boxes[:, :, 1] + (deltas[:, :, 1] * std[1] + mean[1]) * height
    x2 = boxes[:, :, 2] + (deltas[:, :, 2] * std[2] + mean[2]) * width
    y2 = boxes[:, :, 3] + (deltas[:, :, 3] * std[3] + mean[3]) * height

    pred_boxes = keras.backend.stack([x1, y1, x2, y2], axis=2)

    return pred_boxes

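Unlike YOLOv3 and SSD, this implementation regresses the two corners of the box: the network predicts offsets for the corner (x1, y1) (top-left in image coordinates) and the corner (x2, y2) (bottom-right), each scaled by the anchor's width or height, rather than a center offset plus a width and height.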
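
A worked example with one anchor makes the arithmetic concrete. This is a NumPy re-statement of the formulas in bbox_transform_inv, written for illustration; the function name apply_deltas and the sample anchor and delta values are made up.

```python
import numpy as np

def apply_deltas(boxes, deltas, mean=(0, 0, 0, 0), std=(0.2, 0.2, 0.2, 0.2)):
    """Undo the normalization of the deltas and shift each box corner."""
    boxes = np.asarray(boxes, dtype=float)
    deltas = np.asarray(deltas, dtype=float)
    width = boxes[..., 2] - boxes[..., 0]
    height = boxes[..., 3] - boxes[..., 1]
    # x offsets are scaled by the width, y offsets by the height
    sizes = np.stack([width, height, width, height], axis=-1)
    return boxes + (deltas * np.asarray(std) + np.asarray(mean)) * sizes

# One batch item with one 100x100 anchor and a made-up network output.
anchors = np.array([[[0.0, 0.0, 100.0, 100.0]]])
deltas = np.array([[[0.5, 0.5, -0.5, -0.5]]])
pred = apply_deltas(anchors, deltas)
print(pred)
```

A normalized delta of 0.5 becomes 0.5 × 0.2 = 0.1 after multiplying by std, i.e. a shift of 10% of the anchor's width or height, so the predicted box is (10, 10, 90, 90).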

[3] - The last step is to filter the detections:

# filter detections (apply NMS / score threshold / select top-k)
detections = layers.FilterDetections(
        nms = nms,
        class_specific_filter = class_specific_filter,
        name = 'filtered_detections'
    )([boxes, classification] + other)

class FilterDetections(keras.layers.Layer):
    """ 
    Keras layer for filtering detections using score threshold and NMS.
    """

    def __init__(
        self,
        nms                   = True,
        class_specific_filter = True,
        nms_threshold         = 0.5,
        score_threshold       = 0.05,
        max_detections        = 300,
        parallel_iterations   = 32,
        **kwargs
    ):
        """ 
        Filters detections using score threshold, NMS and selecting the top-k detections.

        Args
            nms: Flag to enable/disable NMS.
            class_specific_filter: Whether to perform filtering per class, or take the best scoring class and filter those.
            nms_threshold: Threshold for the IoU value to determine when a box should be suppressed.
            score_threshold: Threshold used to prefilter the boxes with.
            max_detections: Maximum number of detections to keep.
            parallel_iterations: Number of batch items to process in parallel.
        """
        self.nms                   = nms
        self.class_specific_filter = class_specific_filter
        self.nms_threshold         = nms_threshold
        self.score_threshold       = score_threshold
        self.max_detections        = max_detections
        self.parallel_iterations   = parallel_iterations
        super(FilterDetections, self).__init__(**kwargs)

    def call(self, inputs, **kwargs):
        """ 
        Constructs the NMS graph.

        Args
            inputs : List of [boxes, classification, other[0], other[1], ...] tensors.
        """
        boxes          = inputs[0]
        classification = inputs[1]
        other          = inputs[2:]

        # wrap nms with our parameters
        def _filter_detections(args):
            boxes          = args[0]
            classification = args[1]
            other          = args[2]

            return filter_detections(
                boxes,
                classification,
                other,
                nms                   = self.nms,
                class_specific_filter = self.class_specific_filter,
                score_threshold       = self.score_threshold,
                max_detections        = self.max_detections,
                nms_threshold         = self.nms_threshold,
            )

        # call filter_detections on each batch
        outputs = backend.map_fn(
            _filter_detections,
            elems=[boxes, classification, other],
            dtype=[keras.backend.floatx(), keras.backend.floatx(), 'int32'] + [o.dtype for o in other],
            parallel_iterations=self.parallel_iterations
        )

        return outputs

    def compute_output_shape(self, input_shape):
        """ 
        Computes the output shapes given the input shapes.

        Args
            input_shape : List of input shapes [boxes, classification, other[0], other[1], ...].

        Returns
            List of tuples representing the output shapes:
            [filtered_boxes.shape, filtered_scores.shape, filtered_labels.shape, filtered_other[0].shape, filtered_other[1].shape, ...]
        """
        return [
            (input_shape[0][0], self.max_detections, 4),
            (input_shape[1][0], self.max_detections),
            (input_shape[1][0], self.max_detections),
        ] + [
            tuple([input_shape[i][0], self.max_detections] + list(input_shape[i][2:])) for i in range(2, len(input_shape))
        ]

    def compute_mask(self, inputs, mask=None):
        """ 
        This is required in Keras when there is more than 1 output.
        """
        return (len(inputs) + 1) * [None]

    def get_config(self):
        """ 
        Gets the configuration of this layer.

        Returns
            Dictionary containing the parameters of this layer.
        """
        config = super(FilterDetections, self).get_config()
        config.update({
            'nms'                   : self.nms,
            'class_specific_filter' : self.class_specific_filter,
            'nms_threshold'         : self.nms_threshold,
            'score_threshold'       : self.score_threshold,
            'max_detections'        : self.max_detections,
            'parallel_iterations'   : self.parallel_iterations,
        })

        return config
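
FilterDetections combines three parameters: a score threshold, an IoU threshold for NMS, and a cap on the number of detections. The sketch below is a plain greedy NMS in NumPy showing how the three interact for a single class; it is an illustration under those assumptions, not the backend's implementation, and the sample boxes are made up.

```python
import numpy as np

def iou_one_to_many(box, boxes):
    """IoU between one box and an array of boxes, all as (x1, y1, x2, y2)."""
    x1 = np.maximum(box[0], boxes[:, 0])
    y1 = np.maximum(box[1], boxes[:, 1])
    x2 = np.minimum(box[2], boxes[:, 2])
    y2 = np.minimum(box[3], boxes[:, 3])
    inter = np.clip(x2 - x1, 0, None) * np.clip(y2 - y1, 0, None)
    area = (box[2] - box[0]) * (box[3] - box[1])
    areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
    return inter / (area + areas - inter)

def greedy_nms(boxes, scores, score_threshold=0.05, iou_threshold=0.5,
               max_detections=300):
    """Return indices of the boxes kept, highest score first."""
    candidates = np.where(scores > score_threshold)[0]      # prefilter by score
    order = candidates[np.argsort(-scores[candidates])]     # best first
    keep = []
    while order.size > 0 and len(keep) < max_detections:
        best = order[0]
        keep.append(int(best))
        rest = order[1:]
        ious = iou_one_to_many(boxes[best], boxes[rest])
        order = rest[ious <= iou_threshold]                  # drop heavy overlaps
    return keep

boxes = np.array([[0, 0, 10, 10], [1, 1, 11, 11], [20, 20, 30, 30], [0, 0, 5, 5]],
                 dtype=float)
scores = np.array([0.9, 0.8, 0.7, 0.01])
keep = greedy_nms(boxes, scores)
print(keep)
```

Here the second box overlaps the first with IoU ≈ 0.68 and is suppressed, the fourth falls below the score threshold, and the third does not overlap anything, so indices 0 and 2 are kept.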

The main method to look at is filter_detections:

def filter_detections(
    boxes,
    classification,
    other                 = [],
    class_specific_filter = True,
    nms                   = True,
    score_threshold       = 0.05,
    max_detections        = 300,
    nms_threshold         = 0.5
):
    """ 
    Filter detections using the boxes and classification values.

    Args
        boxes: Tensor of shape (num_boxes, 4) containing the boxes in (x1, y1, x2, y2) format.
        classification: Tensor of shape (num_boxes, num_classes) containing the classification scores.
        other: List of tensors of shape (num_boxes, ...) to filter along with the boxes and classification scores.
        class_specific_filter : Whether to perform filtering per class, or take the best scoring class and filter those.
        nms: Flag to enable/disable non maximum suppression.
        score_threshold: Threshold used to prefilter the boxes with.
        max_detections: Maximum number of detections to keep.
        nms_threshold: Threshold for the IoU value to determine when a box should be suppressed.

    Returns
        A list of [boxes, scores, labels, other[0], other[1], ...].
        boxes is shaped (max_detections, 4) and contains the (x1, y1, x2, y2) of the non-suppressed boxes.
        scores is shaped (max_detections,) and contains the scores of the predicted class.
        labels is shaped (max_detections,) and contains the predicted label.
        other[i] is shaped (max_detections, ...) and contains the filtered other[i] data.
        In case there are less than max_detections detections, the tensors are padded with -1's.
    """
    def _filter_detections(scores, labels):
        # threshold based on score
        indices = backend.where(keras.backend.greater(scores, score_threshold))

        if nms:
            filtered_boxes = backend.gather_nd(boxes, indices)
            filtered_scores = keras.backend.gather(scores, indices)[:, 0]

            # perform NMS
            nms_indices = backend.non_max_suppression(
                filtered_boxes, 
                filtered_scores, 
                max_output_size=max_detections, 
                iou_threshold=nms_threshold)

            # filter indices based on NMS
            indices = keras.backend.gather(indices, nms_indices)

        # add indices to list of all indices
        labels = backend.gather_nd(labels, indices)
        indices = keras.backend.stack([indices[:, 0], labels], axis=1)

        return indices

    if class_specific_filter:
        all_indices = []
        # perform per class filtering
        for c in range(int(classification.shape[1])):
            scores = classification[:, c]
            labels = c * backend.ones((keras.backend.shape(scores)[0],), dtype='int64')
            all_indices.append(_filter_detections(scores, labels))

        # concatenate indices to single tensor
        indices = keras.backend.concatenate(all_indices, axis=0)
    else:
        scores  = keras.backend.max(classification, axis    = 1)
        labels  = keras.backend.argmax(classification, axis = 1)
        indices = _filter_detections(scores, labels)

    # select top k
    scores = backend.gather_nd(classification, indices)
    labels = indices[:, 1]
    scores, top_indices = backend.top_k(
        scores, 
        k=keras.backend.minimum(max_detections, keras.backend.shape(scores)[0]))

    # filter input using the final set of indices
    indices = keras.backend.gather(indices[:, 0], top_indices)
    boxes = keras.backend.gather(boxes, indices)
    labels = keras.backend.gather(labels, top_indices)
    other_ = [keras.backend.gather(o, indices) for o in other]

    # zero pad the outputs
    pad_size = keras.backend.maximum(0, max_detections - keras.backend.shape(scores)[0])
    boxes = backend.pad(boxes, [[0, pad_size], [0, 0]], constant_values=-1)
    scores = backend.pad(scores, [[0, pad_size]], constant_values=-1)
    labels = backend.pad(labels, [[0, pad_size]], constant_values=-1)
    labels = keras.backend.cast(labels, 'int32')
    other_ = [backend.pad(o, [[0, pad_size]] + [[0, 0] for _ in range(1, len(o.shape))], constant_values=-1) for o in other_]

    # set shapes, since we know what they are
    boxes.set_shape([max_detections, 4])
    scores.set_shape([max_detections])
    labels.set_shape([max_detections])
    for o, s in zip(other_, [list(keras.backend.int_shape(o)) for o in other]):
        o.set_shape([max_detections] + s[1:])

    return [boxes, scores, labels] + other_

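This function works in the following steps:

  • Decide whether to filter each class separately or to keep only the highest-scoring class for each box
  • Discard detections whose score does not exceed 0.05 (score_threshold)
  • Apply NMS as a second filter
  • Apply a third filter that keeps the top k (k = min(300, number of detections remaining))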
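
The steps above can be sketched for a single class in plain Python. This is only an illustrative sketch, not the library's TensorFlow implementation: the helper names iou and filter_single_class are hypothetical, and the greedy NMS loop is a textbook version that stands in for backend.non_max_suppression.

```python
def iou(a, b):
    """IoU of two boxes given as (x1, y1, x2, y2)."""
    ix1, iy1 = max(a[0], b[0]), max(a[1], b[1])
    ix2, iy2 = min(a[2], b[2]), min(a[3], b[3])
    inter = max(0.0, ix2 - ix1) * max(0.0, iy2 - iy1)
    area_a = (a[2] - a[0]) * (a[3] - a[1])
    area_b = (b[2] - b[0]) * (b[3] - b[1])
    union = area_a + area_b - inter
    return inter / union if union > 0 else 0.0


def filter_single_class(boxes, scores, score_threshold=0.05,
                        nms_threshold=0.5, max_detections=300):
    """Return the indices of the kept boxes, best score first (hypothetical helper)."""
    # Step 1: drop candidates whose score does not exceed the threshold.
    candidates = [i for i, s in enumerate(scores) if s > score_threshold]
    # Step 2: greedy NMS. Visit boxes by descending score and keep a box
    # only if it does not overlap an already kept box by more than nms_threshold.
    candidates.sort(key=lambda i: scores[i], reverse=True)
    keep = []
    for i in candidates:
        if all(iou(boxes[i], boxes[j]) <= nms_threshold for j in keep):
            keep.append(i)
    # Step 3: keep at most max_detections boxes (top k).
    return keep[:max_detections]


boxes = [(0, 0, 10, 10), (1, 1, 11, 11), (50, 50, 60, 60), (0, 0, 5, 5)]
scores = [0.9, 0.8, 0.7, 0.01]
kept = filter_single_class(boxes, scores)
print(kept)
```

In this toy input the second box overlaps the first with IoU 81/119 and is suppressed, while the last box is removed by the score threshold before NMS runs.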

3.3. loss

The loss functions and the optimizer (Adam) are defined in model.compile:

# compile model
training_model.compile(
    loss={
        'regression'    : losses.smooth_l1(),
        'classification': losses.focal()
    },
    optimizer=keras.optimizers.adam(lr=lr, clipnorm=0.001)
)

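For the box-coordinate loss, the now-popular smooth L1 is used; its main purpose is to guard against exploding gradients and to reduce the influence of extreme values. The code is as follows: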

def smooth_l1(sigma=3.0):
    """ 
    Create a smooth L1 loss functor.

    Args
        sigma: This argument defines the point where the loss changes from L2 to L1.

    Returns
        A functor for computing the smooth L1 loss given target data and predicted data.
    """
    sigma_squared = sigma ** 2

    def _smooth_l1(y_true, y_pred):
        """ 
        Compute the smooth L1 loss of y_pred w.r.t. y_true.

        Args
            y_true: Tensor from the generator of shape (B, N, 5). The last value for each box is the state of the anchor (ignore, negative, positive).
            y_pred: Tensor from the network of shape (B, N, 4).

        Returns
            The smooth L1 loss of y_pred w.r.t. y_true.
        """
        # separate target and state
        regression        = y_pred
        regression_target = y_true[:, :, :-1]
        anchor_state      = y_true[:, :, -1]

        # filter out "ignore" anchors
        indices = backend.where(keras.backend.equal(anchor_state, 1))
        regression = backend.gather_nd(regression, indices)
        regression_target = backend.gather_nd(regression_target, indices)

        # compute smooth L1 loss
        # f(x) = 0.5 * (sigma * x)^2        if |x| < 1 / sigma / sigma
        #        |x| - 0.5 / sigma / sigma  otherwise
        regression_diff = regression - regression_target
        regression_diff = keras.backend.abs(regression_diff)
        regression_loss = backend.where(
            keras.backend.less(regression_diff, 1.0 / sigma_squared),
            0.5 * sigma_squared * keras.backend.pow(regression_diff, 2),
            regression_diff - 0.5 / sigma_squared
        )

        # compute the normalizer: the number of positive anchors
        normalizer = keras.backend.maximum(1, keras.backend.shape(indices)[0])
        normalizer = keras.backend.cast(normalizer, dtype=keras.backend.floatx())
        return keras.backend.sum(regression_loss) / normalizer

    return _smooth_l1
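
To make the piecewise definition concrete, here is a minimal scalar sketch of the same formula in plain Python. The name smooth_l1_value is hypothetical; the code above operates on tensors and additionally normalizes by the number of positive anchors, which this sketch leaves out.

```python
def smooth_l1_value(x, sigma=3.0):
    """Smooth L1 of a single regression difference x (hypothetical helper)."""
    sigma_squared = sigma ** 2
    x = abs(x)
    if x < 1.0 / sigma_squared:
        # quadratic (L2-like) region near zero
        return 0.5 * sigma_squared * x ** 2
    # linear (L1-like) region for larger errors
    return x - 0.5 / sigma_squared


for diff in (0.0, 0.05, 1.0 / 9.0, 0.5, 2.0):
    print(diff, smooth_l1_value(diff))
```

With sigma = 3 the switch happens at |x| = 1/9, and both branches give the same value there, so the loss is continuous while large errors contribute only linearly.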

The classification loss uses Focal Loss, the paper's most notable contribution:

def focal(alpha=0.25, gamma=2.0):
    """ 
    Create a functor for computing the focal loss.

    Args
        alpha: Scale the focal weight with alpha.
        gamma: Take the power of the focal weight with gamma.

    Returns
        A functor that computes the focal loss using the alpha and gamma.
    """
    def _focal(y_true, y_pred):
        """ 
        Compute the focal loss given the target tensor and the predicted tensor.

        As defined in https://arxiv.org/abs/1708.02002

        Args
            y_true: Tensor of target data from the generator with shape (B, N, num_classes + 1); the last value for each anchor is its state.
            y_pred: Tensor of predicted data from the network with shape (B, N, num_classes).

        Returns
            The focal loss of y_pred w.r.t. y_true.
        """
        labels = y_true[:, :, :-1]
        anchor_state = y_true[:, :, -1]  # -1 for ignore, 0 for background, 1 for object
        classification = y_pred

        # filter out "ignore" anchors
        indices = backend.where(keras.backend.not_equal(anchor_state, -1))
        labels = backend.gather_nd(labels, indices)
        classification = backend.gather_nd(classification, indices)

        # compute the focal loss
        alpha_factor = keras.backend.ones_like(labels) * alpha
        alpha_factor = backend.where(keras.backend.equal(labels, 1), 
                                     alpha_factor, 
                                     1 - alpha_factor)
        focal_weight = backend.where(keras.backend.equal(labels, 1), 
                                     1 - classification, 
                                     classification)
        focal_weight = alpha_factor * focal_weight ** gamma

        cls_loss = focal_weight * keras.backend.binary_crossentropy(labels, classification)

        # compute the normalizer: the number of positive anchors
        normalizer = backend.where(keras.backend.equal(anchor_state, 1))
        normalizer = keras.backend.cast(keras.backend.shape(normalizer)[0], keras.backend.floatx())
        normalizer = keras.backend.maximum(keras.backend.cast_to_floatx(1.0), normalizer)

        return keras.backend.sum(cls_loss) / normalizer

    return _focal
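
The per-element computation can be checked by hand with a scalar sketch of FL(p_t) = -alpha_t (1 - p_t)^gamma log(p_t). The helper names focal_loss_value and cross_entropy_value are hypothetical, and the sketch omits the filtering of ignored anchors and the normalization by the number of positive anchors done in the code above.

```python
import math


def cross_entropy_value(p, y):
    """Binary cross entropy for one prediction p with target y in {0, 1}."""
    p_t = p if y == 1 else 1.0 - p
    return -math.log(p_t)


def focal_loss_value(p, y, alpha=0.25, gamma=2.0):
    """Focal loss for one prediction p with target y in {0, 1} (hypothetical helper)."""
    p_t = p if y == 1 else 1.0 - p
    # alpha weights positives, (1 - alpha) weights negatives, as in the code above
    alpha_t = alpha if y == 1 else 1.0 - alpha
    return -alpha_t * (1.0 - p_t) ** gamma * math.log(p_t)


for p in (0.9, 0.1):
    ce = cross_entropy_value(p, 1)
    fl = focal_loss_value(p, 1)
    print("p=%.1f  CE=%.5f  FL=%.5f  CE/FL=%.1f" % (p, ce, fl, ce / fl))
```

For a positive anchor, the ratio CE/FL is 1 / (alpha * (1 - p)^gamma), so a well-classified example (p = 0.9) is down-weighted by a factor of about 400, while a hard example (p = 0.1) is down-weighted by only about 5.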

One more detail is worth covering here: how y_true is generated:

[1] - Generate the anchors:

def anchors_for_shape(
    image_shape,
    pyramid_levels=None,
    anchor_params=None,
    shapes_callback=None,
):
    """ 
    Generate anchors for a given shape.

    Args
        image_shape: The shape of the image.
        pyramid_levels: List of ints representing which pyramids to use (defaults to [3, 4, 5, 6, 7]).
        anchor_params: Struct containing anchor parameters. If None, default values are used.
        shapes_callback: Function to call for getting the shape of the image at different pyramid levels.

    Returns
        np.array of shape (N, 4) containing the (x1, y1, x2, y2) coordinates for the anchors.
    """

    if pyramid_levels is None:
        pyramid_levels = [3, 4, 5, 6, 7]

    if anchor_params is None:
        anchor_params = AnchorParameters.default

    if shapes_callback is None:
        shapes_callback = guess_shapes
    image_shapes = shapes_callback(image_shape, pyramid_levels)

    # compute anchors over all pyramid levels
    all_anchors = np.zeros((0, 4))
    for idx, p in enumerate(pyramid_levels):
        anchors = generate_anchors(
            base_size=anchor_params.sizes[idx],
            ratios=anchor_params.ratios,
            scales=anchor_params.scales
        )
        shifted_anchors = shift(image_shapes[idx], 
                                anchor_params.strides[idx], 
                                anchors)
        all_anchors = np.append(all_anchors, shifted_anchors, axis=0)

    return all_anchors

There are two key functions:

  • (1) - generate_anchors builds the default anchors
  • (2) - shift moves these default anchors across the image
# generate_anchors: build the default anchors
def generate_anchors(base_size=16, ratios=None, scales=None):
    """
    Generate anchor (reference) windows by enumerating aspect ratios X
    scales w.r.t. a reference window.
    """

    if ratios is None:
        ratios = AnchorParameters.default.ratios

    if scales is None:
        scales = AnchorParameters.default.scales

    num_anchors = len(ratios) * len(scales)

    # initialize output anchors
    anchors = np.zeros((num_anchors, 4))

    # scale base_size
    anchors[:, 2:] = base_size * np.tile(scales, (2, len(ratios))).T

    # compute areas of anchors
    areas = anchors[:, 2] * anchors[:, 3]

    # correct for ratios
    anchors[:, 2] = np.sqrt(areas / np.repeat(ratios, len(scales)))
    anchors[:, 3] = anchors[:, 2] * np.repeat(ratios, len(scales))

    # transform from (x_ctr, y_ctr, w, h) -> (x1, y1, x2, y2)
    anchors[:, 0::2] -= np.tile(anchors[:, 2] * 0.5, (2, 1)).T
    anchors[:, 1::2] -= np.tile(anchors[:, 3] * 0.5, (2, 1)).T

    return anchors
# shift: move the default anchors across the image
def shift(shape, stride, anchors):
    """ 
    Produce shifted anchors based on shape of the map and stride size.

    Args
        shape : Shape to shift the anchors over.
        stride : Stride to shift the anchors with over the shape.
        anchors: The anchors to apply at each location.
    """

    # create a grid starting from half stride from the top left corner
    shift_x = (np.arange(0, shape[1]) + 0.5) * stride
    shift_y = (np.arange(0, shape[0]) + 0.5) * stride

    shift_x, shift_y = np.meshgrid(shift_x, shift_y)

    shifts = np.vstack((
        shift_x.ravel(), shift_y.ravel(),
        shift_x.ravel(), shift_y.ravel()
    )).transpose()

    # add A anchors (1, A, 4) to
    # cell K shifts (K, 1, 4) to get
    # shift anchors (K, A, 4)
    # reshape to (K*A, 4) shifted anchors
    A = anchors.shape[0]
    K = shifts.shape[0]
    all_anchors = (anchors.reshape((1, A, 4)) + shifts.reshape((1, K, 4)).transpose((1, 0, 2)))
    all_anchors = all_anchors.reshape((K * A, 4))

    return all_anchors
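
The two functions can be mirrored with explicit loops, which makes the ordering of the output easier to see. This is a sketch under stated assumptions: base_anchors and shift_anchors are hypothetical names, and the ratios, scales, base size, stride and feature-map shape below are illustrative values rather than values taken from the code above.

```python
import math


def base_anchors(base_size, ratios, scales):
    """Anchors centred on the origin as (x1, y1, x2, y2), one per (ratio, scale) pair."""
    anchors = []
    for ratio in ratios:            # ratio = height / width
        for scale in scales:
            area = (base_size * scale) ** 2
            w = math.sqrt(area / ratio)
            h = w * ratio
            anchors.append((-w / 2, -h / 2, w / 2, h / 2))
    return anchors


def shift_anchors(feature_shape, stride, anchors):
    """Place every base anchor at the centre of every feature-map cell."""
    rows, cols = feature_shape
    shifted = []
    for r in range(rows):
        for c in range(cols):
            # cell centre in image coordinates, half a stride from the cell corner
            cx, cy = (c + 0.5) * stride, (r + 0.5) * stride
            for x1, y1, x2, y2 in anchors:
                shifted.append((x1 + cx, y1 + cy, x2 + cx, y2 + cy))
    return shifted


# Illustrative values (assumptions): 3 ratios x 3 scales on a 2 x 3 feature map.
ratios = (0.5, 1.0, 2.0)
scales = (2 ** 0, 2 ** (1.0 / 3.0), 2 ** (2.0 / 3.0))
anchors = base_anchors(32, ratios, scales)
all_anchors = shift_anchors((2, 3), 8, anchors)
print(len(anchors), len(all_anchors))
```

Each cell receives len(ratios) * len(scales) anchors, so the total is K * A, matching the (K * A, 4) reshape in shift.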

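[2] - Label the anchors generated above by assigning each one a state: positives (foreground, IoU above 0.5) get 1, negatives (background, IoU below 0.4) get 0, and anchors whose IoU falls between 0.4 and 0.5 are marked to be ignored (-1). The resulting regression_batch and labels_batch are the y_true passed to the loss functions; their shapes match the model's predictions except for one extra trailing column that holds the anchor state.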

def anchor_targets_bbox(
    anchors,
    image_group,
    annotations_group,
    num_classes,
    negative_overlap=0.4,
    positive_overlap=0.5
):
    """ 
    Generate anchor targets for bbox detection.

    Args
        anchors: np.array of annotations of shape (N, 4) for (x1, y1, x2, y2).
        image_group: List of BGR images.
        annotations_group: List of annotations (np.array of shape (N, 5) for (x1, y1, x2, y2, label)).
        num_classes: Number of classes to predict.
        mask_shape: If the image is padded with zeros, mask_shape can be used to mark the relevant part of the image.
        negative_overlap: IoU overlap for negative anchors (all anchors with overlap < negative_overlap are negative).
        positive_overlap: IoU overlap for positive anchors (all anchors with overlap > positive_overlap are positive).

    Returns
        labels_batch: batch that contains labels & anchor states (np.array of shape (batch_size, N, num_classes + 1),
                      where N is the number of anchors for an image and the last column defines the anchor state (-1 for ignore, 0 for bg, 1 for fg).
        regression_batch: batch that contains bounding-box regression targets for an image & anchor states (np.array of shape (batch_size, N, 4 + 1),
                      where N is the number of anchors for an image, the first 4 columns define regression targets for (x1, y1, x2, y2) and the
                      last column defines anchor states (-1 for ignore, 0 for bg, 1 for fg).
    """

    assert(len(image_group) == len(annotations_group)), "The length of the images and annotations need to be equal."
    assert(len(annotations_group) > 0), "No data received to compute anchor targets for."
    for annotations in annotations_group:
        assert('bboxes' in annotations), "Annotations should contain bboxes."
        assert('labels' in annotations), "Annotations should contain labels."

    batch_size = len(image_group)

    regression_batch  = np.zeros((batch_size, anchors.shape[0], 4 + 1), dtype=keras.backend.floatx())
    labels_batch      = np.zeros((batch_size, anchors.shape[0], num_classes + 1), dtype=keras.backend.floatx())

    # compute labels and regression targets
    for index, (image, annotations) in enumerate(zip(image_group, annotations_group)):
        if annotations['bboxes'].shape[0]:
            # obtain indices of gt annotations with the greatest overlap
            positive_indices, ignore_indices, argmax_overlaps_inds = compute_gt_annotations(
                anchors,
                annotations['bboxes'],
                negative_overlap,
                positive_overlap)

            labels_batch[index, ignore_indices, -1] = -1
            labels_batch[index, positive_indices, -1] = 1

            regression_batch[index, ignore_indices, -1] = -1
            regression_batch[index, positive_indices, -1] = 1

            # compute target class labels
            labels_batch[index, positive_indices, annotations['labels'][argmax_overlaps_inds[positive_indices]].astype(int)] = 1

            regression_batch[index, :, :-1] = bbox_transform(
                anchors, 
                annotations['bboxes'][argmax_overlaps_inds, :])

        # ignore annotations outside of image
        if image.shape:
            anchors_centers = np.vstack([
                (anchors[:, 0] + anchors[:, 2]) / 2, 
                (anchors[:, 1] + anchors[:, 3]) / 2]).T
            indices = np.logical_or(
                anchors_centers[:, 0] >= image.shape[1], 
                anchors_centers[:, 1] >= image.shape[0])

            labels_batch[index, indices, -1]     = -1
            regression_batch[index, indices, -1] = -1

    return regression_batch, labels_batch
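
The state assignment can be illustrated with a small pure-Python sketch. The names iou and anchor_states are hypothetical, and compute_gt_annotations is not shown above, so how overlaps of exactly 0.4 or 0.5 are treated here is an assumption.

```python
def iou(a, b):
    """IoU of two boxes given as (x1, y1, x2, y2)."""
    ix1, iy1 = max(a[0], b[0]), max(a[1], b[1])
    ix2, iy2 = min(a[2], b[2]), min(a[3], b[3])
    inter = max(0.0, ix2 - ix1) * max(0.0, iy2 - iy1)
    area_a = (a[2] - a[0]) * (a[3] - a[1])
    area_b = (b[2] - b[0]) * (b[3] - b[1])
    union = area_a + area_b - inter
    return inter / union if union > 0 else 0.0


def anchor_states(anchors, gt_boxes, negative_overlap=0.4, positive_overlap=0.5):
    """Return one state per anchor: 1 = positive, 0 = negative, -1 = ignore."""
    states = []
    for anchor in anchors:
        # best overlap of this anchor with any ground-truth box
        best = max((iou(anchor, gt) for gt in gt_boxes), default=0.0)
        if best >= positive_overlap:      # boundary handling is an assumption
            states.append(1)
        elif best > negative_overlap:
            states.append(-1)
        else:
            states.append(0)
    return states


gt_boxes = [(0, 0, 10, 10)]
anchors = [(0, 0, 10, 10), (0, 0, 10, 4.5), (20, 20, 30, 30)]
states = anchor_states(anchors, gt_boxes)
print(states)
```

The three toy anchors have overlaps of 1.0, 0.45 and 0.0 with the ground-truth box, so they land in the positive, ignore and negative groups respectively.
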
Last modification:June 20th, 2019 at 01:23 pm