编程语言
首页 > 编程语言> > Scanpy源码浅析之pp.normalize_total

Scanpy源码浅析之pp.normalize_total

作者:互联网

版本

导入Scanpy, 其版本为'1.9.1',如果你看到的源码和下文有差异,其可能是由于版本差异。

import scanpy as sc

sc.__version__
#'1.9.1'

例子

函数pp.normalize_total用于Normalize counts per cell, 其源代码在scanpy/preprocessing/_normalization.py
我们通过一个简单例子来了解该函数主要功能:
将一个简单的矩阵数据转为AnnData对象

from anndata import AnnData
import scanpy as sc

adata = AnnData(np.array([
       [3, 3, 3, 6, 6],
        [1, 1, 1, 2, 2],
        [1, 22, 1, 2, 2],
     ]))

adata.X
# array([[ 3.,  3.,  3.,  6.,  6.],
#        [ 1.,  1.,  1.,  2.,  2.],
#        [ 1., 22.,  1.,  2.,  2.]], dtype=float32)

运行后pp.normalize_total后, adata的数值发生变换

sc.pp.normalize_total(adata, target_sum=1)
adata.X

# array([[0.14, 0.14, 0.14, 0.29, 0.29],
#        [0.14, 0.14, 0.14, 0.29, 0.29],
#        [0.04, 0.79, 0.04, 0.07, 0.07]], dtype=float32)

数值变换满足一个规律,即每行的的总和加起来为一个确定数值,这里使用target_sum设为1.

for i in range(adata.shape[0]):
    print(sum(adata.X[i, ]))

# 1.0000000447034836
# 1.0000000447034836
# 0.9999999925494194

代码

全部代码

以下为normalize_total 主要逻辑代码,为方便理解主要逻辑,其中删除了一些即将废弃的,异常处理,日志打印等代码。
image.png

参数设置

代码前几行是函数的参数设置:

def normalize_total(
    adata: anndata._core.anndata.AnnData,
    target_sum: Optional[float] = None,
    exclude_highly_expressed: bool = False,
    max_fraction: float = 0.05,
    key_added: Optional[str] = None,
    layer: Optional[str] = None,
    inplace: bool = True,
    copy: bool = False,
) -> Optional[Dict[str, numpy.ndarray]]

adata, target_sum, ..., copy 是函数参数, 冒号后面跟的是参数类型注解,表面这个参数应该传递什么类型的值给函数。

是否copy数据

if copy:
    if not inplace:
        raise ValueError("`copy=True` cannot be used with `inplace=False`.")
    adata = adata.copy()

用到参数copy真值,来判断将现有adata 数据复制一份新数据的来进行后续操作

获取layer数据


X = _get_obs_rep(adata, layer=layer)

该行代码用到参数layer , 代码功能为获取adata 的layer数据,如果不另外设置,默认返回adata.X也是raw data, 进行normalize。

是否排除非常高表达的基因

if exclude_highly_expressed:
    counts_per_cell = X.sum(1)  # original counts per cell
    counts_per_cell = np.ravel(counts_per_cell)
    # at least one cell as more than max_fraction of counts per cell
    gene_subset = (X > counts_per_cell[:, None] * max_fraction).sum(0)
    gene_subset = np.ravel(gene_subset) == 0

    counts_per_cell = X[:, gene_subset].sum(1)
else:
    counts_per_cell = X.sum(1)
counts_per_cell = np.ravel(counts_per_cell)
 

这几行代码用到参数exclude_highly_expressedmax_fraction,这两是搭配使用的。

假设X

array([[ 3.,  3.,  3.,  6.,  6.],
       [ 1.,  1.,  1.,  2.,  2.],
       [ 1., 22.,  1.,  2.,  2.]], dtype=float32)

如果exclude_highly_expressed 为False, 则最后输出为每行的total count: counts_per_cell

counts_per_cell= X.sum(1)
counts_per_cell

# array([21.,  7., 28.], dtype=float32)

如果exclude_highly_expressed 为True, 则根据计算出每行的原始total count,再计算totalcount * max_fraction得到每行 允许的基因最大count 阈值,筛选出每行中小于等于这个阈值的基因count, 然后求和得到counts_per_cell。

exclude_highly_expressed = True
max_fraction = 0.2
if exclude_highly_expressed:
    counts_per_cell = X.sum(1)  # original counts per cell
    counts_per_cell = np.ravel(counts_per_cell)
    
    # at least one cell as more than max_fraction of counts per cell
    # 下面几行是关键,如需要更好的理解,你可能需要一步步的运算查看输出结果来帮助理解
    gene_subset = (X > counts_per_cell[:, None] * max_fraction).sum(0) 
    gene_subset = np.ravel(gene_subset) == 0
    counts_per_cell = X[:, gene_subset].sum(1)
    # X[:, gene_subset]
    # array([[3., 3.],
    #        [1., 1.],
    #        [1., 1.]], dtype=float32)   

    #counts_per_cell
    # array([6., 2., 2.], dtype=float32)

是否替换原数据

    if inplace:
        if key_added is not None:
            # 添加.obs 字段, 内容为每行的total count
            adata.obs[key_added] = counts_per_cell
        # _set_obs_rep函数作用为将normalize的数据替换adata原数据            
        _set_obs_rep(
            adata, _normalize_data(X, counts_per_cell, target_sum), layer=layer
        )
    else:
        # 如果inplace为False,即不原位替换,则返回一个字典
        dat = dict(
            X=_normalize_data(X, counts_per_cell, target_sum, copy=True),
            norm_factor=counts_per_cell,
        )

这几行代码用到的参数:

该部分代码逻辑,则根据inplace 真值,执行不同操作:

其中_normalize_data()函数为真正进行normalize的函数, 后文再进行说明。

判断返回结果

    
    if copy:
        return adata
    elif not inplace:
        return dat

_normalize_data

_normalize_data函数的代码如下:

def _normalize_data(X, counts, after=None, copy=False):
    # 是否copy
    X = X.copy() if copy else X
    # 类型转换
    if issubclass(X.dtype.type, (int, np.integer)):
        X = X.astype(np.float32)  # TODO: Check if float64 should be used
    # 支持DaskArray类型
    if isinstance(counts, DaskArray):
        counts_greater_than_zero = counts[counts > 0].compute_chunk_sizes()
    else:
        counts_greater_than_zero = counts[counts > 0]
    # after为传进来的target_sum
    # 如果为target_sum为None, target_sum则使用counts的中值
    after = np.median(counts_greater_than_zero, axis=0) if after is None else after
    # 将counts中得0 变成1
    counts += counts == 0
    # 使最后每行总和为target_sum
    counts = counts / after
    # 下面为支持不同类型,做了一些判断,但做的事都一样
    if issparse(X):
        sparsefuncs.inplace_row_scale(X, 1 / counts)
    elif isinstance(counts, np.ndarray):
        np.divide(X, counts[:, None], out=X)
    else:
        X = np.divide(X, counts[:, None])  # dask does not support kwarg "out"
        # X = X / counts
        # counts 
        # array([6., 2., 2.], dtype=float32)
        # X 
        # array([[ 3.,  3.,  3.,  6.,  6.],
        #        [ 1.,  1.,  1.,  2.,  2.],
        #        [ 1., 22.,  1.,  2.,  2.]], dtype=float32)   
        # np.divide(X, counts[:, None])
        # array([[ 0.5,  0.5,  0.5,  1. ,  1. ],
        #        [ 0.5,  0.5,  0.5,  1. ,  1. ],
        #        [ 0.5, 11. ,  0.5,  1. ,  1. ]], dtype=float32)        
        # 即每行得count除以每行的total count
    return X

上面代码中给了一些注释,辅助理解。

标签:normalize,sum,per,cell,源码,counts,adata,浅析
来源: https://www.cnblogs.com/huanping/p/16684866.html