Link to home
Start Free TrialLog in
Avatar of oumer
oumer

asked on

libipq and pthreads

Hi,

Here is a sample code where I try to delay every packet by some time (in the code I posted here it is fixed to 100 msec.). Inorder to do so, I start a pthread for every packet that is received by libipq . I have iptables rules that queues tcp packets that are coming and going to some other machine. I then ftp to that machine. The program works, the problem is with the dumping. I have two files, one is supposed to capture the files when they reach the queue (new_file_2) and the other just before the set verdict is called (new_file_3). The new_file_2 works properly. The problem is with new_file_3. I couldn't get how, but somehow, at some location, some erroneous entries are put into the file, the end effect being tcpdump not able to recognize the file at some point. To check, I commented out the only line which makes the two files differ, i.e the one that changes the time stamp of the packet header (as indicated by *********), and the two files are not the same. First I thought maybe one thread was writing to the file, and then another thread becomes active and writes also. So I introduced a mutex_variable that will avoid such
problem. But it doesn't help at all. Could you please tell me what I am doing wrong? I have to save the files myself as tcpdump get the packets before the firewall for incoming packets. I have read somewhere that libipq is not thread safe, can the problem be attributed to this? How can i deal with it
Thanks in advance,
Oumer

#include <sys/time.h>
#include <linux/netfilter.h>
#include <netinet/in.h>
#include <libipq.h>
#include <pthread.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include "errors.h"
#include <mcheck.h>

#define BUFSIZE 2048

struct ipq_handle *h;
ipq_packet_msg_t *m;

FILE* new_file_2;
FILE* new_file_3;

pthread_mutex_t mutex_write;

/* char src_mac[6] = {0,0,0xb4,0x4b,0x2c, 0xe1}; */
/* char dst_mac[6] = {0,0xb,0x46, 0x10, 0x57, 0x80}; */
/* char ether_type[2] = {8,0}; */

char src_dst_ether [14] = {0,0,0xb4,0x4b,0x2c, 0xe1,0,0xb,0x46, 0x10, 0x57, 0x80,8,0};

//stolen from pcap
struct PCAP_file_header {
   unsigned int magic;
   short major;
   short minor;
   int thiszone;    /* gmt to local correction */
   unsigned int sigfigs;    /* accuracy of timestamps */
   unsigned int snaplen;    /* max length saved portion of each pkt */
   unsigned int linktype;    /* data link type (LINKTYPE_*) */
};

struct PCAP_packet_header {
   struct timeval ts;    /* time stamp */
   int caplen;    /* length of portion present */
   int len;    /* length this packet (off wire) */
};

typedef struct {
 int         sleep_time;
 ipq_packet_msg_t *packet;
//  unsigned char *data;
 struct PCAP_packet_header pkt_header;
 int thread_id;
} params;

static void die(struct ipq_handle *h)
{
       ipq_perror("ipq_error::");
       ipq_destroy_handle(h);
       exit(1);
}

void *delay_packet (void *param_packet)
{
   int status;
   params *myparams= (params*) param_packet;
   status = pthread_detach (pthread_self ());
   if (status != 0)
       err_abort (status, "Detach thread");

  usleep(100000);
  pthread_mutex_lock (&mutex_write);
  //  gettimeofday(&(myparams->pkt_header.ts), NULL);   ************************
  fwrite(&(myparams->pkt_header), sizeof(struct PCAP_packet_header),1, new_file_3);
  fwrite(src_dst_ether,1, 14, new_file_3);
  fwrite((unsigned int*)(myparams->packet+1), 1, myparams->packet->data_len, new_file_3);
  //free(myparams->packet);
  pthread_mutex_unlock (&mutex_write);
  status = ipq_set_verdict(h, myparams->packet->packet_id,NF_ACCEPT, 0, NULL);
  if (status < 0)
     die(h);
  free(myparams);
  return NULL;
}

void signal_handler(int sig)
{
 if (h)
   ipq_destroy_handle(h);
 fclose(new_file_2);
 fclose(new_file_3);
 exit(0);
}

int main(int argc, char **argv)
{
  mtrace();
  int status;
  unsigned char buf[BUFSIZE];
  int thread_id=0;
    new_file_2= fopen("dump_tcp_data.txt", "w+b");
  new_file_3 = fopen("dump_tcp_data_after.txt", "w+b");
    pthread_mutex_init(&mutex_write, NULL);
  struct PCAP_file_header *file_header =   (struct PCAP_file_header*)(malloc(sizeof(struct PCAP_file_header)));
    file_header->magic=0xa1b2c3d4;
  file_header->major=2;
  file_header->minor=4;
  file_header->thiszone=0;
  file_header->sigfigs=0;
  file_header->snaplen=65535;
  file_header->linktype=1;
    fwrite(file_header, sizeof(struct PCAP_file_header),1, new_file_2);
  fwrite(file_header, sizeof(struct PCAP_file_header),1, new_file_3);
    free(file_header);
  pthread_t thread;
     h = ipq_create_handle(0, PF_INET);
   sigset(SIGINT, signal_handler);
   if (!h)
               die(h);
   status = ipq_set_mode(h, IPQ_COPY_PACKET, BUFSIZE);
   if (status < 0)
     die(h);
   printf("PACKET_ID\t\tDATA_LENGTH\tSTATUS\n");
   printf("=========\t\t============\t=======\n");
   while (1) {
     status = ipq_read(h, buf, BUFSIZE, 0);
     if (status < 0)
       die(h);
     switch (ipq_message_type(buf)) {
        case NLMSG_ERROR:
        {
          fprintf(stderr, "Error! %s\n", ipq_errstr());
          break;
        }
        case IPQM_PACKET: {
           params *packet_param = (params*)malloc (sizeof (params));;
          if (!packet_param)
         {
          printf("memory allocatin problem\n");
          die(h);
        }
        struct PCAP_packet_header *pkt_header=
         (struct PCAP_packet_header*)(malloc(sizeof(struct PCAP_packet_header)));
        ipq_packet_msg_t *m = ipq_get_packet(buf);
        gettimeofday(&(pkt_header->ts), NULL);
        pkt_header->caplen=m->data_len+14;
        pkt_header->len=m->data_len+14;
                  unsigned char *data = (unsigned char*)(m+1);                   printf("%lud\t\t%u\t\t", m->packet_id, m->data_len);
                 packet_param-> packet =  m;
       packet_param->pkt_header = *(pkt_header);
       if (m->data_len > 0)
          {
        fwrite(pkt_header, sizeof(struct PCAP_packet_header),1, new_file_2);
        fwrite(src_dst_ether,1,14,new_file_2);
     /*   packet_param->data =(unsigned char*) malloc (sizeof(unsigned char*) * m->data_len);

        for (x=0; x < m->data_len; x++)
           {
            packet_param->data[x] = *(data+x);
          }
        */
        fwrite(data, 1, m->data_len, new_file_2);

        status = pthread_create ( &thread, NULL, delay_packet, packet_param);
        if (status != 0)
           err_abort (status, "Create a new thread");
        break;
          }
        }

        default:{
          fprintf(stderr, "Unknown message type!\n");
          break;
        }
     }
   }
   ipq_destroy_handle(h);
   return 0;
}

Avatar of grg99
grg99

If a library isnt thread-safe, that means it's going to goof up at odd times.
If that's your symptom, it may be this problem.

you shouldnt have two threads writing to the same file. File writes are not atomic, so eventually the writes are going to get done out of sequence.  How about you write your own fwrite()wrapper that does the right thing, i.e. buffer up the data and interlock things so the writes don't get scrambled?


You might try my standard admonitions, to check the error code on each and EVERY library or API call.  
Tedious, but most any call can return failure, it's better you instrument your code now rather than later.




As for libipq, it's not thread-safe, but this doesn't seem to be the reason your files differ. Might be a good idea to fix that anyway.

The mutex lock around the file writes in the threads is definitely needed, since you want those three writes to be sequential. Since writing is now synchronized, you might as well replace those with fwrite_unlocked() instead of fwrite().

The difference between the files seems to be caused by a race between threads.

A little example...

Suppose two packets arrive in quick succession. The second file will contain the sequence packet1, packet2 and two threads have been spawned, one for each packet. After each thread has slept for roughly the same amount time, they both become runnable again. The packet order of the third file now depends upon which thread writes its packet first and is either packet1, packet2 or packet2, packet1.
Avatar of oumer

ASKER

I tried to use the method recommend by the glibc manual, i.e using flockfile and funlockfile, instead of the mutex , (see http://www.gnu.org/manual/glibc-2.2.5/html_node/Streams-and-Threads.html#Streams%20and%20Threads)

flockfile(new_file_3);
 fwrite_unlocked(&(myparams->pkt_header), sizeof(struct PCAP_packet_header),1, new_file_3);
  fwrite_unlocked(src_dst_ether,1, 14, new_file_3);
  fwrite_unlocked((unsigned int*)(myparams->packet+1), 1, myparams->packet->data_len, new_file_3);
  //free(myparams->packet);
  funlockfile(new_file_3);

I get the same error. And the thing is for a given timer value i set (for example for the 10 ms case, I always get the first difference between the file I expect (new_file_3) and new_file_2 at exactly the same byte position.
Avatar of oumer

ASKER

If it was a problem with thread race, then how come the two files differe in size also? For the simple case where I just ftp to a server, enter my user name and password, and then quit, and I am using a delay value of 10 ms, I always get a new_file_3 that is always 4 bytes longer than new_file_2? where did the extra 4 bytes came from?

Took a better look the libipt sources...

Seems that the ipq_get_packet(buf) returns a pointer to the packet data _inside_ the on-stack buffer. So, if you read a new packet before a thread writes it out, the packet will have been changed (since ipq_read() updates the on-stack buffer). The thread then writes out the wrong packet. Looks like the packet needs to be copied before passing it to a thread.
Avatar of oumer

ASKER

I think you may be right! I changed my params struct into these

typedef struct
{
 int         sleep_time;
 ipq_packet_msg_t packet;
 unsigned char *data;
 struct PCAP_packet_header pkt_header;
 int thread_id;
} params;

and then did after ipq_read,
        packet_param-> packet = * m;
instead of
        packet_param-> packet =  m;

and also copied the data following the packet (As m+1 now points to the payload that was copied)

so I did something like (just to check)
packet_parm->data=(unsigned char*) malloc(sizeof(unsigned char*) * m->data_len);
for (i=0; i < m->data_len; i++)
{
  packet_param->data[i] = *(data + i);  
}
//don't laugh at the way I am copying, I am just testing :-) but the fastest way of doing this is really appreaciated

and in the delay_packet function, I use a similar set except that I changed,
 
  fwrite_unlocked((unsigned int*)(myparams->packet+1), 1, myparams->packet->data_len, new_file_3);

into

  fwrite_unlocked((unsigned int*)(myparams->data), 1, myparams->packet.data_len, new_file_3);

and i tested it only once, and it seems ok. but it is like half past midnight where I am and have to sleep, some serious checking waits for tomorrow. But I still need some help, as I am doing a real time program, I have to be careful in making the code as fast as possible. Now I have been denied the possibility of using the pointers, what is the fastest way to do it? (For the point, you don't have to answer this, you will get it once I check it thoroghly, but come on be generous!:-). Also, when writing to files, how can I make it faster than fwrite, using memory mapped i/o (never done it, but i have looked into a sample code at the glibc manual) or is there any other way?

Thousand thanks!!
ASKER CERTIFIED SOLUTION
Avatar of mtmike
mtmike

Link to home
membership
This solution is only available to members.
To access this solution, you must be a member of Experts Exchange.
Start Free Trial