부제: BlockingQueue를 설명하려다 생산자-소비자 패턴만 설명하게 된 상황에 대하여....
저번 글에 작성한 ConcurrentHashMap과 동일하게 thread-safe한 자료구조이다. 그러나 사용 용례가 다른 면이 있기에, 이 둘의 공통점과 차이점, 특히 차이점을 중심으로 글을 작성해보려고 한다.
https://manjyuv.tistory.com/16
ConcurrentHashMap<K, V>
ConcurrentHashMap해시맵(Map)의 동시성 버전으로, 다수의 스레드가 동시에 데이터를 삽입, 수정, 삭제할 수 있도록 안전한 구조를 제공합니다.Hashtable과 비슷하지만 HashMap과 달리 이 클래스는 null
manjyuv.tistory.com
ConcurrentHashMap에 대해 작성해 본 저번 글.
이 둘은,
성능 최적화를 위해 내부적으로 세분화된 동기화 메커니즘을 이용하고 있다.
ConcurrentHashMap의 경우에는 세그먼트 락을 이용해서 처리하고 있으며, BlockingQueue의 경우에는 큐 내부에서 동시접근을 안전하게 처리하고 있다.
BlockingQueue만의 특징에 대해서 서술하자면
FIFO(First In First Out) 원칙을 따른다.
큐 구조에서 데이터를 처리할 때 블로킹을 통해 생산자-소비자 패턴을 쉽게 구현할 수 있게 도와준다고 하는데, 이 생산자-소비자 패턴을 처음 보는 사람들은 잘 이해하지 못할 것 같아서 좀 더 검색해 보았다.
사실상 이 BlockingQueue의 핵심이 생산자-소비자 패턴이라고 보면 될 것 같다.
부제에도 적었던 것처럼, 사실상 생산자-소비자 패턴을 설명하고 나면... 끝이다.
생산자-소비자 패턴
생산자 스레드가 데이터를 생성해서 큐 같은 자료구조에 넣고, 소비자 스레드가 큐에서 데이터를 꺼내 처리하는 방식이다. 이를 설명하기 위해 몇가지 핵심 개념들이 있는데, 이를 정리해보려고 한다.
생산자 (Producer) |
데이터를 생성하는 역할을 하는 스레드입니다. 주로 어떤 작업의 결과를 생성하고, 이를 공유 자원(예: 큐)에 저장합니다. |
소비자 (Consumer) |
생산자가 생성한 데이터를 사용하는 역할을 하는 스레드입니다. 공유 자원에서 데이터를 가져와 이를 처리하는 것이 주된 역할입니다. |
공유 자원 (Shared Resource) |
생산자와 소비자가 동시에 접근할 수 있는 공유된 버퍼 또는 큐입니다. 이 자원은 스레드 간의 안전한 데이터 교환을 보장하기 위해 스레드 안전한 자료구조가 사용됩니다. 예를 들어, 앞서 언급한 BlockingQueue가 좋은 예입니다. |
동기화 및 블로킹 | 생산자가 데이터를 생성해서 공유 자원에 저장하려고 할 때, 자원이 꽉 차 있으면 더 이상 데이터를 추가하지 않고 대기(블로킹)합니다. 반대로, 소비자는 자원에서 데이터를 꺼내 처리하려 할 때 자원이 비어 있으면 새로운 데이터가 추가될 때까지 대기합니다. 이러한 동작은 BlockingQueue 같은 자료구조가 자연스럽게 처리해 줍니다. |
또한 이 동작과정과 예시 코드를 통해 더 자세하게 설명하려고 한다.
- 생산자 스레드는 새로운 데이터를 생성하고 이를 큐(공유 자원)에 넣습니다.
- 소비자 스레드는 큐에 저장된 데이터를 꺼내어 처리합니다.
- 큐가 가득 차 있으면 생산자는 데이터를 넣을 수 없고, 소비자는 큐에서 데이터를 꺼내기 전까지 대기합니다.
- 큐가 비어 있으면 소비자는 데이터를 꺼낼 수 없고, 생산자는 새로운 데이터를 생성할 때까지 대기합니다.
위와 같은 메커니즘으로 동작하게 되고, 이제 예시코드를 보면서 더 자세히 설명한다.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
class Producer implements Runnable {
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
System.out.println("Produced: " + i);
queue.put(i); // 큐가 꽉 차면 블로킹됨
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class Consumer implements Runnable {
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Integer item = queue.take(); // 큐가 비어있으면 블로킹됨
System.out.println("Consumed: " + item);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public class ProducerConsumerExample {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
Thread producerThread = new Thread(new Producer(queue));
Thread consumerThread = new Thread(new Consumer(queue));
producerThread.start();
consumerThread.start();
}
}
코드를 필요한 줄마다 상세하게 설명하면 아래와 같습니다.
class Producer implements Runnable {
private BlockingQueue<Integer> queue;
- Producer 클래스는 생산자 역할을 하며, Runnable 인터페이스를 구현합니다. 이로써 스레드에서 실행 가능한 객체로 만들 수 있습니다.
- Queue는 공유 자원으로 사용되는 BlockingQueue 타입의 큐이며, 생성자에서 주입받은 큐에 생산자가 데이터를 넣습니다.
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
- 생성자: 생성 시 외부에서 BlockingQueue 타입의 큐를 받아서 클래스 내부의 queue에 저장합니다. 생산자가 데이터를 저장할 큐를 지정해줍니다.
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
System.out.println("Produced: " + i);
queue.put(i); // 큐가 꽉 차면 블로킹됨
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
- run 메서드: Runnable 인터페이스의 추상 메서드인 run()을 구현합니다. 이 메서드는 스레드가 실행될 때 호출됩니다.
- for 루프를 통해 0부터 9까지의 숫자를 차례로 생성하여 queue에 넣습니다.
- queue.put(i)는 큐에 데이터를 추가하는 메서드로, 큐가 꽉 차면 대기 상태(블로킹)로 전환됩니다. 즉, 큐에 여유 공간이 생길 때까지 스레드는 대기합니다.
- InterruptedException은 스레드가 대기 중에 인터럽트되면 발생하는 예외로, 이를 잡아서 스레드의 인터럽트 상태를 복구해줍니다.
class Consumer implements Runnable {
private BlockingQueue<Integer> queue;
- Consumer 클래스는 소비자 역할을 하며, Runnable 인터페이스를 구현해 스레드로 실행될 수 있습니다.
- queue는 공유 자원으로 사용되는 BlockingQueue입니다. 소비자는 이 큐에서 데이터를 꺼내 처리합니다.
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
- 생성자: 외부에서 BlockingQueue 타입의 큐를 받아서 클래스 내부의 queue에 저장합니다. 소비자가 데이터를 꺼낼 큐를 지정해줍니다.
@Override
public void run() {
try {
while (true) {
Integer item = queue.take(); // 큐가 비어있으면 블로킹됨
System.out.println("Consumed: " + item);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
- run 메서드: 스레드가 실행될 때 무한 루프 while (true)를 돌며, queue에서 데이터를 하나씩 꺼내 처리합니다.
- queue.take()는 큐에서 데이터를 가져오는 메서드로, **큐가 비어 있으면 대기 상태(블로킹)**로 전환됩니다. 큐에 데이터가 들어올 때까지 스레드는 대기합니다.
- System.out.println("Consumed: " + item)은 소비한 데이터를 출력합니다.
- InterruptedException: 대기 중에 인터럽트가 발생하면 예외를 잡아 스레드를 정상적으로 종료할 수 있도록 인터럽트 상태를 복구합니다.
public class ProducerConsumerExample {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
Thread producerThread = new Thread(new Producer(queue));
Thread consumerThread = new Thread(new Consumer(queue));
producerThread.start();
consumerThread.start();
}
}
- main 메서드: 프로그램의 진입점으로, Producer와 Consumer의 스레드를 생성하고 실행합니다.
- BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);: 크기가 5인 ArrayBlockingQueue를 생성합니다. 이 큐는 생산자가 데이터를 넣고 소비자가 데이터를 꺼내는 공유 자원입니다.
- Thread producerThread = new Thread(new Producer(queue));: Producer 객체를 생산자 스레드로 만듭니다.
- Thread consumerThread = new Thread(new Consumer(queue));: Consumer 객체를 소비자 스레드로 만듭니다.
- producerThread.start(); 및 consumerThread.start();: 두 스레드를 시작합니다. 이때 생산자는 데이터를 생성해 큐에 넣고, 소비자는 큐에서 데이터를 꺼내는 작업을 수행합니다.
주요 구현체로는 아래의 4가지가 있다.
- ArrayBlockingQueue: 고정된 크기를 가지며, 내부적으로 배열로 구현된 큐.
- LinkedBlockingQueue: 크기가 제한될 수도 있고, 제한되지 않을 수도 있는 링크드 리스트 기반의 큐.
- PriorityBlockingQueue: 우선순위에 따라 요소를 관리하는 큐.
- SynchronousQueue: 큐에 데이터를 저장하지 않으며, 하나의 생산자가 데이터를 추가하면 즉시 소비자가 데이터를 가져가야 한다.
그리고 예외 처리를 위해 다양한 메서드를 제공한다.
- add(), remove(): 큐가 꽉 차거나 비었을 때 예외를 던진다.
- offer(), poll(): 큐가 가득 차거나 비었을 때 실패하고 false를 반환한다.
- put(), take(): 큐가 가득 차거나 비었을 때 스레드를 블로킹한다.
상반기 토스 시험 주관식에서 동기화 문제가 많이 나오면서, 멀티 스레드에서 어떻게 해야 이런 충돌을 막을 수 있을지에 대해 좀 더 알아보고 싶어졌다. CS를 공부하면서 더....
실제 프로그래밍을 아직 하지 않아서, 관련한 코드를 작업한 바가 없는데 나중에 프로그래밍을 하게 되면 지금 공부했던 동기화 관련 내용들이 유용하게 쓰이지 않을까.....
'Language > Java' 카테고리의 다른 글
ConcurrentHashMap<K, V> (1) | 2024.10.03 |
---|---|
String과 Char (1) | 2024.09.14 |
Integer.parseInt()란 무엇인가? (2) | 2024.09.08 |
[Java] StringBuilder vs System.out.println() 성능 비교 (0) | 2024.08.06 |