Language/Java

BlockingQueue<E> vs ConcurrentHashMap<K, V>

사과만쥬 2024. 10. 13. 11:54

부제: BlockingQueue를 설명하려다 생산자-소비자 패턴만 설명하게 된 상황에 대하여....

 

저번 글에 작성한 ConcurrentHashMap과 동일하게 thread-safe한 자료구조이다. 그러나 사용 용례가 다른 면이 있기에, 이 둘의 공통점과 차이점, 특히 차이점을 중심으로 글을 작성해보려고 한다.

 

https://manjyuv.tistory.com/16

 

ConcurrentHashMap<K, V>

ConcurrentHashMap해시맵(Map)의 동시성 버전으로, 다수의 스레드가 동시에 데이터를 삽입, 수정, 삭제할 수 있도록 안전한 구조를 제공합니다.Hashtable과 비슷하지만 HashMap과 달리 이 클래스는 null

manjyuv.tistory.com

ConcurrentHashMap에 대해 작성해 본 저번 글.

 

 

 

이 둘은, 

 

성능 최적화를 위해 내부적으로 세분화된 동기화 메커니즘을 이용하고 있다.

ConcurrentHashMap의 경우에는 세그먼트 락을 이용해서 처리하고 있으며, BlockingQueue의 경우에는 큐 내부에서 동시접근을 안전하게 처리하고 있다.

 

 

BlockingQueue만의 특징에 대해서 서술하자면

 

 

FIFO(First In First Out) 원칙을 따른다.

큐 구조에서 데이터를 처리할 때 블로킹을 통해 생산자-소비자 패턴을 쉽게 구현할 수 있게 도와준다고 하는데, 이 생산자-소비자 패턴을 처음 보는 사람들은 잘 이해하지 못할 것 같아서 좀 더 검색해 보았다.

사실상 이 BlockingQueue의 핵심이 생산자-소비자 패턴이라고 보면 될 것 같다.

부제에도 적었던 것처럼, 사실상 생산자-소비자 패턴을 설명하고 나면... 끝이다.

 

 


생산자-소비자 패턴

생산자 스레드가 데이터를 생성해서 큐 같은 자료구조에 넣고, 소비자 스레드가 큐에서 데이터를 꺼내 처리하는 방식이다. 이를 설명하기 위해 몇가지 핵심 개념들이 있는데, 이를 정리해보려고 한다.

 

생산자
(Producer)
데이터를 생성하는 역할을 하는 스레드입니다. 주로 어떤 작업의 결과를 생성하고, 이를 공유 자원(예: 큐)에 저장합니다.
소비자
(Consumer)
생산자가 생성한 데이터를 사용하는 역할을 하는 스레드입니다. 공유 자원에서 데이터를 가져와 이를 처리하는 것이 주된 역할입니다.
공유 자원
(Shared Resource)
생산자소비자가 동시에 접근할 수 있는 공유된 버퍼 또는 입니다. 이 자원은 스레드 간의 안전한 데이터 교환을 보장하기 위해 스레드 안전한 자료구조가 사용됩니다. 예를 들어, 앞서 언급한 BlockingQueue가 좋은 예입니다.
동기화 및 블로킹 생산자가 데이터를 생성해서 공유 자원에 저장하려고 할 때, 자원이 꽉 차 있으면 더 이상 데이터를 추가하지 않고 대기(블로킹)합니다. 반대로, 소비자는 자원에서 데이터를 꺼내 처리하려 할 때 자원이 비어 있으면 새로운 데이터가 추가될 때까지 대기합니다. 이러한 동작은 BlockingQueue 같은 자료구조가 자연스럽게 처리해 줍니다.

 

또한 이 동작과정과 예시 코드를 통해 더 자세하게 설명하려고 한다.

 

 

  1. 생산자 스레드는 새로운 데이터를 생성하고 이를 큐(공유 자원)에 넣습니다.
  2. 소비자 스레드는 큐에 저장된 데이터를 꺼내어 처리합니다.
  3. 큐가 가득 차 있으면 생산자는 데이터를 넣을 수 없고, 소비자는 큐에서 데이터를 꺼내기 전까지 대기합니다.
  4. 큐가 비어 있으면 소비자는 데이터를 꺼낼 수 없고, 생산자는 새로운 데이터를 생성할 때까지 대기합니다.

 

 

위와 같은 메커니즘으로 동작하게 되고, 이제 예시코드를 보면서 더 자세히 설명한다.

 

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class Producer implements Runnable {
    private BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                System.out.println("Produced: " + i);
                queue.put(i);  // 큐가 꽉 차면 블로킹됨
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class Consumer implements Runnable {
    private BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Integer item = queue.take();  // 큐가 비어있으면 블로킹됨
                System.out.println("Consumed: " + item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

public class ProducerConsumerExample {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
        Thread producerThread = new Thread(new Producer(queue));
        Thread consumerThread = new Thread(new Consumer(queue));

        producerThread.start();
        consumerThread.start();
    }
}

 

 

코드를 필요한 줄마다 상세하게 설명하면 아래와 같습니다.

 

class Producer implements Runnable {
    private BlockingQueue<Integer> queue;
  • Producer 클래스는 생산자 역할을 하며, Runnable 인터페이스를 구현합니다. 이로써 스레드에서 실행 가능한 객체로 만들 수 있습니다.
  • Queue는 공유 자원으로 사용되는 BlockingQueue 타입의 큐이며, 생성자에서 주입받은 큐에 생산자가 데이터를 넣습니다.

 

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }
  • 생성자: 생성 시 외부에서 BlockingQueue 타입의 큐를 받아서 클래스 내부의 queue에 저장합니다. 생산자가 데이터를 저장할 큐를 지정해줍니다.
    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                System.out.println("Produced: " + i);
                queue.put(i);  // 큐가 꽉 차면 블로킹됨
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

 

  • run 메서드: Runnable 인터페이스의 추상 메서드인 run()을 구현합니다. 이 메서드는 스레드가 실행될 때 호출됩니다.
  • for 루프를 통해 0부터 9까지의 숫자를 차례로 생성하여 queue에 넣습니다.
  • queue.put(i)는 큐에 데이터를 추가하는 메서드로, 큐가 꽉 차면 대기 상태(블로킹)로 전환됩니다. 즉, 큐에 여유 공간이 생길 때까지 스레드는 대기합니다.
  • InterruptedException은 스레드가 대기 중에 인터럽트되면 발생하는 예외로, 이를 잡아서 스레드의 인터럽트 상태를 복구해줍니다.
class Consumer implements Runnable {
    private BlockingQueue<Integer> queue;

 

 

  • Consumer 클래스는 소비자 역할을 하며, Runnable 인터페이스를 구현해 스레드로 실행될 수 있습니다.
  • queue는 공유 자원으로 사용되는 BlockingQueue입니다. 소비자는 이 큐에서 데이터를 꺼내 처리합니다.

 

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }
  • 생성자: 외부에서 BlockingQueue 타입의 큐를 받아서 클래스 내부의 queue에 저장합니다. 소비자가 데이터를 꺼낼 큐를 지정해줍니다.
    @Override
    public void run() {
        try {
            while (true) {
                Integer item = queue.take();  // 큐가 비어있으면 블로킹됨
                System.out.println("Consumed: " + item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
  • run 메서드: 스레드가 실행될 때 무한 루프 while (true)를 돌며, queue에서 데이터를 하나씩 꺼내 처리합니다.
  • queue.take()는 큐에서 데이터를 가져오는 메서드로, **큐가 비어 있으면 대기 상태(블로킹)**로 전환됩니다. 큐에 데이터가 들어올 때까지 스레드는 대기합니다.
  • System.out.println("Consumed: " + item)은 소비한 데이터를 출력합니다.
  • InterruptedException: 대기 중에 인터럽트가 발생하면 예외를 잡아 스레드를 정상적으로 종료할 수 있도록 인터럽트 상태를 복구합니다.
public class ProducerConsumerExample {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
        Thread producerThread = new Thread(new Producer(queue));
        Thread consumerThread = new Thread(new Consumer(queue));

        producerThread.start();
        consumerThread.start();
    }
}

 

  • main 메서드: 프로그램의 진입점으로, Producer와 Consumer의 스레드를 생성하고 실행합니다.
  • BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);: 크기가 5ArrayBlockingQueue를 생성합니다. 이 큐는 생산자가 데이터를 넣고 소비자가 데이터를 꺼내는 공유 자원입니다.
  • Thread producerThread = new Thread(new Producer(queue));: Producer 객체를 생산자 스레드로 만듭니다.
  • Thread consumerThread = new Thread(new Consumer(queue));: Consumer 객체를 소비자 스레드로 만듭니다.
  • producerThread.start(); 및 consumerThread.start();: 두 스레드를 시작합니다. 이때 생산자는 데이터를 생성해 큐에 넣고, 소비자는 큐에서 데이터를 꺼내는 작업을 수행합니다.

 


 

주요 구현체로는 아래의 4가지가 있다.

 

 

  • ArrayBlockingQueue: 고정된 크기를 가지며, 내부적으로 배열로 구현된 큐.
  • LinkedBlockingQueue: 크기가 제한될 수도 있고, 제한되지 않을 수도 있는 링크드 리스트 기반의 큐.
  • PriorityBlockingQueue: 우선순위에 따라 요소를 관리하는 큐.
  • SynchronousQueue: 큐에 데이터를 저장하지 않으며, 하나의 생산자가 데이터를 추가하면 즉시 소비자가 데이터를 가져가야 한다.

 

그리고 예외 처리를 위해 다양한 메서드를 제공한다.

 

  • add(), remove(): 큐가 꽉 차거나 비었을 때 예외를 던진다.
  • offer(), poll(): 큐가 가득 차거나 비었을 때 실패하고 false를 반환한다.
  • put(), take(): 큐가 가득 차거나 비었을 때 스레드를 블로킹한다.

 


 

상반기 토스 시험 주관식에서 동기화 문제가 많이 나오면서, 멀티 스레드에서 어떻게 해야 이런 충돌을 막을 수 있을지에 대해 좀 더 알아보고 싶어졌다. CS를 공부하면서 더....

실제 프로그래밍을 아직 하지 않아서, 관련한 코드를 작업한 바가 없는데 나중에 프로그래밍을 하게 되면 지금 공부했던 동기화 관련 내용들이 유용하게 쓰이지 않을까.....

 

 

 

'Language > Java' 카테고리의 다른 글

ConcurrentHashMap<K, V>  (1) 2024.10.03
String과 Char  (1) 2024.09.14
Integer.parseInt()란 무엇인가?  (2) 2024.09.08
[Java] StringBuilder vs System.out.println() 성능 비교  (0) 2024.08.06