编程语言
首页 > 编程语言> > DBSCAN算法

DBSCAN算法

作者:互联网

本文简单介绍DBSCAN算法的原理及实现。

DBSCAN算法原理

基本概念

DBSCAN(Density-Based Spatial Clustering of Applications with Noise)是一种基于密度的空间聚类算法。该算法将具有足够密度的区域划分为簇,并在具有噪声的空间数据库中发现任意形状的簇,它将簇定义为密度相连的点的最大集合。

假设样本集为 D = ( x 1 , x 2 , . . . , x m ) D=(x_1, x_2, ..., x_m) D=(x1​,x2​,...,xm​),DBSCAN算法有如下相关定义:

注意:

算法原理

总体思路

  1. 遍历每一个未处理的点
    1. 如果为核心点,找出所有从该点密度可达的点,形成一个簇
    2. 否则,标记后遍历下一个点

详细算法

算法1

输入:样本集 D = ( x 1 , x 2 , . . . , x m ) D=(x_1, x_2, ..., x_m) D=(x1​,x2​,...,xm​),参数 ( ϵ , M i n P t s ) (\epsilon, MinPts) (ϵ,MinPts)

输出:核心点集合 Ω \Omega Ω

  1. 初始化核心对象集合 Ω = ∅ \Omega = \emptyset Ω=∅
  2. 对于所有点 x j x_j xj​:
    1. 找到样本 x j x_j xj​的 ϵ \epsilon ϵ-邻域子样本集 N ϵ ( x j ) N_{\epsilon}(x_j) Nϵ​(xj​)
    2. 如果子样本集合个数满足 ∣ N ϵ ( x j ) ∣ ≥ M i n P t s |N_{\epsilon}(x_j)|≥MinPts ∣Nϵ​(xj​)∣≥MinPts,将样本 x j x_j xj​加入核心对象样本集合 Ω = Ω ∪ { x j } \Omega = \Omega \cup \{ x_j \} Ω=Ω∪{xj​}

算法2

输入:样本集 D = ( x 1 , x 2 , . . . , x m ) D=(x_1, x_2, ..., x_m) D=(x1​,x2​,...,xm​),参数 ( ϵ , M i n P t s ) (\epsilon, MinPts) (ϵ,MinPts)

输出:簇划分结果 C = { C 1 , C 2 , . . . , C k } C=\{ C_1, C_2, ..., C_k \} C={C1​,C2​,...,Ck​}

  1. 执行算法1,找到所有核心点 Ω \Omega Ω
  2. 初始化簇类别 k = 0 k=0 k=0, 未访问样本集合 Γ = D \Gamma = D Γ=D, 簇划分结果 C = ∅ C=\emptyset C=∅
  3. 如果核心点集合 Ω \Omega Ω不为空,则继续执行,否则算法结束
  4. 在核心对象集合 Ω \Omega Ω中,随机选择一个核心对象 o o o,初始化当前簇核心对象队列 Ω c u r = { o } Ω_{cur}=\{o\} Ωcur​={o}, 初始化类别序号 k = k + 1 k=k+1 k=k+1,初始化当前簇样本集合 C k = { o } C_k=\{o\} Ck​={o}, 更新未访问样本集合 Γ = Γ − o Γ=Γ−{o} Γ=Γ−o
  5. Repeat
  6. 如果当前簇核心对象集合 Ω c u r ≠ ∅ Ω_{cur}\ne∅ Ωcur​​=∅,在 Ω c u r Ω_{cur} Ωcur​中取出一个核心对象 o ′ o′ o′,通过邻域距离阈值 ϵ ϵ ϵ找出所有的 ϵ ϵ ϵ-邻域子样本集 N ϵ ( o ′ ) N_ϵ(o′) Nϵ​(o′),令 Δ = N ϵ ( o ′ ) ∩ Γ Δ=N_ϵ(o′)∩Γ Δ=Nϵ​(o′)∩Γ, 更新当前簇样本集合 C k = C k ∪ Δ C_k=C_k∪Δ Ck​=Ck​∪Δ, 更新未访问样本集合 Γ = Γ − Δ Γ=Γ−Δ Γ=Γ−Δ, 更新 Ω c u r = Ω c u r ∪ ( Δ ∩ Ω ) − o ′ Ω_{cur}=Ω_{cur}∪(Δ∩Ω)−o′ Ωcur​=Ωcur​∪(Δ∩Ω)−o′,更新核心对象集合 Ω = Ω − C k Ω=Ω−C_k Ω=Ω−Ck​
  7. 如果当前簇核心对象队列 Ω c u r = ∅ Ω_{cur}=∅ Ωcur​=∅,则当前聚类簇 C k C_k Ck​生成完毕, 更新簇划分 C = C ∪ C k C=C \cup C_k C=C∪Ck​ ,转入步骤3。

DBSCAN算法实现

Sklearn DBSCAN

尝试使用sklearn中的DBSCAN算法。

# all import
from sklearn import datasets
import matplotlib.pyplot as plt
from sklearn.cluster import DBSCAN
import numpy as np
import time
# generate data
n_samples = 1500
noisy_circles = datasets.make_circles(n_samples=n_samples, factor=.5,noise=.05)
X, y = noisy_circles
# plot
plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
# plt.figure(1)
plt.scatter(X[:, 0], X[:, 1])

plt.subplot(1, 2, 2)
# plt.figure(2)
plt.scatter(X[:, 0], X[:, 1], c=y)

请添加图片描述

上图是sklearn生成的数据,右图是根据label直接绘制的结果。

下面使用sklearn中的DBSCAN类对数据X进行聚类。

# run sklearn DBSCAN to predict
y_pred = DBSCAN(eps=0.1).fit_predict(X)

# plot sklearn DBSCAN result
plt.figure(figsize=(5, 5))
plt.scatter(X[:, 0], X[:, 1], c=y_pred)

在这里插入图片描述

My DBSCAN

此部分代码为笔者依据上述伪代码实现,未经过优化

# my DBSCAN algorithm
from sklearn.metrics.pairwise import pairwise_distances

class MyDBSCAN:
    def __init__(self, X, eps=0.5, minPts=5):
        """
        @param:
            X: input dataset, (n, m)
            eps: 
            minPts:
        """
        self.X = X
        self.n = len(X)
        self.eps = eps
        self.minPts = minPts

    def get_core_points(self):
        """
        @return:
            core_points: the index of core points
        """
        core_points = []
        for i in range(self.n):
            neighbors = self.get_neighbors(i)
            if len(neighbors) >= self.minPts:
                core_points.append(i)
        return core_points

    
    def get_neighbors(self, point_idx):
        """
        @param:
            point_idx: index of point
        @return:
            neighbors: neighbors of point
        """
        neighbors = []
        for i in range(self.n):
            if self.dis_mat[point_idx][i] < self.eps:
                neighbors.append(i)
        return neighbors
    
    def compute_dis_mat(self):
        """
        @param:
            X: input dataset, (n, m)
        @return:
            self.dis_mat: (n, n)
        """
        self.dis_mat = pairwise_distances(X)
        return
    
    def random_choose(self, points):
        """
        random choose a point
        """
        # return points[0]
        return points[np.random.randint(0, len(points))]

    def fit_predict(self):
        """
        @param:
            X: input dataset, (n, m)
        @return:
            y_pred: predicted result, (n,)
        """
        # computer distance matrix
        self.compute_dis_mat()

        core_points = set(self.get_core_points())
        waiting_points = set([i for i in range(len(self.X))])
        cluster = -1 * np.ones(self.n, dtype=np.int32)
        k = 0

        while len(core_points) > 0:
            o = self.random_choose(list(core_points))
            cur_core_points = set()
            cur_core_points.add(o)
            cur_cluster = set()
            cur_cluster.add(o)
            waiting_points.remove(o)
            while True:
                if len(cur_core_points) > 0:
                    point = list(cur_core_points)[0]
                    neighbors = set(self.get_neighbors(point))
                    delta = neighbors & waiting_points
                    cur_cluster = cur_cluster | delta
                    waiting_points = waiting_points - delta
                    cur_core_points = cur_core_points | (delta & core_points)
                    cur_core_points.remove(point)
                    core_points = core_points - cur_cluster
                else:
                    cluster[list(cur_cluster)] = k
                    k += 1
                    break
        return cluster

将其用到sklearn生成的数据集上测试效果:

n_samples = 500
noisy_circles = datasets.make_circles(n_samples=n_samples, factor=.5,
                                      noise=.05)
X, y = noisy_circles
start = time.time()
y_my_pred = MyDBSCAN(X=X, eps=0.15).fit_predict()
end1 = time.time()
y_pred = DBSCAN(eps=0.15).fit_predict(X)
end2 = time.time()

print('{:15} {:5} {:3}'.format('Algorithm', 'Time', 'NMI'))
print('{:15} {:5.2f} {:3}'.format('My DBSCAN ', end1 - start, metrics.normalized_mutual_info_score(y, y_my_pred)))
print('{:15} {:5.2f} {:3}'.format('Sklearn DBSCAN', end2 - end1, metrics.normalized_mutual_info_score(y, y_pred)))
Algorithm       Time  NMI
My DBSCAN       0.11  1.0
Sklearn DBSCAN  0.00  1.0
# plot sklearn DBSCAN result
plt.figure(figsize=(15, 5))
plt.subplot(1, 3, 1)
plt.scatter(X[:, 0], X[:, 1], c=y)
plt.title('origin data')

plt.subplot(1, 3, 2)
plt.scatter(X[:, 0], X[:, 1], c=y_pred)
plt.title('sklearn DBSCAN')

plt.subplot(1, 3, 3)
plt.scatter(X[:, 0], X[:, 1], c=y_my_pred)
plt.title('My DBSCAN')

在这里插入图片描述

修改eps后的测试结果(minPts=5):

epsresult
0.1在这里插入图片描述
0.12在这里插入图片描述
0.15在这里插入图片描述

可以观察到在sklearn生成的该类数据集上My DBSCAN与sklearn DBSCAN的聚类结果非常接近,但My DBSCAN的运行时间较长。

Reference

A Density-Based Algorithm for Discovering Clusters in Large Spatial Databases with Noise (aaai.org)

DBSCAN密度聚类算法 - 刘建平Pinard - 博客园

用scikit-learn学习DBSCAN聚类 - 刘建平Pinard - 博客园

DBSCAN - 维基百科

详解DBSCAN聚类 - 知乎

Visualizing DBSCAN Clustering

标签:xj,DBSCAN,cur,self,算法,points,plt
来源: https://blog.csdn.net/weixin_44241550/article/details/120371224