기존 MapReduced의 문제점
기존 MapReduce 에서의 문제점
- 데이터 복제, 디스크 I/O, 직렬화로 인한 오버헤드 발생
- 디스크와 메모리 간의 지속적인 데이터 전송으로 인한 트래픽 발생
- 오버에드 및 대량의 트래픽으로 인한 프로그래밍이 어려워짐
- 성능 병목 및 패치 문제 발생 가능성 증가
- 디스크 지속성이 메모리 작업에 비해 느려 성능 악화
Data-Flow Systems
- Data-flow 시스템이란 기존의 데이터 시스템과 달리, 데이터 처리를 단계적인 흐름으로 관리하는 시스템
- 기존에는 MapDeduce는 2가지의 등급별로 작업을 진행함
- One for Map , the Second for Reduce Flow -> 직렬화가 문제가 되는 경우 발생 (병목 등의 현상)
- Dta-Flow Systems의 일반적인 방식
- 임의의 수의 Tasks/Ranks를 허용
- MapReduce와 달리, Data-Flow 시스템은 임의의 수의 작업이나 단계를 실행할 수 있음
- 병렬성을 높일 수 있으며, 더 유연한 데이터 처리를 가능케 함
- Map과 Reduce 이외에 다른 기능을 추가
- MapReduce는 주로 맵 함수와 리듀스 함수를 사용하여 데이터를 처리
- Data-Flow System은 맵과 리듀스 이외에도 다양한 작업을 추가할 수 있음
- 예를 들어, 데이터 정제, 필터링, 그룹화, 조인 등의 작업 등이 추가 가능함
- 임의의 수의 Tasks/Ranks를 허용
Spark
현재 가장 선두적인 Data-Flow System으로 높은 성능과 확장성, 다양한 기능과 API, 유연한 실행 엔진, 큰 생태계 등의 장점으로 많은 기업과 기관에서 사용중인 시스템입니다.
- Map-reduce model에 제한적이지 않은 표현형 컴퓨터 시스템
- 빠른 데이터 쉐어링
- 중간 단계의 디스크 저장 작업을 줄이기
- 반복적인 쿼리를 캐싱 ( 머신 러닝 등 )
- General execution graph(DAGs)
- map 과 reduce에서 벗어나 더 많은 기능을 제공함
- 하둡과 호환성이 좋음
- 빠른 데이터 쉐어링
- RDD(Resilient Distributed Dataset)을 사용
- 더 높은 레벨의 API들을 제공 : DataFrames & DataSets
- 최근 스파크에서 등장한 버전
- 다른 API들 ( SQL ) 에 대해서도 지원함
RDD ( Resilient Distributed DataSet )
- records들의 집합을 부분적으로 Key-Value 쌍으로 나눔
- 읽기 전용으로, 클러스터들 사이에서 Spread 됨
- 메모리에서 캐싱되어짐 [디스크로 되돌아가는 것도 가능]
- RDDs는 하둡에서 생성하거나 다른 RDDs를 변환하여 생성할 수 있음
- RDDs는 데이터 세트의 모든 요소에 동일한 작업을 적용하는 응용 프로그램에 가장 적합함
Spark RDD 기능
- Transformations build RDDs through deterministic operation on other RDDs
- 트랜스포메이션은 map, filter, join, union, intersection, disinct 등의 작업을 포함함
- Lazy evaluation : Noting computed until an action requires it
- Actions to return value or export data
- 액션은 count, collect, reduce, save 등의 작업을 포함함
- actions can be applied to Rdds; actions force calculations and return values
Spark RDD의 기능 정리
Transformations (변환 작업) | - 다른 RDD에 대한 결정론적인 작업을 통해 RDD를 구축합니다. - map, filter, join, union, intersection, distinct 등의 작업이 이에 해당합니다. - 기존의 RDD를 변환하여 새로운 RDD를 생성합니다. |
Lazy Evaluation (지연 평가) | - 계산이 필요한 액션이 요구되기 전까지 계산을 수행하지 않습니다. - RDD를 변환하는 작업은 지연되어 있으며, 실제 계산은 액션이 호출될 때 수행됩니다 |
Actions (액션) | - 값을 반환하거나 데이터를 내보내는 작업입니다. - count, collect, reduce, save 등의 작업이 이에 해당합니다. - 액션을 수행하면 RDD의 변환 작업이 실제로 수행되고 결과가 반환됩니다. |
Spark Task Scheduler : General DAGs { Directed acyclic Graph : 단방향 그래프 구조 }
Spark Task Scheduler는 일반적인 방향성 비순환 그래프(Directed Acyclic Graph, DAG)를 처리합니다.
이를 통해 다음과 같은 기능을 제공합니다:
General Task Graphs 지원 | - Spark Task Scheduler는 다양한 타입의 작업 그래프를 처리할 수 있습니다. - 이는 각 작업이 다른 작업에 의존하며, 이를 효율적으로 관리할 수 있도록 합니다 |
Pipeline Functions | - 파이프라인 함수를 지원하여 작업을 연속적으로 처리할 수 있습니다. - 이는 중간 결과를 캐싱하고 새로운 작업을 수행하는 데 효율적입니다. |
Cache-aware Data Reuse & Locality: |
- 데이터의 재사용과 로컬리티를 고려합니다. - 캐시된 데이터를 활용하여 계산을 최적화하고, 데이터가 로컬에서 사용될 수 있도록 합니다 |
Partitioning-aware to Avoid Shuffles |
- 셔플을 피하기 위해 파티셔닝을 고려합니다. - 데이터를 적절하게 파티셔닝하여 셔플 작업을 최소화하고 성능을 향상시킵니다. |
Higher-Level API : DataFrame & DataSet
공통점 : SparkSQL 엔진 위에서 구축되었으며, RDD로 변환이 가능함
DataFrame | - RDD와 달리, 데이터가 컬럼으로 나뉜다. ( e.g. table in a relational database ) - RDB의 테이블과 유사한 형태이며, Spark SQL 위에 구축되어 SQL쿼리와 함께 사용 가능함 - RDD로 변환 가능하며, 구조적으로 정의된 데이터 처리와 쿼리에 효과적임 |
DataSet | - DataFrame API의 확장 (type-safe, object-oriented programming interface 등 제공 ) - 컴파일 시점 오류 확인, 객체 지향적 프로그래밍 모델, RDD 변환을 지원함 - Java와 Scala에서만 사용이 가능함 |
관련 라이브러리 정보
- Both built on Spark SQL Engine , Both can be converted back to an RDD
- Spark SQL : SQL형식으로 제공하는 라이브러리
- Spark Streaming : 스트리밍 데이터 처리 ( 초당 수백만 개 레코드 처리 가능 )
- MLlib - scalable ML : 대규모 데이터셋에서 학습 모델을 학습하고 적용이 가능함
- GraphX - graph manipulation : 그래프 조작을 위한 라이브러리로, 그래프 알고리즘 구현 및 분석 가능
성능 : Sparak는 일반적으로 빠르다 , but with caveats
- 스파크는 데이터를 메모리 상에서 처리함
- Hadoop's MapReduce는 맵 또는 리듀스 액션 마다 메모리-디스크 이동이 잦음
- 일반적으로는 메모리 기반의 스파크가 일반적으로 맵 리듀스 성능이 좋음
- 메모리가 많이 필요한 단점이 있음
- 맵 리듀스의 장점으로는 다른 서비스를 쉽게 통합하고 인터페이스 화 할 수 있는 장점이 있음
- 스파크의 장점: High Level APIs로 쉬운 프로그래밍 가능, 데이터 프로세싱의 지원이 많고 생태계 구축도 활성화
![]() |
![]() |
스파크 사용 예제
스파크 환경 셋팅
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
분산 컴퓨팅 예제 #1.
"몬테 카를로 방법"을 사용하여 원주율을 계산
import random
NUM_SAMPLES = 1000000
def inside(p):
x,y = random.random(), random.random()
return x*x + y*y < 1
count = sc.parallelize(range(0m NUM_SAMPLES).filter(inside).count()
pi = 4 * count / NUM_SAMPLES
print("Pi s roughly", pi)
Exercise 데이터 셋
- 베트남 전쟁 데이터 셋 [ Vietnam War ]
#Load The datasets
Bombing_Operations = spark.read.json("Bombing_Operations.json.gz")
Aircraft_Glossary = spark.read.json("Aircaft_Glossary.json.gz")
Bombing_Operations.printSchema()
Aircraft_Glossary.printSchema()
Bombing_Operations.take(3)
# 포멧된 샘플을 볼 수 있음
Aircraft_Glossary.show()
print("in total there are {0} opperations".format(Bombing_Operations.count())
Question 1: Which countries are invlived and in how many missions
- Keywords : Dataframe API, SQL, group by, sort
- Let's group the missions by COntryFlyingMission and count how many records exist :
missions_counts = Bombing_Operations.gorupBy("CountryFlyingMission").agg(count("*")
.alias("MissionsCount")).sort(desc("MissionCount"))
Bombing_Operations.createReplaceTempView("Bombing_Operations")
query = """
SELECT ContryFlyingMission, count(*) as MissionsCount
FROM Bombing_Operations
GROUP BY ContryFlyingMission
ORDER BY MissionCount DESC
"""
missions_counts = spark.sql(query)
missions.counts.show()
#The DataFrame is small enough to Be moved to Pandas
missions_count_pd = missions_counts.toPandas()
missions_count_pd.head()
Question 2: Show the number of missions in time for each of the countries involved
- Keywords :gorup by, parse date, plot