Category: C/C++

2009-06-14 09:40:08

C++/CLI supports the ability to create multiple threads of execution within a single program. Rex covers the creation and synchronization of threads in C++/CLI.

By Rex Jaeschke,  C/C++ Users Journal
Oct 01, 2005

C++/CLI supports the ability to create multiple threads of execution within a single program. This month, we'll see how threads are created and synchronized. Next month, we'll see how shared variables can be guarded against compromise during concurrent operations.

Introduction

A thread is an individual stream of execution as seen by the processor, and each thread has its own register and stack context. A single processor executes only one thread at a time. The execution of a thread is interrupted when it needs resources that are not available, when it is waiting for an operation such as I/O to complete, or when it uses up its processor time slice. When the processor changes from executing one thread to another, this is called "context switching." By executing another thread when one thread becomes blocked, the system reduces processor idle time. This is called "multitasking."

When a program is executed, the system is told where on disk to get instructions and static data. A set of virtual-memory locations, collectively called an address space, is allocated to that program, as are various system resources. This runtime context is called a "process." However, before a process can do any work, it must have at least one thread. When each process is created, it is automatically given one thread, called the "primary thread." However, this thread has no more capability than other threads created for that process; it just happens to be the first thread created for that process. The number of threads in a process can vary at runtime under program control. Any thread can create other threads; however, a creating thread does not in any sense own the threads it creates—all threads in a process belong to the process as a whole.

The work done by a process can be broken into subtasks with each being executed by a different thread. This is called "multithreading." Each thread in a process shares the same address space and process resources. When the last remaining thread in a process terminates, the parent process terminates.

Why have more than one thread in a process? If a process has only one thread, it executes serially. When the thread is blocked, the system is idle if no other process has an active thread waiting. This may be unavoidable if the subtasks of the process must be performed serially; however, this is not the case with many processes. Consider a process that has multiple options. A user selects some option, which results in lots of computations using data in memory or a file and the generation of a report. By spawning off a new thread to perform this work, a process can continue accepting new requests for work without waiting for the previous option to complete. Moreover, by specifying thread priorities, a process can allow less-critical threads to run only when more-critical threads are blocked.

Once a thread has been dispatched, another thread can be used to service keyboard or mouse input. For example, the user might decide that a previous request is not the way to go after all, and wish to abort the first thread. This can be done by selecting the appropriate option on a pull-down menu and having one thread stop the other.

Another example involves a print spooler. Its job is to keep a printer as busy as possible and to service print requests from users. The users would be very unhappy if the spooler waited until a job had completed printing before it started accepting new requests. Of course, it could periodically stop printing to see if any new requests were pending (this is called "polling"), but that wastes time if there are no requests. In addition, if the time interval between polls is too long, there is a delay in servicing requests. If it is too short, the thread spends too much time polling. Why not have the spooler have two threads—one to send work to the printer, the other to deal with requests from users? Each runs independent of the other, and when a thread runs out of work, it either terminates itself or goes into an efficient state of hibernation.

When dealing with concurrently executing threads, we must understand two important aspects: atomicity and reentrancy. An atomic variable or object is one that can be accessed as a whole, even in the presence of asynchronous operations that access the same variable or object. For example, if one thread is updating an atomic variable or object while another thread reads its contents, the logical integrity of those contents cannot be compromised—the read will get either the old or the new value, never part of each. Normally, the only things that can be accessed atomically are those having types supported atomically in hardware, such as bytes and words. Most of the fundamental types in C++/CLI are guaranteed to be atomic. (Others might also be atomic for a given implementation, but that's not guaranteed.) Clearly, a Point object implemented as an x- and y-coordinate pair is not atomic, and a writer of a Point's value could be interrupted by a reader to that Point, resulting in the reader getting the new x and old y, or vice versa. Similarly, arrays cannot be accessed atomically. Because most objects cannot be accessed atomically, we must use some form of synchronization to ensure that only one thread at a time can operate on certain objects. For this reason, C++/CLI assigns each object, array, and class a synchronization lock.

A reentrant function is one that can safely be executed in parallel by multiple threads of execution. When a thread begins executing a function, all data allocated in that function comes from either the stack or the heap. In any event, it's unique to that invocation. If another thread begins executing that same function while the first thread is still working there, each thread's data will be kept separate. However, if that function accesses variables or files that are shared between threads, it must use some form of synchronization.
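
As a rough illustration of reentrancy, the sketch below uses ISO C++ (std::thread) rather than the article's C++/CLI library. The names sumTo and sumOnThreads are hypothetical. sumTo touches only its parameter and local variables, so several threads can execute it at once without any lock; each thread stores its answer in its own slot of the result vector.

```cpp
#include <cstddef>
#include <thread>
#include <vector>

// Reentrant: uses only its parameter and locals, which live on the
// calling thread's own stack.
long long sumTo(int n)
{
    long long total = 0;
    for (int i = 1; i <= n; ++i)
        total += i;
    return total;
}

// Hypothetical driver: runs sumTo on one thread per input. Each thread
// writes to a different element of results, so no synchronization is needed.
std::vector<long long> sumOnThreads(const std::vector<int>& inputs)
{
    std::vector<long long> results(inputs.size());
    std::vector<std::thread> workers;
    for (std::size_t k = 0; k < inputs.size(); ++k)
        workers.emplace_back([&results, &inputs, k] { results[k] = sumTo(inputs[k]); });
    for (std::thread& w : workers)
        w.join();
    return results;
}
```

Calling sumOnThreads({10, 100, 1000}) yields one total per input. If sumTo accumulated into a variable shared by all threads instead of a local, it would need synchronization.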

Creating Threads

In Listing 1, the primary thread creates two other threads, and the three threads run in parallel without synchronization. No data is shared between the threads and the process terminates when the last thread terminates.

Let's begin by looking at the first executable statement in the program in case 3a. Here we create an object having the user-defined type ThreadX. That class has a constructor, an instance function, and three fields. We call the constructor passing it a start and end count, and an increment amount, which it stores for later use in controlling a loop.

In case 3b, we create an object of the library type Thread, which is from the namespace System::Threading. A new thread is created using such an object; however, before a thread can do useful work, it must know where to start execution. We indicate this by passing to Thread's constructor a delegate of type ThreadStart (from the same namespace), which supports any function taking no arguments and returning no value. (Being a delegate, it could encapsulate multiple functions; however, in our examples, we'll specify only one.) In this case, we specify that the thread is to begin by executing instance function ThreadEntryPoint on object o1. Once started, this thread will execute until this function terminates. Finally, in case 3c, an arbitrary name is given to this thread by setting its Name property.

In cases 4a, 4b, and 4c, we do the same thing for a second thread, giving it a different set of loop control data and a different name.

At this stage, two thread objects have been constructed but no new threads have yet been created; these threads are inactive. To make a thread active, we must call Thread's function Start, as shown in cases 5 and 6. This function starts a new executing thread by calling its entry-point function. (Calling Start on a thread that is already active results in an exception of type ThreadStateException.) The two new threads each display their names and then proceed to loop and display their progress periodically. Because each of these threads is executing its own instance function, each has its own set of instance data members.
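
For comparison, here is a minimal sketch of the same idea in ISO C++ rather than C++/CLI. LoopWorker and runTwoWorkers are hypothetical names. One difference worth noting: a std::thread begins executing its entry point as soon as it is constructed, so there is no separate Start step and no inactive state as there is with the CLI Thread type.

```cpp
#include <iostream>
#include <sstream>
#include <string>
#include <thread>
#include <utility>

// Hypothetical counterpart of the article's ThreadX: it stores loop-control
// data and exposes an instance function used as the thread entry point.
class LoopWorker
{
    std::string name;
    int loopStart, loopEnd, dispFrequency;
    int linesShown = 0;
public:
    LoopWorker(std::string n, int startValue, int endValue, int frequency)
        : name(std::move(n)), loopStart(startValue), loopEnd(endValue),
          dispFrequency(frequency) {}

    void run()
    {
        for (int i = loopStart; i <= loopEnd; ++i)
        {
            if (i % dispFrequency == 0)
            {
                std::ostringstream line;   // build the whole line, then insert it once
                line << name << ": i = " << i << '\n';
                std::cout << line.str();
                ++linesShown;
            }
        }
    }
    int lines() const { return linesShown; }
};

// Starts two workers on their own instance data and waits for both.
// Returns the total number of progress lines written.
int runTwoWorkers()
{
    LoopWorker w1("t1", 0, 1000000, 200000);
    LoopWorker w2("t2", -1000000, 0, 200000);
    std::thread t1(&LoopWorker::run, &w1);   // starts running immediately
    std::thread t2(&LoopWorker::run, &w2);
    t1.join();
    t2.join();
    return w1.lines() + w2.lines();
}
```

Because each thread runs the member function on a different object, each has its own loop data, just as each CLI thread above works on its own ThreadX instance.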

All three threads write to standard output and, as we can see from Figure 1, the output from the threads in one execution is intertwined. (Of course, the output might be ordered differently on subsequent executions.) We see that the primary thread terminated before either of the other two started running. This demonstrates that although the primary thread was the parent of the other threads, the lifetimes of all three threads are unrelated. Although the entry-point function used in this example is trivial, that function can call any other function to which it has access.

If we want different threads to start execution with different entry-point functions, we simply define those functions in the same or different classes (or as nonmember functions) as we see fit.

Synchronized Statements

The main program in Listing 2 has two threads accessing the same Point. One of them continually sets the Point's x- and y-coordinates to some new values while the other retrieves these values and displays them. Even though both threads start executing the same entry-point function, by passing a value to their constructors, we can make each thread behave differently.

The purpose of the call to Sleep for 100 milliseconds is to allow the two threads to start executing before we attempt to access p's x- and y-coordinates. That is, we want the primary thread to compete for exclusive access to p's coordinates with those two threads.

A call to Thread::Join suspends the calling thread until the thread on which Join is called terminates.

Consider the type ThreadY in Listing 2. The potential for conflict arises from the fact that one thread can be calling Move in case 1 while the other is (implicitly) calling ToString in case 2. Since both functions access the same Point, without synchronization, Move might update the x-coordinate, but before it can update the corresponding y-coordinate, ToString runs and displays a mismatched coordinate pair. In such a case, the output produced might be as shown in Figure 2(a). However, when the appropriate statements are synchronized, the coordinate pairs displayed by ToString always match. The output from one synchronized execution is shown in Figure 2(b). In the type Point in Listing 2, we can see how these (and other) functions' access to the x- and y-coordinates is synchronized.

A set of statements can be marked as wanting exclusive access to some resource by including them in what we shall refer to as a "lock block," that is, by delimiting those statements with calls to the Threading::Monitor functions Enter and Exit, as shown in cases 1a and 1b, 2a and 2b, 3a and 3b, and 4a and 4b.

Since Move and ToString are instance functions, when they are called on the same Point, they share a common lock for that Point. To get exclusive access to an object's lock, we pass a handle to that object to Enter. Then if Move is called to operate on the same Point as ToString, Move is blocked until ToString is completed, and vice versa. As a result, the functions spend time waiting on each other, whereas without synchronization, they both run as fast as possible.

Once a lock block gets control of an object's lock, it ensures that only one instance function from that class can have its critical code be executed on that object at any one time. Of course, an instance function in that class that uses no lock pays no mind to what any of its synchronized siblings are doing, so we must be careful to use locks as appropriate. (Note that the X and Y accessors are not synchronized.) Instance functions' lock blocks that are operating on different objects do not wait on each other.
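
The per-object lock can be approximated in ISO C++ by giving each object its own std::mutex. The sketch below is an analogue under that assumption, not the article's C++/CLI code; SyncPoint and countMismatches are hypothetical names. A mover thread and a reader thread share one point, and because both functions take the same lock, the reader should never observe a half-updated pair.

```cpp
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical counterpart of the article's Point: one mutex per object
// plays the role of the object's synchronization lock.
class SyncPoint
{
    mutable std::mutex lock;
    int x = 0, y = 0;
public:
    void move(int newX, int newY)
    {
        std::lock_guard<std::mutex> guard(lock);   // comparable to Monitor::Enter(this)
        x = newX;
        y = newY;
    }                                              // guard unlocks here, like Monitor::Exit(this)

    std::pair<int, int> read() const
    {
        std::lock_guard<std::mutex> guard(lock);
        return {x, y};
    }
};

// Runs a mover and a reader on the same point and returns the number of
// mismatched (x, y) pairs the reader saw.
int countMismatches(int moves, int reads)
{
    SyncPoint p;
    int mismatches = 0;
    std::thread mover([&p, moves] {
        for (int i = 1; i <= moves; ++i)
            p.move(i, i);
    });
    std::thread reader([&p, &mismatches, reads] {
        for (int i = 0; i < reads; ++i)
        {
            std::pair<int, int> c = p.read();
            if (c.first != c.second)
                ++mismatches;
        }
    });
    mover.join();
    reader.join();
    return mismatches;
}
```

Two SyncPoint objects have two separate mutexes, so lock blocks operating on different objects do not wait on each other.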

Ordinarily, a lock is released when Exit is called. (We'll discuss later what happens if an exception is thrown from inside the lock block.) Therefore, the lock is in place while code within a lock block calls any and all other functions. It is the programmer's responsibility to avoid a deadlock—the situation when thread A is waiting on thread B, and vice versa.
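
One common way to avoid the classic two-lock deadlock is to have every thread acquire the locks through a single facility that handles ordering. The sketch below shows this in ISO C++ using std::lock, which is a substitute technique and not something the article's Monitor class provides. Account, transfer, and runOpposingTransfers are hypothetical names.

```cpp
#include <mutex>
#include <thread>

// Hypothetical resource type; each object carries its own lock.
struct Account
{
    std::mutex lock;
    long balance = 0;
};

// Acquires both locks with std::lock, which obtains the two mutexes without
// deadlocking even when another thread names them in the opposite order.
void transfer(Account& from, Account& to, long amount)
{
    std::lock(from.lock, to.lock);
    std::lock_guard<std::mutex> g1(from.lock, std::adopt_lock);   // take over ownership
    std::lock_guard<std::mutex> g2(to.lock, std::adopt_lock);     // so both are released on exit
    from.balance -= amount;
    to.balance += amount;
}

// Two threads transfer in opposite directions many times. Returns the
// combined balance, which the transfers must leave unchanged.
long runOpposingTransfers(int rounds)
{
    Account a, b;
    a.balance = 1000;
    b.balance = 1000;
    std::thread t1([&] { for (int i = 0; i < rounds; ++i) transfer(a, b, 1); });
    std::thread t2([&] { for (int i = 0; i < rounds; ++i) transfer(b, a, 1); });
    t1.join();
    t2.join();
    return a.balance + b.balance;
}
```

Had transfer simply locked from and then to, thread t1 could hold a's lock while waiting for b's, and t2 the reverse, which is exactly the A-waits-on-B situation described above.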

Consider a function that contains 25 statements, only three consecutive ones of which really need synchronization. If we enclose all 25 of them in one big lock block, we'll be locking out resources longer than we really need to. As we can see in the aforementioned lock blocks, each lock is held for the minimum possible time.

Look at the struct ArrayManip in Listing 3. When the lock block begins execution in case 2, the lock referenced by array is engaged, thereby blocking all other code that also needs to synchronize on that array, such as case 3, when both functions are called to operate on the same array.

A lock block can contain another lock block for the same object because it already has a lock on that object. In this case, the lock count is simply increased; it must decrease to zero before that object can be operated on by another synchronized statement in another thread. A lock block can also contain a lock block for a different object, in which case, it will be blocked until that second object becomes available. Function CopyArrays contains an example.
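
The lock-count behavior has a close ISO C++ analogue in std::recursive_mutex, which the owning thread may lock repeatedly and which is only released to other threads once every lock has been matched by an unlock. The sketch below is an illustration under that assumption; Tally and tallyFromThreads are hypothetical names and are not part of the article's listings.

```cpp
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical counter guarded by a recursive mutex, which lets the thread
// that already owns the lock acquire it again.
class Tally
{
    std::recursive_mutex lock;
    int value = 0;
public:
    void add(int n)
    {
        std::lock_guard<std::recursive_mutex> guard(lock);   // lock count goes up by one
        value += n;
    }                                                        // lock count goes back down

    void addTwice(int n)
    {
        std::lock_guard<std::recursive_mutex> guard(lock);   // outer lock block
        add(n);                                              // inner lock block, same object
        add(n);
    }

    int get()
    {
        std::lock_guard<std::recursive_mutex> guard(lock);
        return value;
    }
};

// Several threads call addTwice concurrently; returns the final total.
int tallyFromThreads(int threadCount, int rounds)
{
    Tally t;
    std::vector<std::thread> workers;
    for (int k = 0; k < threadCount; ++k)
        workers.emplace_back([&t, rounds] {
            for (int i = 0; i < rounds; ++i)
                t.addTwice(1);
        });
    for (std::thread& w : workers)
        w.join();
    return t.get();
}
```

With a plain std::mutex, the nested call to add from addTwice would attempt to relock a mutex the thread already owns, which is undefined behavior in ISO C++; the recursive variant is what mirrors the counting described above.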

The obvious object to lock on is the instance on which the function is operating. However, we can invent lock objects and synchronize on them without having those objects contain any information. For example, see Listing 4. Class C has a lock object called Lock that contains no data and is never used in any context except a lock block. Functions F3 and F4 each contain a set of statements, one of which must be blocked while the other runs, and vice versa.

If a class function (rather than an instance function) needs synchronizing, the lock object is obtained by using the typeid operator, as shown in case 2. There is one lock object for each CLI type (as well as one for each instance of that type). A lock on a class means that only one class function's lock block for that class can execute at a time.

Note the try/finally in case 3. If execution of the lock block completes normally, the previous examples of calling Monitor::Exit will work correctly. However, if an exception is thrown inside the lock block, the call to Exit will never happen because the flow of control is interrupted. As a result, if there is any chance that an exception could be thrown from within a lock block (either directly, or indirectly from any function that the block calls), we should use a try/finally construct, as shown. That way, Exit is called on both normal and abnormal termination of the lock block.
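
ISO C++ has no finally clause; the usual counterpart is a guard object whose destructor releases the lock when the block is left, whether normally or by an exception. The sketch below illustrates that substitute technique with std::lock_guard. It is not the article's C++/CLI code, and sharedLock, riskyUpdate, and lockReleasedAfterThrow are hypothetical names.

```cpp
#include <mutex>
#include <stdexcept>

std::mutex sharedLock;   // hypothetical lock guarding some shared resource

// Throws from inside the lock block when asked to. The lock_guard's
// destructor still releases sharedLock as the exception propagates.
void riskyUpdate(bool fail)
{
    std::lock_guard<std::mutex> guard(sharedLock);
    if (fail)
        throw std::runtime_error("update failed inside the lock block");
}

// Returns true if sharedLock can be acquired again after riskyUpdate threw.
bool lockReleasedAfterThrow()
{
    try
    {
        riskyUpdate(true);
    }
    catch (const std::runtime_error&)
    {
        // Exception handled here; the guard has already unlocked the mutex.
    }
    // try_lock is permitted to fail spuriously, so probe a few times.
    for (int attempt = 0; attempt < 10; ++attempt)
    {
        if (sharedLock.try_lock())
        {
            sharedLock.unlock();
            return true;
        }
    }
    return false;
}
```

The effect is the same as placing Exit in a finally clause: the release happens on both the normal and the exceptional path, without the programmer writing it twice.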

Exercises

To reinforce the material we've covered, perform the following activities:

  1. In your implementation's documentation, carefully read the description of the class Thread.
  2. Modify the example in Listing 2 such that it contains three classes: Point, ManipulateThread, and a main application, where ManipulateThread has two entry-point functions, StartUpMover and StartUpDisplay.
  3. Write a program that has the primary thread create one secondary thread. Every second, the secondary thread gets the current date and time (using type System::DateTime) and displays it on the console. Have the primary thread sleep for some time and, when it wakes up, have it terminate the secondary thread by setting a shared variable (that should be declared volatile) that the secondary thread checks regularly to see if it should shut itself down.

CUJ

Figure 1: Intertwined output of three threads.

Primary thread terminating
t1: i = 0
t1: i = 200000
t1: i = 400000
t1: i = 600000
t2: i = -1000000
t2: i = -800000
t2: i = -600000
t2: i = -400000
t2: i = -200000
t2: i = 0
t2 thread terminating
t1: i = 800000
t1: i = 1000000
t1 thread terminating

Figure 2: (a) Thread output producing a mismatched coordinate pair; (b) matched coordinate pair from a synchronized execution.

(a)
(1878406,1878406)
(2110533,2110533)
(2439367,2439367)
(2790112,2790112)
x: 3137912
y: 3137911 // y is different from x
(3137912,3137911) // y is different from x
(3466456,3466456)
(3798720,3798720)
(5571903,5571902) // y is different from x
(5785646,5785646)
(5785646,5785646)

(b)
(333731,333731)
(397574,397574)
(509857,509857)
(967553,967553)
x: 853896
y: 967553 // y is still different from x
(1619521,1619521)
(1720752,1720752)
(1833313,1833313)
(2973291,2973291)
(3083198,3083198)
(3640996,3640996)

Listing 1

using namespace System;
using namespace System::Threading;

public ref class ThreadX
{
    int loopStart;
    int loopEnd;
    int dispFrequency;
public:
    ThreadX(int startValue, int endValue, int frequency)
    {
        loopStart = startValue;
        loopEnd = endValue;
        dispFrequency = frequency;
    }

    /*1*/ void ThreadEntryPoint()
    {
        /*2*/ String^ threadName = Thread::CurrentThread->Name;

        for (int i = loopStart; i <= loopEnd; ++i)
        {
            if (i % dispFrequency == 0)
            {
                Console::WriteLine("{0}: i = {1,10}", threadName, i);
            }
        }
        Console::WriteLine("{0} thread terminating", threadName);
    }
};

int main()
{
    /*3a*/ ThreadX^ o1 = gcnew ThreadX(0, 1000000, 200000);
    /*3b*/ Thread^ t1 = gcnew Thread(gcnew ThreadStart(o1, &ThreadX::ThreadEntryPoint));
    /*3c*/ t1->Name = "t1";

    /*4a*/ ThreadX^ o2 = gcnew ThreadX(-1000000, 0, 200000);
    /*4b*/ Thread^ t2 = gcnew Thread(gcnew ThreadStart(o2, &ThreadX::ThreadEntryPoint));
    /*4c*/ t2->Name = "t2";

    /*5*/ t1->Start();
    /*6*/ t2->Start();
    Console::WriteLine("Primary thread terminating");
}

Listing 2

using namespace System;
using namespace System::Threading;

public ref class Point
{
    int x;
    int y;
public:

    // define read-write instance properties X and Y

    property int X
    {
        int get() { return x; }
        void set(int val) { x = val; }
    }

    property int Y
    {
        int get() { return y; }
        void set(int val) { y = val; }
    }

    // ...

    // parameters renamed from xor/yor: xor is an alternative token in ISO C++
    void Move(int newX, int newY)
    {
        /*1a*/ Monitor::Enter(this);
        X = newX;
        Y = newY;
        /*1b*/ Monitor::Exit(this);
    }

    virtual bool Equals(Object^ obj) override
    {
        // ...

        if (GetType() == obj->GetType())
        {
            int xCopy1, xCopy2, yCopy1, yCopy2;
            Point^ p = static_cast<Point^>(obj);

            /*2a*/ Monitor::Enter(this);
            xCopy1 = X;
            xCopy2 = p->X;
            yCopy1 = Y;
            yCopy2 = p->Y;
            /*2b*/ Monitor::Exit(this);

            return (xCopy1 == xCopy2) && (yCopy1 == yCopy2);
        }

        return false;
    }

    virtual int GetHashCode() override
    {
        int xCopy;
        int yCopy;

        /*3a*/ Monitor::Enter(this);
        xCopy = X;
        yCopy = Y;
        /*3b*/ Monitor::Exit(this);
        return xCopy ^ (yCopy << 1);
    }

    virtual String^ ToString() override
    {
        int xCopy;
        int yCopy;

        /*4a*/ Monitor::Enter(this);
        xCopy = X;
        yCopy = Y;
        /*4b*/ Monitor::Exit(this);

        return String::Concat("(", xCopy, ",", yCopy, ")");
    }
};

public ref class ThreadY
{
    Point^ pnt;
    bool mover;
public:
    ThreadY(bool isMover, Point^ p)
    {
        mover = isMover;
        pnt = p;
    }

    void StartUp()
    {
        if (mover)
        {
            for (int i = 1; i <= 10000000; ++i)
            {
                /*1*/ pnt->Move(i, i);
            }
        }
        else
        {
            for (int i = 1; i <= 10; ++i)
            {
                /*2*/ Console::WriteLine(pnt); // calls ToString
                Thread::Sleep(10);
            }
        }
    }
};

int main()
{
    Point^ p = gcnew Point;

    /*1*/ ThreadY^ o1 = gcnew ThreadY(true, p);
    /*2*/ Thread^ t1 = gcnew Thread(gcnew ThreadStart(o1, &ThreadY::StartUp));

    /*3*/ ThreadY^ o2 = gcnew ThreadY(false, p);
    /*4*/ Thread^ t2 = gcnew Thread(gcnew ThreadStart(o2, &ThreadY::StartUp));

    t1->Start();
    t2->Start();

    Thread::Sleep(100);
    /*5*/ Console::WriteLine("x: {0}", p->X);
    /*6*/ Console::WriteLine("y: {0}", p->Y);

    /*7*/ t1->Join();
    t2->Join();
}

Listing 3

using namespace System;
using namespace System::Threading;

public ref struct ArrayManip
{
    static int TotalValues(array<int>^ array)
    {
        /*1*/ int sum = 0;
        /*2*/ Monitor::Enter(array);
        {
            for (int i = 0; i < array->Length; ++i)
            {
                sum += array[i];
            }
        }
        Monitor::Exit(array);
        return sum;
    }

    static void SetAllValues(array<int>^ array, int newValue)
    {
        /*3*/ Monitor::Enter(array);
        {
            for (int i = 0; i < array->Length; ++i)
            {
                array[i] = newValue;
            }
        }
        Monitor::Exit(array);
    }

    static void CopyArrays(array<int>^ array1, array<int>^ array2)
    {
        /*4*/ Monitor::Enter(array1);
        {
            /*5*/ Monitor::Enter(array2);
            {
                Array::Copy(array1, array2,
                    array1->Length < array2->Length ? array1->Length
                                                    : array2->Length);
            }
            Monitor::Exit(array2);
        }
        Monitor::Exit(array1);
    }
};

Listing 4

using namespace System;             // for Object
using namespace System::Threading;

public ref class C
{
    /*1*/ static Object^ Lock = gcnew Object;

public:
    static void F1()
    {
        /*2*/ Monitor::Enter(C::typeid);
        /*3*/ try {
            // perform some operation(s)
        }
        finally {
            Monitor::Exit(C::typeid);
        }
    }

    static void F2()
    {
        Monitor::Enter(C::typeid);
        // ...
        Monitor::Exit(C::typeid);
    }

    static void F3()
    {
        /*4*/ Monitor::Enter(Lock);
        // ...
        Monitor::Exit(Lock);
    }

    static void F4()
    {
        Monitor::Enter(Lock);
        // ...
        Monitor::Exit(Lock);
    }
};
// ------------- part 2

C++/CLI supports the ability to create multiple threads of execution within a single program. Last month, we saw how threads are created and synchronized. This month, we'll see how shared variables can be guarded against compromise during concurrent operations, learn about thread-local storage, and we'll look at interlocked operations.

Other Forms of Synchronization

We can control synchronization of threads directly by using a number of functions in classes Monitor and Thread. Listing 1 contains an example.

In case 1, a shared buffer of type MessageBuffer is created. In cases 2a, 2b, and 2c, a thread is created and started such that it processes each message placed in that buffer. Cases 3a, 3b, and 3c create and start a thread that causes a series of five messages to be put into the shared buffer for processing. The two threads are synchronized such that the processor can't process the buffer until the creator has put something there, and the creator can't put another message there until the previous one has been processed. In case 4, we wait until the creator thread has completed its work.

By the time case 5 executes, the processor thread should have processed all of the messages the creator put there, so we tell it to stop work by interrupting it using Thread::Interrupt. We then wait on that thread in case 6 by calling Thread::Join, which allows the calling thread to block itself until some other thread terminates. (Instead of waiting indefinitely, a thread can specify a maximum time that it will wait.)

The CreateMessages thread is quite straightforward. It writes five messages to the shared message buffer, waiting two seconds between each one. To suspend a thread for a given amount of time (in milliseconds), we call Thread::Sleep. A sleeping thread is resumed by the runtime environment rather than by another thread.

The ProcessMessages thread is even simpler because it has the MessageBuffer class do all its work. Class MessageBuffer's functions are synchronized because only one of them at a time can have access to the shared buffer.

The main program starts the processor thread first. As such, that thread starts executing ProcessMessages, which causes the parent object's synchronization lock to be obtained. However, it immediately runs into a call to Wait in case 10, which causes it to wait until it is told to continue. While waiting, it gives up its hold on the synchronization lock, allowing the creator thread to obtain that lock and execute SetMessage. Once that function has put the new message in the shared buffer, it calls Pulse in case 8, which allows any one thread waiting on that lock to wake up and resume operation. However, this cannot happen until SetMessage completes execution because it doesn't give up its hold on the lock until that function returns. Once that happens, the processor thread regains the lock, the wait is satisfied, and execution resumes beyond case 10. A thread can wait indefinitely or until a specified amount of time has elapsed. For completeness, the output is shown in Figure 1.

Note carefully that the processor thread was started before the creator thread. If they were started in the opposite order, the first message would be added, yet no processor thread would be waiting, so no processor thread is woken up. By the time the processor thread gets to its first call to Wait, it will have missed the first message and will only be woken up when the second one has been stored.
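
A widely used way to make this kind of handshake independent of start order is to record the buffer's state in a flag and have each side wait on that flag rather than on the notification alone. The sketch below shows the idea in ISO C++ with std::condition_variable standing in for Monitor's Wait and Pulse. It is an illustration of that variation, not the article's MessageBuffer; SlotBuffer and exchangeMessages are hypothetical names.

```cpp
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical single-slot buffer. The 'full' flag records whether a message
// is pending, so a notification sent before the other side waits is not lost.
class SlotBuffer
{
    std::mutex lock;
    std::condition_variable changed;
    std::string slot;
    bool full = false;
public:
    void put(const std::string& msg)
    {
        std::unique_lock<std::mutex> guard(lock);
        changed.wait(guard, [this] { return !full; });   // wait until the slot is empty
        slot = msg;
        full = true;
        changed.notify_all();                            // wake any waiting thread
    }

    std::string take()
    {
        std::unique_lock<std::mutex> guard(lock);
        changed.wait(guard, [this] { return full; });    // wait until a message is pending
        full = false;
        changed.notify_all();
        return slot;                                     // copied while the lock is still held
    }
};

// Sends 'count' messages from a creator thread to a processor thread and
// returns them in the order the processor received them.
std::vector<std::string> exchangeMessages(int count)
{
    SlotBuffer buffer;
    std::vector<std::string> received;
    // The creator is started first on purpose; the flag makes the order irrelevant.
    std::thread creator([&buffer, count] {
        for (int i = 1; i <= count; ++i)
            buffer.put("M-" + std::to_string(i));
    });
    std::thread processor([&buffer, &received, count] {
        for (int i = 0; i < count; ++i)
            received.push_back(buffer.take());
    });
    creator.join();
    processor.join();
    return received;
}
```

As with Wait, the condition variable's wait releases the lock while blocked and reacquires it before returning, so the other thread can get in to change the flag.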

Managing Threads

By default, a thread is a foreground thread that executes until its entry-point function terminates, regardless of the life span of the thread that created it. On the other hand, a background thread is terminated automatically once all foreground threads in its process have terminated. We configure a thread as being a background thread by setting Thread's property IsBackground to true. A background thread can be made a foreground thread again by setting that property to false.

Once a thread has been started, it is alive. We can test for this by inspecting Thread's property IsAlive. A thread can give up the rest of its CPU time slice by calling Sleep with a time of zero milliseconds. A thread can get at its own Thread object via the property Thread::CurrentThread.

Each thread has a priority level associated with it and this is used by the runtime environment to schedule the execution of threads. A thread's priority can be set or tested via the property Thread::Priority. Priorities range from ThreadPriority::Lowest to ThreadPriority::Highest. By default, a thread has priority ThreadPriority::Normal. Because thread scheduling varies from one implementation to another, we should not rely too heavily on priority levels as a means of controlling threads.

Volatile Fields

The type qualifier volatile tells the compiler that no single thread controls all aspects of the object to which it is applied; specifically, one or more other threads might be reading from and/or writing to this variable asynchronously. Essentially, this qualifier forces the compiler to be less aggressive when performing optimization.

Consider the code fragment in Listing 2. In the absence of volatile, case 1 could safely be ignored because we immediately overwrite the value of i in case 2; however, given the volatile qualifier, the compiler must perform both store operations.

In case 3, the compiler must generate code to fetch the value of i twice; however, its value might change between fetches. To make sure we are testing the same value, we have to write something like case 4 instead. By storing a snapshot of i in the nonvolatile variable copy, we can safely use the value of copy multiple times, knowing that its value cannot be changing "behind the scenes." By using volatile, we can avoid explicit synchronization for certain kinds of variable access.
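
The snapshot idiom carries over to ISO C++, with one caveat: there, volatile alone does not make access from several threads well defined, so the sketch below uses std::atomic for the shared variable instead. This is a substitute for, not a copy of, the article's Listing 2; sharedLevel and classifyLevel are hypothetical names.

```cpp
#include <atomic>

std::atomic<int> sharedLevel{0};   // hypothetical variable that other threads may write

// Classifies the level using one snapshot, so both comparisons are made
// against the same value even if sharedLevel changes in the meantime.
int classifyLevel()
{
    int copy = sharedLevel.load();   // single read of the shared variable
    if (copy < 0)
        return -1;
    if (copy > 100)
        return 1;
    return 0;
}
```

Writing the two tests directly against sharedLevel would read it twice, and another thread could change it between the reads; testing the local copy removes that window.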

Thread-Local Storage

When writing a multithreaded application, it can be useful to have variables that are specific to a particular thread. For example, consider the program in Listing 3.

m1 is an instance field, so each instance of type ThreadX has its own copy, and that exists for the life of its parent object. On the other hand, m2 is a class field, so there is only one occurrence of it for the class, regardless of the number of instances of that class. In theory, this field exists until the application terminates. Neither of these fields is specific to a thread. With the appropriate constructs, both kinds of fields can be accessed by multiple threads.

Simply stated, thread-local storage is memory that is owned by a particular thread, and that memory is allocated when a new thread is created, and deallocated when that thread terminates. It combines the privacy of local variables with the persistence of static variables. A field is marked as being thread-local by attaching to it the attribute ThreadStatic, as shown in case 3 of Listing 3. Being a static field, m3 can have an initializer.

Function TMain is the entry point for new threads. This function simply increments the three fields m1, m2, and m3, five times each, and prints their current value. The lock block in case 4 makes sure that no other thread can concurrently access these fields while their values are being incremented or printed.

The primary thread sets its own name to t0 in case 5, and then creates and starts two threads. It also calls TMain directly, as a regular function rather than as part of thread creation and startup. One example of the output that can result is shown in Figure 2. (The only difference between the possible outputs is the order in which the threads do their incrementing and printing.)

Each of the three threads has its own instance of m1, which is initialized to 10, so it is no surprise that each has the value 15 after being incremented five times. In the case of m2, all three threads share the same variable, so that one variable is incremented 15 times.

The threads t1 and t2 go through the thread-creation process, each getting its own version of m3. However, these thread-local variables take on their default value zero, rather than the initializer 30 shown in the source code. Beware! After being incremented five times, each has the value 5. Thread t0 exhibits different behavior. As we can see, this thread was not created by the same machinery as the other two threads. As a result, its m3 does take on the explicit initial value, 30. Also note that in case 6, TMain is being called as a regular function, not as part of the creation of a new thread.
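
ISO C++ offers the thread_local storage class for the same purpose, and it is worth contrasting with the behavior just described: a thread_local variable's initializer is applied in every thread, so each new thread starts from 30 rather than from zero. The sketch below illustrates this in ISO C++, not C++/CLI. The names sharedCount, perThread, bumpFiveTimes, Totals, and runTwoThreads are hypothetical; the starting values 20 and 30 follow m2 and m3.

```cpp
#include <mutex>
#include <thread>

int sharedCount = 20;               // one copy for the whole program, like m2
std::mutex sharedLock;              // guards sharedCount
thread_local int perThread = 30;    // one copy per thread, like m3

// Entry point: increments both variables five times and returns the
// calling thread's private value.
int bumpFiveTimes()
{
    for (int i = 0; i < 5; ++i)
    {
        ++perThread;                                     // private, no lock needed
        std::lock_guard<std::mutex> guard(sharedLock);
        ++sharedCount;                                   // shared, so locked
    }
    return perThread;
}

struct Totals { int first; int second; int shared; };

// Runs the entry point on two new threads and collects what each one saw.
Totals runTwoThreads()
{
    Totals t{0, 0, 0};
    std::thread t1([&t] { t.first = bumpFiveTimes(); });
    std::thread t2([&t] { t.second = bumpFiveTimes(); });
    t1.join();
    t2.join();
    std::lock_guard<std::mutex> guard(sharedLock);
    t.shared = sharedCount;
    return t;
}
```

Each new thread here ends with a private value of 35, where the ThreadStatic field in the CLI version ends at 5 for threads t1 and t2, while the shared counter accumulates the increments of both threads.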

Atomicity and Interlocked Operations

Consider the following scenario: An application has multiple threads executing in parallel, with each thread having write access to some shared integer variable. Each thread simply increments that variable by 1, using ++value. That looks harmless enough; after all, this looks like an atomic operation, and on many systems, it is—at least from the point of view of a machine instruction. However, C++/CLI's execution environment does not universally guarantee this for all integer types.

To demonstrate this, the program in Listing 4 has three threads, each concurrently incrementing a shared 64-bit integer variable 10 million times. It then displays that variable's final value, which, in theory, should be 30 million. The resulting application can be run in one of two modes: the default mode is unsynchronized and uses the ++ operator; the alternate mode, indicated by using a command-line argument of Y or y, uses a synchronized library increment function instead.

When the standard ++ operator is used, five consecutive executions of the application resulted in the output shown in Figure 3. As we can see, the reported total falls far short of the correct answer. Simply stated, between roughly 15 and 52 percent of the increments went unreported. When the same program was run in synchronized mode (that is, using Interlocked's Increment instead), all 30 million increments were done and reported correctly.

Class Interlocked also has a Decrement function.
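
In ISO C++, the nearest counterpart to Interlocked's Increment is an atomic read-modify-write on a std::atomic object. The sketch below is an analogue written under that assumption rather than the article's Listing 4; countWithAtomics is a hypothetical name, and the iteration count is left to the caller.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Increments one shared 64-bit counter from several threads using an atomic
// read-modify-write, so no increment can be lost.
long long countWithAtomics(int threadCount, int incrementsPerThread)
{
    std::atomic<long long> value{0};
    std::vector<std::thread> workers;
    for (int k = 0; k < threadCount; ++k)
    {
        workers.emplace_back([&value, incrementsPerThread] {
            for (int i = 0; i < incrementsPerThread; ++i)
                value.fetch_add(1);   // indivisible increment
        });
    }
    for (std::thread& w : workers)
        w.join();
    return value.load();
}
```

Calling countWithAtomics(3, 10000000) mirrors the three-thread, 10-million-increment run described above. Replacing the atomic with a plain long long and ++value would be a data race in ISO C++ and could lose updates in the way Figure 3 shows.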

Exercises

To reinforce the material we've covered, perform the following activities:

  1. In Listing 4, change the type of the shared variable, value, and run the application with and without synchronization.
  2. In your implementation's documentation, carefully read the description of the Increment and Decrement functions in class Interlocked. Note that there are two sets, one for int and one for long long. Note also that there is no support for arguments of type unsigned int or unsigned long long.
  3. Carefully read the description of the other functions in Interlocked, especially Add, Exchange, and CompareExchange.
  4. The class Queue in Listing 5 (available online at http://www.cuj.com/code/) implements a queue of strings. Modify this class so that it is thread safe; that is, provide support for multiple threads adding and/or removing nodes from the same queue at the same time. The class has two public functions:

         void AddNode(String^ s);
         String^ RemoveNode();

     RemoveNode must not return to its caller until it has something to return; that is, it must wait indefinitely, if necessary, for a node to be added.
  5. In some other assembly, write a Main that creates three threads that each adds some fixed number of nodes, and one thread that removes nodes, all running asynchronously. Once all the adder threads have finished, have the main thread add one last string with the value "END" and wait for the remover thread to shut itself down, which it does when it sees this node. Hint: You will need to use the Wait, Pulse, and Join functions, and you might find it useful to use Sleep as well, to stagger the adder threads' actions.
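The waiting pattern that the hint refers to can be sketched in standard C++ as a point of comparison. This is not a solution in C++/CLI and not the Queue class from Listing 5. It assumes std::mutex in place of Monitor's Enter and Exit, and std::condition_variable's wait and notify_one in place of Wait and Pulse. The names BlockingQueue and RunQueueDemo are hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical thread-safe queue of strings.
class BlockingQueue
{
    std::deque<std::string> nodes;
    std::mutex guard;
    std::condition_variable notEmpty;

public:
    void AddNode(const std::string& s)
    {
        {
            std::lock_guard<std::mutex> lock(guard);
            nodes.push_back(s);
        }
        notEmpty.notify_one();  // wake one thread blocked in RemoveNode
    }

    std::string RemoveNode()
    {
        std::unique_lock<std::mutex> lock(guard);
        // Blocks until a node exists; the predicate guards against waking
        // up while the queue is still empty.
        notEmpty.wait(lock, [this] { return !nodes.empty(); });
        std::string s = nodes.front();
        nodes.pop_front();
        return s;
    }
};

// Hypothetical driver: `adders` threads each add `perAdder` nodes while one
// remover thread drains the queue until it sees "END". Returns the number of
// nodes removed before "END".
int RunQueueDemo(int adders, int perAdder)
{
    assert(adders > 0 && perAdder >= 0);
    BlockingQueue q;
    int removed = 0;

    std::thread remover([&q, &removed]() {
        while (q.RemoveNode() != "END")
        {
            ++removed;
        }
    });

    std::vector<std::thread> pool;
    for (int a = 0; a < adders; ++a)
    {
        pool.emplace_back([&q, perAdder]() {
            for (int i = 0; i < perAdder; ++i)
            {
                q.AddNode("node");
            }
        });
    }
    for (auto& th : pool)
    {
        th.join();          // all adders finish before "END" is queued
    }
    q.AddNode("END");
    remover.join();         // safe to read `removed` after this
    return removed;
}
```

The design point that carries over to the C++/CLI version is that the remover re-checks the queue's state each time it wakes, and that "END" is added only after every adder thread has been joined.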

CUJ

Figure 1: Output of Listing 1.

Set new message M-1
Processed new message M-1
Set new message M-2
Processed new message M-2
Set new message M-3
Processed new message M-3
Set new message M-4
Processed new message M-4
Set new message M-5
Processed new message M-5
CreateMessages thread terminating
ProcessMessage interrupted
ProcessMessages thread terminating
Primary thread terminating

Figure 2: One possible output of Listing 3.

Thread t0: m1 = 15, m2 = 25, m3 = 35
Thread t1: m1 = 15, m2 = 30, m3 = 5
Thread t2: m1 = 15, m2 = 35, m3 = 5

Figure 3: Output of Listing 4.

Output using the ++ operator

After 30000000 operations, value = 14323443
After 30000000 operations, value = 24521969
After 30000000 operations, value = 20000000
After 30000000 operations, value = 24245882
After 30000000 operations, value = 25404963


Output using Interlocked's Increment

After 30000000 operations, value = 30000000

Listing 1

using namespace System;
using namespace System::Threading;

// Shared buffer holding one message; the object's own monitor guards it.
public ref class MessageBuffer
{
    String^ messageText;
public:
    void SetMessage(String^ s)
    {
/*7*/   Monitor::Enter(this);
        messageText = s;
/*8*/   Monitor::Pulse(this);       // wake the thread waiting in ProcessMessages
        Console::WriteLine("Set new message {0}", messageText);
        Monitor::Exit(this);
    }

    void ProcessMessages()
    {
/*9*/   Monitor::Enter(this);
        while (true)
        {
            try
            {
/*10*/          Monitor::Wait(this);    // releases the lock while waiting
            }
            catch (ThreadInterruptedException^ e)
            {
                Console::WriteLine("ProcessMessage interrupted");
                Monitor::Exit(this);    // release the lock before leaving
                return;
            }

            Console::WriteLine("Processed new message {0}", messageText);
        }
        Monitor::Exit(this);        // not reached; the loop exits only via the return above
    }
};

// Producer: sets five messages, two seconds apart.
public ref class CreateMessages
{
    MessageBuffer^ msg;
public:
    CreateMessages(MessageBuffer^ m)
    {
        msg = m;
    }

    void CreateMessagesEntryPoint()
    {
        for (int i = 1; i <= 5; ++i)
        {
            msg->SetMessage(String::Concat("M-", i.ToString()));
            Thread::Sleep(2000);
        }
        Console::WriteLine("CreateMessages thread terminating");
    }
};

// Consumer: processes messages until its thread is interrupted.
public ref class ProcessMessages
{
    MessageBuffer^ msg;
public:
    ProcessMessages(MessageBuffer^ m)
    {
        msg = m;
    }

    void ProcessMessagesEntryPoint()
    {
        msg->ProcessMessages();
        Console::WriteLine("ProcessMessages thread terminating");
    }
};

// main follows the class definitions so that every type is declared before use.
int main()
{
/*1*/   MessageBuffer^ m = gcnew MessageBuffer;

/*2a*/  ProcessMessages^ pm = gcnew ProcessMessages(m);
/*2b*/  Thread^ pmt = gcnew Thread(gcnew ThreadStart(pm,
            &ProcessMessages::ProcessMessagesEntryPoint));
/*2c*/  pmt->Start();

/*3a*/  CreateMessages^ cm = gcnew CreateMessages(m);
/*3b*/  Thread^ cmt = gcnew Thread(gcnew ThreadStart(cm,
            &CreateMessages::CreateMessagesEntryPoint));
/*3c*/  cmt->Start();

/*4*/   cmt->Join();
/*5*/   pmt->Interrupt();
/*6*/   pmt->Join();

        Console::WriteLine("Primary thread terminating");
}

Listing 2

        volatile int i = 0;
/*1*/   i = 10;
/*2*/   i = 20;
/*3*/   if (i < 5 || i > 10) {
            // ...
        }

        int copy = i;
/*4*/   if (copy < 5 || copy > 10) {
            // ...
        }

Listing 3

using namespace System;
using namespace System::Threading;

public ref class ThreadX
{
/*1*/   int m1;
/*2*/   static int m2 = 20;
/*3*/   [ThreadStatic] static int m3 = 30;

public:
    ThreadX()
    {
        m1 = 10;
    }

    void TMain()
    {
        String^ threadName = Thread::CurrentThread->Name;

/*4*/   Monitor::Enter(ThreadX::typeid);
        for (int i = 1; i <= 5; ++i)
        {
            ++m1;
            ++m2;
            ++m3;
        }
        Console::WriteLine("Thread {0}: m1 = {1}, m2 = {2}, m3 = {3}",
            threadName, m1, m2, m3);
        Monitor::Exit(ThreadX::typeid);
    }
};

int main()
{
/*5*/   Thread::CurrentThread->Name = "t0";

        ThreadX^ o1 = gcnew ThreadX;
        Thread^ t1 = gcnew Thread(gcnew ThreadStart(o1, &ThreadX::TMain));
        t1->Name = "t1";

        ThreadX^ o2 = gcnew ThreadX;
        Thread^ t2 = gcnew Thread(gcnew ThreadStart(o2, &ThreadX::TMain));
        t2->Name = "t2";

        t1->Start();
/*6*/   (gcnew ThreadX)->TMain();
        t2->Start();
        t1->Join();
        t2->Join();
}

Listing 4

using namespace System;
using namespace System::Threading;

static bool interlocked = false;
const int maxCount = 10000000;
/*1*/ static long long value = 0;

void TMain()
{
    if (interlocked)
    {
        for (int i = 1; i <= maxCount; ++i)
        {
/*2*/       Interlocked::Increment(value);
        }
    }
    else
    {
        for (int i = 1; i <= maxCount; ++i)
        {
/*3*/       ++value;
        }
    }
}

int main(array<String^>^ argv)
{
    // A single argument of Y or y selects the synchronized mode.
    if (argv->Length == 1)
    {
        if (argv[0]->Equals("Y") || argv[0]->Equals("y"))
        {
            interlocked = true;
        }
    }

/*4*/ Thread^ t1 = gcnew Thread(gcnew ThreadStart(&TMain));
    Thread^ t2 = gcnew Thread(gcnew ThreadStart(&TMain));
    Thread^ t3 = gcnew Thread(gcnew ThreadStart(&TMain));

    t1->Start();
    t2->Start();
    t3->Start();
    t1->Join();
    t2->Join();
    t3->Join();

    Console::WriteLine("After {0} operations, value = {1}", 3 * maxCount, value);
}
