본문 바로가기

진리는어디에/C#

[C#] 비동기 프로그래밍 - 코루틴(coroutine) 기반 비동기 서버

이번 포스트에서는 node.js와 같은 싱글 스레드 기반 비동기 서버(? 정확하게는 싱글 스레드는 아니지만..이건 다음에 설명하자) 프레임워크를 C# 프로그래밍 언어를 이용해 만드는 방법에 대해 살펴 보도록하겠다. 추가하여 콜백을 사용하지 않고도 비동기 작업을 처리 할 수 있는 부분도 함께 살펴 본다.

들어가며

일반적으로 네트워크 프로그래밍을 하게 되면(특히 서버) 대용량 트래픽을 처리하기 위해 비동기 핸들링을 선호하게 된다. 예를 들어 IO를 처리 할 때 메시지를 보내고 응답을 받을 때 까지 블로킹 되는 시간을 줄이기 위해 비동기를 많이 사용한다.

비동기 IO는 성능상으로는 확실히 이점을 가질 수 있지만, 비동기 작업이 완료 되었을 때 운영체제로 부터 작업 완료 통보를 받을 수 있는 별도의 콜백 함수를 등록해야하며, 콜백이 호출 되기 전까지 상태를 유지하기 위해 별도의 컨텍스트들을 어딘가에 유지시켜 주어야만 한다.

이는 코드의 가독성을 낮추고 복잡도를 높이는 주요 원인이 된다.

Hell of Callback

그렇다면 별도의 컨텍스트를 유지나 콜백 함수 등록 없이 비동기 작업을 처리 할 수 있는 방법은  없을까? 당연히 있다. 이전에 다루었던 async/await를 이용하면 언어 레벨에서 자동으로 비동기 작업이 시작 될 때 모든 컨텍스트들(eg. 변수)을 저장하고 중단 되었다가, 비동기 작업이 완료되면 중단 지점 부터 다시 프로그램을 재개(resume) 시켜 준다.

하지만 여기서 다른 문제가 발생하게 된다. 정확히는 문제라기 보다는 비동기 작업의 특성인데, 비동기 작업이 완료 되고 프로그램을 재개(resume) 할 때 사용되는 스레드를 사용자가 지정 할 수 있는 방법이 없다. 일반적인 경우에는 이러한 메커니즘이 아무런 문제가 되지 않지만(오히려 도움이 되는 경우가 많다), 메인 로직을 지정된 스레드 하나에서 처리하는 "멀티 IO, 싱글 로직 스레드"과 같은 구조에서는 비동기 작업이 완료 되는 시점에 로직 스레드로 다시 한번 작업을 넘겨 주어야 할 필요가 발생하게 된다. 결국 비동기 작업이 완료 되고 난 후의 작업을 특정 스레드에서만 처리하도록 지정하기 위해서는 다시 콜백이 등장해야만 하는데 이렇게 되면 async/await를 사용하는 이유가 없게 된다.

Tip. "멀티 IO , 싱글 로직 스레드" 아키텍쳐

MMORPG와 같이 전역 데이터에 대한 업데이트가 빈번한 서비스의 경우, 거의 모든 작업에서 전역 데이터의 동기화를 위해 락을 걸어야 하기 때문에 멀티스레드를 사용하더라도 싱글 스레드와 별반 차이가 없는 경우가 있을 수 있다. 이때 전역 데이터에 락을 걸어 동기화를 하는 대신, 구조를 단순화 하고 데드락과 같은 문제를 원천적으로 방지하기 위해 로직 처리를 전담하는 특정한 단일 스레드에서 모든 서비스 로직을 처리 할 수 있다.

문제 해결 방식을 멀티 로직 스레드 아키텍쳐와는 정 반대 방향에서 부터 출발한 것으로써, 멀티 스레드 서버는 기본적으로 개별 스레드가 동작하다 동기화가 필요한 부분에 대해 락을 걸지만, 싱글 로직 스레드는 기본적으로 락을 사용하지 않고, 블로킹 되는 부분에 대해서만 별도의 멀티 스레드 풀을 이용한다.

그럼 이런 구조에서는 async/await를 이용한 작업이 불가능하고, 계속 콜백함수의 지옥 속에서 살아야만 하는가? 당연히 아니다. 기존 async/await에 C#의 코루틴을 추가하면 지정된 특정 스레드에서 비동기 완료 이후 부분을 계속 실행 할 수 있다.

우리는 이번 포스트에서 C#의 '코루틴'을 이용해 작업 스레드를 지정하며, 비동기 IO를 처리하는 방법에 대해 연구해 볼것이다.

코루틴

먼저 코루틴에 대해 간단히 알아 보자. 코루틴이란 일반 함수와는 달리 함수를 중단하고, 중단 이후 지점 부터 다시 재개 할 수있는 특별한 함수를 말한다.

Concept of coroutine

또 다른 코루틴의 특징은 코루틴의 실행은 어느 스레드에서든 가능하다는 것이다(하지만 thread-safe하다는 뜻은 아니다). 예를 들어 A 스레드에서 코루틴을 생성하고 실행하다가 중단하고, B스레드에서 다시 실행하더라도 코루틴은 중단 지점 부터 정상적으로 재개(resume)된다. 

다행히 C#에서 코루틴은 아래 코드 처럼 아주 간단히 IEnumerator 인터페이스와 yield 연산자만으로 만들 수 있다(사족을 달자면 C++에서는 코루틴을 만드는 것이 여간 번거로운 것이 아니다, C++ 코루틴은 [여기]에 정리 되어 있으니 시간이 난다면 한번 살펴 보도록 하자).

using System;
using System.Collections;

class Program
{
    public IEnumerator Coroutine()
    {
        int i = 0;
        Console.WriteLine($"Coroutine {++i}");
        yield return null;

        Console.WriteLine($"Coroutine {++i}");
        yield return null;

        Console.WriteLine($"Coroutine {++i}");
        yield return null;
    }
    
    static void Main(string[] args)
    {
        Program program = new Program();
        IEnumerator coroutine = program.Coroutine();

        Console.WriteLine("Main 1");
        coroutine.MoveNext();
        Console.WriteLine("Main 2");
        coroutine.MoveNext();
        Console.WriteLine("Main 3");
        coroutine.MoveNext();
    }
}

// OUTPUT :
// Main 1
// Coroutine 1
// Main 2
// Coroutine 2
// Main 3
// Coroutine 3

위 예제의 결과를 확인하면 Coroutine 함수와 Main 함수를 번갈아 가면서 실행 했지만 Coroutine 함수 내부의 로컬 변수 i가 해제 되지 않고 계속 값을 유지하는 것을 확인 할 수 있다. 

서버 만들기

이제 부터 간단한 예제 코드들을 보며 서버에 대한 구조를 이해해 나갈 것이다. 이런저런 설명이 귀찮은 분들은 [여기]에서 전체 서버 코드를 확인 할 수 있다. 그렇게 어렵지 않은 내용이므로 C#에 익숙한 분이라면 바로 넘어가도 상관 없다.

기본적인 구조는 아래와 같다. IO Thread Pool에서 네트워크로 부터 데이터 도착 이벤트가 발생하면 Single Logic Thread와 연결 된 큐에 작업을 넘긴다. Single Logic Thread는 작업을 진행하다 비동기 작업이 필요한 경우 Async Task Thread Pool에 작업을 전달하고 완료 되면 다시 큐를 이용해 Single Logic Thread에서 나머지 작업을 처리하는 구조다.

싱글 로직 스레드 서버 아키텍쳐

핸들러 만들기

가장 먼저 IO 이벤트를 핸들링하는 핸들러를 코루틴으로 만들어 보자.

public class Coroutine
{
    private IEnumerator enumerator;

    public Coroutine()
    {
        enumerator = Handler();
    }
    
    public bool MoveNext()
    {
        return enumerator.MoveNext();
    }
    
    public virtual IEnumerator Handler()
    {
        // 사용자의 핸들링 로직은 여기 작성 된다.        
        yield break; // 코루틴 종료
    }
}

앞에서 살펴 보았던 코루틴 함수를 가지고 있는 클래스일 뿐이다. 자세한 설명은 생략한다.

IO 스레드 풀 만들기

이제 네트워크로 부터 입력을 받을 수 있는 IO 스레드 풀을 만들어야 한다. 하지만 여기서는 간단한 예제를 보여 주기 위해 실제 네트워크 함수 대신 Console.ReadLine() 사용해 유저로 부터 입력을 받으면 네트워크로 부터 메시지가 도착한것으로 가정 하겠다.

static BlockingCollection<Coroutine> queue = new BlockingCollection<Coroutine>();

for (int i = 0; i < 4; i++)
{
    Task.Run(() => {
        while (true)
        {
            Console.ReadLine();
            Coroutine coroutine = new Coroutine();
            queue.Add(coroutine);
        }
    });
}

실제 서버라면 네트워크로 부터 메시지가 도착하면 메시지의 아이디를 파악한 후 그에 맞는 처리 핸들러를 생성해야 하지만 여기서는 단순히 Coroutine 객체를 생성하여 로직 스레드로 연결 되는 큐에 넘겨 준다. 실제 프로그램에서는 Coroutine 클래스를 상속 받은 다양한 핸들러를 생성하는 팩토리 클래스 정도가 저기 위치하게 될 것이다.

로직 스레드 만들기

로직 스레드는 큐에 작업이 들어오길 기다리며 블로킹 되어 있는 상태다. 로직 스레드는 큐에 작업이 도착하면 블로킹 상태에서 깨어나 코루틴의 MoveNext()를 호출한다.

while (true)
{
    try
    {
        Coroutine coroutine = queue.Take();
        coroutine.MoveNext();
    }
    catch (InvalidOperationException)
    {
        Console.WriteLine("That's All!");
    }
}

로직 스레드에서 코루틴 객체의 MoveNext() 를 호출하게 되면 코루틴의 중단점이 어디였던지 관계없이 그 부분 부터 다시 작업을 재개 할 수 있다. 지금 까지 코드는 아래와 같다. IO 이벤트가 발생하면 코루틴 객체를 생성하고, 대기하고 있던 메인 스레드에게 넘겨 로직을 실행한다. 아주 간단한 구조다.

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Program
{
    public class Coroutine
    {
        // ...생략...
        
        public string message; // 네트워크 메시지 버퍼를 표현 하기 위해 추가
        
        // ...생략...
        
        public virtual IEnumerator Handler()
        {
            Console.WriteLine($"received from network:'{message}'");
            yield break;
        }
    }

    static BlockingCollection<Coroutine> queue = new BlockingCollection<Coroutine>();

    static void Main(string[] args)
    {
        for (int i = 0; i < 4; i++)
        {
            Task.Run(() => {
                while (true)
                {
                    string message = Console.ReadLine();
                    Coroutine coroutine = new Coroutine();
                    coroutine.message = message;
                    Console.WriteLine($"received message. thread_id is {Thread.CurrentThread.ManagedThreadId}");
                    queue.Add(coroutine);
                }
            });
        }

        while (true)
        {
            try
            {
                Coroutine coroutine = queue.Take();
                Console.WriteLine($"run handler. thread_id is {Thread.CurrentThread.ManagedThreadId}");
                coroutine.MoveNext(); // 코루틴 실행
            }
            catch (InvalidOperationException)
            {
                Console.WriteLine("That's All!");
            }
        }
    }
}

// INPUT :
//  Hello World
// OUTPUT :
//  received message. thread_id is 7
//  run handler. thread_id is 1
//  received from network:'Hello World'

위 예제를 실행하면 입력한 문자열을 그대로 출력하는 것을 확인 할 수 있다. 여기서 주목할 것은 메시지를 받은 스레드와 코루틴 핸들러를 실행하는 스레드가 다르다는 것이다.

비동기 작업 만들기 - CPU 바운드 작업

이제 부터 코루틴 핸들러에서 비동기 작업들을 호출 해보도록 하겠다. 먼저 사용자가 임의로 로직을 작성하여 비동기로 실행 할 수 있도록 Action을 인자로 받을 수 있는 클래스를 만들어 보자. 로직 스레드를 블로킹 할 만큼 CPU를 많이 사용하지만 동기화 이슈가 없는 경우 이런 형태의 비동기 작업을 만들 수 있다.

public class AsyncAaction
{
    public AsyncAaction(Coroutine coru, Action action)
    {
        Task.Run(() =>
        {
            action();
            queue.Add(coru); // 작업이 완료 되면 로직 스레드에게 코루틴 객체를 넘김
        });
    }
}

객체가 생성되면 Task를 이용해 다른 스레드에서 작업을 진행한다. 그리고 작업이 완료 되는 시점(action 함수 리턴)에 코루틴 객체를 로직 스레드와 연결된 큐에 넘겨준다. 앞에서 확인 했던것 처럼 로직 스레드는 큐에 작업이 들어오면 해당 코루틴 객체의 MoveNext()를 호출해주어 중단점 이후 부터 작업을 계속 할 수 있다.

그럼 코루틴 핸들러 클래스에서 위 비동기 작업을 어떻게 호출 할 수 있는지 확인 해보자.

public class Coroutine
{
    // ... 생략 ...
    public virtual IEnumerator Handler()
    {
        // yield를 만나면 코루틴은 '중단'된다.
        yield return new AsyncAction(this, () =>
        {
            Console.WriteLine($"[{DateTime.Now}] {task_seq}-{Thread.CurrentThread.ManagedThreadId}\ttask is sleeping.");
            Thread.Sleep(1000); // CPU 많이 쓰고 오래 걸리는 작엄임. 암튼 그럼
            Console.WriteLine($"[{DateTime.Now}] {task_seq}-{Thread.CurrentThread.ManagedThreadId}\ttask will resume.");
        });
        
        // MoveNext가 호출 되어 작업이 재개 되면 여기서 부터 다시 시작한다.
    }
}

혹시나 유니티에서 코루틴을 작성해보신 경험이 있는 분들이라면 보다 쉽게 이해 할 수 있을 것이다. 코루틴에서 yield를 만나면 코루틴은 더 이상 블로킹 되지 않고 중단되며, 로직 스레드는 다른 작업들을 계속 진행 할 수 있다. 그리고 AsyncAction클래스에서 Task가 완료 되면 코루틴 객체를 로직 스레드로 보내 다시 중단점 이후 부터 다시 작업을 재개한다. 물론, 모든 컨텍스트들은 그대로 유지 된다.

비동기 작업 만들기 - IO 바운드 작업

이번에는 사용자 스레드를 사용하지 않는 비동기 IO 작업을 담당하는 클래스를 만들어 보자. 비동기 IO에 대한 내용은 [여기]에 보다 자세히 정리되어 있으니 참고 하도록 하자.

using System.IO;

public class AsyncWrite
{
    Coroutine coroutine;
    public AsyncWrite(Coroutine coru, string filename)
    {
        coroutine = coru;
        Write(filename);
    }
    async void Write(string filename)
    {
        using (FileStream fs = new FileStream(filename, FileMode.Create))
        {
            byte[] buff = new byte[1024 * 1024 * 1000];
            await fs.WriteAsync(buff);
            Console.WriteLine($"[{DateTime.Now}] {task_seq}-{Thread.CurrentThread.ManagedThreadId}\ttask finished file writing.");
        }
        queue.Add(coroutine);
    }
}

예제에서는 네트워크에 데이터를 쓰는 대신 파일에 1G바이트 정도의 데이터를 쓰는 것으로 IO 바운드 비동기 작업을 대신했다. 데이터가 쓰이는 대상은 달라도 결국 OS에서 처리하는 것이므로 원리는 같다고 생각하면 된다.

여기서는 앞의 CPU 바운드 작업 처럼 별도의 스레드를 만드는 것이 아닌 async/await를 이용하여 OS에게 쓰기 작업을 맡기고 작업이 완료 되면 역시 로직 스레드로 코루틴 객체를 넘긴다. async/await에 관련된 자세한 내용은 [여기]에 정리 되어 있다.

전체 코드 보기

앞에서 작성한 코드들을 하나로 합치면 아래와 같다.

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    public class AsyncAction
    {
        public AsyncAction(Coroutine coru, Action action)
        {
            Task.Run(() =>
            {
                action();
                queue.Add(coru);
            });
        }
    }

    public class AsyncWrite
    {
        Coroutine coroutine;
        public AsyncWrite(Coroutine coru, string filename)
        {
            coroutine = coru;
            Write(filename);
        }
        async void Write(string filename)
        {
            using (FileStream fs = new FileStream(filename, FileMode.Create))
            {
                byte[] buff = new byte[1024 * 1024 * 1000];
                await fs.WriteAsync(buff); // 완료 되면
                Console.WriteLine($"[{DateTime.Now}] task_seq:{task_seq},\ttid:{Thread.CurrentThread.ManagedThreadId}\ttask finished file writing.");
            }
            queue.Add(coroutine);
        }
    }

    public class Coroutine
    {
        private IEnumerator enumerator;
        public string message;
        public Coroutine()
        {
            enumerator = Handler();
        }
        public bool MoveNext()
        {
            return enumerator.MoveNext();
        }
        public virtual IEnumerator Handler()
        {
            DateTime start_time = DateTime.Now;
            int task_seq = ++Program.task_seq;

            Console.WriteLine($"[{DateTime.Now}] task_seq:{task_seq},\ttid:{Thread.CurrentThread.ManagedThreadId}\ttask '{message}' starts and suspend.");
            
            // 비동기 작업을 시작하고 함수를 중단한다.
            yield return new AsyncAction(this, () =>
            {
                Console.WriteLine($"[{DateTime.Now}] task_seq:{task_seq},\ttid:{Thread.CurrentThread.ManagedThreadId}\ttask is sleeping.");
                Thread.Sleep(1000);
                Console.WriteLine($"[{DateTime.Now}] task_seq:{task_seq},\ttid:{Thread.CurrentThread.ManagedThreadId}\ttask will resume.");
            });
            
            // 로직 스레드에서 MoveNext()가 호출 되면 여기서 부터 다시 시작(resume) 된다.
            Console.WriteLine($"[{DateTime.Now}] task_seq:{task_seq},\ttid:{Thread.CurrentThread.ManagedThreadId}\ttask starts file writing.");
            
            // 비동기 작업을 시작하고 함수를 중단한다.
            yield return new AsyncWrite(this, task_seq.ToString() + ".txt");
            
            // 로직 스레드에서 MoveNext()가 호출 되면 여기서 부터 다시 시작(resume) 된다.
            TimeSpan elapsed = DateTime.Now - start_time;
            Console.WriteLine($"[{DateTime.Now}] task_seq:{task_seq},\ttid:{Thread.CurrentThread.ManagedThreadId}\ttask finished. elapsed_time:{elapsed}.");
            yield break;
        }
    }

    static int task_seq = 0; // 모든 이벤트에서 접근하는 전역 데이터
    static BlockingCollection<Coroutine> queue = new BlockingCollection<Coroutine>();

    static void Main(string[] args)
    {
        for (int i = 0; i < 4; i++)
        {
            Task.Run(() => {
                while (true)
                {
                    string message = Console.ReadLine();
                    Coroutine coroutine = new Coroutine();
                    coroutine.message = message;
                    queue.Add(coroutine);
                }
                // 스트레스 테스트용 코드
                // 테스트 한번마다 1GB파일을 생성하므로 스트레스 테스트 할 때 주의 필요
                //for (int i = 0; i < 10; i++)
                //{
                //    Coroutine coroutine = new Coroutine();
                //    queue.Add(coroutine);
                //}
            });
        }

        while (true)
        {
            try
            {
                Coroutine coroutine = queue.Take();
                coroutine.MoveNext();
            }
            catch (InvalidOperationException)
            {
                Console.WriteLine("That's All!");
            }
        }
    }
}

프로그램을 실행하면 아래와 같은 결과를 확인 할 수 있다.

1
[2021-09-04 오후 6:31:38] task_seq:1,   tid:1   task '1' starts and suspend.
[2021-09-04 오후 6:31:38] task_seq:1,   tid:10  task is sleeping.
2
[2021-09-04 오후 6:31:38] task_seq:2,   tid:1   task '2' starts and suspend.
[2021-09-04 오후 6:31:38] task_seq:2,   tid:11  task is sleeping.
[2021-09-04 오후 6:31:39] task_seq:1,   tid:10  task will resume.
[2021-09-04 오후 6:31:39] task_seq:1,   tid:1   task starts file writing.
[2021-09-04 오후 6:31:39] task_seq:2,   tid:11  task will resume.
[2021-09-04 오후 6:31:39] task_seq:2,   tid:1   task starts file writing.
[2021-09-04 오후 6:31:43] task_seq:2,   tid:10  task finished file writing.
[2021-09-04 오후 6:31:43] task_seq:1,   tid:1   task finished. elapsed_time:00:00:05.9560125.
[2021-09-04 오후 6:31:44] task_seq:2,   tid:9   task finished file writing.
[2021-09-04 오후 6:31:44] task_seq:2,   tid:1   task finished. elapsed_time:00:00:05.9332010.

task_seq는 각 작업의 고유 번호, tid는 프로그램이 실행 되고 있는 스레드 아이디를 의미한다. 1과 2를 입력 했을 때 각자의 작업이 서로를 블로킹하지 않고 비동기적으로 수행되며, 핸들러에서 비동기 처리가 되지 않은 부분의 코드들은 1번 스레드(로직 스레드)에서 일괄 처리 되는 것을 확인 할 수 있다.

마치며

이렇게 비동기 작업을 진행하면서 별도의 동기화 처리가 필요 없는 - 물론 로직 스레드로 넘기는 부분에서 동기화 작업이 필요하지만 이 프레임워크를 사용하는 프로그래머는 신경 쓸 필요가 없는 부분이다 - 서버를 C#의 비동기 프로그래밍 방식을 이용하여 처리하는 방법에 대해 알아 보았다.

사족으로 이런 구조는 로직을 단일 스레드에서 처리 할 수 밖에 없으므로 멀티코어 시스템에서 확실히 성능이 떨어진다. 하지만 항상 동기화가 필요한 전역 데이터를 자주 업데이트 해야 하는 서비스라면 어차피 모든 로직 처리에 있어서 락을 걸고 작업 해야 하므로 싱글 스레드와 다름이 없다. 이런 서비스라면 구조를 단순화하고 데드락 같은 문제를 원천적으로 제거 할 수 있는 "멀티 IO, 싱글 로직" 구조의 서버를 한번 고려해볼만 하다.

부록 1. 같이 읽으면 좋은 글

유익한 글이었다면 공감(❤) 버튼 꾹!! 추가 문의 사항은 댓글로!!