Back to home page

LXR

 
 

    


File indexing completed on 2025-05-11 08:24:18

0001 /**
0002  * @file
0003  *
0004  * @ingroup FIFO_PIPE
0005  *
0006  * @brief FIFO/Pipe Support
0007  */
0008 
0009 /*
0010  * Author: Wei Shen <cquark@gmail.com>
0011  *
0012  * The license and distribution terms for this file may be
0013  * found in the file LICENSE in this distribution or at
0014  * http://www.rtems.org/license/LICENSE.
0015  */
0016 
0017 
0018 #ifdef HAVE_CONFIG_H
0019 #include "config.h"
0020 #endif
0021 
0022 #include <sys/param.h>
0023 #include <sys/filio.h>
0024 #include <errno.h>
0025 #include <stdlib.h>
0026 #include <string.h>
0027 
0028 #include <rtems.h>
0029 #include <rtems/libio_.h>
0030 #include <rtems/pipe.h>
0031 
0032 #define LIBIO_ACCMODE(_iop) (rtems_libio_iop_flags(_iop) & LIBIO_FLAGS_READ_WRITE)
0033 #define LIBIO_NODELAY(_iop) rtems_libio_iop_is_no_delay(_iop)
0034 
0035 static rtems_mutex pipe_mutex = RTEMS_MUTEX_INITIALIZER("Pipes");
0036 
0037 
/*
 * Ring-buffer helpers.  _pipe is a pointer to a pipe control structure with
 * the fields Size (buffer capacity), Start (index of the first stored byte)
 * and Length (number of stored bytes).  The argument is parenthesized at
 * every use so that arbitrary pointer expressions expand correctly.
 */

/* True when no bytes are stored. */
#define PIPE_EMPTY(_pipe) ((_pipe)->Length == 0)
/* True when the buffer holds Size bytes. */
#define PIPE_FULL(_pipe)  ((_pipe)->Length == (_pipe)->Size)
/* Number of bytes that can still be stored. */
#define PIPE_SPACE(_pipe) ((_pipe)->Size - (_pipe)->Length)
/* Buffer index at which the next written byte is stored. */
#define PIPE_WSTART(_pipe) \
  (((_pipe)->Start + (_pipe)->Length) % (_pipe)->Size)

/* Acquire / release the per-pipe mutex. */
#define PIPE_LOCK(_pipe) rtems_mutex_lock(&(_pipe)->Mutex)

#define PIPE_UNLOCK(_pipe) rtems_mutex_unlock(&(_pipe)->Mutex)

/*
 * Wait on the read or write condition variable of the pipe.  The per-pipe
 * mutex is passed to the wait, so it must be held by the caller.
 */
#define PIPE_READWAIT(_pipe)  \
  rtems_condition_variable_wait(&(_pipe)->readBarrier, &(_pipe)->Mutex)

#define PIPE_WRITEWAIT(_pipe)  \
  rtems_condition_variable_wait(&(_pipe)->writeBarrier, &(_pipe)->Mutex)

/* Wake every thread waiting on the read or write condition variable. */
#define PIPE_WAKEUPREADERS(_pipe) \
  rtems_condition_variable_broadcast(&(_pipe)->readBarrier)

#define PIPE_WAKEUPWRITERS(_pipe) \
  rtems_condition_variable_broadcast(&(_pipe)->writeBarrier)
0058 
0059 /*
0060  * Alloc pipe control structure, buffer, and resources.
0061  * Called with pipe_semaphore held.
0062  */
0063 static int pipe_alloc(
0064   pipe_control_t **pipep
0065 )
0066 {
0067   static char c = 'a';
0068   pipe_control_t *pipe;
0069   int err = -ENOMEM;
0070 
0071   pipe = malloc(sizeof(pipe_control_t));
0072   if (pipe == NULL)
0073     return err;
0074   memset(pipe, 0, sizeof(pipe_control_t));
0075 
0076   pipe->Size = PIPE_BUF;
0077   pipe->Buffer = malloc(pipe->Size);
0078   if (pipe->Buffer == NULL) {
0079     free(pipe);
0080     return -ENOMEM;
0081   }
0082 
0083   rtems_condition_variable_init(&pipe->readBarrier, "Pipe Read");
0084   rtems_condition_variable_init(&pipe->writeBarrier, "Pipe Write");
0085   rtems_mutex_init(&pipe->Mutex, "Pipe");
0086 
0087   *pipep = pipe;
0088   if (c ++ == 'z')
0089     c = 'a';
0090   return 0;
0091 }
0092 
0093 /* Called with pipe_semaphore held. */
0094 static inline void pipe_free(
0095   pipe_control_t *pipe
0096 )
0097 {
0098   rtems_condition_variable_destroy(&pipe->readBarrier);
0099   rtems_condition_variable_destroy(&pipe->writeBarrier);
0100   rtems_mutex_destroy(&pipe->Mutex);
0101   free(pipe->Buffer);
0102   free(pipe);
0103 }
0104 
0105 static void pipe_lock(void)
0106 {
0107   rtems_mutex_lock(&pipe_mutex);
0108 }
0109 
0110 static void pipe_unlock(void)
0111 {
0112   rtems_mutex_unlock(&pipe_mutex);
0113 }
0114 
0115 /*
0116  * If called with *pipep = NULL, pipe_new will call pipe_alloc to allocate a
0117  * pipe control structure and set *pipep to its address.
0118  * pipe is locked, when pipe_new returns with no error.
0119  */
0120 static int pipe_new(
0121   pipe_control_t **pipep
0122 )
0123 {
0124   pipe_control_t *pipe;
0125   int err = 0;
0126 
0127   _Assert( pipep );
0128   pipe_lock();
0129 
0130   pipe = *pipep;
0131   if (pipe == NULL) {
0132     err = pipe_alloc(&pipe);
0133     if (err) {
0134       pipe_unlock();
0135       return err;
0136     }
0137   }
0138 
0139   PIPE_LOCK(pipe);
0140 
0141   *pipep = pipe;
0142   pipe_unlock();
0143   return err;
0144 }
0145 
0146 void pipe_release(
0147   pipe_control_t **pipep,
0148   rtems_libio_t *iop
0149 )
0150 {
0151   pipe_control_t *pipe = *pipep;
0152   uint32_t mode;
0153 
0154   if (!pipe)
0155     return;
0156 
0157   pipe_lock();
0158   PIPE_LOCK(pipe);
0159 
0160   mode = LIBIO_ACCMODE(iop);
0161   if (mode & LIBIO_FLAGS_READ)
0162      pipe->Readers --;
0163   if (mode & LIBIO_FLAGS_WRITE)
0164      pipe->Writers --;
0165 
0166   PIPE_UNLOCK(pipe);
0167 
0168   if (pipe->Readers == 0 && pipe->Writers == 0) {
0169 #if 0
0170     /* To delete an anonymous pipe file when all users closed it */
0171     if (pipe->Anonymous)
0172       delfile = TRUE;
0173 #endif
0174     pipe_free(pipe);
0175     *pipep = NULL;
0176   }
0177   else if (pipe->Readers == 0 && mode != LIBIO_FLAGS_WRITE)
0178     /* Notify waiting Writers that all their partners left */
0179     PIPE_WAKEUPWRITERS(pipe);
0180   else if (pipe->Writers == 0 && mode != LIBIO_FLAGS_READ)
0181     PIPE_WAKEUPREADERS(pipe);
0182 
0183   pipe_unlock();
0184 
0185 #if 0
0186   if (! delfile)
0187     return;
0188   if (iop->pathinfo.ops->unlink_h == NULL)
0189     return;
0190 
0191   /* This is safe for IMFS, but how about other FSes? */
0192   rtems_libio_iop_flags_clear( iop, LIBIO_FLAGS_OPEN );
0193   if(iop->pathinfo.ops->unlink_h(&iop->pathinfo))
0194     return;
0195 #endif
0196 
0197 }
0198 
0199 int fifo_open(
0200   pipe_control_t **pipep,
0201   rtems_libio_t *iop
0202 )
0203 {
0204   pipe_control_t *pipe;
0205   unsigned int prevCounter;
0206   int err;
0207 
0208   err = pipe_new(pipep);
0209   if (err)
0210     return err;
0211   pipe = *pipep;
0212 
0213   switch (LIBIO_ACCMODE(iop)) {
0214     case LIBIO_FLAGS_READ:
0215       pipe->readerCounter ++;
0216       if (pipe->Readers ++ == 0)
0217         PIPE_WAKEUPWRITERS(pipe);
0218 
0219       if (pipe->Writers == 0) {
0220         /* Not an error */
0221         if (LIBIO_NODELAY(iop))
0222           break;
0223 
0224         prevCounter = pipe->writerCounter;
0225         err = -EINTR;
0226         /* Wait until a writer opens the pipe */
0227         do {
0228           PIPE_READWAIT(pipe);
0229         } while (prevCounter == pipe->writerCounter);
0230       }
0231       break;
0232 
0233     case LIBIO_FLAGS_WRITE:
0234       pipe->writerCounter ++;
0235 
0236       if (pipe->Writers ++ == 0)
0237         PIPE_WAKEUPREADERS(pipe);
0238 
0239       if (pipe->Readers == 0 && LIBIO_NODELAY(iop)) {
0240     PIPE_UNLOCK(pipe);
0241         err = -ENXIO;
0242         goto out_error;
0243       }
0244 
0245       if (pipe->Readers == 0) {
0246         prevCounter = pipe->readerCounter;
0247         err = -EINTR;
0248         do {
0249           PIPE_WRITEWAIT(pipe);
0250         } while (prevCounter == pipe->readerCounter);
0251       }
0252       break;
0253 
0254     case LIBIO_FLAGS_READ_WRITE:
0255       pipe->readerCounter ++;
0256       if (pipe->Readers ++ == 0)
0257         PIPE_WAKEUPWRITERS(pipe);
0258       pipe->writerCounter ++;
0259       if (pipe->Writers ++ == 0)
0260         PIPE_WAKEUPREADERS(pipe);
0261       break;
0262   }
0263 
0264   PIPE_UNLOCK(pipe);
0265   return 0;
0266 
0267 out_error:
0268   pipe_release(pipep, iop);
0269   return err;
0270 }
0271 
0272 ssize_t pipe_read(
0273   pipe_control_t *pipe,
0274   void           *buffer,
0275   size_t          count,
0276   rtems_libio_t  *iop
0277 )
0278 {
0279   int chunk, chunk1, read = 0, ret = 0;
0280 
0281   if (!pipe)
0282     return -EPIPE;
0283 
0284   PIPE_LOCK(pipe);
0285 
0286   while (PIPE_EMPTY(pipe)) {
0287     /* Not an error */
0288     if (pipe->Writers == 0)
0289       goto out_locked;
0290 
0291     if (LIBIO_NODELAY(iop)) {
0292       ret = -EAGAIN;
0293       goto out_locked;
0294     }
0295 
0296     /* Wait until pipe is no more empty or no writer exists */
0297     pipe->waitingReaders ++;
0298     PIPE_READWAIT(pipe);
0299     pipe->waitingReaders --;
0300   }
0301 
0302   /* Read chunk bytes */
0303   chunk = MIN(count - read,  pipe->Length);
0304   chunk1 = pipe->Size - pipe->Start;
0305   if (chunk > chunk1) {
0306     memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk1);
0307     memcpy(buffer + read + chunk1, pipe->Buffer, chunk - chunk1);
0308   }
0309   else
0310     memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk);
0311 
0312   pipe->Start += chunk;
0313   pipe->Start %= pipe->Size;
0314   pipe->Length -= chunk;
0315   /* For buffering optimization */
0316   if (PIPE_EMPTY(pipe))
0317     pipe->Start = 0;
0318 
0319   if (pipe->waitingWriters > 0)
0320     PIPE_WAKEUPWRITERS(pipe);
0321   read += chunk;
0322 
0323 out_locked:
0324   PIPE_UNLOCK(pipe);
0325 
0326   if (read > 0)
0327     return read;
0328   return ret;
0329 }
0330 
0331 ssize_t pipe_write(
0332   pipe_control_t *pipe,
0333   const void     *buffer,
0334   size_t          count,
0335   rtems_libio_t  *iop
0336 )
0337 {
0338   int chunk, chunk1, written = 0, ret = 0;
0339 
0340   if (!pipe)
0341     return -EPIPE;
0342 
0343   /* Write nothing */
0344   if (count == 0)
0345     return 0;
0346 
0347   PIPE_LOCK(pipe);
0348 
0349   if (pipe->Readers == 0) {
0350     ret = -EPIPE;
0351     goto out_locked;
0352   }
0353 
0354   /* Write of PIPE_BUF bytes or less shall not be interleaved */
0355   chunk = count <= pipe->Size ? count : 1;
0356 
0357   while (written < count) {
0358     while (PIPE_SPACE(pipe) < chunk) {
0359       if (LIBIO_NODELAY(iop)) {
0360         ret = -EAGAIN;
0361         goto out_locked;
0362       }
0363 
0364       /* Wait until there is chunk bytes space or no reader exists */
0365       pipe->waitingWriters ++;
0366       PIPE_WRITEWAIT(pipe);
0367       pipe->waitingWriters --;
0368 
0369       if (pipe->Readers == 0) {
0370         ret = -EPIPE;
0371         goto out_locked;
0372       }
0373     }
0374 
0375     chunk = MIN(count - written, PIPE_SPACE(pipe));
0376     chunk1 = pipe->Size - PIPE_WSTART(pipe);
0377     if (chunk > chunk1) {
0378       memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk1);
0379       memcpy(pipe->Buffer, buffer + written + chunk1, chunk - chunk1);
0380     }
0381     else
0382       memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk);
0383 
0384     pipe->Length += chunk;
0385     if (pipe->waitingReaders > 0)
0386       PIPE_WAKEUPREADERS(pipe);
0387     written += chunk;
0388     /* Write of more than PIPE_BUF bytes can be interleaved */
0389     chunk = 1;
0390   }
0391 
0392 out_locked:
0393   PIPE_UNLOCK(pipe);
0394 
0395 #ifdef RTEMS_POSIX_API
0396   /* Signal SIGPIPE */
0397   if (ret == -EPIPE)
0398     kill(getpid(), SIGPIPE);
0399 #endif
0400 
0401   if (written > 0)
0402     return written;
0403   return ret;
0404 }
0405 
0406 int pipe_ioctl(
0407   pipe_control_t  *pipe,
0408   ioctl_command_t  cmd,
0409   void            *buffer,
0410   rtems_libio_t   *iop
0411 )
0412 {
0413 
0414   if (!pipe)
0415     return -EPIPE;
0416 
0417   if (cmd == FIONREAD) {
0418     if (buffer == NULL)
0419       return -EFAULT;
0420 
0421     PIPE_LOCK(pipe);
0422 
0423     /* Return length of pipe */
0424     *(unsigned int *)buffer = pipe->Length;
0425     PIPE_UNLOCK(pipe);
0426     return 0;
0427   }
0428 
0429   return -EINVAL;
0430 }