이번에는 Hive에서의 데이터 아키텍처 최적화 방법 중 '적은 데이터를 많게 만들어라'는 사상으로 HQL을 튜닝한 케이스를 살펴보도록 하겠습니다.

 

 

 

  여기서 표현하고 있는 '적은 데이터'는 단순히 데이터의 건수가 적다는 것만을 뜻하는 건 아닙니다.

  Equal 조인이 불가능하도록 데이터가 축약되어 있어 데이터의 분포도가 나쁜 경우를 의미합니다. 데이터가 축약되어 있기에 테이블의 건수가 상대적으로 적게 저장되어 있을 것입니다. 이런 경우에는 축약된 데이터를 상세화하여 하나의 로우로 표현된 데이터를 다수의 로우로 확장시켜줘야 합니다.

  즉, 적은 데이터를 많게 만들어(상세화하여) 데이터의 분포도를 좋게 만들고 Equal 조인이 가능하도록 테이블 아키텍처를 변경해주는 작업이 필요합니다.

 

 

  이번 튜닝 케이스에서 테이블 아키텍처를 변경하고자 하는 대상은 '국가별IP주소대역' 테이블입니다.

  우리가 외부 네트워크에 접속하기 위해서는 우선 IP(이 포스트에서는 IPv4 기준으로 설명)를 할당받아야 합니다. 그런데 이 IP라는 것이 표현상의 한계(000.000.000.000 ~ 255.255.255.255)가 있기 때문에 IP 주소의 수도 한계가 있습니다. 따라서 국가별로 사용할 수 있는 IP대역을 정의하여 IP의 무분별한 사용을 제어하고 있습니다.  

 

[출처] https://krnic.or.kr/jsp/infoboard/stats/landCurrent.jsp

 

 

  국가별로 사용할 수 있는 IP 대역이 지정되어 있으므로 우리는 IP주소만으로 접속자가 현재 어느 나라에서 접속을 하고 있는지 확인할 수 있습니다.

  국가별 IP주소대역은 한국인터넷정보센터(KRNIC)에서 확인 가능하며, CSV 파일로 다운로드도 가능합니다.

* URL : https://krnic.or.kr/jsp/infoboard/stats/landCurrent.jsp

 

[출처] https://krnic.or.kr/jsp/infoboard/stats/landCurrent.jsp

 

 

  한국인터넷정보센터에서 다운로드한 국가별IP주소대역 CSV 파일의 구조는 아래와 같습니다.

 

 

  국가별로 IP 주소 대역이 할당되므로 '시작IP'와 '끝IP'로 IP 대역의 범위를 표현해주고 있습니다.

  '시작IP'와 '끝IP' 사이에 무수한 IP가 존재할 수 있는데 범위 값으로 데이터를 축약한 상태입니다.

  '이 축약된 데이터를 어떻게 풀어나갈 것인가?'가 이 튜닝 케이스의 핵심이 되겠습니다.

 

 

  우선 이번 튜닝의 결과는 아래와 같습니다. 

 

 

 

  위 결과는 제 PC에서 테스트용 데이터를 생성하여 측정한 결과입니다. 

  실제 운영 환경에서는 제가 디멘전 테이블 아키텍처 변경과 데이터 입력 작업만 수행하고, 실제 운영상의 쿼리 변경 작업은 운영 담당자가 진행하여 상세한 튜닝 결과는 확인하지 못했습니다. 운영 클러스터에서는 튜닝 전 약 2시간 정도 수행되는 쿼리가 튜닝 후에는 수행 시간이 약 7분 정도로 단축되었다고 합니다.

  아마도 조인하고자 하는 Fact 테이블의 데이터 건수가 많을수록 성능 차이는 더욱더 크게 벌어질 것입니다.

  참고로, 쿼리에 명시된 TEST_ODS.ACC_LOG 테이블(보안을 위해 스키마, 테이블, 컬럼 이름을 임의로 변경하여 사용)의 실 운영 데이터는 사양이 다른 다수의 인프라에서 수집된 로그입니다. 해당 DW 시스템에서 1일 수집되는 데이터는 약 500GB 이상이고요. 수집 대상 로그 데이터에 국가 정보 추가 요청을 원천 시스템에 요청할 수 있는 환경이 아니였습니다.
  ( 문의에 답변하고자 위 단락 내용 추가하였습니다. 2021/07/08 )

 

  그럼 데이터 건수가 적은 '국가별IP주소대역(CNTRY_IP_RANGE)' 테이블의 아키텍처를 어떻게 변경하고 어떻게 데이터를 입력했는지 퀴즈를 풀듯이 하나하나씩 보여드리도록 하겠습니다.

 

 

 

 

[ 튜닝 전 ]

 

> 튜닝 전 쿼리

  먼저 튜닝 전 쿼리를 살펴보도록 하겠습니다.

INSERT INTO TEST_DW.ACC_IP_CNTRY_MAPP PARTITION (ACC_ID, ACC_START_DT)
SELECT H.IP_ADDR
     , G.CNTRY_CD
     , G.CNTRY_NM
     , H.C
     , H.ACC_ID
     , H.ACC_START_DT
  FROM (
        SELECT IP_ADDR
             , COALESCE(SPLIT(IP_ADDR,'\\.')[0], '_')   AS IP_OCTET1
             , CAST( SPLIT(IP_ADDR,'\\.')[0] AS BIGINT) * 16777216 +
               CAST( SPLIT(IP_ADDR,'\\.')[1] AS BIGINT) * 65536 +
               CAST( SPLIT(IP_ADDR,'\\.')[2] AS BIGINT) * 256 +
               CAST( SPLIT(IP_ADDR,'\\.')[3] AS BIGINT) AS IP_ADDR_NUMT
             , COUNT(*)            AS ACC_CNT  
             , MAX(ACC_ID)         AS ACC_ID
             , MAX(ACC_START_DT)   AS ACC_START_DT
          FROM TEST_ODS.ACC_LOG
         WHERE ACC_ID       = '${ACC_ID}'
           AND ACC_START_DT = '${ACC_START_DT}'
         GROUP BY IP_ADDR
     ) H
 INNER JOIN
       (
        SELECT IP_OCTET1
             , START_IP_NUMT
             , END_IP_NUMT
             , CNTRY_CD
             , CNTRY_NM
          FROM TEST_DW.CNTRY_IP_RANGE
     ) G
    ON H.IP_OCTET1 = G.IP_OCTET1
 WHERE H.IP_ADDR_NUMT BETWEEN G.START_IP_NUMT AND G.END_IP_NUMT;

 

  쿼리문 하단에 보면 조인 컬럼으로 IP_OCTET1(파티션 키 컬럼)이 사용되었다는 것을 확인할 수 있습니다. WHERE 절에서는 BETWEEN 연산을 이용하여 IP 주소(IP_ADDR_NUMT)가 범위(START_IP_NUMT ~ END_IP_NUMT)에 포함되는 매핑 데이터만 필터링하고 있습니다.

  위 텍스트 상으로는 성능 부하가 존재하는 쿼리인지 판단하는게 쉽지 않을 수 있습니다.
  실제 테이블안 데이터의 형태와 분포도 등을 확인하면 좀 더 쉽게 성능상의 문제점을 파악할 수 있을 것입니다.

 

 


> IP 주소 구조

  우선 IP 주소 구조를 간략하게 살펴보도록 하겠습니다.

  IP 주소(정확하게는 IPv4)는 32자리의 이진수로 구성되어 있습니다. 이진수로 표현된 IP주소는 가독성이 떨어지므로 사람들이 쉽게 읽을 수 있도록 10진수로 변환하여 표기합니다. 그리고 더 가독성을 향상하기 위해 8비트마다 점( . , 옷뎃, OCTET)을 찍어 표현하도록 규정되어 있습니다.

 

 

  일반적으로 우리가 데이터로 저장하고 관리하는 IP주소는 옷뎃으로 구분된 10진수로 된 문자열 형태(예: 10.168.80.15)입니다.

  IP주소가 문자열 형태인 경우에는 가독성이 향상되지만 정렬이나 범위 검색 시에는 문제가 발생합니다.
  예를 들어, 일부 문자열 형태의 IP 주소를 정렬한 결과는 아래와 같을 것입니다.

1.x.x.x  196.x.x.x  2.x.x.x  20.x.x.x  213.x.x.x  5.x.x.x  77.x.x.x  80.x.x.x  9.x.x.x  99.x.x.x

 

  위와 같은 이유로 문자열 IP주소로 정렬이나 범위 검색을 하는 경우에는 숫자로 변환하는 작업이 선행되어야 합니다.

 

  문자열 IP 주소를 10진수 숫자로 변환하는 수식은 아래와 같습니다.

옷뎃1 * 16777216  +  옷뎃2 * 65536  +  옷뎃3 * 256  +  옷뎃4

  위 수식을 SQL로 표현하면 아래와 같습니다.

SELECT CAST( SPLIT(IP_ADDR,'\\.')[0] AS BIGINT) * 16777216 +
       CAST( SPLIT(IP_ADDR,'\\.')[1] AS BIGINT) * 65536 +
       CAST( SPLIT(IP_ADDR,'\\.')[2] AS BIGINT) * 256 +
       CAST( SPLIT(IP_ADDR,'\\.')[3] AS BIGINT) AS IP_ADDR_NUMT
  FROM (SELECT '10.168.80.15' AS IP_ADDR) I;

 

  물론 10진수로 표현된 IP 주소를 문자열 형태로도 변환 가능합니다.
  아래는 문자열 IP주소로 변환하는 SQL문 예제입니다.

SELECT CONCAT( CAST(CAST(IP_ADDR_NUMT / 16777216 AS INT)    AS STRING) , '.',
               CAST(CAST(IP_ADDR_NUMT / 65536 % 256 AS INT) AS STRING) , '.',
               CAST(CAST(IP_ADDR_NUMT / 256 % 256 AS INT)   AS STRING) , '.',
               CAST(CAST(IP_ADDR_NUMT % 256 AS INT) AS STRING) 
             ) AS IP_ADDR
  FROM (SELECT 178802703 AS IP_ADDR_NUMT) I;

 

  실제 쿼리에서 IP 주소의 범위 검색 연산은 위와 같이 10진수로 표현된 숫자 IP 주소를 사용하였습니다.
  하지만, 이 포스트에서는 장표나 부연 설명을 단순화 하기 위해 문자열 형태의 IP주소로 표기하였으니 참고 바랍니다.

 

 


> 데이터 처리 방식 1 (국가별IP주소대역이 비 파티션 테이블인 경우)

 

  접속로그(ACC_LOG)의 IP 주소가 해당되는 국가 정보를 확인하기 위해서는 우선 국가별IP주소대역(CNTRY_IP_RANGE) 테이블과 조인을 해야 합니다.
  그런데 국가별IP주소대역 테이블에는 국가코드(CNTRY_CD)별 시작IP(START_IP)와 끝IP(END_IP) 정보만 존재합니다.
  국가별 IP주소대역이 시작과 끝이라는 범위로 표현되어 있기 때문에 Equal 조인이 불가능합니다. 어쩔 수 없이 크로스 조인(카테시안 곱) 후 BETWEEN 연산자를 사용해 접속로그 IP주소에 해당되는 국가코드를 찾도록 필터링해야 합니다.

 

 

 

 
  위와 같이 '국가별IP주소대역' 테이블과 '접속로그' 테이블을 크로스 조인하게 되면 조인 결과 데이터 셋의 건수는 '국가별IP주소대역 데이터 건수 * 접속로그 테이블 건수'가 될 것입니다.
  문제는 하나의 잡에 할당되는 맵과 리듀스의 건수는 조인하고자 하는 테이블 데이터의 크기를 기반으로 산정되어, 조인 결과 셋 크기에 비해 적은 수의 맵과 리듀스 태스크가 할당될 수 있습니다. 과도하게 큰 데이터를 적절하게 분산 처리하지 못하고 적은 수의 태스크에서 큰 데이터를 처리하게 되니 연산 속도가 떨어지게 됩니다. 심한 경우에는 자원 부족에 의해 태스크 실패가 발생할 수도 있습니다.

  이러한 이유에서인지 하이브에서는 크로스 조인에 의한 부하를 방지하기 위해서 기본적으로 카테시안 곱 연산을 막아놨습니다. (일부 옵션을 변경 후 크로스 조인 가능) 그만큼 부하가 큰 연산인 관계로 쿼리 작성 시 지양해야 할 조인 방식입니다.

  크로스 조인의 부하를 제거하기 위해서는 Equal 조인이 가능하도록 '국가별IP주소대역' 테이블 아키텍처를 변경해 Equal 조인이 가능한 컬럼을 추가해줘야 합니다.

 

 


> 데이터 처리 방식 2 (국가별IP주소대역 테이블을 파티셔닝)

 

  위 튜닝 전 쿼리에서 매핑되는 데이터는 IP주소입니다.
  각 IP주소를 Equal 조인하기 위해서는 '국가별IP주소대역' 테이블에 Equal 조인이 가능한 IP주소 기반의 추출 속성을 추가해줘야 합니다.

  '국가별IP주소대역' 테이블에는 시작IP와 끝IP 컬럼이 존재합니다. 매핑시에도 두 컬럼을 이용한 범위 조건 연산을 하므로 신규 추출 컬럼은 시작IP와 끝IP 간의 공통 정보를 기반으로 생성되어야 합니다.

  IP주소 구조에서 설명했듯이 IP 주소는 옷뎃(.)으로 구분됩니다. 시작IP와 끝IP에서 각 옷뎃별로 동일한 데이터가 공통적으로 존재한다면 공통된 옷뎃을 Equal 조인 컬럼으로 사용할 수 있을 것입니다.

  한국인터넷정보센터에서 다운로드한 국가별IP주소대역 CSV 파일(2020년 7월 기준) 데이터를 기반으로 시작IP와 끝IP의 옷뎃별 일치율을 점검한 결과는 아래와 같습니다.

 

 

  시작IP와 끝IP에서 100% 동일한 값을 가진 옷뎃은 옷뎃1 뿐입니다. 이 옷뎃1(IP_OCTET1)을 조인 키 컬럼으로 사용하여 Equal 조인 연산을 할 수 있습니다.

  부수적으로 국가별IP주소대역 테이블은 대용량의 테이블과 조인되므로 건수는 적지만(190,350건) 옷뎃1별로 파티셔닝 하였습니다.

 

 

  이제 파티션 된 국가별IP주소대역 테이블과 접속로그 테이블을 조인 시 데이터 처리 방식을 확인해보겠습니다. 참고로 IP_OCTET1 컬럼으로 파티션된 테이블이 튜닝 전 쿼리에 해당되는 테이블 형상입니다.

 

 

 

  IP_OCTET1 추출 속성을 생성하고 파티셔닝을 적용하여 파티션 단위의 Equal 조인이 가능하게 되었습니다. 크로스 조인 결과 데이터 셋도 당연히 크기가 감소하게 됩니다.

  하지만 형식상 Equal 조인 연산 일뿐, 매핑되는 데이터가 N:M이므로 사실상 파티션 별 크로스 조인입니다. 범위가 넓은 M:N 매핑이라 Equal 조인에 의한 부하 감소는 크지 않습니다. 실제 운영환경에서도 잡 실패가 발생할 만큼 완벽하게 부하를 제거했다고 볼 수는 없는 아키텍처입니다.

 

  특히 파티션 키 컬럼인 IP_OCTET1 컬럼의 데이터 분포를 확인해보면 파티셔닝 전략이 성공적이었다고 볼 수는 없을 것입니다.

 

 

  파티션 별 데이터 건수의 편차가 너무 큰 관계로 데이터가 몰려있는 파티션 데이터를 할당받은 태스크는 부하가 아주 클 것이라고 예상하실 수 있을 것입니다.

 

 

 

 

[ 튜닝 후 ]

 

> 국가별IP주소대역 테이블 아키텍처 전략

  아래 그림의 예제처럼 기존의 '국가별IP주소대역' 테이블 내 데이터는 국가별 IP 주소 값이 시작IP와 끝IP로 축약되어 있습니다. 시작IP와 끝IP 사이에는 무수히 많은 IP가 존재할 수도 있는데도 말입니다.
  축약되어 표현된 데이터로 '국가별IP주소대역' 테이블의 건수는 적지만 IP주소와 매핑 시 크로스 조인으로 인하여 대량의 조인 결과 데이터 셋이 생성되게 됩니다.

 

 

 

 

  대량의 조인 결과 데이터 셋을 방지하는 방법은 '국가별IP주소대역' 테이블에서 축약되어 표현된 IP주소 정보를 풀어서 다수의 로우로 더 상세하게 표현하는 것입니다. 시작IP와 끝IP 사이에 존재하는 IP대역 정보를 세분화하여 각각의 로우로 저장을 하는 것이죠.

 

 

 

 

  위 그림의 예제 '국가별IP주소대역' 테이블의 첫 번째 로우의 시작IP는 '103.106.140.0'이고 끝IP는 '103.106.143.255'입니다. 이 두 IP 사이에는 '103.106.140.x', '103.106.141.x', '103.106.142.x', '103.106.143.x'의 IP가 존재하게 됩니다.

  이 IP 주소를 '옷뎃1.옷뎃2.옷뎃3' 단위로 세분화하여 각각의 로우로 저장할 수 있습니다.
  세분화된 각 로우의 시작IP와 끝IP는 옷뎃1, 옷뎃2, 옷뎃3의 값이 동일하기 때문에 '옷뎃1.옷뎃2.옷뎃3' 형태의 IP_OCTET123라는 추출 속성을 만들 수 있습니다.

  세분화 단위에서 옷뎃4는 왜 제외되었는지 궁금해 하실 수 있습니다. 국가별IP주소대역 데이터를 보면 거의 모든 데이터가 옷뎃3 단위로 구분되어 있으며, 옷뎃4 단위로 데이터가 구분되는 경우는 미비합니다.
  옷뎃3까지의 데이터 만으로도 IP주소와 매핑 시 거의 모든 데이터가 1:1로 매핑될 수 있습니다.
  옷뎃4 단위로 즉, 전체 IP 주소의 국가 정보를 저장하게 된다면 '국가별IP주소대역_확장' 테이블 데이터 건수의 256배의 데이터를 저장해야 하는데, 이는 쿼리 수행 시 액세스 대상 데이터의 크기를 증가시키는 원인이 될 수 있습니다.
  위와 같은 이유로 '국가별IP주소대역_확장' 테이블은 옷뎃3 단위로 데이터를 세분화하였습니다.

 

  위와 같은 방법으로 '국가별IP주소대역_확장' 테이블을 생성하면 IP_OCTET123 컬럼을 조인 키 컬럼으로 사용할 수 있게 됩니다.

 

 



  IP_OCTET123 컬럼을 조인 키 컬럼으로 사용하게 되면 Equal 조인 시 두 테이블 데이터 간 매핑 정확도가 높아지게 됩니다. 이로 인하여 조인 결과 데이터 셋의 크기가 감소되고 태스크에서 처리해야 할 작업량이 줄어들어 성능 부하도 자연스럽게 감소됩니다.

 

 

 

  또한, 파티션 별 데이터 건수는 아래와 같이 거의 고르게 분포되어 있습니다. 파티션 별 데이터 건수의 편차가 거의 없기에 특정 태스크에 작업이 몰리는 Skew가 발생할 가능성이 더 낮아지게 됩니다.

 

 

  참고로, 파티션 키 컬럼은 IP_OCTET1으로 '국가별IP주소대역' 테이블과 동일합니다. 건수가 적은 테이블이고 테이블 당 파티션 수에 대한 제약이 존재하여 파티션 키는 동일하게 관리하였습니다.

 

 


> 튜닝 후 쿼리

 

  튜닝 후 쿼리는 아래와 같습니다.

INSERT INTO TEST_DW.ACC_IP_CNTRY_MAPP PARTITION (ACC_ID, ACC_START_DT)
SELECT H.IP_ADDR
     , G.CNTRY_CD
     , G.CNTRY_NM
     , H.ACC_ID
     , H.ACC_START_DT
  FROM (
        SELECT IP_ADDR
             , COALESCE(SPLIT(IP_ADDR,'\\.')[0], '_')   AS IP_OCTET1
             , CAST( SPLIT(IP_ADDR,'\\.')[0] AS BIGINT) * 16777216 +
               CAST( SPLIT(IP_ADDR,'\\.')[1] AS BIGINT) * 65536 +
               CAST( SPLIT(IP_ADDR,'\\.')[2] AS BIGINT) * 256 +
               CAST( SPLIT(IP_ADDR,'\\.')[3] AS BIGINT) AS IP_ADDR_NUMT
             , ${ACC_ID}         AS ACC_ID
             , ${ACC_START_DT}   AS ACC_START_DT
             , CONCAT( SPLIT(IP_ADDR,'\\.')[0], '.', SPLIT(IP_ADDR,'\\.')[1], '.', SPLIT(IP_ADDR,'\\.')[2] ) AS IP_OCTET123_DIV
          FROM TEST_ODS.ACC_LOG
         WHERE ACC_ID       = ${ACC_ID}
           AND ACC_START_DT = ${ACC_START_DT}
         GROUP BY IP_ADDR
     ) H
 INNER JOIN
       (
        SELECT IP_OCTET1
             , IP_OCTET123_DIV
             , PART_START_IP_NUMT
             , PART_END_IP_NUMT
             , CNTRY_CD
             , CNTRY_NM
          FROM TEST_DW.CNTRY_IP_RANGE_EXTENT
     ) G
    ON H.IP_OCTET1       = G.IP_OCTET1
   AND H.IP_OCTET123_DIV = G.IP_OCTET123_DIV 
 WHERE H.IP_ADDR_NUMT BETWEEN G.PART_START_IP_NUMT AND G.PART_END_IP_NUMT
;

 

  접속IP주소(IP_ADDR) 별로 국가정보(CNTRY_CD와 CNTRY_NM)를 매핑하기 위해 아키텍처를 개선한 '국가별IP주소대역_확장'(CNTRY_IP_RANGE_EXTENT) 테이블을 조인합니다.
  이때, 조인 키 컬럼으로는 파티션 키 컬럼인 IP_OCTET1과 '옷뎃1.옷뎃2.옷뎃3' 정보를 저장한 IP_OCTET123_DIV 컬럼을 사용하게 됩니다.
  IP_OCTET123_DIV 컬럼을 조인 키 컬럼으로 사용하게 되어 거의 모든 데이터가 1:1로 매핑되고 조인 결과 데이터 셋의 크기가 감소되어 쿼리 성능이 향상되게 됩니다.

 

 


> 국가별IP주소대역_확장 테이블 데이터 만들기

  이번 포스트의 핵심은 '국가별IP주소대역' 테이블을 이용해 '국가별IP주소대역_확장' 테이블 데이터를 만드는 것이라고 볼 수 있습니다.

  '국가별IP주소대역_확장' 테이블 데이터를 만드는 쿼리문은 아래 Github에 업로드 해놨습니다.

 

* Github url : https://github.com/sparkdia/DB_Tuning/tree/master/IP_with_countries_mapping

  참고로 '국가별IP주소대역_확장' 테이블 데이터를 생성하는 쿼리 작업이 크로스 조인으로 인해 부하가 있을 수 있습니다.
  옷뎃별 확장되는 데이터 건수(OCTET2_GAP, OCTET3_GAP)나 파티션 키 컬럼(IP_OCTET1) 기준으로 Insert 작업을 분할하여 수행하는 Shell Script도 업로드해놨으니 참고 바랍니다.

 

  긴 글 읽어주셔서 감사드리고요. 하이브에서의 튜닝 케이스였지만 테이블 아키텍처 변경 작업을 통한 성능 개선이므로 다른 RDBMS에서도 적용이 가능할 겁니다.

  혹시 내용상의 오류나 궁금한 사항이 있으시면 친절한 댓글 부탁드립니다.

감사합니다.  

  

 

  CentOS 환경에서 Hive 설치 작업을 진행해보겠습니다. 설치할 Hive를 포함하여 사전에 설치된 프로그램들의 목록과 버전은 아래와 같습니다.

- Hive : 2.3.6
- Hadoop : 2.10.0

- Java : JDK 7
- MySQL : 5.7.28
- OS : CentOS 8 (VirtualBox 6에서 구동 중)

  Hive 설치에 앞서 우선 Hadoop과 MySQL 설치 작업이 완료되어야 합니다. 구성되기 전이라면 아래 포스트를 참고하여 Java, Hadoop 및 MySQL을 설치 작업을 먼저 진행해주시면 됩니다.

> JDK 7 설치하기 (CentOS 8 환경)
> 하둡(Hadoop) 설치하기[#1] - 설치 준비
> 하둡(Hadoop) 설치하기[#2] - 하둡 환경 설정하기
> 하둡(Hadoop) 설치하기[#3] - 데이터 노드 생성 및 하둡 실행
> MySQL 5.7 설치 (CentOS 8)


 

1. Hive 다운로드 및 압축해제

 

  사전 설치 작업이 완료되었으면 본격적으로 Hive를 설치해 보겠습니다. 우선 Hive 홈페이지에 접속해서 설치 파일을 다운로드합니다.

* URL : https://hive.apache.org/downloads.html

  Hive의 다운로드 홈페이지에 접속을하면 페이지 중간에 파란 글씨로 'Download a release now!'라는 문장을 확인 할 수 있을 것입니다. 해당 문장을 접속하면 아래와 같이 Hive 설치 파일을 다운로드 받을 수 있는 Mirror 사이트 중 사용자 접속 지역과 가까운 Mirror 사이트 목록이 나열되어 있을 것입니다. 이 중에서 한 곳을 임의로 선택해서 들어가봅니다.

  필자는 mirror.apache-kr.org 사이트를 선택했는데, 다운로드 가능한 Hive 버전별 디렉토리 목록을 확인 할 수 있으며, 이 중 설치하고자 하는 hive 버전을 선택해서 클릭하겠습니다. 현재 Hive의 최신 버전은 3.1.2이지만, 필자는 MR 테스트를 목적으로 Hive 설치 작업을 진행하기해 Hive-2.3.6 버전을 선택하고자 합니다.

  각 Hive 버전별 디렉토리 안에는 컴파일되어 실행가능한 바이너리 파일(apache-hive-2.3.6-bin.tar.gz)과 소스 파일(apache-hive-2.3.6-src.tar.gz)이 존재합니다. 이번 설치 과정에서는 컴파일 없이 직접 설치작업을 진행할 것이므로 바이너리 파일을 선택하여 다운로드하겠습니다. 로컬 PC가 아닌 서버에서 wget 명령을 사용하여 다운로드 할 계획이므로 파일의 링크 주소를 복사합니다.(파일명에서 오른쪽 마우스 버튼을 클릭하여 나타난 팝업 메뉴에 링크 주소 복사 메뉴가 존재할 것입니다.)

 

  다운로드 링크 주소를 복사하였으면 아래와 같이 서버에서 wget을 이용해 설치파일을 다운로드 합니다.

>  wget http://mirror.apache-kr.org/hive/hive-2.3.6/apache-hive-2.3.6-bin.tar.gz

 

  다운로드된 파일은 압축된 상태이기 때문에 tar 명령으로 압축을 해제하도록 합니다. 편의를 위해 압축 해제 경로를 설치 디렉토리인 '/usr/local'로 지정하였습니다.

> sudo tar -zxvf ./apache-hive-2.3.6-bin.tar.gz -C /usr/local/

  설치 디렉토리로 지정한 /usr/local 디렉토리는 root 소유이기 때문에 압축 해제된 파일의 소유자가 root로 되어 있습니다. 이를  Hive 운영 사용자 계정으로 소유자와 그룹을 변경해야 합니다.

> sudo chown -R hduser:hadoop ./apache-hive-2.3.6-bin/
> sudo mv apache-hive-2.3.6-bin/ hive-2.3.6

  추가로 hive 디렉토리명이 너무 긴 관계로 필자는 사용상 편의를 위해 hive-2.3.6으로 이름을 변경하였습니다. 물론 필수 작업은 아니며 추후 환경설정 작업 시 올바른 경로만 입력해주면 됩니다.

 

 

2. OS 사용자 환경 설정

  Hive 구동을 위해 $HIVE_HOME 환경 변수를 설정해줘야 합니다. 사용자 홈 디렉토리에 .bashrc파일을 열어 $HIVE_HOME 변수를 추가하고 PATH에 $HIVE_HOME/bin 경로도 추가해줍니다.

> vi ~/.bashrc
export HIVE_HOME=/usr/local/hive-2.3.6

PATH="$HOME/.local/bin:$HOME/bin:$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HIVE_HOME/bin"
export PATH

 

 

 

3. HDFS 디렉토리 생성

  Hive에서 테이블을 생성하기 위해서는 사전에 HDFS에 /tmp와 /user/hive/warehouse 디렉토리가 생성되어야 합니다. 생성된 디렉토리에는 추가로 그룹 쓰기 권한도 설정이 필요합니다.

> hadoop dfs -mkdir /tmp
> hadoop dfs -mkdir -p /user/hive/warehouse
> hadoop dfs -chmod g+w /tmp
> hadoop dfs -chmod -R g+w /user

 

4. Hive 설정

  hive의 기본 설정 값은 $HIVE_HOME/conf 디렉토리에 존재하는 hive-default.xml.template 파일에서 확인할 수 있습니다. 기본 설정 값을 변경하여 적용하고 싶다면 hive-site.xml 파일을 작성하여 설정값을 재정의하면 됩니다.

  이번 설치작업은 테스트 용도이므로 Metastore에 대한 설정 작업만 진행할 계획이며, Metastore에 관한 설정 변경 사항은 다음 단계(5.4. hive-site.xml 설정)에서 확인하도록 하겠습니다.

 

5. Metadata Store (MySQL) 설정

  Hive에서 메타데이터를 관리하기 위해 기본적으로 임베디드 데이터베이스인 Derby를 사용하고 있습니다. 만약, 다른 DBMS를 Metastore로 사용하기 위해서는 당연히 DBMS와 JDBC Connector를 먼저 설치해야 합니다. 설치가 완료되면 metastore로 사용한 데이터베이스와 사용자 계정을 생성하고 hive-site.xml 설정 정보를 변경해줘야 합니다.

5.1. MySQL 설치

  여기서는 Metastore로 MySQL 5.7.28을 사용하려고 합니다. CentOS 8에서 Mysql을 설치하는 방법은 아래 포스팅을 참고합니다.

> MySQL 5.7 설치 (CentOS 8)

 

MySQL 5.7 설치 (CentOS 8)

CentOS 환경에서 Hive 설치&테스트를 위해서 MySQL 설치가 선행되어야 했었다. MySQL 사용이 주목적이 아닌 관계로 yum을 이용해 최신 버전의 MySQL(8.0.17)을 쉽게 설치할 수 있었지만, 지금은 Hive metastore로..

sparkdia.tistory.com

 

5.2. MySQL내 Metastore 데이터베이스 생성

  mysql에 접속하여 Metastore로 사용할 데이터베이스와 계정을 생성해보겠습니다.
  Metastore로 사용할 데이터베이스명은 'metastore'로 정의하였으며, Hive의 기본 캐릭터셋이 utf8이므로, 생성할 데이터베이스의 기본 캐릭터셋도 utf8로 설정합니다. metastore 사용을 위해 별도의 mysql 사용자 계정 hive를 생성하였고, 생성된 데이터베이서 metastore에 대한 모든 권한을 부여해주었습니다.

mysql> CREATE DATABASE metastore DEFAULT CHARACTER SET utf8;
mysql> CREATE USER 'hive'@'%' IDENTIFIED BY '비밀번호';
mysql> GRANT ALL PRIVILEGES ON metastore.* TO 'hive'@'%';
mysql> FLUSH PRIVILEGES;

  데이터베이스 및 사용자 생성이 정상적으로 완료되었으면 위와 같은 화면을 볼 수 있을 것입니다. 추가적으로 Mysql의 메타 정보를 조회하여 확인하는 방법도 존재합니다.

  MySQL 사용자 계정은 mysql.user 테이블을 조회하면 됩니다.

mysql> SELECT Host,User FROM mysql.user;

  MySQL 데이터베이스 목록은 INFORMATION_SCHEMA.SCHEMATA 테이블을 조회하면 됩니다.

mysql> SELECT SCHEMA_NAME, DEFAULT_CHARACTER_SET_NAME 
       ->  FROM INFORMATION_SCHEMA.SCHEMATA; 

 

 

5.3 MySQL JDBC Connector/J 설치

  Hive에서는 JDBC를 이용해 MySQL로 액세스를 하게 됩니다. 따라서, 설치된 MySQL 버전에 적합한 MySQL JDBC Connector/J를 다운로드 받아서 설치해야 합니다. 각 Connector/J 버전 별 지원 내역은 아래 URL에서 확인 가능하며, 필자는 JDK 7 환경이기 때문에 5.1 버전의 Connector/J를 다운로드 받아 설치할 것입니다.

* URL : https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-versions.html

  JDBC Connector/J는 아래 사이트에서 다운로드 가능하다. 다운로드를 위해서는 Oracle 로그인이 필요합니다.

* URL : https://downloads.mysql.com/archives/c-j/

  다운로드 페이지에서 파일 링크 주소를 복사한 후 서버에서 wget을 이용해 파일을 다운로드합니다.

> wget https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-java-5.1.47.tar.gz

  다운로드된 파일은 압축된 상태이므로 tar 명령어를 이용해 압축을 해제합니다.

> tar -zxvf ./mysql-connector-java-5.1.47.tar.gz

  압축해제가 완료되면 여러 파일을 확인할 수 있는데, 이중 파일명이 *-bin.jar인 파일만 사용하면 됩니다.

  해당 파일을 Hive의 라이브러리 파일들이 저장되어 있는 $HIVE_HOME/lib로 이동해놓겠습니다.

> mv mysql-connector-java-5.1.47-bin.jar /usr/local/hive-2.3.6/lib

 

5.4. hive-site.xml 설정

  Metastore DB를 MySQL로 사용하기 위해서는 hive-site.xml에서 JDBC Connection 정보 설정이 필요합니다. $HIVE_HOME/conf 디렉토리 내 hive-site.xml 파일을 vi를 이용해 아래와 같이 편집하겠습니다.

<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://localhost:3306/metastore?createDatabaseIfNotExist=true</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>org.mysql.jdbc.Driver</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hive</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>비밀번호</value>
  </property>
</configuration>

 

5.5 Metastore 초기화

  MySQL 연결 작업이 완료되었으면 schematool을 이용하여 메타정보 초기화 작업을 진행해야 합니다.

> bin/schematool -dbType mysql -initSchema

  메타정보 초기화 작업이 정상적으로 완료되었으면, MySQL의 metastore 데이터베이스에 아래와 같이 메타 정보 관리를 위한 테이블이 생성되었음을 확인할 수 있을 것입니다.

 

 

6. Hive 실행 및 테스트

  이제 Hive 설치 작업은 완료되었으며, 본격적으로 Hive를 실행하여 테스트를 진행해 보겠습니다..

  hive를 실행하여 아래와 같이 데이터베이스를 생성하고 확인하는 기본 명령어를 실행해 보았습니다.

hive> create database test;     -- test 데이터베이스 생성
hive> show database;

 

  테이블 생성도 테스트 해보겠습니다. integer형 col1컬럼과 string형 col2 컬럼을 가진 tab1 테이블을 생성 해보았습니다.

hive> create table test1. tab1 (
       >   col1 integer,
       >   col2 string
       >  );

  생성된 테이블에 테스트용 데이터를 추가해 보았습니다.

hive> insert into table test1.tab1
       > select 1 as col1, 'ABCDE' as col1;

  INSERT 작업은 MapReduce 엔진에의해 처리되며, ResourceManager 웹페이지에 접속해보면 방금 실행된 insert문을 확인 할 수 있을 것입니다.

 

  위에서 입력한 테스트 데이터를 조회해보겠습니다.

hive> select * from tab1;

  방금 입력한 테스트 데이터가 조회된다면 정상적으로 Hive쿼리가 수행될 것입니다.

  참고로, 이렇게 입력된 데이터는 HDFS에서 확인할 수 있습니다. 테이블 생성 시 location 옵션을 설정하지 않았다면 아래와 같은 기본 저장 경로에서 데이터가 저장되어 있을 것입니다.

/user/hive/warehouse/데이터베이스명.db/테이블명

  필자가 생성한 테이블의 데이터베이스명과 테이블명은 각각 'test', 'tab1'이므로 아래 경로에 데이터 파일이 존재하게 됩니다.

/user/hive/warehouse/test.db/tab

  HDFS 내 데이터는 아래와 같이 Nodemanager 페이지 내 'Utilities'>'Browse the file system'메뉴를 클릭하여 표시되는 웹페이지나 확인하거나, 직업 서버에서 hadoop dfs 명령을 이용해서 확인 가능합니다.

 

7. 기타 - Warning 해결

  Hive에서 명령 실행 시 SSL 보안 인증서 관련 문제로 아래와 같은 Warning 문구가 표시되었습니다. 

 Thu Mar 12 16:11:26 KST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.

  이 설치 작업은 테스트 용도이므로 SSL을 비활성화하도록 hive-site.xml 설정을 변경 해주었습니다. 아래 그림에서와 같이 javax.jdo.option.ConnectionURL 설정 값 뒤에 SSL을 비활성한다는 &useSSL=false 설정을 추가 해주었습니다.

 

  2010년에 처음 선 보인 Spark가 10주년이 되는 올해 6월 18일에 3.x 대 첫 버전인 3.0.0 릴리즈를 공개했습니다.

  Spark 3.0에서 기능 개선이 가장 많았던(top active component) 것은 Spark SQL인데요. TPC-DS 30TB 벤치마크 테스트에서 Spark 2.4 비해 2배 정도 빠른 성능을 보여줬다고 합니다.

  그 외 Spark 3.0의 주요 New feature는 아래와 같습니다.

  • adaptive query execution
  • dynamic partition pruning
  • ANSI SQL compliance
  • significant improvements in pandas APIs
  • new UI for structured streaming
  • up to 40x speedups for calling R user-defined functions
  • accelerator-aware scheduler

 

  이제 본격적으로 Spark 3.0.0을 설치해보도록 하겠습니다.

1. 자바 설치

  스파크는 스칼라로 구현되어 있으며, 자바 가상 머신(JVM) 기반에서 동작합니다.
  따라서, 스파크를 설치하기 위해서는 사전에 자바가 설치되어야 합니다.

  스파크 3.0.0은 자바 11버전을 지원하므로,  JDK 11 버전을 다운로드 받아 설치합니다.

  * URL : https://spark.apache.org/releases/spark-release-3-0-0.html

 

  Java SE 11 버전 설치는 아래 포스트를 참고 바랍니다.

https://sparkdia.tistory.com/64

 

Java SE 11 설치 on Windows 10

  윈도우 10 환경에서 Java se 10 설치 작업을 진행해보겠습니다. 이 포스트를 작성하는 시점에 릴리즈된 DK 최신 버전은 14입니다. 하지만 제가 설치하고자 하는 프로그램과 호환이되는 버전이 필��

sparkdia.tistory.com

 

 

2. 파이썬(Python) 설치

  스파크 3.0.0에서 파이썬 2 버전대는 더 이상 지원하지 않는다고 합니다.

  아래 파이썬 홈페이지 접속하여 최신 버전인 3.8.3을 다운로드받아 설치합니다.

https://www.python.org/downloads/

 

Download Python

The official home of the Python Programming Language

www.python.org

 

 

 

3. 스파크 다운로드 및 압축해제

  아래 URL에 접속하여 스파크 다운로드 페이지로 이동합니다.

* URL : https://spark.apache.org/downloads.html

 

Downloads | Apache Spark

Download Apache Spark™ Choose a Spark release: Choose a package type: Download Spark: Verify this release using the and project release KEYS. Note that, Spark 2.x is pre-built with Scala 2.11 except version 2.4.2, which is pre-built with Scala 2.12. Spar

spark.apache.org

 

  스파크 릴리즈를 3.0.0으로 선택하고, 하둡 3.2가 포함된 패키지를 선택합니다.

  Download Spark 옆의 링크를 클릭하면 다운로드가 가능한 미러 사이트 목록이 표시되는 아래 페이지로 이동합니다.

  위 미러 사이트 중 한 곳을 선택하여 설치 파일을 다운로드 받습니다.

 

4. 스파크 설치

  다운로드 받은 파일을 설치하고자 하는 디렉토리(예: C:\Spark\)에 옮긴 후 압축을 해제합니다.

 

 

5. Winutils 설치

  아래 URL에 접속하여 Spark 다운로드 시 선택한 하둡 버전에 맞는 winutils.exe 파일을 다운로드 받아야 합니다.

  * URL : https://github.com/cdarlint/winutils

  이 포스트에서는 하둡 3.2 버전을 선택하였으므로 하단 URL에 접속하여 winutils.exe 파일을 다운로드 합니다.

  * URL : https://github.com/cdarlint/winutils/tree/master/hadoop-3.2.1/bin

  위 페이지의 winutils.exe 링크에서 오른쪽 마우스를 클릭하여 팝업 메뉴를 띄웁니다. 팝업 메뉴 중 '다른 이름으로 링크 저장'을 클릭하여 winutils.exe 파일을 다운로드 합니다.

  C:\에 Hadoop\bin 디렉토리를 생성합니다. 생성된 bin 디렉토리에 다운로드 받은 winutils.exe 파일을 이동시킵니다.

 

 

6. 환경설정

  탐색기에서 [내 PC]를 선택 후 마우스 오른쪽 버튼을 클릭하여 팝업 메뉴를 띄웁니다. 팝업 메뉴 중 하단의 '속성'을 클릭합니다.

 

  왼쪽 메뉴 하단의 '고급 시스템 설정'을 클릭합니다.

 

  시스템 속성 다이얼로그에서 '고급' 탭을 선택 후 하단의 '환경 변수' 버튼을 클릭합니다.

 

  환경 변수 다이얼로그의 '시스템 변수'에서 '새로 만들기'를 클릭합니다.

 

  변수 이름은 'SPARK_HOME', 변수 값은 스파크 설치 디렉토리 명(예: C:\Spark\spark-3.0.0-bin-hadoop3.2)을 입력합니다.

 

 다시 한번 '새로 만들기' 버튼을 클릭 후 변수 이름은 'HADOOP_HOME', 변수 값은 'C:\Hadoop'을 입력합니다.

 

  시스템 변수 중 'Path'를 선택 후 '편집' 버튼을 눌러 아래와 같이 '%SPARK_HOME%\bin'과 '%HADOOP_HOME%\bin 경로를 추가해줍니다.

 

 

7. 스파크 실행

  명령 프롬프트를 실행하여 커맨드에서 pyspark를 입력하면 아래와 같이 pyspark가 실행되는 것을 확인하실 수 있습니다.

 

  테스트로 prod_list.csv 파일을 읽어 prod라는 데이터프레임을 생성한 후, COLOR별 건수를 출력하는 예제를 실행해보았습니다.

prod = spark.read.csv("C:\\Spark\\sample\\prod_list.csv", header="true", inferSchema="true")
prod.groupBy("COLOR").count().show()

 

 

8. 스파크 Web UI

  아래 URL을 통해서 스파크 웹 UI에 접속 가능합니다.

* URL : http://localhost:4040/

  스파크 내에서 실행 중인 잡 목록을 모니터링 할 수 있으며, 아래와 같이 Stage별 DAG도 확인이 가능합니다.

 

[Ref.] https://phoenixnap.com/kb/install-spark-on-windows-10

'BigData > Spark' 카테고리의 다른 글

스파크(Spark) 실행하기 on Databricks  (2) 2020.06.22
Spark 2.1.3 설치 (CentOS 8)  (0) 2020.06.14

 

  필자가 생각하는 Hive에서의 데이터 아키텍처 최적화 방법은 아래와 같이 3가지로 구분될 수 있습니다.

Hive에서 데이터 아키텍처 최적화 방법

 

  • 많은 데이터는 적게 만들어라
  • 적은 데이터를 많게 만들어라
  • Skew는 분할처리하라.

 

  이 포스트에서는 첫 번째 Case인 '많은 데이터는 적게 만들기' 방법으로 튜닝한 예제에 대해 설명하고자 합니다.

  먼저 튜닝 결과는 아래와 같습니다.

  리듀스의 수가 크게 증가한 것에 의문을 가지시는 분들이 계실 수도 있는데요. 이 부분에 대해서는 아래에서 궁금증을 풀어보도록 하겠습니다.

 

 

 

[ 튜닝 전 ]

 

  먼저 튜닝 전 쿼리를 살펴보도록 하겠습니다.

SELECT BASE_DT
     , MAX(ORG_CNT)                AS ORG_CNT
     , MAX(CAL_CNT)                AS CAL_CNT
     , MAX(ORG_CNT) - MAX(CAL_CNT) AS GAP_CNT
  FROM (
        SELECT '${BASE_DT}'        AS BASE_DT
             , COUNT(DISTINCT UID) AS ORG_CNT
             , 0                   AS CAL_CNT
          FROM ACT_TXN_DD
         WHERE BASE_DT BETWEEN TO_CHAR(DATE_ADD(FROM_UNIXTIME(UNIX_TIMESTAMP('${BASE_DT}', 'yyyyMMdd'), 'yyyy-MM-dd'), -179), 'yyyyMMdd') 
                           AND '${BASE_DT}'
         UNION ALL
        SELECT BASE_DT
             , 0                   AS ORG_CNT
             , COUNT(DISTINCT UID) AS CAL_CNT
          FROM ACT_TXN_180_SUM
         WHERE BASE_DT = '${BASE_DT}'
         GROUP BY BASE_DT
     ) A
 GROUP BY BASE_DT
;

  * 참고
   - BASE_DT는 파티션 키 컬럼입니다.
   - 위 조건절에 사용된 TO_CHAR 함수는 Hive 내장 UDF가 아니라 시스템 운영상 필요해 의해 만들어진 진짜 사용자 정의 UDF입니다.

 

  위 쿼리의 튜닝 포인트는 2가지가 존재합니다.

* Tuning Point #1 : 조건절 내 사용자 정의 UDF 제거

  사용자 정의 UDF는 Hive의 옵티마이저가 제대로 인식할 수 없는 UDF라고 볼 수 있습니다. 따라서 옵티마이저가 실행 계획을 수립할 때 사용자 정의 UDF는 비용 계산에서 제외됩니다.
  이러한 사용자 정의 UDF가 where 조건절에 사용되어 추출 데이터의 건수를 상당히 줄인다 해도 옵티마이저의 쿼리 수행 비용 계산 시에는 반영되지 않는 겁니다.

  특히 위 쿼리에서처럼 파티션 키 컬럼(BASE_DT)의 필터링 조건으로 사용자 정의 UDF가 사용되는 경우에 성능상의 큰 이슈가 발생할 수 있습니다. 파티션 키 컬럼의 필터링 조건이 존재하지만 Partition pruning을 시도하지 못하고 전체 테이블의 데이터를 읽어야(Full table scan) 하기 때문입니다.

  위 쿼리에서는 Partition pruning을 위해서 where 조건절에 사용된 사용자 정의 UDF를 제거하는 것이 필요합니다. 

 

  참고로 Hive에서 Partition pruning을 판단할 수 있는 방법은 아래 포스트에서 확인하실 수 있습니다.

 

Hive 튜닝 기본 -실행계획에서 Partition pruning 확인하기

Hive에서도 아래와 같이 'EXPLAIN' 명령문으로 쿼리 실행계획을 확인할 수 있습니다. hive> EXPLAIN 쿼리문; Hive의 실행계획은 다른 DBMS에 비해 실행계획의 가독성이 떨어진다는 점이 참으로 안타까운데

sparkdia.tistory.com

 

 

* Tuning Point #2 : DISTINCT COUNT 연산 부하

  COUNT(DISTINCT 컬럼) 함수는 '컬럼'내 중복을 제외한 데이터의 건수를 계산합니다.
  전체 데이터에서 중복이 제거된 건수를 계산해야 하므로 이 작업은 단 하나의 리듀스 태스크가 처리하게 됩니다.

  위 쿼리의 리듀스 태스크 수가 3개였습니다.

  - 서브쿼리 내 UNION ALL 절 상단의 쿼리
  - 서브쿼리 내 UNION ALL 절 하단의 쿼리
  - 최상위 쿼리

  데이터의 건수와는 상관없이 각 쿼리를 수행하는 Staage 별로 단 1개의 리듀스 태스크가 할당이 되어 집계 함수 연산을 수행하는 것입니다.

  큰 작업을 여러 개의 맵과 리듀스가 나눠서 빠른 시간 내에 처리하는 것이 하둡의 기본 사상이자 장점입니다.
  1개의 리듀스로 집계 함수 연산을 수행한다는 것은 하둡의 장점인 분산 처리의 이점을 전혀 활용하지 못하는 경우입니다. 

  위 쿼리에서는 다수의 리듀스가 연산을 처리할 수 있도록 COUNT(DISTINCT 컬럼) 연산의 부하 감소가 필요합니다.

 

 

 

 

[ 튜닝 후 ]

 

  아래는 튜닝 전 쿼리에서 부하를 유발한 사용자 정의 UDF를 제거하고 COUNT DISTINCT의 부하를 감소시킨 쿼리입니다.

SELECT BASE_DT
     , MAX(ORG_CNT)                AS ORG_CNT
     , MAX(CAL_CNT)                AS CAL_CNT
     , MAX(ORG_CNT) - MAX(CAL_CNT) AS GAP_CNT
  FROM (
        SELECT '${BASE_DT}'        AS BASE_DT
             , COUNT(DISTINCT UID) AS ORG_CNT
             , 0                   AS CAL_CNT
          FROM (
                SELECT BASE_DT
                     , UID
                  FROM ACT_TXN_DD
                 WHERE BASE_DT BETWEEN DATE_FORMAT(DATE_ADD(FROM_UNIXTIME(UNIX_TIMESTAMP('${BASE_DT}', 'yyyyMMdd'), 'yyyy-MM-dd'), -179), 'yyyyMMdd') 
                                   AND '${BASE_DT}'
                 GROUP BY BASE_DT, UID
             ) SUB
         UNION ALL
        SELECT BASE_DT
             , 0                   AS ORG_CNT
             , COUNT(DISTINCT UID) AS CAL_CNT
          FROM ACT_TXN_180_SUM
         WHERE BASE_DT = '${BASE_DT}'
         GROUP BY BASE_DT
     ) A
 GROUP BY BASE_DT
;

 

 

* Tuning Point #1 : 조건절 내 사용자 정의 UDF 제거

  파티션 키 컬럼인 BASE_DT의 필터링 조건을 사용자 정의 UDF가 아닌 Hive 내장 UDF인 DATE_FORMAT으로 변경하였습니다.

WHERE BASE_DT BETWEEN TO_CHAR(DATE_ADD(FROM_UNIXTIME(UNIX_TIMESTAMP('${BASE_DT}', 'yyyyMMdd'), 'yyyy-MM-dd'), -179), 'yyyyMMdd') AND '${BASE_DT}'
WHERE BASE_DT BETWEEN DATE_FORMAT(DATE_ADD(FROM_UNIXTIME(UNIX_TIMESTAMP('${BASE_DT}', 'yyyyMMdd'), 'yyyy-MM-dd'), -179), 'yyyyMMdd') AND '${BASE_DT}'

  Hive 내장 UDF를 사용하였기에 옵티마이저는 BASE_DT 컬럼의 필터링 연산을 실행 계획에 반영할 수 있습니다.
  필터링 연산이 적용되므로 Partition pruning이 발생하여 액세스 대상 데이터의 건수의 감소로 쿼리 성능이 향상되게 됩니다.

 

 

* Tuning Point #2 : DISTINCT COUNT 연산 부하

  앞서 DISTINCT COUNT 연산은 오직 하나의 리듀스가 처리한다고 했습니다.
  분산 처리가 불가능한 상황이므로, 리듀스 연산 시간을 단축하는 방법은 리듀스의 입력 데이터 건수를 줄이는 것입니다.

  튜닝 전 쿼리를 다시 한번 살펴보겠습니다.

 

  중복된 UID가 존재하는 180일간의 데이터를 읽어 중복되지 않은 UID의 건수를 계산합니다. 하나의 리듀스가 180일간의 데이터를 모두 처리므로 실행 속도가 느려지게 됩니다.

 

  DISTINCT COUNT 연산을 수행하는 리듀스의 입력 데이터를 줄이는 방법사전에 입력 데이터의 중복을 제거하는 것입니다.

  서브 쿼리 안에서 GROUP BY BASE_DT, UID 집계 연산으로 중복이 제거된 중간 데이터 결과 셋이 생성됩니다. 
  DISTINCT COUNT 연산을 처리하는 리듀스는 입력으로 크기가 감소된 중간 데이터 결과 셋을 사용하게 됩니다.

  위 GROUP BY 연산은 다수의 리듀스가 작업을 분배하여 처리합니다.
  하나의 리듀스가 처리하던 중복제거 작업을 다수의 리듀스가 처리하게 되는 겁니다. 작업의 분산 처리가 가능하니 당연히 연산 속도는 빨라지게 됩니다.

 

 

 

다시 한번 첫 번째 Hive에서의 데이터 아키텍처 최적화 방법을 요약 정리하면 아래와 같습니다.

최적화 방법 첫번째 : 많은 데이터는 적게 만들어라.

1. 액세스 대상 테이블 데이터 건수 감소 (Partition Pruring이 적용)
2. 집계 함수 연산 대상 데이터 건수 감소 (사전 Group by 연산 수행)
 

 

 

  데이터 브릭스(Databricks)는 아파치 스파크 실행환경을 제공해주는 클라우드 기반의 플랫폼입니다.

  Notebook 형태로 스파크 소스를 테스트할 수 있는 웹 UI환경을 제공해주므로, 설치 작업 없이도 스파크를 직접 테스트해 볼 수 있습니다.

  데이터브릭스는 두 개의 플랫폼 DATABRICKS PLAFORM과 COMMUNITY EDITION이 존재합니다.
  BUSINESS용인 DATABRICKS PLAFORM은 제약 없이 모든 기능을 사용할 수 있으며. 무료로 Trial 버전을 14일간 사용할 수 있습니다.
  COMMUNITY EDITION은 스파크 학습자를 위한 무료 버전으로 기능이 제한적입니다.

 

  데이터브릭스를 사용하기 위해서는 아래 URL에 접속하여 사용자 등록을 해야 합니다.

* URL : https://databricks.com/try-databricks

 

Try Databricks

Discover why businesses are turning to Databricks to accelerate innovation. Try Databricks’ Full Platform Trial risk-free for 14 days!

databricks.com

 

  위 URL에 접속하면 아래와 같이 사용자 기본 정보를 입력하는 화면이 표시됩니다.

 

  개인 정보를 입력 후 'SIGN UP' 버튼을 클릭하면 데이터브릭스 버전을 선택하는 화면이 표시됩니다.
  이 포스트에서는 Commuity edtion을 선택하여 테스트를 진행해보도록 하겠습니다.

 

 

 

  Commuity edtion의 'GET STARTED' 버튼을 클릭하면 아래와 같이 이메일을 확인하라는 메세지가 출력됩니다.

 

 

  이메일 본문 중에서 'Get started by visiting' 오른쪽의 링크를 클릭하여 이메일을 인증합니다.

 

 

  위 링크를 클릭하면 아래와 같이 계정 비밀번호을 설정하는 화면으로 이동합니다.

 

 

  비밀번호 변경 작업이 완료되면 아래와 같이 데이터브릭스 초기 화면을 볼 수 있습니다.

  화면 중앙에 'Explore the Quickstart Tutorial'을 클릭하면 기본 사용 방법이 정리된 Notebook을 확인하실 수 있습니다.
  이 포스트에서도 위 튜토리얼을 따라서 테스트를 진행해보도록 하겠습니다.

 

 

  데이터브릭스를 사용하기 위해서는 제일 먼저 클러스터를 생성해야 합니다.
  위 화면에서 'New Cluster'를 클릭하여 클러스터 생성 작업을 진행합니다.

  또는 아래와 같이 왼쪽 메뉴에서 'Clusters'를 클릭한 후 나타나는 Clusters 페이지 상단의 'Create Cluster' 버튼을 클릭하여 클러스터를 생성할 수도 있습니다.

 

 

  클러스터 생성화면에서 생성할 클러스터의 이름을 입력하고, Runtime Version을 선택합니다.
  Runtime Version은 테스트하고자 하는 Scala나 Spark의 버전을 기준으로 선택하면 됩니다.
  여기서는 튜토리얼에서 제시된 6.3 (Scala 2.11, Spark 2.4.4) 버전을 선택하도록 하겠습니다.

 

 

  입력이 완료되면 'Create Cluster' 버튼을 클릭하여 클러스터를 생성합니다.

  Clusters 페이지에서 생성된 클러스터를 확인할 수 있습니다.

 

 

 

  이어서 Notebook을 생성하도록 하겠습니다.

 

  위 초기화면 중앙에 위치한 'New Notebook' 메뉴를 클릭하여 신규 노트북을 생성할 수 있습니다.

  또는 아래와 같이 메뉴 'Workspace'에서 계정명 옆의 '∨' 아이콘을 클릭하면 아래와 같이 Notebook을 생성하는 메뉴를 확인하실 수 있습니다. 해당 메뉴를 통해서도 노트북 생성이 가능합니다.

 

 

  노트북 생성 메뉴를 실행하면 아래와 같이 노트북 생성 화면이 표시됩니다.

 

 

  노트북 이름을 입력하고 'Defalut Language'는 우선 SQL을 선택하도록 하겠습니다.
  'Cluster'는 방금 전에 생성한 클러스터를 선택하면 됩니다.
  'Create' 버튼을 클릭하면 아래와 같이 생성된 노트북 페이지가 표시됩니다.

 

 

 

  데이터브릭스에서 제공해주는 diamonds.csv 파일을 소스 데이터로 읽어들이는 diamonds 테이블을 생성하도록 하겠습니다.

 

DROP TABLE IF EXISTS diamonds;

CREATE TABLE diamonds
USING csv
OPTIONS (path "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv", header "true")

 

  노트북 셀(cells) 또는 커맨드(commands)에 위 SQL을 입력한 후 실행합니다.
  셀에서 shift+Enter를 입력하거나 셀 오른쪽 상단의 '▶' 아이콘을 클릭하면 해당 셀 내 SQL이 실행됩니다. 

 

 

  SQL을 실행하면 위와 같이 셀 하단에 Spark Jobs 목록과 실행 결과가 표시됩니다.

  위에서 생성한 테이블은 'Data' 메뉴에서도 확인 가능합니다.

 

 

 

  방금 생성한 diamonds 테이블의 데이터를 조회해보도록 하겠습니다.

SELECT * from diamonds

 

 

  GROUP BY 구문을 사용해 색상(color)별 평균 가격(price)을 계산할 수도 있습니다.

SELECT color, avg(price) AS price FROM diamonds GROUP BY color ORDER BY color

 

  Select문을 실행하여 출력된 데이터는 테이블 형태나 다양한 형태의 그래프로 확인할 수 있습니다.
  결과 데이터 셋 화면 아래에 Bar 그래프 아이콘을 클릭하면 다양한 형태의 그래프 옵션을 볼 수 있으며, 출력하고자 하는 형태의 그래프를 선택하여 쉽게 그래프를 출력할 수 있습니다.

 

 

  아래는 diamonds 테이블 데이터의 색상별 평균 가격을 Bar 그래프로 출력한 결과입니다.

 

 

 

  현재 테스트 중인 노트북의 기본 언어를 SQL로 선택했지만 '%python' 명령어를 이용해 파이썬 명령어도 해당 노트북에서 실행할 수 있습니다.
  위에서 SQL로 테스트한 작업을 파이썬 명령어를 이용해서 다시 테스트해보도록 하겠습니다.

 

  diamonds.csv 파일을 읽어 diamonds 데이터 프레임을 생성합니다.

%python
diamonds = spark.read.csv("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv", header="true", inferSchema="true")

 

  diamonds 데이터 프레임에서 색상(color)별 평균 가격(price)를 추출합니다.

%python
from pyspark.sql.functions import avg

display(diamonds.select("color","price").groupBy("color").agg(avg("price")).sort("color"))

 

  위 실행 결과도 아래와 같이 그래프로 확인해보겠습니다.

 

 

 

'BigData > Spark' 카테고리의 다른 글

스파크(Spark) 3.0.0 설치 on Windows 10  (0) 2020.06.28
Spark 2.1.3 설치 (CentOS 8)  (0) 2020.06.14

  CentOS 8 환경(VirtaulBox를 이용한 가상 환경)에서 Spark를 설치해보겠습니다.

  이 포스트를 작성하는 시점에 최종 릴리즈 버전은 2.4.6이지만,
  테스트 환경의 자바 버전이 JDK 7인 관계로 2.1.3 버전을 설치하고자 합니다.

  설치 환경 정보는 아래와 같습니다.

- Spark : 2.1.3
- Hadoop : 2.10.0
- OS : CentOS 8 (VirtualBox 6)
- Java : JDK 7 

 

1. Spark 설치 파일 다운로드

  Spark 설치 파일을 다운로드하기 위해 아래 spark 다운로드 사이트에 접속합니다.

  *  URL : https://spark.apache.org/downloads.html

  이번에 설치할 Spark 2.1.3 버전은 최신 릴리즈 버전이 아니므로 홈페이지 하단에 'Spark release archives'를 클릭하여 아카이브 페이지에 접속합니다.

 

  아카이브의 Spark-2.1.3 하위 디렉토리에 접속하면 위와 같은 설치 파일을 확인하실 수 있습니다.

  이 포스트에서는 Hadoop이 설치된 환경에서 Spark를 설치할 계획이므로 'spark-2.3.4-bin-without-hadoop.tgz' 파일을 사용하도록 하겠습니다.

 

  위 설치파일의 링크 주소를 복사한 뒤 wget을 이용해 파일을 로컬에 다운로드합니다.

 

> sudo wget https://archive.apache.org/dist/spark/spark-2.1.3/spark-2.1.3-bin-without-hadoop.tgz

  참고로 Hadoop 설치 방법은 아래 포스트에서 확인할 수 있습니다.

 

하둡(Hadoop) 설치하기[#1] - 설치 준비

데스크탑이 너무 느려져서 포맷을 했더니, VirtualBox에서 하둡 관리노드가 있던 가상 OS만 복구가 안되는 상황을 맞이하게 되었습니다.;; 이번 기회에 다시 한번 하둡을 설치하면서 여기에 그 과정

sparkdia.tistory.com

 

 

2. 설치 파일 압축 해제

  다운로드 한 파일을 압축해제합니다.

 

  > sudo tar -xvzf ./spark-2.1.3-bin-without-hadoop.tgz

 

  압축 해제된 파일의 소유자를 변경하고 디렉토리명을 spark-2.1.3으로 변경합니다.

  > sudo chown -R hduser:hadoop ./spark-2.1.3-bin-without-hadoop
  > sudo mv spark-2.1.3-bin-without-hadoop spark-2.1.3

 

 

3. 환경 변수 설정

  사용자 홈 디렉토리 내 .bashrc 파일을 열어 아래와 같이 환경변수와 PATH를 추가해줍니다.

export SPARK_HOME=/usr/local/spark-2.1.3
export PATH=$PATH:$SPARK_HOME/bin

 

  하둡이 포함되지 않은 설치 파일을 다운로드한 경우 하둡의 CLASSPATH를 설정해줘야 합니다.

  $SPARK_HOME/conf/spark-env.sh 파일을 생성하고 아래와 같이 입력합니다.

export SPARK_DIST_CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath)

 

 

4. 실행 테스트 1

  ${SPARK_HOME}/bin./spark-shell을 실행하여 설정이 정상적으로 완료되었는지 확인해봅니다.

  > spark-shell

  위와 같은 Welcome 메시지가 출력되면 정상적으로 설정된 것입니다.

 

  만약, spark-shell 실행 시 아래와 같은 에러 로그가 출력된다면 SPARK_DIST_CLASSPATH 설정이 잘못된 것입니다.

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/fs/FSDataInputStream

 

  위와 같은 에러가 발생한다면 점검을 위해 직접 쉘 프롬프트에서 classpath 설정 구문을 테스트해보는 것도 하나의 해결 방법이 될 것입니다.

  > export SPARK_DIST_CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath)
  > echo $SPARK_DIST_CLASSPATH

 

  아래와 같이 실제로 hadoop 명령어가 제대로 작동되는지 확인하는 것도 하나의 해결 방법이 될 수 있습니다.

 

 > ${HADOOP_HOME}/bin/hadoop classpath

 

 

5. Spark 서비스 시작 및 종료

  ${SPARK_HOME}/sbin/start-all.sh을 실행하면 Spark 서비스를 시작할 수 있습니다.

  > ${SPARK_HOME}/sbin/start-all.sh

  jps 실행 시 Master와 Wroker 프로세스가 확인된다면 정상적으로 Spark 서비스가 시작된 것입니다.

 

  웹 브라우저에서 아래 URL(기본 포트 : 8080)을 통해 Spark Web UI를 확인하실 수 있습니다.

* URL : http://서버IP:8080

 

  Spark 서비스 종료는 ${SPARK_HOME}/sbin/stop-all.sh을 실행하면 됩니다.

  > ${SPARK_HOME}/sbin/stop-all.sh

 

 

6. 실행 테스트 2

  spark-submit을 통해서 작성된 Spark application을 실행할 수 있습니다.

  Spark에서 제공하는 SparkPi 샘플 프로그램을 yarn 환경에서 cluster 모드로 실행하는 명령어는 아래와 같습니다.

spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
${SPARK_HOME}/examples/jars/spark-examples*.jar \
10

  위 명령어를 실행하면 아래와 같이 Hadoop resource manager 웹 페이지에서 spark application이 실행된 것을 확인하실 수 있습니다.

 

 

[Ref.] https://spark.apache.org/docs/2.1.3/hadoop-provided.html

 

'BigData > Spark' 카테고리의 다른 글

스파크(Spark) 3.0.0 설치 on Windows 10  (0) 2020.06.28
스파크(Spark) 실행하기 on Databricks  (2) 2020.06.22

 

  하이브에서는 다양한 형식의 파일을 읽고 저장할 수 있도록 테이블 별로 파일 형식을 지정할 수 있습니다.

 

 

하이브(Hive) 파일 형식

하이브에서는 테이블 생성 시 'STORED AS 저장형식명' 옵션을 사용하여 테이블별로 파일 형식을 지정할 수 있습니다. 파일 형식을 그냥 쉽게 표현하자면 하이브에서 테이블 형태로 보여지는 데이��

sparkdia.tistory.com

  하이브에서 기본적으로 제공하는 파일 형식 중에서 현재 가장 많이 사용되는 것이 ORC 파일 형식입니다.

  ORC 파일 형식은 컬럼 기반으로 데이터를 저장하며, 기존의 컬럼 기반 저장 방식을 사용하는 RCFile을 개선한 파일 형식입니다. ( ※ 컬럼 기반 저장 방식은 위 하이브(Hive) 파일 형식 포스트를 참고 바랍니다. )

 

   RCFile과 비교하였을 때 ORC의 장점은 아래와 같습니다.

- 각 태스크의 결과가 하나의 파일로 생성되어 네임노드의 부하가 감소됨.
- Datetime, Decimal, complex types (struct, list, map, and union)등의 하이브 데이터 타입을 지원함.
- 파일에 경량 인덱스가 저장됨.
. 필터링 조건에 부합하지 않는 로우 그룹은 skip함. 
. 특정 row로 바로 탐색이 가능.
- 데이터 타입 기반의 block-mode 압축. 
. Integer 컬럼은 run-length encoding을 사용함.
. String 컬럼은 dictionary encoding을 사용함.
- 하나의 파일을 여러 개의 레코드리더를 사용해 동시 읽기 가능함
- 마커 스캐닝 없이 파일 분할이 가능함.
- 읽거나 쓰기에 필요한 메모리의 용량을 제한 가능함
- 메타 데이터를 Protocol Buffers를 사용해 저장함으로써 필드의 추가/삭제가 가능함.

 

 

파일 구조

  ORC의 파일 구조는 아래와 같습니다.

 

Stripe

  컬럼 기반 저장 방식에서는 저장 시 테이블의 데이터를 우선 수평적으로 분할합니다. 분할 기준은 로우 그룹의 크기입니다. 분할된 각각의 로우 그룹 내 데이터를 컬럼 기준으로 저장하게 됩니다. 

  이렇게 분할된 로우 그룹이 Stripe이며, Index Data, Row Data, Stripe Footer로 구성됩니다.

 

Index data

  Index data에는 각 컬럼의 최솟값, 최댓값과 각 컬럼 내 로우의 위치를 저장하고 있습니다. 

  필터링 조건에 의해 테이블의 일부 데이터만 추출하는 경우, 이 Index data를 참고하여 조건에 맞지 않은 로우 그룹은 skip하고 조건에 맞는 로우 그룹만 액세스하게 됩니다.

 

Row data

  실제 테이블 데이터가 저장되어 있습니다.

 

Stripe footer

  스트림 위치 정보가 저장되어 있습니다.

 

File footer

  File footer에는 아래와 같은 보조 정보가 저장되어 있습니다.

  • 파일 내 stripe의 목록
  • stripe별 로우의 수
  • 각 컬럼의 데이터 타입
  • 컬럼 레벨별 통계 정보(건수, 최솟값, 최댓값, 합계)

 

Postscript

  파일의 가장 끝에 위치하며, 압축된 footer의 크기와 압축 파라미터 정보가 저장됩니다.

 

 

 

Integer 컬럼 직렬화 (Integer Column Serialization)

  ORC에서 Interger 컬럼은 두 개의 스트림, bit stream(해당 값이 not-null인지 표현)과 data stream(integer의 스트림)으로 직렬화(serialize)됩니다.

  Interger 데이터는 아래와 같이 숫자의 특징에 따른 encoding 방식이 적용되어 직렬화됩니다.

  •   interger 값이 작아 데이터의 크기가 경량인 경우 variable-width encoding을 사용.
  •   값이 반복되는 경우 run-length encoding을 사용.
  •   정수 값의 범위가 -128에서 127인 경우 run-length encoding을 사용.

  

Variable-width encoding 

  Variable-width encoding은 Google의 프로토콜 버퍼 기반입니다.

  가변 폭 데이터를 인코딩하므로 각 Integer 값의 byte 크기는 각각 상이할 것입니다. 인코딩되는 데이터의 크기가 서로 다른 값들을 구분하기 위해 각 byte의 최상위 bit를 사용합니다. 각 byte의 최상위 bit는 현재 byte가 인코딩하는 데이터의 마지막 바이트를 나타냅니다. 하위 7bit는 인코딩된 데이터를 나타냅니다.

 

Run-length encoding

  Run-length encoding은 반복되는 문자를 반복회수로 표현하는 압축 방법입니다.

  예를들어, 문자열 'AAAAABBBCCCCCCCCDDDEEEEEEEE'인 경우에 Run-length encoding을 적용하면 '5A3B7C3D8E'로 인코딩 됩니다.

 

 

 

문자열 컬럼 직렬화 (String Column Serialization)

문자열 컬럼은 컬럼의 unique한 값들로 구성된 딕셔너리를 사용하여 직렬화됩니다. 딕서너리 내 값들은 정렬되어 있어 predicate filtering의 속도가 높아지고 압축률이 향상됩니다.

  문자열 컬럼은 아래 4개의 스트림으로 직렬화됩니다. 

  •   bit stream : 값이 not-null인지 표현
  •   dictionary data : 문자열의 크기(byte)
  •   dictionary length : 각 항목의 길이
  •   row data : 로우의 값 

 

 

압축 (Compression)

  ORC 파일 포맷을 사용하는 테이블의 모든 스트림은 지정된 코덱을 사용하여 압축됩니다. 압축 코덱은 테이블 생성 시 TBLPROPERTIES 옵션 내 orc.compress라는 key 값을 정의하여 지정 가능합니다. 코덱은 ZLIB, SNAPPY을 사용하거나 none을 선택할 수 있습니다.

  아래 Hortonworks(현재는 Cloudera로 흡수되었죠.)에서 성능 테스트를 한 결과를 보면 ORC 파일 형식이 다른 파일 형식에 비해 상당히 높은 압축률을 보여주고 있음을 확인할 수 있니다. 

[출처] https://blog.cloudera.com/orcfile-in-hdp-2-better-compression-better-performance/

 

 

 

[Ref.] https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC#LanguageManualORC-HiveQLSyntax

    https://blog.cloudera.com/orcfile-in-hdp-2-better-compression-better-performance/

 

  하이브에서는 테이블 생성 시 'STORED AS 저장형식명' 옵션을 사용하여 테이블별로 파일 형식을 지정할 수 있습니다.

 

  파일 형식을 그냥 쉽게 표현하자면
  하이브에서 테이블 형태로 보여지는 데이터를
  어떤 구조로 HDFS에 저장하며,
  어떤 압축 방식을 사용할 것인지 정의한 저장 방식이라고 보시면 됩니다.

 

  하둡 개발자 입장에서는 이전 포스트(맵리듀스(Map-Reduce) 상세 작동 방법)에서 언급했었던 InputFormat과 OutputFormat을 확장하여 구현한 것이며, 추가로 <key, value> 형태의 데이터를 레코드 형태로 보여주는 SerDe까지 구현 한 라이브러리라고 보시면 이해가 쉬울 것 같습니다.

 

맵리듀스(Map-Reduce) 상세 작동 방법

맵리듀스(Map-Reduce) 데이터 흐름 하둡 맵리듀스는 클러스터 환경에서 대량의 데이터를 병렬로 처리하는 응용 프로그램을 쉽게 작성할 수 있는 소프트웨어 프레임워크입니다. 맵리듀스 작업은 일

sparkdia.tistory.com

 

  파일 형식기본 값TEXTFILE이며(hive.default.fileformat 옵션으로 변경 가능), 그 외에도 하이브에서는 아래와 같이 다양한 파일 형식을 제공하고 있습니다.

TEXTFILE
SEQUENCEFILE
RCFILE
ORC
PARQUET
AVRO  
JSONFILE

  물론, 하이브는 오픈 소스이므로 사용자 정의의 파일 형식도 적용할 수도 있습니다.

  이 포스트에서는 Hive에서 기본적으로 제공하고 있는 파일형식에 대해 살펴보도록 하겠습니다.

 

1. TEXTFILE

  말 그대로 텍스트 파일입니다. 우리가 메모장이나 vi에디터를 통해서 읽을 수 있는 그 텍스트 파일 맞습니다.

  일반적으로 TEXTFILE 파일 형식은 External 테이블 생성 시 사용합니다. 원천 시스템에서 수집한 데이터나 테스트 등의 목적으로 사용자가 생성한 데이터를 Hive에서 조회하기 위해서는 데이터를 텍스트 파일 형태로 HDFS에 저장한 뒤 External 테이블을 생성하면 됩니다.

  텍스트 파일 내 데이터는 구분자로 필드와 로우가 구별되어야 하며, 테이블 생성 시 DELIMITED 옵션을 사용하여 구분자를 명시해주면 됩니다.

[출처] https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-StorageFormats

 

  TEXTFILE은 기본적으로 압축되지 않은 파일형식이기 때문에(GZip같은 툴로 압축될 수는 있습니다.) 다른 파일형식에 비해 저장 공간을 많이 필요로한다는 단점이 있습니다.
  지속적으로 ODS 영역에 보관할 원천 시스템 데이터라면 압축된 파일형식의 테이블을 생성하여 데이터를 이관하여 보관해야 합니다. 압축된 파일형식의 테이블을 사용하는 것이 쿼리 성능과 저장 공간 활용도를 높이는 방법입니다. 

 

 

2. SEQUENCEFILE 

  SEQUENCEFILE 파일 형식은 데이터를 압축하여 저장합니다.

  SEQUENCEFILE 파일 형식은 로우 기반으로 데이터를 저장합니다.

  위 특징 때문에 코드 테이블, 디멘전 테이블처럼 데이터와 컬럼이 적은 테이블에 적용합니다. 

 

 

3. RCFile (Record Columnar File)

  RCFile은 컬럼 기반으로 데이터를 압축하여 저장하는 파일 형식입니다.

  아래 테이블의 데이터를 예를 들어 로우 기반(Row-oriented)컬럼 기반(Column-oriented) 저장 방식 차이점을 살펴보도록 하겠습니다. 

EmpId Lastname Firstname Salary
10 Smith Joe 40000
12 Jones Mary 50000
11 Johnson Cathy 44000
22 Jones Bob 55000

 

  아래 그림은 위 예제 데이터를 로우 기반 테이블과 컬럼 기반 테이블에 각각 저장하는 경우 저장되는 데이터 순서를 나타내는 예제입니다.

  로우 기반 저장 방식은 아래 예제처럼 데이터의 각 로우 내 데이터를 순서대로 저장(Serialize) 합니다. (각 로우별 저장 순서는 테이블 조회 시 보여지는 로우의 순서와 상이할 수 있습니다.)

001:10,Smith,Joe,40000;002:12,Jones,Mary,50000;003:11,Johnson,Cathy,44000;004:22,Jones,Bob,55000;

  위 방식은 로우의 전체 컬럼 데이터를 한번에 액세스 하는 경우에 효과적입니다.

  대표적으로 Oracle이 로우 기반 방식으로 데이터를 저장합니다.

 

  컬럼 기반 저장 방식아래 예제처럼 한 컬럼의 데이터를 먼저 저장(Serialize)한 후 다음 컬럼의 데이터를 저장합니다.

10 : 001,12 : 002,11 : 003,22 : 004; Smith : 001, Jones : 002, Johnson : 003, Jones : 004; Joe : 001, Mary : 002, Cathy : 003, Bob : 004; 40000 : 001,50000 : 002,44000 : 003,55000 : 004;

  만약에 컬럼 내 동일한 데이터가 존재한다면, 아래와 같은 방법으로 동일한 데이터를 한번만 저장하여 저장 공간을 절약할 수도 있습니다.

...; Smith : 001, Jones : 002,004, Johnson : 003; ...

 

  컬럼 기반 저장 방식은 아래와 같은 장점이 존재합니다.

  • 열 단위인 경우 데이터가 더 균일하므로 압축률이 향상됩니다. 압축률의 향상되면 부수적으로 저장 공간이 절약되고 액세스 시 Disk I/O가 감소됩니다.
  • 쿼리 시 쿼리문에 명시된 컬럼이 저장된 블록만 액세스하면 되므로 I/O가 감소됩니다.
  • 다양한 데이터 액세스 패턴을 적용하는게 수월해집니다.

 

 

 

4. ORC (Optimized Row Columnar)

  ORC는 Full name 그대로 RCFile을 개선시킨 파일 형식입니다. 물론 RCFile과 마찬가지로 ORC도 컬럼 기반 저장 방식을 사용합니다.

  현재 파일 형식중에서 가장 많이 사용되는 파일 형식이라고 말할 수 있습니다.

  ORC에 관해서는 정리할 내용이 많은 관계로 아래 별도의 포스트로 작성하였으니 참고 바랍니다.

 

ORC(Optimized Row Columnar) in 하이브(Hive)

하이브에서는 다양한 형식의 파일을 읽고 저장할 수 있도록 테이블 별로 파일 형식을 지정할 수 있습니다. https://sparkdia.tistory.com/56 하이브에서 기본적으로 제공하는 파일 형식 중에서 현재 가장

sparkdia.tistory.com

 

 

5. PARQUET 

  PARQUET도 컬럼 기반 저장 방식을 사용하는 파일 형식입니다.

  하이브와 피그만 지원되는 ORC에 비해 더 다양한 하둡 에코 시스템을 지원하고 있습니다.

  • Apache Hive
  • Apache Drill
  • Cloudera Impala
  • Apache Crunch
  • Apache Pig
  • Cascading
  • Apache Spark

  그리고 PARQUET는 Google Protocol Buffers와 비슷한 모델을 사용합니다. 이로 인해 복잡하게 중첩된 데이터 구조(List, Maps, Sets)를 효율적으로 저장할 수 있습니다. (참고 : Dremel made simple with Parquet)

 

 


6. AVRO  

  아파치 에이브로(AVRO)는 데이터 직렬화(Serialization) 시스템입니다.
  하이브에서는 이 에이브로의 파일을 하이브에서도 액세스 할 수 있도록 AVRO SerDe를 제공해줍니다.

  에이브로는 하둡의 직렬화 방식(Writable)방식의 단점인 언어 이식성을 해결하기 위해 만들어진 프로젝트 입니다. 데이터 파일에 스키마 정보를 같이 저장하므로 사전에 스키마 정보를 알지 못해도 데이터를 읽는 시점에 스키마 정보를 파악할 수 있게 됩니다. 이러한 방식으로 데이터를 관리하므로 다양한 개발 언어를 이용해 데이터 셋을 공유할 수 있는 것입니다.

  상세한 내용은 아파치 에이브로 홈페이지를 참고하시길 바랍니다.

  

 


7. JSONFILE

  Hive 4.0 버전에서부터 JSONFILE 파일 형식도 지원한다고 하네요. 

  기존에는 사용자 정의의 파일 형식을 이용하거나 전처리 과정을 통해 CSV 파일 형태로 만든 뒤 TEXTFILE 파일 형식을 사용하여 데이터를 액세스했었는데, 이런 번거로움이 사라지겠네요.

 

 

  이상으로 하이브의 파일 형식에 대해 알아보았습니다.
  위 내용중에서 추가/보완해야 할 사항있으면 친철한 댓글 부탁드립니다.
  감사합니다.

 

 

[Ref.] https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-StorageFormats
  https://cwiki.apache.org/confluence/display/Hive/FileFormats
  https://cwiki.apache.org/confluence/display/Hive/LanguageManual
  https://en.wikipedia.org/wiki/RCFile
  https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html
  Hadoop, The Definitive Guide

 

Windows 환경에서 Tableau를 설치해보겠습니다.

설치 파일 다운로드를 위해 우선 아래 URL에 접속합니다.

URL : https://www.tableau.com/ko-kr/products/desktop/download?signin=47cc9ac0f8435895c744899af5d60d2e

위 URL에 접속 후
아래와 같이 '비즈니스 이메일' 텍스트 박스에 이메일 주소를 입력 후
'무료 평가판 다운로드' 버튼을 클릭하면 설치 파일이 다운로드 됩니다.

 

  설치 파일 다운로드가 완료되었으면, 설치 파일을 실행합니다.

 

  설치 파일을 실행하면 아래와 같은 다이얼로그를 확인할 수 있습니다.

 사용권 계약 동의를 위해 '동의함' 체크 박스를 클릭 후 '설치' 버튼을 클릭합니다.

  다음 다이얼로그를 통해 설치 진행 상황을 확인하실 수 있습니다.

 

설치가 완료되면 자동으로  Tableau Desktop 버전이 실행됩니다.

 

  Tableau 를 처음 실행하면 아래와 같이 사용자 기본 정보를 입력해야 합니다.

 

  사용자 정보를 입력하면 아래와 같이 평가판 활성화 작업이 진행되며,

 

  활성화가 완료되면
  아래와 같이 Tableau가 정상적으로 실행된 것을 확인할 수 있습니다.

 

  초기 화면 하단에 '샘플 통합 문서'를 클릭하면 Tableau에서 제공하는 샘플 데이터를 확인할 수 있습니다.

 

 

 

 

 

맵리듀스(Map-Reduce) 데이터 흐름

하둡 맵리듀스는 클러스터 환경에서 대량의 데이터를 병렬로 처리하는 응용 프로그램을 쉽게 작성할 수 있는 소프트웨어 프레임워크입니다. 맵리듀스 작업은 일반적으로 입력 데이터를 독립적

sparkdia.tistory.com

  이전 포스팅에서 맵리듀스의 데이터 흐름을 간략하게 살펴보았습니다. 

  일반적으로 Mapper와 Reducer 인터페이스의 map과 reduce 메소드를 구현하는 것만으로도 간단하게 맵리듀스 프레임워크를 사용할 수 있습니다.

  만약, 입/출력 대상으로 텍스트 파일 외 다른 형식의 파일이나 데이터베이스를 사용하는 경우나 맵리듀스 내부의 상세 작업을 컨트롤하기 위해서는 추가적인 개발 작업이 필요합니다.

  이를 위해 맵리듀스의 작동 방법을 좀 더 상세하게 살펴보도록 하겠습니다.

[Ref.]  Hadoop, The Definitive Guide

 

  맵리듀스 Job은 클라이언트가 수행하는 작업의 기본 단위로, 맵 태스크(Map task)와 리듀스 태스크(Reduce task)로 나눠서 실행합니다.

 

1. Map task 단계

  Map task 단계에서는 입력 데이터를 읽어 <key, value> 구조의 입력 레코드를 구성한 뒤 중간 단계의 레코드로 변환하는 작업을 합니다.

   맵리듀스 프레임워크의 입력으로 다양한 형식의 입력 데이터가 사용 될 수 있습니다. 물론, 하둡에서 제공되는 기본 파일 형식외에는 입력 파일의 형식이 무엇이고 어떻게 레코드 형태로 변환할 수 있는지 정의하는 작업은 필요합니다. 이러한 작업은 InputFormat과 RecordReader라는 인터페이스를 확장 구현하여 정의 가능합니다.

 

InputFormat

  InputFormat은 맵리듀스 Job의 입력 명세서라고 표현할 수 있습니다.

  맵리듀스 프레임워크에서는 Job의 InputFormat을 이용해 아래와 같은 작업을 수행합니다.

  1) 작업의 입력 명세서를 검증
  2) 입력 파일을 분할하여 InputSplit을 생성한 뒤, 각 개별 Mapper에 할당
  3) Mapper가 처리할 입력 레코드를 InputSplit에서 어떻게 추출해야 할 지 명시하는 RecordReader를 구현

  InputFormat을 구현한 기본 클래스는 FileInputFormat이며, 이를 확장한 다수의 하위 클래스가 존재합니다. 대표적으로 기본 InputFormat인 TextInputFormat 클래스도 FileInputFormat의 하위 클래스입니다. FileInputFormat의 하위 클래스의 기본 동작은 입력 파일의 총 크기(Byte)을 기준으로 입력을 논리적 InputSplit 인스턴스로 분할하는 것입니다.

 

InputSplit 

  InputSplit은 Job의 입력을 고정 크기로 분리한 데이터 조각입니다. 입력 데이터를 다수의 Mapper가 병렬로 분산 처리할 수 있도록 다수의 논리적인 InputSplit으로 분리한 것입니다.

  하나의 InputSplit마다 맵 태스크를 생성하게 되며, InputSplit의 각 레코드는 RecordReader에 의해 <key, value> 형태인 입력 레코드로 변환되어 map 함수로 전달됩니다.

 

RecordReader

  RecordReader는 입력 데이터가 크기 기준(byte-oriented view)으로 분리된 InputSplit을 Mapper에서 <key, value> 쌍 형태로 처리할 수 있도록 레코드 기반의 형태(record-oriented view)로 변환하는 작업을 수행합니다.

  LineRecordReader는 TextInputFormat이 제공하는 기본 레코드 입력기입니다. LineRecordReader는 정의된 레코드  delimiter(기본 값은 개행 문자)에 의해 입력 데이터를 분리합니다. 이 때, 분리된 각 문자열이 레코드의 value가 되며, key는 파일에서 해당 문자열의 위치(Byte offset)입니다. 

 

Map

  맵에서는 입력 데이터를 읽어 map 함수에서 사용자가 정의한 연산을 수행합니다.

  맵 함수의 처리 결과는 우선 메모리 버퍼에 저장되며 버퍼의 내용이 한계치에 도달하면 일괄로 디스크에 저장(Spill)됩니다. 이 때, 맵의 출력 데이터는 키 값에 의해 정렬되고 리듀서 수에 맞게 파티션별로 나눠서 저장됩니다. 파티션 별로 출력 데이터를 나눠서 저장하는 이유는 리듀서별로 전송할 출력 데이터를 미리 구분하기 위해서입니다. 

  만약, 컴바이너 함수가 정의되어 있다면 맵의 출력 값이 컴바이너 함수의 입력 값이 되고, 컴바이너 함수의 결과 데이터에 대해 정렬과 파티셔닝 작업이 발생하게 됩니다. 컴바이너 함수를 사용하는 이유는 맵 결과 데이터의 크기를 감소시켜 리듀서로 전송할 데이터 양(네트워크 부하)을 줄이기 위함입니다.

 

 

2. Reduce task 단계

Reduce

  리듀스 태스크는 각 맵 태스크가 완료되는 즉시 맵의 출력 데이터를 리듀스가 실행되는 노드에 복사하기 시작합니다(Shuffle).

  맵의 출력 데이터가 복사될 때 리듀서는 다수의 맵 출력 데이터를 정렬 순서를 유지하면서 병합합니다.
  리듀스에서는 병합된 데이터를 읽어 reduce 함수에서 사용자가 정의한 연산을 수행합니다.

 

OutputFormat

  OutputFormat은 맵리듀스 Job의 출력 명세서라고 표현할 수 있습니다.

  맵리듀스 프레임워크에서는 Job의 OutputFormat을 이용해 아래와 같은 작업을 수행합니다.

    1) 작업의 출력 명세서를 검증 (예: 출력 디렉토리 존재 여부)
    2) 작업의 출력 파일을 작성하는 데 사용되는 RecordWriter 구현

 

RecordWriter

  RecordWriter는 출력 <key, value> 쌍을 출력 파일에 씁니다.

  RecordWriter을 구현한 기본 클래스는 LineRecordWriter입니다. LineRecordWriter은 key와 value를 tab 문자로 구분하며, 각 레코드는 개행 문자로 구분하여 데이터를 저장합니다.

 

 

  이상으로 맵리듀스의 작동 방법에 대해 알아봤습니다.
  맵리듀스의 작동 원리를 이해하게 되면, 맵리듀스 구현이 용이해지고 디버깅 작업이 좀 더 수월해질 수 있을것입니다. 또한,  Hive 등 하둡 에코 시스템에서의 데이터를 저장하고 관리하는 방법에 대한 이해도도 높아질 수 있으실 것입니다.

만약, 위 내용에 대해서 오류를 발견하시거나 궁금한 사항이 있으시면 적극적인 의견 부탁드립니다.

 

[Ref.] Hadoop, The Definitive Guide
       
https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html

 

+ Recent posts