public final class DataFrameWriter<T>
extends Object
Interface used to write a Dataset to external storage systems (e.g. file systems, key-value stores, etc). Use Dataset.write to access this.
Modifier and Type | Method and Description |
---|---|
DataFrameWriter<T> | bucketBy(int numBuckets, String colName, scala.collection.Seq<String> colNames): Buckets the output by the given columns. |
DataFrameWriter<T> | bucketBy(int numBuckets, String colName, String... colNames): Buckets the output by the given columns. |
void | csv(String path): Saves the content of the DataFrame in CSV format at the specified path. |
DataFrameWriter<T> | format(String source): Specifies the underlying output data source. |
void | insertInto(String tableName): Inserts the content of the DataFrame to the specified table. |
void | jdbc(String url, String table, java.util.Properties connectionProperties): Saves the content of the DataFrame to an external database table via JDBC. |
void | json(String path): Saves the content of the DataFrame in JSON format (JSON Lines text format or newline-delimited JSON) at the specified path. |
DataFrameWriter<T> | mode(SaveMode saveMode): Specifies the behavior when data or table already exists. |
DataFrameWriter<T> | mode(String saveMode): Specifies the behavior when data or table already exists. |
DataFrameWriter<T> | option(String key, boolean value): Adds an output option for the underlying data source. |
DataFrameWriter<T> | option(String key, double value): Adds an output option for the underlying data source. |
DataFrameWriter<T> | option(String key, long value): Adds an output option for the underlying data source. |
DataFrameWriter<T> | option(String key, String value): Adds an output option for the underlying data source. |
DataFrameWriter<T> | options(scala.collection.Map<String,String> options): (Scala-specific) Adds output options for the underlying data source. |
DataFrameWriter<T> | options(java.util.Map<String,String> options): Adds output options for the underlying data source. |
void | orc(String path): Saves the content of the DataFrame in ORC format at the specified path. |
void | parquet(String path): Saves the content of the DataFrame in Parquet format at the specified path. |
DataFrameWriter<T> | partitionBy(scala.collection.Seq<String> colNames): Partitions the output by the given columns on the file system. |
DataFrameWriter<T> | partitionBy(String... colNames): Partitions the output by the given columns on the file system. |
void | save(): Saves the content of the DataFrame as the specified table. |
void | save(String path): Saves the content of the DataFrame at the specified path. |
void | saveAsTable(String tableName): Saves the content of the DataFrame as the specified table. |
DataFrameWriter<T> | sortBy(String colName, scala.collection.Seq<String> colNames): Sorts the output in each bucket by the given columns. |
DataFrameWriter<T> | sortBy(String colName, String... colNames): Sorts the output in each bucket by the given columns. |
void | text(String path): Saves the content of the DataFrame in a text file at the specified path. |
public DataFrameWriter<T> bucketBy(int numBuckets, String colName, String... colNames)
Buckets the output by the given columns.
This is applicable for all file-based data sources (e.g. Parquet, JSON) starting with Spark 2.1.0.
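As a rough illustration of what bucketing by a column means, the sketch below assigns each key to one of numBuckets buckets by hashing it. It is a toy model under stated assumptions: the class BucketingSketch and its methods are hypothetical, and Objects.hashCode stands in for the hash function Spark actually uses, so the bucket numbers will not match real Spark output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Toy model of hash bucketing; not Spark's implementation.
final class BucketingSketch {

    // Hypothetical helper: maps a bucketing-column value to a bucket index
    // in [0, numBuckets). Objects.hashCode is a stand-in for Spark's own hash.
    static int bucketFor(Object key, int numBuckets) {
        return Math.floorMod(Objects.hashCode(key), numBuckets);
    }

    // Groups keys by bucket index, preserving input order inside each bucket.
    static Map<Integer, List<Integer>> assign(List<Integer> keys, int numBuckets) {
        Map<Integer, List<Integer>> buckets = new TreeMap<>();
        for (Integer key : keys) {
            buckets.computeIfAbsent(bucketFor(key, numBuckets), b -> new ArrayList<>()).add(key);
        }
        return buckets;
    }

    public static void main(String[] args) {
        // An Integer's hash code is its own value, so the grouping is easy to follow.
        System.out.println(assign(Arrays.asList(1, 2, 3, 4, 5, 6), 3));
        // prints {0=[3, 6], 1=[1, 4], 2=[2, 5]}
    }
}
```

Rows with equal bucketing-column values always land in the same bucket, which is the property that makes bucketed output useful for later joins and aggregations on those columns.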
Parameters:
numBuckets - (undocumented)
colName - (undocumented)
colNames - (undocumented)
public DataFrameWriter<T> bucketBy(int numBuckets, String colName, scala.collection.Seq<String> colNames)
Buckets the output by the given columns.
This is applicable for all file-based data sources (e.g. Parquet, JSON) starting with Spark 2.1.0.
Parameters:
numBuckets - (undocumented)
colName - (undocumented)
colNames - (undocumented)
public void csv(String path)
Saves the content of the DataFrame in CSV format at the specified path.
This is equivalent to:
format("csv").save(path)
You can set the following CSV-specific option(s) for writing CSV files:
- sep (default ,): sets a single character as a separator for each field and value.
- quote (default "): sets a single character used for escaping quoted values where the separator can be part of the value. If an empty string is set, it uses \u0000 (null character).
- escape (default \): sets a single character used for escaping quotes inside an already quoted value.
- charToEscapeQuoteEscaping (default escape or \0): sets a single character used for escaping the escape for the quote character. The default value is the escape character when the escape and quote characters are different, \0 otherwise.
- escapeQuotes (default true): a flag indicating whether values containing quotes should always be enclosed in quotes. Default is to escape all values containing a quote character.
- quoteAll (default false): a flag indicating whether all values should always be enclosed in quotes. Default is to only escape values containing a quote character.
- header (default false): writes the names of columns as the first line.
- nullValue (default empty string): sets the string representation of a null value.
- emptyValue (default ""): sets the string representation of an empty value.
- encoding (by default it is not set): specifies the encoding (charset) of saved CSV files. If it is not set, the UTF-8 charset will be used.
- compression (default null): compression codec to use when saving to file. This can be one of the known case-insensitive short names (none, bzip2, gzip, lz4, snappy and deflate).
- dateFormat (default yyyy-MM-dd): sets the string that indicates a date format. Custom date formats follow the formats at Datetime Patterns. This applies to date type.
- timestampFormat (default yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]): sets the string that indicates a timestamp format. Custom date formats follow the formats at Datetime Patterns. This applies to timestamp type.
- ignoreLeadingWhiteSpace (default true): a flag indicating whether or not leading whitespaces from values being written should be skipped.
- ignoreTrailingWhiteSpace (default true): a flag indicating whether or not trailing whitespaces from values being written should be skipped.
- lineSep (default \n): defines the line separator that should be used for writing. Maximum length is 1 character.
Parameters:
path - (undocumented)
public DataFrameWriter<T> format(String source)
Specifies the underlying output data source.
Parameters:
source - (undocumented)
public void insertInto(String tableName)
Inserts the content of the DataFrame to the specified table. It requires that the schema of the DataFrame is the same as the schema of the table.
Parameters:
tableName - (undocumented)
Note: SaveMode.ErrorIfExists and SaveMode.Ignore behave as SaveMode.Append in insertInto as insertInto is not a table creating operation.
Note: Unlike saveAsTable, insertInto ignores the column names and just uses position-based resolution. For example:
scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")
scala> Seq((3, 4)).toDF("j", "i").write.insertInto("t1")
scala> Seq((5, 6)).toDF("a", "b").write.insertInto("t1")
scala> sql("select * from t1").show
+---+---+
| i| j|
+---+---+
| 5| 6|
| 3| 4|
| 1| 2|
+---+---+
Because it inserts data to an existing table, format or options will be ignored.
public void jdbc(String url, String table, java.util.Properties connectionProperties)
Saves the content of the DataFrame to an external database table via JDBC. In the case the table already exists in the external database, behavior of this function depends on the save mode, specified by the mode function (default to throwing an exception).
Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash your external database systems.
You can set the following JDBC-specific option(s) for writing via JDBC:
- truncate (default false): use TRUNCATE TABLE instead of DROP TABLE.
In case of failures, users should turn off the truncate option to use DROP TABLE again. Also, due to the different behavior of TRUNCATE TABLE among DBMS, it's not always safe to use this. MySQLDialect, DB2Dialect, MsSqlServerDialect, DerbyDialect, and OracleDialect support this while PostgresDialect and the default JdbcDialect don't. For an unknown or unsupported JdbcDialect, the user option truncate is ignored.
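The connectionProperties argument described below is a plain java.util.Properties object, and its "isolationLevel" names correspond to constants on java.sql.Connection. The following sketch shows that correspondence using only the JDK. The class IsolationLevelSketch, its helper names, and the placeholder credentials are assumptions made for illustration; how Spark itself parses the property is not shown here.

```java
import java.sql.Connection;
import java.util.Properties;

// Illustrative mapping only; not Spark's own parsing code.
final class IsolationLevelSketch {

    // Maps an "isolationLevel" property value to the matching JDBC constant.
    static int toJdbcConstant(String name) {
        switch (name) {
            case "NONE":             return Connection.TRANSACTION_NONE;
            case "READ_UNCOMMITTED": return Connection.TRANSACTION_READ_UNCOMMITTED;
            case "READ_COMMITTED":   return Connection.TRANSACTION_READ_COMMITTED;
            case "REPEATABLE_READ":  return Connection.TRANSACTION_REPEATABLE_READ;
            case "SERIALIZABLE":     return Connection.TRANSACTION_SERIALIZABLE;
            default:
                throw new IllegalArgumentException("Unknown isolation level: " + name);
        }
    }

    // Builds a Properties object of the shape the jdbc method expects.
    // All values here are placeholders, not recommendations.
    static Properties exampleProperties() {
        Properties props = new Properties();
        props.setProperty("user", "example_user");         // hypothetical credential
        props.setProperty("password", "example_password"); // hypothetical credential
        props.setProperty("batchsize", "500");             // rows per insert
        props.setProperty("isolationLevel", "READ_COMMITTED");
        return props;
    }

    public static void main(String[] args) {
        Properties props = exampleProperties();
        System.out.println(toJdbcConstant(props.getProperty("isolationLevel")));
    }
}
```

A Properties object built this way would be passed as the third argument of jdbc(url, table, connectionProperties).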
Parameters:
url - JDBC database url of the form jdbc:subprotocol:subname
table - Name of the table in the external database.
connectionProperties - JDBC database connection arguments, a list of arbitrary string tag/value. Normally at least a "user" and "password" property should be included. "batchsize" can be used to control the number of rows per insert. "isolationLevel" can be one of "NONE", "READ_COMMITTED", "READ_UNCOMMITTED", "REPEATABLE_READ", or "SERIALIZABLE", corresponding to standard transaction isolation levels defined by JDBC's Connection object, with default of "READ_UNCOMMITTED".
public void json(String path)
Saves the content of the DataFrame in JSON format (JSON Lines text format or newline-delimited JSON) at the specified path.
This is equivalent to:
format("json").save(path)
You can set the following JSON-specific option(s) for writing JSON files:
- compression (default null): compression codec to use when saving to file. This can be one of the known case-insensitive short names (none, bzip2, gzip, lz4, snappy and deflate).
- dateFormat (default yyyy-MM-dd): sets the string that indicates a date format. Custom date formats follow the formats at Datetime Patterns. This applies to date type.
- timestampFormat (default yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]): sets the string that indicates a timestamp format. Custom date formats follow the formats at Datetime Patterns. This applies to timestamp type.
- encoding (by default it is not set): specifies the encoding (charset) of saved JSON files. If it is not set, the UTF-8 charset will be used.
- lineSep (default \n): defines the line separator that should be used for writing.
- ignoreNullFields (default true): whether to ignore null fields when generating JSON objects.
Parameters:
path - (undocumented)
public DataFrameWriter<T> mode(SaveMode saveMode)
Specifies the behavior when data or table already exists. Options include:
- SaveMode.Overwrite: overwrite the existing data.
- SaveMode.Append: append the data.
- SaveMode.Ignore: ignore the operation (i.e. no-op).
- SaveMode.ErrorIfExists: throw an exception at runtime.
The default option is ErrorIfExists.
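The four save modes can be pictured with a small in-memory model. The sketch below is an assumption-laden illustration, not Spark code: the class SaveModeSketch, its Mode enum, and the use of a null list to mean "nothing exists at the target yet" are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// In-memory model of the save modes; not Spark's implementation.
final class SaveModeSketch {

    enum Mode { OVERWRITE, APPEND, IGNORE, ERROR_IF_EXISTS }

    // Returns the target's contents after a write.
    // existing == null models "no data or table at the target yet".
    static List<String> write(List<String> existing, List<String> incoming, Mode mode) {
        if (existing == null) {
            // Nothing to conflict with: every mode simply writes the data.
            return new ArrayList<>(incoming);
        }
        switch (mode) {
            case OVERWRITE:
                return new ArrayList<>(incoming);
            case APPEND: {
                List<String> merged = new ArrayList<>(existing);
                merged.addAll(incoming);
                return merged;
            }
            case IGNORE:
                return new ArrayList<>(existing); // no-op
            default:
                throw new IllegalStateException("Target already exists");
        }
    }

    public static void main(String[] args) {
        List<String> existing = Arrays.asList("a");
        List<String> incoming = Arrays.asList("b");
        System.out.println(write(existing, incoming, Mode.OVERWRITE)); // prints [b]
        System.out.println(write(existing, incoming, Mode.APPEND));    // prints [a, b]
        System.out.println(write(existing, incoming, Mode.IGNORE));    // prints [a]
    }
}
```

With Mode.ERROR_IF_EXISTS and an existing target, the model throws, mirroring the default behavior described above.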
Parameters:
saveMode - (undocumented)
public DataFrameWriter<T> mode(String saveMode)
Specifies the behavior when data or table already exists. Options include:
- overwrite: overwrite the existing data.
- append: append the data.
- ignore: ignore the operation (i.e. no-op).
- error or errorifexists: default option, throw an exception at runtime.
Parameters:
saveMode - (undocumented)
public DataFrameWriter<T> option(String key, String value)
Adds an output option for the underlying data source.
All options are maintained in a case-insensitive way in terms of key names. If a new option has the same key case-insensitively, it will override the existing option.
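The case-insensitive override rule can be modeled with a sorted map that compares keys without regard to case. This is only a sketch of the documented behavior: the class CaseInsensitiveOptionsSketch is hypothetical, and the data structure Spark actually uses is not assumed here.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model of case-insensitive option handling; not Spark's implementation.
final class CaseInsensitiveOptionsSketch {

    // Keys that differ only by case are treated as the same key.
    private final Map<String, String> options = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);

    // Adds an option; a later call with the same key (ignoring case) wins.
    CaseInsensitiveOptionsSketch option(String key, String value) {
        options.remove(key);     // drop any earlier spelling of the key
        options.put(key, value); // keep the latest spelling and value
        return this;
    }

    String get(String key) {
        return options.get(key);
    }

    int size() {
        return options.size();
    }

    @Override
    public String toString() {
        return options.toString();
    }

    public static void main(String[] args) {
        CaseInsensitiveOptionsSketch opts = new CaseInsensitiveOptionsSketch()
                .option("Header", "false")
                .option("header", "true"); // overrides the earlier "Header"
        System.out.println(opts);               // prints {header=true}
        System.out.println(opts.get("HEADER")); // prints true
    }
}
```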
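You can set the following option(s):
- timeZone (default session local timezone): sets the string that indicates a time zone ID to be used to format timestamps in the JSON/CSV datasources or partition values. The following formats of timeZone are supported:
  - Region-based zone IDs: should have the form 'area/city', such as 'America/Los_Angeles'.
  - Zone offset: should be in the format '(+|-)HH:mm', for example '-08:00' or '+01:00'.
  If it isn't set, the current value of the SQL config spark.sql.session.timeZone is used by default.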
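To see why the timeZone option matters, the sketch below formats one instant under different zone IDs using plain java.time and the default timestampFormat pattern quoted elsewhere on this page. It assumes java.time semantics are a fair approximation of what a writer produces; the class TimeZoneOptionSketch is hypothetical and Spark's own formatter may differ in details.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Illustration with java.time only; not Spark's formatter.
final class TimeZoneOptionSketch {

    // Default timestampFormat pattern listed for the CSV and JSON writers.
    static final String DEFAULT_PATTERN = "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]";

    // Renders the instant as wall-clock time in the given zone.
    // timeZone may be a region ID, an offset such as "-08:00", or "UTC".
    static String format(Instant timestamp, String timeZone) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(DEFAULT_PATTERN);
        return formatter.format(timestamp.atZone(ZoneId.of(timeZone)));
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2020-01-02T03:04:05.678Z");
        System.out.println(format(ts, "UTC"));    // prints 2020-01-02T03:04:05.678Z
        System.out.println(format(ts, "-08:00")); // prints 2020-01-01T19:04:05.678-08:00
    }
}
```

The same instant yields different text, and even a different calendar date, depending on the zone, which also affects partition values derived from timestamps.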
Parameters:
key - (undocumented)
value - (undocumented)
public DataFrameWriter<T> option(String key, boolean value)
Adds an output option for the underlying data source.
All options are maintained in a case-insensitive way in terms of key names. If a new option has the same key case-insensitively, it will override the existing option.
Parameters:
key - (undocumented)
value - (undocumented)
public DataFrameWriter<T> option(String key, long value)
Adds an output option for the underlying data source.
All options are maintained in a case-insensitive way in terms of key names. If a new option has the same key case-insensitively, it will override the existing option.
Parameters:
key - (undocumented)
value - (undocumented)
public DataFrameWriter<T> option(String key, double value)
Adds an output option for the underlying data source.
All options are maintained in a case-insensitive way in terms of key names. If a new option has the same key case-insensitively, it will override the existing option.
Parameters:
key - (undocumented)
value - (undocumented)
public DataFrameWriter<T> options(scala.collection.Map<String,String> options)
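(Scala-specific) Adds output options for the underlying data source.
All options are maintained in a case-insensitive way in terms of key names. If a new option has the same key case-insensitively, it will override the existing option.
You can set the following option(s):
- timeZone (default session local timezone): sets the string that indicates a time zone ID to be used to format timestamps in the JSON/CSV datasources or partition values. The following formats of timeZone are supported:
  - Region-based zone IDs: should have the form 'area/city', such as 'America/Los_Angeles'.
  - Zone offset: should be in the format '(+|-)HH:mm', for example '-08:00' or '+01:00'.
  If it isn't set, the current value of the SQL config spark.sql.session.timeZone is used by default.
Parameters:
options - (undocumented)
public DataFrameWriter<T> options(java.util.Map<String,String> options)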
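Adds output options for the underlying data source.
All options are maintained in a case-insensitive way in terms of key names. If a new option has the same key case-insensitively, it will override the existing option.
You can set the following option(s):
- timeZone (default session local timezone): sets the string that indicates a time zone ID to be used to format timestamps in the JSON/CSV datasources or partition values. The following formats of timeZone are supported:
  - Region-based zone IDs: should have the form 'area/city', such as 'America/Los_Angeles'.
  - Zone offset: should be in the format '(+|-)HH:mm', for example '-08:00' or '+01:00'.
  If it isn't set, the current value of the SQL config spark.sql.session.timeZone is used by default.
Parameters:
options - (undocumented)
public void orc(String path)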
Saves the content of the DataFrame in ORC format at the specified path.
This is equivalent to:
format("orc").save(path)
You can set the following ORC-specific option(s) for writing ORC files:
- compression (default is the value specified in spark.sql.orc.compression.codec): compression codec to use when saving to file. This can be one of the known case-insensitive short names (none, snappy, zlib, and lzo). This will override orc.compress and spark.sql.orc.compression.codec. If orc.compress is given, it overrides spark.sql.orc.compression.codec.
Parameters:
path - (undocumented)
public void parquet(String path)
Saves the content of the DataFrame in Parquet format at the specified path.
This is equivalent to:
format("parquet").save(path)
You can set the following Parquet-specific option(s) for writing Parquet files:
- compression (default is the value specified in spark.sql.parquet.compression.codec): compression codec to use when saving to file. This can be one of the known case-insensitive short names (none, uncompressed, snappy, gzip, lzo, brotli, lz4, and zstd). This will override spark.sql.parquet.compression.codec.
Parameters:
path - (undocumented)
public DataFrameWriter<T> partitionBy(String... colNames)
Partitions the output by the given columns on the file system.
Partitioning is one of the most widely used techniques to optimize physical data layout. It provides a coarse-grained index for skipping unnecessary data reads when queries have predicates on the partitioned columns. In order for partitioning to work well, the number of distinct values in each column should typically be less than tens of thousands.
This is applicable for all file-based data sources (e.g. Parquet, JSON) starting with Spark 2.1.0.
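Partitioned output from file-based sources is laid out in Hive-style column=value directories, one level per partition column. The sketch below builds such a relative directory path. It is a simplified model: the class PartitionPathSketch is hypothetical, and escaping of special characters in values is deliberately ignored.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

// Simplified model of a Hive-style partition directory; no value escaping.
final class PartitionPathSketch {

    // Builds "col1=val1/col2=val2/..." from parallel lists of names and values.
    static String partitionDir(List<String> columns, List<String> values) {
        if (columns.size() != values.size()) {
            throw new IllegalArgumentException("columns and values must have the same length");
        }
        StringJoiner path = new StringJoiner("/");
        for (int i = 0; i < columns.size(); i++) {
            path.add(columns.get(i) + "=" + values.get(i));
        }
        return path.toString();
    }

    public static void main(String[] args) {
        System.out.println(partitionDir(Arrays.asList("year", "month"), Arrays.asList("2016", "01")));
        // prints year=2016/month=01
    }
}
```

Because every distinct combination of values becomes its own directory, a column with very many distinct values produces very many small directories, which is the reason for the cardinality guidance above.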
Parameters:
colNames - (undocumented)
public DataFrameWriter<T> partitionBy(scala.collection.Seq<String> colNames)
Partitions the output by the given columns on the file system.
Partitioning is one of the most widely used techniques to optimize physical data layout. It provides a coarse-grained index for skipping unnecessary data reads when queries have predicates on the partitioned columns. In order for partitioning to work well, the number of distinct values in each column should typically be less than tens of thousands.
This is applicable for all file-based data sources (e.g. Parquet, JSON) starting with Spark 2.1.0.
Parameters:
colNames - (undocumented)
public void save(String path)
Saves the content of the DataFrame at the specified path.
Parameters:
path - (undocumented)
public void save()
Saves the content of the DataFrame as the specified table.
public void saveAsTable(String tableName)
Saves the content of the DataFrame as the specified table.
In the case the table already exists, behavior of this function depends on the save mode, specified by the mode function (default to throwing an exception). When mode is Overwrite, the schema of the DataFrame does not need to be the same as that of the existing table.
When mode is Append, if there is an existing table, we will use the format and options of the existing table. The column order in the schema of the DataFrame doesn't need to be the same as that of the existing table. Unlike insertInto, saveAsTable will use the column names to find the correct column positions. For example:
scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")
scala> Seq((3, 4)).toDF("j", "i").write.mode("append").saveAsTable("t1")
scala> sql("select * from t1").show
+---+---+
| i| j|
+---+---+
| 1| 2|
| 4| 3|
+---+---+
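The difference between name-based resolution (saveAsTable in Append mode) and position-based resolution (insertInto) can be reproduced with a few lines of plain Java. The sketch is a model of the documented behavior only; the class ColumnResolutionSketch and its methods are hypothetical and do not mirror Spark internals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Model of the two column-resolution strategies; not Spark's implementation.
final class ColumnResolutionSketch {

    // Position-based (insertInto-like): column names are ignored entirely.
    static List<Integer> byPosition(List<Integer> row) {
        return new ArrayList<>(row);
    }

    // Name-based (saveAsTable-like): each table column pulls the value of the
    // DataFrame column with the same name.
    static List<Integer> byName(List<String> tableColumns, List<String> frameColumns, List<Integer> row) {
        List<Integer> out = new ArrayList<>();
        for (String column : tableColumns) {
            int index = frameColumns.indexOf(column);
            if (index < 0) {
                throw new IllegalArgumentException("No column named " + column);
            }
            out.add(row.get(index));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> table = Arrays.asList("i", "j"); // table t1 has columns (i, j)
        List<String> frame = Arrays.asList("j", "i"); // DataFrame columns are (j, i)
        List<Integer> row = Arrays.asList(3, 4);

        System.out.println(byPosition(row));          // prints [3, 4]
        System.out.println(byName(table, frame, row)); // prints [4, 3]
    }
}
```

The two results match the rows shown in the examples on this page: insertInto stores the row as (3, 4), while saveAsTable in append mode stores it as (4, 3).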
In this method, save mode is used to determine the behavior if the data source table exists in Spark catalog. We will always overwrite the underlying data of data source (e.g. a table in JDBC data source) if the table doesn't exist in Spark catalog, and will always append to the underlying data of data source if the table already exists.
When the DataFrame is created from a non-partitioned HadoopFsRelation with a single input path, and the data source provider can be mapped to an existing Hive builtin SerDe (i.e. ORC and Parquet), the table is persisted in a Hive compatible format, which means other systems like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL specific format.
Parameters:
tableName - (undocumented)
public DataFrameWriter<T> sortBy(String colName, String... colNames)
Sorts the output in each bucket by the given columns.
This is applicable for all file-based data sources (e.g. Parquet, JSON) starting with Spark 2.1.0.
Parameters:
colName - (undocumented)
colNames - (undocumented)
public DataFrameWriter<T> sortBy(String colName, scala.collection.Seq<String> colNames)
Sorts the output in each bucket by the given columns.
This is applicable for all file-based data sources (e.g. Parquet, JSON) starting with Spark 2.1.0.
Parameters:
colName - (undocumented)
colNames - (undocumented)
public void text(String path)
Saves the content of the DataFrame in a text file at the specified path.
The DataFrame must have only one column that is of string type.
Each row becomes a new line in the output file. For example:
// Scala:
df.write.text("/path/to/output")
// Java:
df.write().text("/path/to/output")
The text files will be encoded as UTF-8.
You can set the following option(s) for writing text files:
- compression (default null): compression codec to use when saving to file. This can be one of the known case-insensitive short names (none, bzip2, gzip, lz4, snappy and deflate).
- lineSep (default \n): defines the line separator that should be used for writing.
Parameters:
path - (undocumented)