Welcome to Experts Exchange

Add your voice to the tech community where 5M+ people, just like you, are talking about what matters.

  • Help others & share knowledge
  • Earn cash & points
  • Learn & ask questions
Solved

Named Pipes and Completion Ports.

Posted on 2003-11-08
2
2,471 Views
Last Modified: 2013-11-20
Running a simple client that opens a pipe and sends a block of data to the server I have written produces the following output on the server.
000007F4
Thread: 2308 - Async Read :000007F4
Thread: 3840 - Async Read :000007F4
Thread: 3872 - Async Read :000007F4
connecting pipe
Thread: 2876 - Async Read :000007F4
Thread: 3816 - Async Read :000007F4
Thread: 3440 - Async Read :000007F4
pipe connected
connecting pipe

What I am trying to figure out is why my threads GetQueuedCompletionStatus never ever returns for any of the threads. They do not get woken up. Please don't check the code for memory leaks and other nonsense, I am just trying to get the port to work, so I have tried a few things, and its a little messy, but you should get the idea.

I am doing something in the wrong order or something.

Here is the code.
// Server.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include "winexception.h"

static const PIPE_BUFFER_SIZE      = 0x2000; // 8k
static const PIPE_TIMEOUT            = 0x0400; // 1k milliseconds
static const PIPE_KEY                  = 0x00ff; // recognisable key value.
static const PIPE_THREADS            = 0x0006; // number of wekas.
CRITICAL_SECTION cs = {0};

struct WORKFLOWDATA
{
      char m_achQueueName[512];
       int m_iQueueId;
   
    WORKFLOWDATA(LPCSTR lpszQueueName,
                       int iQueueId) : m_iQueueId(iQueueId)
      {
            lstrcpyn(m_achQueueName, lpszQueueName, sizeof(m_achQueueName));
      }
      WORKFLOWDATA() : m_iQueueId(0)
      {
            memset(m_achQueueName, 0, sizeof(m_achQueueName));
      }
};

struct ASYNC_CONTEXT
{
      HANDLE m_hCp;
      HANDLE m_hPipe;

      ASYNC_CONTEXT(HANDLE hCompletionPort,
                          HANDLE hPipe) : m_hCp(hCompletionPort),
                                                    m_hPipe(hPipe) {};

      HANDLE GetCompletionHandle() { return m_hCp; }
      HANDLE GetPipeHandle() { return m_hPipe; }
};

DWORD WINAPI AsyncCompletionPortThreadProcessor(LPVOID lpParam)
{

      DWORD dwTransferred = 0L;
      DWORD dwBytesRead = 0L;
      ULONG ulCompletionKey = 0L;
      OVERLAPPED ovl = {0};
       LPOVERLAPPED lpOverlapped = &ovl;
      ASYNC_CONTEXT* pParameters = (ASYNC_CONTEXT*) lpParam;
      WORKFLOWDATA wfd;

      while(1) {
            EnterCriticalSection(&cs);
            std::cout << "Thread: " << GetCurrentThreadId() << " - Async Read :" << pParameters->GetCompletionHandle() << std::endl;
            LeaveCriticalSection(&cs);
            BOOL bStatus = GetQueuedCompletionStatus(pParameters->GetCompletionHandle(),
                                                                         &dwTransferred,
                                                                         &ulCompletionKey,
                                                                         &lpOverlapped,
                                                                         INFINITE);
            if (!bStatus || !lpOverlapped) {
                  // FIXME:Craig some logging before continue.      
            EnterCriticalSection(&cs);
                  std::cout << "Error and continue" << std::endl;
                  GetLastError();
            LeaveCriticalSection(&cs);
                  continue;
            }

            EnterCriticalSection(&cs);
            std::cout << "reading..." << std::endl;
            //std::cout << GetCurrentThreadId() << ":" << wfd.m_iQueueId << wfd.m_achQueueName << std::endl;
            LeaveCriticalSection(&cs);
      }
      return 0;
}

void StartService()
{
      InitializeCriticalSection(&cs);
      HANDLE hPipe = ::CreateNamedPipe("\\\\.\\pipe\\workflow",
                                                    PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
                                               PIPE_WAIT                         // Wait on messages.
                                                       | PIPE_READMODE_MESSAGE           // Specify byte pipe.
                                                       | PIPE_TYPE_MESSAGE,
                                                       PIPE_UNLIMITED_INSTANCES,
                                                     PIPE_BUFFER_SIZE,
                                                     PIPE_BUFFER_SIZE,
                                                     INFINITE,
                                                     0);
      assert(INVALID_HANDLE_VALUE != hPipe);
      
      HANDLE hCp = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE,
                                              0,
                                                            (ULONG_PTR)0,
                                                            0);
      assert(INVALID_HANDLE_VALUE != hCp);
      
      HANDLE hEvent = ::CreateEvent(0,
                                      TRUE,
                                                  TRUE,
                                                  0);
      assert(0 != hEvent);
      
      std::cout << hCp << std::endl;
      ASYNC_CONTEXT* pParameters = new ASYNC_CONTEXT(hCp, hPipe);
      for (int i = 0; i < PIPE_THREADS; i++) {
            
            
            HANDLE hThread = ::CreateThread(
                  0,
                  0,
                  AsyncCompletionPortThreadProcessor,
                  (LPVOID)pParameters,
                  CREATE_SUSPENDED,
                  0);
            ::SetThreadPriority(hThread, THREAD_PRIORITY_BELOW_NORMAL);            
            ::ResumeThread(hThread);
            ::CloseHandle(hThread); // We don't need these any more, the threads will run.
            

      }


      HANDLE hPipeCP = ::CreateIoCompletionPort(hPipe,
                                                                       hCp,
                                                                    PIPE_KEY,
                                                                    PIPE_THREADS);
      assert(0 != hPipeCP);


      //OVERLAPPED ov = {0};
      
      WORKFLOWDATA wfd;
      DWORD dwBytesRead = 0;
      while(1) {

            OVERLAPPED* ov = new OVERLAPPED;
            EnterCriticalSection(&cs);
            std::cout << "connecting pipe" << std::endl;
            LeaveCriticalSection(&cs);
            memset(ov, 0, sizeof(OVERLAPPED));
            ov->hEvent = ((HANDLE)((DWORD)hEvent|0x1));
            BOOL bConnected = ::ConnectNamedPipe(hPipe, ov);
            WaitForSingleObject(hEvent,      INFINITE);
            /*
            */

            BOOL bRead = ReadFile(
                  hPipe,
                  new char[8196],//&wfd,
                  8196,
                  0,//&dwBytesRead,
                  ov);
            if (bRead == FALSE) {
                  assert(GetLastError() == ERROR_IO_PENDING) ;
                        std::cout << "Pending...." << std::endl;
            }

            EnterCriticalSection(&cs);
            std::cout << "pipe connected" << std::endl;
            LeaveCriticalSection(&cs);
      }
      
      std::cout << "pipe connected" << std::endl;
      DeleteCriticalSection(&cs);
      Sleep(INFINITE);
}

int _tmain(int argc, _TCHAR* argv[])
{
      StartService();
      return 0;
}// Server.cpp : Defines the entry point for the console application.
//
0
Comment
Question by:cmain
2 Comments
 
LVL 16

Expert Comment

by:_nn_
ID: 9707462
I have no experience with completion ports, but I've felt surprised with the lowest-bit setting in the hEvent field in the OVERLAPPED structure, and found this in the MSDN docs :

<quote>
Even if you have passed the function a file handle associated with a completion port and a valid OVERLAPPED structure, an application can prevent completion port notification. This is done by specifying a valid event handle for the hEvent member of the OVERLAPPED structure, and setting its low-order bit. A valid event handle whose low-order bit is set keeps I/O completion from being queued to the completion port.
</quote>

If you're preventing it to happen, it's about normal that it doesn't, what do you think ?
0
 
LVL 48

Accepted Solution

by:
AlexFM earned 250 total points
ID: 9707773
ConnectNamedPipe should be handled using completion port, exactly as ReadFile. ReadFile should be issued after ConnectNamedPipe is completed (and also after previous ReadFile is completed). To differentiate these two cases (GetQueuedCompletionStatus is waked up by ConnectNamedPipe or ReadFile), you need to pass additional information with lpOverlapped parameter. You can see how this is done in the following sample:

http://www.codeproject.com/internet/winsockiocp.asp

See OVERLAPPEDPLUS structure which is passed to GetQueuedCompletionStatus and contains additional field defining type of I/O operation.

However, before making these changes, try to wake up GetQueuedCompletionStatus with ConnectNamedPipe call. If this succeded, you are in the right way.
0

Featured Post

Free Tool: ZipGrep

ZipGrep is a utility that can list and search zip (.war, .ear, .jar, etc) archives for text patterns, without the need to extract the archive's contents.

One of a set of tools we're offering as a way to say thank you for being a part of the community.

Question has a verified solution.

If you are experiencing a similar issue, please ask a related question

Here is how to use MFC's automatic Radio Button handling in your dialog boxes and forms.  Beginner programmers usually start with a OnClick handler for each radio button and that's just not the right way to go.  MFC has a very cool system for handli…
Introduction: Ownerdraw of the grid button.  A singleton class implentation and usage. Continuing from the fifth article about sudoku.   Open the project in visual studio. Go to the class view – CGridButton should be visible as a class.  R…
This video will show you how to get GIT to work in Eclipse.   It will walk you through how to install the EGit plugin in eclipse and how to checkout an existing repository.
Although Jacob Bernoulli (1654-1705) has been credited as the creator of "Binomial Distribution Table", Gottfried Leibniz (1646-1716) did his dissertation on the subject in 1666; Leibniz you may recall is the co-inventor of "Calculus" and beat Isaac…

829 members asked questions and received personalized solutions in the past 7 days.

Join the community of 500,000 technology professionals and ask your questions.

Join & Ask a Question