其他分享
首页 > 其他分享> > Databricks文档04----使用 Azure Databricks 提取、转换和加载数据

Databricks文档04----使用 Azure Databricks 提取、转换和加载数据

作者:互联网

使用 Azure Databricks 执行 ETL(提取、转换和加载数据)操作。 将数据从 Azure Data Lake Storage Gen2 提取到 Azure Databricks 中,在 Azure Databricks 中对数据运行转换操作,然后将转换的数据加载到 Azure Synapse Analytics 中。

本教程中的步骤使用 Azure Databricks 的 Azure Synapse 连接器将数据传输到 Azure Databricks。 而此连接器又使用 Azure Blob 存储来临时存储在 Azure Databricks 群集和 Azure Synapse 之间传输的数据。

下图演示了应用程序流:

本教程涵盖以下任务:

创建DataLake测试账户

Azure Data Lake 存储 Gen2](Azure Data Lake Storage Gen2 Introduction | Microsoft Docs) (也称为 ADLS Gen2) 是一种用于大数据分析的下一代 Data Lake解决方案。 Azure Data Lake 存储 Gen2 使用低成本分层存储、高可用性和灾难恢复功能,生成 Azure Data Lake 存储 Gen1 功能文件系统语义、文件级安全性和缩放到 — Azure Blob 存储。 — Azure Blob File System (ABFS) 驱动程序提供 ADLS Gen2 存储的接口。 Databricks Runtime 中包含的 ABFS 驱动程序支持 Azure Blob 存储上的标准文件系统语义。

portal上选择存储账户,创建过程如下

 

 

 

 

 

ADLS Gen2 访问

从 Azure 存储帐户获取访问密钥。

创建 Azure Key Vault

可以使用Key Vault保存密钥

将 Azure 访问密钥添加到 Azure 密钥保管库

使用 Azure 服务主体运行作业

作业提供了一种非交互式的方式来运行 Azure Databricks 群集中的应用程序,例如,ETL 作业或应按计划运行的数据分析任务。 通常,这些作业以创建它们的用户身份运行,但这可能有一些限制:

使用服务帐户(与应用程序而不是特定用户关联的帐户)是解决这些限制的常用方法。 在 Azure 中,可使用 Azure Active Directory (Azure AD) 应用程序和服务主体来创建服务帐户。

这一点非常重要,尤其是服务主体控制对存储在 Azure Data Lake Storage Gen2 帐户中的数据的访问时。 使用这些服务主体运行作业允许作业访问存储帐户中的数据,并提供对数据访问范围的控制。

下面进行创建 Azure AD 应用程序和服务主体,并使该服务主体成为作业的所有者。向不拥有作业的其他组授予作业运行权限:

在 Azure Active Directory 中创建服务主体

在门户中搜索应用注册

完成

创建客户端密码

选择证书和密码-新建客户端密码

此密钥之后不能查看,一定要保存好,作为之后使用,

Azure Databricks 中创建个人访问令牌 (PAT)

  1. 在 Azure Databricks 中创建个人访问令牌 (PAT) ,可使用 PAT 对 Databricks REST API 进行身份验证。

    你将使用 Azure Databricks 个人访问令牌 (PAT) 对 Databricks REST API 进行身份验证。 创建可用于发出 API 请求的 PAT:

    1. 转到 Azure Databricks 工作区。

    2. 单击屏幕右上角的用户图标,然后单击“用户设置”。

    3. 单击 "访问令牌" "生成新令牌"

    4. 复制并保存令牌值。

  2. 使用 Databricks SCIM API 将服务主体作为非管理用户添加到 Azure Databricks。

    curl -X POST "https://adb-7535626322818590.10.azuredatabricks.net/api/2.0/preview/scim/v2/ServicePrincipals" \
      --header "Content-Type: application/scim+json" \
      --header "Authorization: Bearer dapi2c58adc761db573f34b6f356761a933c" \
      --data-raw "{
        "schemas":[
          "urn:ietf:params:scim:schemas:core:2.0:ServicePrincipal"
        ],
        "applicationId":"eb603b71-6def-44bb-b660-d4ea67fe33c4",
        "displayName": "test-sp",
        "entitlements":[
          {
            "value":"allow-cluster-create"
          }
        ]
      }"

    以上代码在windows 下执行会报错,需要把单引号改为双引号,把"" 改为 “^”,data-raw 参属下也不行,可以使用@data 把参数写成json读取。如下图

  3. 在 Azure Databricks 中创建 Azure Key Vault 支持的机密范围。

    访问:类似这样地址::https://<databrick url>/?o=7535626322818590#secrets/createScope

  4. 授予服务主体对机密范围的读取权限。

    curl -X POST 'https://<per-workspace-url/api/2.0/secrets/acls/put' \
      --header 'Authorization: Bearer <personal-access-token>' \
      --header 'Content-Type: application/json' \
      --data-raw '{
        "scope": "<scope-name>",
        "principal": "<application-id>",
        "permission": "READ"
      }'
      
      curl -X POST "https://adb-7535626322818590.10.azuredatabricks.net/api/2.0/secrets/acls/put"  --header "Authorization: Bearer dapi2c58adc761db573f34b6f356761a933c" --header "Content-Type: application/json"  --data @create1.json
      

    测试作业

    在 Azure Databricks 中创建一个作业,并将该作业群集配置为从机密范围读取机密。

    1. 转到 Azure Databricks 登录页并选择“创建空白笔记本”。 为笔记本命名,然后选择“SQL”作为默认语言。

    2. 在笔记本的第一个单元格中输入 SELECT 1。 这是一个简单的命令,如果成功,只显示 1。 如果你已授予服务主体对 Azure Data Lake Storage Gen 2 中特定文件或路径的访问权限,则可以改为从这些路径读取。


    3. 转到“作业”并单击“+ 创建作业”按钮 。 为作业命名,单击“选择笔记本”,然后选择刚刚创建的笔记本。

    4. 单击群集信息旁边的“编辑”。

    5. 在“配置群集”页面上,单击“高级选项” 。

       

    6. 在“Spark”选项卡上,输入以下 Spark 配置:

    1. fs.azure.account.auth.type.acmeadls.dfs.core.windows.net OAuth
      fs.azure.account.oauth.provider.type.acmeadls.dfs.core.windows.net org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
      fs.azure.account.oauth2.client.id.acmeadls.dfs.core.windows.net <application-id>
      fs.azure.account.oauth2.client.secret.acmeadls.dfs.core.windows.net {{secrets/<secret-scope-name>/<secret-name>}}
      fs.azure.account.oauth2.client.endpoint.acmeadls.dfs.core.windows.net https://login.microsoftonline.com/<directory-id>/oauth2/token
      ​
      -----
      ​
      ​
      fs.azure.account.auth.type.acmeadls.dfs.core.windows.net OAuth
      fs.azure.account.oauth.provider.type.acmeadls.dfs.core.windows.net org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
      fs.azure.account.oauth2.client.id.acmeadls.dfs.core.windows.net  eb603b71-6def-44bb-b660-d4ea67fe33c4
      fs.azure.account.oauth2.client.secret.acmeadls.dfs.core.windows.net {{secrets/maxscope/acme-sp-secret}}
      fs.azure.account.oauth2.client.endpoint.acmeadls.dfs.core.windows.net https://login.microsoftonline.com/75d1575e-746b-46a1-9faf-e571b4af6eda/oauth2/token
      ​
      ​
      将 <secret-scope-name> 替换为包含客户端机密的 Azure Databricks 机密范围的名称。 maxscope
      将 <application-id> 替换为 Azure AD 应用程序注册的 Application (client) ID。  eb603b71-6def-44bb-b660-d4ea67fe33c4
      将 <secret-name> 替换为与机密范围中的客户端机密值关联的名称。acme-sp-secret
      将 <directory-id> 替换为 Azure AD 应用程序注册的 Directory (tenant) ID。 75d1575e-746b-46a1-9faf-e571b4af6eda
      ​

  5. 将作业的所有权转移给服务主体。

    curl -X PUT 'https://<per-workspace-url>/api/2.0/permissions/jobs/<job-id>' \
      --header 'Authorization: Bearer <personal-access-token>' \
      --header 'Content-Type: application/json' \
      --data-raw '{
        "access_control_list": [
          {
            "service_principal_name": "<application-id>",
            "permission_level": "IS_OWNER"
          },
          {
            "group_name": "admins",
            "permission_level": "CAN_MANAGE"
          }
        ]
      }'
      
      
    替换 <per-workspace-url> 为 Azure Databricks 工作区的唯一 <per-workspace-url> 。
    使用 Azure Databricks 个人访问令牌替换 <personal-access-token>。
    将 <application-id> 替换为 Azure AD 应用程序注册的 Application (client) ID。
    ​
    ​
    ​
    该作业还需要对笔记本的读取权限。 运行以下命令以授予所需的权限:
    ​
    ​
    curl -X PUT 'https://<per-workspace-url>/api/2.0/permissions/notebooks/<notebook-id>' \
      --header 'Authorization: Bearer <personal-access-token>' \
      --header 'Content-Type: application/json' \
      --data-raw '{
        "access_control_list": [
          {
            "service_principal_name": "<application-id>",
            "permission_level": "CAN_READ"
          }
        ]
      }'
      
      
    替换 <per-workspace-url> 为 Azure Databricks 工作区的唯一 <per-workspace-url> 。
    将 <notebook-id> 替换为与作业关联的笔记本的 ID。 若要查找 ID,请转到 Azure Databricks 工作区中的笔记本,并在笔记本的 URL 中查找 notebook/ 后面的数字 ID。
    使用 Azure Databricks 个人访问令牌替换 <personal-access-token>。
    将 <application-id> 替换为 Azure AD 应用程序注册的 Application (client) ID。
    ​
    curl -X PUT "https://adb-7535626322818590.10.azuredatabricks.net/api/2.0/permissions/jobs/731458451210876" ^
      --header "Authorization: Bearer dapi2c58adc761db573f34b6f356761a933c" ^
      --header "Content-Type: application/json" ^
      --data @create2.json
      
      
      
      
    curl -X PUT "https://adb-7535626322818590.10.azuredatabricks.net/api/2.0/permissions/notebooks/861292323730779" ^
      --header "Authorization: Bearer dapi2c58adc761db573f34b6f356761a933c"  ^
      --header "Content-Type: application/json" ^
      --data @create3.json
    ​

  6. 通过将作业作为服务主体运行来测试该作业。

使用服务主体运行作业的方式与使用用户身份运行作业的方式相同,无论是通过 UI、API 还是 CLI。 使用 Azure Databricks UI 测试作业:

  1. 转到 Azure Databricks UI 中的“作业”并选择作业。

  2. 单击 “立即运行”

如果一切正常运行,你将看到作业的状态为“成功”。 可在 UI 中选择作业以验证输出:

ETL过程测试

1、打开DataBricks工作区,创建notebook,输入下面的代码进行设置

val appID = "<appID>"
val secret = "<secret>"
val tenantID = "<tenant-id>"
​
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<appID>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<secret>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant-id>/oauth2/token")
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
---
​
val storageAccountName = "<storage-account-name>"
val appID = "<app-id>"
val secret = "<secret>"
val fileSystemName = "<file-system-name>"
val tenantID = "<tenant-id>"
​
spark.conf.set("fs.azure.account.auth.type." + storageAccountName + ".dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type." + storageAccountName + ".dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id." + storageAccountName + ".dfs.core.windows.net", "" + appID + "")
spark.conf.set("fs.azure.account.oauth2.client.secret." + storageAccountName + ".dfs.core.windows.net", "" + secret + "")
spark.conf.set("fs.azure.account.oauth2.client.endpoint." + storageAccountName + ".dfs.core.windows.net", "https://login.microsoftonline.com/" + tenantID + "/oauth2/token")
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
dbutils.fs.ls("abfss://" + fileSystemName  + "@" + storageAccountName + ".dfs.core.windows.net/")
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false")
--
%sh wget -P /tmp https://raw.githubusercontent.com/Azure/usql/master/Examples/Samples/Data/json/radiowebsite/small_radio_json.json

下载数据

如图运行后:

将示例数据引入 Azure Data Lake Storage Gen2 帐户

%sh wget -P /tmp https://raw.githubusercontent.com/Azure/usql/master/Examples/Samples/Data/json/radiowebsite/small_radio_json.json
​
​
​
​
dbutils.fs.cp("file:///tmp/small_radio_json.json", "abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/")

从 Azure Data Lake Storage Gen2 帐户中提取数据

---获取数据
val df = spark.read.json("abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/small_radio_json.json")
df.show()
​
---获取数据列
val specificColumnsDf = df.select("firstname", "lastname", "gender", "location", "level")
specificColumnsDf.show()
​
---更改列名
val renamedColumnsDF = specificColumnsDf.withColumnRenamed("level", "subscription_type")
renamedColumnsDF.show()
​
​

加载数据到Azure Synapse

由于需要临时存储,输入下面配置。

val blobStorage = "maxtestdatalake.blob.core.windows.net"
val blobContainer = "temp"
val blobAccessKey =  "I3db2DV2qMGj2IDgLwbp/d7a9esxAjTF3GxM0T2Tbv/ssirP9NntHTdzFCohSlmIxhEaiRcq5BXJC1lO5/aQiQ=="
​
​
---设置
val tempDir = "wasbs://" + blobContainer + "@" + blobStorage +"/tempDirs"
​
val acntInfo = "fs.azure.account.key."+ blobStorage
sc.hadoopConfiguration.set(acntInfo, blobAccessKey)
​
---数据库配置
//Azure Synapse related settings
val dwDatabase = "maxsqlpools"
val dwServer = "maxsqlpools.database.windows.net"
val dwUser = "max"
val dwPass = ""
val dwJdbcPort =  "1433"
val dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
val sqlDwUrl = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass + ";$dwJdbcExtraOptions"
val sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass
​
----
数据存放
spark.conf.set(
    "spark.sql.parquet.writeLegacyFormat",
    "true")
​
renamedColumnsDF.write.format("com.databricks.spark.sqldw").option("url", sqlDwUrlSmall).option("dbtable", "SampleTable")       .option( "forward_spark_azure_storage_credentials","True").option("tempdir", tempDir).mode("overwrite").save()
​
​
​

连接数据库

下面是数据库数据信息,数据进入了数据库。

整个过程完成。

标签:fs,val,04,Databricks,----,azure,Azure,net
来源: https://blog.csdn.net/capsicum29/article/details/123614322