
Creating a DataFrame

PySpark DataFrames are a fundamental component of Apache Spark: a distributed collection of data organized into named columns. They provide a high-level, tabular abstraction, similar to a relational database table, and they are built for big data processing, so data manipulation and analysis scale efficiently. Because they run on Spark's distributed engine, PySpark DataFrames are well suited to large-scale datasets and complex data transformations in Python.

An Empty DataFrame

To create an empty DataFrame in PySpark, you can follow these steps:

python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

# Create a SparkSession (if not already created)
spark = SparkSession.builder.appName("EmptyDataFrame").getOrCreate()

# Define the schema for the empty DataFrame
schema = StructType([
    StructField("column_name_1", StringType(), True),
    StructField("column_name_2", StringType(), True),
    # Add more columns as needed
])

# Create an empty DataFrame with the defined schema
empty_df = spark.createDataFrame([], schema)

# Show the empty DataFrame (it will have the columns but no rows)
empty_df.show()

Replace "column_name_1", "column_name_2", etc. with the actual column names you want for your DataFrame. By default, the createDataFrame function expects an iterable (such as a list) of Row objects, but passing an empty list will create an empty DataFrame with the specified schema.

From a Collection

To create a DataFrame from a collection in PySpark, you can use the createDataFrame function provided by the SparkSession object. The collection can be a list of tuples, a list of dictionaries, or an RDD (Resilient Distributed Dataset). Here's how you can do it with each of these options:

python
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Create a SparkSession (if not already created)
spark = SparkSession.builder.appName("DataFrameFromCollection").getOrCreate()

# Option 1: Create DataFrame from a list of dictionaries
data_dict = [
    {"name": "Alice", "age": 30},
    {"name": "Bob", "age": 25},
    {"name": "Charlie", "age": 35}
]
df_dict = spark.createDataFrame(data_dict)
df_dict.show()

# Option 2: Create DataFrame from a list of tuples
data_tuple = [
    ("Alice", 30),
    ("Bob", 25),
    ("Charlie", 35)
]
schema_tuple = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
df_tuple = spark.createDataFrame(data_tuple, schema=schema_tuple)
df_tuple.show()

# Option 3: Create DataFrame from an RDD (Resilient Distributed Dataset)
rdd_data = spark.sparkContext.parallelize(data_tuple)
rdd_row = rdd_data.map(lambda x: Row(name=x[0], age=x[1]))
df_rdd = spark.createDataFrame(rdd_row)
df_rdd.show()

This example demonstrates three different options for creating a DataFrame using a list of dictionaries, tuples, and an RDD. The resulting DataFrames will have the same content, and you can choose the method that suits your data format or processing requirements best.
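
One practical difference: with the dictionary and RDD options Spark infers the column types (an integer like age typically comes out as a long), while the tuple option uses the explicitly declared schema. A minimal sketch to compare, assuming the DataFrames created above:

python
# Compare the inferred schema with the explicitly declared one
df_dict.printSchema()   # age inferred by Spark (typically long)
df_tuple.printSchema()  # age is integer, as declared in schema_tuple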

Custom Schema

To create a DataFrame with a custom schema that includes a map and array type in PySpark, you can use the createDataFrame method along with the appropriate StructType and ArrayType or MapType from the pyspark.sql.types module. Here's an example of how to do it:

python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType, ArrayType

# Create a SparkSession (if not already created)
spark = SparkSession.builder.appName("DataFrameWithCustomSchema").getOrCreate()

# Sample data as a list of dictionaries with a map and an array
data = [
    {"id": 1, "name": "Alice", "scores": [85, 90, 78], "attributes": {"height": 170, "weight": 60}},
    {"id": 2, "name": "Bob", "scores": [75, 80, 82], "attributes": {"height": 180, "weight": 75}},
    {"id": 3, "name": "Charlie", "scores": [92, 88, 95], "attributes": {"height": 165, "weight": 55}}
]

# Define the custom schema with map and array types
custom_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("scores", ArrayType(IntegerType(), True), True),
    StructField("attributes", MapType(StringType(), IntegerType(), True), True)
])

# Create a DataFrame with the custom schema
df = spark.createDataFrame(data, schema=custom_schema)

# Show the DataFrame
df.show(truncate=False)

In this example, the DataFrame is created with a custom schema containing four fields: "id" of IntegerType, "name" of StringType, "scores" of ArrayType(IntegerType), and "attributes" of MapType(StringType, IntegerType). An ArrayType represents an array of elements of a single data type, and a MapType represents a key-value map where the key and the value each have their own declared data type.

The truncate=False argument in the show() method is used to display the full contents of the DataFrame, ensuring that the arrays and maps are not truncated in the output.
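
Once the DataFrame exists, array elements and map values can be accessed with ordinary column expressions. A minimal sketch, reusing the df defined above:

python
from pyspark.sql import functions as F

# Pull the first score out of the array and look up one key in the map
df.select(
    F.col("name"),
    F.col("scores")[0].alias("first_score"),
    F.col("attributes")["height"].alias("height")
).show()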

From Single Files

PySpark can read data from various file types. Helper functions for the most common types have been provided.

CSV File

python
df = spark.read.csv("path/to/data.csv")

JSON Files

python
df = spark.read.json("path/to/data.json")

TXT Files

python
df = spark.read.text("path/to/data.txt")

Parquet Files

python
df = spark.read.parquet("path/to/data.parquet")

! Pro Tip

Other reader functions include .orc() and .jdbc(). Formats without a dedicated helper, such as Avro, can be read with the generic .format(...).load(...) pattern (Avro requires the spark-avro package).

From Multiple Files

Spark can also read multiple files from a folder. Pass the folder path to the appropriate reader function:

python
df = spark.read.csv("path/to/folder/")
df = spark.read.json("path/to/folder/")
df = spark.read.parquet("path/to/folder/")
df = spark.read.format("txt").load("/path/to/folder/")
...
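
Readers also accept glob patterns and explicit lists of paths, which is useful when a folder mixes file types. A minimal sketch with hypothetical paths:

python
# Read only the CSV files in a folder, or an explicit list of files
df = spark.read.csv("path/to/folder/*.csv")
df = spark.read.csv(["path/to/file1.csv", "path/to/file2.csv"])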