rtsync

As mentioned on the summary page, rtsync is a realtime, actor-based language implemented as a domain-language-to-C++ compiler with a distributed C++ runtime. The implementation was (unsurprisingly) also written in C++, apart from various scripts in Python, Perl, and the odd bit of shell; the samples; and the lexer and parser, which were generated with good old lex and yacc. The original authors were my friend Scott Stoddard and myself, working for, and under the supervision and theoretical guidance of, Dr. Stefan Bruda. (The guidance was very real, mind you; it just had a lot to do with theory.)

Another interesting part of the system was the verifier, which analysed all the constraints in a program and determined, as far as could be established statically, whether any of them were impossible to satisfy. (Impossible timing constraints were considered semantic errors and produced compiler errors.)

The main (but rather smaller) rtsync page is here, where you can also download a version of the language (though I don’t really know what it has developed into in the past few years!).

What follows are a few samples of the implementation and domain language, just to get some vague sense of what I was doing in the summer of 2004.

The actor class

Here’s a fun little class. The actor represents an agent in the system (rtsync is, after all, an agent-based system); in the domain language, actors are declared with properties and methods, just like objects in a traditional OO language. The difference between our actors and objects in, say, C++, is that message passing in rtsync really involves transferring a message from one agent to another, potentially on another computer. Nor are all messages treated equally: they are subject to time constraints regulated by synchronizer entities.

actor.h:

#ifndef ACTOR_H
#define ACTOR_H

class actor;

#include "messages.h"
#include "timing.h"
#include "synchronizer.h"
#include "pqueue.h"
#include "schedulable.h"
#include <pthread.h>
#include <vector>


// Used to track message requests by priority
struct synchstruct
{
    syncref s;
    rttime_t priority;
};

// Sorting predicate for above
inline bool operator<(const synchstruct& s1, const synchstruct& s2)
{
    return s1.priority < s2.priority;
}

void* start_polling_func_a(actor* a);

class actor : public rt_sched_actor
{
    pthread_t m_pollthread;
protected:
    rttime_t rt_current;
    
    pthread_mutex_t m_queue_mutex;
    pqueue<synchstruct> m_msg_requests;

    const actorref name;
    
    // Keep track of listeners (for action triggers etc.)
    std::vector<syncref> listeners;
    // Notify all listeners of an event; called by derived
    void notify(const method& m, const rttime_t& t);
    
    // Start the polling thread; each derived constructor shall call this once
    // its members are initialized
    void start_polling();
    // Kill the polling thread; each derived destructor shall call this before
    // destroying any of its members
    void end_polling();

    
    // The functions below should not be called by users or derived classes;
    // the thread creation shall take care of this
    
    // Master polling function; do not touch, do not call
    void poll();
    // Hook to pass to pthread_create; can't use something with thiscall
    friend void* start_polling_func_a(actor* a);
    
    // Process message - inheriting classes must pass it on to their methods,
    // as appropriate
    virtual void execute(message& m) = 0;
    
    friend class postmaster;
public:
    actor(const actorref& n);
    virtual ~actor() { }
    
    rttime_t getpriority();

    // Methods for individual actors
    // Get status of method
    virtual bool enabled(const method& m) const = 0;
    // Set status of method
    virtual void enable(const method& m) = 0;
    virtual void disable(const method& m) = 0;
    // Find when a method was last run
    virtual rttime_t lastrun(const method& m) = 0;

    // Method for synchronizers to notify the actor that a message of the
    // specified priority is waiting
    void register_msg(syncref s, rttime_t priority);
    // Method for synchronizers to register themselves as listeners
    void register_listener(const syncref& s);
};


#endif // ACTOR_H

actor.cc:

#include "actor.h"
#include "postmaster.h"
#include "logger.h"

#include <algorithm>
#include <string>


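actor::actor(const actorref& n) : rt_current(rt_inf), name(n)
{
    // rt_current starts at rt_inf ("no current priority") so that
    // getpriority() never reads an uninitialized value
    // Use default attributes, else we need to setup a pthread_mutexattr_t
    pthread_mutex_init(&m_queue_mutex, 0);
}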


// Method for synchronizers to notify the actor that a message of the
// specified priority is waiting
void actor::register_msg(syncref s, rttime_t priority)
{
    rt_log << s << " notified " << name << " a message is available" << "\n";
    synchstruct s1 = { s, priority };
    pthread_mutex_lock(&m_queue_mutex);
    m_msg_requests.enqueue(s1);
    pthread_mutex_unlock(&m_queue_mutex);
}

// Notify all listeners of an event; called by derived
void actor::notify(const method& m, const rttime_t& t)
{
    for(std::vector<syncref>::iterator it = listeners.begin();
        it != listeners.end(); ++it)
    {
        rt_log << name << " notifying " << *it << " that it "
                  << "completed " << m << " at " << t << "\n";
        PostMaster.sm_notify_action(*it, name, m, t);
    }
}

// Method for synchronizers to register themselves as listeners
void actor::register_listener(const syncref& s)
{
    rt_log << "Actor " << name << ": " << s << " is listening" << "\n";
    if(std::find(listeners.begin(), listeners.end(), s) == listeners.end())
        listeners.push_back(s);
}

rttime_t actor::getpriority()
{
    // If we have a current priority, use it
    if (rt_current != rt_inf)
        return rt_current;
    else
    {
        // Use priority of most urgent message
        if (m_msg_requests.size())
            return m_msg_requests.peek().priority;
        else
            return rt_inf;
    }
}

// Polling functionality below

// To pass to pthread_create
void* start_polling_func_a(actor* a)
{
    rt_log << "(actor::)start_polling_func()" << "\n";
    a->poll();
    return 0;
}

// Start the polling thread; each derived constructor shall call this once
// its members are initialized
void actor::start_polling()
{
    rt_log << "actor::start_polling()" << "\n";
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

    int result = pthread_create(&m_pollthread, &attr,
                                (void* (*)(void*)) start_polling_func_a,
                                (void*) this);
    pthread_attr_destroy(&attr);

    // Only touch m_pollthread once we know the thread was actually created
    if(result)
    {
        rt_log << "Error: Actor failed to create polling thread" << "\n";
        throw std::string("Error: Actor failed to create polling thread");
    }

    sched_register(m_pollthread);
    rt_log << name << " is " << m_pollthread << "\n";
    
    rt_log << "result: " << result << "\n";
    rt_log << "actor::start_polling() completed" << "\n";
}

// Kill the polling thread; each derived destructor shall call this before
// destroying any of its members
void actor::end_polling()
{
    pthread_kill(m_pollthread, 9);
}

// Master polling function; do not touch
void actor::poll()
{
    rt_log << name << " starts its polling ..." << "\n";
    while(true)
    {
        //rt_log << "Inside actor polling loop..." << std::endl;
        pthread_mutex_lock(&m_queue_mutex);
        if(m_msg_requests.size())
        {
            synchstruct ss = m_msg_requests.dequeue();
            pthread_mutex_unlock(&m_queue_mutex);

            message m = PostMaster.sm_request_msg(ss.s, name);
            if(m != null_msg)
            {
                m.timestamp.received = rt_gettime();
                rt_log << m;
                execute(m);
            }
            else
            {
                pthread_mutex_lock(&m_queue_mutex);
                m_msg_requests.enqueue(ss);
                pthread_mutex_unlock(&m_queue_mutex);
                sched_yield();
            }
        }
        else
        {
            pthread_mutex_unlock(&m_queue_mutex);
            sched_yield();
        }
    }
}

actor in action

The actor is nothing special: just a limited object with properties and methods. Note that, at least in this early version of rtsync, there was no registry of actors. The systems envisioned were ones where all components were known in advance, such as smart components in electronic or mechanical equipment (where the system can assume that a fixed set of valves and gauges is in place).

actor A {
    int x;
    
    init {
        x = 5;
    }
    
    a() {
        print("A.a()\n");
        D.d();
    }
}

A synchronizer sample

This is what the rtsync system is for. The following example in the domain language defines a synchronizer, which is an entity that monitors transmission of messages between specified actors. A synchronizer may prioritise and even hold messages in order to meet min and max time constraints.

The synchronizer S registers itself to monitor messages between the two actor pairs (A, B) and (A, D). It can also permit or forbid actors to take certain actions based on its own internal state, and run actions of its own in response to those of actors.

synchronizer S {
    int mm;

    init(A<->B, A<->D) {
        mm = 3;
    }

    constraint {
        min B.b -> A.a = 1000;
        max B.b -> A.a = 2000;
        min A.a -> D.d = 200;
        max A.a -> D.d = 800;
        min B.b -> D.d = 100;
        max B.b -> D.d = 900;
    }
    
    trigger {
        disable(B.b) when mm < 0;
        enable(B.b) when mm > 0;
        action(A.a) {
            mm = mm + 1;
        }
    }
}
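To give a rough feel for what "holding" a message to meet a min/max pair could mean, here is a minimal C++ sketch. It is not taken from the rtsync runtime: the `Window` struct, the `Release` enum and the `classify` function are hypothetical names made up for illustration, and the time unit is left abstract because the sample above does not state one. It only assumes that a constraint such as `min B.b -> A.a = 1000` / `max B.b -> A.a = 2000` describes an allowed interval measured from the earlier event.

```cpp
// Hypothetical types for illustration only; not part of rtsync.
struct Window
{
    long min;  // earliest allowed delay after the first event
    long max;  // latest allowed delay after the first event
};

enum class Release { Hold, Deliver, Late };

// Decide what to do with a pending message, given when the first event of
// the constrained pair happened and what time it is now.
Release classify(long first_event_time, long now, const Window& w)
{
    const long elapsed = now - first_event_time;
    if (elapsed < w.min)
        return Release::Hold;     // too early: keep the message back
    if (elapsed > w.max)
        return Release::Late;     // the max constraint has already been missed
    return Release::Deliver;      // inside the window: let it through
}
```

With the `B.b -> A.a` window from the sample, a message arriving 500 units after `B.b` would be held, one at 1500 delivered, and one at 2500 reported as late.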

The error checker

The error checker, or verifier, mentioned above, was responsible for sanity-checking an rtsync program at compile time and ensuring that there were no time constraints that couldn’t possibly be satisfied. This was limited in scope (the analyser had no way of knowing how long any actor would spend inside a method), but suppose A sends a message to B, and B to C: if AB must take at least 90 ns and BC must take at least 50 ns, yet the whole path AC must take at most 100 ns, the program is clearly nonsensical, since the two minimums already add up to 140 ns.
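The arithmetic behind that kind of check is simple enough to sketch. The following is only an illustration of the idea, not the rtsync verifier itself; `path_is_feasible` and its parameters are hypothetical names, and it looks at a single message path in isolation.

```cpp
#include <vector>

// Hypothetical helper for illustration; not part of rtsync.
// hop_min_ns holds the minimum time of each hop along one message path.
// The path cannot satisfy its end-to-end maximum if the hop minimums
// alone already add up to more than that maximum.
bool path_is_feasible(const std::vector<long>& hop_min_ns,
                      long end_to_end_max_ns)
{
    long total_min = 0;
    for (long hop : hop_min_ns)
        total_min += hop;
    return total_min <= end_to_end_max_ns;
}
```

For the example above, the hop minimums 90 and 50 against an end-to-end maximum of 100 make `path_is_feasible` return `false`, which is the situation the compiler reported as an error.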

Additionally, this compiler module could correct certain constraints. For example, if we specify that AB must take at most 100 ns and BC must take at most 120 ns, but the global constraint says AC must take at most 90 ns, both of the in-between constraints (on this message path) can be reduced to (conservatively) 90 ns to improve scheduler performance.
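As a small illustration of that tightening step, reading both per-hop figures as upper bounds: no single hop can be allowed longer than the whole path, so each hop maximum can be clamped to the end-to-end maximum. This is a sketch under that assumption, with a hypothetical function name, and not the code the compiler actually used.

```cpp
#include <algorithm>
#include <vector>

// Hypothetical helper for illustration; not part of rtsync.
// Clamp every per-hop maximum to the end-to-end maximum of the path.
std::vector<long> tighten_hop_maxes(std::vector<long> hop_max_ns,
                                    long end_to_end_max_ns)
{
    for (long& hop : hop_max_ns)
        hop = std::min(hop, end_to_end_max_ns);
    return hop_max_ns;
}
```

Called with hop maximums of 100 and 120 and an end-to-end maximum of 90, it returns 90 for both hops; a hop maximum that is already below the global bound is left alone.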

The solution here was more elegant than it was simple (and I’ll remind you, gentle reader, that much as we appreciated some of this stuff, it was not Scott and I, but Dr. Bruda, who did the hard math!):

    // PROBLEM: Indirect max constraints may grow unboundedly due to loops. This
    // can happen, and arbitrarily large global constraints should then be
    // allowed! We must therefore find out what paths -are- cyclic and set the
    // corresponding max constraints to infinity. Min constraints are not an
    // issue since optimal min paths are by necessity non-cyclic.
    //   To find out what paths are cyclic we simply square the matrix and see
    // what paths would increase their max constraints; these are more than n
    // steps and must contain cycles.
    ematrix *= ematrix;
    for(unsigned row = 0; row < call_map.size(); ++row)
    {
        for(unsigned col = 0; col < call_map.size(); ++col)
        {
            if(ematrix(row, col).max > endmatrix(row, col).max)
                endmatrix(row, col).max = rt_inf;
        }
    }
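
For readers who want to play with the idea, here is a self-contained sketch of the same trick on plain numbers. It is a reconstruction under assumptions, not the original code: it tracks only max constraints, it guesses that the matrix product behaves like a (max, +) product (add along a path, take the largest over alternatives), and `Matrix`, `NO_PATH`, `UNBOUNDED`, `maxplus_mul` and `indirect_max` are all names invented here.

```cpp
#include <cstddef>
#include <limits>
#include <vector>

typedef std::vector<std::vector<long> > Matrix;

const long NO_PATH = -1;                                  // no route i -> j
const long UNBOUNDED = std::numeric_limits<long>::max();  // stands in for rt_inf

// (max,+) product: r(i,j) = max over k of a(i,k) + b(k,j).
Matrix maxplus_mul(const Matrix& a, const Matrix& b)
{
    const std::size_t n = a.size();
    Matrix r(n, std::vector<long>(n, NO_PATH));
    for (std::size_t i = 0; i < n; ++i)
        for (std::size_t k = 0; k < n; ++k)
        {
            if (a[i][k] == NO_PATH) continue;
            for (std::size_t j = 0; j < n; ++j)
            {
                if (b[k][j] == NO_PATH) continue;
                const long sum = a[i][k] + b[k][j];
                if (sum > r[i][j]) r[i][j] = sum;
            }
        }
    return r;
}

// direct(i,j) is the max constraint on the direct call i -> j, or NO_PATH.
Matrix indirect_max(const Matrix& direct)
{
    const std::size_t n = direct.size();
    Matrix step = direct;
    for (std::size_t i = 0; i < n; ++i)
        if (step[i][i] < 0) step[i][i] = 0;  // taking no step costs nothing

    // Largest accumulated max over walks of at most n steps.
    Matrix closure = step;
    for (std::size_t s = 1; s < n; ++s)
        closure = maxplus_mul(closure, step);

    // Squaring allows up to 2n steps; an entry that grows needed more than
    // n steps, so the walk behind it must have gone round a cycle.
    const Matrix squared = maxplus_mul(closure, closure);
    for (std::size_t row = 0; row < n; ++row)
        for (std::size_t col = 0; col < n; ++col)
            if (squared[row][col] > closure[row][col])
                closure[row][col] = UNBOUNDED;
    return closure;
}
```

On a three-node chain 0 -> 1 -> 2 with max constraints 5 and 3, the indirect bound from 0 to 2 comes out as 8. Add a call back from 1 to 0 and every entry reachable through that loop is set to `UNBOUNDED`, while pairs with no route at all stay at `NO_PATH`.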