C# - ThreadPooled

22. October 2011 10:34

 

This is another example in a series of posts on working with threads in C#. This example shows how to create a custom thread pool. If you know anything about working with threads in C#, you will know that it already has a thread pool. However, the existing thread pool is typically inflexible when you attempt to write more complex systems.

 

The existing thread pool supports a single queue, which is executed on a first in, first out (FIFO) basis. This is a poor limitation, as it makes it impossible to balance the load between tasks. An example would be backgrounding a lot of relatively unimportant tasks and then not being able to execute a high priority task as quickly as possible, because it has to wait behind them in the standard thread pool. It also does not permit the programmer to balance tasks against each other, for example by having a maximum number of threads (e.g. 5) for task X and a larger number of threads (e.g. 20) for task Y.

 

Another approach to solving the above problem would be to use a priority queue. I will write about these in some later posts.

 

I did a previous example on how to create a basic thread queue. I would suggest that you read and understand it before trying to understand the following. Of course, the basic queue will only execute one background task at a time. This example will execute many items concurrently.

 

The basic variables we have in our class are almost the same as in the basic queue. The only difference is that we need to clean up older threads when they are not being used. So I added a variable, LastKilled, to mark the time a spare thread exited because it didn't have any work. So we have the following variables.

 

 

private Queue<IRunnable> Q = new Queue<IRunnable>();
private DateTime LastKilled = DateTime.UtcNow;

 

Since we are a queue, we need to be able to accept new items. So we have almost the same Add method to add tasks to our queue. The major difference here is that if every existing thread is busy (or no thread has been started yet), we will start a new thread, so long as we are below our maximum number of permitted threads.

 

 

public void Add(IRunnable obj)
{
	lock (Q)
	{
		if (Q.Count > MaxItems)
			throw (new Exception("Too Many Items have been queued"));

		Q.Enqueue(obj);

		/* Maybe start a new thread? Only if every thread is busy and we are below MaxThreads */
		if (CurrentRunning >= CurrentThreads && CurrentThreads < MaxThreads)
		{
			ThreadSimpleRun tmp = new ThreadSimpleRun(this);
			tmp.Start();
			_CurrentThreads++;
		}

		Monitor.Pulse(Q);
	}
}
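
The Add method relies on two types that this post does not define, IRunnable and ThreadSimpleRun. As a rough guide, here is a minimal sketch of what they might look like. Both definitions are assumptions inferred from how they are used above (a Run method, a constructor taking an IRunnable, and a Start method), not the code from the earlier posts. The Join method and the FlagTask class are hypothetical additions so the sketch can be tried out.

```csharp
using System;
using System.Threading;

// Assumed shape: anything with a Run method can be queued.
public interface IRunnable
{
	void Run();
}

// Assumed shape: starts a thread that calls Run on the wrapped object.
public class ThreadSimpleRun
{
	private readonly Thread _thread;

	public ThreadSimpleRun(IRunnable target)
	{
		_thread = new Thread(target.Run);
		// Background threads do not keep the process alive on exit.
		_thread.IsBackground = true;
	}

	public void Start()
	{
		_thread.Start();
	}

	// Hypothetical helper, added only so a caller can wait for completion.
	public void Join()
	{
		_thread.Join();
	}
}

// Hypothetical task used to try the sketch out.
public class FlagTask : IRunnable
{
	public volatile bool Done;

	public void Run()
	{
		Done = true;
	}
}
```

Because ThreadPooled itself implements IRunnable, passing "this" to ThreadSimpleRun means each new thread ends up inside the pool's own Run loop.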

 

The part of the code that changes significantly compared to the basic queue example is how items are dequeued. Every thread we start executes the following function. The Run function takes an item off the queue and executes it in the background. When there is no work to be done, it sleeps until a task is added to the queue and it is told to "wake up" by the Add method above. The sleep also has a maximum duration (ShrinkTime): if the thread wakes up and a significant period of time has passed with no work, it exits to save on resources.

 

 

public void Run()
{
	while (true)
	{
		try
		{
			IRunnable item = null;

			lock (Q)
			{
				if (Q.Count == 0)
					Monitor.Wait(Q, ShrinkTime);

				if (Q.Count > 0)
				{
					item = Q.Dequeue();
					_CurrentRunning++;
				}
				else
				{
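					/* Still idle: exit, but let at most one thread go per ShrinkTime */
					if (LastKilled.AddMilliseconds(ShrinkTime) <= DateTime.UtcNow)
					{
						LastKilled = DateTime.UtcNow;
						_CurrentThreads--;
						return;
					}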
				}
			}

			if (item == null)
				continue;	//Restart the loop

			item.Run();

			lock (Q)
			{
				_CurrentRunning--;
			}
		}
		catch (Exception ex)
		{
			throw (ex);
		}
	}
}
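
The sleeping and waking in Run is built on Monitor.Wait and Monitor.Pulse. The following is a small standalone sketch of how those two calls behave; the WaitDemo class and its method names are made up for illustration and are not part of the pool. Monitor.Wait with a timeout returns false when nobody pulsed the lock in time, and a pulse sent while nobody is waiting is simply lost, which is why the waiter checks a flag in a loop.

```csharp
using System;
using System.Threading;

public static class WaitDemo
{
	// Returns true if the waiter was pulsed, false if the timeout expired.
	public static bool WaitForSignal(object gate, int timeoutMs)
	{
		lock (gate)
		{
			return Monitor.Wait(gate, timeoutMs);
		}
	}

	// Nobody ever pulses this lock, so the wait has to time out.
	public static bool TimesOutWithoutPulse()
	{
		return !WaitForSignal(new object(), 50);
	}

	// A second thread waits until the flag is set and the lock is pulsed.
	public static bool WakesWhenPulsed()
	{
		object gate = new object();
		bool signalled = false;
		bool woke = false;

		Thread waiter = new Thread(() =>
		{
			lock (gate)
			{
				// The flag covers the case where the pulse came first.
				while (!signalled)
					Monitor.Wait(gate);
				woke = true;
			}
		});
		waiter.Start();

		lock (gate)
		{
			signalled = true;
			Monitor.Pulse(gate);
		}

		waiter.Join();
		return woke;
	}
}
```

In the pool, the check on Q.Count after the wait plays the same role as the flag here: a thread that wakes up always looks at the queue again before deciding what to do.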

 

It is probably worth pointing out that we keep a count of the number of threads currently executing a task and of the total number of background threads. This is so the Add method can decide when to create a new thread.
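
To make that decision easier to follow, here is the same condition pulled out into a standalone function. The PoolMath class and the ShouldStartThread name are hypothetical and exist only for this illustration; the expression itself is the one used in Add.

```csharp
public static class PoolMath
{
	// Mirrors the condition in Add: grow only when every thread is busy
	// and the pool is still below its ceiling.
	public static bool ShouldStartThread(int running, int threads, int maxThreads)
	{
		return running >= threads && threads < maxThreads;
	}
}
```

With a maximum of 5 threads, ShouldStartThread(0, 0, 5) is true (no threads exist yet), ShouldStartThread(1, 2, 5) is false (one thread is idle), ShouldStartThread(2, 2, 5) is true (both threads are busy) and ShouldStartThread(5, 5, 5) is false (the pool is full).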

There is also the issue of error handling to be addressed. In this case we leave the error handling to be decided on in the "task". If an exception is passed back into the Run function, the expected behaviour is to crash. It is left this way because only the queued tasks know how to deal with their own errors.
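
As an illustration of a task dealing with its own errors, here is a minimal sketch of a wrapper that catches whatever its work throws and keeps it for later inspection. GuardedTask is a hypothetical name, and the IRunnable interface is repeated in its assumed form so the example stands alone.

```csharp
using System;

// Assumed shape of the interface used by the queue.
public interface IRunnable
{
	void Run();
}

// Hypothetical wrapper: runs a delegate and records any failure
// instead of letting it escape into the pool's Run loop.
public class GuardedTask : IRunnable
{
	private readonly Action _work;

	// Null when the work completed without throwing.
	public Exception Error { get; private set; }

	public GuardedTask(Action work)
	{
		_work = work;
	}

	public void Run()
	{
		try
		{
			_work();
		}
		catch (Exception ex)
		{
			Error = ex;
		}
	}
}
```

A task written this way never takes the pool thread down with it; whether to log, retry or ignore the stored error is left to the code that queued the task.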

I have left a small deliberate mistake in the code above. It will still function correctly, but there is slight room for improvement. I wonder if anyone will be able to spot it?

 

Here is a copy of the complete class. Now you can create as many thread pools in C# as you require.

 

 

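using System;
using System.Collections.Generic;
using System.Threading;

/* IRunnable and ThreadSimpleRun are not defined in this post; they are assumed to come from the earlier posts in this series. */
public class ThreadPooled : IRunnable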
{
	private Queue<IRunnable> Q = new Queue<IRunnable>();
	private DateTime LastKilled = DateTime.UtcNow;

	public ThreadPooled()
	{

	}

	public void Add(IRunnable obj)
	{
		lock (Q)
		{
			if (Q.Count > MaxItems)
				throw (new Exception("Too Many Items have been queued"));

			Q.Enqueue(obj);

			/* Maybe start a new thread? Only if every thread is busy and we are below MaxThreads */
			if (CurrentRunning >= CurrentThreads && CurrentThreads < MaxThreads)
			{
				ThreadSimpleRun tmp = new ThreadSimpleRun(this);
				tmp.Start();
				_CurrentThreads++;
			}

			Monitor.Pulse(Q);
		}
	}

	public void Run()
	{
		while (true)
		{
			try
			{
				IRunnable item = null;

				lock (Q)
				{
					if (Q.Count == 0)
						Monitor.Wait(Q, ShrinkTime);

					if (Q.Count > 0)
					{
						item = Q.Dequeue();
						_CurrentRunning++;
					}
					else
					{
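						/* Still idle: exit, but let at most one thread go per ShrinkTime */
						if (LastKilled.AddMilliseconds(ShrinkTime) <= DateTime.UtcNow)
						{
							LastKilled = DateTime.UtcNow;
							_CurrentThreads--;
							return;
						}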
					}
				}

				if (item == null)
					continue;	//Restart the loop

				item.Run();

				lock (Q)
				{
					_CurrentRunning--;
				}
			}
			catch (Exception ex)
			{
				throw (ex);
			}
		}
	}

	public int QueueLength
	{
		get
		{
			lock (Q)
			{
				return Q.Count;
			}
		}
	}

	private int _MaxItems = int.MaxValue;
	public int MaxItems
	{
		get
		{
			return _MaxItems;
		}
		set
		{
			lock (Q)
			{
				/* Compare against the new limit, not the old one */
				if (QueueLength > value)
					throw (new Exception("Cannot shrink queue because it already contains more than MaxItems"));
				_MaxItems = value;
			}
		}
	}

	private int _MinThreads = 0;
	public int MinThreads
	{
		get
		{
			return _MinThreads;
		}
		set
		{
			_MinThreads = value;
		}
	}

	private int _CurrentRunning = 0;
	public int CurrentRunning
	{
		get
		{
			return _CurrentRunning;
		}
		set
		{
			_CurrentRunning = value;
		}
	}

	private int _CurrentThreads = 0;
	public int CurrentThreads
	{
		get
		{
			return _CurrentThreads;
		}
	}

	private int _MaxThreads = int.MaxValue;
	public int MaxThreads
	{
		get
		{
			return _MaxThreads;
		}
		set
		{
			_MaxThreads = value;
		}
	}

	private int _ShrinkTime = 60000;
	public int ShrinkTime
	{
		get
		{
			return _ShrinkTime;
		}
		set
		{
			_ShrinkTime = value;
		}
	}
}

 
