forked from spark-examples/pyspark-examples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pyspark-join-two-dataframes.py
74 lines (61 loc) · 1.86 KB
/
pyspark-join-two-dataframes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# -*- coding: utf-8 -*-
"""
PySpark DataFrame join examples.

author SparkByExamples.com
"""
from pyspark.sql import SparkSession

# Build (or reuse) the SparkSession that every example below runs on.
spark = (
    SparkSession.builder
    .appName('SparkByExamples.com')
    .getOrCreate()
)
# --- Employee DataFrame --------------------------------------------------
# emp_dept_id is a foreign key into the department table's dept_id.
empColumns = ["emp_id", "name", "emp_dept_id"]
empData = [
    (1, "Smith", 10),
    (2, "Rose", 20),
    (3, "Williams", 10),
    (4, "Jones", 30),
]
empDF = spark.createDataFrame(empData, empColumns)
empDF.show()

# --- Department DataFrame ------------------------------------------------
deptColumns = ["dept_name", "dept_id"]
deptData = [
    ("Finance", 10),
    ("Marketing", 20),
    ("Sales", 30),
    ("IT", 40),
]
deptDF = spark.createDataFrame(deptData, deptColumns)
deptDF.show()

# --- Address DataFrame ---------------------------------------------------
# Shares emp_id with the employee table; row 5 has no matching employee.
addColumns = ["emp_id", "addline1", "city", "state"]
addData = [
    (1, "1523 Main St", "SFO", "CA"),
    (2, "3453 Orange St", "SFO", "NY"),
    (3, "34 Warner St", "Jersey", "NJ"),
    (4, "221 Cavalier St", "Newark", "DE"),
    (5, "789 Walnut St", "Sandiago", "CA"),
]
addDF = spark.createDataFrame(addData, addColumns)
addDF.show()
# Inner join on an explicit equality expression; both emp_id columns
# survive in the output, so the result has a duplicate column name.
same_employee = empDF["emp_id"] == addDF["emp_id"]
empDF.join(addDF, same_employee).show()

# Joining on a list of column names keeps a single, de-duplicated emp_id.
empDF.join(addDF, ["emp_id"]).show()

# Chain joins to combine all three DataFrames in one pipeline.
(
    empDF.join(addDF, ["emp_id"])
    .join(deptDF, empDF["emp_dept_id"] == deptDF["dept_id"])
    .show()
)

# Same result via cross joins filtered afterwards with where().
(
    empDF.join(deptDF)
    .where(empDF["emp_dept_id"] == deptDF["dept_id"])
    .join(addDF)
    .where(empDF["emp_id"] == addDF["emp_id"])
    .show()
)
# Register temp views so the same joins can be expressed in SQL.
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")
addDF.createOrReplaceTempView("ADD")

# Implicit-join SQL (comma-separated tables plus a WHERE clause).
# Adjacent string literals concatenate at compile time into one query.
query = (
    "select * from EMP e, DEPT d, ADD a "
    "where e.emp_dept_id == d.dept_id and e.emp_id == a.emp_id"
)
spark.sql(query).show()
# Join on a compound condition: rows match only when BOTH column pairs
# are equal (here just the row (1/2, "B") on each side qualifies... only
# id 2 matches on both A1==B1 and A2==B2).
df1 = spark.createDataFrame([(1, "A"), (2, "B"), (3, "C")], ["A1", "A2"])
df2 = spark.createDataFrame([(1, "F"), (2, "B")], ["B1", "B2"])

both_match = (df1.A1 == df2.B1) & (df1.A2 == df2.B2)
df = df1.join(df2, both_match)
df.show()