rtsync
As mentioned on the summary page, rtsync is a realtime,
actor-based language implemented as a domain-language-to-C++ compiler
with a distributed C++ runtime; the implementation was (unsurprisingly)
also written in C++, except for various scripts in Python, perl, the
odd shell script, samples, and the lexer and parser, which were generated
with good old lex and yacc. The original authors were
my friend Scott Stoddard and myself, working for, and under the supervision
and theoretical guidance of, Dr. Stefan Bruda. (The guidance was very real,
mind you—it just had a lot to do with theory.)
Another interesting part of the system was the verifier, which analysed all
the constraints in a program and (as far as could be determined statically)
determined whether any of them were impossible to satisfy. (Impossible
timing constraints were considered semantic errors and would generate
compiler errors.)
The main (but rather smaller) rtsync page is
here, where you can also download
a version of the language (though I don’t really know what it has developed
into in the past few years!).
What follows are a few samples of the implementation and domain language,
just to get some vague sense of what I was doing in the summer of 2004.
The actor class
Here’s a fun little class. The actor represents an agent in the
system (well, rtsync is after all an agent-based system); in the
domain language, actors are declared with properties and methods, just like
objects in a traditional OO language. The difference between our
actors and object in, say, C++, is that message passing in
rtsync really involves transferring a message from one agent to
another—potentially on another computer. These messages, too, are not
treated equivocally, but rather are subject to time constraints
regulated by synchronizer entities.
actor.h:
#ifndef ACTOR_H
#define ACTOR_H
class actor;
#include "messages.h"
#include "timing.h"
#include "synchronizer.h"
#include "pqueue.h"
#include "schedulable.h"
#include <pthread.h>
// Used to track message requests by priority
struct synchstruct
{
syncref s;
rttime_t priority;
};
// Sorting predicate for above
inline bool operator<(const synchstruct& s1, const synchstruct& s2)
{
return s1.priority < s2.priority;
}
void* start_polling_func_a(actor* a);
class actor : public rt_sched_actor
{
pthread_t m_pollthread;
protected:
rttime_t rt_current;
pthread_mutex_t m_queue_mutex;
pqueue<synchstruct> m_msg_requests;
const actorref name;
// Keep track of listeners (for action triggers etc.)
std::vector<syncref> listeners;
// Notify all listeners of an event; called by derived
void notify(const method& m, const rttime_t& t);
// Start the polling thread; each derived constructor shall call this once
// its members are initialized
void start_polling();
// Kill the polling thread; each derived destructor shall call this before
// destroying any of its members
void end_polling();
// The functions below should not be called by users or derived classes;
// the thread creation shall take care of this
// Master polling function; do not touch, do not call
void poll();
// Hook to pass to pthread_create; can't use something with thiscall
friend void* start_polling_func_a(actor* a);
// Process message - inhreiting classes must pass it on to its methods,
// as appropriate
virtual void execute(message& m) = 0;
friend class postmaster;
public:
actor(const actorref& n);
virtual ~actor() { }
rttime_t getpriority();
// Methods for individual actors
// Get status of method
virtual bool enabled(const method& m) const = 0;
// Set status of method
virtual void enable(const method& m) = 0;
virtual void disable(const method& m) = 0;
// Find when a method was last run
virtual rttime_t lastrun(const method& m) = 0;
// Method for synchronizers to notify the actor that a message of the
// specified priority is waiting
void register_msg(syncref s, rttime_t priority);
// Method for synchronizers to register themselves as listeners
void register_listener(const syncref& s);
};
#endif // #define ACTOR_H
actor.cc:
#include "actor.h"
#include "postmaster.h"
#include "logger.h"
#include <algorithm>
actor::actor(const actorref& n) : name(n)
{
// Use default attributes, else we need to setup a pthread_mutexattr_t
pthread_mutex_init(&m_queue_mutex, 0);
}
// Method for synchronizers to notify the actor that a message of the
// specified priority is waiting
void actor::register_msg(syncref s, rttime_t priority)
{
rt_log << s << " notified " << name << " a message is available" << "\n";
synchstruct s1 = { s, priority };
pthread_mutex_lock(&m_queue_mutex);
m_msg_requests.enqueue(s1);
pthread_mutex_unlock(&m_queue_mutex);
}
// Notify all listeners of an event; called by derived
void actor::notify(const method& m, const rttime_t& t)
{
for(std::vector<syncref>::iterator it = listeners.begin();
it != listeners.end(); ++it)
{
rt_log << name << " notifying " << *it << " that it "
<< "completed " << m << " at " << t << "\n";
PostMaster.sm_notify_action(*it, name, m, t);
}
}
// Method for synchronizers to register themselves as listeners
void actor::register_listener(const syncref& s)
{
rt_log << "Actor " << name << ": " << s << " is listening" << "\n";
if(std::find(listeners.begin(), listeners.end(), s) == listeners.end())
listeners.push_back(s);
}
rttime_t actor::getpriority()
{
// If we have a current priority, use it
if (rt_current != rt_inf)
return rt_current;
else
{
// Use priority of most urgent message
if (m_msg_requests.size())
return m_msg_requests.peek().priority;
else
return rt_inf;
}
}
// Polling functionality below
// To pass to pthread_create
void* start_polling_func_a(actor* a)
{
rt_log << "(actor::)start_polling_func()" << "\n";
a->poll();
return 0;
}
// Start the polling thread; each derived constructor shall call this once
// its members are initialized
void actor::start_polling()
{
rt_log << "actor::start_polling()" << "\n";
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
int result = pthread_create(&m_pollthread, &attr,
(void* (*)(void*)) start_polling_func_a,
(void*) this);
sched_register(m_pollthread);
rt_log << name << " is " << m_pollthread << "\n";
if(result)
{
rt_log << "Error: Actor failed to create polling thread" << "\n";
throw std::string("Error: Actor failed to create polling thread");
}
rt_log << "result: " << result << "\n";
rt_log << "actor::start_polling() completed" << "\n";
}
// Kill the polling thread; each derived destructor shall call this before
// destroying any of its members
void actor::end_polling()
{
pthread_kill(m_pollthread, 9);
}
// Master polling function; do not touch
void actor::poll()
{
rt_log << name << " starts its polling ..." << "\n";
while(true)
{
//rt_log << "Inside actor polling loop..." << std::endl;
pthread_mutex_lock(&m_queue_mutex);
if(m_msg_requests.size())
{
synchstruct ss = m_msg_requests.dequeue();
pthread_mutex_unlock(&m_queue_mutex);
message m = PostMaster.sm_request_msg(ss.s, name);
if(m != null_msg)
{
m.timestamp.received = rt_gettime();
rt_log << m;
execute(m);
}
else
{
pthread_mutex_lock(&m_queue_mutex);
m_msg_requests.enqueue(ss);
pthread_mutex_unlock(&m_queue_mutex);
sched_yield();
}
}
else
{
pthread_mutex_unlock(&m_queue_mutex);
sched_yield();
}
}
}
actor in action
The actor is nothing special: Just a limited object with
properties and methods. Note that at least in this early version of
rtsync, there was no registry of actors: The systems
envisioned were system where all components were known, such as smart
components in electronic or mechanical equipment (where the system can
assume that a fixed set of valves and gauges are in place).
actor A {
int x;
init {
x = 5;
}
a() {
print("A.a()\n");
D.d();
}
}
A synchronizer sample
This is what the rtsync system is for. The following
example in the domain language defines a synchronizer, which is
an entity that monitors transmission of messages between specified
actors. A synchronizer may prioritise and even hold
messages in order to meet min and max time constraints.
The synchronizer S registers itself to monitor messages
between the two actor pairs (A, B) and (A,
D). It can also permit or forbid actors to take
certain actions based on its own internal state, and trigger actions
on those of actors.
synchronizer S {
int mm;
init(A<->B, A<->D) {
mm = 3;
}
constraint {
min B.b -> A.a = 1000;
max B.b -> A.a = 2000;
min A.a -> D.d = 200;
max A.a -> D.d = 800;
min B.b -> D.d = 100;
max B.b -> D.d = 900;
}
trigger {
disable(B.b) when mm < 0;
enable(B.b) when mm > 0;
action(A.a) {
mm = mm + 1;
}
}
}
The error checker
The error checker, or verifier
, mentioned above, was responsible
for sanity-checking an rtsync program at compile time and ensure that there
were no time constraints that couldn’t possibly be satisfied. This was
limited in scope—the analyser had no way of knowing how long any
actor would spend inside a method—but if A
sends a message to B, and B to C; and if
A→B must take at least 90 ns, and
A→B must take at least 50 ns; yet the whole
system A→C must take at most 100 ns, it’s clearly
nonsensical.
Additionally, this compiler module could correct certain
constraints. For example, if we specify that A→B
must take at most 100 ns, and B→C must take at least
120 ns, but the global constraint A→C must take
at most 90 ns, both of the in-between constraints (on this message path)
can be reduced to (conservatively) 90 ns to improve scheduler performance.
The solution here was more elegant than it was simple (and I’ll remind you,
gentle reader, that much as we appreciated some of this stuff,
it was not Scott and I, but Dr. Bruda, who did the hard math!):
// PROBLEM: Indirect max constraints may grow unboundedly due to loops. This
// can happen, and arbitrarily large global constraints should then be
// allowed! We must therefore find out what paths -are- cyclic and set the
// corresponding max constraints to infinity. Min constraints are not an
// issue since optimal min paths are by necessity non-cyclic.
// To find out what paths are cyclic we simply square the matrix and see
// what paths would increase their max constraints; these are more than n
// steps and must contain cycles.
ematrix *= ematrix;
for(unsigned row = 0; row < call_map.size(); ++row)
{
for(unsigned col = 0; col < call_map.size(); ++col)
{
if(ematrix(row, col).max > endmatrix(row, col).max)
endmatrix(row, col).max = rt_inf;
}
}