Solved

Named Pipes and Completion Ports.

Posted on 2003-11-08
2
2,510 Views
Last Modified: 2013-11-20
Running a simple client that opens a pipe and sends a block of data to the server I have written produces the following output on the server.
000007F4
Thread: 2308 - Async Read :000007F4
Thread: 3840 - Async Read :000007F4
Thread: 3872 - Async Read :000007F4
connecting pipe
Thread: 2876 - Async Read :000007F4
Thread: 3816 - Async Read :000007F4
Thread: 3440 - Async Read :000007F4
pipe connected
connecting pipe

What I am trying to figure out is why my threads GetQueuedCompletionStatus never ever returns for any of the threads. They do not get woken up. Please don't check the code for memory leaks and other nonsense, I am just trying to get the port to work, so I have tried a few things, and its a little messy, but you should get the idea.

I am doing something in the wrong order or something.

Here is the code.
// Server.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include "winexception.h"

static const PIPE_BUFFER_SIZE      = 0x2000; // 8k
static const PIPE_TIMEOUT            = 0x0400; // 1k milliseconds
static const PIPE_KEY                  = 0x00ff; // recognisable key value.
static const PIPE_THREADS            = 0x0006; // number of wekas.
CRITICAL_SECTION cs = {0};

struct WORKFLOWDATA
{
      char m_achQueueName[512];
       int m_iQueueId;
   
    WORKFLOWDATA(LPCSTR lpszQueueName,
                       int iQueueId) : m_iQueueId(iQueueId)
      {
            lstrcpyn(m_achQueueName, lpszQueueName, sizeof(m_achQueueName));
      }
      WORKFLOWDATA() : m_iQueueId(0)
      {
            memset(m_achQueueName, 0, sizeof(m_achQueueName));
      }
};

struct ASYNC_CONTEXT
{
      HANDLE m_hCp;
      HANDLE m_hPipe;

      ASYNC_CONTEXT(HANDLE hCompletionPort,
                          HANDLE hPipe) : m_hCp(hCompletionPort),
                                                    m_hPipe(hPipe) {};

      HANDLE GetCompletionHandle() { return m_hCp; }
      HANDLE GetPipeHandle() { return m_hPipe; }
};

DWORD WINAPI AsyncCompletionPortThreadProcessor(LPVOID lpParam)
{

      DWORD dwTransferred = 0L;
      DWORD dwBytesRead = 0L;
      ULONG ulCompletionKey = 0L;
      OVERLAPPED ovl = {0};
       LPOVERLAPPED lpOverlapped = &ovl;
      ASYNC_CONTEXT* pParameters = (ASYNC_CONTEXT*) lpParam;
      WORKFLOWDATA wfd;

      while(1) {
            EnterCriticalSection(&cs);
            std::cout << "Thread: " << GetCurrentThreadId() << " - Async Read :" << pParameters->GetCompletionHandle() << std::endl;
            LeaveCriticalSection(&cs);
            BOOL bStatus = GetQueuedCompletionStatus(pParameters->GetCompletionHandle(),
                                                                         &dwTransferred,
                                                                         &ulCompletionKey,
                                                                         &lpOverlapped,
                                                                         INFINITE);
            if (!bStatus || !lpOverlapped) {
                  // FIXME:Craig some logging before continue.      
            EnterCriticalSection(&cs);
                  std::cout << "Error and continue" << std::endl;
                  GetLastError();
            LeaveCriticalSection(&cs);
                  continue;
            }

            EnterCriticalSection(&cs);
            std::cout << "reading..." << std::endl;
            //std::cout << GetCurrentThreadId() << ":" << wfd.m_iQueueId << wfd.m_achQueueName << std::endl;
            LeaveCriticalSection(&cs);
      }
      return 0;
}

void StartService()
{
      InitializeCriticalSection(&cs);
      HANDLE hPipe = ::CreateNamedPipe("\\\\.\\pipe\\workflow",
                                                    PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
                                               PIPE_WAIT                         // Wait on messages.
                                                       | PIPE_READMODE_MESSAGE           // Specify byte pipe.
                                                       | PIPE_TYPE_MESSAGE,
                                                       PIPE_UNLIMITED_INSTANCES,
                                                     PIPE_BUFFER_SIZE,
                                                     PIPE_BUFFER_SIZE,
                                                     INFINITE,
                                                     0);
      assert(INVALID_HANDLE_VALUE != hPipe);
      
      HANDLE hCp = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE,
                                              0,
                                                            (ULONG_PTR)0,
                                                            0);
      assert(INVALID_HANDLE_VALUE != hCp);
      
      HANDLE hEvent = ::CreateEvent(0,
                                      TRUE,
                                                  TRUE,
                                                  0);
      assert(0 != hEvent);
      
      std::cout << hCp << std::endl;
      ASYNC_CONTEXT* pParameters = new ASYNC_CONTEXT(hCp, hPipe);
      for (int i = 0; i < PIPE_THREADS; i++) {
            
            
            HANDLE hThread = ::CreateThread(
                  0,
                  0,
                  AsyncCompletionPortThreadProcessor,
                  (LPVOID)pParameters,
                  CREATE_SUSPENDED,
                  0);
            ::SetThreadPriority(hThread, THREAD_PRIORITY_BELOW_NORMAL);            
            ::ResumeThread(hThread);
            ::CloseHandle(hThread); // We don't need these any more, the threads will run.
            

      }


      HANDLE hPipeCP = ::CreateIoCompletionPort(hPipe,
                                                                       hCp,
                                                                    PIPE_KEY,
                                                                    PIPE_THREADS);
      assert(0 != hPipeCP);


      //OVERLAPPED ov = {0};
      
      WORKFLOWDATA wfd;
      DWORD dwBytesRead = 0;
      while(1) {

            OVERLAPPED* ov = new OVERLAPPED;
            EnterCriticalSection(&cs);
            std::cout << "connecting pipe" << std::endl;
            LeaveCriticalSection(&cs);
            memset(ov, 0, sizeof(OVERLAPPED));
            ov->hEvent = ((HANDLE)((DWORD)hEvent|0x1));
            BOOL bConnected = ::ConnectNamedPipe(hPipe, ov);
            WaitForSingleObject(hEvent,      INFINITE);
            /*
            */

            BOOL bRead = ReadFile(
                  hPipe,
                  new char[8196],//&wfd,
                  8196,
                  0,//&dwBytesRead,
                  ov);
            if (bRead == FALSE) {
                  assert(GetLastError() == ERROR_IO_PENDING) ;
                        std::cout << "Pending...." << std::endl;
            }

            EnterCriticalSection(&cs);
            std::cout << "pipe connected" << std::endl;
            LeaveCriticalSection(&cs);
      }
      
      std::cout << "pipe connected" << std::endl;
      DeleteCriticalSection(&cs);
      Sleep(INFINITE);
}

int _tmain(int argc, _TCHAR* argv[])
{
      StartService();
      return 0;
}// Server.cpp : Defines the entry point for the console application.
//
0
Comment
Question by:cmain
[X]
Welcome to Experts Exchange

Add your voice to the tech community where 5M+ people just like you are talking about what matters.

  • Help others & share knowledge
  • Earn cash & points
  • Learn & ask questions
2 Comments
 
LVL 16

Expert Comment

by:_nn_
ID: 9707462
I have no experience with completion ports, but I've felt surprised with the lowest-bit setting in the hEvent field in the OVERLAPPED structure, and found this in the MSDN docs :

<quote>
Even if you have passed the function a file handle associated with a completion port and a valid OVERLAPPED structure, an application can prevent completion port notification. This is done by specifying a valid event handle for the hEvent member of the OVERLAPPED structure, and setting its low-order bit. A valid event handle whose low-order bit is set keeps I/O completion from being queued to the completion port.
</quote>

If you're preventing it to happen, it's about normal that it doesn't, what do you think ?
0
 
LVL 48

Accepted Solution

by:
AlexFM earned 250 total points
ID: 9707773
ConnectNamedPipe should be handled using completion port, exactly as ReadFile. ReadFile should be issued after ConnectNamedPipe is completed (and also after previous ReadFile is completed). To differentiate these two cases (GetQueuedCompletionStatus is waked up by ConnectNamedPipe or ReadFile), you need to pass additional information with lpOverlapped parameter. You can see how this is done in the following sample:

http://www.codeproject.com/internet/winsockiocp.asp

See OVERLAPPEDPLUS structure which is passed to GetQueuedCompletionStatus and contains additional field defining type of I/O operation.

However, before making these changes, try to wake up GetQueuedCompletionStatus with ConnectNamedPipe call. If this succeded, you are in the right way.
0

Featured Post

Technology Partners: We Want Your Opinion!

We value your feedback.

Take our survey and automatically be enter to win anyone of the following:
Yeti Cooler, Amazon eGift Card, and Movie eGift Card!

Question has a verified solution.

If you are experiencing a similar issue, please ask a related question

This is to be the first in a series of articles demonstrating the development of a complete windows based application using the MFC classes.  I’ll try to keep each article focused on one (or a couple) of the tasks that one may meet.   Introductio…
Introduction: The undo support, implementing a stack. Continuing from the eigth article about sudoku.   We need a mechanism to keep track of the digits entered so as to implement an undo mechanism.  This should be a ‘Last In First Out’ collec…
This video will show you how to get GIT to work in Eclipse.   It will walk you through how to install the EGit plugin in eclipse and how to checkout an existing repository.
There's a multitude of different network monitoring solutions out there, and you're probably wondering what makes NetCrunch so special. It's completely agentless, but does let you create an agent, if you desire. It offers powerful scalability …

729 members asked questions and received personalized solutions in the past 7 days.

Join the community of 500,000 technology professionals and ask your questions.

Join & Ask a Question