암호화폐 자동 트레이딩 봇 구현하기
가끔씩 자동매매 구현하고 싶어질 때가 있다. Upbit API를 활용해 데이터 수집, 매수/매도 전략 적용, 그리고 실제 거래를 자동으로 수행시켜보았다.
1. 설정 로드 및 로깅 설정
트레이딩 봇의 설정을 JSON 파일에서 로드하고, 콘솔과 파일에 로그를 기록하기 위한 로깅 시스템을 설정한다.
import json
import logging
from logging.handlers import RotatingFileHandler
def load_config(file_path):
with open(file_path, 'r') as f:
return json.load(f)
def setup_logger():
logger = logging.getLogger()
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(console_format)
logger.addHandler(console_handler)
file_handler = RotatingFileHandler('trading_log.txt', maxBytes=10*1024*1024, backupCount=5)
file_handler.setLevel(logging.INFO)
file_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(file_format)
logger.addHandler(file_handler)
return logger
logger = setup_logger()
2. UpbitAPI 클래스
이 클래스는 Upbit API와의 상호작용을 관리하며, 주요 기능으로는 API 호출, 시장 데이터 수집, 거래 실행 등이 있다.
import pyupbit
import requests
import time
from collections import defaultdict
from datetime import datetime
class UpbitAPI:
def __init__(self, config):
self.upbit = pyupbit.Upbit(config['access_key'], config['secret_key'])
self.request_limit = 600
self.request_count = 0
self.request_time = time.time()
self.data_cache = defaultdict(lambda: {"data": None, "timestamp": None})
def call_api(self, func, *args, **kwargs):
current_time = time.time()
if current_time - self.request_time >= 60:
self.request_count = 0
self.request_time = current_time
if self.request_count >= self.request_limit:
sleep_time = 60 - (current_time - self.request_time)
if sleep_time > 0:
logger.warning(f"API 제한에 도달했습니다. {sleep_time:.2f}초 동안 대기합니다.")
time.sleep(sleep_time)
self.request_count = 0
self.request_time = time.time()
for attempt in range(3):
try:
result = func(*args, **kwargs)
self.request_count += 1
time.sleep(0.1) # API 호출 간 짧은 대기 시간 추가
return result
except Exception as e:
logger.warning(f"API 호출 실패 (시도 {attempt + 1}): {e}")
if "JWT" in str(e):
logger.error("JWT 인증 실패. API 키를 확인하세요.")
time.sleep(2 ** attempt)
logger.error("3번의 시도 후 API 호출 실패")
return None
def get_top_market_cap_tickers(self, limit=50):
url = "https://api.upbit.com/v1/market/all"
response = self.call_api(requests.get, url)
if response is None:
logger.error("시장 데이터를 가져오는데 실패했습니다.")
return []
markets = [market['market'] for market in response.json() if market['market'].startswith('KRW-')]
tickers_data = []
for market in markets:
url = f"https://api.upbit.com/v1/ticker?markets={market}"
response = self.call_api(requests.get, url)
if response is None:
logger.warning(f"{market} 데이터를 가져오는데 실패했습니다.")
continue
data = response.json()
if data and isinstance(data, list) and len(data) > 0:
ticker_data = data[0]
tickers_data.append({
'market': market,
'market_cap': ticker_data['acc_trade_price_24h'] * ticker_data['trade_price']
})
time.sleep(0.1)
sorted_tickers = sorted(tickers_data, key=lambda x: x['market_cap'], reverse=True)
return [ticker['market'] for ticker in sorted_tickers[:limit]]
def get_ohlcv(self, ticker, interval="day", count=10):
cache_key = f"{ticker}_{interval}_{count}"
cache_data = self.data_cache[cache_key]
if cache_data["data"] is not None and (datetime.now() - cache_data["timestamp"]).total_seconds() < 600:
return cache_data["data"]
try:
df = self.call_api(pyupbit.get_ohlcv, ticker, interval=interval, count=count)
if df is None:
logger.warning(f"{ticker}의 OHLCV 데이터 가져오기 실패: None 반환")
return None
if df.empty:
logger.warning(f"{ticker}의 OHLCV 데이터 가져오기 실패: 빈 데이터프레임")
return None
self.data_cache[cache_key] = {"data": df, "timestamp": datetime.now()}
return df
except Exception as e:
logger.error(f"{ticker}의 OHLCV 데이터 가져오기 오류: {e}", exc_info=True)
return None
def get_current_price(self, ticker):
price = self.call_api(pyupbit.get_current_price, ticker)
if price is None:
logger.warning(f"{ticker}의 현재 가격을 가져오는데 실패했습니다.")
return price
def get_balance(self, ticker):
balance = self.call_api(self.upbit.get_balance, ticker)
if balance is None:
logger.warning(f"{ticker}의 잔액을 가져오는데 실패했습니다.")
return 0
return balance
def buy_market_order(self, ticker, amount):
result = self.call_api(self.upbit.buy_market_order, ticker, amount)
if result is None:
logger.error(f"{ticker} 매수 주문 실패")
return result
def sell_market_order(self, ticker, amount):
result = self.call_api(self.upbit.sell_market_order, ticker, amount)
if result is None:
logger.error(f"{ticker} 매도 주문 실패")
return result
3. 거래 전략 함수
여러 가지 거래 전략을 정의한다. 단순 이동 평균, 볼린저 밴드, RSI 등을 사용해 매수/매도 신호를 생성한다.
import pandas as pd
import numpy as np
def simple_ma_strategy(df):
if df is None or df.empty:
return pd.Series()
df['MA5'] = df['close'].rolling(window=5).mean()
df['MA10'] = df['close'].rolling(window=10).mean()
df['Signal'] = np.where(df['MA5'] > df['MA10'], 1, 0)
return df['Signal']
def bollinger_bands_strategy(df, window=20):
if df is None or df.empty:
return pd.Series()
df['MA'] = df['close'].rolling(window=window).mean()
df['STD'] = df['close'].rolling(window=window).std()
df['Upper'] = df['MA'] + (df['STD'] * 2)
df['Lower'] = df['MA'] - (df['STD'] * 2)
df['Signal'] = np.where(df['close'] > df['Upper'], -1, np.where(df['close'] < df['Lower'], 1, 0))
return df['Signal']
def rsi_strategy(df, period=14):
if df is None or df.empty:
return pd.Series()
delta = df['close'].diff()
gain = delta.where(delta > 0, 0)
loss = -delta.where(delta < 0, 0)
avg_gain = gain.rolling(window=period).mean()
avg_loss = loss.rolling(window=period).mean()
rs = avg_gain / avg_loss
df['RSI'] = 100 - (100 / (1 + rs))
df['Signal'] = np.where(df['RSI'] > 70, -1, np.where(df['RSI'] < 30, 1, 0))
return df['Signal']
4. 백테스트 함수
주어진 거래 전략을 과거 데이터에 적용해 성과를 평가한다.
def backtest_strategy(df, signal):
if df is None or df.empty or signal.empty:
return 0
df['Return'] = df['close'].pct_change()
df['Strategy_Return'] = df['Return'] * signal.shift(1)
return df['Strategy_Return'].sum()
5. 전략 업데이트 및 거래 실행 함수
트레이딩 봇은 상위 시가총액 코인을 대상으로 최적의 거래 전략을 업데이트하고, 이를 바탕으로 실제 거래를 수행한다.
전략 업데이트 함수
이 함수는 시가총액 상위 50개의 코인에 대해 데이터를 수집하고, 여러 거래 전략을 적용해 성과를 분석한다. 그런 다음, 가장 성과가 좋은 전략과 코인을 선택한다.
def update_strategy(api):
try:
top_tickers = api.get_top_market_cap_tickers(50)
if not top_tickers:
logger.error("시가총액 상위 종목 가져오기 실패")
return None, []
strategy_results = {}
coin_performance = {}
valid_data_count = 0
for ticker in top_tickers:
try:
df = api.get_ohlcv(ticker, interval="day", count=10)
if df is None or df.empty:
logger.warning(f"데이터 부족으로 {ticker} 건너뛰기")
continue
valid_data_count += 1
coin_performance[ticker] = {}
for strategy_name, strategy_func in strategies.items():
signal = strategy_func(df)
returns = backtest_strategy(df, signal)
if strategy_name not in strategy_results:
strategy_results[strategy_name] = []
strategy_results[strategy_name].append(returns)
coin_performance[ticker][strategy_name] = returns
except Exception as e:
logger.error(f"{ticker} 분석 중 오류 발생: {e}", exc_info=True)
if valid_data_count < 5: # 최소 5개의 유효한 데이터가 필요하다고 가정
logger.error(f"유효한 데이터가 부족합니다. (현재: {valid_data_count}개)")
return None, []
if not strategy_results:
logger.error("유효한 전략 결과 없음")
return None, []
avg_strategy_performance = {strategy: np.mean(returns) for strategy, returns in strategy_results.items()}
best_strategy = max(avg_strategy_performance, key=avg_strategy_performance.get)
N = min(5, valid_data_count) # 선택할 코인 수를 유효한 데이터 수로 제한
best_coins = sorted(coin_performance.items(),
key=lambda x: x[1].get(best_strategy, -np.inf),
reverse=True)[:N]
selected_tickers = [coin[0] for coin in best_coins]
logger.info(f"업데이트된 최고 전략: {best_strategy}")
logger.info(f"선택된 코인: {selected_tickers}")
return best_strategy, selected_tickers
except Exception as e:
logger.error(f"update_strategy 함수에서 오류 발생: {e}", exc_info=True)
return None, []
거래 실행 함수
이 함수는 선택된 전략과 코인을 바탕으로 실제 거래를 수행한다. 매수 신호가 발생하면 매수를, 매도 신호가 발생하면 매도를 실행한다.
def execute_trades(api, best_strategy, selected_tickers):
logger.info("거래 실행 중")
for ticker in selected_tickers:
try:
df = api.get_ohlcv(ticker, interval="minute60", count=24)
if df is None or df.empty:
logger.warning(f"데이터 부족으로 {ticker} 거래 건너뛰기")
continue
signal = strategies[best_strategy](df)
current_price = api.get_current_price(ticker)
balance = api.get_balance(ticker)
krw_balance = api.get_balance("KRW")
logger.info(f"{ticker} - 현재 가격: {current_price}, 보유량: {balance}, KRW 잔액: {krw_balance}")
if signal.iloc[-1] == 1 and balance == 0 and krw_balance > 1000:
buy_amount = min(krw_balance, 100000)
result = api.buy_market_order(ticker, buy_amount)
if result:
logger.info(f"{ticker} 매수 성공 - 금액: {buy_amount}")
else:
logger.error(f"{ticker} 매수 실패 - 금액: {buy_amount}")
elif balance > 0:
avg_buy_price = api.upbit.get_avg_buy_price(ticker)
if avg_buy_price is not None and current_price is not None:
if current_price >= avg_buy_price * 1.05:
result = api.sell_market_order(ticker, balance)
if result:
logger.info(f"{ticker} 매도 성공 - 목표 수익 달성")
else:
logger.error(f"{ticker} 매도 실패 - 목표 수익 달성")
elif current_price <= avg_buy_price * 0.97:
result = api.sell_market_order(ticker, balance)
if result:
logger.info(f"{ticker} 매도 성공 - 손절 트리거")
else:
logger.error(f"{ticker} 매도 실패 - 손절 트리거")
else:
logger.warning(f"가격 데이터 부족으로 {ticker} 거래 건너뛰기")
except Exception as e:
logger.error(f"{ticker} 거래 중 오류 발생: {e}", exc_info=True)
이와 같이 update_strategy
함수는 최적의 거래 전략과 코인을 선택하고, execute_trades
함수는 이를 바탕으로 실제 거래를 수행한다. 이를 통해 트레이딩 봇은 수익을 극대화하고, 손실을 최소화하는 거래를 자동으로 진행할 수 있다.
6. 메인 함수
트레이딩 봇의 메인 루프를 설정해 전략 업데이트와 거래 실행을 주기적으로 수행한다.
def main():
try:
logger.info("트레이딩 봇 시작")
config = load_config("config.json")
api = UpbitAPI(config)
best_strategy, selected_tickers = update_strategy(api)
if best_strategy is None:
logger.error("전략 및 종목 초기화 실패. 종료합니다.")
return
last_update = datetime.now().date()
while True:
try:
now = datetime.now()
logger.info(f"현재 시간: {now}")
if now.date() > last_update:
logger.info("새로운 날짜 - 전략 및 코인 목록 업데이트")
new_best_strategy, new_selected_tickers = update_strategy(api)
if new_best_strategy is not None:
best_strategy, selected_tickers = new_best_strategy, new_selected_tickers
last_update = now.date()
else:
logger.warning("전략 및 종목 업데이트 실패. 이전 값을 사용합니다.")
logger.info(f"전략 실행: {best_strategy}")
execute_trades(api, best_strategy, selected_tickers)
logger.info("다음 실행 대기 중")
time.sleep(3600)
except Exception as e:
logger.error(f"메인 루프에서 오류 발생: {e}", exc_info=True)
logger.info("5분 후 재시도")
time.sleep(300)
except Exception as e:
logger.critical(f"치명적인 오류 발생: {e}", exc_info=True)
if __name__ == "__main__":
main()
크롬 원격 PC제어
크롬 원격 PC제어 통해서 집에서 동작중인 코드 점검하기. AWS나 구글클라우드서버 사용해도 되지만, 투입 비용에 비해서 사용료가 너무 비싸서 그냥 로컬에서 잠시 사용해본다. 10만원 체결됐다.
'프로젝트 > 자동매매' 카테고리의 다른 글
파이썬 가상화폐 자동매매 서비스 구현 (2) - 자본 분배 기능, 다중 거래 지원, 리스크 관리, 로깅 강화 (0) | 2024.07.07 |
---|