Databricks文档04----使用 Azure Databricks 提取、转换和加载数据
作者:互联网
使用 Azure Databricks 执行 ETL(提取、转换和加载数据)操作。 将数据从 Azure Data Lake Storage Gen2 提取到 Azure Databricks 中,在 Azure Databricks 中对数据运行转换操作,然后将转换的数据加载到 Azure Synapse Analytics 中。
本教程中的步骤使用 Azure Databricks 的 Azure Synapse 连接器将数据传输到 Azure Databricks。 而此连接器又使用 Azure Blob 存储来临时存储在 Azure Databricks 群集和 Azure Synapse 之间传输的数据。
下图演示了应用程序流:
本教程涵盖以下任务:
-
创建 Azure Databricks 服务。
-
在 Azure Databricks 中创建 Spark 群集。
-
在 Data Lake Storage Gen2 帐户中创建文件系统。
-
上传示例数据到 Azure Data Lake Storage Gen2 帐户。
-
创建服务主体。
-
从 Azure Data Lake Storage Gen2 帐户中提取数据。
-
在 Azure Databricks 中转换数据。
-
将数据加载到 Azure Synapse 中
创建DataLake测试账户
Azure Data Lake 存储 Gen2](Azure Data Lake Storage Gen2 Introduction | Microsoft Docs) (也称为 ADLS Gen2) 是一种用于大数据分析的下一代 Data Lake解决方案。 Azure Data Lake 存储 Gen2 使用低成本分层存储、高可用性和灾难恢复功能,生成 Azure Data Lake 存储 Gen1 功能文件系统语义、文件级安全性和缩放到 — Azure Blob 存储。 — Azure Blob File System (ABFS) 驱动程序提供 ADLS Gen2 存储的接口。 Databricks Runtime 中包含的 ABFS 驱动程序支持 Azure Blob 存储上的标准文件系统语义。
portal上选择存储账户,创建过程如下
ADLS Gen2 访问
-
对于生产和多用户方案,有多种身份验证方法可用于从 Azure Databricks 访问 ADLS Gen2 存储。 根据要求选择方法:
-
若要根据每个用户的权限ADLS Gen2工作区用户访问 Azure Data Lake 存储
-
若要为多个工作区用户提供对一组常用文件夹或文件的访问权限,请参阅配合使用 OAuth 2.0 和 Azure 服务主体来访问 Azure Data Lake Storage Gen2。
-
若要精细控制对存储资源的访问,请参阅使用 SAS 令牌提供程序直接访问 Azure Data Lake Storage Gen2,定义自定义访问策略。
-
从 Azure 存储帐户获取访问密钥。
创建 Azure Key Vault
可以使用Key Vault保存密钥
将 Azure 访问密钥添加到 Azure 密钥保管库
使用 Azure 服务主体运行作业
作业提供了一种非交互式的方式来运行 Azure Databricks 群集中的应用程序,例如,ETL 作业或应按计划运行的数据分析任务。 通常,这些作业以创建它们的用户身份运行,但这可能有一些限制:
-
创建并运行作业取决于具有适当权限的用户。
-
只有创建作业的用户才能访问作业。
-
用户可能已从 Azure Databricks 工作区中删除。
使用服务帐户(与应用程序而不是特定用户关联的帐户)是解决这些限制的常用方法。 在 Azure 中,可使用 Azure Active Directory (Azure AD) 应用程序和服务主体来创建服务帐户。
这一点非常重要,尤其是服务主体控制对存储在 Azure Data Lake Storage Gen2 帐户中的数据的访问时。 使用这些服务主体运行作业允许作业访问存储帐户中的数据,并提供对数据访问范围的控制。
下面进行创建 Azure AD 应用程序和服务主体,并使该服务主体成为作业的所有者。向不拥有作业的其他组授予作业运行权限:
在 Azure Active Directory 中创建服务主体
在门户中搜索应用注册
完成
创建客户端密码
选择证书和密码-新建客户端密码
此密钥之后不能查看,一定要保存好,作为之后使用,
Azure Databricks 中创建个人访问令牌 (PAT)
-
在 Azure Databricks 中创建个人访问令牌 (PAT) ,可使用 PAT 对 Databricks REST API 进行身份验证。
你将使用 Azure Databricks 个人访问令牌 (PAT) 对 Databricks REST API 进行身份验证。 创建可用于发出 API 请求的 PAT:
-
转到 Azure Databricks 工作区。
-
单击屏幕右上角的用户图标,然后单击“用户设置”。
-
单击 "访问令牌" "生成新令牌"。
-
复制并保存令牌值。
-
-
使用 Databricks SCIM API 将服务主体作为非管理用户添加到 Azure Databricks。
curl -X POST "https://adb-7535626322818590.10.azuredatabricks.net/api/2.0/preview/scim/v2/ServicePrincipals" \ --header "Content-Type: application/scim+json" \ --header "Authorization: Bearer dapi2c58adc761db573f34b6f356761a933c" \ --data-raw "{ "schemas":[ "urn:ietf:params:scim:schemas:core:2.0:ServicePrincipal" ], "applicationId":"eb603b71-6def-44bb-b660-d4ea67fe33c4", "displayName": "test-sp", "entitlements":[ { "value":"allow-cluster-create" } ] }"
以上代码在windows 下执行会报错,需要把单引号改为双引号,把"" 改为 “^”,data-raw 参属下也不行,可以使用@data 把参数写成json读取。如下图
-
在 Azure Databricks 中创建 Azure Key Vault 支持的机密范围。
访问:类似这样地址::https://<databrick url>/?o=7535626322818590#secrets/createScope
-
授予服务主体对机密范围的读取权限。
curl -X POST 'https://<per-workspace-url/api/2.0/secrets/acls/put' \ --header 'Authorization: Bearer <personal-access-token>' \ --header 'Content-Type: application/json' \ --data-raw '{ "scope": "<scope-name>", "principal": "<application-id>", "permission": "READ" }' curl -X POST "https://adb-7535626322818590.10.azuredatabricks.net/api/2.0/secrets/acls/put" --header "Authorization: Bearer dapi2c58adc761db573f34b6f356761a933c" --header "Content-Type: application/json" --data @create1.json
测试作业
在 Azure Databricks 中创建一个作业,并将该作业群集配置为从机密范围读取机密。
-
转到 Azure Databricks 登录页并选择“创建空白笔记本”。 为笔记本命名,然后选择“SQL”作为默认语言。
-
在笔记本的第一个单元格中输入
SELECT 1
。 这是一个简单的命令,如果成功,只显示 1。 如果你已授予服务主体对 Azure Data Lake Storage Gen 2 中特定文件或路径的访问权限,则可以改为从这些路径读取。 -
转到“作业”并单击“+ 创建作业”按钮 。 为作业命名,单击“选择笔记本”,然后选择刚刚创建的笔记本。
-
单击群集信息旁边的“编辑”。
-
在“配置群集”页面上,单击“高级选项” 。
-
在“Spark”选项卡上,输入以下 Spark 配置:
-
-
-
fs.azure.account.auth.type.acmeadls.dfs.core.windows.net OAuth fs.azure.account.oauth.provider.type.acmeadls.dfs.core.windows.net org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider fs.azure.account.oauth2.client.id.acmeadls.dfs.core.windows.net <application-id> fs.azure.account.oauth2.client.secret.acmeadls.dfs.core.windows.net {{secrets/<secret-scope-name>/<secret-name>}} fs.azure.account.oauth2.client.endpoint.acmeadls.dfs.core.windows.net https://login.microsoftonline.com/<directory-id>/oauth2/token ----- fs.azure.account.auth.type.acmeadls.dfs.core.windows.net OAuth fs.azure.account.oauth.provider.type.acmeadls.dfs.core.windows.net org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider fs.azure.account.oauth2.client.id.acmeadls.dfs.core.windows.net eb603b71-6def-44bb-b660-d4ea67fe33c4 fs.azure.account.oauth2.client.secret.acmeadls.dfs.core.windows.net {{secrets/maxscope/acme-sp-secret}} fs.azure.account.oauth2.client.endpoint.acmeadls.dfs.core.windows.net https://login.microsoftonline.com/75d1575e-746b-46a1-9faf-e571b4af6eda/oauth2/token 将 <secret-scope-name> 替换为包含客户端机密的 Azure Databricks 机密范围的名称。 maxscope 将 <application-id> 替换为 Azure AD 应用程序注册的 Application (client) ID。 eb603b71-6def-44bb-b660-d4ea67fe33c4 将 <secret-name> 替换为与机密范围中的客户端机密值关联的名称。acme-sp-secret 将 <directory-id> 替换为 Azure AD 应用程序注册的 Directory (tenant) ID。 75d1575e-746b-46a1-9faf-e571b4af6eda
-
-
将作业的所有权转移给服务主体。
curl -X PUT 'https://<per-workspace-url>/api/2.0/permissions/jobs/<job-id>' \ --header 'Authorization: Bearer <personal-access-token>' \ --header 'Content-Type: application/json' \ --data-raw '{ "access_control_list": [ { "service_principal_name": "<application-id>", "permission_level": "IS_OWNER" }, { "group_name": "admins", "permission_level": "CAN_MANAGE" } ] }' 替换 <per-workspace-url> 为 Azure Databricks 工作区的唯一 <per-workspace-url> 。 使用 Azure Databricks 个人访问令牌替换 <personal-access-token>。 将 <application-id> 替换为 Azure AD 应用程序注册的 Application (client) ID。 该作业还需要对笔记本的读取权限。 运行以下命令以授予所需的权限: curl -X PUT 'https://<per-workspace-url>/api/2.0/permissions/notebooks/<notebook-id>' \ --header 'Authorization: Bearer <personal-access-token>' \ --header 'Content-Type: application/json' \ --data-raw '{ "access_control_list": [ { "service_principal_name": "<application-id>", "permission_level": "CAN_READ" } ] }' 替换 <per-workspace-url> 为 Azure Databricks 工作区的唯一 <per-workspace-url> 。 将 <notebook-id> 替换为与作业关联的笔记本的 ID。 若要查找 ID,请转到 Azure Databricks 工作区中的笔记本,并在笔记本的 URL 中查找 notebook/ 后面的数字 ID。 使用 Azure Databricks 个人访问令牌替换 <personal-access-token>。 将 <application-id> 替换为 Azure AD 应用程序注册的 Application (client) ID。 curl -X PUT "https://adb-7535626322818590.10.azuredatabricks.net/api/2.0/permissions/jobs/731458451210876" ^ --header "Authorization: Bearer dapi2c58adc761db573f34b6f356761a933c" ^ --header "Content-Type: application/json" ^ --data @create2.json curl -X PUT "https://adb-7535626322818590.10.azuredatabricks.net/api/2.0/permissions/notebooks/861292323730779" ^ --header "Authorization: Bearer dapi2c58adc761db573f34b6f356761a933c" ^ --header "Content-Type: application/json" ^ --data @create3.json
-
通过将作业作为服务主体运行来测试该作业。
使用服务主体运行作业的方式与使用用户身份运行作业的方式相同,无论是通过 UI、API 还是 CLI。 使用 Azure Databricks UI 测试作业:
-
转到 Azure Databricks UI 中的“作业”并选择作业。
-
单击 “立即运行” 。
如果一切正常运行,你将看到作业的状态为“成功”。 可在 UI 中选择作业以验证输出:
ETL过程测试
1、打开DataBricks工作区,创建notebook,输入下面的代码进行设置
val appID = "<appID>" val secret = "<secret>" val tenantID = "<tenant-id>" spark.conf.set("fs.azure.account.auth.type", "OAuth") spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider") spark.conf.set("fs.azure.account.oauth2.client.id", "<appID>") spark.conf.set("fs.azure.account.oauth2.client.secret", "<secret>") spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant-id>/oauth2/token") spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true") --- val storageAccountName = "<storage-account-name>" val appID = "<app-id>" val secret = "<secret>" val fileSystemName = "<file-system-name>" val tenantID = "<tenant-id>" spark.conf.set("fs.azure.account.auth.type." + storageAccountName + ".dfs.core.windows.net", "OAuth") spark.conf.set("fs.azure.account.oauth.provider.type." + storageAccountName + ".dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider") spark.conf.set("fs.azure.account.oauth2.client.id." + storageAccountName + ".dfs.core.windows.net", "" + appID + "") spark.conf.set("fs.azure.account.oauth2.client.secret." + storageAccountName + ".dfs.core.windows.net", "" + secret + "") spark.conf.set("fs.azure.account.oauth2.client.endpoint." + storageAccountName + ".dfs.core.windows.net", "https://login.microsoftonline.com/" + tenantID + "/oauth2/token") spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true") dbutils.fs.ls("abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/") spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false") -- %sh wget -P /tmp https://raw.githubusercontent.com/Azure/usql/master/Examples/Samples/Data/json/radiowebsite/small_radio_json.json
下载数据
如图运行后:
将示例数据引入 Azure Data Lake Storage Gen2 帐户
%sh wget -P /tmp https://raw.githubusercontent.com/Azure/usql/master/Examples/Samples/Data/json/radiowebsite/small_radio_json.json dbutils.fs.cp("file:///tmp/small_radio_json.json", "abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/")
从 Azure Data Lake Storage Gen2 帐户中提取数据
---获取数据 val df = spark.read.json("abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/small_radio_json.json") df.show() ---获取数据列 val specificColumnsDf = df.select("firstname", "lastname", "gender", "location", "level") specificColumnsDf.show() ---更改列名 val renamedColumnsDF = specificColumnsDf.withColumnRenamed("level", "subscription_type") renamedColumnsDF.show()
加载数据到Azure Synapse
由于需要临时存储,输入下面配置。
val blobStorage = "maxtestdatalake.blob.core.windows.net" val blobContainer = "temp" val blobAccessKey = "I3db2DV2qMGj2IDgLwbp/d7a9esxAjTF3GxM0T2Tbv/ssirP9NntHTdzFCohSlmIxhEaiRcq5BXJC1lO5/aQiQ==" ---设置 val tempDir = "wasbs://" + blobContainer + "@" + blobStorage +"/tempDirs" val acntInfo = "fs.azure.account.key."+ blobStorage sc.hadoopConfiguration.set(acntInfo, blobAccessKey) ---数据库配置 //Azure Synapse related settings val dwDatabase = "maxsqlpools" val dwServer = "maxsqlpools.database.windows.net" val dwUser = "max" val dwPass = "" val dwJdbcPort = "1433" val dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;" val sqlDwUrl = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass + ";$dwJdbcExtraOptions" val sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass ---- 数据存放 spark.conf.set( "spark.sql.parquet.writeLegacyFormat", "true") renamedColumnsDF.write.format("com.databricks.spark.sqldw").option("url", sqlDwUrlSmall).option("dbtable", "SampleTable") .option( "forward_spark_azure_storage_credentials","True").option("tempdir", tempDir).mode("overwrite").save()
连接数据库
下面是数据库数据信息,数据进入了数据库。
整个过程完成。
标签:fs,val,04,Databricks,----,azure,Azure,net 来源: https://blog.csdn.net/capsicum29/article/details/123614322