MOONSUN
[알고리즘 실습] 멀티 스레드로 TaskGraph 동시성 관리 구현 (WinAPI사용) 본문
이전 글에서 위상정렬(Topological Sort), Task Graph, 그리고 Kahn 알고리즘을 통해
작업 간 의존성을 관리하는 방법을 살펴보았다.
이번 글에서는 이러한 Task Graph를 WinAPI를 사용해
멀티 스레드 환경에서 병렬로 실행하도록 작성한 예제를 정리해보도록 하겠다.
즉, 동시성(Concurrency)을 고려해 여러 Worker Thread가 독립적인 작업을 동시에 수행하도록 구현하고,
Critical Section과 Event를 활용해 스레드 간 안전하게 작업 큐를 관리하는 방법까지 살펴보도록 하겠다.
0. 스레드 생성과 라이프 사이클
스레드는 프로그램에서 독립적으로 실행되는 흐름으로, 다음 5가지 상태를 가진다.
- 생성 (Created): 스레드 객체 생성, 아직 실행 전
- 준비 (Ready): 실행 가능한 상태, CPU 할당 대기
- 실행 (Running): CPU에서 실제 실행 중
- 대기 (Waiting): 특정 조건이나 이벤트 대기
- 종료 (Terminated): 실행 완료 또는 강제 종료
여러 스레드를 사용할 때, 이러한 상태 전환을 적절히 관리해야 한다.
윈도우 API로 스레드를 만드는 가장 기본적인 함수는 CreateThread 이다.
하지만 CreateThread 은 C 런타임 라이브러리(CRT) 초기화 작업을 수행하지 않음
→ CRT를 사용하는 코드에서 문제가 발생할 수 있음.
→ CreateThread 보다 _beginThreadex 가 권장
그래서 _beginThreadex 를 사용해 스레드를 생성했다.
1. 동기화 객체
1-1. Critical Section
Critical Section은 같은 프로세스 내의 스레드들 간의 동기화를 관리해 줄 수 있다.
주요 함수:
- InitializeCriticalSection(): 임계 영역 초기화
- EnterCriticalSection(): 임계 영역 진입
- LeaveCriticalSection(): 임계 영역 해제
- DeleteCriticalSection(): 임계 영역 삭제
1-2. Event
Event 객체는 특정 조건이나 상태 변화를 알리는 데 사용되는 동기화 객체
Event 종류는 2가지가 있다.
- Auto-Reset Event: 하나의 대기 스레드가 깨어나면 자동으로 non-signaled 상태로 변경
- Manual-Reset Event: 명시적으로 ResetEvent()를 호출할 때까지 signaled 상태 유지
해당 코드에서는 Manual-Reset Event(수동 리셋)를 사용해 스레드가 작업을 기다리도록 구현했다.
2. 병렬 처리 가능
이 코드는 Kahn 알고리즘을 이용한 DAG(TaskGraph) 기반 작업 처리를 수행한다.

- 진입차수(indegree)가 0인 노드 : 의존성이 없는 노드 → 동시에 여러 Worker가 병렬 처리 가능
- 의존성 있는 노드 : 선행 Task가 완료되어야 실행 가능 → DAG 순서 보장
Worker Thread를 여러 개 만들어서, 각 스레드가 독립적인 Task를 처리하도록 한다.
단, 새 의존성을 추가할 때마다 순환 검사를 수행 해야 한다.
3. 동시성 관리
메인 스레드와 워커 스레드는 다음의 역할을 가진다.
- Main Thread 는
- Run()을 실행하며 → 진입 차수가 0인 Task를 큐에 넣어준다
- 그리고 여러 Worker Thread를 생성하고 종료까지 대기
- Worker Thread 는
- 큐에서 Task를 꺼내 실행
- 후속 노드의 진입 차수를 줄이고, 0이 되면 큐에 추가
- 모든 Task 완료 시 종료
- 큐가 비었으면 대기 (CPU 자원 절약을 위해 Thread Blocking) </aside>
3-1. 주요 변수와 역할
CRITICAL_SECTION cs; // 크리티컬 섹션 : ready_queue 보호
HANDLE hEvent; // 큐가 비었을 때 Worker 깨우는 Event 핸들
HANDLE hWorkers[3]; // Worker thread 핸들
int runningTasks = 0; // 현재 실행 중인 작업 수
volatile bool stopSignal = false; // 모든 작업 끝났을 때 종료 플래그
- CRITICAL_SECTION : 여러 스레드가 동시에 ready_queue에 접근하지 못하게 보호
- hEvent : 큐가 비었을 때 Worker가 잠들고, 새 작업이 오면 깨우는 역할
- hWorkers[3] : 3개의 워커 스레드
- runningTasks : 현재 실행중인 Task 수
- stopSignal : 모든 작업 끝났을 때 Worker 종료하는 신호
3-2. WorkerThread 동작 흐름
1. 큐에서 작업 꺼내기
EnterCriticalSection
if (!ready_queue.empty())
{
taskId = ready_queue.front();
ready_queue.pop();
g->runningTasks++; // 현재 실행중 카운트 증가
}
...
LeaveCriticalSection
- 여러 스레드가 동시에 ready_queue.pop() 하면 안되니까 → 락으로 접근 막기
- 실행하러 가져가기 전에 runningTasks → 카운트 증가
2. 큐가 비었으면 Worker 잠들기
ResetEvent(...)
LeaveCriticalSection
WaitForSingleObject(...)
- 할 일이 없으므로 Event 대기 상태로 → sleep (CPU 절약)
3. Task 실행
g->nodes[taskId].run();
- Task 실행 자체는 독립적이니까 → 락 없이 바로 실행
- 이렇게 해야 Worker Thread가 동시에 여러 Task를 병렬로 수행할 수 있다..!
4. Task 완료 후 결과 기록
EnterCriticalSection
g->results.push_back(...)
LeaveCriticalSection
- 여러 스레드가 동시에 result vector에 push할 수 있으므로 → 락 필요
5. 후속 Task indegree 및 ready_queue push
for (int v : nodes[taskId].next)
{
if (--current_indeg[v] == 0)
{
ready_queue.push(v);
SetEvent(hEvent); // 새 일 들어옴 → Worker 깨움
}
}
- indegree 감소해서 0이면 실행 준비됨 → ready_queue에 push
- 그리고 Event 신호 보내서 sleep(대기 중) Woker 깨우기
6. 모든 작업 끝났는지 체크
if (ready_queue.empty() && runningTasks == 0)
{
stopSignal = true;
SetEvent(hEvent);
}
- 큐 비었고, 실행 중인 Task도 없으면 → 전체 Worker 종료
4. 실행 시간 비교
1. 싱글 스레드로 작업했을 때

2. 멀티 스레드로 작업했을 때

5. 전체 코드
#include <functional>
#include <vector>
#include <queue>
#include <iostream>
#include <windows.h>
#include <process.h> // _beginthreadex : CreateThread 보다 beginTread가 안전
using namespace std;
// [ 시간 측정 ]
double GetTimeSeconds()
{
static LARGE_INTEGER freq;
static bool init = false;
if (!init)
{
QueryPerformanceFrequency(&freq);
init = true;
}
LARGE_INTEGER now;
QueryPerformanceCounter(&now);
return (double)now.QuadPart / freq.QuadPart;
}
// [ 결과 ]
struct TaskResult
{
int taskId;
double elapsed_ms;
DWORD threadId;
};
struct TaskNode
{
function<void()> run; // 실행할 함수 객체
vector<int> next; // 후속 노드들 (간선 u->v)
int indeg = 0; // 진입차수 (선행 노드 수)
};
struct TaskGraph
{
vector<TaskNode> nodes;
queue<int> ready_queue; // BFS 탐색용 큐-진입차수가 0인 노드들 저장한다.
vector<int> current_indeg; // 실행 중 변하는 진입차수 복사본
vector<TaskResult> results; // 각 Task의 실행 시간 저장
CRITICAL_SECTION cs; // 크리티컬 섹션 : ready_queue 보호
HANDLE hWorkers[3]; // Worker thread 핸들
HANDLE hEvent; // 큐가 비었을 때 Worker 깨우는 Event 핸들
int runningTasks = 0; // 현재 실행 중인 작업 수
volatile bool stopSignal = false; // 모든 작업 끝났을 때 종료 플래그
// [ 생성자 / 소멸자 ]
TaskGraph()
{
InitializeCriticalSection(&cs); // 임계 영역 초기화
// 이벤트 생성 : 수동 리셋 : ResetEvent 호출하기 전까지 신호 상태 유지
hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
}
~TaskGraph()
{
DeleteCriticalSection(&cs); // 임계 영역 삭제
CloseHandle(hEvent); // 이벤트 핸들 정리
}
// [ 새 노드 추가 ] : 함수 객체를 등록하고 id 반환
int AddTask(function<void()> fn)
{
nodes.push_back(TaskNode{ std::move(fn), {}, 0 });
return (int)nodes.size() - 1;
}
// [ 순환 검사 ] : DFS 탐색으로 의존성 추가 가능한지 검사한다.
bool CanAddDependency(int u, int v)
{
vector<bool> visited(nodes.size(), false);
// v에서 u로 가는 경로가 있는지 확인 -> 경로 없으면 간선 추가 가능
bool canAdd = !DFS(v, u, visited);
return canAdd;
}
bool DFS(int cur, int target, vector<bool>& visited)
{
if (cur == target) return true;
if (visited[cur]) return false;
visited[cur] = true;
for (int next : nodes[cur].next)
{
if (DFS(next, target, visited)) return true;
}
return false;
}
// [ 의존성 추가 ] : u -> v
void AddDependency(int u, int v)
{
if (!CanAddDependency(u, v))
{
cout << u << "->" << v << " Cycle 발생. 추가할 수 없음.\n";
return;
}
nodes[u].next.push_back(v); // id 추가
nodes[v].indeg += 1; // v의 진입차수 증가
}
// [ 워커 스레드 ]
static unsigned __stdcall WorkerThread(void* param)
{
TaskGraph* g = (TaskGraph*)param;
while (true)
{
int taskId = -1;
// 큐에서 작업 꺼내기
EnterCriticalSection(&g->cs);
if (!g->ready_queue.empty())
{
taskId = g->ready_queue.front();
g->ready_queue.pop();
g->runningTasks++;
}
else if (g->stopSignal)
{
LeaveCriticalSection(&g->cs);
break; // 종료
}
else
{
ResetEvent(g->hEvent); // 큐 비었음 -> 대기
LeaveCriticalSection(&g->cs);
WaitForSingleObject(g->hEvent, INFINITE);
continue;
}
LeaveCriticalSection(&g->cs);
// Task 실행
double tStart = GetTimeSeconds(); // 개별 작업 시간 측정
if (taskId != -1 && g->nodes[taskId].run)
{
g->nodes[taskId].run();
}
double tEnd = GetTimeSeconds();
// 결과 저장
double elapsed_ms = (tEnd - tStart) * 1000.0;
DWORD tid = GetCurrentThreadId();
EnterCriticalSection(&g->cs);
g->results.push_back({ taskId, elapsed_ms, tid });
LeaveCriticalSection(&g->cs);
// 후속 노드 indegree 감소
EnterCriticalSection(&g->cs);
for (int v : g->nodes[taskId].next)
{
if (--g->current_indeg[v] == 0)
{
g->ready_queue.push(v);
SetEvent(g->hEvent); // 새 작업 들어옴 -> Worker 깨우기
}
}
g->runningTasks--;
// 모든 작업 완료 시 종료 신호
if (g->ready_queue.empty() && g->runningTasks == 0)
{
g->stopSignal = true;
SetEvent(g->hEvent); // 남은 Worker 깨우기
}
LeaveCriticalSection(&g->cs);
}
return 0;
}
// [ Kahn 방식 위상 실행 ] : 멀티 스레드 버전
void Run()
{
// 현재 진입차수 복사
current_indeg.resize(nodes.size());
for (int i = 0; i < (int)nodes.size(); ++i)
{
current_indeg[i] = nodes[i].indeg;
}
// 진입차수 0인 노드들 id 큐에 삽입
for (int i = 0; i < (int)nodes.size(); ++i)
{
if (nodes[i].indeg == 0)
{
ready_queue.push(i);
}
}
stopSignal = false;
runningTasks = 0;
results.clear(); // 이전 결과 초기화
SetEvent(hEvent);
double start = GetTimeSeconds(); // 전체 시작 시간
// 3개 워커 스레드 생성
for (int i = 0; i < 3; ++i)
{
HANDLE hThread = (HANDLE)_beginthreadex(
NULL, // 보안 속성
0, // 스택 크기
WorkerThread, // 스레드 함수
this, // 스레드 함수에 전달할 인자
0, // 생성 플래그
NULL // 스레드 ID 변수
);
if (hThread)
{
hWorkers[i] = hThread;
}
else
{
cout << "스레드 " << i + 1 << " 생성 실패\n";
}
}
// 모든 워커 스레드 종료 대기
WaitForMultipleObjects(3, hWorkers, TRUE, INFINITE);
double end = GetTimeSeconds(); // 전체 종료 시간
for (int i = 0; i < 3; ++i)
{
CloseHandle(hWorkers[i]); // 워커 핸들 정리
}
// 실행 후 큐 정리
while (!ready_queue.empty()) ready_queue.pop();
// [ 결과 출력 ]
cout << "\n=== 멀티 스레드 실행 결과 ===\n";
for (auto& r : results)
{
cout << "Task " << r.taskId << " 완료 (" << r.elapsed_ms << " ms, Thread " << r.threadId << ")\n";
}
cout << "전체 작업 완료 시간: " << (end - start) * 1000.0 << " ms\n\n";
}
};
int main()
{
TaskGraph g;
int t1 = g.AddTask([] { Sleep(10); cout << "Task1 End\n"; });
int t2 = g.AddTask([] { Sleep(1000); cout << "Task2 End\n"; });
int t3 = g.AddTask([] { Sleep(1000); cout << "Task3 End\n"; });
int t4 = g.AddTask([] { Sleep(1000); cout << "Task4 End\n"; });
int t5 = g.AddTask([] { cout << "Task5 (after 2 & 3 & 4)\n"; });
g.AddDependency(t1, t2);
g.AddDependency(t1, t3);
g.AddDependency(t1, t4);
g.AddDependency(t2, t5);
g.AddDependency(t3, t5);
g.AddDependency(t4, t5);
g.AddDependency(t5, t1); // 순환 발생
g.Run();
g.Run();
}
'CS' 카테고리의 다른 글
| [알고리즘] BVH : 정적/동적 트리 갱신 (0) | 2025.12.04 |
|---|---|
| [알고리즘] BVH (Bounding Volume Hierarchy) (0) | 2025.11.06 |
| [알고리즘] 그래프 : 위상정렬(Topological Sort)과 Task Graph, Kahn 알고리즘 (0) | 2025.10.30 |
| [알고리즘] 그래프 : A*(asterisk) 알고리즘 (1) | 2025.10.23 |
| [백준 C++] 11779번: 최소비용 구하기2 (0) | 2025.10.20 |