Creating a DataFrame
PySpark DataFrames are a fundamental component of Apache Spark, offering a distributed collection of data organized into named columns. They provide a high-level, tabular data abstraction, akin to a relational database table, with seamless support for big data processing, enabling scalable and efficient data manipulation and analysis. Leveraging the power of Spark's distributed computing, PySpark DataFrames are well-suited for handling large-scale datasets and complex data transformations in Python.
An Empty DataFrame
To create an empty DataFrame in PySpark, you can follow these steps:
```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

# Create a SparkSession (if not already created)
spark = SparkSession.builder.appName("EmptyDataFrame").getOrCreate()

# Define the schema for the empty DataFrame
schema = StructType([
    StructField("column_name_1", StringType(), True),
    StructField("column_name_2", StringType(), True),
    # Add more columns as needed
])

# Create an empty DataFrame with the defined schema
empty_df = spark.createDataFrame([], schema)

# Show the empty DataFrame (it will have the columns but no rows)
empty_df.show()
```
Replace "column_name_1"
, "column_name_2"
, etc. with the actual column names you want for your DataFrame. By default, the createDataFrame
function expects an iterable (such as a list) of Row objects, but passing an empty list will create an empty DataFrame with the specified schema.
From a Collection
To create a DataFrame from a collection in PySpark, you can use the `createDataFrame` function provided by the `SparkSession` object. The collection can be a list of tuples, a list of dictionaries, or an RDD (Resilient Distributed Dataset). The example below shows all three approaches:
```python
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Create a SparkSession (if not already created)
spark = SparkSession.builder.appName("DataFrameFromCollection").getOrCreate()

# Option 1: Create DataFrame from a list of dictionaries
data_dict = [
    {"name": "Alice", "age": 30},
    {"name": "Bob", "age": 25},
    {"name": "Charlie", "age": 35}
]
df_dict = spark.createDataFrame(data_dict)
df_dict.show()

# Option 2: Create DataFrame from a list of tuples
data_tuple = [
    ("Alice", 30),
    ("Bob", 25),
    ("Charlie", 35)
]
schema_tuple = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
df_tuple = spark.createDataFrame(data_tuple, schema=schema_tuple)
df_tuple.show()

# Option 3: Create DataFrame from an RDD (Resilient Distributed Dataset)
rdd_data = spark.sparkContext.parallelize(data_tuple)
rdd_row = rdd_data.map(lambda x: Row(name=x[0], age=x[1]))
df_rdd = spark.createDataFrame(rdd_row)
df_rdd.show()
```
This example demonstrates three different ways to create a DataFrame: from a list of dictionaries, from a list of tuples with an explicit schema, and from an RDD. The resulting DataFrames have the same content, so you can choose the method that best suits your data format or processing requirements.
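As a further option (a minimal sketch reusing `data_tuple` and `spark` from the example above), `createDataFrame` also accepts a plain list of column names in place of a full `StructType`, letting Spark infer the types; an RDD of tuples can likewise be converted with `toDF`:

```python
# Pass only the column names; Spark infers the data types from the values
df_names = spark.createDataFrame(data_tuple, ["name", "age"])
df_names.show()

# Convert an RDD of tuples directly, naming the columns via toDF
df_todf = spark.sparkContext.parallelize(data_tuple).toDF(["name", "age"])
df_todf.show()
```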
Custom Schema
To create a DataFrame with a custom schema that includes map and array types in PySpark, you can use the `createDataFrame` method along with the appropriate `StructType` and `ArrayType` or `MapType` from the `pyspark.sql.types` module. Here's an example of how to do it:
```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType, ArrayType

# Create a SparkSession (if not already created)
spark = SparkSession.builder.appName("DataFrameWithCustomSchema").getOrCreate()

# Sample data as a list of dictionaries with a map and an array
data = [
    {"id": 1, "name": "Alice", "scores": [85, 90, 78], "attributes": {"height": 170, "weight": 60}},
    {"id": 2, "name": "Bob", "scores": [75, 80, 82], "attributes": {"height": 180, "weight": 75}},
    {"id": 3, "name": "Charlie", "scores": [92, 88, 95], "attributes": {"height": 165, "weight": 55}}
]

# Define the custom schema with map and array types
custom_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("scores", ArrayType(IntegerType(), True), True),
    StructField("attributes", MapType(StringType(), IntegerType(), True), True)
])

# Create a DataFrame with the custom schema
df = spark.createDataFrame(data, schema=custom_schema)

# Show the DataFrame
df.show(truncate=False)
```
In this example, the DataFrame is created with a custom schema containing four fields: "id" of `IntegerType`, "name" of `StringType`, "scores" of `ArrayType(IntegerType())`, and "attributes" of `MapType(StringType(), IntegerType())`. `ArrayType` represents an array of elements of a single data type, and `MapType` represents a key-value map where the key and value each have their own data type.

The `truncate=False` argument in the `show()` method displays the full contents of the DataFrame, ensuring that the arrays and maps are not truncated in the output.
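Once the DataFrame exists, individual array elements and map values can be selected with bracket notation on a column. Here is a minimal sketch reusing the `df` defined above; the alias names are arbitrary:

```python
from pyspark.sql.functions import col

# Select the first score from the array and the "height" entry from the map
df.select(
    col("name"),
    col("scores")[0].alias("first_score"),
    col("attributes")["height"].alias("height")
).show()
```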
From Single Files
PySpark can read data from various file types. Helper functions for the most common types have been provided.
CSV File
```python
df = spark.read.csv("path/to/data.csv")
```
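In practice you will often want to treat the first line as a header and let Spark infer column types. A minimal sketch using the reader's `header` and `inferSchema` options (the path is a placeholder):

```python
# Read a CSV with a header row, inferring column types from the data
df = spark.read.csv("path/to/data.csv", header=True, inferSchema=True)
```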
JSON Files
```python
df = spark.read.json("path/to/data.json")
```
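By default, the JSON reader expects one JSON object per line (JSON Lines). For a file containing a single pretty-printed, multi-line JSON document, set the `multiLine` option (a minimal sketch; the path is a placeholder):

```python
# Read a multi-line (pretty-printed) JSON file
df = spark.read.json("path/to/data.json", multiLine=True)
```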
TXT Files
```python
df = spark.read.text("path/to/data.txt")
```
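The text reader loads each line of the file into a single string column named `value`. If the lines have structure, you can split them yourself; a sketch assuming hypothetical comma-separated lines:

```python
from pyspark.sql.functions import split, col

# Each line arrives as one string in the "value" column
lines_df = spark.read.text("path/to/data.txt")

# Hypothetical example: split comma-separated lines into two columns
parsed_df = lines_df.select(
    split(col("value"), ",")[0].alias("first_field"),
    split(col("value"), ",")[1].alias("second_field")
)
```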
Parquet Files
```python
df = spark.read.parquet("path/to/data.parquet")
```
! Pro Tip
Other reader functions include `.orc()` and `.jdbc()`; formats such as Avro can be loaded with `.format("avro").load(...)` when the spark-avro package is available.
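For example, a minimal sketch of a JDBC read; the URL, table name, and credentials are placeholders, and the matching JDBC driver jar must be on Spark's classpath:

```python
# Hypothetical connection details -- replace with your own
df_jdbc = spark.read.jdbc(
    url="jdbc:postgresql://localhost:5432/mydb",
    table="public.customers",
    properties={"user": "username", "password": "password", "driver": "org.postgresql.Driver"}
)
```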
From Multiple Files
Spark can also read multiple files from a folder. Pass the path of the folder to the appropriate reader function:
```python
df = spark.read.csv("path/to/folder/")
df = spark.read.json("path/to/folder/")
df = spark.read.parquet("path/to/folder/")
df = spark.read.format("text").load("/path/to/folder/")
```
...
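The reader functions also accept glob patterns and explicit lists of paths, which is useful when the files you need are not all in one folder (a minimal sketch; the paths are placeholders):

```python
# Read only the CSV files matching a glob pattern
df = spark.read.csv("path/to/folder/*.csv")

# Read an explicit list of files
df = spark.read.csv(["path/to/file1.csv", "path/to/file2.csv"])
```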