티스토리 뷰

프로그래밍/C++

task, 태스크

에어버스 2023. 8. 5. 17:01

스레드 외에도 작업을 비동기로 수행한다.

태스크를 사용하려면 future 헤더가 필요하다.
태스크는 작업 패키기지로 매개변수화 되며, 프로미스(promise)외 퓨처(future) 등 서로 연동되는 두 개의 컴포넌트로 구성된다. 둘 다 데이터 채널을 통해 연결된다.프로미스는 작업 패키지를 실행해 그 결과를 데이타 체널에 보낸다. 그러면 프로미스에 연동된 퓨처가 이 결과를 가져온다. 이 두 교신 지점(Communication endpoint)은 별도의 스레드로 실행될 수 있다. 퓨처는 시간이 약간 흐른 뒤에 결과를 가져올 수 있다는 점에서 특별하다. 따라서 프로미스의 결과 계산은 연동된 퓨처의 결과와 독립적이다.

태스크는 교신 지점 간 데이터 채널과도 같다. 데이터 채널의 한쪽 지점이 프로미스라면 다른 쪽 지점은 퓨처다. 두지점은 동일한 스레드 또는 서로 다른 스레드로 존재할 수 있다 프로미스는 결과를 데이터 채널로 보내고, 퓨처는 이를 기다렸다 가져온다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <iostream>
#include <future>
#include <thread>
 
int main()
{
    std::cout << std::endl;
 
    int res;
    std::thread t([&]{ res = 2000 + 11; });
    t.join();
    std::wcout << "res ; " << res << std::endl;
 
    auto fut = std::async([] { return 2000 + 11; });
    std::cout << "fut.get() : " << fut.get() << std::endl;
 
    std::cout << std::endl;
}
cs

 

11행 : 스레드가 공유변수인 res 에 계산 결과를 받는다.
14행 : std::async 호출은 발신자(프로미스)와 수신자(퓨처) 사이에 데이터 채널을 만든다. 퓨처는 데이터 채널에 fut.get() 으로 계산 결과를 요청한다. 이 fut.get() 호출은 블로킹이다.

실행결과>

 

기준 스레드 태스크
교신 공유 변수 교신 채널
스레드 생성 필수 선택
동기화 join() 을 통해 (대기) get 호출 블록
자식 스레드의 예외 자식 및 생성자 스레드 중단 프로미스의 리턴 값
교신의 종류 값, 알림, 예외


스레드에는 thread 헤더, 태스크는 future 헤더 필요

std::async
퓨처를 만들 수 있는 가장 쉬운 방법이다.
비동기 함수 호출과 비슷하다. 인수와 콜러블(callable)을 함께 받기 떄문이다.
std::async 는 가변 템플릿이며, 따라서 임의의 여러 인수를 받을 수 있다. std::async 호출은 퓨처 객체인 fut 을 리턴한다.

시작 정책

비동기 호출이 동일한 스레드(std::launch::deferred) 에서 실행되어야 하는지 아니면 또 다른 스레드(std::launch:async)에서 실행되어야 하는지를 명시벅으로 지정할 수 있다.
기본적으로 std::async 는 자신의 작업 패키지를 곧바로 실행한다. 이 때문에 조급한 계산법으로 불린다.

auto fut = std::async(std::launch::deferred, ...) 호출은 프로미스가 곧바로 실행되지 않는다는 점에서 특별하다. fut.get() 호출은 프로미스를 느긋하게 시작하는데, 이는 다시 말해, 프로미스는 퓨처가 fut.get() 을 통해 결과를 요청할 때만 실행된다는 뜻이다. 

퓨처의 조급한 게산법과 느긋한 계산법

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <iostream>
#include <future>
#include <thread>
 
int main()
{
    std::cout << std::endl;
 
    auto begin = std::chrono::system_clock::now();
    auto asyncLazy = std::async(std::launch::deferred, 
        [] { return std::chrono::system_clock::now(); });
    auto asyncEager = std::async(std::launch::async,
        [] { return std::chrono::system_clock::now(); });
    std::this_thread::sleep_for(std::chrono::seconds(1));
    auto lazyStart = asyncLazy.get() - begin;
    auto eagerStart = asyncEager.get() - begin;
    auto lazyDuration = std::chrono::duration<double>(lazyStart).count();
    auto eagerDuration = std::chrono::duration<double>(eagerStart).count();
    std::cout << "asyncLasy evaluted after : " << lazyDuration << " senconds." << std::endl;
    std::cout << "asyncEager evaluted after : " << eagerDuration << " senconds." << std::endl;
 
    std::cout << std::endl;
}
cs

15행 : asyncLazy.get() 은 10행의 프로미스 실행을 유발한다. 그 결과는 14행의 짧은 1초 대기 이후에 사용 가능하다.
16행 : asyncEager.get() 은 곧바로 실행된 작업 패키지로부터 결과를 받는다.

실행결과>



std::packaged_task

std::packaged_task pack 은 콜러블(Callable)의 비동기 호출을 위한 래퍼(wrapper) 다.
pack.get_future() 르 ㄹ호출하면 연동돈 퓨처를 받는다. 호출 연산자를 pack 에 호출하면(pack()) std::packaged_task 가 실행되고, 그 결과 볼러블이 실행된다.

std::packaged_task 으 ㅣ관리는 크게 네 단계로 나뉜다.

1. 작업을 래핑한다.
std::packaged_task<int(int, int)> sumTask({}(int a, int b) { return a + b; });

2. 퓨처를 생성한다.
std::future<int> sumResult = sumTask.get_future();

3. 계산을 수행한다.
sumTask(2000, 11);

4. 결과를 조회한다.
sumResult.get();

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#include <iostream>
#include <future>
#include <thread>
#include <utility>
#include <deque>
 
class SumUp {
public:
    int operator() (int beg, int end) {
        long long int sum{ 0 };
        for (int i = beg; i < end; i++) sum += i;
        return sum;
    }
};
int main()
{
    std::cout << std::endl;
 
    SumUp sumUp1;
    SumUp sumUp2;
    SumUp sumUp3;
    SumUp sumUp4;
 
    // 태스크 래핑
    std::packaged_task<int(intint)> sumTask1(sumUp1);
    std::packaged_task<int(intint)> sumTask2(sumUp2);
    std::packaged_task<int(intint)> sumTask3(sumUp3);
    std::packaged_task<int(intint)> sumTask4(sumUp4);
    
    // 퓨처 생성
    std::future<int> sumResult1 = sumTask1.get_future();
    std::future<int> sumResult2 = sumTask2.get_future();
    std::future<int> sumResult3 = sumTask3.get_future();
    std::future<int> sumResult4 = sumTask4.get_future();
    
    //컨테이너에 태스크 넣기
    std::deque<std::packaged_task<int(intint)>> allTasks;
    allTasks.push_back(std::move(sumTask1));
    allTasks.push_back(std::move(sumTask2));
    allTasks.push_back(std::move(sumTask3));
    allTasks.push_back(std::move(sumTask4));
 
    int begin1 };
    int increment{ 2500 };
    int end = begin + increment;
 
    // 별도의 스레드로 각 계산 수행
    while (not allTasks.empty()) {
        std::packaged_task<int(intint)> myTask = std::move(allTasks.front());
        allTasks.pop_front();
        std::thread sumThread(std::move(myTask), beginend);
        begin = end;
        end += increment;
        sumThread.detach();
    }
 
    // 결과 얻기
    auto sum = sumResult1.get() + sumResult2.get() + sumResult3.get() + sumResult4.get();
    std::cout << "sum of 0 .. 1000 = " << sum << std::endl;
 
    std::cout << std::endl;
}
cs

25~28행 : std::packaged_task 객체로 작업 패키지를 감싼다. 작업 패키지는 SumUp 클래스의 인스턴스이다. 작업은 호출 연산자로 완료된다. (9~13행) 호출 연산자는 begin에서 end - 1 까지 모든 수를 나타내며 그 합계로 리턴한다. std::packaged_task 는 두개의 int를 사용해 처리하고 하나의 int를 반환하는(int(int, int)) 콜러블들을 관리한다.
31~34행 : std::packaged_task 객체의 도움을 받아 퓨처 객체를 생성해야 한다. packaged_task는 겨신 채널의 프로미스다. 퓨처의 타입은 std::future<int> sumResult1 = sumTask1,get_future() 처럼 명시적으로 정의되지만 컴파일러가
auto sumResult1 = sumTask1,get_future() 로 이를 담당한다.
37~41행 : packaged_task 는 std::deque 로 이동한다. 
48~55행 : packaged_task 각 패키지가 실행된다.
52행 : 이를 위해 std::deque 의 헤드를 std::packaged_task 로 옮긴다.
57행 : 백그라운드에서 실행한다.
61행 : 최종 단계에서 모든 퓨처의 값을 요청해서 계산한다.

실행 결과>

 

std::promise 와 std::future

값이나 예외 또는 다눈 알림을 공유 데이터 채널로 보낼 수 있다. 하나의 프로미스가 std::shared_futue 퓨처 여럿을 담당한다. C++20 에서는 확장퓨처를 제공한다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <iostream>
#include <future>
#include <thread>
#include <utility>
 
void product(std::promise<int>&& intPromise, int a, int b) {
    intPromise.set_value(a * b);
}
 
struct Div {
    void operator() (std::promise<int>&& intPromise, int a, int b) const {
        intPromise.set_value(a / b);
    }
};
int main()
{
    std::cout << std::endl;
 
    int a = 20;
    int b = 10;
    std::cout << std::endl;
 
    // 프로미스 정의
    std::promise<int> prodPromise;
    std::promise<int> divPromise;
 
    // 퓨처 받기
    std::future<int> prodResult = prodPromise.get_future();
    std::future<int> divResult = divPromise.get_future();
 
    // 별도의 스레드로 결과 계산
    std::thread prodThread(product, std::move(prodPromise), a, b);
    Div div;
    std::thread divThread(div, std::move(divPromise), a, b);
 
    // 결과 받기
    std::cout << "20 + 10 = " << prodResult.get() << std::endl;
    std::cout << "20 / 10 = " << divResult.get() << std::endl;
 
    prodThread.join();
    divThread.join();
 
    std::cout << std::endl;
}
cs

32행 : prodThread 스레드는 6~8행의 product 함수와 28행의 prodPromise 함수, a와 b 라는 수를 받는다. ProdThread 의 인수를 이해하려면 함수의 서명을 파악해야 한다. prodThread 의 첫번째 인수는 콜러블(Callable) 이어야 하는데, 이느 ㄴ앞서 언급한 product 함수다. product 함수에는 rvalue 레퍼런스(std::promise<int>&& intPromise) 의 프로미스와 두개의 수가 필요한데 이는 prodThread 의 마지막 세 인수다. 32행의 std::move 가 rvalue 레퍼런스를 생성한다. 
34행 : 두 수 a와 b 를 나눈다. 이때 Div 클래스의 div 객체가 사용된다. (10~13행) div는 함수 객체의 인스턴스다.
41, 42행 : 퓨처는 prodResult.get() 과 divResult.get() 을 호출해 결과를 가져온다.

실행 결과>

 

std::promise
값이나 알림, 예외를 설정하는데 사용된다. 
결과를 지연 방식으로 제공한다.

메서드 설명
prom.wap(prom2)와 std::swap(prom, prom2) 프로미스를 서로 맞바꾼다.
prom.get_future() 퓨차를 반환한다.
prom.set_value(val) 값을 설정한다.
prom.set_exception(ex) 예외를 설정한다.
prom.set_value_at_thread_exit(val) 값을 지정하고, 프로미스 종료 시 사용되도록 준비한다.
prom.set_exception_at_thread_exit(val) 예외를 지정하고, 프로미스 종료 시 사용되도록 준비힌다.

값이나 예외가 프로미스에 의해 한 번 이상 설정되면 std::future_error 예외가 발생한다.

std::future

프로미스에서 값을 가져온다.
값을 사용할 수 있는지 프로미스에 확인한다.
프로미스의 알림을 기다린다. 이때 상대적 기간(time duration) 이나 절대적 시점(time point) 이 적용된다.
공유 퓨처(std::future)를 생성한다.

메서드 설명
fut,share() std::shared_future 를 반환한다. 반환 이후에는 결과를 사용할 수 없다.
fut.get() 값 또는 예외인 결과를 린턴한다.
fut.valid() 결과를 사용할 수 있는지 확인한다. fut.get() 을 호출한 이후에는 false 를 리턴한다.
fut.wait() 결과를 기다린다.
fut.wait_for(Time) 결과를 기다린다. 단, reaTime 이상은 기다리지 않는다.
fut.wait_untile(absTime) 결과를 기다린다. 단, absTime 이상은 기다리지 않는다.

fut 퓨처가 한 번 이상 결과를 요청하면 std::future_error 예외가 발생한다.
프로미스와 퓨처 사이에는 1:1 관계가 성립한다. 이와 반대로 std::shared_future 는 프로미스외 퓨처 사이에 1:N 관계를 지원한다.

std::shared_future

퓨처는 fut.share() 를 사용해 공유 퓨처를 만든다. 공유 퓨처는 프로미스와 연동되어 결과를 독립적으로 요청할 수 있다. std::shared_future 의 인터페이스는 std::future 와 같다.
std::future 의 기능외에도 연동된 다른 퓨처들과는 독립적으로 프로미스를 쿼리항 수 있다.

두 가지 방법으로 만들 수 있다.
1. std::future 인 fut에 fut.share() 를 호출한다. 호출 이후에는 결과를 더 이상 사용할 수 없다. 따라서 valid==false 가 된다.
2. std::promise 로 부터 std::shared_future를 초기화 한다. (std::shared_future<int> divResult=divPromise.get_future())

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include <iostream>
#include <future>
#include <thread>
#include <utility>
 
std::mutex coutMutex;
 
struct Div {
    void operator() (std::promise<int>&& intPromise, int a, int b) const {
        intPromise.set_value(a / b);
    }
};
 
struct Requestor {
    void operator() (std::shared_future<int> shaFut) {
        // std:;:cout 잠그기
        std::lock_guard<std::mutex> coutGuard(coutMutex);
 
        // 스레드 id 받기
        std::cout << "threadId(" << std::this_thread::get_id() << ") : ";
        std::cout << "20 / 10 = " << shaFut.get() << std::endl;
    }
};
 
int main()
{
    std::cout << std::endl;
 
    // 프로미스 정의
    std::promise<int> divPromise;
 
    // 퓨처 받기
    std::shared_future<int> divResult = divPromise.get_future();
 
    // 별도의 스레드로 결과 계산
    Div div;
    std::thread divThread(div, std::move(divPromise), 2010);
 
    Requestor req;
    std::thread sharedThread1(req, divResult);
    std::thread sharedThread2(req, divResult);
    std::thread sharedThread3(req, divResult);
    std::thread sharedThread4(req, divResult);
    std::thread sharedThread5(req, divResult);
   
    divThread.join();
 
    sharedThread1.join();
    sharedThread2.join();
    sharedThread3.join();
    sharedThread4.join();
    sharedThread5.join();
 
    std::cout << std::endl;
}
cs

두 작업 패키지, 다시 말해 프로미스와 퓨처의 작업 패키지는 여기서 함수 객체다. 
37행 : divPromise는 divThread 스레드로 이동되어 실행된다. 이에 따라 std::shared_future 도 총 다섯개의 스레드로 복사된다. (48~52행) 이 사실은 매우 중요하다 이동될 수 있는 std::future 객체와 반대로 std::shared_future 객체는 복사될 수 있다. 메인 스레드는 48~52행에서 자식 스레드가 일을 마칠때까지 기다렸다가 결과를 표시하게 된다.

실행 결과>

 

예외 반환하기

executeDivision 함수는 계산 결과 또는 예외를 표시한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <iostream>
#include <future>
#include <thread>
#include <utility>
#include <exception>
#include <string>
 
struct Div {
    void operator() (std::promise<int>&& intPromise, int a, int b){
        try {
            if (0 == b) {
                std::string errMsg = std::string("Illegal division by zero : "+ std::to_string(a) + "/" + std::to_string(b);
                throw std::runtime_error(errMsg);
            }
            intPromise.set_value(a / b);
        }
        catch (...) {
            intPromise.set_exception(std::current_exception());
        }
    }
};
 
void execeptionDivision(int nom, int denom) {
    std::promise<int> divPromise;
    std::future<int> divResult = divPromise.get_future();
    Div div;
    std::thread divThread(div, std::move(divPromise), nom, denom);
    
    // 결과 또는 예외 받기
    try {
        std::cout << nom << " / " << denom << " = " << divResult.get() << std::endl;
    }
    catch (std::runtime_error& e) {
        std::cout << e.what() << std::endl;
    }
 
    divThread.join();
}
 
void main()
{
    std::cout << std::endl;
    execeptionDivision(200);
    execeptionDivision(2010);
    std::cout << std::endl;
}
cs

 

프로미스는 분모가 0 이라는 문제를 처리한다. 분모가 0 이면 18행의 intPromise.set_exception(std::current_exception()) 처럼 예외를 반환 값으로 설정한다. 퓨처는 30~35행의 try~catch 블록에서 이 예외를 처리한다.

실행 결과>

가능하다면 조건 변수 대신 태스크를 사용하는 것이 안전해서 좋다

 

알림 반환하기

스레드를 동기화하는데 프로미스와 퓨처를 사용한다면 조건 변수를 사용할 때ㄹ와 공통점이 많아진다. 대개는 프로미스와 퓨처가 더 나은 선택이다.

기준 조건 변수 태스크
다수 동기화 아니오
임계 구역 아니오
수신자와 오류 처리 아니오
가짜 깨우기 아니오
사라진 깨우기 아니오

프로미스에 비해 조건 뱐숙 우위를 점하는것은 스레드 동기화를 여러번 할 수 있다는 점이다. 이와 반대로 프로미스는 알림을 한 번만 보낼 수 있으므로 조건 변수 같은 기능을 발휘하기 위해서는 더 많은 프로미스와 퓨처를 사용해야 한다. 만일 한 번 동기화 하는데 조건 변수르 ㄹ사용한다면 오히려 사용법만 훨씬 더 복잡해진다. 프로메미스외 퓨처는 공유 변수가 필요하지 않으며, 그에 따라 잠김도 필요하지 않다. 그리고 가자 깨우기나 사라진 꺠우기에 취약하지도 않다. 더구나 태스크는 예외를 처리할 수 있다. 조건 변수보다 태스크를 선호할 이유는 부지기수다.

조건 변수(condition varibles) 를 사용하는것이 얼마나 어려운지 기억하고 있을지 모르겠다 여기서는 두 스레드를 동기화하기 위한 핵심 부문만 제시하고자 한다.

setDataReady 함수는 동기화의 알림 부분을 수행하며, waitingForWork 함수는 대기 부분을 수행한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <iostream>
#include <future>
#include <utility>
 
void doTheWork() {
    std::cout << "Processing share data." << std::endl;
}
void waitingForWork(std::future<void>&& fut) {
    std::cout << "Worker : Waiting for work." << std::endl;
    fut.wait();
    doTheWork();
    std::cout << "Work done." << std::endl;
}
void setDataReady(std::promise<void>&& prom) {
    std::cout << "Sender : Data is ready." << std::endl;
    prom.set_value();
}
void main()
{
    std::cout << std::endl;
    std::promise<void> sendReady;
    auto fut = sendReady.get_future();
    std::thread t1(waitingForWork, std::move(fut));
    std::thread t2(setDataReady, std::move(sendReady));
    t1.join();
    t2.join();
    std::cout << std::endl;
}
cs

22행 : sendReady 덕분에 fut 퓨처를 받는다. 프로미스는 유일하게 알림을 보낼 수 있는 std::promise<void> sendReady (21행) 반환값을 사용해서 교신한다. 두 교신점은 스레드 t1 과 스레드 t2 (23, 24행) 로 이동된다. 퓨처는 fut.wait() 호출을 사용해(10행) 프로미스의 알림을 기다린다. (16행)

프로그램의 구조와 출력 결과는 조건 변수(condition variable) 절의 해당 프로그램과 일치한다.

실행 결과>

 

공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/11   »
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30