ㄷㅣㅆㅣ's Amusement

[Android/Java] 병렬 프로그래밍 : Executor Framework에대한 고찰 ----- 3 본문

Programming/Android

[Android/Java] 병렬 프로그래밍 : Executor Framework에대한 고찰 ----- 3

ㄷㅣㅆㅣ 2016. 12. 7. 11:28

[Android/Java] 병렬 프로그래밍 : Executor Framework에대한 고찰 ----- 3

반응형

Utilization of Thread Pool




- 지금까지 1,2부, 번외로 volatile관련 포스팅까지 합쳐 세번의 포스팅을 하면서 Thread의 생성, 중단, 안정성에 관한 내용을 정리하였다.

  이번에는 그 마지막인 활용에 대한 포스팅을 한다.


  • Thread pool 사용시 고려할 점
    • 각각의 thread는 독립적이어야 한다.
      • 각각의 thread는 다른 thread에 의존적이지 않아야 성능/관리상의 이슈가 없다고 여러번 언급했었다.  
      • 그 이유로는 의존성이 Deadlock을 발생시키거나 그로인해 Executor를 종료하지 않아 JVM이 종료되지 못하는 상황을 예로 들었었다.
    • Thread Pool에 등록된 각각의 thread가 필요한 시간을 예상해야한다.
      • Thread의 독립성 문제처럼 deadlock이 발생하 않더라도 제한된 thread pool에 많은 시간을 필요로하는 thread들이 등록된다면 문제가 발생할 수 있다.
      • 만약 5개의 Capacity를 가진 thread pool에 짧은 시간을 필요로하는 thread_a와 매우 긴 시간을 필요로하는 thread_b가 등록될 수 있다면, 시간이 지남에따라 thread_b만 풀에 가득 차는 결과를 초래할 것이고, 그렇게되면 thread_a이든 thread_b든 더이상 등록을 못하게되는 상황이 오게된다.
      • 일정 시간동안만을 특정 thread에 할애하고, 그 시간 안에 마치지 못했다면 중단하고 다시 큐의 맨 마지막에 등록하거나, 등록한측에게 리턴하는 식의 방어코드로 thread pool 자체를 한번씩 비워줄 수 있겠지만, thread_b가 언제나 많은 시간을 필요로 한다면 이역시 좋은 코드는 아니다.
      • 그렇다고해서 thread pool의 capacity를 무한정으로 늘리면 각 thread가 cpu를 사용하기위해 경쟁을 하고, context switch, 과도한 memory 사용등으로 인해 성능이 매우 떨어지는 결과를 초래할 것인다.
      • 때문에 thread pool의 크기를 제한할 때에는 어떻게 알맞은 크기로 제한할 것인가에 대한 고민이 동반되어야한다. (물론 정확히 알수는 없다는 것을 안다.)


  • ThreadPoolExecutor
    • Executor 생성시의 Factory method와 옵션을 줄 수 있는 ThreadPoolExecutor.
      • 지금까지 예제에서 Executor를 생성할 때에는 다음과 같은 Factory method를 사용했었다.
        1. newFixedThreadPool() 
          • corePoolSize : 생성시 지정
          • maximumPoolSize :  생성시에 지정한다
          • keepAliveTime(각 쓰레드가 쓸 수 있는 시간) : 무한정.
          • queue : unlimited size of LinkedBlockingQueue
        2. newCachedThreadPool()
          • corePoolSize : 0
          • maximumPoolSize : Integer.MAX_VALUE
          • keepAliveTime : 1 min.
          • queue : SynchronousQueue. (자세한 내용은 아래에서 언급하겠다.)
        3. newScheduledThreadPool() - 1번의 newFixedThreadPool()과 같으나, 시작시간을 정할 수 있는 특성이 있다.
        4. newSingleThreadExecutor()
          • corePoolSize : 1
          • maximumPoolSize : 1
          • keepAliveTime : 무한정
          • queue : unlimited size of LinkedBlockingQueue
      • Factory method를 이용해 executor를 생성하면 기본적으로 설정되어있어 매우 편하게 사용할 수 있는 반면, 현재 개발하고있는 프로젝트의 성격과 맞지 않아 어려움을 격게될지도 모른다.
      • 이럴때에는 ThreadPoolExecutor클래스를 이용하여 여러가지 옵션을 실정에 맞게 설정할 수 있다. 다음은 ThreadPoolExecutor의 생성자들이다.

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            throw new RuntimeException("Stub!");
        }
         
        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
            throw new RuntimeException("Stub!");
        }
         
        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            throw new RuntimeException("Stub!");
        }
         
        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactoryRejectedExecutionHandler handler) {
            throw new RuntimeException("Stub!");
        }
        cs
        • ThreadPoolExecutor는 queue에 작업이 들어올 때마다 thread의 수를 늘리지만, queue의 작업이 없어지더라도 (되도록이면)corePoolSize에 맞게 thread 수를 유지한다. (1부에서 언급한대로 executor는 작업이 들어올때마다 thread를 생성하고 작업이 종료되면 thread도 함께 중단하는 는 방식이 아니라 쉬고있는 thread에 작업을 할당하고, 작업이 종료되면 thread를 다시 쉬게한다.
        • ThreadPoolExecutor는 queue에 작업이 가득 찬 경우에는 thread의 수를 corePoolSize로 늘릴 수도 있다. (물론 그전에는 넘지 않는다)
        • ThreadPoolExecutor를 이용하면 queue가 가득 찼을 때 work around할 수도 있다. (RejectedExecutionHandler, 하단에 따로 자세히 다룬다.)  
    • BlockingQueue<?> - Executor에서 사용하는 queue
      • 지금까지의 예제에서는 BlockingQueue<Runnable>를 사용했으나, queue 또한 다른것을 사용하여 특성을 달리할 수 있다.
      • queue의 방식은 크게 다음의 세 가지로 나뉜다.
        1. 크기에 제한이 없는 queue.
        2. 크기에 제한이 있는 queue.
        3. 단순히 thread에 넘겨주기만 하는 queue - SynchronousQueue
          • thread의 개수가 많거나 생성에 제한이 없는 경우에 작업을 queue에 쌓지 않고 바로 그리고 직접 thread에 넘겨줄 수도 있다.
          • 이때에는 thread가 작업을 받기위해 미리 대기중이어야 한다.
          • Java 6 이상에서는 blocking되지 않는 방식으로 바뀌었고, 이로인해 이전 버전에 의해 3배 이상의 성능향상을 보인다고 한다. (Scherer et al., 2006)
          • 명칭에는 queue라고 되어있지만 사실 이것은 queue가 아니다.

    • RejectedExecutionHandler - Queue가 가득 찼을 때
      • 크기가 제한된 queue가 가득 차면 위의 예제코드 9,13번 라인에서 인자로 받은 RejectedExecutionHandler에게 메세지가 간다.
      • 미리 정의되어있는 RejectedExecutionHandler는 다음과 같다.
        1. AbortPolicy
          • execute()에서 RuntimeException을 상속받은 RejectedExecutionException을 던진다.
          • 호출한 곳에서 exception을잡아 더이상 queue에 추가하지 못하는 상황에 대해 직접 처리해야한다.
        2. CallerRunsPolicy
          • 작업을 중단하거나 exception을 던지지 않고 호출한 곳의 thread로 넘겨 그 thread에서 작업되도록 한다.
          • 호출을 UI 또는 main thread에서 했다면 낭패..
        3. DiscardPolicy
          • exception을 던지지 않고 추가하려고 시도한 작업을 무시한다.
        4. DiscardOldestPolicy
          • exception을 던지지 않고 queue에서 가장 오래된 작업을 취소하고 현재 작업을 넣는다. 
      • 미리 정의되어있는 핸들러가 마음에 들지 않는다면 직접 정의해서 쓰면 된다.

    • ThreadFactory
      • 5번,13번 행에서는 threadFactory를 넘겨받는 생성자를 볼 수 있는데, threadFactory를 받던 받지 않던 thread를 생성할 때에는 threadFactory를 통해 진행한다.
      • 다만 지정해주지 않았을 때에는 데몬이 아니면서 default setting이 되어있는 thread를 생성하도록 되어있다.
      • ThreadFactory를 직접 코딩해서 프로젝트에 맞는 thread를 생성이 가능한데, 인터페이스는 매우 간단다하다.
        1
        2
        3
        4
        5
        package java.util.concurrent;
         
        public interface ThreadFactory {
            Thread newThread(Runnable var1);
        }
        cs
        • 위에서 보는바와 같이 executor에서 thread를 생성할 때에는 항상 newThread를 호출하는데, 이것만 정의해주면 된다. 
      • ThreadFactory를 작성하여 얻을 수 있는 이익에는 다음과 같은 것들도 있다.
        • UncaughtExceptionHandler를 직접 지정
        • BaseThread를 만들고 그것을 상속한 thread를 생성
        • Thread마다 이름을 지어주고, 로그에서 분간할 수 있도록 함

    • Hooks
      • beforeExecute
      • afterExecute
      • terminated
      • ThreadPoolExecutor를 상속받아 코딩하면 쓰레드의 전/후/종료시에 hook을 사용하여 훨신 간편한 프로그래밍을 할 수 있다.
      • 이름으로부터 hook의 성격을 명확히 파악할 수 있기 때문에 자세한 설명은 생략하고, 참고서적에 나와있는 예제코드만 올리도록 하겠다. (물론 안드로이드에 맞게 수정을 가하였고, AsyncTask를 통하여 많이 경험했던 패턴이라고 믿는다.)
  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    public class CustomThreadPool extends ThreadPoolExecutor {
        private final ThreadLocal<Long> mStartTime = new ThreadLocal<>();
        private final AtomicLong mTaskCount = new AtomicLong();
        private final AtomicLong mTotalTime = new AtomicLong();
        private final String THREAD_POOL_NAME = "CustomThreadPool";
     
        public CustomThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
        
        public long currentTaskCount() {
            return mTaskCount.get();
        }
        
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
     
            mStartTime.set(System.currentTimeMillis());
            mTaskCount.incrementAndGet();
     
            Log.d(THREAD_POOL_NAME, String.format("thread : %s, runnable[%s] started", t, r));
        }
     
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            try {
                long lEndTime = System.currentTimeMillis();
                long lSpendTime = lEndTime - mStartTime.get();
                mTotalTime.addAndGet(lSpendTime);
                mTaskCount.decrementAndGet();
                
                Log.d(THREAD_POOL_NAME, String.format("throwable : %s, runnable[%s] ended, time : %d", t, r, lSpendTime));
            } finally {
                super.afterExecute(r, t);
            }
        }
     
        @Override
        protected void terminated() {
            try {
                Log.d(THREAD_POOL_NAME, String.format("Thread is Terminated"));
            } finally {
                super.terminated();
            }
        }
    }
    cs


반응형
Comments