pyspark.sql.streaming.DataStreamReader.csv¶
-
DataStreamReader.
csv
(path: str, schema: Union[pyspark.sql.types.StructType, str, None] = None, sep: Optional[str] = None, encoding: Optional[str] = None, quote: Optional[str] = None, escape: Optional[str] = None, comment: Optional[str] = None, header: Union[bool, str, None] = None, inferSchema: Union[bool, str, None] = None, ignoreLeadingWhiteSpace: Union[bool, str, None] = None, ignoreTrailingWhiteSpace: Union[bool, str, None] = None, nullValue: Optional[str] = None, nanValue: Optional[str] = None, positiveInf: Optional[str] = None, negativeInf: Optional[str] = None, dateFormat: Optional[str] = None, timestampFormat: Optional[str] = None, maxColumns: Union[str, int, None] = None, maxCharsPerColumn: Union[str, int, None] = None, maxMalformedLogPerPartition: Union[str, int, None] = None, mode: Optional[str] = None, columnNameOfCorruptRecord: Optional[str] = None, multiLine: Union[bool, str, None] = None, charToEscapeQuoteEscaping: Union[bool, str, None] = None, enforceSchema: Union[bool, str, None] = None, emptyValue: Optional[str] = None, locale: Optional[str] = None, lineSep: Optional[str] = None, pathGlobFilter: Union[bool, str, None] = None, recursiveFileLookup: Union[bool, str, None] = None, unescapedQuoteHandling: Optional[str] = None) → DataFrame[source]¶ Loads a CSV file stream and returns the result as a
DataFrame
.This function will go through the input once to determine the input schema if
inferSchema
is enabled. To avoid going through the entire data once, disableinferSchema
option or specify the schema explicitly usingschema
.- Parameters
- pathstr or list
string, or list of strings, for input path(s).
- schema
pyspark.sql.types.StructType
or str, optional an optional
pyspark.sql.types.StructType
for the input schema or a DDL-formatted string (For examplecol0 INT, col1 DOUBLE
).- .. versionadded:: 2.0.0
- .. versionchanged:: 3.5.0
Supports Spark Connect.
- Other Parameters
- Extra options
For the extra options, refer to Data Source Option in the version you use.
Notes
This API is evolving.
Examples
Load a data stream from a temporary CSV file.
>>> import tempfile >>> import time >>> with tempfile.TemporaryDirectory() as d: ... # Write a temporary text file to read it. ... spark.createDataFrame([(1, "2"),]).write.mode("overwrite").format("csv").save(d) ... ... # Start a streaming query to read the CSV file. ... q = spark.readStream.schema( ... "col0 INT, col1 STRING" ... ).format("csv").load(d).writeStream.format("console").start() ... time.sleep(3) ... q.stop()