#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <time.h>
#include <signal.h>
#include <stdatomic.h>
#include <rtems/posix/aio_misc.h>
#include <rtems/score/assert.h>
#include <errno.h>
#include <limits.h>

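/*
 *  rtems_aio_handle
 *
 *  Main loop executed by each AIO worker thread.  The thread takes requests
 *  from the per-file-descriptor chain passed in arg, services them in
 *  priority order, and migrates to another chain or exits after an idle
 *  timeout.
 */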
static void *rtems_aio_handle( void *arg );

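/*
 *  rtems_aio_handle_helper
 *
 *  Performs the actual I/O operation (read, write, sync or datasync) for a
 *  single request and stores the result in the request's aiocb.
 */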
static void rtems_aio_handle_helper( rtems_aio_request *req );

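/*
 *  rtems_aio_move_to_work
 *
 *  Moves a per-file-descriptor request chain onto the work queue, keeping
 *  the work queue sorted by file descriptor.
 */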
static void rtems_aio_move_to_work( rtems_aio_request_chain *r_chain );

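/*
 *  rtems_aio_insert_prio
 *
 *  Inserts a request into a per-file-descriptor chain, ordered by the
 *  aio_reqprio field of the request's aiocb.
 */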
static void rtems_aio_insert_prio(
  rtems_chain_control *chain,
  rtems_aio_request *req
);

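/*
 *  rtems_aio_notify_function_wrapper
 *
 *  Thread entry point used for SIGEV_THREAD notifications; calls the
 *  user-supplied notification function with its sigval argument.
 */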
static void *rtems_aio_notify_function_wrapper( void *args );

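/*
 *  rtems_aio_notify
 *
 *  Delivers the completion notification described by sigp, either by
 *  queueing a signal or by spawning a notification thread, depending on
 *  sigev_notify.
 */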
static void rtems_aio_notify( struct sigevent *sigp );

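/*
 *  Global AIO request queue shared by all worker threads.
 */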
rtems_aio_queue aio_request_queue;

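/*
 *  rtems_aio_init
 *
 *  Initializes the AIO request queue: thread attributes, mutex, condition
 *  variable, the work and idle chains, and the thread counters, then marks
 *  the queue as initialized.  Returns 0 on success and -1 if any
 *  initialization step fails.
 */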
int rtems_aio_init( void )
{
  int result = 0;

  result = pthread_attr_init( &aio_request_queue.attr );
  if ( result != 0 ) {
    return -1;
  }

  result = pthread_attr_setdetachstate(
    &aio_request_queue.attr,
    PTHREAD_CREATE_DETACHED
  );
  if ( result != 0 ) {
    pthread_attr_destroy( &aio_request_queue.attr );
    return -1;
  }

  result = pthread_mutex_init( &aio_request_queue.mutex, NULL );
  if ( result != 0 ) {
    pthread_attr_destroy( &aio_request_queue.attr );
    return -1;
  }

  pthread_mutex_lock( &aio_request_queue.mutex );

  result = pthread_cond_init( &aio_request_queue.new_req, NULL );
  if ( result != 0 ) {
    pthread_mutex_unlock( &aio_request_queue.mutex );
    pthread_mutex_destroy( &aio_request_queue.mutex );
    pthread_attr_destroy( &aio_request_queue.attr );
    return -1;
  }

  rtems_chain_initialize_empty( &aio_request_queue.work_req );
  rtems_chain_initialize_empty( &aio_request_queue.idle_req );

  aio_request_queue.active_threads = 0;
  aio_request_queue.idle_threads = 0;
  atomic_init( &aio_request_queue.queued_requests, 0 );
  aio_request_queue.initialized = AIO_QUEUE_INITIALIZED;

  pthread_mutex_unlock( &aio_request_queue.mutex );

  return 0;
}

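/*
 *  init_write_req
 *
 *  Validates the aiocb for a write request and allocates an AIO_OP_WRITE
 *  request for it.  On error, sets errno (EINVAL, EBADF or EAGAIN) and
 *  returns NULL.
 */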
rtems_aio_request *init_write_req( struct aiocb *aiocbp )
{
  rtems_aio_request *req;
  int mode;

  if ( aiocbp == NULL ) {
    errno = EINVAL;
    return NULL;
  }

  mode = fcntl( aiocbp->aio_fildes, F_GETFL );
  if (
    ( mode & O_ACCMODE ) != O_WRONLY &&
    ( mode & O_ACCMODE ) != O_RDWR
  ) {
    errno = EBADF;
    return NULL;
  }

  if ( aiocbp->aio_reqprio < 0 || aiocbp->aio_reqprio > AIO_PRIO_DELTA_MAX ) {
    errno = EINVAL;
    return NULL;
  }

  if ( aiocbp->aio_offset < 0 ) {
    errno = EINVAL;
    return NULL;
  }

  if ( rtems_aio_check_sigevent( &aiocbp->aio_sigevent ) == 0 ) {
    errno = EINVAL;
    return NULL;
  }

  req = malloc( sizeof( rtems_aio_request ) );
  if ( req == NULL ) {
    errno = EAGAIN;
    return NULL;
  }

  req->aiocbp = aiocbp;
  req->op_type = AIO_OP_WRITE;
  req->listcbp = NULL;

  return req;
}

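/*
 *  init_read_req
 *
 *  Validates the aiocb for a read request and allocates an AIO_OP_READ
 *  request for it.  On error, sets errno (EINVAL, EBADF or EAGAIN) and
 *  returns NULL.
 */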
rtems_aio_request *init_read_req( struct aiocb *aiocbp )
{
  rtems_aio_request *req;
  int mode;

  if ( aiocbp == NULL ) {
    errno = EINVAL;
    return NULL;
  }

  mode = fcntl( aiocbp->aio_fildes, F_GETFL );
  if (
    ( mode & O_ACCMODE ) != O_RDONLY &&
    ( mode & O_ACCMODE ) != O_RDWR
  ) {
    errno = EBADF;
    return NULL;
  }

  if ( aiocbp->aio_reqprio < 0 || aiocbp->aio_reqprio > AIO_PRIO_DELTA_MAX ) {
    errno = EINVAL;
    return NULL;
  }

  if ( aiocbp->aio_offset < 0 ) {
    errno = EINVAL;
    return NULL;
  }

  if ( rtems_aio_check_sigevent( &aiocbp->aio_sigevent ) == 0 ) {
    errno = EINVAL;
    return NULL;
  }

  req = malloc( sizeof( rtems_aio_request ) );
  if ( req == NULL ) {
    errno = EAGAIN;
    return NULL;
  }

  req->aiocbp = aiocbp;
  req->op_type = AIO_OP_READ;
  req->listcbp = NULL;

  return req;
}

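/*
 *  rtems_aio_completed_list_op
 *
 *  Accounts for one completed request of a lio_listio() list.  Does nothing
 *  if the request is not part of a list (listcbp == NULL).  When the last
 *  request of the list finishes, the registered list notification is
 *  delivered and the list control block is freed.
 */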
void rtems_aio_completed_list_op( listcb *listcbp )
{
  if ( listcbp == NULL )
    return;

  pthread_mutex_lock( &listcbp->mutex );

  if ( --listcbp->requests_left == 0 ) {
    switch ( listcbp->notification_type ) {
      case AIO_LIO_NO_NOTIFY:
        break;
      case AIO_LIO_SIGEV:
        rtems_aio_notify( listcbp->lio_notification.sigp );
        break;
      case AIO_LIO_EVENT:
        rtems_event_system_send(
          listcbp->lio_notification.task_id,
          RTEMS_EVENT_SYSTEM_LIO_LIST_COMPLETED
        );
        break;
    }
    pthread_mutex_unlock( &listcbp->mutex );
    free( listcbp );
  } else {
    pthread_mutex_unlock( &listcbp->mutex );
  }
}

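/*
 *  rtems_aio_search_fd
 *
 *  Searches the given chain for the request chain belonging to fildes.  If
 *  it is not found and create is non-zero, a new chain node is allocated
 *  and inserted in file descriptor order with new_fd set to 1; otherwise
 *  NULL is returned.
 */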
rtems_aio_request_chain *rtems_aio_search_fd(
  rtems_chain_control *chain,
  int fildes,
  int create
)
{
  rtems_aio_request_chain *r_chain;
  rtems_chain_node *node;

  node = rtems_chain_first( chain );
  r_chain = (rtems_aio_request_chain *) node;

  while ( r_chain->fildes < fildes && !rtems_chain_is_tail( chain, node ) ) {
    node = rtems_chain_next( node );
    r_chain = (rtems_aio_request_chain *) node;
  }

  if ( r_chain->fildes == fildes ) {
    r_chain->new_fd = 0;
  } else {
    if ( create == 0 ) {
      r_chain = NULL;
    } else {
      r_chain = calloc( 1, sizeof( rtems_aio_request_chain ) );
      rtems_chain_initialize_empty( &r_chain->perfd );
      rtems_chain_initialize_node( &r_chain->next_fd );

      if ( rtems_chain_is_empty( chain ) )
        rtems_chain_prepend( chain, &r_chain->next_fd );
      else
        rtems_chain_insert( rtems_chain_previous( node ), &r_chain->next_fd );

      r_chain->new_fd = 1;
      r_chain->fildes = fildes;
    }
  }
  return r_chain;
}

static void rtems_aio_move_to_work( rtems_aio_request_chain *r_chain )
{
  rtems_chain_control *work_req_chain = &aio_request_queue.work_req;
  rtems_aio_request_chain *temp;
  rtems_chain_node *node;

  node = rtems_chain_first( work_req_chain );
  temp = (rtems_aio_request_chain *) node;

  while (
    temp->fildes < r_chain->fildes &&
    !rtems_chain_is_tail( work_req_chain, node )
  ) {
    node = rtems_chain_next( node );
    temp = (rtems_aio_request_chain *) node;
  }

  rtems_chain_insert( rtems_chain_previous( node ), &r_chain->next_fd );
}

static void rtems_aio_insert_prio(
  rtems_chain_control *chain,
  rtems_aio_request *req
)
{
  rtems_chain_node *node;

  AIO_printf( "FD exists \n" );
  node = rtems_chain_first( chain );

  if ( rtems_chain_is_empty( chain ) ) {
    AIO_printf( "First in chain \n" );
    rtems_chain_prepend( chain, &req->next_prio );
  } else {
    AIO_printf( "Add by priority \n" );
    int prio = ((rtems_aio_request *) node)->aiocbp->aio_reqprio;

    while (
      req->aiocbp->aio_reqprio > prio &&
      !rtems_chain_is_tail( chain, node )
    ) {
      node = rtems_chain_next( node );
      prio = ((rtems_aio_request *) node)->aiocbp->aio_reqprio;
    }

    rtems_chain_insert( rtems_chain_previous( node ), &req->next_prio );
  }
}

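/*
 *  rtems_aio_remove_fd
 *
 *  Cancels every request queued on the given per-file-descriptor chain:
 *  each request is marked ECANCELED, removed from the queue accounting and
 *  freed.
 */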
void rtems_aio_remove_fd( rtems_aio_request_chain *r_chain )
{
  rtems_chain_control *chain;
  rtems_chain_node *node;

  chain = &r_chain->perfd;
  node = rtems_chain_first( chain );

  while ( !rtems_chain_is_tail( chain, node ) ) {
    rtems_aio_request *req = (rtems_aio_request *) node;
    node = rtems_chain_next( node );
    rtems_chain_extract( &req->next_prio );
    req->aiocbp->error_code = ECANCELED;
    req->aiocbp->return_value = -1;
    atomic_fetch_sub( &aio_request_queue.queued_requests, 1 );
    rtems_aio_completed_list_op( req->listcbp );
    free( req );
  }
}

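/*
 *  rtems_aio_remove_req
 *
 *  Removes the request belonging to aiocbp from the given chain.  Returns
 *  AIO_ALLDONE if the chain is empty, AIO_NOTCANCELED if the request is not
 *  found, and AIO_CANCELED after a successful cancellation.
 */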
int rtems_aio_remove_req( rtems_chain_control *chain, struct aiocb *aiocbp )
{
  if ( rtems_chain_is_empty( chain ) )
    return AIO_ALLDONE;

  rtems_chain_node *node = rtems_chain_first( chain );
  rtems_aio_request *current;

  current = (rtems_aio_request *) node;

  while ( !rtems_chain_is_tail( chain, node ) && current->aiocbp != aiocbp ) {
    node = rtems_chain_next( node );
    current = (rtems_aio_request *) node;
  }

  if ( rtems_chain_is_tail( chain, node ) )
    return AIO_NOTCANCELED;

  rtems_chain_extract( node );
  current->aiocbp->error_code = ECANCELED;
  current->aiocbp->return_value = -1;
  rtems_aio_completed_list_op( current->listcbp );
  atomic_fetch_sub( &aio_request_queue.queued_requests, 1 );
  free( current );

  return AIO_CANCELED;
}

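/*
 *  rtems_aio_enqueue
 *
 *  Enqueues a request on the work queue, creating a new worker thread if
 *  none is idle and the thread limit has not been reached; otherwise the
 *  request is added to an existing per-file-descriptor chain or parked on
 *  the idle queue until a worker becomes available.  Returns 0 on success
 *  or the error returned by the pthread operation that failed.
 */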
int rtems_aio_enqueue( rtems_aio_request *req )
{
  rtems_aio_request_chain *r_chain;
  rtems_chain_control *chain;
  pthread_t thid;
  int result, policy;
  struct sched_param param;

  /* The queue must have been initialized before any request is enqueued. */
  AIO_assert( aio_request_queue.initialized == AIO_QUEUE_INITIALIZED );

  result = pthread_mutex_lock( &aio_request_queue.mutex );
  if ( result != 0 ) {
    free( req );
    return result;
  }

  /* The request inherits the caller's scheduling policy and priority,
     adjusted by aio_reqprio. */
  pthread_getschedparam( pthread_self(), &policy, &param );

  rtems_chain_initialize_node( &req->next_prio );
  req->caller_thread = pthread_self();
  req->priority = param.sched_priority - req->aiocbp->aio_reqprio;
  req->policy = policy;
  req->aiocbp->error_code = EINPROGRESS;
  req->aiocbp->return_value = 0;
  req->aiocbp->return_status = AIO_NOTRETURNED;
  atomic_fetch_add( &aio_request_queue.queued_requests, 1 );

  if (
    aio_request_queue.idle_threads == 0 &&
    aio_request_queue.active_threads < AIO_MAX_THREADS
  ) {
    /* There is still room for another worker thread. */
    chain = &aio_request_queue.work_req;
    r_chain = rtems_aio_search_fd( chain, req->aiocbp->aio_fildes, 1 );

    if ( r_chain->new_fd == 1 ) {
      /* New file descriptor: create its chain and a worker thread for it. */
      rtems_chain_prepend( &r_chain->perfd, &req->next_prio );
      r_chain->new_fd = 0;
      pthread_mutex_init( &r_chain->mutex, NULL );
      pthread_cond_init( &r_chain->cond, NULL );

      AIO_printf( "New thread \n" );
      result = pthread_create(
        &thid,
        &aio_request_queue.attr,
        rtems_aio_handle,
        (void *) r_chain
      );
      if ( result != 0 ) {
        pthread_mutex_unlock( &aio_request_queue.mutex );
        return result;
      }
      ++aio_request_queue.active_threads;
    } else {
      /* The file descriptor is already served by a worker: queue the
         request by priority and wake that worker. */
      pthread_mutex_lock( &r_chain->mutex );
      rtems_aio_insert_prio( &r_chain->perfd, req );
      pthread_cond_signal( &r_chain->cond );
      pthread_mutex_unlock( &r_chain->mutex );
    }
  } else {
    /* All worker threads are busy.  If the file descriptor already has a
       chain on the work queue, append the request to it ... */
    r_chain = rtems_aio_search_fd(
      &aio_request_queue.work_req,
      req->aiocbp->aio_fildes,
      0
    );
    if ( r_chain != NULL ) {
      pthread_mutex_lock( &r_chain->mutex );
      rtems_aio_insert_prio( &r_chain->perfd, req );
      pthread_cond_signal( &r_chain->cond );
      pthread_mutex_unlock( &r_chain->mutex );
    } else {
      /* ... otherwise park the request on the idle queue until a worker
         becomes available. */
      chain = &aio_request_queue.idle_req;
      r_chain = rtems_aio_search_fd( chain, req->aiocbp->aio_fildes, 1 );

      if ( r_chain->new_fd == 1 ) {
        AIO_printf( " New chain on waiting queue \n " );
        rtems_chain_prepend( &r_chain->perfd, &req->next_prio );
        r_chain->new_fd = 0;
        pthread_mutex_init( &r_chain->mutex, NULL );
        pthread_cond_init( &r_chain->cond, NULL );
      } else {
        rtems_aio_insert_prio( &r_chain->perfd, req );
      }

      /* Wake an idle worker, if any, to pick up the parked chain. */
      if ( aio_request_queue.idle_threads > 0 )
        pthread_cond_signal( &aio_request_queue.new_req );
    }
  }

  pthread_mutex_unlock( &aio_request_queue.mutex );
  return 0;
}

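/*
 *  rtems_aio_check_sigevent
 *
 *  Checks that a sigevent structure describes a supported notification:
 *  SIGEV_NONE, SIGEV_SIGNAL with a valid signal number, or SIGEV_THREAD
 *  with a notification function.  Returns 1 if valid and 0 otherwise.
 */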
int rtems_aio_check_sigevent( struct sigevent *sigp )
{
  _Assert( sigp != NULL );

  switch ( sigp->sigev_notify ) {
    case SIGEV_NONE:
      break;

    case SIGEV_SIGNAL:
      if ( sigp->sigev_signo < 1 || sigp->sigev_signo > 32 ) {
        return 0;
      }
      break;

    case SIGEV_THREAD:
      if ( sigp->sigev_notify_function == NULL ) {
        return 0;
      }
      break;

    default:
      return 0;
  }

  return 1;
}

static void *rtems_aio_notify_function_wrapper( void *args )
{
  struct sigevent *sig = (struct sigevent *) args;
  void (*notify_function)( union sigval ) = sig->sigev_notify_function;
  union sigval param = sig->sigev_value;

  notify_function( param );

  pthread_exit( NULL );
}

static void rtems_aio_notify( struct sigevent *sigp )
{
  int result;

  _Assert( sigp != NULL );

  switch ( sigp->sigev_notify ) {
#ifdef RTEMS_POSIX_API
    case SIGEV_SIGNAL:
      result = sigqueue(
        getpid(),
        sigp->sigev_signo,
        sigp->sigev_value
      );
      _Assert_Unused_variable_equals( result, 0 );
      break;
#endif
    case SIGEV_THREAD: {
      pthread_t thread;
      pthread_attr_t attr;
      pthread_attr_t *attrp = sigp->sigev_notify_attributes;

      if ( attrp == NULL ) {
        attrp = &attr;

        result = pthread_attr_init( attrp );
        _Assert_Unused_variable_equals( result, 0 );

        result = pthread_attr_setdetachstate(
          attrp,
          PTHREAD_CREATE_DETACHED
        );
        _Assert_Unused_variable_equals( result, 0 );
      }

      result = pthread_create(
        &thread,
        attrp,
        rtems_aio_notify_function_wrapper,
        sigp
      );
      _Assert_Unused_variable_equals( result, 0 );
      break;
    }
  }
}

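/*
 *  rtems_aio_handle
 *
 *  Worker thread body.  While its per-file-descriptor chain has requests,
 *  the thread services them at the priority of the original caller.  When
 *  the chain empties it waits a few seconds for more work, then retires the
 *  chain and either picks up a waiting file descriptor from the idle queue
 *  or exits.
 */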
static void *rtems_aio_handle( void *arg )
{
  rtems_aio_request_chain *r_chain = arg;
  rtems_aio_request *req;
  rtems_chain_control *chain;
  rtems_chain_node *node;
  int result, policy;
  struct sched_param param;

  AIO_printf( "Thread started\n" );

  while (1) {

    /* Take the next request from this thread's per-fd chain, if any. */
    result = pthread_mutex_lock( &r_chain->mutex );
    if ( result != 0 )
      return NULL;

    chain = &r_chain->perfd;

    if ( !rtems_chain_is_empty( chain ) ) {

      AIO_printf( "Get new request from not empty chain\n" );
      node = rtems_chain_first( chain );
      req = (rtems_aio_request *) node;

      /* Run the request at the priority and policy of the caller. */
      pthread_getschedparam(
        pthread_self(),
        &policy,
        &param
      );
      param.sched_priority = req->priority;
      pthread_setschedparam( pthread_self(), req->policy, &param );

      rtems_chain_extract( node );

      pthread_mutex_unlock( &r_chain->mutex );

      /* Perform the actual I/O. */
      rtems_aio_handle_helper( req );

      atomic_fetch_sub( &aio_request_queue.queued_requests, 1 );

      /* Notify the caller and, if this request is part of a lio_listio()
         list, account for its completion. */
      rtems_aio_notify( &req->aiocbp->aio_sigevent );

      rtems_aio_completed_list_op( req->listcbp );
      req->listcbp = NULL;

    } else {

      /*
       *  The per-fd chain is empty.  Wait a few seconds for new requests
       *  on this file descriptor; if none arrive, retire the chain and
       *  look for work on the idle queue before exiting.
       */
      struct timespec timeout;

      AIO_printf( "Chain is empty [WQ], wait for work\n" );

      pthread_mutex_unlock( &r_chain->mutex );
      pthread_mutex_lock( &aio_request_queue.mutex );

      if ( rtems_chain_is_empty( chain ) ) {
        clock_gettime( CLOCK_REALTIME, &timeout );
        timeout.tv_sec += 3;
        timeout.tv_nsec = 0;
        result = pthread_cond_timedwait(
          &r_chain->cond,
          &aio_request_queue.mutex,
          &timeout
        );

        /* Nothing arrived for this file descriptor: drop its chain. */
        if ( result == ETIMEDOUT ) {
          rtems_chain_extract( &r_chain->next_fd );
          pthread_mutex_destroy( &r_chain->mutex );
          pthread_cond_destroy( &r_chain->cond );
          free( r_chain );

          /* If the idle queue is also empty, become an idle thread and
             wait for a new request to be parked there. */
          if ( rtems_chain_is_empty( &aio_request_queue.idle_req ) ) {
            AIO_printf( "Chain is empty [IQ], wait for work\n" );

            ++aio_request_queue.idle_threads;
            --aio_request_queue.active_threads;
            clock_gettime( CLOCK_REALTIME, &timeout );
            timeout.tv_sec += 3;
            timeout.tv_nsec = 0;

            result = pthread_cond_timedwait(
              &aio_request_queue.new_req,
              &aio_request_queue.mutex,
              &timeout
            );

            /* No work showed up within the timeout: the thread exits. */
            if ( result == ETIMEDOUT ) {
              AIO_printf( "Etimeout\n" );
              --aio_request_queue.idle_threads;
              pthread_mutex_unlock( &aio_request_queue.mutex );
              return NULL;
            }
          }

          /* Pick up the first waiting fd chain and move it to the work
             queue. */
          AIO_printf( "Work on idle\n" );
          --aio_request_queue.idle_threads;
          ++aio_request_queue.active_threads;

          node = rtems_chain_first( &aio_request_queue.idle_req );
          rtems_chain_extract( node );

          r_chain = (rtems_aio_request_chain *) node;
          rtems_aio_move_to_work( r_chain );
        }
      }

      pthread_mutex_unlock( &aio_request_queue.mutex );
    }
  }

  AIO_printf( "Thread finished\n" );
  return NULL;
}

static void rtems_aio_handle_helper( rtems_aio_request *req )
{
  int result;

  switch ( req->op_type ) {
    case AIO_OP_READ:
      AIO_printf( "read\n" );
      result = pread(
        req->aiocbp->aio_fildes,
        (void *) req->aiocbp->aio_buf,
        req->aiocbp->aio_nbytes,
        req->aiocbp->aio_offset
      );
      break;

    case AIO_OP_WRITE:
      AIO_printf( "write\n" );
      result = pwrite(
        req->aiocbp->aio_fildes,
        (void *) req->aiocbp->aio_buf,
        req->aiocbp->aio_nbytes,
        req->aiocbp->aio_offset
      );
      break;

    case AIO_OP_SYNC:
      AIO_printf( "sync\n" );
      result = fsync( req->aiocbp->aio_fildes );
      break;

    case AIO_OP_DSYNC:
      AIO_printf( "data sync\n" );
      result = fdatasync( req->aiocbp->aio_fildes );
      break;

    default:
      result = -1;
      break;
  }

  if ( result < 0 ) {
    req->aiocbp->return_value = -1;
    req->aiocbp->error_code = errno;
  } else {
    req->aiocbp->return_value = result;
    req->aiocbp->error_code = 0;
  }
}