/* SPDX-License-Identifier: BSD-2-Clause */

/**
 * @file
 *
 * @ingroup POSIX_AIO
 *
 * @brief Private implementation for Asynchronous I/O.
 *
 * This file contains the implementation of the private methods used for the
 * processing of Asynchronous I/O requests.
 */

/*
 *  Copyright 2010-2011, Alin Rus <alin.codejunkie@gmail.com>
 *  Copyright 2024, Alessandro Nardin <ale.daluch@gmail.com>
 *
 *  COPYRIGHT (c) 1989-2011.
 *  On-Line Applications Research Corporation (OAR).
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <time.h>
#include <signal.h>
#include <rtems/posix/aio_misc.h>
#include <rtems/score/assert.h>
#include <errno.h>
#include <limits.h>

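/*
 * For context: the public POSIX calls (e.g. aio_read() and aio_write()) are
 * expected to build an rtems_aio_request with the init_*_req() helpers below
 * and submit it through rtems_aio_enqueue().  A minimal, illustrative usage
 * sketch (the descriptor fd and the buffer are placeholders, not part of
 * this file):
 *
 *   struct aiocb cb = { 0 };
 *   char buf[ 64 ];
 *
 *   cb.aio_fildes = fd;                        // an open, readable descriptor
 *   cb.aio_buf = buf;
 *   cb.aio_nbytes = sizeof( buf );
 *   cb.aio_offset = 0;
 *   cb.aio_sigevent.sigev_notify = SIGEV_NONE; // no completion notification
 *
 *   if ( aio_read( &cb ) == 0 ) {
 *     while ( aio_error( &cb ) == EINPROGRESS ) {
 *       sched_yield();                         // poll until the worker is done
 *     }
 *     ssize_t n = aio_return( &cb );           // bytes read, or -1 on error
 *   }
 */
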
/**
 * @brief Thread processing AIO requests.
 *
 * @param[in,out] arg A pointer to the chain for the FD to be worked on.
 *
 * @retval NULL when the thread terminates, either because an error occurred
 *         or because it timed out while idle.
 */
static void *rtems_aio_handle( void *arg );

/**
 * @brief Helper function for request processing.
 *
 * @param[in,out] req A pointer to a single request.
 *                    It will store the results of the request.
 */
static void rtems_aio_handle_helper( rtems_aio_request *req );

/**
 * @brief Move a chain of requests from the idle queue (IQ) to the work
 *        queue (WQ).
 *
 * @param[in,out] r_chain The chain of requests to move to the WQ.
 */
static void rtems_aio_move_to_work( rtems_aio_request_chain *r_chain );

/**
 * @brief Add a request to a given FD chain.
 *
 * Inserts the request into the fd chain, which is ordered by priority.
 *
 * @param[in,out] chain A pointer to the chain of requests for a given FD.
 * @param[in,out] req   A pointer to a request (see aio_misc.h).
 */
static void rtems_aio_insert_prio(
  rtems_chain_control *chain,
  rtems_aio_request *req
);

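/*
 * Ordering illustration (not compiled): with requests of aio_reqprio 0 and 3
 * already queued on an fd chain, inserting a request with aio_reqprio 2
 * places it between them, so the chain stays sorted by ascending aio_reqprio
 * (smallest delta, i.e. highest effective priority, first):
 *
 *   before:  [prio 0] -> [prio 3]
 *   after:   [prio 0] -> [prio 2] -> [prio 3]
 */
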
/**
 * @brief Wrapper for pthread_create() call.
 *
 * This function serves as a wrapper with the appropriate signature for a call
 * to pthread_create(). It receives a pointer to a sigevent structure that
 * contains a pointer to the function to be called and the parameters to be
 * passed to it.
 *
 * @param args Pointer to the sigevent struct containing a pointer to the
 *             function and its parameters.
 * @return Always NULL; the thread terminates via pthread_exit( NULL ).
 */
static void *rtems_aio_notify_function_wrapper( void *args );

/**
 * @brief Generates a notification.
 *
 * The notification is generated using a sigevent struct, as defined by the
 * POSIX specification.
 *
 * @param sigp A pointer to the sigevent struct that will be used to generate
 *             the notification.
 */
static void rtems_aio_notify( struct sigevent *sigp );

rtems_aio_queue aio_request_queue;

int rtems_aio_init( void )
{
  int result = 0;

  result = pthread_attr_init( &aio_request_queue.attr );
  if ( result != 0 ) {
    return -1;
  }

  result = pthread_attr_setdetachstate(
    &aio_request_queue.attr,
    PTHREAD_CREATE_DETACHED
  );
  if ( result != 0 ) {
    pthread_attr_destroy( &aio_request_queue.attr );
    return -1;
  }

  result = pthread_mutex_init( &aio_request_queue.mutex, NULL );
  if ( result != 0 ) {
    pthread_attr_destroy( &aio_request_queue.attr );
    return -1;
  }

  pthread_mutex_lock( &aio_request_queue.mutex );

  result = pthread_cond_init( &aio_request_queue.new_req, NULL );
  if ( result != 0 ) {
    pthread_mutex_unlock( &aio_request_queue.mutex );
    pthread_mutex_destroy( &aio_request_queue.mutex );
    pthread_attr_destroy( &aio_request_queue.attr );
    return -1;
  }

  rtems_chain_initialize_empty( &aio_request_queue.work_req );
  rtems_chain_initialize_empty( &aio_request_queue.idle_req );

  aio_request_queue.active_threads = 0;
  aio_request_queue.idle_threads = 0;
  atomic_init( &aio_request_queue.queued_requests, 0 );
  aio_request_queue.initialized = AIO_QUEUE_INITIALIZED;

  pthread_mutex_unlock( &aio_request_queue.mutex );

  return 0;
}

rtems_aio_request *init_write_req( struct aiocb *aiocbp )
{
  rtems_aio_request *req;
  int mode;

  if ( aiocbp == NULL ) {
    errno = EINVAL;
    return NULL;
  }

  mode = fcntl( aiocbp->aio_fildes, F_GETFL );
  if (
    ( mode & O_ACCMODE ) != O_WRONLY &&
    ( mode & O_ACCMODE ) != O_RDWR
  ) {
    errno = EBADF;
    return NULL;
  }

  if ( aiocbp->aio_reqprio < 0 || aiocbp->aio_reqprio > AIO_PRIO_DELTA_MAX ) {
    errno = EINVAL;
    return NULL;
  }

  if ( aiocbp->aio_offset < 0 ) {
    errno = EINVAL;
    return NULL;
  }

  if ( rtems_aio_check_sigevent( &aiocbp->aio_sigevent ) == 0 ) {
    errno = EINVAL;
    return NULL;
  }

  req = malloc( sizeof( rtems_aio_request ) );
  if ( req == NULL ) {
    errno = EAGAIN;
    return NULL;
  }

  req->aiocbp = aiocbp;
  req->op_type = AIO_OP_WRITE;
  req->listcbp = NULL;

  return req;
}

rtems_aio_request *init_read_req( struct aiocb *aiocbp )
{
  rtems_aio_request *req;
  int mode;

  if ( aiocbp == NULL ) {
    errno = EINVAL;
    return NULL;
  }

  mode = fcntl( aiocbp->aio_fildes, F_GETFL );
  if (
    ( mode & O_ACCMODE ) != O_RDONLY &&
    ( mode & O_ACCMODE ) != O_RDWR
  ) {
    errno = EBADF;
    return NULL;
  }

  if ( aiocbp->aio_reqprio < 0 || aiocbp->aio_reqprio > AIO_PRIO_DELTA_MAX ) {
    errno = EINVAL;
    return NULL;
  }

  if ( aiocbp->aio_offset < 0 ) {
    errno = EINVAL;
    return NULL;
  }

  if ( rtems_aio_check_sigevent( &aiocbp->aio_sigevent ) == 0 ) {
    errno = EINVAL;
    return NULL;
  }

  req = malloc( sizeof( rtems_aio_request ) );
  if ( req == NULL ) {
    errno = EAGAIN;
    return NULL;
  }

  req->aiocbp = aiocbp;
  req->op_type = AIO_OP_READ;
  req->listcbp = NULL;

  return req;
}

void rtems_aio_completed_list_op( listcb *listcbp )
{
  if ( listcbp == NULL )
    return;

  pthread_mutex_lock( &listcbp->mutex );

  if ( --listcbp->requests_left == 0 ) {
    switch ( listcbp->notification_type ) {
      case AIO_LIO_NO_NOTIFY:
        break;
      case AIO_LIO_SIGEV:
        rtems_aio_notify( listcbp->lio_notification.sigp );
        break;
      case AIO_LIO_EVENT:
        rtems_event_system_send(
          listcbp->lio_notification.task_id,
          RTEMS_EVENT_SYSTEM_LIO_LIST_COMPLETED
        );
        break;
    }
    pthread_mutex_unlock( &listcbp->mutex );
    free( listcbp );
  } else {
    pthread_mutex_unlock( &listcbp->mutex );
  }
}

rtems_aio_request_chain *rtems_aio_search_fd(
  rtems_chain_control *chain,
  int fildes,
  int create
)
{
  rtems_aio_request_chain *r_chain;
  rtems_chain_node *node;

  node = rtems_chain_first( chain );
  r_chain = (rtems_aio_request_chain *) node;

  while ( r_chain->fildes < fildes && !rtems_chain_is_tail( chain, node ) ) {
    node = rtems_chain_next( node );
    r_chain = (rtems_aio_request_chain *) node;
  }

  if ( r_chain->fildes == fildes ) {
    r_chain->new_fd = 0;
  } else {
    if ( create == 0 ) {
      r_chain = NULL;
    } else {
      r_chain = calloc( 1, sizeof( rtems_aio_request_chain ) );
      rtems_chain_initialize_empty( &r_chain->perfd );
      rtems_chain_initialize_node( &r_chain->next_fd );

      if ( rtems_chain_is_empty( chain ) )
        rtems_chain_prepend( chain, &r_chain->next_fd );
      else
        rtems_chain_insert( rtems_chain_previous( node ), &r_chain->next_fd );

      r_chain->new_fd = 1;
      r_chain->fildes = fildes;
    }
  }
  return r_chain;
}

static void rtems_aio_move_to_work( rtems_aio_request_chain *r_chain )
{
  rtems_chain_control *work_req_chain = &aio_request_queue.work_req;
  rtems_aio_request_chain *temp;
  rtems_chain_node *node;

  node = rtems_chain_first( work_req_chain );
  temp = (rtems_aio_request_chain *) node;

  while (
    temp->fildes < r_chain->fildes &&
    !rtems_chain_is_tail( work_req_chain, node )
  ) {
    node = rtems_chain_next( node );
    temp = (rtems_aio_request_chain *) node;
  }

  rtems_chain_insert( rtems_chain_previous( node ), &r_chain->next_fd );
}

static void rtems_aio_insert_prio(
  rtems_chain_control *chain,
  rtems_aio_request *req
)
{
  rtems_chain_node *node;

  AIO_printf( "FD exists \n" );
  node = rtems_chain_first( chain );

  if ( rtems_chain_is_empty( chain ) ) {
    AIO_printf( "First in chain \n" );
    rtems_chain_prepend( chain, &req->next_prio );
  } else {
    AIO_printf( "Add by priority \n" );
    int prio = ((rtems_aio_request *) node)->aiocbp->aio_reqprio;

    while (
      req->aiocbp->aio_reqprio > prio &&
      !rtems_chain_is_tail( chain, node )
    ) {
      node = rtems_chain_next( node );
      prio = ((rtems_aio_request *) node)->aiocbp->aio_reqprio;
    }

    rtems_chain_insert( node->previous, &req->next_prio );
  }
}

void rtems_aio_remove_fd( rtems_aio_request_chain *r_chain )
{
  rtems_chain_control *chain;
  rtems_chain_node *node;
  chain = &r_chain->perfd;
  node = rtems_chain_first( chain );

  while ( !rtems_chain_is_tail( chain, node ) ) {
    rtems_aio_request *req = (rtems_aio_request *) node;
    node = rtems_chain_next( node );
    rtems_chain_extract( &req->next_prio );
    req->aiocbp->error_code = ECANCELED;
    req->aiocbp->return_value = -1;
    atomic_fetch_sub( &aio_request_queue.queued_requests, 1 );
    rtems_aio_completed_list_op( req->listcbp );
    free( req );
  }
}

int rtems_aio_remove_req( rtems_chain_control *chain, struct aiocb *aiocbp )
{
  if ( rtems_chain_is_empty( chain ) )
    return AIO_ALLDONE;

  rtems_chain_node *node = rtems_chain_first( chain );
  rtems_aio_request *current;

  current = (rtems_aio_request *) node;

  while ( !rtems_chain_is_tail( chain, node ) && current->aiocbp != aiocbp ) {
    node = rtems_chain_next( node );
    current = (rtems_aio_request *) node;
  }

  if ( rtems_chain_is_tail( chain, node ) )
    return AIO_NOTCANCELED;
  else {
    rtems_chain_extract( node );
    current->aiocbp->error_code = ECANCELED;
    current->aiocbp->return_value = -1;
    rtems_aio_completed_list_op( current->listcbp );
    atomic_fetch_sub( &aio_request_queue.queued_requests, 1 );
    free( current );
  }

  return AIO_CANCELED;
}

int rtems_aio_enqueue( rtems_aio_request *req )
{
  rtems_aio_request_chain *r_chain;
  rtems_chain_control *chain;
  pthread_t thid;
  int result, policy;
  struct sched_param param;

  /* The queue should be initialized */
  AIO_assert( aio_request_queue.initialized == AIO_QUEUE_INITIALIZED );

  result = pthread_mutex_lock( &aio_request_queue.mutex );
  if ( result != 0 ) {
    free( req );
    return result;
  }

  /* Since _POSIX_PRIORITIZED_IO and _POSIX_PRIORITY_SCHEDULING are defined,
     we can use aio_reqprio to lower the priority of the request */
  pthread_getschedparam( pthread_self(), &policy, &param );

  rtems_chain_initialize_node( &req->next_prio );
  req->caller_thread = pthread_self();
  req->priority = param.sched_priority - req->aiocbp->aio_reqprio;
  req->policy = policy;
  req->aiocbp->error_code = EINPROGRESS;
  req->aiocbp->return_value = 0;
  req->aiocbp->return_status = AIO_NOTRETURNED;
  atomic_fetch_add( &aio_request_queue.queued_requests, 1 );

  if (
    aio_request_queue.idle_threads == 0 &&
    aio_request_queue.active_threads < AIO_MAX_THREADS
  ) {
    /* we still have empty places on the active_threads chain */
    chain = &aio_request_queue.work_req;
    r_chain = rtems_aio_search_fd( chain, req->aiocbp->aio_fildes, 1 );

    if ( r_chain->new_fd == 1 ) {
      rtems_chain_prepend( &r_chain->perfd, &req->next_prio );
      r_chain->new_fd = 0;
      pthread_mutex_init( &r_chain->mutex, NULL );
      pthread_cond_init( &r_chain->cond, NULL );

      AIO_printf( "New thread \n" );
      result = pthread_create(
        &thid,
        &aio_request_queue.attr,
        rtems_aio_handle,
        (void *) r_chain
      );
      if ( result != 0 ) {
        pthread_mutex_unlock( &aio_request_queue.mutex );
        return result;
      }
      ++aio_request_queue.active_threads;
    } else {
      /* put request in the fd chain it belongs to */
      pthread_mutex_lock( &r_chain->mutex );
      rtems_aio_insert_prio( &r_chain->perfd, req );
      pthread_cond_signal( &r_chain->cond );
      pthread_mutex_unlock( &r_chain->mutex );
    }
  } else {
    /* The maximum number of threads has already been created, even though
       some of them might be idle. The request belongs to one of the active
       fd chains. */
    r_chain = rtems_aio_search_fd(
      &aio_request_queue.work_req,
      req->aiocbp->aio_fildes,
      0
    );
    if ( r_chain != NULL ) {
      pthread_mutex_lock( &r_chain->mutex );
      rtems_aio_insert_prio( &r_chain->perfd, req );
      pthread_cond_signal( &r_chain->cond );
      pthread_mutex_unlock( &r_chain->mutex );
    } else {

      /* or to the idle chain */
      chain = &aio_request_queue.idle_req;
      r_chain = rtems_aio_search_fd( chain, req->aiocbp->aio_fildes, 1 );

      if ( r_chain->new_fd == 1 ) {
        /* If this is a new fd chain we signal the idle threads that
           might be waiting for requests */
        AIO_printf( " New chain on waiting queue \n " );
        rtems_chain_prepend( &r_chain->perfd, &req->next_prio );
        r_chain->new_fd = 0;
        pthread_mutex_init( &r_chain->mutex, NULL );
        pthread_cond_init( &r_chain->cond, NULL );
      } else
        /* just insert the request in the existing fd chain */
        rtems_aio_insert_prio( &r_chain->perfd, req );

      if ( aio_request_queue.idle_threads > 0 )
        pthread_cond_signal( &aio_request_queue.new_req );
    }
  }

  pthread_mutex_unlock( &aio_request_queue.mutex );
  return 0;
}

int rtems_aio_check_sigevent( struct sigevent *sigp )
{
  _Assert( sigp != NULL );

  switch ( sigp->sigev_notify ) {
    case SIGEV_NONE:
      break;

    case SIGEV_SIGNAL:
      if ( sigp->sigev_signo < 1 || sigp->sigev_signo > 32 ) {
        return 0;
      }
      break;

    case SIGEV_THREAD:
      if ( sigp->sigev_notify_function == NULL ) {
        return 0;
      }
      break;

    default:
      return 0;
  }

  return 1;
}

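/*
 * A sketch of sigevent settings that pass the check above; the handler name
 * and value are illustrative only and are not part of this file:
 *
 *   static void example_handler( union sigval val )
 *   {
 *     // invoked by rtems_aio_notify() from a detached notification thread
 *   }
 *
 *   struct sigevent ev = { 0 };
 *   ev.sigev_notify = SIGEV_THREAD;
 *   ev.sigev_notify_function = example_handler;
 *   ev.sigev_value.sival_int = 42;   // forwarded to the handler
 *
 * Alternatively, SIGEV_SIGNAL with 1 <= sigev_signo <= 32, or SIGEV_NONE,
 * is accepted; anything else makes rtems_aio_check_sigevent() return 0.
 */
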
static void *rtems_aio_notify_function_wrapper( void *args )
{
  struct sigevent *sig = ( struct sigevent * ) args;
  void (*notify_function)( union sigval ) = sig->sigev_notify_function;
  union sigval param = ( union sigval ) sig->sigev_value;

  notify_function( param );

  pthread_exit( NULL );
}

static void rtems_aio_notify( struct sigevent *sigp )
{
  int result;

  _Assert( sigp != NULL );

  switch ( sigp->sigev_notify ) {
#ifdef RTEMS_POSIX_API
    case SIGEV_SIGNAL:
      result = sigqueue(
        getpid(),
        sigp->sigev_signo,
        sigp->sigev_value
      );
      _Assert_Unused_variable_equals( result, 0 );
      break;
#endif
    case SIGEV_THREAD: {
      pthread_t thread;
      pthread_attr_t attr;
      pthread_attr_t *attrp = sigp->sigev_notify_attributes;

      if ( attrp == NULL ) {
        attrp = &attr;

        result = pthread_attr_init( attrp );
        _Assert_Unused_variable_equals( result, 0 );

        result = pthread_attr_setdetachstate(
          attrp,
          PTHREAD_CREATE_DETACHED
        );
        _Assert_Unused_variable_equals( result, 0 );
      }

      result = pthread_create(
        &thread,
        attrp,
        rtems_aio_notify_function_wrapper,
        sigp
      );
      _Assert_Unused_variable_equals( result, 0 );
      break;
    }
  }
}

static void *rtems_aio_handle( void *arg )
{
  rtems_aio_request_chain *r_chain = arg;
  rtems_aio_request *req;
  rtems_chain_control *chain;
  rtems_chain_node *node;
  int result, policy;
  struct sched_param param;

  AIO_printf( "Thread started\n" );

  while (1) {

    /* Acquire the mutex of the current fd chain.  We don't need to lock the
       queue mutex, since requests can still be added to idle fd chains, or
       even to active ones, once the request being worked on has been
       extracted from the chain. */
    result = pthread_mutex_lock( &r_chain->mutex );
    if ( result != 0 )
      return NULL;

    chain = &r_chain->perfd;

    /* If the locked chain is not empty, take the first request, extract it,
       unlock the chain, and process the request; this way the user can
       supply more requests to this fd chain. */
    if ( !rtems_chain_is_empty( chain ) ) {

      AIO_printf( "Get new request from not empty chain\n" );
      node = rtems_chain_first( chain );
      req = (rtems_aio_request *) node;

      /* See the _POSIX_PRIORITIZED_IO and _POSIX_PRIORITY_SCHEDULING
         discussion in rtems_aio_enqueue() */
      pthread_getschedparam(
        pthread_self(),
        &policy,
        &param
      );
      param.sched_priority = req->priority;
      pthread_setschedparam( pthread_self(), req->policy, &param );

      rtems_chain_extract( node );

      pthread_mutex_unlock( &r_chain->mutex );

      /* Perform the requested operation */
      rtems_aio_handle_helper( req );

      /* update queued_requests */
      atomic_fetch_sub( &aio_request_queue.queued_requests, 1 );

      /* notification for request completion */
      rtems_aio_notify( &req->aiocbp->aio_sigevent );

      /* notification for list completion */
      rtems_aio_completed_list_op( req->listcbp );
      req->listcbp = NULL;

    } else {
      /* If the fd chain is empty, unlock the fd chain and lock the queue
         mutex; this ensures that at most one request can arrive on our fd
         chain before we re-check it.

         If no request was added, sleep for up to 3 seconds waiting for a
         signal on the chain's condition variable; the wait releases the
         queue mutex while blocked.  The fd chain itself is already
         unlocked. */

      struct timespec timeout;

      AIO_printf( "Chain is empty [WQ], wait for work\n" );

      pthread_mutex_unlock( &r_chain->mutex );
      pthread_mutex_lock( &aio_request_queue.mutex );

      if ( rtems_chain_is_empty( chain ) ) {
        clock_gettime( CLOCK_REALTIME, &timeout );
        timeout.tv_sec += 3;
        timeout.tv_nsec = 0;
        result = pthread_cond_timedwait(
          &r_chain->cond,
          &aio_request_queue.mutex,
          &timeout
        );

        /* If no requests were added to the chain we delete the fd chain from
           the queue and start working with idle fd chains */
        if ( result == ETIMEDOUT ) {
          rtems_chain_extract( &r_chain->next_fd );
          pthread_mutex_destroy( &r_chain->mutex );
          pthread_cond_destroy( &r_chain->cond );
          free( r_chain );

          /* If the idle chain is empty sleep for 3 seconds and wait for a
             signal. The thread now becomes idle. */
          if ( rtems_chain_is_empty( &aio_request_queue.idle_req ) ) {
            AIO_printf( "Chain is empty [IQ], wait for work\n" );

            ++aio_request_queue.idle_threads;
            --aio_request_queue.active_threads;
            clock_gettime( CLOCK_REALTIME, &timeout );
            timeout.tv_sec += 3;
            timeout.tv_nsec = 0;

            /* Coverity detected an error, but after review, it was deemed a
               false positive.

               The concern was whether there would be an issue if the thread
               was interrupted while waiting.  According to the POSIX
               specification of pthread_cond_timedwait:

               "If a signal is delivered to a thread waiting on a condition
               variable, the thread either resumes waiting as if not
               interrupted or returns zero due to spurious wakeup."

               Both scenarios are safe:

               - If pthread_cond_timedwait resumes waiting, there are no
                 issues.
               - If it returns zero, the thread continues as if an element
                 was added. Since no element is added, in the next iteration
                 of the loop handling requests, the queue stays empty, and
                 the thread waits again. */

            result = pthread_cond_timedwait(
              &aio_request_queue.new_req,
              &aio_request_queue.mutex,
              &timeout
            );

            /* If no new fd chain was added in the idle requests
               then this thread is finished */
            if ( result == ETIMEDOUT ) {
              AIO_printf( "Etimeout\n" );
              --aio_request_queue.idle_threads;
              pthread_mutex_unlock( &aio_request_queue.mutex );
              return NULL;
            }
          }
          /* Otherwise move this chain to the working chain and
             start the loop all over again */
          AIO_printf( "Work on idle\n" );
          --aio_request_queue.idle_threads;
          ++aio_request_queue.active_threads;

          node = rtems_chain_first( &aio_request_queue.idle_req );
          rtems_chain_extract( node );

          r_chain = (rtems_aio_request_chain *) node;
          rtems_aio_move_to_work( r_chain );

        }
      }
      /* If there was a request added in the initial fd chain then release
         the mutex and process it */
      pthread_mutex_unlock( &aio_request_queue.mutex );

    }
  }

  AIO_printf( "Thread finished\n" );
  return NULL;
}

static void rtems_aio_handle_helper( rtems_aio_request *req )
{
  int result;

  switch ( req->op_type ) {
    case AIO_OP_READ:
      AIO_printf( "read\n" );
      result = pread(
        req->aiocbp->aio_fildes,
        (void *) req->aiocbp->aio_buf,
        req->aiocbp->aio_nbytes, req->aiocbp->aio_offset
      );
      break;

    case AIO_OP_WRITE:
      AIO_printf( "write\n" );
      result = pwrite(
        req->aiocbp->aio_fildes,
        (void *) req->aiocbp->aio_buf,
        req->aiocbp->aio_nbytes, req->aiocbp->aio_offset
      );
      break;

    case AIO_OP_SYNC:
      AIO_printf( "sync\n" );
      result = fsync( req->aiocbp->aio_fildes );
      break;

    case AIO_OP_DSYNC:
      AIO_printf( "data sync\n" );
      result = fdatasync( req->aiocbp->aio_fildes );
      break;

    default:
      result = -1;
  }

  if ( result < 0 ) {
    req->aiocbp->return_value = -1;
    req->aiocbp->error_code = errno;
  } else {
    req->aiocbp->return_value = result;
    req->aiocbp->error_code = 0;
  }
}