First steps in the asynchronous world
A while ago, a friend of mine blogged on commands and command dispatchers. After a "sure, why not" phase, I decided to give it a try for implementing a small application.
Its purpose is pretty simple :
- browse an ftp directory
- get some information from the remote files
- use them to download other files by http
I could simply implement a foreach loop to enumerate the files by ftp, read my informations and then download the file. I have a fine control of the execution flow, but, do I really need it? Basically, I just want the job done.
The jobs and the job dispatcher
The jobs are nothing more than commands as described in the GoF's Design Patterns, i.e. objects encapsulating a request. I simply changed the name from command to job because I have the feeling commands are too often considered as the result of a user interaction, which is not really the case here. What is important here is that I make no asumptions on when and where the jobs will be executed. Remember, I just want the job done.
In order to do that, I need another object. As Marc mentioned, the intent of the JobDisptacher is to abstract the context of execution of the Jobs. You could implement it synchronously, using the thread pool, running in a BackgroundWorker...
If it was obvious for Marc and I that the IJob should be an interface, we had a little discussion about the JobDispatcher. Eventhough we still have no idea on how to extent it, we decided that an object is-a dispatcher, thus the JobDispatcher is an abstract class, while another object can-do a job.
Now that I have the basic concept, let's prepare the basic application.
A program as a component
One thing I really like in .Net is the Component model and the design time support. Therefore I often have the Program class inherit from Component. Afterwards, it is pretty straigthforward to include a BackgroundWorker, run it asynchronously and wait for its completion, while handling the Ctrl^C.
class Program : Component
{
static void Main(string[] args) {
(new Program()).Run(args);
}
public Program() {
InitializeComponent()
}
...
public void Run(string[] args) {
AutoResetEvent waitHandle = new AutoResetEvent(false);
Console.CancelKeyPress += delegate(object sender, ConsoleCancelEventArgs e) {
// first chance to terminate 'nicely',
// next attempt will terminate the application
if (!backgroundWorker.CancellationPending) {
backgroundWorker.CancelAsync();
e.Cancel = true;
}
};
backgroundWorker.RunWorkerCompleted +=
delegate(object sender, RunWorkerCompletedEventArgs e) {
waitHandle.Set();
};
backgroundWorker.RunWorkerAsync();
waitHandle.WaitOne();
}
}
Using the BackgroundWorker to implement the JobDispatcher
Normally, I would add a handler to the DoWork event of the BackgroundWorker to perform the required task. So this is how I will implement the dispatcher. Dispatch will enqueue the jobs and the DoWork will dequeue the jobs and execute them:
public class BackgroundWorkerDispatcher : JobDispatcher
{
Queue queue;
BackgroundWorker backgroundWorker;
public BackgroundWorkerDispatcher(BackgroundWorker backgroundWorker) {
queue = new Queue();
backgroundWorker = backgroundWorker;
backgroundWorker.DoWork +=
new System.ComponentModel.DoWorkEventHandler(this.backgroundWorker_DoWork);
}
private void backgroundWorker_DoWork(object sender, DoWorkEventArgs e) {
while (queue.Count > 0 && !backgroundWorker.CancellationPending) {
IJob job = queue.Dequeue();
job.Execute(this);
}
if (backgroundWorker.CancellationPending) {
queue.Clear();
}
}
#region JobDispatcher Membres
public override void Dispatch(IJob job) {
queue.Enqueue(job);
}
#endregion
}
I used the template method pattern so I could define BeforeExecute/AfterExecute method, or handle/log the exceptions whenever I need to.
public class EnumerateCategoriesJob : JobBase
{
...
protected override void DoExecute() {
FtpWebRequest request = (FtpWebRequest)WebRequest.Create(this.ftpUri);
request.Method = WebRequestMethods.Ftp.ListDirectory;
FtpWebResponse response = (FtpWebResponse)request.GetResponse();
using (StreamReader reader = new StreamReader(response.GetResponseStream())) {
while (!reader.EndOfStream) {
string line = reader.ReadLine().Trim();
if (!String.IsNullOrEmpty(line)) {
Uri uri = new Uri(this.ftpUri, line);
IJob job = new EnumerateProductsJob(this.dispatcher, uri);
this.dispatcher.Dispatch(job);
}
}
}
}
}
public class EnumerateProductsJob : JobBase
{
...
protected override void DoExecute() {
Settings settings = Settings.Default;
FtpWebRequest request = (FtpWebRequest)WebRequest.Create(this.ftpUri);
request.Method = WebRequestMethods.Ftp.ListDirectory;
FtpWebResponse response = (FtpWebResponse)request.GetResponse();
using (StreamReader reader = new StreamReader(response.GetResponseStream())) {
while (!reader.EndOfStream) {
string line = reader.ReadLine().Trim();
if (!String.IsNullOrEmpty(line) && line.EndsWith(".xml")) {
...
}
}
}
}
}
Putting it all together
Finally, I just have to create the dispatcher in the program and Dispatch the first EnumerateCategoriesJob:
JobDispatcher dispatcher = new BackgroundWorkerDispatcher(backgroundWorker);
dispatcher.Dispatch(new EnumerateCategoriesJob(...));
backgroundWorker.RunWorkerAsync();
Once all the enqueued job will be completed, the program will terminate.
Conclusion
I took me only a couple of hours to have all this working. Sure, I could have done this in less time if implemented it using the foreach loop, but:
- adding more jobs would be more and more difficult, as the complexity would rapidly increase, with the obvious risks on the stability of the application;
- the cancellable feature would be more difficult to implement, having to test CancellationPending in several places;
- it would be difficult to improve the overall performance of the application by using multiple threads without a major refactoring, while now I can do that simply by replacing the dispatcher;
- even if we can still replace the console by a form with a progress bar, it would be more difficult to display detailled informations about what is happening.
I used the BackgroundWorker, ThreadPool and BeginInvoke/EndInvoke methods in the past, but it was mostly to keep the UI reactive while doing long computations. So, in a way, the user was still stuck in synchronous mode.
With the job and dispatcher model, it is easier to implement concise task, thus making them re-usable, and the user could have better control on what is happening, by removing unnecessary job before they ran.