Open Weathermap DAG 구현하기, API를 사용해서 DAG를 만들어보자
Open Weathermap API 소개
● 위도/경도를 기반으로 그 지역의 기후 정보를 알려주는 서비스
● 무료 계정으로 api key를 받아서 이를 호출시에 사용
○ https://openweathermap.org/price
Pricing - OpenWeatherMap
Get weather data for any location on the globe immediately with our superb API! Just subscribe with your email and start using minute forecasts, hourly forecasts, history and other weather data in your applications. For more functionality, please consider
openweathermap.org
만들려는 DAG: 서울 8일 낮/최소/최대 온도 읽기
● API Key를 open_weather_api_key라는 Variable로 저장
● 서울의 위도와 경도를 찾을 것
● One-Call API를 사용: https://openweathermap.org/api/one-call-api
○ 앞서 API KEY와 서울의 위도/경도를 사용해서 위의 API를 requests 모듈을 사용해서 호출
○ 응답 결과에서 온도 정보(평균/최소/최대)만 앞으로 7일을 대상으로 출력해볼 것
■ 날짜, 낮 온도(day), 최소 온도(min), 최대 온도(max)
https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={APIkey}&units=metric
DAG 구현 (1)
● Open Weathermap의 one call API를 사용해서 서울의 다음 8일간의 낮/최소/최대 온도를 읽어다가 각자 스키마 밑의 weather_forecast라는 테이블로 저장
○ https://openweathermap.org/api/one-call-api를 호출해서 테이블을 채움
○ weather_forecast라는 테이블이 대상이 됨
■ 여기서 유의할 점은 created_date은 레코드 생성시간으로 자동 채워지는 필드라는 점
CREATE TABLE keeyong.weather_forecast (
date date primary key,
temp float, -- 낮 온도
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
DAG 구현 (2)
● One-Call API는 결과를 JSON 형태로 리턴해줌
○ 이를 읽어들이려면 requests.get 결과의 text를 JSON으로 변환해 주어야함
○ 아니면 requests.get 결과 오브젝트가 제공해주는 .json()이란 함수 사용
f = requests.get(link)
f_js = f.json()
● 결과 JSON에서 daily라는 필드에 앞으로 8일간 날씨 정보가 들어가 있음.
○ daily 필드는 리스트이며 각 레코드가 하나의 날짜에 해당
○ 날짜 정보는 “dt”라는 필드에 들어 있음. 이는 epoch이라고 해서 1970년 1월 1일 이후 밀리세컨드로 시간을 표시. 이는 아래와 같은 코드로 읽을 수 있는 날짜로 변경 가능
datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d') # 2021-10-09
Open Weather API 호출 응답 보기
● daily라는 리스트에 앞으로 8일간의 온도 정보가 들어옴
○ dt 필드가 날짜를 나타냄
○ temp 필드가 온도 정보를 나타냄
■ day
■ min
■ max
■ night
■ eve
■ morn
DAG 구현 (3)
● Airflow Connections를 통해 만들어진 Redshift connection
○ 기본 autocommit의 값은 False인 점을 유의
● 두 가지 방식의 Full Refresh 구현 방식
○ Full Refresh와 INSERT INTO를 사용
○ Full Refresh와 COPY를 사용
DAG 구현: Full Refresh
● API Key는 어디에 저장해야할까?
● Full Refresh
○ 매번 테이블을 지우고 다시 빌드
● DW상의 테이블은 아래처럼 정의
CREATE TABLE sungwoodat99.weather_forecast (
date date primary key,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
Primary Key Uniqueness 보장하기, 데이터 웨어하우스에서 Primary Key Uniqueness를 보장하기 위한 방법에 대해서 알아보자
Primary Key Uniqueness란?
● 테이블에서 하나의 레코드를 유일하게 지칭할 수 있는 필드(들)
○ 하나의 필드가 일반적이지만 다수의 필드를 사용할 수도 있음
○ 이를 CREATE TABLE 사용시 지정
● 관계형 데이터베이스 시스템이 Primary key의 값이 중복 존재하는 것을 막아줌
○ 예 1) Users 테이블에서 email 필드
○ 예 2) Products 테이블에서 product_id 필드
CREATE TABLE products (
product_id INT PRIMARY KEY,
name VARCHAR(50),
price decimal(7, 2)
);
CREATE TABLE orders (
order_id INT,
product_id INT,
PRIMARY KEY (order_id, product_id),
FOREIGN KEY (product_id) REFERENCES products (product_id)
);
빅데이터 기반 데이터 웨어하우스들은 Primary Key를 지켜주지 않음
● Primary key를 기준으로 유일성 보장을 해주지 않음
○ 이를 보장하는 것은 데이터 인력의 책임
● Primary key 유일성을 보장해주지 않는 이유는?
○ 보장하는데 메모리와 시간이 더 들기 때문에 대용량 데이터의 적재가 걸림돌이 됨
CREATE TABLE sungwoodat99.test (
date date primary key,
value bigint
);
INSERT INTO sungwoodat99.test VALUES ('2023-05-10', 100);
INSERT INTO sungwoodat99.test VALUES ('2023-05-10', 150); -- 이 작업이 성공함!
Primary Key 유지 방법 (1)
● 앞서 살펴본 sungwoodat99.weather_forecast 테이블을 대상으로 살펴보자
CREATE TABLE sungwoodat99.weather_forecast (
date date primary key,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
● 날씨 정보이기 때문에 최근 정보가 더 신뢰할 수 있음.
● 그래서 어느 정보가 더 최근 정보인지를 created_date 필드에 기록하고 이를 활용
● 즉 date이 같은 레코드들이 있다면 created_date을 기준으로 더 최근 정보를 선택
이를 하는데 적합한 SQL 문법이 ROW_NUMBER
Primary Key 유지 방법 (2)
1. date별로 created_date의 역순으로 일련번호를 매기고 싶다면?
2. 새로운 컬럼 추가! - date별로 레코드를 모으고 그 안에서 created_date의 역순으로 소팅한 후 1번부터 일련 번호 (seq) 부여
3. ROW_NUMBER를 쓰면 2를 구현 가능
ROW_NUMBER() OVER (partition by date
order by created_date DESC) seq
Primary Key 유지 방법 (3)
● 임시 테이블(스테이징 테이블)을 만들고 거기로 현재 모든 레코드를 복사
● 임시 테이블에 새로 데이터소스에서 읽어들인 레코드들을 복사
○ 이 때 중복 존재 가능
● 중복을 걸러주는 SQL 작성:
○ 최신 레코드를 우선 순위로 선택
○ ROW_NUMBER를 이용해서 primary key로 partition을 잡고 적당한 다른 필드(보통 타임스탬프 필드)로 ordering(역순 DESC)을 수행해 primary key별로 하나의 레코드를 잡아냄
● 위의 SQL을 바탕으로 최종 원본 테이블로 복사
○ 이때 원본 테이블에서 레코드들을 삭제
○ 임시 temp 테이블을 원본 테이블로 복사 (일련번호가 1번인 것들만 선택)
Primary Key 유지 방법 (4)
1. CREATE TEMP TABLE t AS SELECT * FROM sungwoodat99.weather_forecast;
a. 원래 테이블의 내용을 임시 테이블 t로 복사
2. DAG는 임시 테이블(스테이징 테이블)에 레코드를 추가
a. 이때 중복 데이터가 들어갈 수 있음
3. DELETE FROM sungwoodat99.weather_forecast;
Primary Key 유지 방법 (5)
4. 중복을 없앤 형태로 새로운 테이블 생성
INSERT INTO sungwoodat99.weather_forecast
SELECT date, temp, min_temp, max_temp, created_date
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
FROM t
)
WHERE seq = 1;
위의 코드는 매번 새로 덮어쓰는 형식의 업데이트를 가정
weather_forecast로 Incremental Update 다시 설명
Weather_Forecast DAG를 Incremental Update로 구현
Upsert란?
● Primary Key를 기준으로 존재하는 레코드라면 새 정보로 수정
● 존재하지 않는 레코드라면 새 레코드로 적재
● 보통 데이터 웨어하우스마다 UPSERT를 효율적으로 해주는 문법을 지원해줌
'Data Engineering > 실리콘밸리에서 날아온 데이터 엔지니어링 스타터 키트' 카테고리의 다른 글
[5주차] Airflow Deepdive 2 -Backfill과 Airflow (0) | 2023.09.15 |
---|---|
[4주차] Assignment (0) | 2023.09.10 |
[4주차] Airflow Deepdive - Airflow로 데이터 파이프라인 만들기 (2) (2) | 2023.09.05 |
[4주차] Airflow Deepdive - Airflow로 데이터 파이프라인 만들기 (1) (0) | 2023.09.05 |
[4주차] 데이터베이스 트랜잭션 (0) | 2023.09.05 |