pyspark.sql.streaming.DataStreamReader.csv

DataStreamReader.csv(path: str, schema: Union[pyspark.sql.types.StructType, str, None] = None, sep: Optional[str] = None, encoding: Optional[str] = None, quote: Optional[str] = None, escape: Optional[str] = None, comment: Optional[str] = None, header: Union[bool, str, None] = None, inferSchema: Union[bool, str, None] = None, ignoreLeadingWhiteSpace: Union[bool, str, None] = None, ignoreTrailingWhiteSpace: Union[bool, str, None] = None, nullValue: Optional[str] = None, nanValue: Optional[str] = None, positiveInf: Optional[str] = None, negativeInf: Optional[str] = None, dateFormat: Optional[str] = None, timestampFormat: Optional[str] = None, maxColumns: Union[str, int, None] = None, maxCharsPerColumn: Union[str, int, None] = None, maxMalformedLogPerPartition: Union[str, int, None] = None, mode: Optional[str] = None, columnNameOfCorruptRecord: Optional[str] = None, multiLine: Union[bool, str, None] = None, charToEscapeQuoteEscaping: Union[bool, str, None] = None, enforceSchema: Union[bool, str, None] = None, emptyValue: Optional[str] = None, locale: Optional[str] = None, lineSep: Optional[str] = None, pathGlobFilter: Union[bool, str, None] = None, recursiveFileLookup: Union[bool, str, None] = None, unescapedQuoteHandling: Optional[str] = None) → DataFrame

Loads a CSV file stream and returns the result as a DataFrame.

If inferSchema is enabled, this function goes through the input once to determine the input schema. To avoid scanning the entire input, disable the inferSchema option or specify the schema explicitly using schema, as in the sketch below.
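For example, a minimal sketch (assuming a running SparkSession named spark and a directory /tmp/stream-in containing CSV files; both names are illustrative) that supplies the schema up front so no inference pass over the data is needed:

>>> df = spark.readStream.schema(
...     "col0 INT, col1 STRING"  # explicit schema: no extra pass over the input
... ).csv("/tmp/stream-in")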

Parameters
path : str or list

string, or list of strings, for input path(s).

schema : pyspark.sql.types.StructType or str, optional

an optional pyspark.sql.types.StructType for the input schema or a DDL-formatted string (for example, col0 INT, col1 DOUBLE); both forms are shown in the sketch after the version notes below.

New in version 2.0.0.

Changed in version 3.5.0: Supports Spark Connect.
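As an illustration, a hedged sketch of the two equivalent ways to express the same schema (the column names col0 and col1 are placeholders):

>>> from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType
>>> struct_schema = StructType([  # programmatic StructType form
...     StructField("col0", IntegerType()),
...     StructField("col1", DoubleType()),
... ])
>>> ddl_schema = "col0 INT, col1 DOUBLE"  # equivalent DDL-formatted string

Either value can be passed as the schema argument (or to DataStreamReader.schema()).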

Other Parameters
Extra options

For the extra options, refer to Data Source Option for the Spark version you use; the sketch below shows how such options are passed.
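For instance, a minimal sketch (assuming spark and an input directory as above; the header and sep values are illustrative) showing two equivalent ways to pass such options:

>>> df = spark.readStream.schema("col0 INT, col1 STRING").csv(
...     "/tmp/stream-in", header=True, sep=";"  # as keyword arguments to csv()
... )
>>> df = (
...     spark.readStream.schema("col0 INT, col1 STRING")
...     .option("header", True)  # or individually via option()
...     .option("sep", ";")
...     .csv("/tmp/stream-in")
... )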

Notes

This API is evolving.

Examples

Load a data stream from a temporary CSV file.

>>> import tempfile
>>> import time
>>> with tempfile.TemporaryDirectory() as d:
...     # Write a temporary CSV file for the stream to read.
...     spark.createDataFrame([(1, "2")]).write.mode("overwrite").format("csv").save(d)
...
...     # Start a streaming query that reads the CSV directory and
...     # writes each micro-batch to the console.
...     q = spark.readStream.schema(
...         "col0 INT, col1 STRING"
...     ).csv(d).writeStream.format("console").start()
...     time.sleep(3)  # give the query time to process the file
...     q.stop()