ㄷㅣㅆㅣ's Amusement

[Android/Java] 병렬 프로그래밍 : Executor Framework에대한 고찰 ----- 1 본문

Programming/Android

[Android/Java] 병렬 프로그래밍 : Executor Framework에대한 고찰 ----- 1

ㄷㅣㅆㅣ 2016. 12. 1. 18:59

[Android/Java] 병렬 프로그래밍 : Executor Framework에대한 고찰 ----- 1

반응형
Basics of multi thread

 <검색해서 들어왔거나 바쁜 분들은 "Runnable + Callable + Future 예제" 부분만 보세요. 여러분의 시간은 소중하니까요.>


병렬 프로그래밍에 앞서...
 - 작업(Task)의 독립성이 갖춰져 있어여 병렬 프로그래밍을 할 수 있는데, 여기서 "독립성"이라 함은, 작업의 상태, 결과, 부수효과등이 다른 작업에 영향을 받지 않아야 함을 의미한다.
   따라서 병렬 프로그래밍을 하기 위해서는 다음의 과정을 거치는 것이 일반적이다.
  1. 각 작업의 범위를 어디까지로 한정할 것인지 정한다.
  2. 프로그램을 작업단위로 재구성한다.
  3. 작업을 스케쥴링하거나 부하를 분산시킨다.





- 병렬 프로그래밍의 필요성.

   - 작업을 실행하는 가장 간단한 방법은 단일 스레드에서 순차적으로 작업을 실행하는 것이다.  그러나 작업들중 CPU연산과 IO연산이 산재하는경우 매우 낮은 처리량으로 인해 어플리케이션의 품질이 저하될 수 있다.

   다음은 그러한 어플리케이션의 예이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
class SingleThreadServer {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        for(;;)
            Socket connection = socket.accept();
            handleRequest(connection);
        }
    }

private void handleRequest(Socket socket) {
//.....
}
}
cs

   - 이 클래스는 왠만한 컴퓨터 공학 또는 컴퓨터 과학을 전공한 사람이라면 "네트워크"과목 첫 수업의 실습에서 본 기억이 있을 것이다.  이론상으로 문제는 없고, 첫 실습용으로도 훌륭하다. 그러나 실제 상황에서는 어떨까?

     클라이언트가 접속되고 handleRequest()의 모든 작업을 수행할 때까지 다른 요청은 처리할 수 없기 때문에 실제상황에 적용하기에는 성능이 매우 떨어진다.

     물론 handleRequest()가 시간을 거이 잡아먹지 않는 단순한 작업이라면 문제가 없겠지만, 사실 웹서버가 클라이언트의 요청을 처리하는 것의 대부분은 IO작업이고, CPU연산은 아주 약간일 것인데, IO작업으로인해 CPU가 대기상황인 것을 묵인(?)한다면 "자원의 효율적인 활용"이라는 면에서도 매우 나쁜 구조임에 틀림이 없다.





  - 각각의 작업을 쓰레드로 분리

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class SingleThreadServer {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        for(;;) {
            final Socket connection = socket.accept();
 
            Runnable run = new Runnalble() {
                public void run() {
                    handleRequest(connection);
                }
            }
 
            new Thread(run).start();
        }
    }

private void handleRequest(Socket socket) {
//.....
}
}
cs
    •  앞서 순차적으로 작업을 수행하는 방법에서 문제시 되었던 handleRequest()의 수행시간이 해결되었다.
    • 이제 서버는 handleRequest()의 수행과 상관없이 클라이언트의 접속 요청을 받아들일 수 있게 되었다.
    • 이렇게 동시에 한 개 이상의 작업을 처리할 수 있게 하면, 여러 개의 프로세서를 효율적으로 사용할 수 있게되어 전반적인 속도를 향상시킬 수 있고, IO연산이 수행될때 CPU가 대기상태에 빠지는 일도 없어진다.
    - 쓰레드를 과도하게 생성하면?
    1. 쓰레드 생성에 의한 오버해드
      - OS마다 조금씩의 차이가 있겠지만, 쓰레드를 만들거나 취소/종료후 자원을 반환하는 데에도 꽤 많은 비용이 필요하다.  만약 handleRequest()가 쓰레드의 생명주기를 관리하는 일보다 하찮은(?) 일이라면, 배보다 배꼽이 더 큰 경우가 된다.
    2. 자원관리
      - 하드웨어에 장착되어있는 프로세서보다 많은 수의 쓰레드가 만들어진다면, 프로세서의 수를 초과하는 쓰레드는 대기상태에 머물게되는데, 대기상태에 있는 쓰레드가 많을수록 메모리를 많이 차지하게된다.
      - [프로세서 수 < 쓰레드 수] 의 경우는 [프로세서 수 == 쓰레드 수] 인 경우보다 직접적인 성능향상은 없으면서 메모리만 과다하게 차지하여 오히려 마이너스 요인으로 작용할 수도 있다.
    3. 안정성
      - 1번과 마찬가지로 OS마다 차이가 있겠지만 쓰레드를 무한정 생성할 수는 없다.  수용할 수 있는 모든 쓰레드의 수만큼 생성한 후에도 쓰레드 생성을 시도하면 아마도 OOM(Out Of Memory, 이하 별도 설명 없이 OOM으로 표기)에러가 발생할 것이므로, 이에대한 대비를 필요로 한다. (사실, 이 상황에서 딱히 무슨짓을 하더라도 정답은 아니다. 이 상황 자체를 방지해야한다.)



  - 본론. 그렇다면 어떻게할까?
    • Executor Framework
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      package java.util.concurrent;
       
      /* The {@code Executor} implementations provided in this package
       * implement {@link ExecutorService}, which is a more extensive
       * interface.  The {@link ThreadPoolExecutor} class provides an
       * extensible thread pool implementation. The {@link Executors} class
       * provides convenient factory methods for these Executors.
       *
       * <p>Memory consistency effects: Actions in a thread prior to
       * submitting a {@code Runnable} object to an {@code Executor}
       * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
       * its execution begins, perhaps in another thread.
       *
       * @since 1.5
       * @author Doug Lea
       */
      public interface Executor {
       
          /**
           * Executes the given command at some time in the future.  The command
           * may execute in a new thread, in a pooled thread, or in the calling
           * thread, at the discretion of the {@code Executor} implementation.
           *
           * @param command the runnable task
           * @throws RejectedExecutionException if this task cannot be
           * accepted for execution
           * @throws NullPointerException if command is null
           */
          void execute(Runnable command);
      }
      cs
      • Android Studio로 source browsing르 통해 긁어온 Executor의 소스코드이다. 주석부분을 제거하면 execute()밖에 남지 않은 매우 간단한 구조의 인터페이스이지만, 이것은 다양한 종류의 작업 스케쥴링을 지원하는 강력한 프레임워크의 근간이다.
      • Executor는 작업의 등록과 실행을 분리하는 표준적인 방법이다. (바로 다음 문단에서 submit()과 execute()하는 부분이 분리되어있는 것을 확인할 수 있다)
      • Executor는 Producer-Consumer Pattern을 따르며, 일반적으로 이 패턴을 구현하는 가장 쉬운 방법이라고 생각된다.

    • 예제를 Executor로 구현해보자!
      • 가장 간단하게 100개의 쓰레드를 생성할 수 있는 풀로 만들어보자.
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        class ExecutionServer {
            private static final Executor executor = Executors.newFixedThreadPool(100);
         
            public static void main(String[] args) throws IOException {
                ServerSocket socket = new ServerSocket(80);
                for(;;) {
                    final Socket connection = socket.accept();
         
                    Runnable run = new Runnalble() {
                        public void run() {
                            handleRequest(connection);
                        }
                    }
         
                    executor.execute(run);
                }
            }
         
            private void handleRequest(Socket socket) {
                //...
            }
        }

        c

      • 위에서 보는바와 같이 작업을 정의하는 영역과 실행하는 영역을 분리하면 실행정책(Execute Policy)을 다음과 같은 면에서 자유롭게 변경할 수 있다는 장점이 있다.
        • 최대 몇개의 작업을 큐에 넣을 수 있는가?
        • 작업의 우선순위는 어떻게 지정할것인가?
        • 작업을 어떤 쓰레드에서 실행할 것인가?
        • 최대 몇개의 작업을 동시에(Concurrent) 병렬로(Parallel) 진행할 것인가?
        • 시스템 자원이 부족한 경우 어떤 작업을 희생할것인가? 또한 작업이 희생되면 어떻게 통지할 것인가?
        • Pre-Execution & Post-Execution 가능 (작업 전/후 동작 지정 가능)

      • 쓰레드 풀 (Thread Pool)
        • 이름 그대로.. 쓰레드를 풀의 형태로 관리하는 것이다.
        • 관리해야할 작업을 큐(Queue)에 넣어두었다가, 다음과같은 간단한 메카니즘으로 동작한다.
          1. 작업큐에서 다음 작업을 가져온다
          2. 가져온 작업을 실행한다
          3. 다음 작업을 가져오거나 가져올 수 있는 다음 작업이 나타날 때까지 대기한다.
        • 풀 내부의 쓰레드를 활용하는 방법이 작업마다 쓰레드를 생성하는 방법보다 더 효율적이다. 왜냐하면 전자는 쓰레드를 계속해서 생산하고 반환할 필요 없이 새로은 작업에대해 전에 쓰던 쓰레드를 재사용 하기 때문이다.  쓰레드 생성에 따른 메모리 오버헤드 뿐만 아니라, 생성동작에 걸리는 시간적 오버헤드까지 없어지므로 반응속도 또한 향상된다.
        • Executors를 통해 생성할 수 있는 Executor. (3부에서 좀더 자세히 분석한다.)
          • newFixedThreadPool - 처리할 작업이 등록되면 실제 작업할 쓰레드를 하나씩 생성하고, 그 쓰레드의 수는 제한되어있다.
          • newCachedThreadPool - 큐에 있는 작업의 수보다 생성되어있는 쓰레드의 수가 많으면 초과범위에 있는 쓰레드를 종료한다. 이후에 작업의 수가 더 많아지면 쓰레드를 생성한다.
          • newSingleThreadExecutor - 단일 쓰레드로 동작한다. 만약에 쓰레드에서 Exception이 발생하여 비정상적인 종료가 된다면 쓰레드를 다시 생성한다.  이것을 쓸 때에도 작업의 우선순위를 지정할 수 있다는 면에서 이점이 있다.
          • newScheduledThreadPool - 일정시간 이후에 실행하거나 반복실행을 위해서 사용한다. 쓰레드의 수는 고정되어있다.

      • 쓰레드의 종료
        • JVM은 모든 쓰레드가 종료하지 않으면 자신도 종료하지 않고 대기하므로 Executor를 제대로 종료시켜야 JVM도 정상적으로 종료된다.
        • 따라서 Executor는 정상적이건 비정상적이건간에 종료절차를 밟아야 할 필요가 있다.
        • Executor의 생명주기와 관련된 인터페이스인 ExecutorService가 있으니 코더는 간단히 가져다 쓰면 된다.
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          package java.util.concurrent;
           
          public interface ExecutorService extends Executor {
              void shutdown();
           
              List<Runnable> shutdownNow();
           
              boolean isShutdown();
           
              boolean isTerminated();
           
              boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException;
           
              <T> Future<T> submit(Callable<T> var1);
           
              <T> Future<T> submit(Runnable var1, T var2);
           
              Future<?> submit(Runnable var1);
           
              <T> List<Future<T>> invokeAll(Collection<extends Callable<T>> var1) throws InterruptedException;
           
              <T> List<Future<T>> invokeAll(Collection<extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException;
           
              <T> T invokeAny(Collection<extends Callable<T>> var1) throws InterruptedException, ExecutionException;
           
              <T> T invokeAny(Collection<extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException, ExecutionException, TimeoutException;
          }
          cs
          • ExecutorService는 세 가지의 Status를 갖는데 <Running, Shutting Down, Terminated>이다.
          • shutdown()
            • 메소드가 불리면 곧바로 종료 절차를 진행하게되고 isShutdown()은 true가 된다. 이때에는 더이상의 추가 작업은 거부되지만, 이미 등록된 작업(대기중인 작업 포함)은 모두 마친다.
            • 모든 작업이 종료되면 ExecutorService는 isTerminated()의 리턴값으로 true를 가진다.
          • shutdownNow()
            • 메소드가 불리면 종료 절차를 진행하게 되는데 대기중인 작업은 취소시키고, 이미 running인 작업도 가능하면 중단시킨다
          • awaitTermination()
            • ExecutorService가 terminated 상태로 갈때까지 기다린다. (물론 isTerminated()를 폴링할 수도 있다)
          • ExecutorService를 사용하여 종료까지 고려한 예제
            1
            2
            3
            4
            5
            6
            7
            8
            9
            10
            11
            12
            13
            14
            15
            16
            17
            18
            19
            20
            21
            22
            23
            24
            25
            26
            27
            28
            29
            30
            31
            32
            33
            class ExecutionServer {
                private static final ExecutorService mExecutorService = Executors.newFixedThreadPool(100);
             
                public static void main(String[] args) throws IOException {
                    ServerSocket socket = new ServerSocket(80);
             
                    for(;;) {
                        final Socket connection = socket.accept();
             
                        try {
                            Runnable run = new Runnalble() {
                                public void run() {
                                    handleRequest(connection);
                                }
                            }
             
                            mExecutorService.execute(run);
                        catch (RejectedExecutionException e) {
                            if (!exec.isShutdown()) {
                                Log.e("ExecutorService""task rejected!!");
                            }
                        } 
                    }
                }
             
                private void handleRequest(Socket socket) {
                    if (/* isRequestToFinish */) {
                        mExecutorService.shutdown();
                    } else {
                        //.....
                    }
                }
            }
            cs


    • Callable
      • Runnable의 run()메소드는 실행이 끝난 다음 뭔가를 리턴해 줄 수가 없다. 만약에 결과를 받고싶다면 클래스의 멤버변수에 할당하는 것이 일반적이다.
      • 그런데 결과를 받아올 때까지 시간이 걸리는 작업이 꽤 많다. 이를테면 서버에서 이미지를 받아오는 것.
      • 이와같이 오랜 작업시간 끝에 결과를 받아오는 작업은 Runnable보다는 Callable를 사용하는 것이 좋다. 
        • call()메소드를 실행하면 결과값을 돌려받을 수 있다.
        • 게다가 Exception도 발생시킬 수 있다.
    • Future
      • Future를 통하여 Executor의 특정 작업이 정상종료되었는지 취소되었는지등의 정보를 확인할 수 있다.
      • get()메소드를 통해 결과를 가져올 수 있으며, 결과가 나올 때까지 기다릴 수도 있다.
      • get()메소드의 결과는 Executor의 상태에 따라 다르다.
        • 작업 정상 완료 : 결과값
        • 작업 비정상 종료 : ExecutionException (원래 비정상종료 원인이 Exception인 경우 포함됨)
        • 작업 전 또는 작업중 : 대기후 결과값
        • 작업 취소 : CancellationException
    • Runnable + Callable + Future 예제
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      class ExecutionServer {
          private static final ExecutorService mExecutorService = Executors.newFixedThreadPool(100);
       
          public static void main(String[] args) throws IOException {
              ServerSocket socket = new ServerSocket(80);
       
              for(;;) {
                  final Socket connection = socket.accept();
                  Callable<Boolean> callable = new Callable<Boolean>() {
                      @Override
                      public Boolean call() throws Exception {
                          return handleRequest(connection);
                      }
                  };
       
                  final Future<Boolean> future = mExecutorService.submit(callable);
                  mExecutorService.execute(new Runnable() {
                      @Override
                      public void run() {
                          try {
                              Drawable drawable = future.get();
                              imageView.setDrawable(drawable);
                          } catch (InterruptedException e) {
                              Thread.currentThread().interrupt();
                              future.cancel(true);
                          } catch (ExecutionException e) {
                              Log.e("ExecutorService", e.getCause());
                          }
                      }
                  });
              }
          }
       
          private Drawable handleRequest(Socket socket) {
              if (/* isRequestToFinish */) {
                  mExecutorService.shutdown();
                  return null;
              } 
       
              //.....
              Drawable = getDrawableFromUri(socket.request.toString());
              return drawable;    
          }
      }
      cs
    • 그런데 이렇게 Future를 가지고있지 않아도 나중에 받아올 수 있는 방법이 있다. 요청 5개마다 한번씩 처리하는 로직으로 바꿔보자.
    • 당연히 실 상황에서 이렇게 배치처리하게 짜면 안된다. 그냥 사용하던 예제를 억지로 끼워마춘것 뿐이다. (까방권 획득!?)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      class ExecutionServer {
          private static final ExecutorService mExecutorService = Executors.newFixedThreadPool(100);
       
          public static void main(String[] args) throws IOException {
              private ServerSocket socket = new ServerSocket(80);
              private ArrayList<Callable> mCallableList = new ArrayList<>(5);
       
              for(;;) {
                  final Socket connection = socket.accept();
                  Callable<Boolean> callable = new Callable<Boolean>() {
                      @Override
                      public Boolean call() throws Exception {
                          return handleRequest(connection);
                      }
                  };
       
                  if (mCallableList.size() < 5) {
                      mCallableList.add(callable);
                  } else {
                      final CompletionService<Drawable> completionService = new ExecutorCompletionService<Drawable>(mExecutorService);
                      completionService.submit(callable);
                      mExecutorService.execute(new Runnable() {
                          @Override
                             public void run() {
                              try {
                                  for (int i=0; i<mCallableList.size(); i++) {
                                      Future<Drawable> future = completionService.take();
                                      Drawable drawable = future.get();
                                      imageView.setDrawable(drawable);
                                  }
                                  
                                  mCallableList.clear();
                              } catch (InterruptedException e) {
                                  Thread.currentThread().interrupt();
                                  future.cancel(true);
                              } catch (ExecutionException e) {
                                  Log.e("ExecutorService", e.getCause());
                          }
                      }
                  });
                  }
       
                  
              }
          }
       
          private Drawable handleRequest(Socket socket) {
              if (/* isRequestToFinish */) {
                  mExecutorService.shutdown();
                  return null;
              } 
       
              //.....
              Drawable = getDrawableFromUri(socket.request.toString());
              return drawable;    
          }
      }
      cs



--- 참고문헌 ---

  1. 자바 병렬 프로그래밍
  2. Oracle Docs - ExcutorService
  3. Wikipedia - Producer-consumer


반응형
Comments