John Nagle (nagle@animats.com)
June, 2010
newthreading is a Python module which handles most of the details of locking in threaded programs, in a way which attempts to eliminate race conditions. The module is a proof-of-concept version of a possible future addition to Python which would increase performance on multi-core computers.
For theoretical background on the concepts behind "newthreading", see Improved concurrency for Python.
In your program, import and use newthreading instead of the standard threading module. The suggested import is
from newthreading import *
which will import all the objects needed for threading. All the functions of the threading module are duplicated in the newthreading module.
The base class AtomicObject is provided for simple cases where a traditional monitor is desired. Only one thread at a time can be inside an AtomicObject. Each AtomicObject is protected by a lock which is automatically locked when a thread enters any public function of the object, and is unlocked when the thread leaves. An AtomicObject is a traditional multiprocessing monitor and is a safe way to communicate between threads.
A simple to-do list

    class ToDoList(AtomicObject) :
        """
        A to-do list. First in, first out.
        """
        def __init__(self) :
            self._todo = []

        def append(self, item) :
            if item is None :
                raise TypeError("Can't add None to a to-do list")
            self._todo.append(item)

        def getitem(self) :
            if not self._todo :             # the class is locked, so _todo can't change between here
                return(None)
            return(self._todo.pop(0))       # and here. So this is race condition free.
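For comparison with the ToDoList above, here is a rough equivalent written against only the standard Python 3 `threading` module. It is a sketch of the bookkeeping a monitor takes over, not newthreading's implementation; the class name `LockedToDoList` is hypothetical.

```python
import threading

class LockedToDoList:
    """FIFO to-do list guarded by an explicit lock (stdlib-only sketch)."""

    def __init__(self):
        self._lock = threading.Lock()   # one lock per object, as in a monitor
        self._todo = []

    def append(self, item):
        if item is None:
            raise TypeError("Can't add None to a to-do list")
        with self._lock:                # every public method must remember this
            self._todo.append(item)

    def getitem(self):
        with self._lock:                # held across both the test and the pop
            if not self._todo:
                return None
            return self._todo.pop(0)
```

Every public method has to take the lock by hand here, and forgetting it in one place reintroduces the race.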
To make each AtomicObject thread-safe, all data going into the object (via parameters) or coming out of the object (via return value) is restricted to types which contain no mutable state. Otherwise, a race condition could result due to data shared between threads without locking. This rule is enforced by "freezing" all parameters and return values. "Freezing" is covered in more detail below, but in general, it's OK to pass all fully-immutable objects like numbers, and tuples composed of fully-immutable objects. You can also pass objects which handle their own locking, like other AtomicObject objects. If you pass a "dict", "list", or "set", the object will be copied to its "frozen" form, a "frozendict", "tuple", or "frozenset". So, in practice you can pass most things that can reasonably be passed to another thread.
Some restrictions must be imposed to eliminate ambiguity over whether control is “inside” or “outside” the object, to make the locking work.
Methods beginning with "_" can only be called from inside the object. Methods without a leading "_" can only be called from outside the object.
Methods not beginning with “_” should not be accessed from within the methods of the object. Doing so will cause an immediate deadlock, because the object has a non-recursive lock. The __init__ function is exempt from this restriction, because it runs with the object unlocked. Since the object is only accessible from one thread during construction, this is safe.
Atomic objects will deadlock if they call their own externally visible methods. This can happen inadvertently if a function calls outside itself, and some other function calls back into the original object. Thus, atomic objects are most useful when they are self-contained and do little calling of other objects.
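The self-deadlock described above can be reproduced with a plain non-recursive `threading.Lock`. The sketch below assumes Python 3 and uses a hypothetical `Counter` class; a short timeout stands in for the real behaviour, which would be to block forever.

```python
import threading

class Counter:
    """Hypothetical monitor-style counter with a non-recursive lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._n = 0

    def increment(self, timeout=-1):
        # timeout=-1 waits forever, which is how a real deadlock would behave.
        if not self._lock.acquire(timeout=timeout):
            raise RuntimeError("would deadlock: lock is already held")
        try:
            return self._bump()
        finally:
            self._lock.release()

    def increment_twice_badly(self):
        with self._lock:
            # Re-entering through a public method while holding the lock.
            return self.increment(timeout=0.1)

    def increment_twice(self):
        with self._lock:
            # Internal "_" helpers do not touch the lock, so this is safe.
            self._bump()
            return self._bump()

    def _bump(self):
        self._n += 1
        return self._n
```

The working version routes all internal work through an underscore method, which matches the naming rule above.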
Synchronized objects, like atomic objects, have the basic property that only one thread at a time can be active within the object. The difference is that if a thread blocks inside the object, by calling sleep or the acquire function of a Semaphore, the synchronized object will be temporarily unlocked, allowing another thread to enter the object. This allows creating useful classes like a queue class where threads wait until another thread adds an item to the queue. Here's an example.
A simple Queue class

    class Queue(SynchronizedObject) :
        """
        Basic Queue class using new threading system.
        """
        def __init__(self, maxsize = 0) :
            """
            Initialize FIFO queue of specified size. Size 0 is treated as infinite
            """
            SynchronizedObject.__init__(self)       # init parent
            self._maxsize = maxsize
            self._items = []
            self._lock = Semaphore(0)

        def put(self, item) :
            """
            Put item on queue. A frozen copy of the item will be made if necessary.
            """
            if self._maxsize > 0 and len(self._items) >= self._maxsize :
                raise IndexError("Queue overflow")
            self._items.append(item)
            self._lock.release()

        def get(self) :
            """
            Get item from queue. Block if no items available
            """
            self._lock.acquire()        # This waits on the semaphore and unlocks the object.
            return(self._items.pop(0))
In the above example, one or more threads could be adding items to the queue by calling put(item), while other threads get new queue entries by calling get(). Only one thread at a time can be active in the class. But here, when a thread calls self._lock.acquire() and blocks, the class is temporarily unlocked. Without this feature, no other thread could get in to do a put(), and the program would deadlock. (Which is what will happen if you make Queue an AtomicObject instead of a SynchronizedObject).
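The closest standard-library analogue to this "unlock while blocked" behaviour is `threading.Condition`, whose wait releases the lock and re-acquires it before returning. The sketch below assumes Python 3; `ConditionQueue` is a hypothetical name and this is not how newthreading is implemented.

```python
import threading

class ConditionQueue:
    """Unbounded FIFO queue built on a condition variable (stdlib-only sketch)."""

    def __init__(self):
        self._cond = threading.Condition()
        self._items = []

    def put(self, item):
        with self._cond:
            self._items.append(item)
            self._cond.notify()         # wake one waiting consumer

    def get(self, timeout=None):
        with self._cond:
            # wait_for() releases the lock while blocked, so put() can get in,
            # and holds the lock again by the time it returns.
            if not self._cond.wait_for(lambda: self._items, timeout=timeout):
                raise TimeoutError("no item arrived in time")
            return self._items.pop(0)
```

With a condition variable the release and re-acquire are explicit in the code; with a SynchronizedObject they happen implicitly when the thread blocks on the semaphore.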
This feature makes synchronization classes work. If you've encountered Java "synchronized", you know that the synchronization mechanism there has to be controlled on a method-by-method basis. In Java, object methods that can block can't be synchronized, or deadlock will result. So the built-in thread safety of synchronized objects in Java has to be bypassed. With the SynchronizedObject of "newthreading", that's not necessary.
The hard cases all work. You can call an AtomicObject from inside a SynchronizedObject and vice versa. AtomicObject and SynchronizedObject objects can be members of other objects. AtomicObject and SynchronizedObject objects can be passed into other AtomicObject and SynchronizedObject objects as parameters.
When a thread blocks inside a SynchronizedObject, all the SynchronizedObject objects currently locked by that thread are unlocked. When the thread unblocks, all those objects are relocked, one at a time, from the "outside" in.
At present, the only blocking operations that do this unlocking are the acquire() operation of a Semaphore, and sleep(). In particular, blocking I/O operations don't unlock SynchronizedObject objects.
Access to data fields of an object from outside the object is permitted under the usual Python rules. For a synchronized class, data access locks and unlocks the object's lock, maintaining concurrency safety. Because global variables are very restricted when "newthreading" is in use, it's useful to move all global variables to an AtomicObject, from which they can be accessed safely by multiple threads.
Above, the "freeze" operation that takes place when data is passed into or out of a synchronized or atomic object was mentioned. Here are the details.
The function: freeze(obj) returns only shareable objects, ones which are either immutable or synchronized. ("synchronized" here includes both SynchronizedObject and AtomicObject).
Mutable | Immutable | Notes |
---|---|---|
- | bool | |
- | int | |
- | float | |
- | long | |
- | complex | |
- | str | |
- | unicode | Merged into "str" in Python 3 |
list | tuple | |
set | frozenset | |
dict | frozendict | "frozendict" provided by "newthreading" |
bytearray | bytes | |
listiterator | - | Not useful as an immutable object |
xrange | - | Has internal state; not useful as an immutable. |
class f(object) | - | Mutable user object |
- | class f(ImmutableObject) | Immutable user object |
The related predicate isfrozen(obj) indicates whether an object is "frozen". isfrozen is true if and only if freeze(obj) == obj.
"frozen" and "immutable" are not the same. "frozen" objects cannot contain mutable unsynchronized objects. A tuple containing a list is immutable, but not frozen - it is not safe to share between threads.
Immutable user-defined classes can be declared by inheriting from ImmutableObject. Immutable user-defined objects are instances of such classes, and cannot change once __init__() has returned. Outside __init__(), no attribute of the object can be changed. __init__ can be called only once for each object. Immutable classes can inherit only from other immutable classes. Immutable classes cannot have destructors. (This feature is of marginal usefulness, and may be removed.)
A frozendict type is provided. It's simply a read-only "dict" object. freeze() of a "dict" returns a frozendict. You can also call frozendict(data) to make a frozen dictionary, with the same arguments as the built-in "dict".
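To make the conversions in the table concrete, here is a minimal Python 3 sketch of a recursive freeze and a matching predicate. It is an illustration under stated assumptions, not the module's code: `types.MappingProxyType` over a private copy stands in for frozendict, the predicate is written structurally, and synchronized or ImmutableObject instances are not handled.

```python
from types import MappingProxyType

# Built-in types with no mutable state (Python 3 names).
_ATOMS = (bool, int, float, complex, str, bytes, type(None))

def freeze(obj):
    """Return a deeply read-only equivalent of obj (sketch only)."""
    if isinstance(obj, _ATOMS):
        return obj
    if isinstance(obj, bytearray):
        return bytes(obj)
    if isinstance(obj, (list, tuple)):
        return tuple(freeze(x) for x in obj)
    if isinstance(obj, (set, frozenset)):
        return frozenset(freeze(x) for x in obj)
    if isinstance(obj, (dict, MappingProxyType)):
        # Assumption: a read-only proxy over a private copy plays frozendict.
        return MappingProxyType({k: freeze(v) for k, v in obj.items()})
    raise TypeError("cannot freeze %s" % type(obj).__name__)

def isfrozen(obj):
    """True if obj is already in the form freeze() would produce."""
    if isinstance(obj, _ATOMS):
        return True
    if isinstance(obj, (tuple, frozenset)):
        return all(isfrozen(x) for x in obj)
    if isinstance(obj, MappingProxyType):
        # Only sound for proxies created by freeze() above.
        return all(isfrozen(v) for v in obj.values())
    return False
```

In this sketch a tuple containing a list is reported as not frozen, and freezing it yields a tuple of tuples.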
Startup of a Python program is a dynamic process. "import" is considered executable, and can be executed conditionally. Functions and classes can be modified and patched dynamically. However, when a program has more than one active thread, elaborate locking is required to make this level of dynamism work. As a compromise, this proposal allows all the usual dynamism of Python until the moment the program creates its second thread. This is implemented through freezeprogram().
Upon the first call to freezeprogram(), the following events happen:
At the creation of the second thread, the point at which the program goes multi-thread, freezeprogram() is implicitly invoked.
Global variables are allowed, but can contain only frozen types. Global variables can be assigned new (frozen) values.
This is a tough restriction for some programs. For example, a global variable which is a list won't work. An attempt to "append" to it will fail.
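One way to live with this restriction is to keep the global as a tuple and rebind it instead of mutating it in place. The names `pending` and `add_pending` below are hypothetical, and the sketch is plain Python that does not depend on newthreading.

```python
pending = ()            # a tuple of frozen items is itself frozen

def add_pending(item):
    """Rebind the global to a new tuple instead of appending in place."""
    global pending
    pending = pending + (item,)     # builds a new tuple; the old one is untouched
    return pending
```

The read and the rebind are still two separate accesses, so when several threads update the same value, an AtomicObject holding it is the safer choice.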
Global variable names which are all upper case, the Python convention for constants, are treated as constants, and cannot be assigned new values. This allows fast access to constants without locking. Access to other global variables implies setting an implicit lock on the module containing the variable during the access.
The locking system of "newthreading" requires that programs be explicit about which parts of synchronized and atomic objects are internal to the object and which are exposed to the outside. The Python convention is that class members beginning with "_" are for internal use only, but this is just a convention. "Newthreading" uses that convention, but now it matters. A call to a method whose name does not begin with an underscore is an "external call", and the object is locked. A call to a method whose name begins with an underscore is an "internal call", and does not lock the object. The same rule applies to accesses to data attributes.
In addition, for any external call, all arguments to the call are "frozen". When the method returns, the return value is also "frozen". Mutable objects (other than synchronized ones) cannot be passed through an external call.
Therefore, for AtomicObject and SynchronizedObject objects, data attributes and methods for internal use of the class should have names beginning with "_".
For AtomicObject, the rules are strict. Calling a non-underscore method from inside the class will cause an immediate deadlock, because AtomicObject has a non-recursive lock. SynchronizedObject is less restrictive; you can call non-underscore methods from inside the class. But there's a substantial performance penalty for such calls; unnecessary locking and freezing take place.
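The naming rule can be imitated in ordinary Python 3 with a class decorator that wraps only the public methods in a lock. This is a sketch under stated assumptions: `monitor`, `_locked` and `Account` are hypothetical names, and the freezing of arguments and return values is left out.

```python
import functools
import threading

def _locked(method):
    """Run method while holding the instance's lock."""
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        with self._lock:
            return method(self, *args, **kwargs)
    return wrapper

def monitor(cls):
    """Wrap every public (non-underscore) method of cls in the instance lock."""
    for name, attr in list(vars(cls).items()):
        if callable(attr) and not name.startswith("_"):
            setattr(cls, name, _locked(attr))
    return cls

@monitor
class Account:
    def __init__(self, balance=0):
        self._lock = threading.Lock()   # non-recursive, like AtomicObject
        self._balance = balance

    def deposit(self, amount):          # external call: takes the lock
        self._add(amount)               # internal call: no second lock attempt
        return self._balance

    def _add(self, amount):
        self._balance += amount
```

Because `_add` is left unwrapped, `deposit` can call it while holding the non-recursive lock without deadlocking.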
In the current implementation of "newthreading", access to single-underscore and double-underscore attributes from outside a class is not detected, but is not thread-safe. Don't do it. A later implementation will enforce this restriction.
eval is allowed, but only in its multiple-argument form, in which a list of objects which can be accessed is provided. The arguments to "eval" are "frozen", so that "eval" can only operate on thread-safe data.
The semantics of the "threading" module have not changed. Threads are created and joined as before. However, the functions from newthreading, rather than threading, must be used. Do not import the threading module.
The "newthreading" module has optional logging of lock and unlock events. To enable logging, call setthreadingdebuglog(logger), where logger is a logger object from the standard logging module.
    import logging
    logging.basicConfig()
    logger = logging.getLogger('ThreadTest')
    logger.setLevel(logging.DEBUG)
    setthreadingdebuglog(logger)
In addition to this logging, the first call to freezeprogram() produces Python warnings for global variables which are being used in ways which may not be thread-safe. Global variables in all modules which contain non-frozen objects will be listed. These are potential race conditions. Several standard Python library modules will generate such warnings. About half of them seem to be valid. In future, those modules should be modified to use AtomicObject to protect their data, making the module thread-safe.
What does this all mean, and how can it be used?
First, the semantics of single-thread programs do not change. This allows for backwards compatibility with most routine Python programs.
This approach supports the two standard models of concurrency - message passing and shared data. Threaded programs which work by passing messages from one thread to another via a queue object work fine. That's straightforward. Any program that works with the "multiprocessing" module can easily be converted to work with "newthreading" (although, with this proof of concept implementation, multi-core CPUs will not be utilized effectively by "newthreading". That's for the future.)
Shared data is permitted, using AtomicObject or SynchronizedObject objects. So, if there's a big data structure being updated by multiple threads, it can be shared between threads. Within an AtomicObject or SynchronizedObject object, race conditions are automatically prevented by the automatic locking. It's seldom necessary to explicitly use a Lock or RLock. (They are, however, still available.)
"Frozen" objects can be passed from thread to thread freely. Python has an advantage here - much data is immutable. Immutable objects can be "passed by value" without actually making a copy. Multiple threads can safely share read-only objects without locking. Strings, tuples, and other frozen objects can be passed in and out of synchronized objects without copying overhead.
The restrictions on global variables are designed to retain the dynamic nature of Python during program startup, during which modules can be optionally loaded, parameters and configuration files read, and the program configured as desired. Once the program goes multi-thread, however, the code can no longer change. Globally visible variables are also restricted. This is a minor issue for programs which are strongly object-oriented, but programs which use global variables to excess will need re-working.
Closures and references to methods cannot be passed between threads. To enforce this, “freeze” will raise an exception if it encounters a closure or a reference to a method.
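A check of this kind can be sketched with standard introspection in Python 3. The function name `reject_unshareable_callable` is hypothetical; the sketch only shows how closures and bound methods can be recognised, not how "freeze" actually does it.

```python
import types

def reject_unshareable_callable(obj):
    """Raise TypeError for bound methods and closures; return obj otherwise."""
    if isinstance(obj, types.MethodType):
        # A bound method carries a reference to its (possibly mutable) instance.
        raise TypeError("bound methods cannot be shared between threads")
    if isinstance(obj, types.FunctionType) and obj.__closure__:
        # A non-empty __closure__ means the function captured outer variables.
        raise TypeError("closures cannot be shared between threads")
    return obj
```

A plain module-level function has `__closure__` set to None and passes through unchanged.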
The attributes of a function are locked, like global variables, at freezeprogram(). This prevents mutable objects in one thread from becoming visible to another.
Default function parameters for all functions are, like global variables, tested with isfrozen() when freezeprogram() is called. This prevents the following situation:
    def bar(val = [ ]) :
        ...
This usage (which is usually a bug) traditionally results in all calls to “bar” sharing one mutable empty list as the default value of “val”. That would be a violation of the basic concurrency rules. The check at “freezeprogram()” will detect this and raise an exception.
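The sharing, and a simple way to detect it, can be shown in plain Python 3. The helper `mutable_defaults` is a hypothetical stand-in for the check made at freezeprogram(); it looks only at positional defaults and a few built-in mutable types.

```python
def bar(val=[]):
    """Every call without an argument appends to the same default list."""
    val.append(1)
    return val

def mutable_defaults(func):
    """Return the positional default values of func that are mutable built-ins."""
    defaults = func.__defaults__ or ()      # None when there are no defaults
    return [d for d in defaults if isinstance(d, (list, dict, set, bytearray))]
```

Calling `bar()` twice returns `[1]` and then `[1, 1]`, because both calls mutate the one list stored in `bar.__defaults__`; `mutable_defaults(bar)` reports that list.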
If a thread is inside more than one synchronized object, a lock is set for each object. Deadlock is thus possible. Synchronized objects may be provided with a timeout option which raises an exception if the object cannot be entered within the time limit.
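A timeout on entry can be sketched with the standard `threading.Lock` in Python 3. `EntryTimeout` and `entering` are hypothetical names; the sketch covers only the initial entry to an object, not the re-locking after a temporary unlock.

```python
import contextlib
import threading

class EntryTimeout(Exception):
    """Raised when an object cannot be entered within the time limit."""

@contextlib.contextmanager
def entering(lock, timeout):
    """Hold lock for the duration of the block, or raise EntryTimeout."""
    if not lock.acquire(timeout=timeout):
        raise EntryTimeout("could not enter within %.2f s" % timeout)
    try:
        yield
    finally:
        lock.release()
```

Typical use would be `with entering(obj_lock, 5.0): ...`, where `obj_lock` is whatever lock guards the object; the exception is raised before the protected block starts, so the object's state is never touched by a thread that failed to enter.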
A more subtle cause of deadlock is the temporary unlocking of an object when a thread blocks inside a synchronized object. When the thread unblocks, it then must re-lock the object. This re-locking operation cannot be allowed to raise a timeout exception, because the catch point of the exception could be within the object. Allowing an object to catch its own lock timeout would result in two threads in the same object at the same time, which is a race condition.
Exceptions raised within a temporary unlock are possible, and the exception unwinder may have to wait for the lock on the synchronized object. It's thus possible for a thread to deadlock during exception processing. Again, a timeout here cannot be allowed. Locking soundness must be maintained even during exception processing.
In general, race conditions are more of a problem than deadlocks. Race conditions happen randomly, depending upon timing. Deadlock problems tend to be repeatable and show up in testing.
We already showed the Queue class in Example 1 above. That's a generally useful class. There's nothing special about it; you can write your own Queue class. Below is a specialized type of queuing class, to illustrate one of the uglier cases.
A synchronizer for RSS feeds with slow polling

    import time
    import thread                   # Python 2 module, for get_ident()
    from newthreading import *

    class PollSynchronizer(SynchronizedObject) :
        """
        Synchronizer for RSS feed operations which use slow HTTP polling
        """
        def __init__(self, timeout = 30.0) :
            """
            Initializer
            """
            SynchronizedObject.__init__(self)   # initialize base class
            self._changelock = Semaphore(0)
            self._waitcount = 0
            self._changeid = 0
            self._sleepthread = None
            self._timeout = timeout

        def setchangeid(self, id) :
            """
            Report that the ID of the feed has changed. All waiting threads are awakened.
            """
            self._changeid = id
            self._notifyall()                   # wake up everybody

        def waitforchange(self, id) :
            """
            Wait until the current feed ID is different from the one given,
            or until the timeout runs out.
            Returns a tuple (changed, current ID). changed is True if the ID
            has changed, False on timeout with no ID change.
            """
            endtime = time.time() + self._timeout   # wake up at this time if no event
            if self._sleepthread is None :
                self._sleepthread = Thread(None, self.tick)
                self._sleepthread.start()           # start tick thread
            while self._changeid == id and (not self._changeid is None) and endtime > time.time() :
                self._waitcount += 1
                self._changelock.acquire()
                self._waitcount -= 1
            return((self._changeid != id, self._changeid))

        def _notifyall(self) :
            cnt = self._waitcount
            print("NotifyAll, %d items" % (cnt,))
            for i in xrange(cnt) :
                self._changelock.release()

        def tick(self) :
            """
            tick -- background thread, wakes up everybody every N seconds
            """
            print("Starting tick thread %d" % (thread.get_ident(),))   # startup msg
            while not self._changeid is None :  # stops when the change ID is set to None
                print("Tick...")
                sleep(self._timeout * 0.25)
                self._notifyall()
            self._notifyall()                   # wake all on exit
This snippet of code is a server-side synchronizer for RSS feeds which use slow HTTP polling. The goal here is to immediately inform the client when data has changed on the server, without overly frequent polling. The client makes HTTP requests which are deliberately stalled by the server for up to 30 seconds if no new items are available, but are answered immediately when new data is available. RSS feeds use an ID (typically a hash of the data, or a timestamp) which changes when the feed data changes.
Each server thread handling a client HTTP request calls waitforchange() with the ID from the last poll; the timeout, in seconds, is set when the synchronizer is created. The call returns either when the timer runs out, or immediately when the data source changes the ID by calling setchangeid().
Note the simplicity of the code. Written with traditional locking primitives, this is a tough piece of code to get right, even with a global lock protecting the underlying implementation. Because the object is synchronized, the obvious code is a solution free of race conditions. Because the synchronized object unlocks when a thread blocks, using a synchronized object does not deadlock the program. As before, the built-in locking eliminates the need for a global lock to protect the underlying data structures.
Safe global variables

    from newthreading import *

    class GlobalVars(SynchronizedObject) :
        """
        Global variable storage
        """
        def __init__(self) :
            """
            Initializer
            """
            self.verbose = False
            self.debug = False

    gvars = GlobalVars()                # shared instance used in the text below
This is simple thread-safe storage of global variables. Global variables must be immutable or synchronized. Encapsulating global variables in a SynchronizedObject makes them synchronized. For simple types, variables can be accessed and changed normally. Global variables of mutable types cannot be modified in place from outside the object; that would introduce a race condition.
Note that storing a mutable object, like a list, into one of these global variables actually stores a tuple copy of the list. The assignment
    gvars.options = ['e','f']

is treated as

    setattr(gvars, "options", freeze(['e','f']))
which stores a "frozen" copy of the list. A frozen list is a tuple. Access to gvars.options, however, does not require a copy. Since the stored value is immutable, access is cheap.