데이터
(Dataquest) Spark를 활용한 대규모 데이터셋 분석
Yuniverse.
2024. 11. 24. 21:13
Dataquest: Spark 강의 의 내용을 공부한 후 정리한 글입니다.
Spark 등장 배경
- 데이터 수집 추세가 증가함에 따라 기존 기술로는 대량의 데이터를 분석할 수 없었기 때문에 작업을 수행할 수 있는 새로운 도구와 접근 방식을 구축해야 했다.
- 엔지니어들은 처음에 더 크고 강력한 컴퓨터를 사용하여 데이터를 처리하려고 시도했지만 여전히 많은 계산 문제로 인해 한계에 부딪혔다. 그 과정에서 그들은 결과를 계산하기 위해 수백 또는 수천 대의 컴퓨터에 계산을 효율적으로 분배하는 MapReduce와 같은 패러다임을 개발했다. 즉, Hadoop은 빠르게 빅 데이터를 위한 지배적인 처리 도구 키트가 된 오픈 소스 프로젝트이다.
더보기
MapReduce
분산 서버를 마샬링(메모리 상에 형상화된 객체 데이터를 적당한 다른 데이터 형태로 변환)하고, 다양한 작업을 병렬로 실행하고, 시스템의 여러 부분 간의 모든 통신 및 데이터 전송을 관리하고, 중복성 및 내결함성을 제공하여 처리를 조정한다.
- Hadoop
- 파일 시스템(하둡 분산 파일 시스템 또는 HDFS)과 MapReduce 패러다임의 자체 구현으로 구성. MapReduce는 계산을 Map 및 Reduce 단계로 변환하여 Hadoop이 여러 시스템에 쉽게 배포할 수 있게 했다.
- Hadoop을 사용하면 대규모 데이터 세트를 분석할 수 있었지만 계산을 위해 메모리가 아닌 디스크 스토리지에 크게 의존했고, 각 단계 사이에 디스크에 쓰고 디스크에서 읽어야 하기 때문에 동일한 데이터를 여러 번 전달하거나 많은 중간 단계를 거쳐야 하는 계산에는 적합하지 않았다. SQL 및 기계 학습 구현과 같이 많은 데이터 과학자가 필요로 하는 추가 라이브러리에 대한 지원되지 않는 문제 또한 존재했다.
- → RAM(컴퓨터 메모리)의 비용이 크게 떨어지기 시작하자 데이터를 메모리에 저장하여 Hadoop을 확장하거나 대체하는 것이 매력적인 대안으로 빠르게 부상했다.
복원력 있는 분산 데이터 세트 (RDDs)
- Spark의 핵심 데이터 구조
- 많은 머신으로 구성된 클러스터의 RAM 또는 메모리에 분산된 데이터 세트를 Spark로 표현한 것
- RDD 객체: 튜플, 딕셔너리, 목록 등의 목록을 보유하는 데 사용할 수 있는 요소의 모음
- pandas DataFrame과 유사하게 데이터 세트를 RDD에 로드한 다음 해당 객체에 액세스할 수 있는 모든 메서드를 실행할 수 있다.
- Lazy Evaluation: 코드가 호출되었을 때 파일에 대한 포인터를 생성은 하지만, 해당 변수가 필요할 때까지 실제로 데이터를 읽지는 않는 것. → 작업 대기열을 구축하고 Spark가 백그라운드에서 워크플로를 최적화하도록 할 수 있다.
- RDD 객체는 한 번 생성하면 값을 변경할 수 없다. Python에서 list 및 dictionary 객체는 변경 가능한 반면 tuple 객체는 변경할 수 없고, Python에서 튜플 객체를 수정하는 유일한 방법은 필요한 업데이트로 새 튜플 객체를 만드는 것인 것처럼, RDD 객체도 마찬가지이다. (튜플이 없는 새 RDD 개체를 만드는 것이 유일한 방법) Spark는 RDD의 불변성을 사용하여 계산 속도를 향상시킨다.
- RDD의 요소 개수를 셀 때는 Python의 len( ) 함수를 사용할 수 없고, count( )를 사용해야 한다.
Pipelines
- Spark의 모든 작업 또는 계산은 기본적으로 파이프라인을 형성하기 위해 함께 연결하고 연속적으로 실행할 수 있는 일련의 단계.
- 파이프라인의 각 단계는 Python 값(ex. integer), Python 데이터 구조(ex. dictionary) 또는 RDD 개체를 반환.
- map( ): RDD의 각 요소(즉, 각 줄)에 대해 주어진 함수를 적용하는 역할
- ex) raw_data.map(lambda line: line.split('\t'))
- reduceByKey( ): 동일한 키를 가진 값들을 지정된 연산을 통해 축소(reduce)하는 역할
- ex) daily_show.map(lambda x: (x[0], 1)).reduceByKey(lambda x,y: x+y)
- 키가 '2023'인 값들이 3개가 있다면, (2023, 1), (2023, 1), (2023, 1) 형태일 텐데, reduceByKey는 이를 (2023, 3)으로 줄여준다.
- ex) daily_show.map(lambda x: (x[0], 1)).reduceByKey(lambda x,y: x+y)
- flatMap( ): RDD의 모든 요소에 대한 출력이 필요하지 않다는 점에서 map()과 다르다. RDD에서 일련의 값을 생성하려고 할 때마다 유용하다.
- ex) split_hamlet.flatMap(lambda x: hamlet_speaks(x))
yield
- 인터프리터가 작업할 때 데이터를 생성하고 필요할 때 끌어올 수 있도록 하는 Python 기술
- Lambda 함수는 간단한 논리를 사용하여 PySpark 메서드에 전달할 수 있는 빠른 함수를 작성하는 데 적합하지만, 보다 맞춤화된 논리를 작성해야 할 때는 이상적이지 않다. PySpark에서 데이터 시퀀스를 반환하는 모든 함수는 yield 문을 사용하여 나중에 가져올 값을 지정해야 한다.
- 모든 함수에 yield가 필요한 것은 아니다. map() 또는 filter()의 경우 return을 사용하여 함수를 실행하는 RDD의 모든 단일 요소에 대한 값을 반환한다.
Transformations and Actions
- Spark에는 변환(transformation)과 작업(action)이라는 두 가지 종류의 메서드가 있다.
- 액션 메소드를 사용할 때마다 Spark는 지연 코드를 강제로 평가한다.
- 변환 메소드를 함께 연결하고 결과 RDD 객체를 인쇄하는 경우에만 RDD 유형(ex: PythonRDD, PipelinedRDD 객체)을 볼 수 있지만 그 안의 요소는 볼 수 없다. ← 계산이 실제로 아직 발생하지 않았기 때문.
- Spark는 많은 변환을 함께 연결하는 것을 단순화하지만 작업을 사용하여 이러한 변환 사이의 중간 RDD 개체를 관찰하는 것이 좋다. (변환이 예상대로 작동하는지 여부를 알 수 있다.)
Spark Dataframe
- Spark DataFrames는 Spark의 규모와 속도를 pandas의 쿼리, 필터 및 분석 기능과 결합한다. 한 대의 컴퓨터에서만 실행할 수 있는 pandas와 달리 Spark는 분산 메모리(및 필요한 경우 디스크)를 사용하여 더 큰 데이터 세트를 처리하고 계산을 더 빠르게 실행할 수 있다.
- Spark DataFrames 사용시 기존 pandas 코드를 수정하고 재사용하여 훨씬 더 큰 데이터 세트로 확장할 수 있고, 다양한 데이터 형식에 대한 더 나은 지원을 제공한다. SQL 인터페이스를 사용하여 대규모 데이터베이스 시스템 및 기타 데이터 저장소를 쿼리하는 분산 SQL 쿼리를 작성할 수도 있다.
- SQLContext 개체로 데이터를 읽을 때 Spark는 다음을 수행한다.
- Spark DataFrame 개체 인스턴스화
- 데이터에서 스키마를 유추하고 DataFrame과 연결
- 데이터를 읽어 여러 클러스터에 분산(여러 클러스터를 사용할 수 있는 경우)
- DataFrame 객체를 반환
# Import SQLContext
from pyspark.sql import SQLContext
# Pass in the SparkContext object `sc`
sqlCtx = SQLContext(sc)
# Read JSON data into a DataFrame object `df`
df = sqlCtx.read.json("census_2010.json")
# Print the DataFrame
df.show(5)
- pandas DataFrame과 마찬가지로 agg(), join(), sort(), where() 사용이 가능하다.
- pandas DataFrame과 달리 기존 객체를 수정할 수 없다. 대신 객체에 대한 변경 사항을 반영하는 새 DataFrame을 반환한다. (분산 데이터 구조로 더 쉽게 작업할 수 있도록 Spark에 불변성을 설계)
- pandas DataFrame은 Series 객체를 중심으로 구축되는 반면 Spark DataFrame은 RDD를 기반으로 구축된다.
Spark SQL
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)
# Querying a table in Spark
sqlCtx.sql('select age from census2010').show()
# Calculating summary statistics for a DataFrame
query = 'select males,females from census2010'
sqlCtx.sql(query).describe().show()