Solved

Multiple threads processing a ConcurrentQueue

Posted on 2010-08-17
2
1,296 Views
Last Modified: 2013-11-13
Bit of background, we have a sockets server currently supporting around 500 concurrent users, these users are running lots of different actions all at the same time, thus the sockets server is under a lot of stress.

What we're looking at doing is placing each message that is received from the clients into a managed ConcurrentQueue (.NET 4.0). We've successfully put these into the queue and we can dequeue them on a seperate thread, this works up to a point.

What we'd ideally like is for these messages to be processed asynconously by multiple threads at the same time, but we don't want to have these threads waiting in a while loop because the CPU goes through the roof. Ideally we'd like the queue to be observable and for a thread to kick in and process.

To give some perspective, we probably get 10 requests per second so we can't afford for the queue to be blocking.

What direction should we be heading in with this.
0
Comment
Question by:Type25
[X]
Welcome to Experts Exchange

Add your voice to the tech community where 5M+ people just like you are talking about what matters.

  • Help others & share knowledge
  • Earn cash & points
  • Learn & ask questions
2 Comments
 
LVL 3

Expert Comment

by:SpicyFrog
ID: 33458775
Have you looked into using a thread pool?  Sounds like the ideal application.

http://msdn.microsoft.com/en-us/library/3dasc8as%28VS.80%29.aspx
0
 
LVL 2

Accepted Solution

by:
SandyAgo earned 500 total points
ID: 33464948
What you need to do is make the Message Queue block using a threading monitor if there are no message on the queue. Here is an example:


public class MsgQueue {

        protected bool blocked = true; // set false when the queue server is running
        protected int limit;
        protected int length;
        protected int maximum;
        protected int failedPutQ1;
        protected int failedPutQ2;
        protected String name;
        protected QueueableObject first;
        protected QueueableObject last;

        public MsgQueue (int limit, String name) {
            if (limit < 500) limit = 500;
	        this.limit = limit;
	        this.name = name;
        }

	        /**
	         * Tests whether there are more messages on this queue. Note that this
	         * method is not synchronized so the caller must be synchronized with all
	         * other users of this queue.
	         * 
	         * @return
	         */
	        public boolean hasMore() {
		        return first != null;
	        }
        	
            /**
             * Return a queued object or wait if none are on the queue. Return null if
             * interrupted.
             */
            [MethodImpl(MethodImplOptions.Synchronized)]
            public QueueableObject getQ() {
                QueueableObject s = null;
                while (s == null) {
                    if (first == null) {
                        Monitor.Wait(this);
                    }
                    s = first;
                    if (s != null) { // s might be null if we got an interrupt
                        s.onQueue = false;
                        first = s.next;
                        length--;
                        if (first == null) {
                            last = null;
                        }
                    }
                } // end while
                return s;
            }

            /**
             * Return a queued object or null if none are on the queue
             */
            [MethodImpl(MethodImplOptions.Synchronized)]
            public QueueableObject getNoWait() {
                QueueableObject s = null;
                s = first;
                if (s != null) {
                    s.onQueue = false;
                    first = s.next;
                    length--;
                    if (first == null) {
                        last = null;
                    }
                }
                return s;
            }

        /**
	        Insert the object onto the end of the queue.  If the queue is blocked
	        or full, return the object.  Otherwise, return null indicating success.
        */
        [MethodImpl(MethodImplOptions.Synchronized)]
        private QueueableObject putQSync (QueueableObject s) {
	        QueueableObject rtn = null;
	        if (s.onQueue) {
                String currentThread = Thread.CurrentThread.Name;
                String detail = "Queue put while still onQueue, id = " + s.ident
                    + ", Queue Len = " + getLength() + ", Queue Max = "
                    + getMaximum() + (blocked ? ", Blocked" : ", Not Blocked")
                    + "\nprevious putting thread: " + s.queuingThread
                    + ", previously put to queue " + s.queueName
                    + "\ncurrent thread: " + currentThread + ", current queue: "
                    + name;
                throw new Exception(detail);
	        }
	        if (blocked) {
		        rtn = s; // indicate couldn't queue Msg
	        }
	        else if (++length > limit) {
                length--;
                rtn = s;
            }
            else {
                if (length > maximum) {
                    maximum = length;
                }
		        if (first == null) {
			        first = last = s;
		        }
		        else {
			        last.next = s;
			        last = s;
		        }
		        s.next = null;
		        s.onQueue = true;
                s.queuingThread = Thread.CurrentThread.Name;
                s.queueName = this.name;
                Monitor.PulseAll(this);
	        }

	        return rtn; // indicate whether QueueableObject successfully queued
        }

        public QueueableObject putQ (QueueableObject s) {
            QueueableObject rtn = putQSync (s);
            if (rtn != null){
               lock (typeof(MsgQueue)) {            
                    failedPutQ1++;
                }
                // try again in 150ms
                try {
                    Thread.Sleep(150);
                } catch (ThreadInterruptedException e) {
                }
                rtn = putQSync (s);
                if (rtn != null){                
                    lock (typeof(MsgQueue)) {                
                        failedPutQ2++;
                    }
                }
                
            }
            return rtn; // indicate whether QueueableObject successfully queued
        }

        /**
         * Put the object onto the queue.  If there is no space, free the object.
         * 
         * @return null
         */
        public QueueableObject putQorFree (QueueableObject s) {
            s = putQ(s);
            if (s != null) {
                s.putFree();
            }
            return null;
        }

        [MethodImpl(MethodImplOptions.Synchronized)]
        public void drain() {
	        QueueableObject s = getNoWait();
	        while (s != null) {
		        s.putFree();
		        s = getNoWait();
	        }
	        failedPutQ1 = 0;
	        failedPutQ2 = 0;
        }

        public void setBlocked(bool newstate) {
            blocked = newstate;
        }

        public void setBlocked(bool newstate, String threadName) {
            blocked = newstate;
        }

        public bool getBlocked() {
            return blocked;
        }

        public int getLength () {
            return length;
        }

        public int getMaximum () {
            return maximum;
        }

        public int getFailedPutQ1() {
            return failedPutQ1;
        }

        public int getFailedPutQ2() {
            return failedPutQ2;
        }

        public void resetStats() {
            maximum = 0;
            failedPutQ1 = 0;
            failedPutQ2 = 0;
        }
    } // end class Queue

Open in new window

0

Featured Post

Salesforce Has Never Been Easier

Improve and reinforce salesforce training & adoption using WalkMe's digital adoption platform. Start saving on costly employee training by creating fast intuitive Walk-Thrus for Salesforce. Claim your Free Account Now

Question has a verified solution.

If you are experiencing a similar issue, please ask a related question

Suggested Solutions

There is an easy way, in .NET, to centralize the treatment of all unexpected errors. First of all, instead of launching the application directly in a Form, you need first to write a Sub called Main, in a module. Then, set the Startup Object to th…
Exception Handling is in the core of any application that is able to dignify its name. In this article, I'll guide you through the process of writing a DRY (Don't Repeat Yourself) Exception Handling mechanism, using Aspect Oriented Programming.
The viewer will learn how to implement Singleton Design Pattern in Java.
The goal of the video will be to teach the user the difference and consequence of passing data by value vs passing data by reference in C++. An example of passing data by value as well as an example of passing data by reference will be be given. Bot…

710 members asked questions and received personalized solutions in the past 7 days.

Join the community of 500,000 technology professionals and ask your questions.

Join & Ask a Question