/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */
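
/*
 * Minimal usage sketch (illustrative only -- the callback name, the page
 * count and the region setup below are hypothetical, not part of this
 * file): a client creates a kcopyd context, fills in io_region
 * descriptors and submits a copy with a completion callback.
 *
 *	static void copy_done(int read_err, unsigned int write_err, void *ctx)
 *	{
 *		... act on the result; write_err is a bitset of failed dests ...
 *	}
 *
 *	struct kcopyd_client *kc;
 *	struct io_region from, to;
 *	int r;
 *
 *	r = kcopyd_client_create(256, &kc);	... 256 preallocated pages
 *	if (r)
 *		return r;
 *	... fill in from/to: .bdev, .sector and .count ...
 *	r = kcopyd_copy(kc, &from, 1, &to, 0, copy_done, NULL);
 *	...
 *	kcopyd_client_destroy(kc);		... waits for outstanding jobs
 */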

#include <asm/types.h>
#include <asm/atomic.h>

#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>

#include "kcopyd.h"

static struct workqueue_struct *_kcopyd_wq;
static struct work_struct _kcopyd_work;

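/*
 * Kick the shared kcopyd worker so it reprocesses the job lists.
 */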
static inline void wake(void)
{
	queue_work(_kcopyd_wq, &_kcopyd_work);
}

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct kcopyd_client {
	struct list_head list;

	spinlock_t lock;
	struct page_list *pages;
	unsigned int nr_pages;
	unsigned int nr_free_pages;

	struct dm_io_client *io_client;

	wait_queue_head_t destroyq;
	atomic_t nr_jobs;
};

static struct page_list *alloc_pl(void)
{
	struct page_list *pl;

	pl = kmalloc(sizeof(*pl), GFP_KERNEL);
	if (!pl)
		return NULL;

	pl->page = alloc_page(GFP_KERNEL);
	if (!pl->page) {
		kfree(pl);
		return NULL;
	}

	return pl;
}

static void free_pl(struct page_list *pl)
{
	__free_page(pl->page);
	kfree(pl);
}

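/*
 * Hand out 'nr' pages from the client's pool as a chain of page_lists,
 * or fail with -ENOMEM if the pool doesn't currently have that many
 * free pages.
 */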
static int kcopyd_get_pages(struct kcopyd_client *kc,
			    unsigned int nr, struct page_list **pages)
{
	struct page_list *pl;

	spin_lock(&kc->lock);
	if (kc->nr_free_pages < nr) {
		spin_unlock(&kc->lock);
		return -ENOMEM;
	}

	kc->nr_free_pages -= nr;
	for (*pages = pl = kc->pages; --nr; pl = pl->next)
		;

	kc->pages = pl->next;
	pl->next = NULL;

	spin_unlock(&kc->lock);

	return 0;
}

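/*
 * Return a chain of pages to the client's free pool.
 */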
static void kcopyd_put_pages(struct kcopyd_client *kc, struct page_list *pl)
{
	struct page_list *cursor;

	spin_lock(&kc->lock);
	for (cursor = pl; cursor->next; cursor = cursor->next)
		kc->nr_free_pages++;

	kc->nr_free_pages++;
	cursor->next = kc->pages;
	kc->pages = pl;
	spin_unlock(&kc->lock);
}

/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
	struct page_list *next;

	while (pl) {
		next = pl->next;
		free_pl(pl);
		pl = next;
	}
}

static int client_alloc_pages(struct kcopyd_client *kc, unsigned int nr)
{
	unsigned int i;
	struct page_list *pl = NULL, *next;

	for (i = 0; i < nr; i++) {
		next = alloc_pl();
		if (!next) {
			if (pl)
				drop_pages(pl);
			return -ENOMEM;
		}
		next->next = pl;
		pl = next;
	}

	kcopyd_put_pages(kc, pl);
	kc->nr_pages += nr;
	return 0;
}

static void client_free_pages(struct kcopyd_client *kc)
{
	BUG_ON(kc->nr_free_pages != kc->nr_pages);
	drop_pages(kc->pages);
	kc->pages = NULL;
	kc->nr_free_pages = kc->nr_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
	struct kcopyd_client *kc;
	struct list_head list;
	unsigned long flags;

	/*
	 * Error state of the job.
	 */
	int read_err;
	unsigned int write_err;

	/*
	 * Either READ or WRITE
	 */
	int rw;
	struct io_region source;

	/*
	 * The destinations for the transfer.
	 */
	unsigned int num_dests;
	struct io_region dests[KCOPYD_MAX_REGIONS];

	sector_t offset;
	unsigned int nr_pages;
	struct page_list *pages;

	/*
	 * Set this to ensure you are notified when the job has
	 * completed.  'context' is for callback to use.
	 */
	kcopyd_notify_fn fn;
	void *context;

	/*
	 * These fields are only used if the job has been split
	 * into more manageable parts.
	 */
	struct semaphore lock;
	atomic_t sub_jobs;
	sector_t progress;
};
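
/*
 * A job is processed as a single READ of the source region into its
 * pages, which complete_io() then turns into a WRITE of those pages to
 * every destination region.
 */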

#define MIN_JOBS 512

static struct kmem_cache *_job_cache;
static mempool_t *_job_pool;

/*
 * We maintain three lists of jobs:
 *
 * i)   jobs waiting for pages
 * ii)  jobs that have pages, and are waiting for the io to be issued.
 * iii) jobs that have completed.
 *
 * All three of these are protected by _job_lock.
 */
static DEFINE_SPINLOCK(_job_lock);

static LIST_HEAD(_complete_jobs);
static LIST_HEAD(_io_jobs);
static LIST_HEAD(_pages_jobs);

static int jobs_init(void)
{
	_job_cache = kmem_cache_create("kcopyd-jobs",
				       sizeof(struct kcopyd_job),
				       __alignof__(struct kcopyd_job),
				       0, NULL, NULL);
	if (!_job_cache)
		return -ENOMEM;

	_job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
	if (!_job_pool) {
		kmem_cache_destroy(_job_cache);
		return -ENOMEM;
	}

	return 0;
}

static void jobs_exit(void)
{
	BUG_ON(!list_empty(&_complete_jobs));
	BUG_ON(!list_empty(&_io_jobs));
	BUG_ON(!list_empty(&_pages_jobs));

	mempool_destroy(_job_pool);
	kmem_cache_destroy(_job_cache);
	_job_pool = NULL;
	_job_cache = NULL;
}

/*
 * Functions to push a job onto the tail, and pop a job off the head,
 * of a given job list.
 */
static inline struct kcopyd_job *pop(struct list_head *jobs)
{
	struct kcopyd_job *job = NULL;
	unsigned long flags;

	spin_lock_irqsave(&_job_lock, flags);

	if (!list_empty(jobs)) {
		job = list_entry(jobs->next, struct kcopyd_job, list);
		list_del(&job->list);
	}
	spin_unlock_irqrestore(&_job_lock, flags);

	return job;
}

static inline void push(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;

	spin_lock_irqsave(&_job_lock, flags);
	list_add_tail(&job->list, jobs);
	spin_unlock_irqrestore(&_job_lock, flags);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
	void *context = job->context;
	int read_err = job->read_err;
	unsigned int write_err = job->write_err;
	kcopyd_notify_fn fn = job->fn;
	struct kcopyd_client *kc = job->kc;

	kcopyd_put_pages(kc, job->pages);
	mempool_free(job, _job_pool);
	fn(read_err, write_err, context);

	if (atomic_dec_and_test(&kc->nr_jobs))
		wake_up(&kc->destroyq);

	return 0;
}

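/*
 * dm_io completion callback.  When the read finishes the job is
 * converted into a WRITE of its destinations and requeued; when the
 * write finishes (or an error occurs that isn't being ignored) the job
 * moves to the complete list.
 */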
static void complete_io(unsigned long error, void *context)
{
	struct kcopyd_job *job = (struct kcopyd_job *) context;

	if (error) {
		if (job->rw == WRITE)
			job->write_err |= error;
		else
			job->read_err = 1;

		if (!test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) {
			push(&_complete_jobs, job);
			wake();
			return;
		}
	}

	if (job->rw == WRITE)
		push(&_complete_jobs, job);

	else {
		job->rw = WRITE;
		push(&_io_jobs, job);
	}

	wake();
}

/*
 * Issue the io for a job: either the read of its source or the write
 * to all of its destinations.
 */
static int run_io_job(struct kcopyd_job *job)
{
	int r;
	struct dm_io_request io_req = {
		.bi_rw = job->rw,
		.mem.type = DM_IO_PAGE_LIST,
		.mem.ptr.pl = job->pages,
		.mem.offset = job->offset,
		.notify.fn = complete_io,
		.notify.context = job,
		.client = job->kc->io_client,
	};

	if (job->rw == READ)
		r = dm_io(&io_req, 1, &job->source, NULL);
	else
		r = dm_io(&io_req, job->num_dests, job->dests, NULL);

	return r;
}

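/*
 * Reserve enough pages from the client's pool to cover the region being
 * copied.  Returns > 0 (try again later) if the pool is currently
 * exhausted.
 */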
static int run_pages_job(struct kcopyd_job *job)
{
	int r;

	job->nr_pages = dm_div_up(job->dests[0].count + job->offset,
				  PAGE_SIZE >> 9);
	r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
	if (!r) {
		/* this job is ready for io */
		push(&_io_jobs, job);
		return 0;
	}

	if (r == -ENOMEM)
		/* can't complete now */
		return 1;

	return r;
}

/*
 * Run through a list for as long as possible.  Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, int (*fn) (struct kcopyd_job *))
{
	struct kcopyd_job *job;
	int r, count = 0;

	while ((job = pop(jobs))) {

		r = fn(job);

		if (r < 0) {
			/* error this rogue job */
			if (job->rw == WRITE)
				job->write_err = (unsigned int) -1;
			else
				job->read_err = 1;
			push(&_complete_jobs, job);
			break;
		}

		if (r > 0) {
			/*
			 * We couldn't service this job ATM, so
			 * push this job back onto the list.
			 */
			push(jobs, job);
			break;
		}

		count++;
	}

	return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *ignored)
{
	/*
	 * The order that these are called is *very* important.
	 * complete jobs can free some pages for pages jobs.
	 * Pages jobs when successful will jump onto the io jobs
	 * list.  io jobs call wake when they complete and it all
	 * starts again.
	 */
	process_jobs(&_complete_jobs, run_complete_job);
	process_jobs(&_pages_jobs, run_pages_job);
	process_jobs(&_io_jobs, run_io_job);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
	atomic_inc(&job->kc->nr_jobs);
	push(&_pages_jobs, job);
	wake();
}

#define SUB_JOB_SIZE 128
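
/*
 * Completion callback for one sub-job of a split copy.  It records any
 * error, then either dispatches the next SUB_JOB_SIZE-sector chunk of
 * the parent job or, once the last sub-job has finished, runs the
 * parent's notify function.
 */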
static void segment_complete(int read_err,
			     unsigned int write_err, void *context)
{
	sector_t progress = 0;
	sector_t count = 0;
	struct kcopyd_job *job = (struct kcopyd_job *) context;

	down(&job->lock);

	/* update the error */
	if (read_err)
		job->read_err = 1;

	if (write_err)
		job->write_err |= write_err;

	/*
	 * Only dispatch more work if there hasn't been an error.
	 */
	if ((!job->read_err && !job->write_err) ||
	    test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) {
		/* get the next chunk of work */
		progress = job->progress;
		count = job->source.count - progress;
		if (count) {
			if (count > SUB_JOB_SIZE)
				count = SUB_JOB_SIZE;

			job->progress += count;
		}
	}
	up(&job->lock);

	if (count) {
		int i;
		struct kcopyd_job *sub_job = mempool_alloc(_job_pool, GFP_NOIO);

		*sub_job = *job;
		sub_job->source.sector += progress;
		sub_job->source.count = count;

		for (i = 0; i < job->num_dests; i++) {
			sub_job->dests[i].sector += progress;
			sub_job->dests[i].count = count;
		}

		sub_job->fn = segment_complete;
		sub_job->context = job;
		dispatch_job(sub_job);

	} else if (atomic_dec_and_test(&job->sub_jobs)) {

		/*
		 * To avoid a race we must keep the job around
		 * until after the notify function has completed.
		 * Otherwise the client may try and stop the job
		 * after we've completed.
		 */
		job->fn(read_err, write_err, job->context);
		mempool_free(job, _job_pool);
	}
}

/*
 * Split a large job into SPLIT_COUNT in-flight sub-jobs that between
 * them carry out the whole copy.
 */
#define SPLIT_COUNT 8
static void split_job(struct kcopyd_job *job)
{
	int i;

	atomic_set(&job->sub_jobs, SPLIT_COUNT);
	for (i = 0; i < SPLIT_COUNT; i++)
		segment_complete(0, 0u, job);
}

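/*
 * Submit a copy of 'from' to each of the 'dests' and return immediately;
 * 'fn' is called with the error state once every destination write has
 * finished.  Copies of SUB_JOB_SIZE sectors or more are split.
 */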
int kcopyd_copy(struct kcopyd_client *kc, struct io_region *from,
		unsigned int num_dests, struct io_region *dests,
		unsigned int flags, kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;

	/*
	 * Allocate a new job.
	 */
	job = mempool_alloc(_job_pool, GFP_NOIO);

	/*
	 * set up for the read.
	 */
	job->kc = kc;
	job->flags = flags;
	job->read_err = 0;
	job->write_err = 0;
	job->rw = READ;

	job->source = *from;

	job->num_dests = num_dests;
	memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

	job->offset = 0;
	job->nr_pages = 0;
	job->pages = NULL;

	job->fn = fn;
	job->context = context;

	if (job->source.count < SUB_JOB_SIZE)
		dispatch_job(job);

	else {
		init_MUTEX(&job->lock);
		job->progress = 0;
		split_job(job);
	}

	return 0;
}

/*
 * Cancelling a kcopyd job (eg. when someone is deactivating a mirror)
 * is not implemented.
 */

/*-----------------------------------------------------------------
 * Unit setup
 *---------------------------------------------------------------*/
static DEFINE_MUTEX(_client_lock);
static LIST_HEAD(_clients);

static void client_add(struct kcopyd_client *kc)
{
	mutex_lock(&_client_lock);
	list_add(&kc->list, &_clients);
	mutex_unlock(&_client_lock);
}

static void client_del(struct kcopyd_client *kc)
{
	mutex_lock(&_client_lock);
	list_del(&kc->list);
	mutex_unlock(&_client_lock);
}

static DEFINE_MUTEX(kcopyd_init_lock);
static int kcopyd_clients = 0;

static int kcopyd_init(void)
{
	int r;

	mutex_lock(&kcopyd_init_lock);

	if (kcopyd_clients) {
		/* Already initialized. */
		kcopyd_clients++;
		mutex_unlock(&kcopyd_init_lock);
		return 0;
	}

	r = jobs_init();
	if (r) {
		mutex_unlock(&kcopyd_init_lock);
		return r;
	}

	_kcopyd_wq = create_singlethread_workqueue("kcopyd");
	if (!_kcopyd_wq) {
		jobs_exit();
		mutex_unlock(&kcopyd_init_lock);
		return -ENOMEM;
	}

	kcopyd_clients++;
	INIT_WORK(&_kcopyd_work, do_work);
	mutex_unlock(&kcopyd_init_lock);
	return 0;
}

static void kcopyd_exit(void)
{
	mutex_lock(&kcopyd_init_lock);
	kcopyd_clients--;
	if (!kcopyd_clients) {
		jobs_exit();
		destroy_workqueue(_kcopyd_wq);
		_kcopyd_wq = NULL;
	}
	mutex_unlock(&kcopyd_init_lock);
}

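/*
 * Create a client with its own preallocated pool of nr_pages pages and
 * a matching dm-io client.
 */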
int kcopyd_client_create(unsigned int nr_pages, struct kcopyd_client **result)
{
	int r = 0;
	struct kcopyd_client *kc;

	r = kcopyd_init();
	if (r)
		return r;

	kc = kmalloc(sizeof(*kc), GFP_KERNEL);
	if (!kc) {
		kcopyd_exit();
		return -ENOMEM;
	}

	spin_lock_init(&kc->lock);
	kc->pages = NULL;
	kc->nr_pages = kc->nr_free_pages = 0;
	r = client_alloc_pages(kc, nr_pages);
	if (r) {
		kfree(kc);
		kcopyd_exit();
		return r;
	}

	kc->io_client = dm_io_client_create(nr_pages);
	if (IS_ERR(kc->io_client)) {
		r = PTR_ERR(kc->io_client);
		client_free_pages(kc);
		kfree(kc);
		kcopyd_exit();
		return r;
	}

	init_waitqueue_head(&kc->destroyq);
	atomic_set(&kc->nr_jobs, 0);

	client_add(kc);
	*result = kc;
	return 0;
}

void kcopyd_client_destroy(struct kcopyd_client *kc)
{
	/* Wait for completion of all jobs submitted by this client. */
	wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

	dm_io_client_destroy(kc->io_client);
	client_free_pages(kc);
	client_del(kc);
	kfree(kc);
	kcopyd_exit();
}

EXPORT_SYMBOL(kcopyd_client_create);
EXPORT_SYMBOL(kcopyd_client_destroy);
EXPORT_SYMBOL(kcopyd_copy);