How to create a Spark DataFrame

A DataFrame is a collection of data organised into named columns and rows, much like a table in a relational database. It offers many methods for filtering, selecting and aggregating the data it holds.

There are many ways to create a DataFrame. Below I show some of the common ones that I have used in PySpark.

Imports

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession \
    .builder \
    .getOrCreate()

# SparkContext used by the RDD example below
sc = spark.sparkContext

Creating an empty DataFrame

emptySchema = StructType([])
# create empty dataframe with empty schema
emptyDF = spark.createDataFrame([],emptySchema)

schema = StructType([
    StructField("id", StringType()),
    StructField("dt", StringType()),
    StructField("value", DoubleType())
])
# create empty dataframe with a defined schema
emptyDFWithSchema = spark.createDataFrame([],schema)

Creating a DataFrame with data

# create a dataframe with given data
dfFromData = spark.createDataFrame([['Alex',16,10],['Tom',16,20],['Bob',15,12]])

schema = StructType([
    StructField("name", StringType()),
    StructField("age", IntegerType()),
    StructField("points", IntegerType())
])

# create a dataframe with given data as well as schema
dfFromDataWithSchema = spark.createDataFrame([['Alex',16,10],['Tom',16,20],['Bob',15,12]],schema)

Creating a DataFrame from an RDD

rdd1 = sc.parallelize(["jan","feb","mar","april","may","jun"],3)

schema = StructType([StructField("month", StringType())])

# create a DataFrame using the createDataFrame method (columns get default names)
rddDF1 = spark.createDataFrame(rdd1.map(lambda x: (x,)))

# create a DataFrame using the createDataFrame method and a specified schema
rddDF1WithSchema = spark.createDataFrame(rdd1.map(lambda x: (x,)),schema)

# create a DataFrame using the toDF method
rddDF2 = rdd1.map(lambda x: (x,)).toDF()

# create a DataFrame using the toDF method and a specified schema
rddDF2WithSchema = rdd1.map(lambda x: (x,)).toDF(schema)

Creating a DataFrame using a list

l = [('Tim','10','12'),('Tom','5','9'),('Harry','10','5')]
listDF = spark.createDataFrame(l, ['name','val1','val2'])

Creating a DataFrame from a Pandas DataFrame

l = [('Tim','10'),('Tom','5'),('Harry','15')]
pandasDF = pd.DataFrame(l, columns=['Name', 'Age'])
sparkDF = spark.createDataFrame(pandasDF)

Creating a DataFrame using a CSV file

path = '/in-data/testfile.csv'
schema = StructType([
    StructField("id", StringType()),
    StructField("dt", StringType()),
    StructField("value", DoubleType())
])
csvDF = spark.read.csv(path,schema=schema,header=True)
