
java – BigQuery – How do I create a connection using Pentaho Data Integration (Spoon)?

Author: Internet

I am trying to access BigQuery through Pentaho Data Integration, but so far without success.

> System: OS X El Capitan
> Google BigQuery authentication method: service account with a .p12 key

I followed this tutorial, which covers OS X:
http://wiki.pentaho.com/display/EAI/Google+BigQuery

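This is what I did:

> I downloaded the "dependencies of kettle.zip" archive and unzipped it into PDI_FOLDER/libswt/osx64
> I downloaded "bqjdbc-1.4-standalone.jar" and copied it to PDI_FOLDER/lib
> After that, I tried to create a new connection in Data Integration using New > Database Connection > Generic Database > Native (JDBC)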

I configured the connection parameters following this tutorial: https://code.google.com/p/starschema-bigquery-jdbc/wiki/JDBCURL. The parameters are:

> Custom connection URL: jdbc:BQDriver:projectid(secretproject)?withServiceAccount=true
> Custom driver class name: net.starschema.clouddb.jdbc.BQDrive
> Username: pentaho-data-integration@secretproject.iam.gserviceaccount.com
> Password: /Users/luisfsns/Dropbox/Lendico/etl/marketing/lendico-pentaho-data-integration-googlebigquery.p12
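
As a rough illustration of how such a URL could be assembled, here is a minimal Python sketch. It assumes the URL has the shape jdbc:BQDriver:<URL-encoded project id>?withServiceAccount=true, which is one reading of the linked JDBCURL page and is not confirmed anywhere in this post. The helper name build_bq_jdbc_url is hypothetical.

```python
try:
    from urllib import quote  # Python 2
except ImportError:
    from urllib.parse import quote  # Python 3


def build_bq_jdbc_url(project_id, with_service_account=True):
    """Build a JDBC URL in the ASSUMED shape
    jdbc:BQDriver:<url-encoded project id>?withServiceAccount=<true|false>.
    """
    # Percent-encode everything that is not an unreserved character,
    # e.g. the ':' in a domain-scoped project id.
    encoded = quote(project_id, safe="")
    flag = "true" if with_service_account else "false"
    return "jdbc:BQDriver:%s?withServiceAccount=%s" % (encoded, flag)


if __name__ == "__main__":
    print(build_bq_jdbc_url("secretproject"))
```

If that reading is right, the project argument would be the bare project id rather than a name wrapped in parentheses, but the driver's own documentation should be checked before relying on it.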

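Things I am not sure about:

> Is my custom connection URL correct? What should I supply as the project argument: the project name, or a URL-style path? Can someone give an example?
> Should I use a different authentication method (instead of "service account"), or a different type of private key, such as .json?
> Is my custom driver class name correct?

Can anyone help me?

This is the log I get when I test the connection I created: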

Error connecting to database [Teste] :
org.pentaho.di.core.exception.KettleDatabaseException: Error occurred while trying to connect to the database

Driver class 'net.starschema.clouddb.jdbc.BQDrive' could not be found, make sure the 'Generic database' driver (jar file) is installed.
net.starschema.clouddb.jdbc.BQDrive

org.pentaho.di.core.exception.KettleDatabaseException: Error occurred while trying to connect to the database

Driver class 'net.starschema.clouddb.jdbc.BQDrive' could not be found, make sure the 'Generic database' driver (jar file) is installed.
net.starschema.clouddb.jdbc.BQDrive

at org.pentaho.di.core.database.Database.normalConnect(Database.java:428)
at org.pentaho.di.core.database.Database.connect(Database.java:358)
at org.pentaho.di.core.database.Database.connect(Database.java:311)
at org.pentaho.di.core.database.Database.connect(Database.java:301)
at org.pentaho.di.core.database.DatabaseFactory.getConnectionTestReport(DatabaseFactory.java:80)
at org.pentaho.di.core.database.DatabaseMeta.testConnection(DatabaseMeta.java:2686)
at org.pentaho.ui.database.event.DataHandler.testDatabaseConnection(DataHandler.java:546)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.pentaho.ui.xul.impl.AbstractXulDomContainer.invoke(AbstractXulDomContainer.java:313)
at org.pentaho.ui.xul.impl.AbstractXulComponent.invoke(AbstractXulComponent.java:157)
at org.pentaho.ui.xul.impl.AbstractXulComponent.invoke(AbstractXulComponent.java:141)
at org.pentaho.ui.xul.swt.tags.SwtButton.access$500(SwtButton.java:43)
at org.pentaho.ui.xul.swt.tags.SwtButton$4.widgetSelected(SwtButton.java:138)
at org.eclipse.swt.widgets.TypedListener.handleEvent(Unknown Source)
at org.eclipse.swt.widgets.EventTable.sendEvent(Unknown Source)
at org.eclipse.swt.widgets.Display.sendEvent(Unknown Source)
at org.eclipse.swt.widgets.Widget.sendEvent(Unknown Source)
at org.eclipse.swt.widgets.Widget.sendEvent(Unknown Source)
at org.eclipse.swt.widgets.Widget.sendEvent(Unknown Source)
at org.eclipse.swt.widgets.Widget.notifyListeners(Unknown Source)
at org.eclipse.swt.widgets.Display.runDeferredEvents(Unknown Source)
at org.eclipse.swt.widgets.Display.readAndDispatch(Unknown Source)
at org.eclipse.jface.window.Window.runEventLoop(Window.java:820)
at org.eclipse.jface.window.Window.open(Window.java:796)
at org.pentaho.ui.xul.swt.tags.SwtDialog.show(SwtDialog.java:389)
at org.pentaho.ui.xul.swt.tags.SwtDialog.show(SwtDialog.java:318)
at org.pentaho.di.ui.core.database.dialog.XulDatabaseDialog.open(XulDatabaseDialog.java:116)
at org.pentaho.di.ui.core.database.dialog.DatabaseDialog.open(DatabaseDialog.java:59)
at org.pentaho.di.ui.spoon.delegates.SpoonDBDelegate.newConnection(SpoonDBDelegate.java:464)
at org.pentaho.di.ui.spoon.delegates.SpoonDBDelegate.newConnection(SpoonDBDelegate.java:451)
at org.pentaho.di.ui.spoon.Spoon.newConnection(Spoon.java:8728)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.pentaho.ui.xul.impl.AbstractXulDomContainer.invoke(AbstractXulDomContainer.java:313)
at org.pentaho.ui.xul.impl.AbstractXulComponent.invoke(AbstractXulComponent.java:157)
at org.pentaho.ui.xul.impl.AbstractXulComponent.invoke(AbstractXulComponent.java:141)
at org.pentaho.ui.xul.jface.tags.JfaceMenuitem.access$100(JfaceMenuitem.java:43)
at org.pentaho.ui.xul.jface.tags.JfaceMenuitem$1.run(JfaceMenuitem.java:106)
at org.eclipse.jface.action.Action.runWithEvent(Action.java:498)
at org.eclipse.jface.action.ActionContributionItem.handleWidgetSelection(ActionContributionItem.java:545)
at org.eclipse.jface.action.ActionContributionItem.access$2(ActionContributionItem.java:490)
at org.eclipse.jface.action.ActionContributionItem$5.handleEvent(ActionContributionItem.java:402)
at org.eclipse.swt.widgets.EventTable.sendEvent(Unknown Source)
at org.eclipse.swt.widgets.Display.sendEvent(Unknown Source)
at org.eclipse.swt.widgets.Widget.sendEvent(Unknown Source)
at org.eclipse.swt.widgets.Widget.sendEvent(Unknown Source)
at org.eclipse.swt.widgets.Widget.sendEvent(Unknown Source)
at org.eclipse.swt.widgets.Widget.notifyListeners(Unknown Source)
at org.eclipse.swt.widgets.Display.runDeferredEvents(Unknown Source)
at org.eclipse.swt.widgets.Display.readAndDispatch(Unknown Source)
at org.pentaho.di.ui.spoon.Spoon.readAndDispatch(Spoon.java:1319)
at org.pentaho.di.ui.spoon.Spoon.waitForDispose(Spoon.java:7939)
at org.pentaho.di.ui.spoon.Spoon.start(Spoon.java:9190)
at org.pentaho.di.ui.spoon.Spoon.main(Spoon.java:654)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.pentaho.commons.launcher.Launcher.main(Launcher.java:92)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at apple.launcher.LaunchRunner.run(LaunchRunner.java:116)
at apple.launcher.LaunchRunner.callMain(LaunchRunner.java:51)
at apple.launcher.JavaApplicationLauncher.launch(JavaApplicationLauncher.java:52)
Caused by: org.pentaho.di.core.exception.KettleDatabaseException:
Driver class 'net.starschema.clouddb.jdbc.BQDrive' could not be found, make sure the 'Generic database' driver (jar file) is installed.
net.starschema.clouddb.jdbc.BQDrive

at org.pentaho.di.core.database.Database.connectUsingClass(Database.java:522)
at org.pentaho.di.core.database.Database.connectUsingClass(Database.java:4697)
at org.pentaho.di.core.database.Database.normalConnect(Database.java:414)
... 70 more
Caused by: java.lang.ClassNotFoundException: net.starschema.clouddb.jdbc.BQDrive
at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
at org.pentaho.di.core.database.Database.connectUsingClass(Database.java:497)
... 72 more

Custom URL : jdbc:BQDriver:projectid(secretproject)?withServiceAccount=true
Custom Driver Class: net.starschema.clouddb.jdbc.BQDrive
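
A ClassNotFoundException like the one in this log means the JVM could not load a class with exactly that name from the jars on its classpath. Since a jar is a zip archive, one way to check is to list its entries. The sketch below is a hypothetical helper (the function names are not from the post) that tests whether a jar contains a given class and lists classes in the jar whose names start with the requested one. The URL scheme in the log is jdbc:BQDriver while the class name entered ends in BQDrive, so a truncated class name is one possibility worth ruling out.

```python
import zipfile


def jar_has_class(jar_path, class_name):
    """Return True if the jar (a zip archive) contains the given class."""
    # A class a.b.C is stored in the jar as the entry a/b/C.class
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()


def find_similar_classes(jar_path, class_name):
    """List fully qualified classes in the jar whose name starts with class_name."""
    prefix = class_name.replace(".", "/")
    with zipfile.ZipFile(jar_path) as jar:
        return sorted(
            name[:-len(".class")].replace("/", ".")
            for name in jar.namelist()
            if name.startswith(prefix) and name.endswith(".class")
        )
```

For example, calling jar_has_class with the path to bqjdbc-1.4-standalone.jar under PDI_FOLDER/lib and the class name from the connection dialog shows whether that exact name exists in the jar, and find_similar_classes lists the closest matches if it does not.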

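Solution:

The answer may not make you happy, but here we go.
It is possible to create this kind of connection, but extraction is problematic and row throughput is very slow (BigQuery processes anything quickly, but this JDBC driver makes fetching the data very slow).

What I do here instead is use a Python 2.7 script that writes the query results to a table, exports that table to a CSV file on Google Cloud Storage, and then downloads the file.

This is really fast, and you will not run into many errors.

Here is the Python code you can use. (You need to install the Google storage utilities, gsutil, to copy files easily from the cloud to your machine.)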

The shell script (use it in a shell script entry in your job):

#!/bin/bash
export PATH=${PATH}

# $caminho must be set to the directory that contains the Python script.

# BOTO is the login manager for gsutil
export BOTO_DISPLAYENV="/home/mromano/.boto"
export BOTO_CONFIG="/home/mromano/.boto"

# Remove files left over from a previous run (-f: no error if none exist)
rm -f /tmp/bigquery_extraction_*

# Run the BigQuery extraction script
python "$caminho/google_bigquery_extract_foo_bar.py"

# Give it a few seconds to sync data to Google Cloud Storage
sleep 10

# Copy from Google Cloud Storage to a local file
# (the object name must match the URI the Python script exports to)
/usr/local/bin/gsutil -q cp gs://pentaho_exports/foo_bar.csv.gz /tmp/google_bigquery_extract_foo_bar.csv.gz

The Python script (creates a table from the query results, exports the table to CSV, then deletes the table):

import httplib2
import logging

from apiclient.discovery import build
from oauth2client.client import SignedJwtAssertionCredentials
from bigquery import get_client
from bigquery.errors import BigQueryTimeoutException

logging.basicConfig()

# BigQuery project id as listed in the Google Developers Console.
project_id = 'ce______?_____8'

# Service account email address as listed in the Google Developers Console.
service_account = '5399951_____?_______73k@developer.gserviceaccount.com'

# Read the service account's .p12 private key.
with open('../../../../keys/bigquery_key.p12', 'rb') as f:
    key = f.read()

credentials = SignedJwtAssertionCredentials(
    service_account,
    key,
    scope='https://www.googleapis.com/auth/bigquery')

http = httplib2.Http()
http = credentials.authorize(http)

client = get_client(project_id, credentials=credentials, service_account=service_account)

# Write the query results to a table, replacing any previous contents.
job = client.write_to_table("""SELECT * FROM 001234.TEST""",
                            'pentaho_export',
                            table='table_foo_bar',
                            create_disposition='CREATE_IF_NEEDED',
                            write_disposition='WRITE_TRUNCATE')
try:
    job_resource = client.wait_for_job(job, timeout=6000)
    #print job_resource
except BigQueryTimeoutException:
    print "Timeout"

# Export the table to a gzip-compressed, tab-delimited file on Google Cloud Storage.
job_export = client.export_data_to_uris(['gs://pentaho_exports/foo_bar.csv.gz'],
                                        'pentaho_export',
                                        'table_foo_bar',
                                        compression='GZIP',
                                        field_delimiter='\t')
try:
    job_resource = client.wait_for_job(job_export, timeout=6000)
    #print job_resource
except BigQueryTimeoutException:
    print "Timeout"

# Delete the intermediate table.
deleted = client.delete_table('pentaho_export', 'table_foo_bar')

I hope it helps. =)
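
Once the shell step has copied the export to /tmp/google_bigquery_extract_foo_bar.csv.gz, the file can be read directly without decompressing it first. The following is a minimal sketch, written for Python 3 rather than the Python 2.7 used above. The function name read_export is hypothetical, and the delimiter passed in is assumed to match whatever field_delimiter the export job used.

```python
import csv
import gzip


def read_export(path, delimiter=","):
    """Yield each row of a gzip-compressed delimited text file as a list of strings."""
    # "rt" opens the gzip stream in text mode; newline="" lets the csv
    # module handle line endings inside quoted fields itself.
    with gzip.open(path, "rt", encoding="utf-8", newline="") as handle:
        for row in csv.reader(handle, delimiter=delimiter):
            yield row


if __name__ == "__main__":
    import sys

    # Usage: python read_export.py /tmp/google_bigquery_extract_foo_bar.csv.gz
    for row in read_export(sys.argv[1], delimiter="\t"):
        print(row)
```

Reading row by row keeps memory use flat for large exports, which matters when the table is too big to load at once.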

Tags: java, google-bigquery, jdbc, pentaho, pdi
Source: https://codeday.me/bug/20190724/1526020.html