首页 > TAG信息列表 > apache-spark-sql

Java-Spark SQL:嵌套类导致拼花错误

我似乎无法在镶木地板上写JavaRDD< T>.其中T代表Person类.我将其定义为 public class Person implements Serializable { private static final long serialVersionUID = 1L; private String name; private String age; private Address address; .... 地址: pub

将Spark DataFrame写入Hive表中的内存分配问题

我正在尝试使用pySpark中的.saveAsTable()将Spark DataFrame保存到Hive表(Parquet)中,但仍然会遇到以下内存问题: org.apache.hadoop.hive.ql.metadata.HiveException: parquet.hadoop.MemoryManager$1: New Memory allocation 1034931 bytes is smaller than the minimum allocati

python-PySpark:如何判断数据框的列类型

假设我们有一个称为df的数据框.我知道有使用df.dtypes的方法.但是我喜欢类似的东西 type(123)== int#注意int不是字符串 我想知道是否有类似的东西: type(df.select(< column_name>).collect()[0] [1])== IntegerType 基本上,我想知道从数据帧直接获取IntegerType,StringType之类的

java-Apache Spark无法处理大型Cassandra列系列

我正在尝试使用Apache Spark处理我的大型(〜230k条目)cassandra数据集,但是我经常遇到各种错误.但是,当在约200个数据集上运行时,我可以成功运行应用程序.我有一个包含3个节点的Spark设置,其中有1个主节点和2个worker,并且2个worker还安装了一个cassandra群集,其索引索引的复制因子

python-将字符串列转换为矢量列Spark DataFrames

我有一个Spark数据框,看起来如下: +-----------+-------------------+ | ID | features | +-----------+-------------------+ | 18156431|(5,[0,1,4],[1,1,1])| | 20260831|(5,[0,4,5],[2,1,1])| | 91859831|(5,[0,1],[1,3]) | | 206186631|(5,[3,4,

python-如何在不使用RDD API的情况下摆脱pyspark数据帧中的行包装器对象?

我针对临时视图发布以下SQL语句 cloudantdata.createOrReplaceTempView("washingflat") sqlDF = spark.sql("SELECT temperature FROM washingflat") sqlDF.rdd.map(lambda row : row.temperature).collect() 我只是对普通的(展开的)整数值感兴趣.到目前为止,我使用dataframe AP

java-如何在不知道数据模式的情况下从文本文件将数据加载到spark数据帧中?

我在hadoop中有一个文本文件,我需要使用spark java api在第二列中对其进行排序.我正在使用数据框,但不确定其列. 它可能具有动态列,这意味着我不知道确切的列数. 我该如何进行?请帮我. 提前致谢.解决方法:第一件事是我想在Scala中提供一个csv示例(不是Java) 您可以使用Spark csv api

python-PySpark groupby和最大值选择

我有一个PySpark数据框 name city date satya Mumbai 13/10/2016 satya Pune 02/11/2016 satya Mumbai 22/11/2016 satya Pune 29/11/2016 satya Delhi 30/11/2016 panda Delhi 29/11/2016 brata BBSR 28/11/2016 brata Goa 30/10/2016

python-将PySpark数据框列类型转换为字符串并替换方括号

我需要将PySpark df列类型从数组转换为字符串,还要删除方括号.这是数据框的架构.需要处理的列是CurrencyCode和TicketAmount >>> plan_queryDF.printSchema() root |-- event_type: string (nullable = true) |-- publishedDate: string (nullable = true) |-- plannedCustome

java-Spark SQL sum函数对双精度值的问题

我们正在尝试使用Spark SQL sum函数对双精度值求和. 样本数据: +------+ |amount| +------+ | 1000| | 1050| | 2049| +------+ 示例代码: df.select("amount").show(); df.registerTempTable("table"); sqlContext.sql("select amount/pow(10,2) from table").sho

java-通过apache spark将行作为列表进行分组

我有一个特殊的用例,其中我为同一位客户有多行,每行对象看起来像: root -c1: BigInt -c2: String -c3: Double -c4: Double -c5: Map[String, Int] 现在,我按列c1进行分组,并为同一客户收集所有行作为列表,例如: c1, [Row1, Row3, Row4] c2, [Row2, Row5] 我试图这样做 data

python-PySpark中的高效列处理

我有一个数据列,其中的列数非常多(> 30000). 我根据这样的第一列用1和0填充它: for column in list_of_column_names: df = df.withColumn(column, when(array_contains(df['list_column'], column), 1).otherwise(0)) 但是,此过程需要很多时间.有办法更有效地做到这一点吗?告诉

mysql-Pyspark DataFrameWriter jdbc函数的ignore选项会忽略整个事务还是只是冒犯行?

Pyspark DataFrameWriter类具有用于将数据帧写入sql的jdbc function.该函数具有–ignore选项,文档中将说明: Silently ignore this operation if data already exists. 但是它将忽略整个事务,还是仅忽略插入重复的行?如果我将–ignore与–append标志结合起来怎么办?行为会改变吗?解

python-用同一列的平均值填充Pyspark数据框列的空值

有了这样的数据框 rdd_2 = sc.parallelize([(0,10,223,"201601"), (0,10,83,"2016032"),(1,20,None,"201602"),(1,20,3003,"201601"), (1,20,None,"201603"), (2,40, 2321,"201601"), (2,30, 10,"201602"),(2

如何将字典列表转换为Pyspark DataFrame

我想将我的词典列表转换为DataFrame.这是清单: mylist = [ {"type_activity_id":1,"type_activity_name":"xxx"}, {"type_activity_id":2,"type_activity_name":"yyy"}, {"type_activity_id":3,"type_act

python – PySpark,通过JSON文件导入模式

tbschema.json看起来像这样: [{"TICKET":"integer","TRANFERRED":"string","ACCOUNT":"STRING"}] 我使用以下代码加载它 >>> df2 = sqlContext.jsonFile("tbschema.json") >>> f2.schema StructT

python – PySpark:使用过滤函数后取一列的平均值

我使用以下代码来获得薪水大于某个阈值的人的平均年龄. dataframe.filter(df['salary'] > 100000).agg({"avg": "age"}) 列的年龄是数字(浮点数),但我仍然收到此错误. py4j.protocol.Py4JJavaError: An error occurred while calling o86.agg. : scala.MatchError: age (of cla

python – 向Spark DataFrame添加一个空列

如在Web上的many other locations中所述,向现有DataFrame添加新列并不简单.不幸的是,拥有此功能非常重要(即使它在分布式环境中效率低下),尤其是在尝试使用unionAll连接两个DataFrame时. 将空列添加到DataFrame以便于unionAll的最优雅的解决方法是什么? 我的版本是这样的: from pysp

python – PySpark:StructField(…,…,False)总是返回`nullable = true`而不是`nullable = false`

我是PySpark的新手,面临一个奇怪的问题.我正在尝试在加载CSV数据集时将某些列设置为不可为空.我可以用一个非常小的数据集(test.csv)重现我的情况: col1,col2,col3 11,12,13 21,22,23 31,32,33 41,42,43 51,,53 在第5行第2列有一个空值,我不想在我的DF中获得该行.我将所有字段设置

python – PySpark:TypeError:condition应该是string或Column

我试图过滤基于如下的RDD: spark_df = sc.createDataFrame(pandas_df) spark_df.filter(lambda r: str(r['target']).startswith('good')) spark_df.take(5) 但是得到了以下错误: TypeErrorTraceback (most recent call last) <ipython-input-8-86cfb363dd8b> in &l

在从其他列派生的数据框中添加新列(Spark)

我正在使用Spark 1.3.0和Python.我有一个数据框,我希望添加一个从其他列派生的附加列.像这样, >>old_df.columns [col_1, col_2, ..., col_m] >>new_df.columns [col_1, col_2, ..., col_m, col_n] 哪里 col_n = col_3 - col_4 我如何在PySpark中执行此操作?解决方法:实现这一

python – Pyspark从日期到字符串更改列的类型

我有以下数据帧: corr_temp_df [('vacationdate', 'date'), ('valueE', 'string'), ('valueD', 'string'), ('valueC', 'string'), ('valueB', 'string'), ('value

mysql – 如何在jdbc数据源中使用子查询来获取dbtable选项?

我想使用Spark来处理来自JDBC源的一些数据.但首先,我想在JDBC端运行一些查询来过滤列和连接表,而不是从JDBC读取原始表,而是将查询结果作为表加载到Spark SQL中. 加载原始JDBC表的以下语法适用于我: df_table1 = sqlContext.read.format('jdbc').options( url="jdbc:mysql://fo

如何使用Java在Spark SQL中加入多个列以在DataFrame中进行过滤

> DataFrame a =包含列x,y,z,k> DataFrame b =包含列x,y,a a.join(b,<condition to use in java to use x,y >) ??? 我试过用 a.join(b,a.col("x").equalTo(b.col("x")) && a.col("y").equalTo(b.col("y"),"inner

python – 将数据从Dataframe传递到现有ML VectorIndexerModel时出错

我有一个Dataframe,我想用它来预测现有的模型.使用模型的transform方法时出错. 这就是我处理trainingdata的方法. forecast.printSchema() 我的Dataframe的架构: root |-- PM10: double (nullable = false) |-- rain_3h: double (nullable = false) |-- is_rain: double (null