반응형
www.youtube.com/watch?v=zC9cnh8rJd0
Spark processing 추이
Spark 2.0 이전,
Spark Core > SparkContext > RDD > Transformations&Actions
Spark 2.0 이후,
Spark SQL > Spark Session > DataFrame > toTable > SQL
(DataFrame에서 바로 분석도 가능하지만, 최적화 성능을 이끌어내기 힘들었음)
Spark 2.2 이후, CBO(Cost-Based Pptimizer) 등장, DataFrame 사용시, 자동으로 최적화
Spark SQL > Spark Session > DataFrame > toTable > SQL (쿼리로 분석)
Spark SQL > Spark Session > DataFrame (파이썬으로 분석)
둘다 거의 같은 성능. RDD는 사용x (HIVE 사용시 HIVE SQL에 따라 자동으로 map&reduce를 실행하듯, Spark SQL automatically converted to transformation&action)
Spark SQL
- DataFrame은 스키마를 가지고 있다.(즉, spark가 우리가 필요한 데이터가 어떤 column에 어떤 형식으로 저장되어 있는지 알 수 있다.
- 따라서 실행시 요구되는 데이터만 메모리에 적재하여 사용한다.
- RDD 연산의 경우, 사용하려는 데이터가 속한 모든 데이터를 메모리에 가져와야한다.
- RDD 및 아주 많은 file format으로 부터 생성 가능하다.
- RDD에서 DataFrame을 만드는 방법
- 각 row를 Row Object로 변환 or StructType으로 변환 후, toDF
- csv, xml, json, HIVE, RDBMS table, NOSQL DB, etc.
- RDD에서 DataFrame을 만드는 방법
- SQLContext는 spark1.에서 SparkSession은 2.에서 DataFrame을 생성하기 위해 사용됨.
Practice
- 주석 설명 참고
In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
In [2]:
spark = SparkSession.builder.appName("Basic").getOrCreate()
In [3]:
spark
Out[3]:
SparkSession - in-memory
In [4]:
path = "/home/user/data/"
DataFrame 생성(from json)¶
In [5]:
df = spark.read.json(path + 'people.json')
In [6]:
df.show()
+---+----+ |age|name| +---+----+ | 29|Choi| | 25|Park| | 22| Kim| +---+----+
In [7]:
df.printSchema()
root |-- age: long (nullable = true) |-- name: string (nullable = true)
In [8]:
df.columns
Out[8]:
['age', 'name']
In [9]:
df.describe() # make another DF storing some inportant info (count, std, mean, etc)
Out[9]:
DataFrame[summary: string, age: string, name: string]
In [10]:
df.describe().show()
+-------+------------------+----+ |summary| age|name| +-------+------------------+----+ | count| 3| 3| | mean|25.333333333333332|null| | stddev| 3.511884584284246|null| | min| 22|Choi| | max| 29|Park| +-------+------------------+----+
Make schema structure¶
In [11]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType
In [12]:
data_schema = [StructField("age", IntegerType(), True), StructField("name", StringType(), True)]
In [14]:
final_struc = StructType(fields=data_schema)
In [15]:
df = spark.read.json(path + "people.json", schema=final_struc)
In [16]:
df.printSchema()
root |-- age: integer (nullable = true) |-- name: string (nullable = true)
Grabbing the data¶
In [17]:
df['age']
Out[17]:
Column<'age'>
In [18]:
type(df['age'])
Out[18]:
pyspark.sql.column.Column
In [19]:
df.select('age')
Out[19]:
DataFrame[age: int]
In [20]:
type(df.select('age'))
Out[20]:
pyspark.sql.dataframe.DataFrame
In [22]:
df.select('age').show()
+---+ |age| +---+ | 29| | 25| | 22| +---+
In [21]:
df.head(2)
Out[21]:
[Row(age=29, name='Choi'), Row(age=25, name='Park')]
In [24]:
df.select(['age','name']).show()
+---+----+ |age|name| +---+----+ | 29|Choi| | 25|Park| | 22| Kim| +---+----+
Creating new colmns¶
In [26]:
df.withColumn('newage', df['age']).show() # withColumn(newcol, algorithm) : make a new col
+---+----+------+ |age|name|newage| +---+----+------+ | 29|Choi| 29| | 25|Park| 25| | 22| Kim| 22| +---+----+------+
In [27]:
df.show() # RDD처럼 DF도 immutable하다. withColumn이 기존의 DF를 바꾸는것이 아님.
+---+----+ |age|name| +---+----+ | 29|Choi| | 25|Park| | 22| Kim| +---+----+
In [28]:
df.withColumnRenamed('age', 'newage').show() # withColumnRenamed(from, to) : rename col name
+------+----+ |newage|name| +------+----+ | 29|Choi| | 25|Park| | 22| Kim| +------+----+
- examples
In [29]:
df.withColumn('after_5_years', df['age'] + 5).show()
+---+----+-------------+ |age|name|after_5_years| +---+----+-------------+ | 29|Choi| 34| | 25|Park| 30| | 22| Kim| 27| +---+----+-------------+
In [30]:
df.withColumn('age in 2000s', df['age'] - 21).show()
+---+----+------------+ |age|name|age in 2000s| +---+----+------------+ | 29|Choi| 8| | 25|Park| 4| | 22| Kim| 1| +---+----+------------+
SQL¶
In [31]:
df.createOrReplaceTempView("people") # createOrReplaceTempView("table name") : register DF as Table
In [35]:
new_df = spark.sql("SELECT * FROM people")
In [36]:
new_df # spark.sql create DF by using SQL from Table
Out[36]:
DataFrame[age: int, name: string]
In [37]:
new_df.show()
+---+----+ |age|name| +---+----+ | 29|Choi| | 25|Park| | 22| Kim| +---+----+
In [38]:
spark.sql("SELECT * FROM people WHERE age>25").show()
+---+----+ |age|name| +---+----+ | 29|Choi| +---+----+
RDD to DataFrame¶
- USING ROW
In [39]:
sc = spark.sparkContext
In [51]:
lines = sc.textFile(path + "people.txt") # lines is RDD
In [52]:
lines.top(3)
Out[52]:
['Park,25', 'Kim,22', 'Choi,29']
In [53]:
lines.take(2)
Out[53]:
['Choi,29', 'Park,25']
In [54]:
parts = lines.map(lambda l: l.split(","))
In [55]:
parts.top(3)
Out[55]:
[['Park', '25'], ['Kim', '22'], ['Choi', '29']]
In [69]:
from pyspark.sql import Row
In [70]:
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
In [72]:
people.top(3)
Out[72]:
[Row(name='Park', age=25), Row(name='Kim', age=22), Row(name='Choi', age=29)]
- Row가 가진 schema 자동 적용
In [77]:
schemaPeople = spark.createDataFrame(people)
In [78]:
schemaPeople.printSchema()
root |-- name: string (nullable = true) |-- age: long (nullable = true)
In [79]:
schemaPeople.show()
+----+---+ |name|age| +----+---+ |Choi| 29| |Park| 25| | Kim| 22| +----+---+
In [80]:
schemaPeople.createOrReplaceTempView("people")
In [81]:
early20 = spark.sql("SELECT * FROM people WHERE age<=23")
In [82]:
early20.show()
+----+---+ |name|age| +----+---+ | Kim| 22| +----+---+
- USING STRUCTTPYE
In [110]:
lines = sc.textFile(path + "people.txt") # lines is RDD
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], int(p[1].strip())))
In [111]:
people.top(3)
Out[111]:
[('Park', 25), ('Kim', 22), ('Choi', 29)]
In [112]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType
In [113]:
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])
In [116]:
schemaPeople = spark.createDataFrame(people, schema) # peopleRDD의 각 line에 StructType schema 적용
In [117]:
schemaPeople.show()
+----+---+ |name|age| +----+---+ |Choi| 29| |Park| 25| | Kim| 22| +----+---+
반응형
'IT study > Big Data' 카테고리의 다른 글
| 아파트 가격 추이 분석 서비스(4) (0) | 2021.06.13 |
|---|---|
| 아파트 가격 추이 분석 서비스(3) (0) | 2021.05.30 |
| 아파트 가격 추이 분석 서비스(2) (0) | 2021.05.22 |
| 아파트 가격 추이 분석 서비스(1) (0) | 2021.05.22 |
| [YouTube]Apache Spark Tutorial Full Course - RDD Creation (4) (0) | 2021.05.17 |