ㄷㅣㅆㅣ's Amusement

[Android/Java] 병렬 프로그래밍 : Executor Framework에대한 고찰 ----- 2 본문

Programming/Android

[Android/Java] 병렬 프로그래밍 : Executor Framework에대한 고찰 ----- 2

ㄷㅣㅆㅣ 2016. 12. 5. 22:13

[Android/Java] 병렬 프로그래밍 : Executor Framework에대한 고찰 ----- 2

to Cancel thread



이전 포스팅에서는 Executor Framework를 통해서 task를 실행하는 것에 대해 알아보았다. (

2016/12/01 - [프로그래밍/Android] - [Android/Java] 병렬 프로그래밍 : Executor Framework에대한 고찰 ----- 1)


이번에는 실행된 작업들을 중단하고, Executor를 종료하는 방법에 대해 알아본다.

<검색해서 들어왔거나 이전 포스트를 읽었다면 "Future를 이용한 작업 중단" 부분만 읽으세요!>




들어가기에 앞서... 작업 중단이 필요한 때는?

  • 사용자에 의한 중단
    • GUI에서 "취소"를 선택한 경우 등..
  • 시간이 제한된 작업
    • 주어진 시간동안에만 작업을 하고 timeout시 종료해야 하는 task.
  • Programmer 의도
    • 동일한 답을 찾지만 condition을 달리하여 여러 쓰레드를 돌리는 경우 한 task에서 답을 찾으면 모든 쓰레드 종료
  • Error
    • 메모리 또는 디스크 용량이 부족할 때 등...

  - 그 밖에도 여러 이유가 있겠지만, App.이나 service를 종료할 때에는 진행중이던 task나 queue에서 대기중이던 task 모두 "종료절차"가 필요하다. (물론 급하게 중단해야 하는 경우라면 실행중이던 작업마저도 취소시켜야 하는 경우도 존재한다)


  • 변수를 활용한 작업 중단
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    class PrimeProducer extends Thread {
        private final BlockingQueue<BigInteger> queue;
        private volatile boolean mIsCanceled;
     
        PrimeProducer(BlockingQueue<BigInteger> queue) {
            this.queue = queue;
        }
     
        public void run() {
            try {
                BigInteger p = BigInteger.ONE;
     
                while (mIsCanceled == false) {
                    queue.put(p = p.nextProbablePrime());
                }
            } catch (InterruptedException e) {
                // Do nothing
            }
        }
     
        public void cancel() {
            mIsCanceled = true;
        }
    }
     
    public void consumePrimes() throws InterruptedException {
        BlockingQueue<BigInteger> primes = new ArrayBlockingQueue<BigInteger>(100);
        PrimeProducer producer = new PrimeProducer(primes);
        producer.start();
     
        try {
            while (needMorePrime()) {
                BigInteger bi = primes.take();
                Log.d("PrimeConsumer""Prime found : " + bi);
    // Do something else
             }
        } finally {
            producer.cancel();
        }
    }
     
    private boolean needMorePrime() {
        /* if (condition) {
            return false;
        } */
     
        return true;
    }
    cs
    • 상기 코드는 volatile 속성을 이용한 작업 중단이다. 얼핏 보기에는 정상 작동할 것으로 예상되지만, 큰 문제점을 가지고 있다.
      • 소수를 찾는 작업이 너무도 빠르고, 35번행부터 수행할 동작들(위의 예시에서는 편의상 생략되었으나...)이 상대적으로 느리다면, 100을 capacity로 가진 큐가 언젠가는 모두 채워질 것이다.
      • 큐가 모두 채워져있다면 14번 행에서의 put은 consumer가 queue에서 꺼내갈 때까지(33번행이 수행될 때까지) block이 된다.
      • 이때에 consumer는 32번행의 while()문의 조건을 만족하지 못하여 바로 38번행을 수행하고 끝난다면?
      • Producer는 영원히 끝나지 않게된다.
      • volatile 속성은 여기를 참고 : 2016/12/06 - [프로그래밍/Android] - [Android/Java] 변수를 Volatile로 선언하면?


  • Interrupt를 이용한 작업 중단
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    class PrimeProducer extends Thread {
        private final BlockingQueue<BigInteger> queue;
     
        PrimeProducer(BlockingQueue<BigInteger> queue) {
            this.queue = queue;
        }
     
        public void run() {
            try {
                BigInteger p = BigInteger.ONE;
     
                while (Thread.currentThread().isInterrupted() == false) {
                    queue.put(p = p.nextProbablePrime());
                }
            } catch (InterruptedException e) {
                // Do nothing.
                // It escaped the while() routine already, so It can finished.
            }
        }
     
        public void cancel() {
            interrupt();
        }
    }
    cs
    • Volatile을 이용한 예제에서 PrimeProducer의 내용만 조금 수정하였다.
    • 일반적으로 put()처럼 블러킹 메소드에서 걸려있을 때 interrupt를 받게되면 곧바로(사실 언제가 될지는 OS 또는 Platform 마음) InterruptedException이 발생하고 catch()문으로 이동한다.
    • 매우 안정적인 방법이며 간단하다 할 수 있기에 작업 중단을 해야할 때에는 interrupt를 이용하는 방법이 가장 좋다.
    • 다만, 각각의 Thread interrupt에대한 동작이 다를 수 있기 때문에 thread의 인터럽트시 처리하는 로직을 모두 알고있지 않다면 사용하지 말아야 한다.
      • (여기서는 쓰레드에 cancel()을 통한 호출이고, interrupt시의 동작을 직접 구현하였기 때문에 괜찮다)



  • Future를 이용한 작업 중단
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public static void timedRun(Runnable runnable, long timeout, TimeUnit timeUnit) throws InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(100);
            Future<?> future = executorService.submit(runnable);
     
            try {
                future.get(timeout, timeUnit);
            } catch (TimeoutException e) {
                // nothing to do. go to finally
            } catch (ExecutionException e) {
                if (e.getCause() instanceof RuntimeException) {
                    throw (RuntimeException)e.getCause();
                } else {
                    throw new RuntimeException("Unknown exception");
                }
            } finally {
                future.cancel(true);
            }
        }
    cs
    • 앞서 나온 Interrupt를 이용한 중단 방법에서 언급했듯이 모든 쓰레드의 인터럽트시 동작 루틴을 알고있는 경우에만 쓰레드에 인터럽트를 걸어도 된다.
    • 그런데 Executor의 쓰레드는 기본적으로 인터럽트가 걸렸을 때 종료하는 동작을 하므로 위의 코드처럼 cancel()의 인자(mayInterruptIfRunning)로 true를 넣어도 괜찮다. (당연히 cancel()메소드를 거치지 않고 직접적으로 interrupt를 거는 것은 좋지 않다.)


  • 지금까지는 블로킹중이더라도 interrupt에 반응하는 메소드만 알아보았다. 그렇다면 interrupt에 반응하지 않는 메소드는 어떤가? 그런 메소드에는 다음과 같은 것들이 있다.
    • 사용자가 만든 interrupt에대한 처리가 없는 메소드
    • java.io 패키지의 socket io
    • java.nio 패키지의 io
    • java.nio.channels 패키지의 selector메소드



  • newTaskFor()
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    public interface CancellableTask<T> extends Callable<T> {
        public void cancel();
        RunnableFuture<T> newTask();
    }
     
    public class CancelingExecutor extends ThreadPoolExecutor {
        public CancelingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
     
        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            if (callable instanceof CancelableTask) {
                return ((CancelableTask<T>) callable).newTask();
            }
     
            return super.newTaskFor(callable);
        }

    /* ... */
    }
     
    public abstract class SocketUsingTask<T> implements CancelableTask<T> {
        private Socket mSocket;
     
        protected synchronized void setSocket(Socket socket) {
            mSocket = socket;
        }
     
        public synchronized void cancel() {
            try {
                if (mSocket != null) {
                    mSocket.close();
                }
            } catch (IOException e) {
                // ignored
            }
        }
     
        @Override
        public RunnableFuture<T> newTask() {
            return new FutureTask<T>(this) {
                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
                    try {
                        SocketUsingTask.this.cancel();
                    } finally {
                        return super.cancel(mayInterruptIfRunning);
                    }
     
                }
            };
        }
    }
    cs
    • Java 6부터는 ThreadPoolExecutor에 newTaskFor() 메소드로 바로 위에서 예를 든 interrupt에의해 종료절차를 밟지 않은 메소드에대한 처리를 할 수 있게 되었다.
    • 따라서 interrupt()가 아닌 cancel()메소드를 오버라이드를 함으로써 그 의미가 명확한 코드를 짤 수 있다.
    • 위의 예에서는 cancel()메소드를 오버라이드하여 사용한 소켓을 닫도록 했는데, InputStream 또는 OutputStream을 사용할 때에 사용하는 소켓을 닫아버리면 block상태에서 벗어나는 특성을 이용한 예제이다.



  • ExecutorService 종료
    • 첫번째 포스팅(2016/12/01 - [프로그래밍/Android] - [Android/Java] 병렬 프로그래밍 : Executor Framework에대한 고찰 ----- 1)에서 shutdown()과 shutdownNow()를 알아보았다.
    • 이 메소드들을 ExecutorService를 멤버변수로 가지고있다가 직접 호출하기보다는 새로운 Class로 한번 더 감싸서 이용하는 편이 서비스나 쓰레드의 시작과 종료에 관련된 기능을 관리할 수 있어서 더 좋다.
    • 다음 예제코드는 Log를 출력하는 프로그램이다. 로그는 서로 다른 쓰레드에서 마구 호출하여도 시간 순서에 맞게 차례로 나와야하기 때문에 (그렇지 않으면 굉장히 긴 로그를 출력하는 도중 다른 로그가 섞일 수도 있다) SingleThreadExecutor를 사용한다.
    • stop()을 호출하면 shutdown()메소드를 실행하고 실행중이거나 등록되어있는 모든 쓰레드가 작업을 마칠때까지 대기한다. 단, 3초 이내.

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      public class LogService {
          private final ExecutorService mExecutorService = Executors.newSingleThreadExecutor();
          private final PrintWriter mPrintWriter;
       
          public LogService(PrintWriter printWriter) {
              this.mPrintWriter = printWriter;
          }
          
          public void stop() throws InterruptedException {
              try {
                  mExecutorService.shutdown();
                  mExecutorService.awaitTermination(3, TimeUnit.SECONDS);
              } finally {
                  mPrintWriter.close();
              }
          }
       
          public void log(final String strLog) {
              try {
                  mExecutorService.execute(new Runnable() {
                      @Override
                      public void run() {
                          mPrintWriter.println(strLog);
                      }
                  });
              } catch (RejectedExecutionException e) {
                  // nothing to do
              }
          }
      }
      cs



2 Comments
댓글쓰기 폼