Open Weathermap DAG 구현하기, API를 사용해서 DAG를 만들어보자
Open Weathermap API 소개
● 위도/경도를 기반으로 그 지역의 기후 정보를 알려주는 서비스
● 무료 계정으로 api key를 받아서 이를 호출시에 사용
○ https://openweathermap.org/price
만들려는 DAG: 서울 8일 낮/최소/최대 온도 읽기
● API Key를 open_weather_api_key라는 Variable로 저장
● 서울의 위도와 경도를 찾을 것
● One-Call API를 사용: https://openweathermap.org/api/one-call-api
○ 앞서 API KEY와 서울의 위도/경도를 사용해서 위의 API를 requests 모듈을 사용해서 호출
○ 응답 결과에서 온도 정보(평균/최소/최대)만 앞으로 7일을 대상으로 출력해볼 것
■ 날짜, 낮 온도(day), 최소 온도(min), 최대 온도(max)
https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={APIkey}&units=metric
DAG 구현 (1)
● Open Weathermap의 one call API를 사용해서 서울의 다음 8일간의 낮/최소/최대 온도를 읽어다가 각자 스키마 밑의 weather_forecast라는 테이블로 저장
○ https://openweathermap.org/api/one-call-api를 호출해서 테이블을 채움
○ weather_forecast라는 테이블이 대상이 됨
■ 여기서 유의할 점은 created_date은 레코드 생성시간으로 자동 채워지는 필드라는 점
CREATE TABLE keeyong.weather_forecast (
date date primary key,
temp float, -- 낮 온도
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
DAG 구현 (2)
● One-Call API는 결과를 JSON 형태로 리턴해줌
○ 이를 읽어들이려면 requests.get 결과의 text를 JSON으로 변환해 주어야함
○ 아니면 requests.get 결과 오브젝트가 제공해주는 .json()이란 함수 사용
f = requests.get(link)
f_js = f.json()
● 결과 JSON에서 daily라는 필드에 앞으로 8일간 날씨 정보가 들어가 있음.
○ daily 필드는 리스트이며 각 레코드가 하나의 날짜에 해당
○ 날짜 정보는 “dt”라는 필드에 들어 있음. 이는 epoch이라고 해서 1970년 1월 1일 이후 밀리세컨드로 시간을 표시. 이는 아래와 같은 코드로 읽을 수 있는 날짜로 변경 가능
datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d') # 2021-10-09
Open Weather API 호출 응답 보기
● daily라는 리스트에 앞으로 8일간의 온도 정보가 들어옴
○ dt 필드가 날짜를 나타냄
○ temp 필드가 온도 정보를 나타냄
■ day
■ min
■ max
■ night
■ eve
■ morn
DAG 구현 (3)
● Airflow Connections를 통해 만들어진 Redshift connection
○ 기본 autocommit의 값은 False인 점을 유의
● 두 가지 방식의 Full Refresh 구현 방식
○ Full Refresh와 INSERT INTO를 사용
○ Full Refresh와 COPY를 사용
DAG 구현: Full Refresh
● API Key는 어디에 저장해야할까?
● Full Refresh
○ 매번 테이블을 지우고 다시 빌드
● DW상의 테이블은 아래처럼 정의
CREATE TABLE sungwoodat99.weather_forecast (
date date primary key,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
Primary Key Uniqueness 보장하기, 데이터 웨어하우스에서 Primary Key Uniqueness를 보장하기 위한 방법에 대해서 알아보자
Primary Key Uniqueness란?
● 테이블에서 하나의 레코드를 유일하게 지칭할 수 있는 필드(들)
○ 하나의 필드가 일반적이지만 다수의 필드를 사용할 수도 있음
○ 이를 CREATE TABLE 사용시 지정
● 관계형 데이터베이스 시스템이 Primary key의 값이 중복 존재하는 것을 막아줌
○ 예 1) Users 테이블에서 email 필드
○ 예 2) Products 테이블에서 product_id 필드
CREATE TABLE products (
product_id INT PRIMARY KEY,
name VARCHAR(50),
price decimal(7, 2)
);
CREATE TABLE orders (
order_id INT,
product_id INT,
PRIMARY KEY (order_id, product_id),
FOREIGN KEY (product_id) REFERENCES products (product_id)
);
빅데이터 기반 데이터 웨어하우스들은 Primary Key를 지켜주지 않음
● Primary key를 기준으로 유일성 보장을 해주지 않음
○ 이를 보장하는 것은 데이터 인력의 책임
● Primary key 유일성을 보장해주지 않는 이유는?
○ 보장하는데 메모리와 시간이 더 들기 때문에 대용량 데이터의 적재가 걸림돌이 됨
CREATE TABLE sungwoodat99.test (
date date primary key,
value bigint
);
INSERT INTO sungwoodat99.test VALUES ('2023-05-10', 100);
INSERT INTO sungwoodat99.test VALUES ('2023-05-10', 150); -- 이 작업이 성공함!
Primary Key 유지 방법 (1)
● 앞서 살펴본 sungwoodat99.weather_forecast 테이블을 대상으로 살펴보자
CREATE TABLE sungwoodat99.weather_forecast (
date date primary key,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
● 날씨 정보이기 때문에 최근 정보가 더 신뢰할 수 있음.
● 그래서 어느 정보가 더 최근 정보인지를 created_date 필드에 기록하고 이를 활용
● 즉 date이 같은 레코드들이 있다면 created_date을 기준으로 더 최근 정보를 선택
이를 하는데 적합한 SQL 문법이 ROW_NUMBER
Primary Key 유지 방법 (2)
1. date별로 created_date의 역순으로 일련번호를 매기고 싶다면?
2. 새로운 컬럼 추가! - date별로 레코드를 모으고 그 안에서 created_date의 역순으로 소팅한 후 1번부터 일련 번호 (seq) 부여
3. ROW_NUMBER를 쓰면 2를 구현 가능
ROW_NUMBER() OVER (partition by date
order by created_date DESC) seq
Primary Key 유지 방법 (3)
● 임시 테이블(스테이징 테이블)을 만들고 거기로 현재 모든 레코드를 복사
● 임시 테이블에 새로 데이터소스에서 읽어들인 레코드들을 복사
○ 이 때 중복 존재 가능
● 중복을 걸러주는 SQL 작성:
○ 최신 레코드를 우선 순위로 선택
○ ROW_NUMBER를 이용해서 primary key로 partition을 잡고 적당한 다른 필드(보통 타임스탬프 필드)로 ordering(역순 DESC)을 수행해 primary key별로 하나의 레코드를 잡아냄
● 위의 SQL을 바탕으로 최종 원본 테이블로 복사
○ 이때 원본 테이블에서 레코드들을 삭제
○ 임시 temp 테이블을 원본 테이블로 복사 (일련번호가 1번인 것들만 선택)
Primary Key 유지 방법 (4)
1. CREATE TEMP TABLE t AS SELECT * FROM sungwoodat99.weather_forecast;
a. 원래 테이블의 내용을 임시 테이블 t로 복사
2. DAG는 임시 테이블(스테이징 테이블)에 레코드를 추가
a. 이때 중복 데이터가 들어갈 수 있음
3. DELETE FROM sungwoodat99.weather_forecast;
Primary Key 유지 방법 (5)
4. 중복을 없앤 형태로 새로운 테이블 생성
INSERT INTO sungwoodat99.weather_forecast
SELECT date, temp, min_temp, max_temp, created_date
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
FROM t
)
WHERE seq = 1;
위의 코드는 매번 새로 덮어쓰는 형식의 업데이트를 가정
weather_forecast로 Incremental Update 다시 설명
Weather_Forecast DAG를 Incremental Update로 구현
Upsert란?
● Primary Key를 기준으로 존재하는 레코드라면 새 정보로 수정
● 존재하지 않는 레코드라면 새 레코드로 적재
● 보통 데이터 웨어하우스마다 UPSERT를 효율적으로 해주는 문법을 지원해줌
'Data Engineering > 실리콘밸리에서 날아온 데이터 엔지니어링 스타터 키트' 카테고리의 다른 글
[5주차] Airflow Deepdive 2 -Backfill과 Airflow (0) | 2023.09.15 |
---|---|
[4주차] Assignment (0) | 2023.09.10 |
[4주차] Airflow Deepdive - Airflow로 데이터 파이프라인 만들기 (2) (2) | 2023.09.05 |
[4주차] Airflow Deepdive - Airflow로 데이터 파이프라인 만들기 (1) (0) | 2023.09.05 |
[4주차] 데이터베이스 트랜잭션 (0) | 2023.09.05 |