2024.04.07 - [Python] - [Python] 1. 멀티스레딩으로 MariaDB 병렬 삽입(with 대신증권 API)
전에 대신증권 API를 통해서 받은 주식 데이터를 MariaDB에 넣으려고 했으나 오래걸리는 이슈가 있었다.
그래서 병렬작업으로 데이터를 밀어넣기 위해 커넥션 풀을 만들어줬다.
그 후 작업으로는 생성한 커넥션 풀을 가지고 병렬작업을 해주어야 하는데
방법은
1. 멀티스레딩
2. 비동기 처리
이정도로 생각해볼 수 있다.
일단 나는 스레딩 처리가 익숙해서 멀티스레딩 처리로 코드를 수정했고, 코드를 돌려보니 에러가 발생했다.
아..
찾아보니 누군가 문의한 글이 있다.
API 함수 호출을 스레딩으로는 지원하진 않는다고 한다.
내가 하려는 방식은
종목 코드(API, 11,000) -> 코드에 대한 주가 데이터(API, 2800) -> 데이터 삽입(Maria)
위와 같은 순서이기 때문에 가져온 주가 데이터만 스레딩 처리한다면 문제는 없겠지만...
음... 그래도 2800건에 대한 데이터를 쪼개서 처리하기보단 11,000건에 대해서 쪼개는 게 훨씬 효과적이지 않을까?
일단 다른 방안으로는 비동기처리하는 방법이 있으니 비동기 방법으로도 해봤다.
227.7 / 50(batch_size) = 4.5
=> 한 종목당 4.5초
??
API에서 데이터를 가져오는데 시간은 크게 걸리지 않는데 RDB에 데이터를 넣는데 오래 걸린다.
동시성 코드에서 I/O 바운드 작업이 일어날 땐 블럭킹이 되어 의도한대로 동작하지 못하는 경우가 있다.
이런 경우 run_in_executor() 메서드를 통해서 개선할 수 있다.
from DaesinAPI.StockCodeFetcher import StockCodeFetcher
from DaesinAPI.StockPriceFetcher import StockPriceFetcher
from common.database.MySQL import MySQL
from common.database.MySQLPool import MySQLPool
from resources.queries import StockPrice
import asyncio
import logging, traceback
import time
class InsertDaesinStockPrice:
def __init__(self):
self.maria_pool = MySQLPool()
# 종목가격 인스턴스 생성
self.stock_price_fetcher = StockPriceFetcher()
def maria_pool_connect(self):
self.maria_pool.connect(
host='192.168.35.175',
user='root',
passwd='1234',
database='daesin',
max_count=50
)
def maria_pool_close(self):
self.maria_pool.close()
async def process_stock_price(self, stock_code):
def insert_stock_price():
maria_connection.clean_bind()
maria_connection.add_bind(bind_stock_list)
maria_connection.insert_many(StockPrice.INSERT_STOCK_PRICE)
try:
# 특정 종목코드의 금액 정보 가져오기
maria_connection: MySQL = self.maria_pool.get_connection()
try:
maria_connection.clean_bind()
maria_connection.add_bind(stock_code)
curr_price_days_diff = maria_connection.select(StockPrice.SELECT_CURR_PRICE_DAYS_DIFF) # 테이블에 있는 데이터와 차이나는 날짜
days_diff = curr_price_days_diff[0].get('DAYS_DIFF', 10000)
data_size = days_diff-1
# 어제 데이터까지 존재한다면 1
if days_diff > 1:
stock_price_list = self.stock_price_fetcher.get_stock_price(stock_code=stock_code,
data_size=data_size)
bind_stock_list = []
logging.info(f"종목코드: {stock_code}, 넣을 데이터양: {len(stock_price_list)}")
for stock_price in stock_price_list:
bind_stock = (
stock_code, # 종목 코드
stock_price['date'], # 날짜(YYYYMMDD)
stock_price['open'], # 시가
stock_price['high'], # 고가
stock_price['low'], # 저가
stock_price['close'], # 종가
stock_price['volume'] # 거래량
)
bind_stock_list.append(bind_stock)
maria_connection.clean_bind()
maria_connection.add_bind(bind_stock_list)
await asyncio.get_running_loop().run_in_executor(None, insert_stock_price)
# maria_connection.insert_many(StockPrice.INSERT_STOCK_PRICE)
else:
logging.info(f"종목코드: {stock_code} [PASS]")
except Exception as error_message:
maria_connection.rollback()
logging.info(traceback.print_exc())
# log.info(error_message)
finally:
self.maria_pool.put_connection(maria_connection)
except Exception as e:
logging.error(e)
async def process_stock_batch(self, batch_data):
# 사이즈만큼의 데이터를 한 번에 동작
tasks = []
for stock_info in batch_data:
task = asyncio.create_task(self.process_stock_price(stock_info['stock_code']))
tasks.append(task)
await asyncio.gather(*tasks)
async def insert_all_stock_price(self):
# 마리아 커넥션 풀 생성
self.maria_pool_connect()
# 종목코드 인스턴스 생성
stock_code_fetcher = StockCodeFetcher()
# 종목코드 모두 가져오기
all_stock_code = stock_code_fetcher.select_code_all()
# 50개씩 동작
batch_size = 50
for idx in range(0, len(all_stock_code), batch_size):
start_time = time.time()
logging.info(f"{idx} / {len(all_stock_code)} .... {round((idx/len(all_stock_code)) * 100, 2)}%")
batch_data = all_stock_code[idx:idx+batch_size]
await self.process_stock_batch(batch_data)
logging.info(f"It took {round(time.time() - start_time, 1)} seconds")
# 마리아 커넥션 풀 해제
self.maria_pool_close()
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s [%(filename)s:%(lineno)d]')
insert_daesin_stock_price = InsertDaesinStockPrice()
asyncio.run(insert_daesin_stock_price.insert_all_stock_price())
'이슈' 카테고리의 다른 글
[Python] 1. 멀티스레딩으로 MariaDB 병렬 삽입(with 대신증권 API) (0) | 2024.04.07 |
---|