++it18 – Channels are Useful, not only for Water

High performance computing has made quite some progress lately. Maybe you have heard about the reactive manifesto (you should have, if you are a reader of this blog ;-)); if not, it is an interesting read … if nothing else as a conversational icebreaker.

Basically, the manifesto advocates more responsive and reliable systems that react to requests by breaking them down into smaller requests, processed by smaller, autonomous systems. This is nothing new, but it is quite far from traditional processing, where a single monolithic application took care of a request from reception to response.

There are several programming patterns useful for implementing a reactive system; channels (and streams, which are much the same thing) are one of them. A channel allows you to define a data flow with inputs, computation units, merges, splits and collectors of the information coming in sequence into your system. Once the dataflow is defined, data flows into it and results come out of the other end.

In this talk Felix Petriconi describes his implementation of channels for C++. You can find the slides here and the video here. As for all the other talks I’m writing about, errors and mistakes are just mine. (It is worth noting, here as in the other posts about this conference, that I am rebuilding this from the notes I took during the conference and not from the videos. It is very possible I missed something or got it messed up during the expansion of my notes).


Channels are useful, not only for water

Felix Petriconi

The C++ language is too large for anyone to master. So everyone lives within a subset.

(Sean Parent C++Now 2012)

C++11 introduced concurrent programming primitives. These tools have a low level of abstraction making it hard to develop and maintain concurrent programs.

I’ve been collaborating with Sean Parent on a new concurrent library for C++: stlab. Concurrent programming is no longer optional, since chip technology reached a plateau where increasing the clock frequency generates heat faster than we are able to extract it from the chip itself.

The only way we have now to increase computational power in a single chip is to increase the number of computing cores. As Herb Sutter wrote in 2005 – the free lunch is over.

So now we have many cores and the problem of concurrent programming. According to Amdahl’s law, the speedup does not grow linearly with the number of cores: the part of the program that cannot run in parallel, synchronization included, limits the overall gain.
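As a rough illustration of Amdahl's law, the sketch below computes the theoretical speedup 1 / ((1 - p) + p / n) for a program whose fraction p can run in parallel on n cores. The function names and the 90% figure are made up for the example and do not come from the talk.

```cpp
#include <cassert>
#include <cstdio>
#include <initializer_list>

// Amdahl's law: theoretical speedup on `cores` cores when a fraction
// `parallel` (between 0 and 1) of the work can run in parallel.
double amdahl_speedup(double parallel, unsigned cores)
{
    assert(parallel >= 0.0 && parallel <= 1.0);
    assert(cores >= 1);
    return 1.0 / ((1.0 - parallel) + parallel / cores);
}

// Prints the speedup for a program that is 90% parallel
// (an arbitrary figure chosen for the example).
void print_speedup_table()
{
    for (unsigned cores : {1u, 2u, 4u, 8u, 16u, 1024u})
    {
        std::printf("%4u cores -> %.2fx\n", cores, amdahl_speedup(0.9, cores));
    }
}
```

With 90% of the work parallel, the speedup can never reach 10x, no matter how many cores are added, because the remaining 10% still runs serially.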

How can we use multiple cores?

  • One single-threaded process per core. This is simple because no synchronization is needed among processes. Facebook tried this solution in the past. Nonetheless, the hardware needs to synchronize memory access, since the memory is shared.
  • Multi threaded process without synchronization. I.e. every thread does something different and does not share resources, so synchronization is not needed.
  • Multi threaded process with low level synchronization (mutex, semaphores, memory fences and transactional memory).
  • Multi threaded process with high level synchronization (futures, actors, channels).

Futures

A future (see also my post) is a mechanism (developed in 1977/78) to decouple a function from its result. A function is called and, magically, its result will appear in the future.

Futures can be combined using continuations: results from one or more futures can be collected and fed to another future, or the result can be split among two or more futures.

Futures work great if you have a single value to calculate.
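For readers who have not used futures, here is a small sketch with the standard library only (not stlab). The standard std::future has no continuation support, so the two partial results are combined by blocking on get(); parallel_sum is a name invented for this example.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <numeric>
#include <vector>

// Sums the two halves of `data` concurrently and combines the results.
long parallel_sum(const std::vector<int>& data)
{
    const auto middle = data.begin() + static_cast<std::ptrdiff_t>(data.size() / 2);

    // Each std::async call returns at once with a future;
    // the two partial sums are computed on other threads.
    std::future<long> left = std::async(std::launch::async,
        [&data, middle] { return std::accumulate(data.begin(), middle, 0L); });
    std::future<long> right = std::async(std::launch::async,
        [&data, middle] { return std::accumulate(middle, data.end(), 0L); });
    assert(left.valid() && right.valid());

    // get() blocks until the corresponding value is available.
    return left.get() + right.get();
}
```

Each future delivers exactly one value, which is the limitation that channels address.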

Channels

A channel is another high level mechanism for modeling concurrent programming. It was devised in 1978 by C.A.R. Hoare. At the core, a channel allows the creation of a persistent execution graph [NdM: by employing channels you basically design the dataflow, letting the channel support take care of the concurrency and synchronization details]. The channel is fed by a sender, several processes are chained together to produce the response, and the receiver gets the result.

Here is what a channel looks like using the stlab library:

stlab::sender<int> send;
stlab::receiver<int> receiver;
std::tie(send, receiver) = stlab::channel<int>(stlab::default_executor);

auto printer = [](int x ){ std::cout << x << '\n'; };

auto printer_process = receiver | printer;
receiver.set_ready();

send(1); send(2); send(3);

In order to allocate computations to threads an executor is needed. The default executor uses the system thread pool.

Computation blocks are executed asynchronously w.r.t. the send() instructions.
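To make "executed asynchronously with respect to send()" more concrete, here is a toy model in standard C++: a queue drained by a single worker thread. This is only a sketch of the general idea and not how stlab implements channels; toy_channel and all its members are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical toy channel: send() enqueues and returns at once,
// a worker thread runs the process on each value in arrival order.
template <typename T>
class toy_channel
{
public:
    explicit toy_channel(std::function<void(T)> process)
        : _process(std::move(process)), _worker([this] { run(); })
    {
        assert(static_cast<bool>(_process));
    }

    // Closes the channel and waits until every queued value is processed.
    ~toy_channel()
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _closed = true;
        }
        _ready.notify_one();
        _worker.join();
    }

    // Waits only for the queue lock, never for the processing itself.
    void send(T value)
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _queue.push(std::move(value));
        }
        _ready.notify_one();
    }

private:
    void run()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        for (;;)
        {
            _ready.wait(lock, [this] { return _closed || !_queue.empty(); });
            if (_queue.empty())
            {
                return; // closed and fully drained
            }
            T value = std::move(_queue.front());
            _queue.pop();
            lock.unlock(); // run the process without holding the lock
            _process(std::move(value));
            lock.lock();
        }
    }

    std::function<void(T)> _process;
    std::mutex _mutex;
    std::condition_variable _ready;
    std::queue<T> _queue;
    bool _closed = false;
    std::thread _worker; // declared last: every other member exists before it starts
};
```

A real library adds what this sketch lacks: chaining of several processes, a shared thread pool instead of one thread per channel, and error propagation.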

New edges of the data flow can be concatenated together with the operator|(). In this way the same producer can be chained to multiple receivers:

auto [send,receive] = channel<int>..
auto printerA = [](int x ){printf("Process A %d\n", x );};
auto printerB = [](int x ){printf("Process B %d\n", x );};

auto printer_processA = receive | printerA;
auto printer_processB = receive | printerB;
receive.set_ready();

send(1); send(2); send(3);

The opposite operation – i.e. merging multiple channels into a single receiver – is more convoluted since you may want different behaviors (stlab name in parenthesis):

  • downstream processor is invoked when all its inputs are ready (join)
  • downstream processor is invoked in a round robin fashion with data from its inputs (zip)
  • downstream processor is invoked as soon as data from one of its inputs is ready (merge)

Unfortunately these names are not so good since they are different from what is coming in C++.

Here is an example of join:

using namespace stlab;

int main()
{
    auto [sendA,receiverA] = channel<int>( default_executor );
    auto [sendB,receiverB] = channel<int>( default_executor );

    auto printer = []( int a, int b )
    {
        printf( "Process %d %d\n", a, b );
    };

    auto print_process = join( default_executor, printer, receiverA, receiverB );

    receiverA.set_ready();
    receiverB.set_ready();

    
    sendA( 1 );
    sendA( 2 );
    sendB( 3 );
    sendA( 4 );
    sendB( 5 );
    sendB( 6 );

    int end;
    std::cin >> end;

    return 0;
}

The output is:

Process 1 3
Process 2 5
Process 4 6

Note that the sender is never blocked.
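The pairing in that output can be reproduced with a small single-threaded model of the join behavior: values are queued per input and the downstream process fires as soon as both queues hold a value. This is my own sketch of the semantics, not stlab code; input and join_pairs are hypothetical names.

```cpp
#include <cassert>
#include <deque>
#include <utility>
#include <vector>

// Which input a value arrived on.
enum class input { a, b };

// Hypothetical model of "join" over two inputs: values are queued per
// input and a pair is emitted whenever both queues hold a value.
std::vector<std::pair<int, int>>
join_pairs(const std::vector<std::pair<input, int>>& arrivals)
{
    std::deque<int> queue_a;
    std::deque<int> queue_b;
    std::vector<std::pair<int, int>> fired;

    for (const auto& arrival : arrivals)
    {
        // Invariant: a pair fires as soon as possible, so at most
        // one of the two queues holds pending values here.
        assert(queue_a.empty() || queue_b.empty());

        (arrival.first == input::a ? queue_a : queue_b).push_back(arrival.second);

        if (!queue_a.empty() && !queue_b.empty())
        {
            fired.emplace_back(queue_a.front(), queue_b.front());
            queue_a.pop_front();
            queue_b.pop_front();
        }
    }
    return fired;
}
```

Feeding it the arrival order of the example (A:1, A:2, B:3, A:4, B:5, B:6) yields the pairs (1, 3), (2, 5) and (4, 6).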

You can add options in the stream. E.g.:

  • buffer_size{n} within a concatenation limits the incoming queue to size n.
  • executor{ex} within a concatenation specifies the executor for what follows, e.g. immediate_executor.

using namespace stlab;

int main()
{
    auto [send,receiver] = channel<int>( default_executor );

    auto printerA = []( int x )
    {
        printf( "Process A %d\n", x );
    };
    auto printerB = []( int x )
    {
        printf( "Process B %d\n", x );
    };

    auto printer_processA = receiver | printerA;
    auto printer_processB = receiver | ( executor{ immediate_executor } & printerB );
    receiver.set_ready();

    send( 1 );
    send( 2 );
    send( 3 );

    int end;
    std::cin >> end;

    return 0;
}

The immediate_executor immediately executes the task in the caller thread context. [NdM: I have no notes about this, and the slides don’t offer the solution, but I’m pretty sure the code above is going to print:

Process B 1
Process B 2
Process B 3
Process A 1
Process A 2
Process A 3

Unless the current thread is preempted somewhere before the send statements complete (which could happen, since the main thread performs some I/O).]
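The difference between the two kinds of executor can be modeled in a few lines of standard C++. The sketch assumes the scenario guessed above, in which the pool only gets to run after all the sends are done; a real thread pool may interleave differently. immediate_exec, deferred_exec and run_order are hypothetical names, not stlab types.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <string>
#include <utility>

using task = std::function<void()>;

// Runs the task right away, in the caller's context.
struct immediate_exec
{
    void operator()(task t) const
    {
        assert(static_cast<bool>(t));
        t();
    }
};

// Hypothetical stand-in for a thread pool: tasks are queued and run later.
struct deferred_exec
{
    std::queue<task> pending;

    void operator()(task t) { pending.push(std::move(t)); }

    // Runs every queued task in submission order.
    void drain()
    {
        while (!pending.empty())
        {
            task t = std::move(pending.front());
            pending.pop();
            t();
        }
    }
};

// Sends 1, 2, 3 to process A (deferred) and process B (immediate)
// and returns the order in which the two processes ran.
std::string run_order()
{
    std::string log;
    deferred_exec pool;
    immediate_exec now;

    for (int x = 1; x <= 3; ++x)
    {
        pool([&log, x] { log += "A" + std::to_string(x) + " "; });
        now([&log, x] { log += "B" + std::to_string(x) + " "; });
    }
    pool.drain(); // in this model the "pool" runs only after all sends
    return log;
}
```

Under that assumption run_order() returns "B1 B2 B3 A1 A2 A3 ", which matches the order guessed above.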

There is no try_send operation that returns false when the buffer is full, so you just send data and hope for the best. Buffers are allocated on the heap and implemented using deques.

Stateful process

Not all useful processes are purely transformational; some of them require the processor to hold state across the data flow.

States make things more complicated.

In order to define a stateful processor, you need to define a class with a specific signature:

#include <stlab/concurrency/channel.hpp>

enum class process_state
{
    await,
    yield
};

using process_state_scheduled=std::pair<process_state,std::chrono::steady_clock::time_point>;

struct process_signature
{
    void await(T... val ); // not a real template. This function needs
                           // as many arguments as there are inputs
    U yield(); // return the result.
    process_state_scheduled state() const; // scheduling info

    // optional methods below
    void close();
    void set_error( std::exception_ptr );
};

Now let’s suppose we want to create an adder that sums all the integers flowing through it and produces the result. The adder is going to be used like this:

struct adder
{
    // to be defined
};

int main()
{
    auto [send,receiver] = channel<int>( default_executor );
    auto calculator = receiver | adder{} | [] ( int x )
    {
        std::cout << x << '\n';
    };
    receiver.set_ready();
    while( true )
    {
        int x;
        std::cin >> x;
        send( x );
    }
    return 0;
}

The adder is attached to the receiver, and its output feeds the lambda that prints the result. Input is taken from the command line and sent (forever) to the channel.

Let’s write an adder that takes a series of integers from the standard input and prints the sum when the user enters 0 (digit zero).

struct adder
{
    int _sum = 0;
    process_state_scheduled _state = await_forever;

    void await( int x ) 
    {
        _sum += x;
        if( x== 0 )
        {
           _state = yield_immediate;
        }
    }

    int yield()
    {
        int result = _sum;
        _sum = 0;
        _state = await_forever;

        return result;
    }

    auto state() const 
    {
        return _state;
    }
};
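To see how the await/yield protocol plays out without pulling in the library, the sketch below drives a stripped-down adder by hand. It is a simplified model under my own assumptions: phase, toy_adder and drive are hypothetical names, the state has no timeout part, and the real library schedules these calls on an executor rather than in a plain loop.

```cpp
#include <cassert>
#include <vector>

// Minimal stand-in for the library's process state (no timeout part).
enum class phase { await, yield };

struct toy_adder
{
    int _sum = 0;
    phase _state = phase::await;

    // Accumulates values; a zero asks the driver to collect the sum.
    void await(int x)
    {
        _sum += x;
        if (x == 0)
        {
            _state = phase::yield;
        }
    }

    // Hands out the sum and resets for the next series.
    int yield()
    {
        int result = _sum;
        _sum = 0;
        _state = phase::await;
        return result;
    }

    phase state() const { return _state; }
};

// Hypothetical driver loop: feed a value only while the process awaits,
// then collect a result whenever it asks to yield.
std::vector<int> drive(toy_adder process, const std::vector<int>& inputs)
{
    std::vector<int> results;
    for (int x : inputs)
    {
        assert(process.state() == phase::await);
        process.await(x);
        if (process.state() == phase::yield)
        {
            results.push_back(process.yield());
        }
    }
    return results;
}
```

Driving it with the inputs 1, 2, 3, 0, 4, 5, 0 produces the two sums 6 and 9.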

That’s fine, but we can’t make assumptions about external entities; we have to guarantee the service level for our part. So, what happens if the user falls asleep or goes away? Well, it would be nice if, after some time, let’s say 15 seconds with no new number typed, the sum were printed anyway.

This can be implemented by using the second part of the state type:

struct adder
{
    int _sum = 0;
    process_state_scheduled _state = await_forever;

    void await( int x )
    {
        _sum += x;
        if( x == 0 )
        {
            _state = yield_immediate;
        }
        else
        {
            _state.first = process_state::await;
            _state.second = std::chrono::steady_clock::now() + std::chrono::seconds( 15 );
        }
    }

    int yield()
    {
        int result = _sum;
        _sum = 0;
        _state = await_forever;
        return result;
    }

    auto state() const
    {
        return _state;
    }
};

This is very convenient, especially if you compare it with an alternative implementation based on callbacks.
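The timeout logic can be tried out without waiting 15 seconds by passing time stamps explicitly. The sketch below reflects my reading of the behavior described here (yield when requested or when the deadline passes) and is not the library's scheduler; timed_adder, due and demo are hypothetical names.

```cpp
#include <cassert>
#include <chrono>

using time_point = std::chrono::steady_clock::time_point;

// Hypothetical adder that takes the current time as a parameter,
// so its deadline handling can be checked with synthetic time stamps.
struct timed_adder
{
    int _sum = 0;
    bool _yield_requested = false;
    time_point _deadline = time_point::max(); // "await forever"

    void await(int x, time_point now)
    {
        _sum += x;
        if (x == 0)
        {
            _yield_requested = true;
        }
        else
        {
            _deadline = now + std::chrono::seconds(15);
        }
    }

    int yield()
    {
        int result = _sum;
        _sum = 0;
        _yield_requested = false;
        _deadline = time_point::max();
        return result;
    }

    // True when a driver should call yield(): either a zero was
    // received or 15 seconds passed since the last non-zero value.
    bool due(time_point now) const
    {
        return _yield_requested || now >= _deadline;
    }
};

// Usage with synthetic time stamps, so nothing actually sleeps.
void demo()
{
    const time_point start{};
    timed_adder adder;
    adder.await(5, start);
    assert(!adder.due(start + std::chrono::seconds(14)));
    assert(adder.due(start + std::chrono::seconds(15)));
    assert(adder.yield() == 5);
}
```

Passing the clock in as a parameter is a design choice made only to keep the example deterministic.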

Conclusions

The idea behind the channels is to exploit the cores as much as possible.

Lessons learned:

  • Do not try to implement your own thread pool. It is easy to get wrong. Use the one provided by your operating system; you probably can’t do better.
  • Design your application with the mindset that it can run deadlock-free on a single-core machine.
  • Don’t let your application be plagued by threads, mutexes and atomics: it is very easy to use them badly, causing hard-to-find bugs.
