baijajusav

C# Using threads and semaphores and need help to see if code is blocking

C# ASP .NET 2.0 Ajax Web Application

I have a web application that is responsible for queuing a set of tasks to be performed based on a task id. The system is set up so that another web application (Java) connects to our mainframe and gets the necessary information based on a task id. That web page simply loads with a phrase that says whether the task completed or did not complete.

So, in essence, my web application just submits a series of calls to another web application and uses the WebClient class to download the string of the page's results.

Here's where things get tricky. The web application I am calling via the WebClient class can only have 3 active task requests to the mainframe at any given time and it does not handle that implementation itself.

The code below takes a series of task ids. I then create a thread array and limit the max # of concurrent requests to the web application via the use of a semaphore. My worry is that something is blocking all other threads somewhere. I believe if any line of code is blocking it is this one (as denoted by the comment):

               String result = client.DownloadString(uri);

where client is my WebClient object.

I tried using the Asynchronous version of this, but my issue is the web page loads all the way and I have no way of getting results back to the page that I know of. Currently the main page that calls this code is in a while loop that checks the ThreadsAlive property and as long as one of the created threads is alive, the main thread sleeps.
using System;
using System.Data;
using System.Configuration;
using System.Web;
using System.Web.Security;
using System.Web.UI;
using System.Web.UI.WebControls;
using System.Web.UI.WebControls.WebParts;
using System.Web.UI.HtmlControls;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Net;
using System.IO;
 
 
public class DoTask {
 
     //Used for object locking for log file
     static Object writeLogLock = new Object();
 
     //Used for object locking for task/thread status
     static Object statusLogLock = new Object();
 
     //Don't think this is used...
     static Object threadCountLock = new Object();
 
     //Maximum # of concurrent threads
     static int maxThreads = 3;
     
     //Controls max # of concurrent threads
     private static Semaphore semaphore ;
 
     private DoTask() {
     }
 
     public DoTask(List<String> pIds) {
 
          if (DoTask.status != null)
               DoTask.status.Clear();
 
          this.ids = pIds;
 
     }
 
     //List that contains the results of each task processed.
     public static List<String> status = new List<string>();
 
     //Called _rope b/c when dealing w/ threads you get plenty to hang yourself with.
     private Thread[] _rope;
 
     //Property so that calling code can know if threads are still being processed
     public Boolean ThreadsAlive {
          get {
               if (_rope != null) {
                    int cnt = _rope.Length;
                    for (int i = 0; i < cnt; i++) {
                         if (_rope[i].IsAlive) {
                              return true;
                         }
                    }
               }
               return false;
          }
     }
 
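     private List<String> _ids;

     //Task ids to process; backs the "ids" references in the constructor and RunRev
     private List<String> ids {
          get { return _ids; }
          set { _ids = value; }
     }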
 
 
     public void RunRev() {
          /* hold all available semaphore slots till threads are created */
          semaphore = new Semaphore(0, maxThreads);
          int c = ids.Count;
          this._rope = new Thread[c];
          writeLog("   /*************************BEGIN ENTRY************* ");
          for (int i = 0; i < c; i++) {
               String s = ids[i];
               _rope[i] = new Thread(new ParameterizedThreadStart(doTask));
               _rope[i].Start(s);
          }
          /* make maxThreads (3) number of threads available */
          semaphore.Release(maxThreads);
     }
 
     //Keep track of results so calling code can know what ids failed and what ids succeeded
     public static void writeStatus(String logMessage) {
          lock (statusLogLock) {
               status.Add(logMessage);
          }
     }
 
     //Writes message to a log file
     public static void writeLog(String logMessage) {
          lock (writeLogLock) {
               String logFile = "C:\\log.txt";
               using (StreamWriter logEntry = new StreamWriter(logFile, true)) {
                    logEntry.WriteLine(DateTime.Now.ToString() + " " + logMessage);
                    logEntry.Close();
               }
          }
     }
 
     /*
      * Note that semaphore.WaitOne(); will force the thread to wait until there is a freed thread
      *  semaphore.Release(1); releases a spot in the allocated threads from the semaphore
      */
     public void doTask(Object pId) {
          WebClient client = null;
          String id = (String)pId;
          String err = "";
          String uri = null;
          //Wait for a free slot in the semaphore. Acquired before the try block
          //so that finally only releases a slot this thread actually holds.
          semaphore.WaitOne();
          try {
               uri = "http://retrieveDataPage?" + "id=" + id;
              
 
               client = new WebClient();
               //client.Headers.Add("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.2; .NET CLR 1.0.3705;)");
               client.Headers.Add("Content-Type", "application/x-www-form-urlencoded");
 
               //Is this blocking?????
               String result = client.DownloadString(uri);
               
               if (result != null && result.Contains("completed"))
               {
                    writeStatus(String.Format("SUCCESS: {0} ", result));
                    writeLog(String.Format("SUCCESS: {0} ", result));
               }
               else
               {
                    writeStatus(String.Format("FAILED: {0} ", result));
                    writeLog(String.Format("FAILED: {0} ", result));
               }      
               
               //Asynchronous testing (handler must be attached before the call starts)
               //Uri siteUri = new Uri(uri);
               //client.DownloadStringCompleted += Test;
               //client.DownloadStringAsync(siteUri);
           
          } catch (Exception e) {
               err = e.Message;
               writeStatus(String.Format("ERROR: {0} ", id));
               writeLog(String.Format("ERROR: {0} ", id));
               writeLog("ERROR: " + e.Message + " FOR ID: " + id + "\n" + "STACK TRACE: " + e.StackTrace);
          } finally {
               semaphore.Release(1);
          }
     }
 
     private void Test(Object sender, DownloadStringCompletedEventArgs args) 
     {
          String result = args.Result;
 
          if (result != null && result.Contains("completed"))
          {
               writeStatus(String.Format("SUCCESS: {0} ", result));
               writeLog(String.Format("SUCCESS: {0} ", result));
          }
          else
          {
               writeStatus(String.Format("FAILED: {0} ", result));
               writeLog(String.Format("FAILED: {0} ", result));
          }          
     }
 
}
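
On the blocking question: WebClient.DownloadString is synchronous, so it blocks the thread that calls it until the response arrives, but it does not block the other worker threads. The following is a minimal sketch of that behaviour, not the code from the question. It assumes a Thread.Sleep stands in for the download, and the names ThrottleSketch and Run are hypothetical. It records how many workers are inside the guarded section at once and waits with Join instead of polling ThreadsAlive in a sleep loop.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class ThrottleSketch
{
    private static readonly object gate = new object();
    private static int running;   // workers currently inside the guarded section
    private static int peak;      // highest value "running" has reached

    // Starts one thread per id, lets at most maxConcurrent of them work at once,
    // waits for all of them with Join, and returns the observed peak concurrency.
    // Uses static counters, so it is not meant to be called from two threads at once.
    public static int Run(IList<string> ids, int maxConcurrent)
    {
        running = 0;
        peak = 0;
        Semaphore semaphore = new Semaphore(maxConcurrent, maxConcurrent);
        Thread[] threads = new Thread[ids.Count];

        for (int i = 0; i < ids.Count; i++)
        {
            threads[i] = new Thread(delegate(object state)
            {
                semaphore.WaitOne();          // blocks only this worker thread
                try
                {
                    lock (gate)
                    {
                        running++;
                        if (running > peak) peak = running;
                    }
                    Thread.Sleep(50);         // assumption: stand-in for client.DownloadString(uri)
                }
                finally
                {
                    lock (gate) { running--; }
                    semaphore.Release();      // hand the slot to the next waiting worker
                }
            });
            threads[i].Start(ids[i]);
        }

        // Join blocks the caller until each worker has finished.
        foreach (Thread t in threads) t.Join();
        return peak;
    }
}
```

Calling ThrottleSketch.Run(ids, 3) with more than three ids should report a peak of at most 3, which is one way to check that the semaphore, and not the download call, is what holds the other threads back.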


aibusinesssolutions

Not tested, but have you tried setting Response.BufferOutput = true and then running the asynchronous version? In theory it should not display anything on the page until all processes are complete. I'm not quite sure, though, as I've never tried to multi-thread a web page.
And if that doesn't work, you could try using HttpWebRequest, which offers a Timeout property, so you won't have to worry about DownloadString getting stuck.
HttpWebRequest wr = (HttpWebRequest)WebRequest.Create(strLink);
// Timeout in milliseconds
wr.Timeout = 20000;
 
HttpWebResponse webResponse = (HttpWebResponse)wr.GetResponse();
 
Stream strm = webResponse.GetResponseStream();
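
The snippet above stops at the response stream. A possible way to finish it, as a sketch only: the class name TimedDownload and the helpers Fetch and IsCompleted are hypothetical, and the success check simply mirrors the Contains("completed") test from doTask. HttpWebRequest, Timeout, ReadWriteTimeout and StreamReader are the standard System.Net and System.IO members.

```csharp
using System;
using System.IO;
using System.Net;

public static class TimedDownload
{
    // Fetches url and returns the response body.
    // Throws WebException if the request times out or fails.
    public static string Fetch(string url, int timeoutMs)
    {
        HttpWebRequest request = (HttpWebRequest)WebRequest.Create(url);
        request.Timeout = timeoutMs;            // limit for GetResponse()
        request.ReadWriteTimeout = timeoutMs;   // limit for reads on the response stream

        // The using blocks close the response and stream even if reading fails.
        using (HttpWebResponse response = (HttpWebResponse)request.GetResponse())
        using (Stream stream = response.GetResponseStream())
        using (StreamReader reader = new StreamReader(stream))
        {
            return reader.ReadToEnd();
        }
    }

    // Mirrors the success check used in doTask.
    public static bool IsCompleted(string body)
    {
        return body != null && body.Contains("completed");
    }
}
```

Inside doTask, the DownloadString call could then be swapped for something like TimedDownload.Fetch(uri, 20000), with the existing catch block handling the WebException raised on a timeout.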


karakav

I had a similar problem and found that, in my case, since I didn't know approximately how long each task would last, calling semaphore.Release(maxThreads) was not a good idea. I suggest changing the code this way:

     public void RunRev() {
          /* all maxThreads slots start open; each thread takes one in doTask */
          semaphore = new Semaphore(maxThreads, maxThreads);
          int c = ids.Count;
          this._rope = new Thread[c];
          writeLog("   /*************************BEGIN ENTRY************* ");
          for (int i = 0; i < c-1; i++) {
               String s = ids[i];
               _rope[i] = new Thread(new ParameterizedThreadStart(doTask));
               _rope[i].Start(s);
          }
          /* make maxThreads (3) number of threads available */
          /*semaphore.Release(maxThreads);*/
     }

By changing the constructor of the semaphore that way, you allow all the slots to be open from the start. They are then taken and released one by one in the doTask method. I also changed your loop, but I am not sure whether I am right about the latter.

Hope it helps.
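
One thing worth checking in the modified loop: with the bound i < c-1 the last id is never started and the last element of _rope stays null, so the ThreadsAlive property would hit a NullReferenceException when it reaches that slot. A small sketch of the difference, using a hypothetical helper (LoopBoundSketch.FilledSlots) and plain objects standing in for the threads:

```csharp
using System;

public static class LoopBoundSketch
{
    // Fills a c-element array with a loop bounded by c or by c - 1
    // and returns how many slots ended up non-null.
    public static int FilledSlots(int c, bool subtractOne)
    {
        object[] slots = new object[c];
        int bound = subtractOne ? c - 1 : c;

        for (int i = 0; i < bound; i++)
        {
            slots[i] = new object();   // stand-in for new Thread(...)
        }

        int filled = 0;
        foreach (object slot in slots)
        {
            if (slot != null) filled++;
        }
        return filled;
    }
}
```

For 48 ids, the c - 1 bound fills 47 slots and the original bound fills all 48.
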
baijajusav

ASKER

Well, the idea behind that code is that each thread is started and halted at the semaphore.WaitOne(). Once all threads have been started, 3 get released. Still, I'll give it a try your way on Monday. Thanks!

The problem with your approach is that if you call Release(3) when that would push the semaphore's count above its maximum, you will get an exception (SemaphoreFullException), as you will be attempting to open more than the allowed number of slots.
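
The exception in question is SemaphoreFullException, which Release throws when the new count would exceed the maximum passed to the constructor. A short sketch, with a hypothetical helper name (ReleaseSketch.TryRelease), shows which combinations succeed:

```csharp
using System;
using System.Threading;

public static class ReleaseSketch
{
    // Creates a fresh semaphore and tries Release(releaseCount) on it.
    // Returns true if the call succeeds, false if it throws SemaphoreFullException.
    public static bool TryRelease(int initialCount, int maximumCount, int releaseCount)
    {
        Semaphore semaphore = new Semaphore(initialCount, maximumCount);
        try
        {
            semaphore.Release(releaseCount);
            return true;
        }
        catch (SemaphoreFullException)
        {
            // initialCount + releaseCount was greater than maximumCount
            return false;
        }
        finally
        {
            semaphore.Close();
        }
    }
}
```

TryRelease(0, 3, 3) succeeds because the count goes from 0 to 3, while TryRelease(3, 3, 3) fails because the semaphore is already full. That is why new Semaphore(0, maxThreads) followed by a single Release(maxThreads) works, but the same Release after new Semaphore(maxThreads, maxThreads) is risky.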

I've modified the code again. See the snippet.

Karakav: I see what you're pointing out and admit it's a possible issue; however, if I start the semaphore with 3 open slots, will threads continue to be generated in a concurrent fashion? I might not be explaining myself very well here and it's probably due to a lack of knowledge on threads.

I modified the code to try to keep only 3 threads running at a given time. I had a test run with 48 ids, and it seemed unwise to allow the creation of 48 separate threads. It's not game programming after all...

See the snippet below and tell me what you think. If you see any new or old issues, please advise. I'm pretty green on the whole thread management stuff.
public void RunRev() {
          /* hold all available semaphore slots till threads are created */
          semaphore = new Semaphore(maxThreads, maxThreads);
          int c = ids.Count;
          this._rope = new Thread[c];
          writeLog("   /*************************BEGIN ENTRY************* ");
          for (int i = 0; i < c-1; i++) {
               String s = ids[i];
               _rope[i] = new Thread(new ParameterizedThreadStart(doTask));
               _rope[i].Name = s;
               _rope[i].Start(s);
               //Attempts to limit # of total threads running at a time
               if((i + 1) == maxThreads)
                    semaphore.Release(maxThreads);
               
               //do I need the following to force the thread creation
               //loop to halt until a semaphore slot is open?
               //semaphore.waitOne();
               //semaphore.Release(1);
          }
          /* make maxThreads (3) number of threads available */
          /*semaphore.Release(maxThreads);*/
     }


Sorry...Error in above code. Use this:
public void RunRev() {
          /* hold all available semaphore slots till threads are created */
          semaphore = new Semaphore(0, maxThreads);
          int c = ids.Count;
          this._rope = new Thread[c];
          writeLog("   /*************************BEGIN ENTRY************* ");
          for (int i = 0; i < c-1; i++) {
               String s = ids[i];
               _rope[i] = new Thread(new ParameterizedThreadStart(doTask));
               _rope[i].Name = s;
               _rope[i].Start(s);
               //Attempts to limit # of total threads running at a time
               if((i + 1) == maxThreads)
                    semaphore.Release(maxThreads);
               
               //do I need the following to force the thread creation
               //loop to halt until a semaphore slot is open?
               //semaphore.waitOne();
               //semaphore.Release(1);
          }
          /* make maxThreads (3) number of threads available */
          /*semaphore.Release(maxThreads);*/
     }
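
Note that the loop above still creates one thread per id, and the semaphore only limits how many of them are working at once. If the goal is to avoid creating 48 threads at all, one alternative is a fixed set of worker threads that pull ids from a shared queue. This is a different technique from the semaphore approach in this thread, shown here only as a sketch: the class BoundedWorkers and its members are hypothetical, and Thread.Sleep stands in for the DownloadString call.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public class BoundedWorkers
{
    private readonly Queue<string> pending;
    private readonly object queueLock = new object();
    private readonly List<string> processed = new List<string>();

    public BoundedWorkers(IEnumerable<string> ids)
    {
        pending = new Queue<string>(ids);
    }

    // Starts workerCount threads, waits until they have drained the queue,
    // and returns the ids in the order they finished.
    public List<string> Run(int workerCount)
    {
        Thread[] workers = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            workers[i] = new Thread(new ThreadStart(WorkLoop));
            workers[i].Start();
        }
        foreach (Thread worker in workers) worker.Join();
        return processed;
    }

    private void WorkLoop()
    {
        while (true)
        {
            string id;
            lock (queueLock)
            {
                if (pending.Count == 0) return;   // nothing left, this worker exits
                id = pending.Dequeue();
            }

            Thread.Sleep(10);   // assumption: stand-in for the download in doTask

            lock (queueLock) { processed.Add(id); }
        }
    }
}
```

With new BoundedWorkers(ids).Run(3), only three threads ever exist, so no semaphore is needed to cap the number of concurrent requests.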


ASKER CERTIFIED SOLUTION
karakav

This solution is only available to members.
Thanks for your help!