대신증권 API를 활용해서 해외 주식의 주가를 가져오는 코드를 작성 중이였는데 문제가 발생했다.
너무 오래 걸린다..
한 종목당 2 ~ 5초 정도는 걸리는 듯하다. 한 종목당 2800개의 데이터를 가져오는데, 종목 전체가 11,464개이다.
위 이미지에 보이는 4000개 정도 넣은 것도 귀찮아서 반나절은 기다려서 받은 것이다.
문제가 되는 코드를 보자.
# 모든 종목코드 순회
for idx, stock_info in enumerate(all_stock_code):
logging.info(f"{idx} / {len(all_stock_code)} .... {round((idx/len(all_stock_code)) * 100, 2)}%")
# 특정 종목코드의 금액 정보 가져오기
maria_connection.clean_bind()
maria_connection.add_bind(stock_info['stock_code'])
curr_price_days_diff = maria_connection.select(StockPrice.SELECT_CURR_PRICE_DAYS_DIFF) # 테이블에 있는 데이터와 차이나는 날짜
days_diff = curr_price_days_diff[0].get('DAYS_DIFF', 10000)
data_size = days_diff-1
# 어제 데이터까지 존재한다면 1
if days_diff > 1:
stock_price_list = stock_price_fetcher.get_stock_price(stock_code=stock_info['stock_code'], data_size=data_size)
bind_stock_list = []
logging.info(f"종목: {stock_info['stock_name']}, 넣을 데이터양: {len(stock_price_list)}")
for stock_price in stock_price_list:
bind_stock = (
stock_info['stock_code'], # 종목 코드
stock_price['date'], # 날짜(YYYYMMDD)
stock_price['open'], # 시가
stock_price['high'], # 고가
stock_price['low'], # 저가
stock_price['close'], # 종가
stock_price['volume'] # 거래량
)
bind_stock_list.append(bind_stock)
maria_connection.clean_bind()
maria_connection.add_bind(bind_stock_list)
maria_connection.insert_many(StockPrice.INSERT_STOCK_PRICE)
else:
logging.info(f"종목: {stock_info['stock_name']} [PASS]")
maria_connection.close()
모든 종목코드의 주식 데이터를 넣기 위해 모든 종목에 대해서 순회를 돌며 데이터를 삽입한다.
커넥션 하나로 11,464(종목) X 2800(주가) 만큼의 데이터를 넣으려고 하니 시간이 오래 걸리는 것 같다.
내가 생각한 개선 방법으로는
1. async(비동기) 처리
2. 멀티스레딩 처리
위 두 가지 방법을 생각했는데, 공통적으로 결국 커넥션 풀이 필요하다.
[커넥션 풀 코드]
# Connection pool
import queue
import threading
import mysql.connector
from common.database.MySQL import MySQL
class MySQLPool:
def __init__(self):
self.acquire_lock = threading.Lock()
self.release_lock = threading.Lock()
self.connection_pool = queue.Queue()
def connect(self, host: str, user: str, passwd: str, database: str, max_count: int = 10, port: int = 3306) -> None:
for _ in range(max_count):
self.connection_pool.put(
mysql.connector.connect(
host=host,
port=port,
user=user,
passwd=passwd,
database=database
)
)
def close(self):
for _ in range(self.connection_pool.qsize()):
connection: MySQL = self.connection_pool.get()
connection.close()
def get_connection(self):
with self.acquire_lock:
try:
# 모두 꺼낸 경우 무한대기 상태 방지
connection = self.connection_pool.get(block=False)
except queue.Empty:
connection = None
return connection
def put_connection(self, connection: MySQL) -> None:
with self.release_lock:
self.connection_pool.put(connection)
def get_size(self) -> int:
return self.connection_pool.qsize()
[테스트 코드]
# Client code
from common.database.MySQLPool import MySQLPool
if __name__ == '__main__':
maria_pool = MySQLPool()
maria_pool.connect(
host='192.168.35.175',
user='root',
passwd='1234',
database='daesin',
max_count=10
)
# 최초 생성시 풀 사이즈 체크(10)
pool_size = maria_pool.get_size()
print(f"pool_size: {pool_size}")
# 커넥션 획득 후 mysql.connector 오브젝트 확인
connection = maria_pool.get_connection()
print(connection.__class__)
# 하나의 커넥션을 꺼낸 후 풀 사이즈 체크(9)
pool_size = maria_pool.get_size()
print(f"pool_size: {pool_size}")
# 커넥션 반환 후 풀 사이즈 체크(10)
maria_pool.put_connection(connection)
pool_size = maria_pool.get_size()
print(f"pool_size: {pool_size}")
maria_pool.close()
위와 같이 간단하게 커넥션 풀을 만들어보았다.
이제 하루종일 걸리는 데이터 삽입 처리를 개선해보자.
'이슈' 카테고리의 다른 글
[Python] 2. 멀티스레딩으로 MariaDB 병렬 삽입(with 대신증권 API) (0) | 2024.04.12 |
---|