[Python] 병렬처리, Multiprocessing
Programming/Python

[Python] 병렬처리, Multiprocessing

병렬처리의 필요성

요즘같이 대용량 데이터를 처리하는 과정에서 병렬처리 활용은 선택이 아닌 필수가 되어버렸다. 극단적으로 말하자면 코드를 어떻게 짜느냐에 따라 똑같은 작업이라도 1시간이 걸릴 수 있고, 단 1초 만에 완료될 수 도 있다. 코드를 돌려놓고 하염없이 기다리기보다 병렬처리를 배워서 적용하는 시간이 오히려 더 빠르다. 그리고 추후에도 잘 응용하여 사용함으로써 데이터 분석가의 역량을 키워 나갈 수 있다. 하드웨어가 심각하게 싸지지 않는 한 앞으로 더더욱 연산속도와 메모리 관리는 중요한 이슈가 될 것이라고 생각한다.

Multiprocessing

multiprocessing 모듈은 threading 모듈과 유사한 API를 사용하여 *프로세스 스포닝(spawning)을 지원하는 패키지이다. multiprocessing 모듈 내 정의되어 있는 PoolProcess 함수를 이용하여 병렬구조 연산을 처리할 수 있다.

 

* threading 모듈 : 스레드 기반 병렬 처리를 지원하는 모듈

* 프로세스 스포닝(spawning) : 새로운 자식 프로세스를 로드하고 실행하는 것

 

어떤 상황에 사용하면 좋은지?


A. for 문으로는 시간이 너무 오래 소요될 때.

내 생각에 멀티프로세싱은 for 문보다 고차원적인 것 같다.

for문의 반복은 하나씩 처리가 되기 때문에 대용량 데이터이거나 알고리즘이 복잡하면 시간이 기하급수적으로 늘어날 수 있다. 그래서 우리는 처리 시간 및 응답 시간의 소요 시간을 줄이기 위해 병렬 처리를 사용할 수 있다.

 

B. 활용 가능한 CPU 개수가 많을 때.

 

가용 CPU 개수가 많지 않을 땐 싱글 프로세스와 멀티 프로세스의 처리 & 응답 시간에서 별 차이가 없다.

 

실컷 코드를 다 짜 놓고 돌렸는데 처리 시간이 비슷할 수도 있다. 참으로 김새는 일이 아닐 수 없다. 이를 예방하기 위해 사용 가능한 CPU 개수를 미리 파악해두자.

 

사용 가능 CPU 개수 확인 방법 (예시)

 

>>> import multiprocessing as mp
>>> mp.cpu_count()
4

 

Pool

내가 실행시키고자 하는 함수를 process 에 분배하여 병렬처리 할 수 있다. 

 

일반적 연산과 병렬 처리 연산 비교

예시로 내가 만약 Random Walk 시뮬레이션을 진행한 그래프 48장을 png 파일로 저장을 하고 싶다고 하자.

 

Random Walk 시뮬레이션 그래프를 저장하는 코드
import matplotlib.pyplot as plt
import random

def random_walk_(seed_):  
  
  random.seed(seed_)

  fig = plt.figure(figsize=(12,6))

  for i in range(100):
    walk_ = [0]
    for j in range(100):
      walk_.append(random.uniform(-1,1) + walk_[-1])
    plt.plot(walk_,alpha=0.5)
  
  plt.savefig(f'{seed_}.png')
  plt.close(fig)
  
  return None
random_walk_(0)

파일로 저장된 Random Walk 시뮬레이션 그래프

이러한 작업을 일반적인 연산으로 실행해보고 소요시간을 확인해보자.

 

일반적 연산
def process_A():
  list(map(random_walk_, range(48)))
>>> import time

>>> start = int(time.time())
>>> process_A() # 일반적인 연산
>>> print(int(time.time() - start))
15

 

위에서 정의한 함수를 이용해 png 파일 48장을 저장하는 데까지 일반적 연산으로는 약 15초가량 소요됨을 알 수 있었다.

 

병렬처리 Pool
from multiprocessing import Pool

def process_B():
  num_cores = 4
  pool = Pool(num_cores)
  pool.map(random_walk_,range(48))
>>> import time

>>> start = int(time.time())
>>> process_B() # 병렬처리 Pool
>>> print(int(time.time() - start))
7

 

위의 일반적 연산의 소요시간인 15초보다 8초 단축된 약 7초가 소요되었음을 알 수 있다. 이는 거의 절반 정도의 소요시간이라고 할 수 있다. 이와 같이 Pool 함수를 사용하면 보다 더 빠른 처리가 가능하다.

 

+ 추가 

데이터 프레임의 변수에 대해 병렬 처리해보자

>>> import pandas as pd

>>> df = pd.DataFrame({'date':pd.date_range('2019-01-01','2020-12-31',freq='1min')})
>>> df

만약 이러한 데이터가 있다고 가정해보자. 

예제 데이터

행의 개수가 1051201 rows로써 비교적 많다(?)라고 할 수 있다. (물론 빅데이터 시대에 100만 로우 정도면 많지도 않은 거지만..) 이 데이터를 가지고 만약 hh:mm:ss 형태의 정보를 담은 time 변수를 만들고 싶다고 가정하자. 그리고 이 경우에 대해서 일반적 연산과 병렬처리 연산을 비교해보고자 한다.

 

일반적 연산
>>> import time
>>> import pandas as pd
>>> from multiprocessing import Pool

>>> df = pd.DataFrame({'date':pd.date_range('2019-01-01','2020-12-31',freq='1min')})

>>> # 연산 시작
>>> start = int(time.time())

>>> df['time'] = df['date'].apply(lambda x : str(x)[-9:])

>>> # 연산 소요시간 측정
>>> print(int(time.time() - start))

>>> df
6

일반적 연산으로 실행한 결과 데이터프레임

약 6초 정도가 소요된 것을 확인할 수 있다. 데이터 수가 많고 복잡한 연산일수록 소요 시간은 기하급수적으로 늘어날 수 있다.

 

데이터프레임 병렬처리
>>> import time
>>> import pandas as pd
>>> import numpy as np
>>> from multiprocessing import Pool

>>> df = pd.DataFrame({'date':pd.date_range('2019-01-01','2020-12-31',freq='1min')})

>>> def make_time(data):
>>>   data['time'] = data['date'].apply(lambda x : str(x)[-9:])
>>>   return data

>>> def parallel_df(df,func,n_cores):
>>>   df_split = np.array_split(df,n_cores)
>>>   pool = Pool(n_cores)
>>>   df = pd.concat(pool.map(func, df_split))
>>>   pool.close()
>>>   pool.join()
>>>   return df

>>> # 연산 시작
>>> start = int(time.time())

>>> df = parallel_df(df,make_time,n_cores=4)

>>> # 연산 소요시간 측정
>>> print(int(time.time() - start))

>>> df
3

병렬처리 연산으로 실행한 결과 데이터프레임

병렬 처리로 수행한 작업의 소요시간은 3초로 일반적 연산에 비해 절반 수준임을 알 수 있었다. 이와 같은 방법으로 데이터프레임 전처리를 병렬적으로 실행하여 소요시간을 줄일 수도 있겠다.

 

Process

Process는 하나의 프로세스에 하나의 함수를 할당하여 실행하는 방식이다.

>>> from multiprocessing import Process
>>> import time

>>> # 연산 시작
>>> start = int(time.time())

>>> procs = []    

>>> for num in range(48):
>>>       proc = Process(target=random_walk_, args=(num,))
>>>       procs.append(proc)

>>>       proc.start()

>>> for proc in procs:
>>>   proc.join()

>>> # 연산 소요시간 측정
>>> print(int(time.time() - start))
7

Process 함수로 실행한 병렬 처리 연산은 위의 Pool 함수 연산과 동일하게 약 7 초가량 소요됨을 알 수 있었다. 

 

Pool 함수와 Process 함수의 실행 방법의 차이점

구글링을 통해 아주 적절한 비유를 찾을 수 있었다.

 

두 함수 모두 병렬 처리를 위해 사용되지만 방식의 차이가 존재했다. 쉽게 이해하자면 Pool 은 말 그대로 처리할 일을 바닥에 뿌려놓고 알아서 분산 처리를 하게 만드는 방식이고 Process는 각 프로세스별로 할당량을 명시적으로 정해준 뒤 실행되는 방식이다. 두 방식의 차이점을 기억해두면 좋을 것 같다.

 

 

결론

바야흐로 대용량 데이터 시대가 도래함에 따라 우리는 병렬처리에 대한 이해를 통해 좀 더 빠른 연산을 필수적으로 익혀두어야 할 것이다.

 

참고
yganalyst.github.io/data_handling/memo_17_parallel/
https://zzaebok.github.io/python/python-multiprocessing/