/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */

#include <asm/types.h>
#include <asm/atomic.h>

#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>

#include "kcopyd.h"

static struct workqueue_struct *_kcopyd_wq;
static struct work_struct _kcopyd_work;

static inline void wake(void)
{
	queue_work(_kcopyd_wq, &_kcopyd_work);
}

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct kcopyd_client {
	struct list_head list;

	spinlock_t lock;
	struct page_list *pages;
	unsigned int nr_pages;
	unsigned int nr_free_pages;

	struct dm_io_client *io_client;

	wait_queue_head_t destroyq;
	atomic_t nr_jobs;
};

static struct page_list *alloc_pl(void)
{
	struct page_list *pl;

	pl = kmalloc(sizeof(*pl), GFP_KERNEL);
	if (!pl)
		return NULL;

	pl->page = alloc_page(GFP_KERNEL);
	if (!pl->page) {
		kfree(pl);
		return NULL;
	}

	return pl;
}

static void free_pl(struct page_list *pl)
{
	__free_page(pl->page);
	kfree(pl);
}

static int kcopyd_get_pages(struct kcopyd_client *kc,
			    unsigned int nr, struct page_list **pages)
{
	struct page_list *pl;

	spin_lock(&kc->lock);
	if (kc->nr_free_pages < nr) {
		spin_unlock(&kc->lock);
		return -ENOMEM;
	}

	kc->nr_free_pages -= nr;
	for (*pages = pl = kc->pages; --nr; pl = pl->next)
		;

	kc->pages = pl->next;
	pl->next = NULL;

	spin_unlock(&kc->lock);

	return 0;
}

static void kcopyd_put_pages(struct kcopyd_client *kc, struct page_list *pl)
{
	struct page_list *cursor;

	spin_lock(&kc->lock);
	for (cursor = pl; cursor->next; cursor = cursor->next)
		kc->nr_free_pages++;

	kc->nr_free_pages++;
	cursor->next = kc->pages;
	kc->pages = pl;
	spin_unlock(&kc->lock);
}
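#if 0
/*
 * Illustrative sketch only (not part of this file): how the pool
 * calls above pair up.  A caller reserves a chain of pages from its
 * client's pool, points dm-io at the chain, and returns the whole
 * chain when the io completes.  -ENOMEM here just means "the pool is
 * empty right now"; the job lists below treat that as "retry later"
 * rather than as a failure.
 */
static int example_reserve_pages(struct kcopyd_client *kc)
{
	struct page_list *pl;

	if (kcopyd_get_pages(kc, 16, &pl))
		return 1;		/* pool exhausted; try again later */

	/* ... issue io against the 16-page chain 'pl' ... */

	kcopyd_put_pages(kc, pl);
	return 0;
}
#endif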
/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
	struct page_list *next;

	while (pl) {
		next = pl->next;
		free_pl(pl);
		pl = next;
	}
}

static int client_alloc_pages(struct kcopyd_client *kc, unsigned int nr)
{
	unsigned int i;
	struct page_list *pl = NULL, *next;

	for (i = 0; i < nr; i++) {
		next = alloc_pl();
		if (!next) {
			if (pl)
				drop_pages(pl);
			return -ENOMEM;
		}
		next->next = pl;
		pl = next;
	}

	kcopyd_put_pages(kc, pl);
	kc->nr_pages += nr;
	return 0;
}

static void client_free_pages(struct kcopyd_client *kc)
{
	BUG_ON(kc->nr_free_pages != kc->nr_pages);
	drop_pages(kc->pages);
	kc->pages = NULL;
	kc->nr_free_pages = kc->nr_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd;
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
	struct kcopyd_client *kc;
	struct list_head list;
	unsigned long flags;

	/*
	 * Error state of the job.
	 */
	int read_err;
	unsigned int write_err;

	/*
	 * Either READ or WRITE
	 */
	int rw;
	struct io_region source;

	/*
	 * The destinations for the transfer.
	 */
	unsigned int num_dests;
	struct io_region dests[KCOPYD_MAX_REGIONS];

	sector_t offset;
	unsigned int nr_pages;
	struct page_list *pages;

	/*
	 * Set this to ensure you are notified when the job has
	 * completed.  'context' is for the callback to use.
	 */
	kcopyd_notify_fn fn;
	void *context;

	/*
	 * These fields are only used if the job has been split
	 * into more manageable parts.
	 */
	struct semaphore lock;
	atomic_t sub_jobs;
	sector_t progress;
};

#define MIN_JOBS 512

static struct kmem_cache *_job_cache;
static mempool_t *_job_pool;

/*
 * We maintain three lists of jobs:
 *
 * i)   jobs waiting for pages
 * ii)  jobs that have pages, and are waiting for the io to be issued.
 * iii) jobs that have completed.
 *
 * All three of these are protected by _job_lock.
 */
static DEFINE_SPINLOCK(_job_lock);

static LIST_HEAD(_complete_jobs);
static LIST_HEAD(_io_jobs);
static LIST_HEAD(_pages_jobs);

static int jobs_init(void)
{
	_job_cache = kmem_cache_create("kcopyd-jobs",
				       sizeof(struct kcopyd_job),
				       __alignof__(struct kcopyd_job),
				       0, NULL, NULL);
	if (!_job_cache)
		return -ENOMEM;

	_job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
	if (!_job_pool) {
		kmem_cache_destroy(_job_cache);
		return -ENOMEM;
	}

	return 0;
}

static void jobs_exit(void)
{
	BUG_ON(!list_empty(&_complete_jobs));
	BUG_ON(!list_empty(&_io_jobs));
	BUG_ON(!list_empty(&_pages_jobs));

	mempool_destroy(_job_pool);
	kmem_cache_destroy(_job_cache);
	_job_pool = NULL;
	_job_cache = NULL;
}
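#if 0
/*
 * Illustrative sketch only: why a mempool.  With GFP_NOIO the
 * allocation never recurses into the block layer, and the MIN_JOBS
 * reserve means it either succeeds straight away or sleeps until
 * run_complete_job() frees a job back into the pool.  So a client
 * allocating a job can never deadlock waiting behind its own io.
 */
static struct kcopyd_job *example_alloc_job(void)
{
	return mempool_alloc(_job_pool, GFP_NOIO);
}
#endif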
/*
 * Functions to push and pop a job onto the head of a given job
 * list.
 */
static inline struct kcopyd_job *pop(struct list_head *jobs)
{
	struct kcopyd_job *job = NULL;
	unsigned long flags;

	spin_lock_irqsave(&_job_lock, flags);

	if (!list_empty(jobs)) {
		job = list_entry(jobs->next, struct kcopyd_job, list);
		list_del(&job->list);
	}
	spin_unlock_irqrestore(&_job_lock, flags);

	return job;
}

static inline void push(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;

	spin_lock_irqsave(&_job_lock, flags);
	list_add_tail(&job->list, jobs);
	spin_unlock_irqrestore(&_job_lock, flags);
}

/*
 * These three functions process one item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
	void *context = job->context;
	int read_err = job->read_err;
	unsigned int write_err = job->write_err;
	kcopyd_notify_fn fn = job->fn;
	struct kcopyd_client *kc = job->kc;

	kcopyd_put_pages(kc, job->pages);
	mempool_free(job, _job_pool);
	fn(read_err, write_err, context);

	if (atomic_dec_and_test(&kc->nr_jobs))
		wake_up(&kc->destroyq);

	return 0;
}

static void complete_io(unsigned long error, void *context)
{
	struct kcopyd_job *job = (struct kcopyd_job *) context;

	if (error) {
		if (job->rw == WRITE)
			job->write_err |= error;
		else
			job->read_err = 1;

		if (!test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) {
			push(&_complete_jobs, job);
			wake();
			return;
		}
	}

	if (job->rw == WRITE)
		push(&_complete_jobs, job);
	else {
		job->rw = WRITE;
		push(&_io_jobs, job);
	}

	wake();
}

/*
 * Request io for the pages of a particular job.
 */
static int run_io_job(struct kcopyd_job *job)
{
	int r;
	struct dm_io_request io_req = {
		.bi_rw = job->rw,
		.mem.type = DM_IO_PAGE_LIST,
		.mem.ptr.pl = job->pages,
		.mem.offset = job->offset,
		.notify.fn = complete_io,
		.notify.context = job,
		.client = job->kc->io_client,
	};

	if (job->rw == READ)
		r = dm_io(&io_req, 1, &job->source, NULL);
	else
		r = dm_io(&io_req, job->num_dests, job->dests, NULL);

	return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
	int r;

	job->nr_pages = dm_div_up(job->dests[0].count + job->offset,
				  PAGE_SIZE >> 9);
	r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
	if (!r) {
		/* this job is ready for io */
		push(&_io_jobs, job);
		return 0;
	}

	if (r == -ENOMEM)
		/* can't complete now */
		return 1;

	return r;
}
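/*
 * Worked example for run_pages_job() above, assuming 4KiB pages
 * (PAGE_SIZE >> 9 == 8 sectors per page): a 2048-sector (1MB)
 * destination with offset 0 needs dm_div_up(2048, 8) == 256 pages,
 * while a 10-sector destination needs dm_div_up(10, 8) == 2 pages.
 */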
/*
 * Run through a list for as long as possible.  Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, int (*fn)(struct kcopyd_job *))
{
	struct kcopyd_job *job;
	int r, count = 0;

	while ((job = pop(jobs))) {

		r = fn(job);

		if (r < 0) {
			/* error this rogue job */
			if (job->rw == WRITE)
				job->write_err = (unsigned int) -1;
			else
				job->read_err = 1;
			push(&_complete_jobs, job);
			break;
		}

		if (r > 0) {
			/*
			 * We couldn't service this job ATM, so
			 * push it back onto the list.
			 */
			push(jobs, job);
			break;
		}

		count++;
	}

	return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *ignored)
{
	/*
	 * The order that these are called is *very* important.
	 * Completed jobs can free pages for the pages jobs;
	 * successful pages jobs move onto the io jobs list; and io
	 * jobs call wake() when they complete, so it all starts
	 * again.
	 */
	process_jobs(&_complete_jobs, run_complete_job);
	process_jobs(&_pages_jobs, run_pages_job);
	process_jobs(&_io_jobs, run_io_job);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
	atomic_inc(&job->kc->nr_jobs);
	push(&_pages_jobs, job);
	wake();
}

#define SUB_JOB_SIZE 128
static void segment_complete(int read_err,
			     unsigned int write_err, void *context)
{
	sector_t progress = 0;
	sector_t count = 0;
	struct kcopyd_job *job = (struct kcopyd_job *) context;

	down(&job->lock);

	/* update the error */
	if (read_err)
		job->read_err = 1;

	if (write_err)
		job->write_err |= write_err;

	/*
	 * Only dispatch more work if there hasn't been an error.
	 */
	if ((!job->read_err && !job->write_err) ||
	    test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) {
		/* get the next chunk of work */
		progress = job->progress;
		count = job->source.count - progress;
		if (count) {
			if (count > SUB_JOB_SIZE)
				count = SUB_JOB_SIZE;

			job->progress += count;
		}
	}
	up(&job->lock);

	if (count) {
		int i;
		struct kcopyd_job *sub_job = mempool_alloc(_job_pool, GFP_NOIO);

		*sub_job = *job;
		sub_job->source.sector += progress;
		sub_job->source.count = count;

		for (i = 0; i < job->num_dests; i++) {
			sub_job->dests[i].sector += progress;
			sub_job->dests[i].count = count;
		}

		sub_job->fn = segment_complete;
		sub_job->context = job;
		dispatch_job(sub_job);

	} else if (atomic_dec_and_test(&job->sub_jobs)) {

		/*
		 * To avoid a race we must keep the job around
		 * until after the notify function has completed.
		 * Otherwise the client may try and stop the job
		 * after we've completed.
		 */
		job->fn(read_err, write_err, job->context);
		mempool_free(job, _job_pool);
	}
}

/*
 * Create some sub-jobs that between them perform the whole copy.
 */
#define SPLIT_COUNT 8
static void split_job(struct kcopyd_job *job)
{
	int i;

	atomic_set(&job->sub_jobs, SPLIT_COUNT);
	for (i = 0; i < SPLIT_COUNT; i++)
		segment_complete(0, 0u, job);
}
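/*
 * Worked example for split_job()/segment_complete() above: a
 * 1000-sector copy is carved into chunks of at most SUB_JOB_SIZE
 * (128) sectors.  split_job() seeds SPLIT_COUNT (8) sub-jobs, which
 * claim sectors 0-127, 128-255, ..., 896-999 (the last one takes the
 * 104-sector remainder).  For larger copies, each completing sub-job
 * claims the next unclaimed chunk until job->progress reaches
 * job->source.count; the completion that drops sub_jobs to zero
 * fires the client's callback.
 */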
int kcopyd_copy(struct kcopyd_client *kc, struct io_region *from,
		unsigned int num_dests, struct io_region *dests,
		unsigned int flags, kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;

	/*
	 * Allocate a new job.
	 */
	job = mempool_alloc(_job_pool, GFP_NOIO);

	/*
	 * Set up for the read.
	 */
	job->kc = kc;
	job->flags = flags;
	job->read_err = 0;
	job->write_err = 0;
	job->rw = READ;

	job->source = *from;

	job->num_dests = num_dests;
	memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

	job->offset = 0;
	job->nr_pages = 0;
	job->pages = NULL;

	job->fn = fn;
	job->context = context;

	if (job->source.count < SUB_JOB_SIZE)
		dispatch_job(job);
	else {
		init_MUTEX(&job->lock);
		job->progress = 0;
		split_job(job);
	}

	return 0;
}

/*-----------------------------------------------------------------
 * Unit setup
 *---------------------------------------------------------------*/
static DEFINE_MUTEX(_client_lock);
static LIST_HEAD(_clients);

static void client_add(struct kcopyd_client *kc)
{
	mutex_lock(&_client_lock);
	list_add(&kc->list, &_clients);
	mutex_unlock(&_client_lock);
}

static void client_del(struct kcopyd_client *kc)
{
	mutex_lock(&_client_lock);
	list_del(&kc->list);
	mutex_unlock(&_client_lock);
}

static DEFINE_MUTEX(kcopyd_init_lock);
static int kcopyd_clients = 0;

static int kcopyd_init(void)
{
	int r;

	mutex_lock(&kcopyd_init_lock);

	if (kcopyd_clients) {
		/* Already initialized. */
		kcopyd_clients++;
		mutex_unlock(&kcopyd_init_lock);
		return 0;
	}

	r = jobs_init();
	if (r) {
		mutex_unlock(&kcopyd_init_lock);
		return r;
	}

	_kcopyd_wq = create_singlethread_workqueue("kcopyd");
	if (!_kcopyd_wq) {
		jobs_exit();
		mutex_unlock(&kcopyd_init_lock);
		return -ENOMEM;
	}

	kcopyd_clients++;
	INIT_WORK(&_kcopyd_work, do_work);
	mutex_unlock(&kcopyd_init_lock);
	return 0;
}

static void kcopyd_exit(void)
{
	mutex_lock(&kcopyd_init_lock);
	kcopyd_clients--;
	if (!kcopyd_clients) {
		jobs_exit();
		destroy_workqueue(_kcopyd_wq);
		_kcopyd_wq = NULL;
	}
	mutex_unlock(&kcopyd_init_lock);
}

int kcopyd_client_create(unsigned int nr_pages, struct kcopyd_client **result)
{
	int r = 0;
	struct kcopyd_client *kc;

	r = kcopyd_init();
	if (r)
		return r;

	kc = kmalloc(sizeof(*kc), GFP_KERNEL);
	if (!kc) {
		kcopyd_exit();
		return -ENOMEM;
	}

	spin_lock_init(&kc->lock);
	kc->pages = NULL;
	kc->nr_pages = kc->nr_free_pages = 0;
	r = client_alloc_pages(kc, nr_pages);
	if (r) {
		kfree(kc);
		kcopyd_exit();
		return r;
	}

	kc->io_client = dm_io_client_create(nr_pages);
	if (IS_ERR(kc->io_client)) {
		r = PTR_ERR(kc->io_client);
		client_free_pages(kc);
		kfree(kc);
		kcopyd_exit();
		return r;
	}

	init_waitqueue_head(&kc->destroyq);
	atomic_set(&kc->nr_jobs, 0);

	client_add(kc);
	*result = kc;
	return 0;
}

void kcopyd_client_destroy(struct kcopyd_client *kc)
{
	/* Wait for completion of all jobs submitted by this client. */
	wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

	dm_io_client_destroy(kc->io_client);
	client_free_pages(kc);
	client_del(kc);
	kfree(kc);
	kcopyd_exit();
}

EXPORT_SYMBOL(kcopyd_client_create);
EXPORT_SYMBOL(kcopyd_client_destroy);
EXPORT_SYMBOL(kcopyd_copy);
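#if 0
/*
 * Illustrative sketch only (not part of this file): how a client of
 * the exported interface might copy the first 1MB of one block
 * device to another.  'src' and 'dest' are hypothetical devices
 * owned by the caller, and <linux/completion.h> is assumed to be
 * available for the synchronous wait.
 */
static void example_copy_done(int read_err, unsigned int write_err,
			      void *context)
{
	complete((struct completion *) context);
}

static int example_copy(struct block_device *src, struct block_device *dest)
{
	struct kcopyd_client *kc;
	struct io_region from, to;
	struct completion done;
	int r;

	r = kcopyd_client_create(256, &kc);	/* 256-page client pool */
	if (r)
		return r;

	from.bdev = src;
	from.sector = 0;
	from.count = 2048;			/* 1MB in 512-byte sectors */

	to.bdev = dest;
	to.sector = 0;
	to.count = 2048;

	init_completion(&done);
	r = kcopyd_copy(kc, &from, 1, &to, 0, example_copy_done, &done);
	if (!r)
		wait_for_completion(&done);

	kcopyd_client_destroy(kc);
	return r;
}
#endif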