Script communication with pipes in a qsub context

Hi all,

I am coding a script that needs to run several 'child' scripts and communicate with them. Basically, the parent script runs an iteration loop in which each child has a job to do. Each iteration must end when all the children's jobs are done.

A simplified version of the code is given below.

parent.c
#include <stdio.h>
#include <memory.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>

// Pair of pipes linking the parent to one child
struct link {
  int down[2]; // Parent -> child: [0] is read by the child, [1] is written by the parent
  int up[2];   // Child -> parent: [0] is read by the parent, [1] is written by the child
};

// Close *fd if it is open, then mark it as closed
static void close_fd( int *fd )
{
  if (*fd>=0) close(*fd);
  *fd = -1;
}

// Write the whole buffer, retrying after short writes.
// Returns 0 on success, -1 on error.
static int write_all( int fd, const char *data, size_t len )
{
  while (len>0) {
    ssize_t n = write(fd,data,len);
    if (n<=0) return -1;
    data += n;
    len -= (size_t)n;
  }
  return 0;
}

// Parent side of the pipe demo.
// Usage: ./parent <number of children> <number of iterations>
// Starts the children, then for each iteration sends the iteration number
// to every child and waits for one reply from each of them. Finally sends
// "done" so that the children exit.
// Returns 0 on success, 1 on bad arguments or on any I/O failure.
int main( int argc, char ** argv )
{
  struct link *links = NULL;
  int N,Ne;
  char buffer[BUFSIZ];
  int i,ie,len;
  int status = 1;
  
  // Read arguments
  if (argc<3 ||
      sscanf(argv[1],"%i",&Ne)!=1 || Ne<1 || // Number of children
      sscanf(argv[2],"%i",&N)!=1) {          // Number of iterations
    fprintf(stderr,"Usage: %s <children> <iterations>\n",argv[0]);
    return 1;
  }
  
  // Allocate the links; -1 marks a descriptor that is not open
  links = malloc((size_t)Ne*sizeof *links);
  if (links==NULL) {
    perror("malloc");
    return 1;
  }
  for (ie=0;ie<Ne;ie++) {
    links[ie].down[0] = links[ie].down[1] = -1;
    links[ie].up[0] = links[ie].up[1] = -1;
  }
  
  // Create pipes and run children
  for (ie=0;ie<Ne;ie++) {
    if (pipe(links[ie].down)<0) {
      perror("pipe");
      links[ie].down[0] = links[ie].down[1] = -1;
      goto cleanup;
    }
    if (pipe(links[ie].up)<0) {
      perror("pipe");
      links[ie].up[0] = links[ie].up[1] = -1;
      goto cleanup;
    }
    snprintf(buffer,sizeof buffer,"./child %i %i %i &",ie,links[ie].down[0],links[ie].up[1]);
    if (system(buffer)==-1) {
      perror("system");
      goto cleanup;
    }
    // The child inherited its own copies of these ends. Closing ours lets
    // read() report end-of-file if the child dies instead of blocking forever.
    close_fd(&links[ie].down[0]);
    close_fd(&links[ie].up[1]);
  }
  
  // Iteration loop
  for (i=0;i<N;i++) {
    printf("%i: ",i); fflush(stdout);
    len = snprintf(buffer,sizeof buffer,"%i ",i);
    // Send new iteration order to children
    for (ie=0;ie<Ne;ie++) {
      if (write_all(links[ie].down[1],buffer,(size_t)len)<0) {
        perror("write");
        goto cleanup;
      }
    }
    // Wait for children reply
    for (ie=0;ie<Ne;ie++) {
      if (read(links[ie].up[0],buffer,sizeof buffer)<=0) {
        fprintf(stderr,"\nChild %i stopped replying\n",ie);
        goto cleanup;
      }
    }
    printf(" -> OK\n");
  }
  
  // Send ending order to children (5 bytes: the terminating NUL is sent too)
  for (ie=0;ie<Ne;ie++) {
    if (write_all(links[ie].down[1],"done",5)<0) {
      perror("write");
      goto cleanup;
    }
  }
  status = 0;
  
  // Close pipes and deallocate
cleanup:
  for (ie=0;ie<Ne;ie++) {
    close_fd(&links[ie].down[0]);
    close_fd(&links[ie].down[1]);
    close_fd(&links[ie].up[0]);
    close_fd(&links[ie].up[1]);
  }
  free(links);
  
  return status;
}


child.c
#include <stdio.h>
#include <memory.h>
#include <string.h>
#include <unistd.h>

// Wait for the next order from the parent.
// Returns 1 when an iteration must be run, 0 when the parent sent "done",
// and -1 on end-of-file or read error (the parent is gone).
static int next_order( int fd, char *buffer, size_t size )
{
  ssize_t n = read(fd,buffer,size-1);
  if (n<=0) return -1;
  // read() does not terminate the data, and iteration orders are sent
  // without a NUL, so terminate before handing the buffer to strcmp()
  buffer[n] = '\0';
  return strcmp(buffer,"done")!=0;
}

// Child side of the pipe demo.
// Usage: ./child <child number> <read fd> <write fd>
// The two descriptors are pipe ends inherited from the parent. For every
// order received, prints the child number and replies "done", until the
// parent itself sends "done".
// Returns 0 on a clean shutdown, 1 on bad arguments or I/O failure.
int main( int argc, char ** argv )
{
  int pfd[2],ie;
  char buffer[BUFSIZ];
  int order;
  
  // Read arguments
  if (argc<4) return 1;
  if (sscanf(argv[1],"%i",&ie)!=1) return 1;     // Child number
  if (sscanf(argv[2],"%i",&pfd[0])!=1) return 1; // Orders come in here
  if (sscanf(argv[3],"%i",&pfd[1])!=1) return 1; // Replies go out here
  
  // Wait for first order, then loop until told to stop
  while ((order = next_order(pfd[0],buffer,sizeof buffer))>0) {
    printf(" %i",ie); fflush(stdout);
    // Send "iteration done" to parent (5 bytes: the NUL is sent too)
    if (write(pfd[1],"done",5)!=5) {
      order = -1;
      break;
    }
  }
  
  close(pfd[0]);
  close(pfd[1]);
  return order<0 ? 1 : 0;
}


The code works well, e.g. with the following command: ./parent 3 5

My problem is that I want to run the child jobs on a cluster using qsub. I first tried to replace the call './child ...' with 'qsub ./child ...'. The jobs are correctly placed in the qsub queue, but then the program blocks, as if the parent and child processes were not communicating through the pipes.

I also tried to use named pipes (via mknod). There again, the program works well without qsub, but not with qsub.

Has anyone faced this problem? Or does anyone have an idea of how to correct my code?

Thanks in advance
> Each iteration must end when all the children jobs are done.
That means you can't start them with system(), you have to use an exec() function and wait for it with waitpid().
Thanks, but I'm not sure that the exec() function is really suited to my situation. I'll try to be more specific. First, I have to execute the child scripts on the cluster, so I have to use the qsub command. Second, for various reasons, I don't want to run qsub at each iteration (I did it once successfully, but had other troubles with that solution); I prefer to run it once at the beginning and then pass a new argument at each iteration. That's why I think that piping does exactly what I need. And it works perfectly well without qsub. The cluster does have access to my filesystem, so it should be able to read/write "named pipe" files. Am I wrong on this point?
> Jobs are well put into the qsub queue, but then the program blocks, as if the parent and children codes do not communicate via pipes.

Named pipe between different machines? I don't think so.
http://stackoverflow.com/questions/1038788/cant-write-to-fifo-file-mouted-via-nfs

How about MPI?
Ok, this answers my question. I'll read about MPI, or try the netcat option suggested in the above link.
For people who might be interested, I'll post the solution when I find it.
Thanks a lot.
Well, I finally made my script work using sockets. Here is my solution:

parent.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>

#define PORTSTART 10000
#define PORTRANGE 100 // Ports PORTSTART..PORTSTART+PORTRANGE may be used

// Create a TCP socket listening on the first port >= *nextport that can be
// bound. On success returns the socket, stores the port in *port and
// advances *nextport past it. Returns -1 (after printing a message) when
// the socket cannot be created or the port range is exhausted.
static int listen_on_free_port( int *nextport, int *port )
{
  struct sockaddr_in addr;
  int fd = socket(AF_INET,SOCK_STREAM,0);
  
  if (fd<0) {
    perror("socket");
    return -1;
  }
  memset(&addr,0,sizeof addr);
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_ANY);
  // Loop to find an available port
  for (;;) {
    if (*nextport>PORTSTART+PORTRANGE) {
      fprintf(stderr,"Port error: all ports tested\n");
      close(fd);
      return -1;
    }
    *port = (*nextport)++;
    addr.sin_port = htons((unsigned short)*port);
    if (bind(fd,(struct sockaddr *)&addr,sizeof addr)==0) break;
  }
  if (listen(fd,5)<0) {
    perror("listen");
    close(fd);
    return -1;
  }
  return fd;
}

// Parent side of the socket demo.
// Usage: ./parent <number of children> <number of iterations>
// Opens one listening port per child, starts the children, waits for each
// of them to connect, then for each iteration sends the iteration number to
// every child and waits for one reply from each. Finally sends "done" so
// that the children exit.
// Returns 0 on success, 1 on bad arguments or on any failure.
int main( int argc, char ** argv )
{
  int N,Ne;
  char buffer[BUFSIZ];
  int i,ie,len;
  int *sockfd = NULL,*newsockfd = NULL,*portno = NULL;
  int portcount = PORTSTART;
  int status = 1;
  
  // Read arguments
  if (argc<3 ||
      sscanf(argv[1],"%i",&Ne)!=1 || Ne<1 ||
      sscanf(argv[2],"%i",&N)!=1) {
    fprintf(stderr,"Usage: %s <children> <iterations>\n",argv[0]);
    return 1;
  }
  
  // Allocate memory; -1 marks a socket that is not open
  sockfd = malloc((size_t)Ne*sizeof *sockfd);
  portno = malloc((size_t)Ne*sizeof *portno);
  newsockfd = malloc((size_t)Ne*sizeof *newsockfd);
  if (sockfd==NULL || portno==NULL || newsockfd==NULL) {
    perror("malloc");
    Ne = 0; // Nothing to close in the cleanup loop
    goto cleanup;
  }
  for (ie=0;ie<Ne;ie++) sockfd[ie] = newsockfd[ie] = -1;
  
  // Create sockets
  for (ie=0;ie<Ne;ie++) {
    printf("Setting %2i:",ie); fflush(stdout);
    sockfd[ie] = listen_on_free_port(&portcount,&portno[ie]);
    if (sockfd[ie]<0) goto cleanup;
    printf(" on port %i",portno[ie]); fflush(stdout);
    // Ready to run child: the socket already listens, so the connection
    // request is queued even if the child connects before accept() is called
    snprintf(buffer,sizeof buffer,"./child %i %i &",ie,portno[ie]);
    if (system(buffer)==-1) {
      perror("system");
      goto cleanup;
    }
    printf(" -> OK\n");
  }
  
  // Wait for children connection
  // (blocks for as long as a child has not connected)
  for (ie=0;ie<Ne;ie++) {
    newsockfd[ie] = accept(sockfd[ie],NULL,NULL);
    if (newsockfd[ie]<0) {
      perror("accept");
      goto cleanup;
    }
  }
  
  // Iteration loop
  for (i=0;i<N;i++) {
    printf("%i: ",i); fflush(stdout);
    // Run new children iteration: the order carries the iteration number
    len = snprintf(buffer,sizeof buffer,"%i",i);
    for (ie=0;ie<Ne;ie++) {
      if (write(newsockfd[ie],buffer,(size_t)len)!=len) {
        perror("write");
        goto cleanup;
      }
    }
    // Wait for children processes
    for (ie=0;ie<Ne;ie++) {
      // 0 means the child closed the connection, <0 is an error
      if (read(newsockfd[ie],buffer,sizeof buffer)<=0) {
        fprintf(stderr,"\nChild %i stopped replying\n",ie);
        goto cleanup;
      }
    }
    printf(" -> OK\n");
  }
  
  // Make children exit
  for (ie=0;ie<Ne;ie++) {
    if (write(newsockfd[ie],"done",4)!=4) {
      perror("write");
      goto cleanup;
    }
  }
  status = 0;
  
  // Close sockets and deallocate memory
cleanup:
  for (ie=0;ie<Ne;ie++) {
    if (newsockfd[ie]>=0) close(newsockfd[ie]);
    if (sockfd[ie]>=0) close(sockfd[ie]);
  }
  free(sockfd);
  free(portno);
  free(newsockfd);
  
  return status;
}



child.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>

// Placeholder for the real per-iteration job: pauses for half a second.
// select() with no descriptor sets and a timeout is used as a portable
// sub-second sleep.
static void simulate_work( void )
{
  struct timeval delay;
  delay.tv_sec = 0;
  delay.tv_usec = 500000;
  select(0,NULL,NULL,NULL,&delay);
}

// Child side of the socket demo.
// Usage: ./child <child number> <port> [server name]
// Connects to the parent on the given port (server name defaults to
// "localhost"). For every order received, prints the child number, does
// its work and replies "done", until the parent itself sends "done".
// Returns 0 on a clean shutdown, 1 on bad arguments or on any failure.
int main( int argc, char ** argv )
{
  char buffer[BUFSIZ];
  const char *servername = "localhost"; // Name of the host running the parent
  char *end;
  long port;
  int ie,sockfd;
  int status = 1;
  ssize_t n;
  struct sockaddr_in serv_addr;
  struct hostent *server;
  
  // Read arguments
  if (argc<3) return 1;
  if (sscanf(argv[1],"%i",&ie)!=1) return 1;
  port = strtol(argv[2],&end,10);
  if (end==argv[2] || *end!='\0' || port<1 || port>65535) return 1;
  if (argc>3) servername = argv[3];
  
  // Resolve the server name (IPv4 only, like the socket created below)
  server = gethostbyname(servername);
  if (server==NULL || server->h_addrtype!=AF_INET ||
      server->h_length!=(int)sizeof serv_addr.sin_addr) {
    fprintf(stderr,"Cannot resolve host %s\n",servername);
    return 1;
  }
  
  // Create socket and connect with server
  sockfd = socket(AF_INET,SOCK_STREAM,0);
  if (sockfd<0) {
    perror("socket");
    return 1;
  }
  memset(&serv_addr,0,sizeof serv_addr);
  serv_addr.sin_family = AF_INET;
  memcpy(&serv_addr.sin_addr,server->h_addr_list[0],sizeof serv_addr.sin_addr);
  serv_addr.sin_port = htons((unsigned short)port);
  if (connect(sockfd,(struct sockaddr *) &serv_addr,sizeof serv_addr)<0) {
    perror("connect");
    close(sockfd);
    return 1;
  }
  
  // Iteration loop
  for (;;) {
    n = read(sockfd,buffer,sizeof buffer-1);
    if (n<=0) break; // Parent closed the connection or read failed
    // The parent sends no terminating NUL, so add one before strcmp()
    buffer[n] = '\0';
    if (strcmp(buffer,"done")==0) {
      status = 0;
      break;
    }
    printf(" %i",ie); fflush(stdout);
    simulate_work(); // Or other processes
    if (write(sockfd,"done",4)!=4) break;
  }
  
  // Close when finished
  close(sockfd);
  return status;
}



I tested the script locally (both parent and children on the same machine, servername="localhost"), and on a cluster using qsub. There is probably room for improvement, but as it stands it works as expected.
Topic archived. No new replies allowed.