Solved

Named Pipes and Completion Ports.

Posted on 2003-11-08
2
2,442 Views
Last Modified: 2013-11-20
Running a simple client that opens a pipe and sends a block of data to the server I have written produces the following output on the server.
000007F4
Thread: 2308 - Async Read :000007F4
Thread: 3840 - Async Read :000007F4
Thread: 3872 - Async Read :000007F4
connecting pipe
Thread: 2876 - Async Read :000007F4
Thread: 3816 - Async Read :000007F4
Thread: 3440 - Async Read :000007F4
pipe connected
connecting pipe

What I am trying to figure out is why my threads GetQueuedCompletionStatus never ever returns for any of the threads. They do not get woken up. Please don't check the code for memory leaks and other nonsense, I am just trying to get the port to work, so I have tried a few things, and its a little messy, but you should get the idea.

I am doing something in the wrong order or something.

Here is the code.
// Server.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include "winexception.h"

static const PIPE_BUFFER_SIZE      = 0x2000; // 8k
static const PIPE_TIMEOUT            = 0x0400; // 1k milliseconds
static const PIPE_KEY                  = 0x00ff; // recognisable key value.
static const PIPE_THREADS            = 0x0006; // number of wekas.
CRITICAL_SECTION cs = {0};

struct WORKFLOWDATA
{
      char m_achQueueName[512];
       int m_iQueueId;
   
    WORKFLOWDATA(LPCSTR lpszQueueName,
                       int iQueueId) : m_iQueueId(iQueueId)
      {
            lstrcpyn(m_achQueueName, lpszQueueName, sizeof(m_achQueueName));
      }
      WORKFLOWDATA() : m_iQueueId(0)
      {
            memset(m_achQueueName, 0, sizeof(m_achQueueName));
      }
};

struct ASYNC_CONTEXT
{
      HANDLE m_hCp;
      HANDLE m_hPipe;

      ASYNC_CONTEXT(HANDLE hCompletionPort,
                          HANDLE hPipe) : m_hCp(hCompletionPort),
                                                    m_hPipe(hPipe) {};

      HANDLE GetCompletionHandle() { return m_hCp; }
      HANDLE GetPipeHandle() { return m_hPipe; }
};

DWORD WINAPI AsyncCompletionPortThreadProcessor(LPVOID lpParam)
{

      DWORD dwTransferred = 0L;
      DWORD dwBytesRead = 0L;
      ULONG ulCompletionKey = 0L;
      OVERLAPPED ovl = {0};
       LPOVERLAPPED lpOverlapped = &ovl;
      ASYNC_CONTEXT* pParameters = (ASYNC_CONTEXT*) lpParam;
      WORKFLOWDATA wfd;

      while(1) {
            EnterCriticalSection(&cs);
            std::cout << "Thread: " << GetCurrentThreadId() << " - Async Read :" << pParameters->GetCompletionHandle() << std::endl;
            LeaveCriticalSection(&cs);
            BOOL bStatus = GetQueuedCompletionStatus(pParameters->GetCompletionHandle(),
                                                                         &dwTransferred,
                                                                         &ulCompletionKey,
                                                                         &lpOverlapped,
                                                                         INFINITE);
            if (!bStatus || !lpOverlapped) {
                  // FIXME:Craig some logging before continue.      
            EnterCriticalSection(&cs);
                  std::cout << "Error and continue" << std::endl;
                  GetLastError();
            LeaveCriticalSection(&cs);
                  continue;
            }

            EnterCriticalSection(&cs);
            std::cout << "reading..." << std::endl;
            //std::cout << GetCurrentThreadId() << ":" << wfd.m_iQueueId << wfd.m_achQueueName << std::endl;
            LeaveCriticalSection(&cs);
      }
      return 0;
}

void StartService()
{
      InitializeCriticalSection(&cs);
      HANDLE hPipe = ::CreateNamedPipe("\\\\.\\pipe\\workflow",
                                                    PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
                                               PIPE_WAIT                         // Wait on messages.
                                                       | PIPE_READMODE_MESSAGE           // Specify byte pipe.
                                                       | PIPE_TYPE_MESSAGE,
                                                       PIPE_UNLIMITED_INSTANCES,
                                                     PIPE_BUFFER_SIZE,
                                                     PIPE_BUFFER_SIZE,
                                                     INFINITE,
                                                     0);
      assert(INVALID_HANDLE_VALUE != hPipe);
      
      HANDLE hCp = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE,
                                              0,
                                                            (ULONG_PTR)0,
                                                            0);
      assert(INVALID_HANDLE_VALUE != hCp);
      
      HANDLE hEvent = ::CreateEvent(0,
                                      TRUE,
                                                  TRUE,
                                                  0);
      assert(0 != hEvent);
      
      std::cout << hCp << std::endl;
      ASYNC_CONTEXT* pParameters = new ASYNC_CONTEXT(hCp, hPipe);
      for (int i = 0; i < PIPE_THREADS; i++) {
            
            
            HANDLE hThread = ::CreateThread(
                  0,
                  0,
                  AsyncCompletionPortThreadProcessor,
                  (LPVOID)pParameters,
                  CREATE_SUSPENDED,
                  0);
            ::SetThreadPriority(hThread, THREAD_PRIORITY_BELOW_NORMAL);            
            ::ResumeThread(hThread);
            ::CloseHandle(hThread); // We don't need these any more, the threads will run.
            

      }


      HANDLE hPipeCP = ::CreateIoCompletionPort(hPipe,
                                                                       hCp,
                                                                    PIPE_KEY,
                                                                    PIPE_THREADS);
      assert(0 != hPipeCP);


      //OVERLAPPED ov = {0};
      
      WORKFLOWDATA wfd;
      DWORD dwBytesRead = 0;
      while(1) {

            OVERLAPPED* ov = new OVERLAPPED;
            EnterCriticalSection(&cs);
            std::cout << "connecting pipe" << std::endl;
            LeaveCriticalSection(&cs);
            memset(ov, 0, sizeof(OVERLAPPED));
            ov->hEvent = ((HANDLE)((DWORD)hEvent|0x1));
            BOOL bConnected = ::ConnectNamedPipe(hPipe, ov);
            WaitForSingleObject(hEvent,      INFINITE);
            /*
            */

            BOOL bRead = ReadFile(
                  hPipe,
                  new char[8196],//&wfd,
                  8196,
                  0,//&dwBytesRead,
                  ov);
            if (bRead == FALSE) {
                  assert(GetLastError() == ERROR_IO_PENDING) ;
                        std::cout << "Pending...." << std::endl;
            }

            EnterCriticalSection(&cs);
            std::cout << "pipe connected" << std::endl;
            LeaveCriticalSection(&cs);
      }
      
      std::cout << "pipe connected" << std::endl;
      DeleteCriticalSection(&cs);
      Sleep(INFINITE);
}

int _tmain(int argc, _TCHAR* argv[])
{
      StartService();
      return 0;
}// Server.cpp : Defines the entry point for the console application.
//
0
Comment
Question by:cmain
2 Comments
 
LVL 16

Expert Comment

by:_nn_
ID: 9707462
I have no experience with completion ports, but I've felt surprised with the lowest-bit setting in the hEvent field in the OVERLAPPED structure, and found this in the MSDN docs :

<quote>
Even if you have passed the function a file handle associated with a completion port and a valid OVERLAPPED structure, an application can prevent completion port notification. This is done by specifying a valid event handle for the hEvent member of the OVERLAPPED structure, and setting its low-order bit. A valid event handle whose low-order bit is set keeps I/O completion from being queued to the completion port.
</quote>

If you're preventing it to happen, it's about normal that it doesn't, what do you think ?
0
 
LVL 48

Accepted Solution

by:
AlexFM earned 250 total points
ID: 9707773
ConnectNamedPipe should be handled using completion port, exactly as ReadFile. ReadFile should be issued after ConnectNamedPipe is completed (and also after previous ReadFile is completed). To differentiate these two cases (GetQueuedCompletionStatus is waked up by ConnectNamedPipe or ReadFile), you need to pass additional information with lpOverlapped parameter. You can see how this is done in the following sample:

http://www.codeproject.com/internet/winsockiocp.asp

See OVERLAPPEDPLUS structure which is passed to GetQueuedCompletionStatus and contains additional field defining type of I/O operation.

However, before making these changes, try to wake up GetQueuedCompletionStatus with ConnectNamedPipe call. If this succeded, you are in the right way.
0

Featured Post

Highfive Gives IT Their Time Back

Highfive is so simple that setting up every meeting room takes just minutes and every employee will be able to start or join a call from any room with ease. Never be called into a meeting just to get it started again. This is how video conferencing should work!

Join & Write a Comment

Introduction: Dialogs (1) modal - maintaining the database. Continuing from the ninth article about sudoku.   You might have heard of modal and modeless dialogs.  Here with this Sudoku application will we use one of each type: a modal dialog …
If you use Adobe Reader X it is possible you can't open OLE PDF documents in the standard. The reason is the 'save box mode' in adobe reader X. Many people think the protected Mode of adobe reader x is only to stop the write access. But this fe…
This video will show you how to get GIT to work in Eclipse.   It will walk you through how to install the EGit plugin in eclipse and how to checkout an existing repository.
You have products, that come in variants and want to set different prices for them? Watch this micro tutorial that describes how to configure prices for Magento super attributes. Assigning simple products to configurable: We assigned simple products…

743 members asked questions and received personalized solutions in the past 7 days.

Join the community of 500,000 technology professionals and ask your questions.

Join & Ask a Question

Need Help in Real-Time?

Connect with top rated Experts

15 Experts available now in Live!

Get 1:1 Help Now