Aggregations을 가지고 Upbit 거래(체결) 데이터 검증
Aggregations을 가지고 Upbit 거래(체결) 데이터 검증

Aggregations을 가지고 Upbit 거래(체결) 데이터 검증

안녕하세요. cosign에서 back-end 개발을 하고 있는 Ian입니다.
오늘은 MongoDB에서 지원하는 Aggregation을 가지고 Upbit 거래(체결) 데이터 검증했던 방법을 적어보고자 합니다.


외부 API에서 받아온 데이터를 신뢰하고 정확한 데이터인지 확인이 이루어져야 사용할 수 있는 데이터가 됩니다. 그래서 Reponse의 값들이 누락이 없는지 꼬인 데이터는 없는지 검증을 하게 됩니다.

저희는 Upbit의 최근 체결 내역을 조회하는 API를 활용하는 부분이 있습니다. 그렇다면 이 데이터가 제대로 수집되고 있는지, 누락이 없는지 검증하는 과정이 필요하겠죠? 어떻게 데이터가 누락이 없는지 확인할 수 있을까요?

체결 API의 검증은 Upbit의 캔들 API를 활용할 수 있습니다. 캔들 API는 특정 날짜의 데이터를 받을 수 있는 파라미터를 제공합니다. 이를 이용하여 원하는 날짜의 데이터를 가져올 수 있습니다. 캔들 데이터에는 ‘candle_acc_trade_volume’이라는 필드가 있는데, 이 값을 통해 누적 거래량을 확인할 수 있습니다.

여담으로 체결 API는 현재시간 기준, 최대 7일 이내의 데이터만 수집이 가능하고, 캔들 API는 upbit 상장 시점부터 현재까지 데이터 수집이 가능합니다.

예를 들어, 2023-05-23일 ‘비트코인’ 의 누적 거래량을 알고 싶다고 가정 해봅니다.

시세 캔들 조회 API

GET https://api.upbit.com/v1/candles/days?market=KRW-BTC&to=2023-05-24T00:00:00

‘to’ 파라미터는 마지막 체결 시각 형식입니다. 그래서 24일을 호출하면 23일 데이터를 얻을 수 있습니다.

[
  {
    "market": "KRW-BTC",
    "candle_date_time_utc": "2023-05-23T00:00:00",
    "candle_date_time_kst": "2023-05-23T09:00:00",
    "opening_price": 35947000,
    "high_price": 36660000,
    "low_price": 35841000,
    "trade_price": 36381000,
    "timestamp": 1684886399259,
    "candle_acc_trade_price": 91284785438.44258,
    "candle_acc_trade_volume": 2510.27578792,
    "prev_closing_price": 35947000,
    "change_price": 434000,
    "change_rate": 0.0120733302
  }
]

일봉 기준 ‘2023-05-23’일 누적 거래량이 ‘2510.27578792’ 인 것을 확인 해볼 수 있었습니다.

그럼 저희가 수집하는 체결 API에서도 ‘비트코인’의 하루치 거래량이 ‘2510.27578792’ 인지 확인 해봐야겠죠?

문제 발생

수집을 하며 캔들 API에서 받은 하루치 거래량과 체결 API에서 받은 하루치 거래량(기준이 되는 날짜를 가지고’trade_volume’을 전부 더한 값)이 같은지 비교하는 도중 거래량이 다른 문제가 발생하게 되었습니다.

왜 다르지…??

저희 백엔드는 mongo를 통해 체결 API에서 받아오는 데이터를 저장합니다. 체결 데이터를 mongo에 저장할 당시 Upsert를 통해 저장하지 않았고, Insert를 통해 저장 하고 있었습니다. 그러니 체결 데이터가 중복적으로 들어와도 이를 알 방법이 적었죠. 그렇기 때문에 데이터 검증이 옳바르게 되지 않았던 것이었습니다. 그럼 중복 데이터를 제외하면서 검증하는 방법은 뭐가 있을까요?

Upsert란 “Update”와 “Insert”의 합성어로 데이터베이스에서 “Insert”와 “Update”를 한 번에 수행하는 작업을 의미 합니다. 데이터가 이미 존재하는 경우는 업데이트를 수행하고, 데이터가 존재하지 않는 경우에는 새로운 데이터를 삽입합니다. 즉, 중복 데이터의 방지 할 수 있다는 말입니다.

바로 Mongo에서 지원하는 ‘Aggregations’을 이용하면 됩니다.


Aggregations을 가지고 검증 시작

Aggregations(집계)은 MongoDB에서 데이터를 그룹화하고 분석하는 기능을 제공하는 파이프 라인 작업입니다. 파이프 라인 작업은 데이터 처리를 위해 연속적으로 연결된 단계들로 구성된 작업 흐름을 말합니다. 쉽게 말해 위에서부터 단계적으로 데이터를 가공하고 다음 단계로 가공된 데이터를 전달 시키는 하나 구성 입니다.

위에 말이 아직도 이해가 되지 않았을 수 있으니 저희가 수집한 일부의 체결 데이터를 가지고 설명 하겠습니다. 앞서 설명 한 것처럼 캔들 API를 활용해 ‘비트코인’의 일봉 기준 2023-05-23일 데이터를 받아온 체결양과 체결 API를 활용한 하루치 거래량이 같은지 비교해보고자 합니다. 즉, 하루치를 다 더했을 때 ‘2510.27578792’ 값이 나와야 합니다.

  • 일부 데이터를 가지고 Local 환경에서 작업 하였으며, MongoDB를 시각적으로 도와 줄 수 있는 Mongo Compass를 이용 하였습니다.

1. DB에 접속 했다면, Aggregations를 눌러 줍니다.

2. ‘Add Storage’ 를 통해 파이프 라인을 만들 수 있습니다. 현재 아무 파이프 라인을 만들지 않았기에 모든 데이터가 보일 것 입니다.

3. ‘2023-05-23T00:00:00.000′ ~ ‘2023-05-23T23:59:59.999’ 사이에 있는 값만 가져오기 위해 $match를 사용해줍니다. $match란 Aggregations에서 사용하는 mongo의 filter 기능과 같습니다. 특정 조건의 일치하는 문서만을 수정하지 않고 가져 온 후 다음 파이프 라인으로 전달 시킵니다.

  • $match
{
    "main_trade_time_utc": {
      "$gte": "2023-05-23T00:00:00.000",
      "$lte": "2023-05-23T23:59:59.999"
  }
}

4. 저희 cosign에서는 체결 데이터 수집 시 Upsert를 사용하지 않았기에 중복된 데이터가 있어 위에 $match에서 전달 받은 데이터를 가지고 $group을 사용하여 중복된 데이터가 없는지 찾아봅니다. 찾았다면 또 다시 다음 파이프 라인으로 전달 시켜줍니다.

  • $group
/**
 * _id: 그룹화 할 필드명, 필드명 앞에 '$'을 붙여 줍니다.
 * fieldN: The first field name.-> '별칭할 이름 : {조건(Accumulators) : 필드명}'
 */
 
 {
    _id: "$sequential_id",
    "firstVolume":{"$first":"$trade_volume"}
}

위에 코드를 하나씩 보겠습니다. _id의 value 값은 그룹화 할 필드명을 적어 주시면 됩니다. 필드명 앞에 $를 붙여 줍니다. firstVolume 은 alias로 별칭을 적어주면 됩니다. {“$first”:”$trade_volume”} 에서 $first는 Accumulators로 파이프 라인을 통해 집계 연산자 부분으로 동일한 문서 중 첫번째 문서를 가지고 오는 조건문입니다. $trade_volume는 마찬가지로 필드명을 적어주시면 됩니다.

5. 위에서 $match와 $group으로 된 문서를 받았다면 trade_volume 값들만 더해주면 저희가 원하는 하루치 거래량을 받아 올 수 있습니다.

  • $group
{
    "_id":null,
    "totalVolumeSum":{"$sum":"$firstVolume"},
    "count":{"$sum":1} // 하루동안 거래가 몇번 이루어졌는지 확인차 넣었습니다. $sum":1 -> $count 연산자와 같습니다.
}

_id null을 통해 그룹화를 진행하지 않고 $sum 집계 연산자를 통해 trade_volume값을 다 더해주면 됩니다. 이때 $trade_volume가 아닌 $firstVolume 로 저희가 수집한 필드명과 다르다는 것을 알 수 있습니다.이는 위에서 전달 받은 문서를 토대로 필드명을 적어주어야 합니다. 이 이유는 바로 하나의 파이프 라인이기 때문에 가능합니다.

최종문서

_id: null
totalVolumeSum: 2510.275787920
count: 63081

totalVolumeSum을 보시면 ‘2510.275787920’이 나온 것을 확인 할 수 있었습니다. 이 숫자 어디서 많이 보지 않았나요? 바로 위에서 캔들 API를 통해 받은 candle_acc_trade_volume: ‘2510.27578792’의 값과 같은 것을 볼 수 있습니다. 이것으로 저희가 수집한 체결 API가 누락이 없었다는 것을 검증할 수 있었습니다.


추가로, java 코드로도 봐야겠죠?

@Repository
@RequiredArgsConstructor
public class UpbitTradeRepository {

    private final MongoTemplate mongoTemplate;

    // ...

    public List<TradeVolumeGroup> getTradeVolume(String cryptoId) {

        // $match
        MatchOperation matchOperation = Aggregation.match(Criteria.where("main_trade_time_utc")
                .gte("2023-05-23T00:00:00.000")
                .lte("2023-05-23T23:59:59.999")
        );

        // $group
        GroupOperation firstGroupOperation = group("sequential_id").first("trade_volume").as("firstVolume");
        
        // $group
        GroupOperation secondGroupOperation = group().sum("firstVolume").as("totalVolumeSum");
        
        //aggrgation 모든 파이프라인을 합쳐주는 로직
        AggregationResults<TradeVolumeGroup> aggregate = this.mongoTemplate.aggregate(newAggregation(matchOperation, firstGroupOperation,secondGroupOperation), cryptoId/*컬렉션*/, TradeVolumeGroup.class/*매핑할 class*/);
        
        return aggregate.getMappedResults();
    }
    
    // ...
  }

지금까지, Aggregation을 가지고 저희가 수집하고 있는 Upbit의 거래 데이터를 나름의 방법으로 검증하는 방법이었습니다. 다음엔 더 좋은 주제를 가지고 돌아오겠습니다~!! ㅎ.ㅎ

답글 남기기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다