/* * Written By Hyouck Kim, peakhunt@yahoo.com, 2003 * * A simple message queue test * */ #include #include #include #include #include #include #include #include "msg_test.h" #include #include #include #define MSGQ_KEY1 "./.msgq_key1" #define MSGQ_KEY2 "./.msgq_key2" void* msgq_thread(void* data); typedef struct _msg { long mtype; char buffer[1024+1]; } MSG; main(int argc, char** argv) { key_t key_mine, key_dest; int msgq_id; int msgq_id_dest; pthread_t thrd; int q_fd, q_fd2; int r; fd_set rset, wset; if(argc != 2) { printf("ysage : msgq 1|2\n"); return -1; } if(argv[1][0] == '1') { key_mine = ftok(MSGQ_KEY1, 0); if((int)key_mine == -1) { perror("ftok:"); return -1; } key_dest = ftok(MSGQ_KEY2, 0); if((int)key_dest == -1) { perror("ftok:"); return -1; } } else { key_mine = ftok(MSGQ_KEY2, 0); if((int)key_mine == -1) { perror("ftok:"); return -1; } key_dest = ftok(MSGQ_KEY1, 0); if((int)key_dest == -1) { perror("ftok:"); return -1; } } msgq_id = msgget(key_mine, IPC_CREAT | S_IRWXU); if(msgq_id == -1) { perror("msgget:"); return -1; } msgq_id_dest = msgget(key_dest, IPC_CREAT | S_IRWXU); if(msgq_id_dest == -1) { perror("msgget:"); return -1; } /* r = pthread_create(&thrd, NULL, msgq_thread, (void*)msgq_id); if(r != 0) { perror("pthread_create:"); msgctl(msgq_id, IPC_RMID, NULL); return -1; } */ q_fd = msgqToFd(msgq_id); q_fd2 = msgqToFd(msgq_id_dest); if(q_fd < 0 || q_fd2 < 0) { perror("msgqToFd:"); return -1; } while(1) { char buffer[1024+1]; int len; MSG msg; FD_ZERO(&rset); FD_ZERO(&wset); FD_SET(0, &rset); FD_SET(q_fd, &rset); FD_SET(q_fd2, &wset); /* XXX: no timeout */ printf("waiting on select\n"); if(select(q_fd2 + 1, &rset, &wset, NULL, NULL) <= 0) { perror("select error:"); exit(-1); } if(FD_ISSET(0, &rset)) { len = read(0, buffer, 1024); if(len <= 0) { perror("read:"); msgctl(msgq_id, IPC_RMID, NULL); return 0; } buffer[len] = '\0'; strcpy(msg.buffer,buffer); msg.mtype = 1; r = msgsnd(msgq_id_dest, &msg, sizeof(msg.buffer), 0); if( r == -1) { perror("msgsnd:"); msgctl(msgq_id, IPC_RMID, NULL); return 0; } } if(FD_ISSET(q_fd, &rset)) { MSG msg; int r; printf("read to receive\n"); r = msgrcv(msgq_id, &msg, sizeof(msg.buffer), 0, 0); if( r == -1) { perror("msgrcv:"); exit(-1); } printf("%s\n", msg.buffer); } if(FD_ISSET(q_fd2, &wset)) { strcpy(msg.buffer,"ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ\n"); msg.mtype = 1; printf("before writing\n"); r = msgsnd(msgq_id_dest, &msg, sizeof(msg.buffer), IPC_NOWAIT); if( r == -1 && errno != EAGAIN) { perror("msgsnd:"); msgctl(msgq_id, IPC_RMID, NULL); return 0; } else if(errno == EAGAIN) { perror("queue full"); } } } } void* msgq_thread(void* data) { int msgq_id = (int)data; MSG msg; int r; while(1) { r = msgrcv(msgq_id, &msg, sizeof(msg.buffer), 0, 0); if( r == -1) { perror("msgrcv:"); return NULL; } printf("%s\n", msg.buffer); } }