들어가며
이전 글 데드락(Deadlock)에서 데드락이 발생하기 위해서는 상호 배제(Mutual Exclusion), 점유 대기(Hold and Wait), 비선점(Non-Preemption), 순환 대기(Circular Wait). 이 네가지 조건이 모두 충족되어야 한다고 이야기 했다.
예를 들어 두 유저간의 거래를 처리하는 서버가 있다고 가정하자.
- A가 B와의 거래를 요청 한다.
- 동시에 B가 A와의 거래를 요청한다.
- 서버는 스레드 1번에서 A->B의 거래를 위해 요청자인 A유저 객체의 lock을 설정한다.
- 동시에 스레드 2번에서 B->A의 거래를 위해 요청자인 B유저 객체의 lock을 설정한다.
- 스레드 1번은 A->B의 거래를 완료하기 위해 B유저 객체의 lock을 마저 설정 하려고 하지만 B유저 객체의 lock은 이미 선점되어 있다.
- 스레드 2번도 B->A의 거래를 완료하기 위해 A유저 객체의 lock을 설정 하려고 하지만 A유저 객체의 lock은 이미 선점 되어 있다.
위의 시나리오를 따르면 비선점형이며, 동시에 점유 할 수 없는 상호 배제 자원인 lock을 각각 유저 객체가 점유한 상태에서 다른 유저 객체의 lock을 요청하는 점유 대기 상태, 서로가 서로의 자원을 요청하는 순환 대기 상태까지 모든 데드락 조건이 갖춰져 데드락 상태에 빠지게 된다.
데드락 상태를 해결하기 위해서는 네가지 조건 중 하나라도 만족하지 않게 만들면 된다. 이 포스트에서는 데드락 조건 네 가지 중 순환 대기를 방지하는 lock_guard 클래스를 만들어 보도록 하겠다.
개념은 간단하다. 각 락 객체 마다 고유한 번호를 부여하여 번호가 작은 순서에서 부터 큰 순서로 락 객체를 잡는다.
이미 필요한 락을 잡았다는 것은 추가로 필요한 락 객체는 나와 경쟁하고 있는 다른 스레드가 선점하고 있지 않음을 보장한다. 참고로 고유한 번호를 부여하기 위해 별도의 처리를 할 것은 아니고 락 객체의 메모리 주소를 고유 번호로 삼고 메모리 주소가 작은 것에서 부터 큰 순으로 락을 잡아 나가도록 한다.
구현
아래는 std::lock_guard와 비슷한 방법으로 std::mutex나 std::recursive_mutex등의 락 객체를 사용 할 수 있는 deadlockfree_guard 클래스다.
#include <set>
template <class LOCK>
class deadlockfree_guard
{
public :
template <class... LOCKS>
deadlockfree_guard(LOCKS&&... locks)
{
Sort(locks...);
for (LOCK* lock : ordered_locks)
{
lock->lock();
}
}
~deadlockfree_guard()
{
for (LOCK* lock : ordered_locks)
{
lock->unlock();
}
}
deadlockfree_guard& operator = (const deadlockfree_guard& other) = delete;
private:
template <class... LOCKS>
void Sort(LOCK& lock, LOCKS&... others)
{
ordered_locks.insert(&lock);
Sort(others...);
}
void Sort(LOCK& lock)
{
ordered_locks.insert(&lock);
}
private :
std::set<LOCK*> ordered_locks;
};
- 40 라인 : 락 객체의 메모리 주소로 정렬하기 위해 포인터를 저장하는 std::set을 사용했다.
- 8 라인 : 불특정 다수개의 락 객체를 인자로 받을 수 있도록 하기 위해 생성자의 파라메터리스트를 Parameter pack으로 선언했다.
- 10, 28 - 37 라인 : 함수 오버로딩을 이용해 인자로 넘어온 Parameter pack을 쪼개 set에 저장 및 정렬을 한다.
- 11 - 14 라인 : 정렬 된 락 객체들을 순서대로 잡는다.
- 19 - 22 라인 : 정렬 된 락 객체들을 순서대로 해제한다.
사용
비교를 위해 먼저 std::lock_guard를 이용하는 예제를 먼저 살펴 본다.
#include <iostream>
#include <mutex>
#include <thread>
struct Resource
{
std::mutex lock;
int count;
Resource() : count(0)
{
}
};
Resource resource1;
Resource resource2;
int main()
{
std::cout << "start test" << std::endl;
std::thread thread1([]() {
for (int i = 0; i < 10000; i++)
{
std::lock_guard<std::mutex> lo1(resource1.lock);
std::lock_guard<std::mutex> lo2(resource2.lock);
std::cout << "thread 1:" << resource1.count++ << std::endl;
}
});
std::thread thread2([]() {
for (int i = 0; i < 10000; i++)
{
std::lock_guard<std::mutex> lo2(resource2.lock);
std::lock_guard<std::mutex> lo1(resource1.lock);
std::cout << "thread 2:" << resource2.count++ << std::endl;
}
});
thread1.join();
thread2.join();
std::cout << "finish test" << std::endl;
}
- 24-25, 33-34 라인을 보면 각 락 객체를 잡는 순서가 스레드 마다 서로 반대로 되어 있음을 알 수있다.
위 예제를 실행 시키면 정말 운이 좋다면 "finish test" 문구를 볼 수 있겠지만 대부분의 경우 구동 중 데드락 상태에 빠져 테스트를 끝까지 진행하지 못한다.
그럼 이젠 위의 하이라이트 된 24-25, 33-34 라인을 deadlockfree_guard 을 사용 하는 코드로 변경한 예제를 살펴 보자.
int main()
{
std::cout << "start test" << std::endl;
std::thread thread1([] () {
for (int i = 0; i < 10000; i++)
{
deadlockfree_guard<std::mutex> lo(resource1.lock, resource2.lock);
std::cout << "thread 1:" << resource1.count++ << std::endl;
}
});
std::thread thread2([] () {
for (int i = 0; i < 10000; i++)
{
deadlockfree_guard<std::mutex> lo(resource2.lock, resource1.lock);
std::cout << "thread 2:" << resource2.count++ << std::endl;
}
});
thread1.join();
thread2.join();
std::cout << "finish test" << std::endl;
}
deadlockfree_guard의 경우는 위에서 설명한것 처럼 인자로 받은 락 객체들의 인자 순서와 상관 없이 메모리 주소 순서로 정렬한 후 락을 잡으므로 순환 대기 상태를 방지 할 수 있다. 즉, 데드락 상태가 발생하지 않는다.
이상 데드락의 네 가지 조건 중 '순환 대기 방지' 방법을 이용하여 데드락 상태를 방지하는 예제를 살펴 보았다.