blob: bb2e3515121077698634e791c5b07868150c884c [file] [log] [blame]
David Teiglande7fd4172006-01-18 09:30:29 +00001/******************************************************************************
2*******************************************************************************
3**
4** Copyright (C) 2005 Red Hat, Inc. All rights reserved.
5**
6** This copyrighted material is made available to anyone wishing to use,
7** modify, copy, or redistribute it subject to the terms and conditions
8** of the GNU General Public License v.2.
9**
10*******************************************************************************
11******************************************************************************/
12
13/* Central locking logic has four stages:
14
15 dlm_lock()
16 dlm_unlock()
17
18 request_lock(ls, lkb)
19 convert_lock(ls, lkb)
20 unlock_lock(ls, lkb)
21 cancel_lock(ls, lkb)
22
23 _request_lock(r, lkb)
24 _convert_lock(r, lkb)
25 _unlock_lock(r, lkb)
26 _cancel_lock(r, lkb)
27
28 do_request(r, lkb)
29 do_convert(r, lkb)
30 do_unlock(r, lkb)
31 do_cancel(r, lkb)
32
33 Stage 1 (lock, unlock) is mainly about checking input args and
34 splitting into one of the four main operations:
35
36 dlm_lock = request_lock
37 dlm_lock+CONVERT = convert_lock
38 dlm_unlock = unlock_lock
39 dlm_unlock+CANCEL = cancel_lock
40
41 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42 provided to the next stage.
43
44 Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45 When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
48 given rsb and lkb and queues callbacks.
49
50 For remote operations, send_xxxx() results in the corresponding do_xxxx()
51 function being executed on the remote node. The connecting send/receive
52 calls on local (L) and remote (R) nodes:
53
54 L: send_xxxx() -> R: receive_xxxx()
55 R: do_xxxx()
56 L: receive_xxxx_reply() <- R: send_xxxx_reply()
57*/
David Teigland597d0ca2006-07-12 16:44:04 -050058#include <linux/types.h>
David Teiglande7fd4172006-01-18 09:30:29 +000059#include "dlm_internal.h"
David Teigland597d0ca2006-07-12 16:44:04 -050060#include <linux/dlm_device.h>
David Teiglande7fd4172006-01-18 09:30:29 +000061#include "memory.h"
62#include "lowcomms.h"
63#include "requestqueue.h"
64#include "util.h"
65#include "dir.h"
66#include "member.h"
67#include "lockspace.h"
68#include "ast.h"
69#include "lock.h"
70#include "rcom.h"
71#include "recover.h"
72#include "lvb_table.h"
David Teigland597d0ca2006-07-12 16:44:04 -050073#include "user.h"
David Teiglande7fd4172006-01-18 09:30:29 +000074#include "config.h"
75
/*
 * Forward declarations for file-local (static) helpers that are referenced
 * before their definitions.
 */

/* request/response senders taking an rsb and an lkb */
static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);

/* sender taking only an rsb */
static int send_remove(struct dlm_rsb *r);

/* stage-3 request entry point and receive-side helpers */
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
88
/*
 * Sentinel pointer value (presumably marks an AST callback that belongs to a
 * userspace lock -- confirm at the use sites).  The whole expansion is
 * parenthesized so the cast stays a single operand wherever the macro is
 * expanded (e.g. under sizeof or next to a postfix operator).
 */
#define FAKE_USER_AST ((void *)0xff00ff00)
90
/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */
98
/*
 * Mode compatibility table, indexed by [mode + 1][mode + 1].
 * A 1 means the two modes are compatible, a 0 means they conflict.
 * Column order is the same as row order: UN NL CR CW PR PW EX PD.
 */
static const int __dlm_compat_matrix[8][8] = {
	[0] = { 1, 1, 1, 1, 1, 1, 1, 0 },	/* UN */
	[1] = { 1, 1, 1, 1, 1, 1, 1, 0 },	/* NL */
	[2] = { 1, 1, 1, 1, 1, 1, 0, 0 },	/* CR */
	[3] = { 1, 1, 1, 1, 0, 0, 0, 0 },	/* CW */
	[4] = { 1, 1, 1, 0, 1, 0, 0, 0 },	/* PR */
	[5] = { 1, 1, 1, 0, 0, 0, 0, 0 },	/* PW */
	[6] = { 1, 1, 0, 0, 0, 0, 0, 0 },	/* EX */
	[7] = { 0, 0, 0, 0, 0, 0, 0, 0 },	/* PD */
};
110
111/*
112 * This defines the direction of transfer of LVB data.
113 * Granted mode is the row; requested mode is the column.
114 * Usage: matrix[grmode+1][rqmode+1]
115 * 1 = LVB is returned to the caller
116 * 0 = LVB is written to the resource
117 * -1 = nothing happens to the LVB
118 */
119
/*
 * LVB transfer direction, indexed by [grmode + 1][rqmode + 1]:
 *    1  LVB is returned to the caller
 *    0  LVB is written to the resource
 *   -1  nothing happens to the LVB
 * Column order is the same as row order: UN NL CR CW PR PW EX PD.
 */
const int dlm_lvb_operations[8][8] = {
	[0] = { -1,  1,  1,  1,  1,  1,  1, -1 },	/* UN */
	[1] = { -1,  1,  1,  1,  1,  1,  1,  0 },	/* NL */
	[2] = { -1, -1,  1,  1,  1,  1,  1,  0 },	/* CR */
	[3] = { -1, -1, -1,  1,  1,  1,  1,  0 },	/* CW */
	[4] = { -1, -1, -1, -1,  1,  1,  1,  0 },	/* PR */
	[5] = { -1,  0,  0,  0,  0,  0,  1,  0 },	/* PW */
	[6] = { -1,  0,  0,  0,  0,  0,  0,  0 },	/* EX */
	[7] = { -1,  0,  0,  0,  0,  0,  0,  0 },	/* PD */
};
David Teiglande7fd4172006-01-18 09:30:29 +0000131
/*
 * Nonzero when the granted mode of lkb 'gr' is compatible with the requested
 * mode of lkb 'rq', according to __dlm_compat_matrix.
 */
#define modes_compat(gr, rq) \
	(__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1])
134
135int dlm_modes_compat(int mode1, int mode2)
136{
137 return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
138}
139
140/*
141 * Compatibility matrix for conversions with QUECVT set.
142 * Granted mode is the row; requested mode is the column.
143 * Usage: matrix[grmode+1][rqmode+1]
144 */
145
/*
 * Conversion compatibility when QUECVT is set, indexed by
 * [grmode + 1][rqmode + 1] (granted mode is the row, requested mode the
 * column).  Column order is the same as row order: UN NL CR CW PR PW EX PD.
 */
static const int __quecvt_compat_matrix[8][8] = {
	[0] = { 0, 0, 0, 0, 0, 0, 0, 0 },	/* UN */
	[1] = { 0, 0, 1, 1, 1, 1, 1, 0 },	/* NL */
	[2] = { 0, 0, 0, 1, 1, 1, 1, 0 },	/* CR */
	[3] = { 0, 0, 0, 0, 1, 1, 1, 0 },	/* CW */
	[4] = { 0, 0, 0, 1, 0, 1, 1, 0 },	/* PR */
	[5] = { 0, 0, 0, 0, 0, 0, 1, 0 },	/* PW */
	[6] = { 0, 0, 0, 0, 0, 0, 0, 0 },	/* EX */
	[7] = { 0, 0, 0, 0, 0, 0, 0, 0 },	/* PD */
};
157
David Teigland597d0ca2006-07-12 16:44:04 -0500158void dlm_print_lkb(struct dlm_lkb *lkb)
David Teiglande7fd4172006-01-18 09:30:29 +0000159{
160 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
161 " status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
162 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
163 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
164 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
165}
166
167void dlm_print_rsb(struct dlm_rsb *r)
168{
169 printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
170 r->res_nodeid, r->res_flags, r->res_first_lkid,
171 r->res_recover_locks_count, r->res_name);
172}
173
174/* Threads cannot use the lockspace while it's being recovered */
175
176static inline void lock_recovery(struct dlm_ls *ls)
177{
178 down_read(&ls->ls_in_recovery);
179}
180
181static inline void unlock_recovery(struct dlm_ls *ls)
182{
183 up_read(&ls->ls_in_recovery);
184}
185
186static inline int lock_recovery_try(struct dlm_ls *ls)
187{
188 return down_read_trylock(&ls->ls_in_recovery);
189}
190
191static inline int can_be_queued(struct dlm_lkb *lkb)
192{
193 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
194}
195
196static inline int force_blocking_asts(struct dlm_lkb *lkb)
197{
198 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
199}
200
201static inline int is_demoted(struct dlm_lkb *lkb)
202{
203 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
204}
205
206static inline int is_remote(struct dlm_rsb *r)
207{
208 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
209 return !!r->res_nodeid;
210}
211
212static inline int is_process_copy(struct dlm_lkb *lkb)
213{
214 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
215}
216
217static inline int is_master_copy(struct dlm_lkb *lkb)
218{
219 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
220 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
David Teigland90135922006-01-20 08:47:07 +0000221 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
David Teiglande7fd4172006-01-18 09:30:29 +0000222}
223
224static inline int middle_conversion(struct dlm_lkb *lkb)
225{
226 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
227 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
David Teigland90135922006-01-20 08:47:07 +0000228 return 1;
229 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +0000230}
231
232static inline int down_conversion(struct dlm_lkb *lkb)
233{
234 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
235}
236
237static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
238{
239 if (is_master_copy(lkb))
240 return;
241
242 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
243
244 lkb->lkb_lksb->sb_status = rv;
245 lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
246
247 dlm_add_ast(lkb, AST_COMP);
248}
249
250static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
251{
252 if (is_master_copy(lkb))
253 send_bast(r, lkb, rqmode);
254 else {
255 lkb->lkb_bastmode = rqmode;
256 dlm_add_ast(lkb, AST_BAST);
257 }
258}
259
260/*
261 * Basic operations on rsb's and lkb's
262 */
263
264static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
265{
266 struct dlm_rsb *r;
267
268 r = allocate_rsb(ls, len);
269 if (!r)
270 return NULL;
271
272 r->res_ls = ls;
273 r->res_length = len;
274 memcpy(r->res_name, name, len);
David Teigland90135922006-01-20 08:47:07 +0000275 mutex_init(&r->res_mutex);
David Teiglande7fd4172006-01-18 09:30:29 +0000276
277 INIT_LIST_HEAD(&r->res_lookup);
278 INIT_LIST_HEAD(&r->res_grantqueue);
279 INIT_LIST_HEAD(&r->res_convertqueue);
280 INIT_LIST_HEAD(&r->res_waitqueue);
281 INIT_LIST_HEAD(&r->res_root_list);
282 INIT_LIST_HEAD(&r->res_recover_list);
283
284 return r;
285}
286
287static int search_rsb_list(struct list_head *head, char *name, int len,
288 unsigned int flags, struct dlm_rsb **r_ret)
289{
290 struct dlm_rsb *r;
291 int error = 0;
292
293 list_for_each_entry(r, head, res_hashchain) {
294 if (len == r->res_length && !memcmp(name, r->res_name, len))
295 goto found;
296 }
David Teigland597d0ca2006-07-12 16:44:04 -0500297 return -EBADR;
David Teiglande7fd4172006-01-18 09:30:29 +0000298
299 found:
300 if (r->res_nodeid && (flags & R_MASTER))
301 error = -ENOTBLK;
302 *r_ret = r;
303 return error;
304}
305
306static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
307 unsigned int flags, struct dlm_rsb **r_ret)
308{
309 struct dlm_rsb *r;
310 int error;
311
312 error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
313 if (!error) {
314 kref_get(&r->res_ref);
315 goto out;
316 }
317 error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
318 if (error)
319 goto out;
320
321 list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
322
323 if (dlm_no_directory(ls))
324 goto out;
325
326 if (r->res_nodeid == -1) {
327 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
328 r->res_first_lkid = 0;
329 } else if (r->res_nodeid > 0) {
330 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
331 r->res_first_lkid = 0;
332 } else {
333 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
334 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
335 }
336 out:
337 *r_ret = r;
338 return error;
339}
340
341static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
342 unsigned int flags, struct dlm_rsb **r_ret)
343{
344 int error;
345 write_lock(&ls->ls_rsbtbl[b].lock);
346 error = _search_rsb(ls, name, len, b, flags, r_ret);
347 write_unlock(&ls->ls_rsbtbl[b].lock);
348 return error;
349}
350
351/*
352 * Find rsb in rsbtbl and potentially create/add one
353 *
354 * Delaying the release of rsb's has a similar benefit to applications keeping
355 * NL locks on an rsb, but without the guarantee that the cached master value
356 * will still be valid when the rsb is reused. Apps aren't always smart enough
357 * to keep NL locks on an rsb that they may lock again shortly; this can lead
358 * to excessive master lookups and removals if we don't delay the release.
359 *
360 * Searching for an rsb means looking through both the normal list and toss
361 * list. When found on the toss list the rsb is moved to the normal list with
362 * ref count of 1; when found on normal list the ref count is incremented.
363 */
364
365static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
366 unsigned int flags, struct dlm_rsb **r_ret)
367{
368 struct dlm_rsb *r, *tmp;
369 uint32_t hash, bucket;
370 int error = 0;
371
372 if (dlm_no_directory(ls))
373 flags |= R_CREATE;
374
375 hash = jhash(name, namelen, 0);
376 bucket = hash & (ls->ls_rsbtbl_size - 1);
377
378 error = search_rsb(ls, name, namelen, bucket, flags, &r);
379 if (!error)
380 goto out;
381
David Teigland597d0ca2006-07-12 16:44:04 -0500382 if (error == -EBADR && !(flags & R_CREATE))
David Teiglande7fd4172006-01-18 09:30:29 +0000383 goto out;
384
385 /* the rsb was found but wasn't a master copy */
386 if (error == -ENOTBLK)
387 goto out;
388
389 error = -ENOMEM;
390 r = create_rsb(ls, name, namelen);
391 if (!r)
392 goto out;
393
394 r->res_hash = hash;
395 r->res_bucket = bucket;
396 r->res_nodeid = -1;
397 kref_init(&r->res_ref);
398
399 /* With no directory, the master can be set immediately */
400 if (dlm_no_directory(ls)) {
401 int nodeid = dlm_dir_nodeid(r);
402 if (nodeid == dlm_our_nodeid())
403 nodeid = 0;
404 r->res_nodeid = nodeid;
405 }
406
407 write_lock(&ls->ls_rsbtbl[bucket].lock);
408 error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
409 if (!error) {
410 write_unlock(&ls->ls_rsbtbl[bucket].lock);
411 free_rsb(r);
412 r = tmp;
413 goto out;
414 }
415 list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
416 write_unlock(&ls->ls_rsbtbl[bucket].lock);
417 error = 0;
418 out:
419 *r_ret = r;
420 return error;
421}
422
/* Non-static entry point: forwards all arguments unchanged to find_rsb(). */
int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
		 unsigned int flags, struct dlm_rsb **r_ret)
{
	int rv = find_rsb(ls, name, namelen, flags, r_ret);

	return rv;
}
428
429/* This is only called to add a reference when the code already holds
430 a valid reference to the rsb, so there's no need for locking. */
431
432static inline void hold_rsb(struct dlm_rsb *r)
433{
434 kref_get(&r->res_ref);
435}
436
/* Non-static entry point for hold_rsb(). */
void dlm_hold_rsb(struct dlm_rsb *r)
{
	struct dlm_rsb *rsb = r;

	hold_rsb(rsb);
}
441
442static void toss_rsb(struct kref *kref)
443{
444 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
445 struct dlm_ls *ls = r->res_ls;
446
447 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
448 kref_init(&r->res_ref);
449 list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
450 r->res_toss_time = jiffies;
451 if (r->res_lvbptr) {
452 free_lvb(r->res_lvbptr);
453 r->res_lvbptr = NULL;
454 }
455}
456
/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */
459
460static void put_rsb(struct dlm_rsb *r)
461{
462 struct dlm_ls *ls = r->res_ls;
463 uint32_t bucket = r->res_bucket;
464
465 write_lock(&ls->ls_rsbtbl[bucket].lock);
466 kref_put(&r->res_ref, toss_rsb);
467 write_unlock(&ls->ls_rsbtbl[bucket].lock);
468}
469
/* Non-static entry point for put_rsb(). */
void dlm_put_rsb(struct dlm_rsb *r)
{
	struct dlm_rsb *rsb = r;

	put_rsb(rsb);
}
474
475/* See comment for unhold_lkb */
476
477static void unhold_rsb(struct dlm_rsb *r)
478{
479 int rv;
480 rv = kref_put(&r->res_ref, toss_rsb);
481 DLM_ASSERT(!rv, dlm_print_rsb(r););
482}
483
484static void kill_rsb(struct kref *kref)
485{
486 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
487
488 /* All work is done after the return from kref_put() so we
489 can release the write_lock before the remove and free. */
490
491 DLM_ASSERT(list_empty(&r->res_lookup),);
492 DLM_ASSERT(list_empty(&r->res_grantqueue),);
493 DLM_ASSERT(list_empty(&r->res_convertqueue),);
494 DLM_ASSERT(list_empty(&r->res_waitqueue),);
495 DLM_ASSERT(list_empty(&r->res_root_list),);
496 DLM_ASSERT(list_empty(&r->res_recover_list),);
497}
498
499/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
500 The rsb must exist as long as any lkb's for it do. */
501
502static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
503{
504 hold_rsb(r);
505 lkb->lkb_resource = r;
506}
507
508static void detach_lkb(struct dlm_lkb *lkb)
509{
510 if (lkb->lkb_resource) {
511 put_rsb(lkb->lkb_resource);
512 lkb->lkb_resource = NULL;
513 }
514}
515
516static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
517{
518 struct dlm_lkb *lkb, *tmp;
519 uint32_t lkid = 0;
520 uint16_t bucket;
521
522 lkb = allocate_lkb(ls);
523 if (!lkb)
524 return -ENOMEM;
525
526 lkb->lkb_nodeid = -1;
527 lkb->lkb_grmode = DLM_LOCK_IV;
528 kref_init(&lkb->lkb_ref);
David Teigland34e22be2006-07-18 11:24:04 -0500529 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
David Teiglande7fd4172006-01-18 09:30:29 +0000530
531 get_random_bytes(&bucket, sizeof(bucket));
532 bucket &= (ls->ls_lkbtbl_size - 1);
533
534 write_lock(&ls->ls_lkbtbl[bucket].lock);
535
536 /* counter can roll over so we must verify lkid is not in use */
537
538 while (lkid == 0) {
539 lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);
540
541 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
542 lkb_idtbl_list) {
543 if (tmp->lkb_id != lkid)
544 continue;
545 lkid = 0;
546 break;
547 }
548 }
549
550 lkb->lkb_id = lkid;
551 list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
552 write_unlock(&ls->ls_lkbtbl[bucket].lock);
553
554 *lkb_ret = lkb;
555 return 0;
556}
557
558static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
559{
560 uint16_t bucket = lkid & 0xFFFF;
561 struct dlm_lkb *lkb;
562
563 list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
564 if (lkb->lkb_id == lkid)
565 return lkb;
566 }
567 return NULL;
568}
569
570static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
571{
572 struct dlm_lkb *lkb;
573 uint16_t bucket = lkid & 0xFFFF;
574
575 if (bucket >= ls->ls_lkbtbl_size)
576 return -EBADSLT;
577
578 read_lock(&ls->ls_lkbtbl[bucket].lock);
579 lkb = __find_lkb(ls, lkid);
580 if (lkb)
581 kref_get(&lkb->lkb_ref);
582 read_unlock(&ls->ls_lkbtbl[bucket].lock);
583
584 *lkb_ret = lkb;
585 return lkb ? 0 : -ENOENT;
586}
587
588static void kill_lkb(struct kref *kref)
589{
590 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
591
592 /* All work is done after the return from kref_put() so we
593 can release the write_lock before the detach_lkb */
594
595 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
596}
597
David Teiglandb3f58d82006-02-28 11:16:37 -0500598/* __put_lkb() is used when an lkb may not have an rsb attached to
599 it so we need to provide the lockspace explicitly */
600
601static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
David Teiglande7fd4172006-01-18 09:30:29 +0000602{
David Teiglande7fd4172006-01-18 09:30:29 +0000603 uint16_t bucket = lkb->lkb_id & 0xFFFF;
604
605 write_lock(&ls->ls_lkbtbl[bucket].lock);
606 if (kref_put(&lkb->lkb_ref, kill_lkb)) {
607 list_del(&lkb->lkb_idtbl_list);
608 write_unlock(&ls->ls_lkbtbl[bucket].lock);
609
610 detach_lkb(lkb);
611
612 /* for local/process lkbs, lvbptr points to caller's lksb */
613 if (lkb->lkb_lvbptr && is_master_copy(lkb))
614 free_lvb(lkb->lkb_lvbptr);
David Teiglande7fd4172006-01-18 09:30:29 +0000615 free_lkb(lkb);
616 return 1;
617 } else {
618 write_unlock(&ls->ls_lkbtbl[bucket].lock);
619 return 0;
620 }
621}
622
623int dlm_put_lkb(struct dlm_lkb *lkb)
624{
David Teiglandb3f58d82006-02-28 11:16:37 -0500625 struct dlm_ls *ls;
626
627 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
628 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
629
630 ls = lkb->lkb_resource->res_ls;
631 return __put_lkb(ls, lkb);
David Teiglande7fd4172006-01-18 09:30:29 +0000632}
633
634/* This is only called to add a reference when the code already holds
635 a valid reference to the lkb, so there's no need for locking. */
636
637static inline void hold_lkb(struct dlm_lkb *lkb)
638{
639 kref_get(&lkb->lkb_ref);
640}
641
642/* This is called when we need to remove a reference and are certain
643 it's not the last ref. e.g. del_lkb is always called between a
644 find_lkb/put_lkb and is always the inverse of a previous add_lkb.
645 put_lkb would work fine, but would involve unnecessary locking */
646
647static inline void unhold_lkb(struct dlm_lkb *lkb)
648{
649 int rv;
650 rv = kref_put(&lkb->lkb_ref, kill_lkb);
651 DLM_ASSERT(!rv, dlm_print_lkb(lkb););
652}
653
654static void lkb_add_ordered(struct list_head *new, struct list_head *head,
655 int mode)
656{
657 struct dlm_lkb *lkb = NULL;
658
659 list_for_each_entry(lkb, head, lkb_statequeue)
660 if (lkb->lkb_rqmode < mode)
661 break;
662
663 if (!lkb)
664 list_add_tail(new, head);
665 else
666 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
667}
668
669/* add/remove lkb to rsb's grant/convert/wait queue */
670
671static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
672{
673 kref_get(&lkb->lkb_ref);
674
675 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
676
677 lkb->lkb_status = status;
678
679 switch (status) {
680 case DLM_LKSTS_WAITING:
681 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
682 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
683 else
684 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
685 break;
686 case DLM_LKSTS_GRANTED:
687 /* convention says granted locks kept in order of grmode */
688 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
689 lkb->lkb_grmode);
690 break;
691 case DLM_LKSTS_CONVERT:
692 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
693 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
694 else
695 list_add_tail(&lkb->lkb_statequeue,
696 &r->res_convertqueue);
697 break;
698 default:
699 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
700 }
701}
702
703static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
704{
705 lkb->lkb_status = 0;
706 list_del(&lkb->lkb_statequeue);
707 unhold_lkb(lkb);
708}
709
/*
 * Move an lkb from its current rsb queue to the queue selected by 'sts'.
 * A temporary reference is held across the del/add pair.
 */
static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);		/* temporary reference */

	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);

	unhold_lkb(lkb);	/* drop the temporary reference */
}
717
718/* add/remove lkb from global waiters list of lkb's waiting for
719 a reply from a remote node */
720
721static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
722{
723 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
724
David Teigland90135922006-01-20 08:47:07 +0000725 mutex_lock(&ls->ls_waiters_mutex);
David Teiglande7fd4172006-01-18 09:30:29 +0000726 if (lkb->lkb_wait_type) {
727 log_print("add_to_waiters error %d", lkb->lkb_wait_type);
728 goto out;
729 }
730 lkb->lkb_wait_type = mstype;
731 kref_get(&lkb->lkb_ref);
732 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
733 out:
David Teigland90135922006-01-20 08:47:07 +0000734 mutex_unlock(&ls->ls_waiters_mutex);
David Teiglande7fd4172006-01-18 09:30:29 +0000735}
736
737static int _remove_from_waiters(struct dlm_lkb *lkb)
738{
739 int error = 0;
740
741 if (!lkb->lkb_wait_type) {
742 log_print("remove_from_waiters error");
743 error = -EINVAL;
744 goto out;
745 }
746 lkb->lkb_wait_type = 0;
747 list_del(&lkb->lkb_wait_reply);
748 unhold_lkb(lkb);
749 out:
750 return error;
751}
752
753static int remove_from_waiters(struct dlm_lkb *lkb)
754{
755 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
756 int error;
757
David Teigland90135922006-01-20 08:47:07 +0000758 mutex_lock(&ls->ls_waiters_mutex);
David Teiglande7fd4172006-01-18 09:30:29 +0000759 error = _remove_from_waiters(lkb);
David Teigland90135922006-01-20 08:47:07 +0000760 mutex_unlock(&ls->ls_waiters_mutex);
David Teiglande7fd4172006-01-18 09:30:29 +0000761 return error;
762}
763
764static void dir_remove(struct dlm_rsb *r)
765{
766 int to_nodeid;
767
768 if (dlm_no_directory(r->res_ls))
769 return;
770
771 to_nodeid = dlm_dir_nodeid(r);
772 if (to_nodeid != dlm_our_nodeid())
773 send_remove(r);
774 else
775 dlm_dir_remove_entry(r->res_ls, to_nodeid,
776 r->res_name, r->res_length);
777}
778
779/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
780 found since they are in order of newest to oldest? */
781
782static int shrink_bucket(struct dlm_ls *ls, int b)
783{
784 struct dlm_rsb *r;
785 int count = 0, found;
786
787 for (;;) {
David Teigland90135922006-01-20 08:47:07 +0000788 found = 0;
David Teiglande7fd4172006-01-18 09:30:29 +0000789 write_lock(&ls->ls_rsbtbl[b].lock);
790 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
791 res_hashchain) {
792 if (!time_after_eq(jiffies, r->res_toss_time +
793 dlm_config.toss_secs * HZ))
794 continue;
David Teigland90135922006-01-20 08:47:07 +0000795 found = 1;
David Teiglande7fd4172006-01-18 09:30:29 +0000796 break;
797 }
798
799 if (!found) {
800 write_unlock(&ls->ls_rsbtbl[b].lock);
801 break;
802 }
803
804 if (kref_put(&r->res_ref, kill_rsb)) {
805 list_del(&r->res_hashchain);
806 write_unlock(&ls->ls_rsbtbl[b].lock);
807
808 if (is_master(r))
809 dir_remove(r);
810 free_rsb(r);
811 count++;
812 } else {
813 write_unlock(&ls->ls_rsbtbl[b].lock);
814 log_error(ls, "tossed rsb in use %s", r->res_name);
815 }
816 }
817
818 return count;
819}
820
821void dlm_scan_rsbs(struct dlm_ls *ls)
822{
823 int i;
824
825 if (dlm_locking_stopped(ls))
826 return;
827
828 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
829 shrink_bucket(ls, i);
830 cond_resched();
831 }
832}
833
834/* lkb is master or local copy */
835
836static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
837{
838 int b, len = r->res_ls->ls_lvblen;
839
840 /* b=1 lvb returned to caller
841 b=0 lvb written to rsb or invalidated
842 b=-1 do nothing */
843
844 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
845
846 if (b == 1) {
847 if (!lkb->lkb_lvbptr)
848 return;
849
850 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
851 return;
852
853 if (!r->res_lvbptr)
854 return;
855
856 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
857 lkb->lkb_lvbseq = r->res_lvbseq;
858
859 } else if (b == 0) {
860 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
861 rsb_set_flag(r, RSB_VALNOTVALID);
862 return;
863 }
864
865 if (!lkb->lkb_lvbptr)
866 return;
867
868 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
869 return;
870
871 if (!r->res_lvbptr)
872 r->res_lvbptr = allocate_lvb(r->res_ls);
873
874 if (!r->res_lvbptr)
875 return;
876
877 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
878 r->res_lvbseq++;
879 lkb->lkb_lvbseq = r->res_lvbseq;
880 rsb_clear_flag(r, RSB_VALNOTVALID);
881 }
882
883 if (rsb_flag(r, RSB_VALNOTVALID))
884 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
885}
886
887static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
888{
889 if (lkb->lkb_grmode < DLM_LOCK_PW)
890 return;
891
892 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
893 rsb_set_flag(r, RSB_VALNOTVALID);
894 return;
895 }
896
897 if (!lkb->lkb_lvbptr)
898 return;
899
900 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
901 return;
902
903 if (!r->res_lvbptr)
904 r->res_lvbptr = allocate_lvb(r->res_ls);
905
906 if (!r->res_lvbptr)
907 return;
908
909 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
910 r->res_lvbseq++;
911 rsb_clear_flag(r, RSB_VALNOTVALID);
912}
913
914/* lkb is process copy (pc) */
915
916static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
917 struct dlm_message *ms)
918{
919 int b;
920
921 if (!lkb->lkb_lvbptr)
922 return;
923
924 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
925 return;
926
David Teigland597d0ca2006-07-12 16:44:04 -0500927 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
David Teiglande7fd4172006-01-18 09:30:29 +0000928 if (b == 1) {
929 int len = receive_extralen(ms);
930 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
931 lkb->lkb_lvbseq = ms->m_lvbseq;
932 }
933}
934
935/* Manipulate lkb's on rsb's convert/granted/waiting queues
936 remove_lock -- used for unlock, removes lkb from granted
937 revert_lock -- used for cancel, moves lkb from convert to granted
938 grant_lock -- used for request and convert, adds lkb to granted or
939 moves lkb from convert or waiting to granted
940
941 Each of these is used for master or local copy lkb's. There is
942 also a _pc() variation used to make the corresponding change on
943 a process copy (pc) lkb. */
944
945static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
946{
947 del_lkb(r, lkb);
948 lkb->lkb_grmode = DLM_LOCK_IV;
949 /* this unhold undoes the original ref from create_lkb()
950 so this leads to the lkb being freed */
951 unhold_lkb(lkb);
952}
953
/*
 * Unlock for a master or local copy lkb: apply the unlock-time LVB rules
 * first, then remove the lock.
 */
static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* LVB handling reads lkb_grmode, so it runs before the removal */
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}
959
/* Unlock for a process-copy lkb: plain removal with no LVB handling. */
static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_lkb *pc = lkb;

	_remove_lock(r, pc);
}
964
965static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
966{
967 lkb->lkb_rqmode = DLM_LOCK_IV;
968
969 switch (lkb->lkb_status) {
David Teigland597d0ca2006-07-12 16:44:04 -0500970 case DLM_LKSTS_GRANTED:
971 break;
David Teiglande7fd4172006-01-18 09:30:29 +0000972 case DLM_LKSTS_CONVERT:
973 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
974 break;
975 case DLM_LKSTS_WAITING:
976 del_lkb(r, lkb);
977 lkb->lkb_grmode = DLM_LOCK_IV;
978 /* this unhold undoes the original ref from create_lkb()
979 so this leads to the lkb being freed */
980 unhold_lkb(lkb);
981 break;
982 default:
983 log_print("invalid status for revert %d", lkb->lkb_status);
984 }
985}
986
/* Cancel for a process-copy lkb: identical to revert_lock(). */
static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_lkb *pc = lkb;

	revert_lock(r, pc);
}
991
992static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
993{
994 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
995 lkb->lkb_grmode = lkb->lkb_rqmode;
996 if (lkb->lkb_status)
997 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
998 else
999 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1000 }
1001
1002 lkb->lkb_rqmode = DLM_LOCK_IV;
David Teiglande7fd4172006-01-18 09:30:29 +00001003}
1004
1005static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1006{
1007 set_lvb_lock(r, lkb);
1008 _grant_lock(r, lkb);
1009 lkb->lkb_highbast = 0;
1010}
1011
/*
 * Grant for a process-copy lkb: take the LVB from message 'ms' (when
 * applicable), then grant the lock.
 */
static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	/* LVB handling reads grmode/rqmode, so it runs before the grant */
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}
1018
1019/* called by grant_pending_locks() which means an async grant message must
1020 be sent to the requesting node in addition to granting the lock if the
1021 lkb belongs to a remote node. */
1022
/*
 * Grant the lock and notify its owner: a master copy gets a grant message
 * via send_grant(), any other lkb gets a completion AST with status 0.
 */
static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);

	if (!is_master_copy(lkb)) {
		queue_cast(r, lkb, 0);
		return;
	}

	send_grant(r, lkb);
}
1031
1032static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1033{
1034 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1035 lkb_statequeue);
1036 if (lkb->lkb_id == first->lkb_id)
David Teigland90135922006-01-20 08:47:07 +00001037 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001038
David Teigland90135922006-01-20 08:47:07 +00001039 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +00001040}
1041
David Teiglande7fd4172006-01-18 09:30:29 +00001042/* Check if the given lkb conflicts with another lkb on the queue. */
1043
1044static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1045{
1046 struct dlm_lkb *this;
1047
1048 list_for_each_entry(this, head, lkb_statequeue) {
1049 if (this == lkb)
1050 continue;
David Teigland3bcd3682006-02-23 09:56:38 +00001051 if (!modes_compat(this, lkb))
David Teigland90135922006-01-20 08:47:07 +00001052 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001053 }
David Teigland90135922006-01-20 08:47:07 +00001054 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +00001055}
1056
1057/*
1058 * "A conversion deadlock arises with a pair of lock requests in the converting
1059 * queue for one resource. The granted mode of each lock blocks the requested
1060 * mode of the other lock."
1061 *
1062 * Part 2: if the granted mode of lkb is preventing the first lkb in the
1063 * convert queue from being granted, then demote lkb (set grmode to NL).
1064 * This second form requires that we check for conv-deadlk even when
1065 * now == 0 in _can_be_granted().
1066 *
1067 * Example:
1068 * Granted Queue: empty
1069 * Convert Queue: NL->EX (first lock)
1070 * PR->EX (second lock)
1071 *
1072 * The first lock can't be granted because of the granted mode of the second
1073 * lock and the second lock can't be granted because it's not first in the
1074 * list. We demote the granted mode of the second lock (the lkb passed to this
1075 * function).
1076 *
1077 * After the resolution, the "grant pending" function needs to go back and try
1078 * to grant locks on the convert queue again since the first lock can now be
1079 * granted.
1080 */
1081
1082static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1083{
1084 struct dlm_lkb *this, *first = NULL, *self = NULL;
1085
1086 list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1087 if (!first)
1088 first = this;
1089 if (this == lkb) {
1090 self = lkb;
1091 continue;
1092 }
1093
David Teiglande7fd4172006-01-18 09:30:29 +00001094 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
David Teigland90135922006-01-20 08:47:07 +00001095 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001096 }
1097
1098 /* if lkb is on the convert queue and is preventing the first
1099 from being granted, then there's deadlock and we demote lkb.
1100 multiple converting locks may need to do this before the first
1101 converting lock can be granted. */
1102
1103 if (self && self != first) {
1104 if (!modes_compat(lkb, first) &&
1105 !queue_conflict(&rsb->res_grantqueue, first))
David Teigland90135922006-01-20 08:47:07 +00001106 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001107 }
1108
David Teigland90135922006-01-20 08:47:07 +00001109 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +00001110}
1111
1112/*
1113 * Return 1 if the lock can be granted, 0 otherwise.
1114 * Also detect and resolve conversion deadlocks.
1115 *
1116 * lkb is the lock to be granted
1117 *
1118 * now is 1 if the function is being called in the context of the
1119 * immediate request, it is 0 if called later, after the lock has been
1120 * queued.
1121 *
1122 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1123 */
1124
1125static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1126{
1127 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1128
1129 /*
1130 * 6-10: Version 5.4 introduced an option to address the phenomenon of
1131 * a new request for a NL mode lock being blocked.
1132 *
1133 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1134 * request, then it would be granted. In essence, the use of this flag
1135 * tells the Lock Manager to expedite theis request by not considering
1136 * what may be in the CONVERTING or WAITING queues... As of this
1137 * writing, the EXPEDITE flag can be used only with new requests for NL
1138 * mode locks. This flag is not valid for conversion requests.
1139 *
1140 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
1141 * conversion or used with a non-NL requested mode. We also know an
1142 * EXPEDITE request is always granted immediately, so now must always
1143 * be 1. The full condition to grant an expedite request: (now &&
1144 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1145 * therefore be shortened to just checking the flag.
1146 */
1147
1148 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
David Teigland90135922006-01-20 08:47:07 +00001149 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001150
1151 /*
1152 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1153 * added to the remaining conditions.
1154 */
1155
1156 if (queue_conflict(&r->res_grantqueue, lkb))
1157 goto out;
1158
1159 /*
1160 * 6-3: By default, a conversion request is immediately granted if the
1161 * requested mode is compatible with the modes of all other granted
1162 * locks
1163 */
1164
1165 if (queue_conflict(&r->res_convertqueue, lkb))
1166 goto out;
1167
1168 /*
1169 * 6-5: But the default algorithm for deciding whether to grant or
1170 * queue conversion requests does not by itself guarantee that such
1171 * requests are serviced on a "first come first serve" basis. This, in
1172 * turn, can lead to a phenomenon known as "indefinate postponement".
1173 *
1174 * 6-7: This issue is dealt with by using the optional QUECVT flag with
1175 * the system service employed to request a lock conversion. This flag
1176 * forces certain conversion requests to be queued, even if they are
1177 * compatible with the granted modes of other locks on the same
1178 * resource. Thus, the use of this flag results in conversion requests
1179 * being ordered on a "first come first servce" basis.
1180 *
1181 * DCT: This condition is all about new conversions being able to occur
1182 * "in place" while the lock remains on the granted queue (assuming
1183 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
1184 * doesn't _have_ to go onto the convert queue where it's processed in
1185 * order. The "now" variable is necessary to distinguish converts
1186 * being received and processed for the first time now, because once a
1187 * convert is moved to the conversion queue the condition below applies
1188 * requiring fifo granting.
1189 */
1190
1191 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
David Teigland90135922006-01-20 08:47:07 +00001192 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001193
1194 /*
David Teigland3bcd3682006-02-23 09:56:38 +00001195 * The NOORDER flag is set to avoid the standard vms rules on grant
1196 * order.
David Teiglande7fd4172006-01-18 09:30:29 +00001197 */
1198
1199 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
David Teigland90135922006-01-20 08:47:07 +00001200 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001201
1202 /*
1203 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1204 * granted until all other conversion requests ahead of it are granted
1205 * and/or canceled.
1206 */
1207
1208 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
David Teigland90135922006-01-20 08:47:07 +00001209 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001210
1211 /*
1212 * 6-4: By default, a new request is immediately granted only if all
1213 * three of the following conditions are satisfied when the request is
1214 * issued:
1215 * - The queue of ungranted conversion requests for the resource is
1216 * empty.
1217 * - The queue of ungranted new requests for the resource is empty.
1218 * - The mode of the new request is compatible with the most
1219 * restrictive mode of all granted locks on the resource.
1220 */
1221
1222 if (now && !conv && list_empty(&r->res_convertqueue) &&
1223 list_empty(&r->res_waitqueue))
David Teigland90135922006-01-20 08:47:07 +00001224 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001225
1226 /*
1227 * 6-4: Once a lock request is in the queue of ungranted new requests,
1228 * it cannot be granted until the queue of ungranted conversion
1229 * requests is empty, all ungranted new requests ahead of it are
1230 * granted and/or canceled, and it is compatible with the granted mode
1231 * of the most restrictive lock granted on the resource.
1232 */
1233
1234 if (!now && !conv && list_empty(&r->res_convertqueue) &&
1235 first_in_list(lkb, &r->res_waitqueue))
David Teigland90135922006-01-20 08:47:07 +00001236 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001237
1238 out:
1239 /*
1240 * The following, enabled by CONVDEADLK, departs from VMS.
1241 */
1242
1243 if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1244 conversion_deadlock_detect(r, lkb)) {
1245 lkb->lkb_grmode = DLM_LOCK_NL;
1246 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1247 }
1248
David Teigland90135922006-01-20 08:47:07 +00001249 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +00001250}
1251
1252/*
1253 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1254 * simple way to provide a big optimization to applications that can use them.
1255 */
1256
1257static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1258{
1259 uint32_t flags = lkb->lkb_exflags;
1260 int rv;
1261 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1262
1263 rv = _can_be_granted(r, lkb, now);
1264 if (rv)
1265 goto out;
1266
1267 if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1268 goto out;
1269
1270 if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1271 alt = DLM_LOCK_PR;
1272 else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1273 alt = DLM_LOCK_CW;
1274
1275 if (alt) {
1276 lkb->lkb_rqmode = alt;
1277 rv = _can_be_granted(r, lkb, now);
1278 if (rv)
1279 lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1280 else
1281 lkb->lkb_rqmode = rqmode;
1282 }
1283 out:
1284 return rv;
1285}
1286
1287static int grant_pending_convert(struct dlm_rsb *r, int high)
1288{
1289 struct dlm_lkb *lkb, *s;
1290 int hi, demoted, quit, grant_restart, demote_restart;
1291
1292 quit = 0;
1293 restart:
1294 grant_restart = 0;
1295 demote_restart = 0;
1296 hi = DLM_LOCK_IV;
1297
1298 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1299 demoted = is_demoted(lkb);
David Teigland90135922006-01-20 08:47:07 +00001300 if (can_be_granted(r, lkb, 0)) {
David Teiglande7fd4172006-01-18 09:30:29 +00001301 grant_lock_pending(r, lkb);
1302 grant_restart = 1;
1303 } else {
1304 hi = max_t(int, lkb->lkb_rqmode, hi);
1305 if (!demoted && is_demoted(lkb))
1306 demote_restart = 1;
1307 }
1308 }
1309
1310 if (grant_restart)
1311 goto restart;
1312 if (demote_restart && !quit) {
1313 quit = 1;
1314 goto restart;
1315 }
1316
1317 return max_t(int, high, hi);
1318}
1319
1320static int grant_pending_wait(struct dlm_rsb *r, int high)
1321{
1322 struct dlm_lkb *lkb, *s;
1323
1324 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
David Teigland90135922006-01-20 08:47:07 +00001325 if (can_be_granted(r, lkb, 0))
David Teiglande7fd4172006-01-18 09:30:29 +00001326 grant_lock_pending(r, lkb);
1327 else
1328 high = max_t(int, lkb->lkb_rqmode, high);
1329 }
1330
1331 return high;
1332}
1333
1334static void grant_pending_locks(struct dlm_rsb *r)
1335{
1336 struct dlm_lkb *lkb, *s;
1337 int high = DLM_LOCK_IV;
1338
1339 DLM_ASSERT(is_master(r), dlm_print_rsb(r););
1340
1341 high = grant_pending_convert(r, high);
1342 high = grant_pending_wait(r, high);
1343
1344 if (high == DLM_LOCK_IV)
1345 return;
1346
1347 /*
1348 * If there are locks left on the wait/convert queue then send blocking
1349 * ASTs to granted locks based on the largest requested mode (high)
David Teigland3bcd3682006-02-23 09:56:38 +00001350 * found above. FIXME: highbast < high comparison not valid for PR/CW.
David Teiglande7fd4172006-01-18 09:30:29 +00001351 */
1352
1353 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1354 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1355 !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1356 queue_bast(r, lkb, high);
1357 lkb->lkb_highbast = high;
1358 }
1359 }
1360}
1361
1362static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1363 struct dlm_lkb *lkb)
1364{
1365 struct dlm_lkb *gr;
1366
1367 list_for_each_entry(gr, head, lkb_statequeue) {
1368 if (gr->lkb_bastaddr &&
1369 gr->lkb_highbast < lkb->lkb_rqmode &&
David Teigland3bcd3682006-02-23 09:56:38 +00001370 !modes_compat(gr, lkb)) {
David Teiglande7fd4172006-01-18 09:30:29 +00001371 queue_bast(r, gr, lkb->lkb_rqmode);
1372 gr->lkb_highbast = lkb->lkb_rqmode;
1373 }
1374 }
1375}
1376
1377static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1378{
1379 send_bast_queue(r, &r->res_grantqueue, lkb);
1380}
1381
1382static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1383{
1384 send_bast_queue(r, &r->res_grantqueue, lkb);
1385 send_bast_queue(r, &r->res_convertqueue, lkb);
1386}
1387
1388/* set_master(r, lkb) -- set the master nodeid of a resource
1389
1390 The purpose of this function is to set the nodeid field in the given
1391 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
1392 known, it can just be copied to the lkb and the function will return
1393 0. If the rsb's nodeid is _not_ known, it needs to be looked up
1394 before it can be copied to the lkb.
1395
1396 When the rsb nodeid is being looked up remotely, the initial lkb
1397 causing the lookup is kept on the ls_waiters list waiting for the
1398 lookup reply. Other lkb's waiting for the same rsb lookup are kept
1399 on the rsb's res_lookup list until the master is verified.
1400
1401 Return values:
1402 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1403 1: the rsb master is not available and the lkb has been placed on
1404 a wait queue
1405*/
1406
1407static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1408{
1409 struct dlm_ls *ls = r->res_ls;
1410 int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1411
1412 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1413 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1414 r->res_first_lkid = lkb->lkb_id;
1415 lkb->lkb_nodeid = r->res_nodeid;
1416 return 0;
1417 }
1418
1419 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1420 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1421 return 1;
1422 }
1423
1424 if (r->res_nodeid == 0) {
1425 lkb->lkb_nodeid = 0;
1426 return 0;
1427 }
1428
1429 if (r->res_nodeid > 0) {
1430 lkb->lkb_nodeid = r->res_nodeid;
1431 return 0;
1432 }
1433
1434 DLM_ASSERT(r->res_nodeid == -1, dlm_print_rsb(r););
1435
1436 dir_nodeid = dlm_dir_nodeid(r);
1437
1438 if (dir_nodeid != our_nodeid) {
1439 r->res_first_lkid = lkb->lkb_id;
1440 send_lookup(r, lkb);
1441 return 1;
1442 }
1443
1444 for (;;) {
1445 /* It's possible for dlm_scand to remove an old rsb for
1446 this same resource from the toss list, us to create
1447 a new one, look up the master locally, and find it
1448 already exists just before dlm_scand does the
1449 dir_remove() on the previous rsb. */
1450
1451 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1452 r->res_length, &ret_nodeid);
1453 if (!error)
1454 break;
1455 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1456 schedule();
1457 }
1458
1459 if (ret_nodeid == our_nodeid) {
1460 r->res_first_lkid = 0;
1461 r->res_nodeid = 0;
1462 lkb->lkb_nodeid = 0;
1463 } else {
1464 r->res_first_lkid = lkb->lkb_id;
1465 r->res_nodeid = ret_nodeid;
1466 lkb->lkb_nodeid = ret_nodeid;
1467 }
1468 return 0;
1469}
1470
1471static void process_lookup_list(struct dlm_rsb *r)
1472{
1473 struct dlm_lkb *lkb, *safe;
1474
1475 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1476 list_del(&lkb->lkb_rsb_lookup);
1477 _request_lock(r, lkb);
1478 schedule();
1479 }
1480}
1481
1482/* confirm_master -- confirm (or deny) an rsb's master nodeid */
1483
1484static void confirm_master(struct dlm_rsb *r, int error)
1485{
1486 struct dlm_lkb *lkb;
1487
1488 if (!r->res_first_lkid)
1489 return;
1490
1491 switch (error) {
1492 case 0:
1493 case -EINPROGRESS:
1494 r->res_first_lkid = 0;
1495 process_lookup_list(r);
1496 break;
1497
1498 case -EAGAIN:
1499 /* the remote master didn't queue our NOQUEUE request;
1500 make a waiting lkb the first_lkid */
1501
1502 r->res_first_lkid = 0;
1503
1504 if (!list_empty(&r->res_lookup)) {
1505 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1506 lkb_rsb_lookup);
1507 list_del(&lkb->lkb_rsb_lookup);
1508 r->res_first_lkid = lkb->lkb_id;
1509 _request_lock(r, lkb);
1510 } else
1511 r->res_nodeid = -1;
1512 break;
1513
1514 default:
1515 log_error(r->res_ls, "confirm_master unknown error %d", error);
1516 }
1517}
1518
1519static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1520 int namelen, uint32_t parent_lkid, void *ast,
David Teigland3bcd3682006-02-23 09:56:38 +00001521 void *astarg, void *bast, struct dlm_args *args)
David Teiglande7fd4172006-01-18 09:30:29 +00001522{
1523 int rv = -EINVAL;
1524
1525 /* check for invalid arg usage */
1526
1527 if (mode < 0 || mode > DLM_LOCK_EX)
1528 goto out;
1529
1530 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1531 goto out;
1532
1533 if (flags & DLM_LKF_CANCEL)
1534 goto out;
1535
1536 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1537 goto out;
1538
1539 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1540 goto out;
1541
1542 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1543 goto out;
1544
1545 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1546 goto out;
1547
1548 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1549 goto out;
1550
1551 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1552 goto out;
1553
1554 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1555 goto out;
1556
1557 if (!ast || !lksb)
1558 goto out;
1559
1560 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1561 goto out;
1562
1563 /* parent/child locks not yet supported */
1564 if (parent_lkid)
1565 goto out;
1566
1567 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1568 goto out;
1569
1570 /* these args will be copied to the lkb in validate_lock_args,
1571 it cannot be done now because when converting locks, fields in
1572 an active lkb cannot be modified before locking the rsb */
1573
1574 args->flags = flags;
1575 args->astaddr = ast;
1576 args->astparam = (long) astarg;
1577 args->bastaddr = bast;
1578 args->mode = mode;
1579 args->lksb = lksb;
David Teiglande7fd4172006-01-18 09:30:29 +00001580 rv = 0;
1581 out:
1582 return rv;
1583}
1584
1585static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1586{
1587 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1588 DLM_LKF_FORCEUNLOCK))
1589 return -EINVAL;
1590
1591 args->flags = flags;
1592 args->astparam = (long) astarg;
1593 return 0;
1594}
1595
1596static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1597 struct dlm_args *args)
1598{
1599 int rv = -EINVAL;
1600
1601 if (args->flags & DLM_LKF_CONVERT) {
1602 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1603 goto out;
1604
1605 if (args->flags & DLM_LKF_QUECVT &&
1606 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1607 goto out;
1608
1609 rv = -EBUSY;
1610 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1611 goto out;
1612
1613 if (lkb->lkb_wait_type)
1614 goto out;
1615 }
1616
1617 lkb->lkb_exflags = args->flags;
1618 lkb->lkb_sbflags = 0;
1619 lkb->lkb_astaddr = args->astaddr;
1620 lkb->lkb_astparam = args->astparam;
1621 lkb->lkb_bastaddr = args->bastaddr;
1622 lkb->lkb_rqmode = args->mode;
1623 lkb->lkb_lksb = args->lksb;
1624 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1625 lkb->lkb_ownpid = (int) current->pid;
David Teiglande7fd4172006-01-18 09:30:29 +00001626 rv = 0;
1627 out:
1628 return rv;
1629}
1630
1631static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1632{
1633 int rv = -EINVAL;
1634
1635 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1636 goto out;
1637
1638 if (args->flags & DLM_LKF_FORCEUNLOCK)
1639 goto out_ok;
1640
1641 if (args->flags & DLM_LKF_CANCEL &&
1642 lkb->lkb_status == DLM_LKSTS_GRANTED)
1643 goto out;
1644
1645 if (!(args->flags & DLM_LKF_CANCEL) &&
1646 lkb->lkb_status != DLM_LKSTS_GRANTED)
1647 goto out;
1648
1649 rv = -EBUSY;
1650 if (lkb->lkb_wait_type)
1651 goto out;
1652
1653 out_ok:
1654 lkb->lkb_exflags = args->flags;
1655 lkb->lkb_sbflags = 0;
1656 lkb->lkb_astparam = args->astparam;
1657
1658 rv = 0;
1659 out:
1660 return rv;
1661}
1662
/*
 * Four stage 4 varieties:
 * do_request(), do_convert(), do_unlock(), do_cancel()
 * These are called on the master node for the given lock and
 * from the central locking logic.
 */
1669
1670static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1671{
1672 int error = 0;
1673
David Teigland90135922006-01-20 08:47:07 +00001674 if (can_be_granted(r, lkb, 1)) {
David Teiglande7fd4172006-01-18 09:30:29 +00001675 grant_lock(r, lkb);
1676 queue_cast(r, lkb, 0);
1677 goto out;
1678 }
1679
1680 if (can_be_queued(lkb)) {
1681 error = -EINPROGRESS;
1682 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1683 send_blocking_asts(r, lkb);
1684 goto out;
1685 }
1686
1687 error = -EAGAIN;
1688 if (force_blocking_asts(lkb))
1689 send_blocking_asts_all(r, lkb);
1690 queue_cast(r, lkb, -EAGAIN);
1691
1692 out:
1693 return error;
1694}
1695
1696static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
1697{
1698 int error = 0;
1699
1700 /* changing an existing lock may allow others to be granted */
1701
David Teigland90135922006-01-20 08:47:07 +00001702 if (can_be_granted(r, lkb, 1)) {
David Teiglande7fd4172006-01-18 09:30:29 +00001703 grant_lock(r, lkb);
1704 queue_cast(r, lkb, 0);
1705 grant_pending_locks(r);
1706 goto out;
1707 }
1708
1709 if (can_be_queued(lkb)) {
1710 if (is_demoted(lkb))
1711 grant_pending_locks(r);
1712 error = -EINPROGRESS;
1713 del_lkb(r, lkb);
1714 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
1715 send_blocking_asts(r, lkb);
1716 goto out;
1717 }
1718
1719 error = -EAGAIN;
1720 if (force_blocking_asts(lkb))
1721 send_blocking_asts_all(r, lkb);
1722 queue_cast(r, lkb, -EAGAIN);
1723
1724 out:
1725 return error;
1726}
1727
1728static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1729{
1730 remove_lock(r, lkb);
1731 queue_cast(r, lkb, -DLM_EUNLOCK);
1732 grant_pending_locks(r);
1733 return -DLM_EUNLOCK;
1734}
1735
David Teigland597d0ca2006-07-12 16:44:04 -05001736/* FIXME: if revert_lock() finds that the lkb is granted, we should
1737 skip the queue_cast(ECANCEL). It indicates that the request/convert
1738 completed (and queued a normal ast) just before the cancel; we don't
1739 want to clobber the sb_result for the normal ast with ECANCEL. */
1740
David Teiglande7fd4172006-01-18 09:30:29 +00001741static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
1742{
1743 revert_lock(r, lkb);
1744 queue_cast(r, lkb, -DLM_ECANCEL);
1745 grant_pending_locks(r);
1746 return -DLM_ECANCEL;
1747}
1748
/*
 * Four stage 3 varieties:
 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
 */
1753
/* add a new lkb to a possibly new rsb, called by requesting process

   Returns a negative value from set_master() unchanged, 0 when set_master()
   has parked the lkb to wait for the master, and otherwise the result of
   send_request() (remote master) or do_request() (local master). */

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* set_master: sets lkb nodeid from r */

	rv = set_master(r, lkb);
	if (rv < 0)
		return rv;
	if (rv > 0)
		return 0;

	/* receive_request() calls do_request() on remote node */
	if (is_remote(r))
		return send_request(r, lkb);

	return do_request(r, lkb);
}
1778
/* change some property of an existing lkb, e.g. mode

   Sent to the master when the rsb is remote, done locally otherwise. */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_convert() calls do_convert() on remote node */
	if (is_remote(r))
		return send_convert(r, lkb);

	return do_convert(r, lkb);
}
1793
/* remove an existing lkb from the granted queue

   Sent to the master when the rsb is remote, done locally otherwise. */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_unlock() calls do_unlock() on remote node */
	if (is_remote(r))
		return send_unlock(r, lkb);

	return do_unlock(r, lkb);
}
1808
/* remove an existing lkb from the convert or wait queue

   Sent to the master when the rsb is remote, done locally otherwise. */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_cancel() calls do_cancel() on remote node */
	if (is_remote(r))
		return send_cancel(r, lkb);

	return do_cancel(r, lkb);
}
1823
/*
 * Four stage 2 varieties:
 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
 */
1828
1829static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
1830 int len, struct dlm_args *args)
1831{
1832 struct dlm_rsb *r;
1833 int error;
1834
1835 error = validate_lock_args(ls, lkb, args);
1836 if (error)
1837 goto out;
1838
1839 error = find_rsb(ls, name, len, R_CREATE, &r);
1840 if (error)
1841 goto out;
1842
1843 lock_rsb(r);
1844
1845 attach_lkb(r, lkb);
1846 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
1847
1848 error = _request_lock(r, lkb);
1849
1850 unlock_rsb(r);
1851 put_rsb(r);
1852
1853 out:
1854 return error;
1855}
1856
1857static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1858 struct dlm_args *args)
1859{
1860 struct dlm_rsb *r;
1861 int error;
1862
1863 r = lkb->lkb_resource;
1864
1865 hold_rsb(r);
1866 lock_rsb(r);
1867
1868 error = validate_lock_args(ls, lkb, args);
1869 if (error)
1870 goto out;
1871
1872 error = _convert_lock(r, lkb);
1873 out:
1874 unlock_rsb(r);
1875 put_rsb(r);
1876 return error;
1877}
1878
1879static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1880 struct dlm_args *args)
1881{
1882 struct dlm_rsb *r;
1883 int error;
1884
1885 r = lkb->lkb_resource;
1886
1887 hold_rsb(r);
1888 lock_rsb(r);
1889
1890 error = validate_unlock_args(lkb, args);
1891 if (error)
1892 goto out;
1893
1894 error = _unlock_lock(r, lkb);
1895 out:
1896 unlock_rsb(r);
1897 put_rsb(r);
1898 return error;
1899}
1900
1901static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1902 struct dlm_args *args)
1903{
1904 struct dlm_rsb *r;
1905 int error;
1906
1907 r = lkb->lkb_resource;
1908
1909 hold_rsb(r);
1910 lock_rsb(r);
1911
1912 error = validate_unlock_args(lkb, args);
1913 if (error)
1914 goto out;
1915
1916 error = _cancel_lock(r, lkb);
1917 out:
1918 unlock_rsb(r);
1919 put_rsb(r);
1920 return error;
1921}
1922
/*
 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
 */
1926
1927int dlm_lock(dlm_lockspace_t *lockspace,
1928 int mode,
1929 struct dlm_lksb *lksb,
1930 uint32_t flags,
1931 void *name,
1932 unsigned int namelen,
1933 uint32_t parent_lkid,
1934 void (*ast) (void *astarg),
1935 void *astarg,
David Teigland3bcd3682006-02-23 09:56:38 +00001936 void (*bast) (void *astarg, int mode))
David Teiglande7fd4172006-01-18 09:30:29 +00001937{
1938 struct dlm_ls *ls;
1939 struct dlm_lkb *lkb;
1940 struct dlm_args args;
1941 int error, convert = flags & DLM_LKF_CONVERT;
1942
1943 ls = dlm_find_lockspace_local(lockspace);
1944 if (!ls)
1945 return -EINVAL;
1946
1947 lock_recovery(ls);
1948
1949 if (convert)
1950 error = find_lkb(ls, lksb->sb_lkid, &lkb);
1951 else
1952 error = create_lkb(ls, &lkb);
1953
1954 if (error)
1955 goto out;
1956
1957 error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
David Teigland3bcd3682006-02-23 09:56:38 +00001958 astarg, bast, &args);
David Teiglande7fd4172006-01-18 09:30:29 +00001959 if (error)
1960 goto out_put;
1961
1962 if (convert)
1963 error = convert_lock(ls, lkb, &args);
1964 else
1965 error = request_lock(ls, lkb, name, namelen, &args);
1966
1967 if (error == -EINPROGRESS)
1968 error = 0;
1969 out_put:
1970 if (convert || error)
David Teiglandb3f58d82006-02-28 11:16:37 -05001971 __put_lkb(ls, lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00001972 if (error == -EAGAIN)
1973 error = 0;
1974 out:
1975 unlock_recovery(ls);
1976 dlm_put_lockspace(ls);
1977 return error;
1978}
1979
1980int dlm_unlock(dlm_lockspace_t *lockspace,
1981 uint32_t lkid,
1982 uint32_t flags,
1983 struct dlm_lksb *lksb,
1984 void *astarg)
1985{
1986 struct dlm_ls *ls;
1987 struct dlm_lkb *lkb;
1988 struct dlm_args args;
1989 int error;
1990
1991 ls = dlm_find_lockspace_local(lockspace);
1992 if (!ls)
1993 return -EINVAL;
1994
1995 lock_recovery(ls);
1996
1997 error = find_lkb(ls, lkid, &lkb);
1998 if (error)
1999 goto out;
2000
2001 error = set_unlock_args(flags, astarg, &args);
2002 if (error)
2003 goto out_put;
2004
2005 if (flags & DLM_LKF_CANCEL)
2006 error = cancel_lock(ls, lkb, &args);
2007 else
2008 error = unlock_lock(ls, lkb, &args);
2009
2010 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2011 error = 0;
2012 out_put:
David Teiglandb3f58d82006-02-28 11:16:37 -05002013 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00002014 out:
2015 unlock_recovery(ls);
2016 dlm_put_lockspace(ls);
2017 return error;
2018}
2019
/*
 * send/receive routines for remote operations and replies
 *
 * send_args
 * send_common
 * send_request			receive_request
 * send_convert			receive_convert
 * send_unlock			receive_unlock
 * send_cancel			receive_cancel
 * send_grant			receive_grant
 * send_bast			receive_bast
 * send_lookup			receive_lookup
 * send_remove			receive_remove
 *
 * 				send_common_reply
 * receive_request_reply	send_request_reply
 * receive_convert_reply	send_convert_reply
 * receive_unlock_reply		send_unlock_reply
 * receive_cancel_reply		send_cancel_reply
 * receive_lookup_reply		send_lookup_reply
 */
2041
2042static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2043 int to_nodeid, int mstype,
2044 struct dlm_message **ms_ret,
2045 struct dlm_mhandle **mh_ret)
2046{
2047 struct dlm_message *ms;
2048 struct dlm_mhandle *mh;
2049 char *mb;
2050 int mb_len = sizeof(struct dlm_message);
2051
2052 switch (mstype) {
2053 case DLM_MSG_REQUEST:
2054 case DLM_MSG_LOOKUP:
2055 case DLM_MSG_REMOVE:
2056 mb_len += r->res_length;
2057 break;
2058 case DLM_MSG_CONVERT:
2059 case DLM_MSG_UNLOCK:
2060 case DLM_MSG_REQUEST_REPLY:
2061 case DLM_MSG_CONVERT_REPLY:
2062 case DLM_MSG_GRANT:
2063 if (lkb && lkb->lkb_lvbptr)
2064 mb_len += r->res_ls->ls_lvblen;
2065 break;
2066 }
2067
2068 /* get_buffer gives us a message handle (mh) that we need to
2069 pass into lowcomms_commit and a message buffer (mb) that we
2070 write our data into */
2071
2072 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2073 if (!mh)
2074 return -ENOBUFS;
2075
2076 memset(mb, 0, mb_len);
2077
2078 ms = (struct dlm_message *) mb;
2079
2080 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2081 ms->m_header.h_lockspace = r->res_ls->ls_global_id;
2082 ms->m_header.h_nodeid = dlm_our_nodeid();
2083 ms->m_header.h_length = mb_len;
2084 ms->m_header.h_cmd = DLM_MSG;
2085
2086 ms->m_type = mstype;
2087
2088 *mh_ret = mh;
2089 *ms_ret = ms;
2090 return 0;
2091}
2092
/* Pass the message through dlm_message_out() and commit its buffer to
   lowcomms.  Always returns 0.

   further lowcomms enhancements or alternate implementations may make
   the return value from this function useful at some point */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	int rv = 0;

	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);

	return rv;
}
2102
2103static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2104 struct dlm_message *ms)
2105{
2106 ms->m_nodeid = lkb->lkb_nodeid;
2107 ms->m_pid = lkb->lkb_ownpid;
2108 ms->m_lkid = lkb->lkb_id;
2109 ms->m_remid = lkb->lkb_remid;
2110 ms->m_exflags = lkb->lkb_exflags;
2111 ms->m_sbflags = lkb->lkb_sbflags;
2112 ms->m_flags = lkb->lkb_flags;
2113 ms->m_lvbseq = lkb->lkb_lvbseq;
2114 ms->m_status = lkb->lkb_status;
2115 ms->m_grmode = lkb->lkb_grmode;
2116 ms->m_rqmode = lkb->lkb_rqmode;
2117 ms->m_hash = r->res_hash;
2118
2119 /* m_result and m_bastmode are set from function args,
2120 not from lkb fields */
2121
2122 if (lkb->lkb_bastaddr)
2123 ms->m_asts |= AST_BAST;
2124 if (lkb->lkb_astaddr)
2125 ms->m_asts |= AST_COMP;
2126
David Teiglande7fd4172006-01-18 09:30:29 +00002127 if (ms->m_type == DLM_MSG_REQUEST || ms->m_type == DLM_MSG_LOOKUP)
2128 memcpy(ms->m_extra, r->res_name, r->res_length);
2129
2130 else if (lkb->lkb_lvbptr)
2131 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2132
2133}
2134
2135static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2136{
2137 struct dlm_message *ms;
2138 struct dlm_mhandle *mh;
2139 int to_nodeid, error;
2140
2141 add_to_waiters(lkb, mstype);
2142
2143 to_nodeid = r->res_nodeid;
2144
2145 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2146 if (error)
2147 goto fail;
2148
2149 send_args(r, lkb, ms);
2150
2151 error = send_message(mh, ms);
2152 if (error)
2153 goto fail;
2154 return 0;
2155
2156 fail:
2157 remove_from_waiters(lkb);
2158 return error;
2159}
2160
2161static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2162{
2163 return send_common(r, lkb, DLM_MSG_REQUEST);
2164}
2165
2166static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2167{
2168 int error;
2169
2170 error = send_common(r, lkb, DLM_MSG_CONVERT);
2171
2172 /* down conversions go without a reply from the master */
2173 if (!error && down_conversion(lkb)) {
2174 remove_from_waiters(lkb);
2175 r->res_ls->ls_stub_ms.m_result = 0;
2176 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2177 }
2178
2179 return error;
2180}
2181
2182/* FIXME: if this lkb is the only lock we hold on the rsb, then set
2183 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2184 that the master is still correct. */
2185
2186static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2187{
2188 return send_common(r, lkb, DLM_MSG_UNLOCK);
2189}
2190
2191static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2192{
2193 return send_common(r, lkb, DLM_MSG_CANCEL);
2194}
2195
2196static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2197{
2198 struct dlm_message *ms;
2199 struct dlm_mhandle *mh;
2200 int to_nodeid, error;
2201
2202 to_nodeid = lkb->lkb_nodeid;
2203
2204 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2205 if (error)
2206 goto out;
2207
2208 send_args(r, lkb, ms);
2209
2210 ms->m_result = 0;
2211
2212 error = send_message(mh, ms);
2213 out:
2214 return error;
2215}
2216
2217static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2218{
2219 struct dlm_message *ms;
2220 struct dlm_mhandle *mh;
2221 int to_nodeid, error;
2222
2223 to_nodeid = lkb->lkb_nodeid;
2224
2225 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2226 if (error)
2227 goto out;
2228
2229 send_args(r, lkb, ms);
2230
2231 ms->m_bastmode = mode;
2232
2233 error = send_message(mh, ms);
2234 out:
2235 return error;
2236}
2237
2238static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2239{
2240 struct dlm_message *ms;
2241 struct dlm_mhandle *mh;
2242 int to_nodeid, error;
2243
2244 add_to_waiters(lkb, DLM_MSG_LOOKUP);
2245
2246 to_nodeid = dlm_dir_nodeid(r);
2247
2248 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2249 if (error)
2250 goto fail;
2251
2252 send_args(r, lkb, ms);
2253
2254 error = send_message(mh, ms);
2255 if (error)
2256 goto fail;
2257 return 0;
2258
2259 fail:
2260 remove_from_waiters(lkb);
2261 return error;
2262}
2263
2264static int send_remove(struct dlm_rsb *r)
2265{
2266 struct dlm_message *ms;
2267 struct dlm_mhandle *mh;
2268 int to_nodeid, error;
2269
2270 to_nodeid = dlm_dir_nodeid(r);
2271
2272 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2273 if (error)
2274 goto out;
2275
2276 memcpy(ms->m_extra, r->res_name, r->res_length);
2277 ms->m_hash = r->res_hash;
2278
2279 error = send_message(mh, ms);
2280 out:
2281 return error;
2282}
2283
2284static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2285 int mstype, int rv)
2286{
2287 struct dlm_message *ms;
2288 struct dlm_mhandle *mh;
2289 int to_nodeid, error;
2290
2291 to_nodeid = lkb->lkb_nodeid;
2292
2293 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2294 if (error)
2295 goto out;
2296
2297 send_args(r, lkb, ms);
2298
2299 ms->m_result = rv;
2300
2301 error = send_message(mh, ms);
2302 out:
2303 return error;
2304}
2305
2306static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2307{
2308 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2309}
2310
2311static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2312{
2313 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2314}
2315
2316static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2317{
2318 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2319}
2320
2321static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2322{
2323 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2324}
2325
2326static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2327 int ret_nodeid, int rv)
2328{
2329 struct dlm_rsb *r = &ls->ls_stub_rsb;
2330 struct dlm_message *ms;
2331 struct dlm_mhandle *mh;
2332 int error, nodeid = ms_in->m_header.h_nodeid;
2333
2334 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2335 if (error)
2336 goto out;
2337
2338 ms->m_lkid = ms_in->m_lkid;
2339 ms->m_result = rv;
2340 ms->m_nodeid = ret_nodeid;
2341
2342 error = send_message(mh, ms);
2343 out:
2344 return error;
2345}
2346
2347/* which args we save from a received message depends heavily on the type
2348 of message, unlike the send side where we can safely send everything about
2349 the lkb for any type of message */
2350
2351static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2352{
2353 lkb->lkb_exflags = ms->m_exflags;
2354 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2355 (ms->m_flags & 0x0000FFFF);
2356}
2357
2358static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2359{
2360 lkb->lkb_sbflags = ms->m_sbflags;
2361 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2362 (ms->m_flags & 0x0000FFFF);
2363}
2364
2365static int receive_extralen(struct dlm_message *ms)
2366{
2367 return (ms->m_header.h_length - sizeof(struct dlm_message));
2368}
2369
David Teiglande7fd4172006-01-18 09:30:29 +00002370static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2371 struct dlm_message *ms)
2372{
2373 int len;
2374
2375 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2376 if (!lkb->lkb_lvbptr)
2377 lkb->lkb_lvbptr = allocate_lvb(ls);
2378 if (!lkb->lkb_lvbptr)
2379 return -ENOMEM;
2380 len = receive_extralen(ms);
2381 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2382 }
2383 return 0;
2384}
2385
2386static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2387 struct dlm_message *ms)
2388{
2389 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2390 lkb->lkb_ownpid = ms->m_pid;
2391 lkb->lkb_remid = ms->m_lkid;
2392 lkb->lkb_grmode = DLM_LOCK_IV;
2393 lkb->lkb_rqmode = ms->m_rqmode;
2394 lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2395 lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2396
2397 DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2398
David Teiglande7fd4172006-01-18 09:30:29 +00002399 if (receive_lvb(ls, lkb, ms))
2400 return -ENOMEM;
2401
2402 return 0;
2403}
2404
2405static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2406 struct dlm_message *ms)
2407{
2408 if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2409 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2410 lkb->lkb_nodeid, ms->m_header.h_nodeid,
2411 lkb->lkb_id, lkb->lkb_remid);
2412 return -EINVAL;
2413 }
2414
2415 if (!is_master_copy(lkb))
2416 return -EINVAL;
2417
2418 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2419 return -EBUSY;
2420
David Teiglande7fd4172006-01-18 09:30:29 +00002421 if (receive_lvb(ls, lkb, ms))
2422 return -ENOMEM;
2423
2424 lkb->lkb_rqmode = ms->m_rqmode;
2425 lkb->lkb_lvbseq = ms->m_lvbseq;
2426
2427 return 0;
2428}
2429
/*
 * Check an unlock message against the lkb it names and take its lvb.
 *
 * Returns -EINVAL if the lkb is not a master copy, -ENOMEM if
 * receive_lvb() fails, 0 otherwise.
 */
static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			       struct dlm_message *ms)
{
	if (!is_master_copy(lkb))
		return -EINVAL;

	return receive_lvb(ls, lkb, ms) ? -ENOMEM : 0;
}
2439
2440/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2441 uses to send a reply and that the remote end uses to process the reply. */
2442
2443static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2444{
2445 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2446 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2447 lkb->lkb_remid = ms->m_lkid;
2448}
2449
2450static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2451{
2452 struct dlm_lkb *lkb;
2453 struct dlm_rsb *r;
2454 int error, namelen;
2455
2456 error = create_lkb(ls, &lkb);
2457 if (error)
2458 goto fail;
2459
2460 receive_flags(lkb, ms);
2461 lkb->lkb_flags |= DLM_IFL_MSTCPY;
2462 error = receive_request_args(ls, lkb, ms);
2463 if (error) {
David Teiglandb3f58d82006-02-28 11:16:37 -05002464 __put_lkb(ls, lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00002465 goto fail;
2466 }
2467
2468 namelen = receive_extralen(ms);
2469
2470 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2471 if (error) {
David Teiglandb3f58d82006-02-28 11:16:37 -05002472 __put_lkb(ls, lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00002473 goto fail;
2474 }
2475
2476 lock_rsb(r);
2477
2478 attach_lkb(r, lkb);
2479 error = do_request(r, lkb);
2480 send_request_reply(r, lkb, error);
2481
2482 unlock_rsb(r);
2483 put_rsb(r);
2484
2485 if (error == -EINPROGRESS)
2486 error = 0;
2487 if (error)
David Teiglandb3f58d82006-02-28 11:16:37 -05002488 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00002489 return;
2490
2491 fail:
2492 setup_stub_lkb(ls, ms);
2493 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2494}
2495
2496static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2497{
2498 struct dlm_lkb *lkb;
2499 struct dlm_rsb *r;
David Teigland90135922006-01-20 08:47:07 +00002500 int error, reply = 1;
David Teiglande7fd4172006-01-18 09:30:29 +00002501
2502 error = find_lkb(ls, ms->m_remid, &lkb);
2503 if (error)
2504 goto fail;
2505
2506 r = lkb->lkb_resource;
2507
2508 hold_rsb(r);
2509 lock_rsb(r);
2510
2511 receive_flags(lkb, ms);
2512 error = receive_convert_args(ls, lkb, ms);
2513 if (error)
2514 goto out;
2515 reply = !down_conversion(lkb);
2516
2517 error = do_convert(r, lkb);
2518 out:
2519 if (reply)
2520 send_convert_reply(r, lkb, error);
2521
2522 unlock_rsb(r);
2523 put_rsb(r);
David Teiglandb3f58d82006-02-28 11:16:37 -05002524 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00002525 return;
2526
2527 fail:
2528 setup_stub_lkb(ls, ms);
2529 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2530}
2531
2532static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2533{
2534 struct dlm_lkb *lkb;
2535 struct dlm_rsb *r;
2536 int error;
2537
2538 error = find_lkb(ls, ms->m_remid, &lkb);
2539 if (error)
2540 goto fail;
2541
2542 r = lkb->lkb_resource;
2543
2544 hold_rsb(r);
2545 lock_rsb(r);
2546
2547 receive_flags(lkb, ms);
2548 error = receive_unlock_args(ls, lkb, ms);
2549 if (error)
2550 goto out;
2551
2552 error = do_unlock(r, lkb);
2553 out:
2554 send_unlock_reply(r, lkb, error);
2555
2556 unlock_rsb(r);
2557 put_rsb(r);
David Teiglandb3f58d82006-02-28 11:16:37 -05002558 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00002559 return;
2560
2561 fail:
2562 setup_stub_lkb(ls, ms);
2563 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2564}
2565
2566static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2567{
2568 struct dlm_lkb *lkb;
2569 struct dlm_rsb *r;
2570 int error;
2571
2572 error = find_lkb(ls, ms->m_remid, &lkb);
2573 if (error)
2574 goto fail;
2575
2576 receive_flags(lkb, ms);
2577
2578 r = lkb->lkb_resource;
2579
2580 hold_rsb(r);
2581 lock_rsb(r);
2582
2583 error = do_cancel(r, lkb);
2584 send_cancel_reply(r, lkb, error);
2585
2586 unlock_rsb(r);
2587 put_rsb(r);
David Teiglandb3f58d82006-02-28 11:16:37 -05002588 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00002589 return;
2590
2591 fail:
2592 setup_stub_lkb(ls, ms);
2593 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2594}
2595
2596static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2597{
2598 struct dlm_lkb *lkb;
2599 struct dlm_rsb *r;
2600 int error;
2601
2602 error = find_lkb(ls, ms->m_remid, &lkb);
2603 if (error) {
2604 log_error(ls, "receive_grant no lkb");
2605 return;
2606 }
2607 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2608
2609 r = lkb->lkb_resource;
2610
2611 hold_rsb(r);
2612 lock_rsb(r);
2613
2614 receive_flags_reply(lkb, ms);
2615 grant_lock_pc(r, lkb, ms);
2616 queue_cast(r, lkb, 0);
2617
2618 unlock_rsb(r);
2619 put_rsb(r);
David Teiglandb3f58d82006-02-28 11:16:37 -05002620 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00002621}
2622
2623static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2624{
2625 struct dlm_lkb *lkb;
2626 struct dlm_rsb *r;
2627 int error;
2628
2629 error = find_lkb(ls, ms->m_remid, &lkb);
2630 if (error) {
2631 log_error(ls, "receive_bast no lkb");
2632 return;
2633 }
2634 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2635
2636 r = lkb->lkb_resource;
2637
2638 hold_rsb(r);
2639 lock_rsb(r);
2640
2641 queue_bast(r, lkb, ms->m_bastmode);
2642
2643 unlock_rsb(r);
2644 put_rsb(r);
David Teiglandb3f58d82006-02-28 11:16:37 -05002645 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00002646}
2647
2648static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
2649{
2650 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
2651
2652 from_nodeid = ms->m_header.h_nodeid;
2653 our_nodeid = dlm_our_nodeid();
2654
2655 len = receive_extralen(ms);
2656
2657 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2658 if (dir_nodeid != our_nodeid) {
2659 log_error(ls, "lookup dir_nodeid %d from %d",
2660 dir_nodeid, from_nodeid);
2661 error = -EINVAL;
2662 ret_nodeid = -1;
2663 goto out;
2664 }
2665
2666 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
2667
2668 /* Optimization: we're master so treat lookup as a request */
2669 if (!error && ret_nodeid == our_nodeid) {
2670 receive_request(ls, ms);
2671 return;
2672 }
2673 out:
2674 send_lookup_reply(ls, ms, ret_nodeid, error);
2675}
2676
2677static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2678{
2679 int len, dir_nodeid, from_nodeid;
2680
2681 from_nodeid = ms->m_header.h_nodeid;
2682
2683 len = receive_extralen(ms);
2684
2685 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2686 if (dir_nodeid != dlm_our_nodeid()) {
2687 log_error(ls, "remove dir entry dir_nodeid %d from %d",
2688 dir_nodeid, from_nodeid);
2689 return;
2690 }
2691
2692 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
2693}
2694
2695static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2696{
2697 struct dlm_lkb *lkb;
2698 struct dlm_rsb *r;
2699 int error, mstype;
2700
2701 error = find_lkb(ls, ms->m_remid, &lkb);
2702 if (error) {
2703 log_error(ls, "receive_request_reply no lkb");
2704 return;
2705 }
2706 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2707
2708 mstype = lkb->lkb_wait_type;
2709 error = remove_from_waiters(lkb);
2710 if (error) {
2711 log_error(ls, "receive_request_reply not on waiters");
2712 goto out;
2713 }
2714
2715 /* this is the value returned from do_request() on the master */
2716 error = ms->m_result;
2717
2718 r = lkb->lkb_resource;
2719 hold_rsb(r);
2720 lock_rsb(r);
2721
2722 /* Optimization: the dir node was also the master, so it took our
2723 lookup as a request and sent request reply instead of lookup reply */
2724 if (mstype == DLM_MSG_LOOKUP) {
2725 r->res_nodeid = ms->m_header.h_nodeid;
2726 lkb->lkb_nodeid = r->res_nodeid;
2727 }
2728
2729 switch (error) {
2730 case -EAGAIN:
2731 /* request would block (be queued) on remote master;
2732 the unhold undoes the original ref from create_lkb()
2733 so it leads to the lkb being freed */
2734 queue_cast(r, lkb, -EAGAIN);
2735 confirm_master(r, -EAGAIN);
2736 unhold_lkb(lkb);
2737 break;
2738
2739 case -EINPROGRESS:
2740 case 0:
2741 /* request was queued or granted on remote master */
2742 receive_flags_reply(lkb, ms);
2743 lkb->lkb_remid = ms->m_lkid;
2744 if (error)
2745 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2746 else {
2747 grant_lock_pc(r, lkb, ms);
2748 queue_cast(r, lkb, 0);
2749 }
2750 confirm_master(r, error);
2751 break;
2752
David Teigland597d0ca2006-07-12 16:44:04 -05002753 case -EBADR:
David Teiglande7fd4172006-01-18 09:30:29 +00002754 case -ENOTBLK:
2755 /* find_rsb failed to find rsb or rsb wasn't master */
2756 r->res_nodeid = -1;
2757 lkb->lkb_nodeid = -1;
2758 _request_lock(r, lkb);
2759 break;
2760
2761 default:
2762 log_error(ls, "receive_request_reply error %d", error);
2763 }
2764
2765 unlock_rsb(r);
2766 put_rsb(r);
2767 out:
David Teiglandb3f58d82006-02-28 11:16:37 -05002768 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00002769}
2770
2771static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2772 struct dlm_message *ms)
2773{
2774 int error = ms->m_result;
2775
2776 /* this is the value returned from do_convert() on the master */
2777
2778 switch (error) {
2779 case -EAGAIN:
2780 /* convert would block (be queued) on remote master */
2781 queue_cast(r, lkb, -EAGAIN);
2782 break;
2783
2784 case -EINPROGRESS:
2785 /* convert was queued on remote master */
2786 del_lkb(r, lkb);
2787 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2788 break;
2789
2790 case 0:
2791 /* convert was granted on remote master */
2792 receive_flags_reply(lkb, ms);
2793 grant_lock_pc(r, lkb, ms);
2794 queue_cast(r, lkb, 0);
2795 break;
2796
2797 default:
2798 log_error(r->res_ls, "receive_convert_reply error %d", error);
2799 }
2800}
2801
2802static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2803{
2804 struct dlm_rsb *r = lkb->lkb_resource;
2805
2806 hold_rsb(r);
2807 lock_rsb(r);
2808
2809 __receive_convert_reply(r, lkb, ms);
2810
2811 unlock_rsb(r);
2812 put_rsb(r);
2813}
2814
2815static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
2816{
2817 struct dlm_lkb *lkb;
2818 int error;
2819
2820 error = find_lkb(ls, ms->m_remid, &lkb);
2821 if (error) {
2822 log_error(ls, "receive_convert_reply no lkb");
2823 return;
2824 }
2825 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2826
2827 error = remove_from_waiters(lkb);
2828 if (error) {
2829 log_error(ls, "receive_convert_reply not on waiters");
2830 goto out;
2831 }
2832
2833 _receive_convert_reply(lkb, ms);
2834 out:
David Teiglandb3f58d82006-02-28 11:16:37 -05002835 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00002836}
2837
2838static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2839{
2840 struct dlm_rsb *r = lkb->lkb_resource;
2841 int error = ms->m_result;
2842
2843 hold_rsb(r);
2844 lock_rsb(r);
2845
2846 /* this is the value returned from do_unlock() on the master */
2847
2848 switch (error) {
2849 case -DLM_EUNLOCK:
2850 receive_flags_reply(lkb, ms);
2851 remove_lock_pc(r, lkb);
2852 queue_cast(r, lkb, -DLM_EUNLOCK);
2853 break;
2854 default:
2855 log_error(r->res_ls, "receive_unlock_reply error %d", error);
2856 }
2857
2858 unlock_rsb(r);
2859 put_rsb(r);
2860}
2861
2862static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
2863{
2864 struct dlm_lkb *lkb;
2865 int error;
2866
2867 error = find_lkb(ls, ms->m_remid, &lkb);
2868 if (error) {
2869 log_error(ls, "receive_unlock_reply no lkb");
2870 return;
2871 }
2872 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2873
2874 error = remove_from_waiters(lkb);
2875 if (error) {
2876 log_error(ls, "receive_unlock_reply not on waiters");
2877 goto out;
2878 }
2879
2880 _receive_unlock_reply(lkb, ms);
2881 out:
David Teiglandb3f58d82006-02-28 11:16:37 -05002882 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00002883}
2884
2885static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2886{
2887 struct dlm_rsb *r = lkb->lkb_resource;
2888 int error = ms->m_result;
2889
2890 hold_rsb(r);
2891 lock_rsb(r);
2892
2893 /* this is the value returned from do_cancel() on the master */
2894
2895 switch (error) {
2896 case -DLM_ECANCEL:
2897 receive_flags_reply(lkb, ms);
2898 revert_lock_pc(r, lkb);
2899 queue_cast(r, lkb, -DLM_ECANCEL);
2900 break;
2901 default:
2902 log_error(r->res_ls, "receive_cancel_reply error %d", error);
2903 }
2904
2905 unlock_rsb(r);
2906 put_rsb(r);
2907}
2908
2909static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
2910{
2911 struct dlm_lkb *lkb;
2912 int error;
2913
2914 error = find_lkb(ls, ms->m_remid, &lkb);
2915 if (error) {
2916 log_error(ls, "receive_cancel_reply no lkb");
2917 return;
2918 }
2919 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2920
2921 error = remove_from_waiters(lkb);
2922 if (error) {
2923 log_error(ls, "receive_cancel_reply not on waiters");
2924 goto out;
2925 }
2926
2927 _receive_cancel_reply(lkb, ms);
2928 out:
David Teiglandb3f58d82006-02-28 11:16:37 -05002929 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00002930}
2931
2932static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
2933{
2934 struct dlm_lkb *lkb;
2935 struct dlm_rsb *r;
2936 int error, ret_nodeid;
2937
2938 error = find_lkb(ls, ms->m_lkid, &lkb);
2939 if (error) {
2940 log_error(ls, "receive_lookup_reply no lkb");
2941 return;
2942 }
2943
2944 error = remove_from_waiters(lkb);
2945 if (error) {
2946 log_error(ls, "receive_lookup_reply not on waiters");
2947 goto out;
2948 }
2949
2950 /* this is the value returned by dlm_dir_lookup on dir node
2951 FIXME: will a non-zero error ever be returned? */
2952 error = ms->m_result;
2953
2954 r = lkb->lkb_resource;
2955 hold_rsb(r);
2956 lock_rsb(r);
2957
2958 ret_nodeid = ms->m_nodeid;
2959 if (ret_nodeid == dlm_our_nodeid()) {
2960 r->res_nodeid = 0;
2961 ret_nodeid = 0;
2962 r->res_first_lkid = 0;
2963 } else {
2964 /* set_master() will copy res_nodeid to lkb_nodeid */
2965 r->res_nodeid = ret_nodeid;
2966 }
2967
2968 _request_lock(r, lkb);
2969
2970 if (!ret_nodeid)
2971 process_lookup_list(r);
2972
2973 unlock_rsb(r);
2974 put_rsb(r);
2975 out:
David Teiglandb3f58d82006-02-28 11:16:37 -05002976 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00002977}
2978
2979int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
2980{
2981 struct dlm_message *ms = (struct dlm_message *) hd;
2982 struct dlm_ls *ls;
2983 int error;
2984
2985 if (!recovery)
2986 dlm_message_in(ms);
2987
2988 ls = dlm_find_lockspace_global(hd->h_lockspace);
2989 if (!ls) {
2990 log_print("drop message %d from %d for unknown lockspace %d",
2991 ms->m_type, nodeid, hd->h_lockspace);
2992 return -EINVAL;
2993 }
2994
2995 /* recovery may have just ended leaving a bunch of backed-up requests
2996 in the requestqueue; wait while dlm_recoverd clears them */
2997
2998 if (!recovery)
2999 dlm_wait_requestqueue(ls);
3000
3001 /* recovery may have just started while there were a bunch of
3002 in-flight requests -- save them in requestqueue to be processed
3003 after recovery. we can't let dlm_recvd block on the recovery
3004 lock. if dlm_recoverd is calling this function to clear the
3005 requestqueue, it needs to be interrupted (-EINTR) if another
3006 recovery operation is starting. */
3007
3008 while (1) {
3009 if (dlm_locking_stopped(ls)) {
3010 if (!recovery)
3011 dlm_add_requestqueue(ls, nodeid, hd);
3012 error = -EINTR;
3013 goto out;
3014 }
3015
3016 if (lock_recovery_try(ls))
3017 break;
3018 schedule();
3019 }
3020
3021 switch (ms->m_type) {
3022
3023 /* messages sent to a master node */
3024
3025 case DLM_MSG_REQUEST:
3026 receive_request(ls, ms);
3027 break;
3028
3029 case DLM_MSG_CONVERT:
3030 receive_convert(ls, ms);
3031 break;
3032
3033 case DLM_MSG_UNLOCK:
3034 receive_unlock(ls, ms);
3035 break;
3036
3037 case DLM_MSG_CANCEL:
3038 receive_cancel(ls, ms);
3039 break;
3040
3041 /* messages sent from a master node (replies to above) */
3042
3043 case DLM_MSG_REQUEST_REPLY:
3044 receive_request_reply(ls, ms);
3045 break;
3046
3047 case DLM_MSG_CONVERT_REPLY:
3048 receive_convert_reply(ls, ms);
3049 break;
3050
3051 case DLM_MSG_UNLOCK_REPLY:
3052 receive_unlock_reply(ls, ms);
3053 break;
3054
3055 case DLM_MSG_CANCEL_REPLY:
3056 receive_cancel_reply(ls, ms);
3057 break;
3058
3059 /* messages sent from a master node (only two types of async msg) */
3060
3061 case DLM_MSG_GRANT:
3062 receive_grant(ls, ms);
3063 break;
3064
3065 case DLM_MSG_BAST:
3066 receive_bast(ls, ms);
3067 break;
3068
3069 /* messages sent to a dir node */
3070
3071 case DLM_MSG_LOOKUP:
3072 receive_lookup(ls, ms);
3073 break;
3074
3075 case DLM_MSG_REMOVE:
3076 receive_remove(ls, ms);
3077 break;
3078
3079 /* messages sent from a dir node (remove has no reply) */
3080
3081 case DLM_MSG_LOOKUP_REPLY:
3082 receive_lookup_reply(ls, ms);
3083 break;
3084
3085 default:
3086 log_error(ls, "unknown message type %d", ms->m_type);
3087 }
3088
3089 unlock_recovery(ls);
3090 out:
3091 dlm_put_lockspace(ls);
3092 dlm_astd_wake();
3093 return 0;
3094}
3095
3096
3097/*
3098 * Recovery related
3099 */
3100
3101static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3102{
3103 if (middle_conversion(lkb)) {
3104 hold_lkb(lkb);
3105 ls->ls_stub_ms.m_result = -EINPROGRESS;
3106 _remove_from_waiters(lkb);
3107 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3108
3109 /* Same special case as in receive_rcom_lock_args() */
3110 lkb->lkb_grmode = DLM_LOCK_IV;
3111 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3112 unhold_lkb(lkb);
3113
3114 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3115 lkb->lkb_flags |= DLM_IFL_RESEND;
3116 }
3117
3118 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3119 conversions are async; there's no reply from the remote master */
3120}
3121
3122/* A waiting lkb needs recovery if the master node has failed, or
3123 the master node is changing (only when no directory is used) */
3124
3125static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3126{
3127 if (dlm_is_removed(ls, lkb->lkb_nodeid))
3128 return 1;
3129
3130 if (!dlm_no_directory(ls))
3131 return 0;
3132
3133 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3134 return 1;
3135
3136 return 0;
3137}
3138
3139/* Recovery for locks that are waiting for replies from nodes that are now
3140 gone. We can just complete unlocks and cancels by faking a reply from the
3141 dead node. Requests and up-conversions we flag to be resent after
3142 recovery. Down-conversions can just be completed with a fake reply like
3143 unlocks. Conversions between PR and CW need special attention. */
3144
3145void dlm_recover_waiters_pre(struct dlm_ls *ls)
3146{
3147 struct dlm_lkb *lkb, *safe;
3148
David Teigland90135922006-01-20 08:47:07 +00003149 mutex_lock(&ls->ls_waiters_mutex);
David Teiglande7fd4172006-01-18 09:30:29 +00003150
3151 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3152 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3153 lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3154
3155 /* all outstanding lookups, regardless of destination will be
3156 resent after recovery is done */
3157
3158 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3159 lkb->lkb_flags |= DLM_IFL_RESEND;
3160 continue;
3161 }
3162
3163 if (!waiter_needs_recovery(ls, lkb))
3164 continue;
3165
3166 switch (lkb->lkb_wait_type) {
3167
3168 case DLM_MSG_REQUEST:
3169 lkb->lkb_flags |= DLM_IFL_RESEND;
3170 break;
3171
3172 case DLM_MSG_CONVERT:
3173 recover_convert_waiter(ls, lkb);
3174 break;
3175
3176 case DLM_MSG_UNLOCK:
3177 hold_lkb(lkb);
3178 ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3179 _remove_from_waiters(lkb);
3180 _receive_unlock_reply(lkb, &ls->ls_stub_ms);
David Teiglandb3f58d82006-02-28 11:16:37 -05003181 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003182 break;
3183
3184 case DLM_MSG_CANCEL:
3185 hold_lkb(lkb);
3186 ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3187 _remove_from_waiters(lkb);
3188 _receive_cancel_reply(lkb, &ls->ls_stub_ms);
David Teiglandb3f58d82006-02-28 11:16:37 -05003189 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003190 break;
3191
3192 default:
3193 log_error(ls, "invalid lkb wait_type %d",
3194 lkb->lkb_wait_type);
3195 }
David Teigland81456802006-07-25 14:05:09 -05003196 schedule();
David Teiglande7fd4172006-01-18 09:30:29 +00003197 }
David Teigland90135922006-01-20 08:47:07 +00003198 mutex_unlock(&ls->ls_waiters_mutex);
David Teiglande7fd4172006-01-18 09:30:29 +00003199}
3200
3201static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
3202{
3203 struct dlm_lkb *lkb;
3204 int rv = 0;
3205
David Teigland90135922006-01-20 08:47:07 +00003206 mutex_lock(&ls->ls_waiters_mutex);
David Teiglande7fd4172006-01-18 09:30:29 +00003207 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3208 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3209 rv = lkb->lkb_wait_type;
3210 _remove_from_waiters(lkb);
3211 lkb->lkb_flags &= ~DLM_IFL_RESEND;
3212 break;
3213 }
3214 }
David Teigland90135922006-01-20 08:47:07 +00003215 mutex_unlock(&ls->ls_waiters_mutex);
David Teiglande7fd4172006-01-18 09:30:29 +00003216
3217 if (!rv)
3218 lkb = NULL;
3219 *lkb_ret = lkb;
3220 return rv;
3221}
3222
3223/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
3224 master or dir-node for r. Processing the lkb may result in it being placed
3225 back on waiters. */
3226
3227int dlm_recover_waiters_post(struct dlm_ls *ls)
3228{
3229 struct dlm_lkb *lkb;
3230 struct dlm_rsb *r;
3231 int error = 0, mstype;
3232
3233 while (1) {
3234 if (dlm_locking_stopped(ls)) {
3235 log_debug(ls, "recover_waiters_post aborted");
3236 error = -EINTR;
3237 break;
3238 }
3239
3240 mstype = remove_resend_waiter(ls, &lkb);
3241 if (!mstype)
3242 break;
3243
3244 r = lkb->lkb_resource;
3245
3246 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3247 lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3248
3249 switch (mstype) {
3250
3251 case DLM_MSG_LOOKUP:
3252 hold_rsb(r);
3253 lock_rsb(r);
3254 _request_lock(r, lkb);
3255 if (is_master(r))
3256 confirm_master(r, 0);
3257 unlock_rsb(r);
3258 put_rsb(r);
3259 break;
3260
3261 case DLM_MSG_REQUEST:
3262 hold_rsb(r);
3263 lock_rsb(r);
3264 _request_lock(r, lkb);
3265 unlock_rsb(r);
3266 put_rsb(r);
3267 break;
3268
3269 case DLM_MSG_CONVERT:
3270 hold_rsb(r);
3271 lock_rsb(r);
3272 _convert_lock(r, lkb);
3273 unlock_rsb(r);
3274 put_rsb(r);
3275 break;
3276
3277 default:
3278 log_error(ls, "recover_waiters_post type %d", mstype);
3279 }
3280 }
3281
3282 return error;
3283}
3284
3285static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3286 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3287{
3288 struct dlm_ls *ls = r->res_ls;
3289 struct dlm_lkb *lkb, *safe;
3290
3291 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3292 if (test(ls, lkb)) {
David Teigland97a35d12006-05-02 13:34:03 -04003293 rsb_set_flag(r, RSB_LOCKS_PURGED);
David Teiglande7fd4172006-01-18 09:30:29 +00003294 del_lkb(r, lkb);
3295 /* this put should free the lkb */
David Teiglandb3f58d82006-02-28 11:16:37 -05003296 if (!dlm_put_lkb(lkb))
David Teiglande7fd4172006-01-18 09:30:29 +00003297 log_error(ls, "purged lkb not released");
3298 }
3299 }
3300}
3301
3302static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3303{
3304 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3305}
3306
/*
 * purge_queue() test: selects every master-copy lkb.  @ls is unused and
 * present only to match the test callback signature.
 */
static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	int selected = is_master_copy(lkb);

	return selected;
}
3311
3312static void purge_dead_locks(struct dlm_rsb *r)
3313{
3314 purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3315 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3316 purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3317}
3318
3319void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3320{
3321 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3322 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3323 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3324}
3325
3326/* Get rid of locks held by nodes that are gone. */
3327
3328int dlm_purge_locks(struct dlm_ls *ls)
3329{
3330 struct dlm_rsb *r;
3331
3332 log_debug(ls, "dlm_purge_locks");
3333
3334 down_write(&ls->ls_root_sem);
3335 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3336 hold_rsb(r);
3337 lock_rsb(r);
3338 if (is_master(r))
3339 purge_dead_locks(r);
3340 unlock_rsb(r);
3341 unhold_rsb(r);
3342
3343 schedule();
3344 }
3345 up_write(&ls->ls_root_sem);
3346
3347 return 0;
3348}
3349
David Teigland97a35d12006-05-02 13:34:03 -04003350static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3351{
3352 struct dlm_rsb *r, *r_ret = NULL;
3353
3354 read_lock(&ls->ls_rsbtbl[bucket].lock);
3355 list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3356 if (!rsb_flag(r, RSB_LOCKS_PURGED))
3357 continue;
3358 hold_rsb(r);
3359 rsb_clear_flag(r, RSB_LOCKS_PURGED);
3360 r_ret = r;
3361 break;
3362 }
3363 read_unlock(&ls->ls_rsbtbl[bucket].lock);
3364 return r_ret;
3365}
3366
3367void dlm_grant_after_purge(struct dlm_ls *ls)
David Teiglande7fd4172006-01-18 09:30:29 +00003368{
3369 struct dlm_rsb *r;
David Teigland2b4e9262006-07-25 13:59:48 -05003370 int bucket = 0;
David Teiglande7fd4172006-01-18 09:30:29 +00003371
David Teigland2b4e9262006-07-25 13:59:48 -05003372 while (1) {
3373 r = find_purged_rsb(ls, bucket);
3374 if (!r) {
3375 if (bucket == ls->ls_rsbtbl_size - 1)
3376 break;
3377 bucket++;
David Teigland97a35d12006-05-02 13:34:03 -04003378 continue;
David Teigland2b4e9262006-07-25 13:59:48 -05003379 }
David Teigland97a35d12006-05-02 13:34:03 -04003380 lock_rsb(r);
3381 if (is_master(r)) {
3382 grant_pending_locks(r);
3383 confirm_master(r, 0);
David Teiglande7fd4172006-01-18 09:30:29 +00003384 }
David Teigland97a35d12006-05-02 13:34:03 -04003385 unlock_rsb(r);
3386 put_rsb(r);
David Teigland2b4e9262006-07-25 13:59:48 -05003387 schedule();
David Teiglande7fd4172006-01-18 09:30:29 +00003388 }
David Teiglande7fd4172006-01-18 09:30:29 +00003389}
3390
3391static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3392 uint32_t remid)
3393{
3394 struct dlm_lkb *lkb;
3395
3396 list_for_each_entry(lkb, head, lkb_statequeue) {
3397 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3398 return lkb;
3399 }
3400 return NULL;
3401}
3402
3403static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3404 uint32_t remid)
3405{
3406 struct dlm_lkb *lkb;
3407
3408 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3409 if (lkb)
3410 return lkb;
3411 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3412 if (lkb)
3413 return lkb;
3414 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3415 if (lkb)
3416 return lkb;
3417 return NULL;
3418}
3419
3420static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3421 struct dlm_rsb *r, struct dlm_rcom *rc)
3422{
3423 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3424 int lvblen;
3425
3426 lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3427 lkb->lkb_ownpid = rl->rl_ownpid;
3428 lkb->lkb_remid = rl->rl_lkid;
3429 lkb->lkb_exflags = rl->rl_exflags;
3430 lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3431 lkb->lkb_flags |= DLM_IFL_MSTCPY;
3432 lkb->lkb_lvbseq = rl->rl_lvbseq;
3433 lkb->lkb_rqmode = rl->rl_rqmode;
3434 lkb->lkb_grmode = rl->rl_grmode;
3435 /* don't set lkb_status because add_lkb wants to itself */
3436
3437 lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3438 lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3439
David Teiglande7fd4172006-01-18 09:30:29 +00003440 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3441 lkb->lkb_lvbptr = allocate_lvb(ls);
3442 if (!lkb->lkb_lvbptr)
3443 return -ENOMEM;
3444 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3445 sizeof(struct rcom_lock);
3446 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3447 }
3448
3449 /* Conversions between PR and CW (middle modes) need special handling.
3450 The real granted mode of these converting locks cannot be determined
3451 until all locks have been rebuilt on the rsb (recover_conversion) */
3452
3453 if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3454 rl->rl_status = DLM_LKSTS_CONVERT;
3455 lkb->lkb_grmode = DLM_LOCK_IV;
3456 rsb_set_flag(r, RSB_RECOVER_CONVERT);
3457 }
3458
3459 return 0;
3460}
3461
3462/* This lkb may have been recovered in a previous aborted recovery so we need
3463 to check if the rsb already has an lkb with the given remote nodeid/lkid.
3464 If so we just send back a standard reply. If not, we create a new lkb with
3465 the given values and send back our lkid. We send back our lkid by sending
3466 back the rcom_lock struct we got but with the remid field filled in. */
3467
3468int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3469{
3470 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3471 struct dlm_rsb *r;
3472 struct dlm_lkb *lkb;
3473 int error;
3474
3475 if (rl->rl_parent_lkid) {
3476 error = -EOPNOTSUPP;
3477 goto out;
3478 }
3479
3480 error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3481 if (error)
3482 goto out;
3483
3484 lock_rsb(r);
3485
3486 lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3487 if (lkb) {
3488 error = -EEXIST;
3489 goto out_remid;
3490 }
3491
3492 error = create_lkb(ls, &lkb);
3493 if (error)
3494 goto out_unlock;
3495
3496 error = receive_rcom_lock_args(ls, lkb, r, rc);
3497 if (error) {
David Teiglandb3f58d82006-02-28 11:16:37 -05003498 __put_lkb(ls, lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003499 goto out_unlock;
3500 }
3501
3502 attach_lkb(r, lkb);
3503 add_lkb(r, lkb, rl->rl_status);
3504 error = 0;
3505
3506 out_remid:
3507 /* this is the new value returned to the lock holder for
3508 saving in its process-copy lkb */
3509 rl->rl_remid = lkb->lkb_id;
3510
3511 out_unlock:
3512 unlock_rsb(r);
3513 put_rsb(r);
3514 out:
3515 if (error)
3516 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3517 rl->rl_result = error;
3518 return error;
3519}
3520
3521int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3522{
3523 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3524 struct dlm_rsb *r;
3525 struct dlm_lkb *lkb;
3526 int error;
3527
3528 error = find_lkb(ls, rl->rl_lkid, &lkb);
3529 if (error) {
3530 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3531 return error;
3532 }
3533
3534 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3535
3536 error = rl->rl_result;
3537
3538 r = lkb->lkb_resource;
3539 hold_rsb(r);
3540 lock_rsb(r);
3541
3542 switch (error) {
3543 case -EEXIST:
3544 log_debug(ls, "master copy exists %x", lkb->lkb_id);
3545 /* fall through */
3546 case 0:
3547 lkb->lkb_remid = rl->rl_remid;
3548 break;
3549 default:
3550 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
3551 error, lkb->lkb_id);
3552 }
3553
3554 /* an ack for dlm_recover_locks() which waits for replies from
3555 all the locks it sends to new masters */
3556 dlm_recovered_lock(r);
3557
3558 unlock_rsb(r);
3559 put_rsb(r);
David Teiglandb3f58d82006-02-28 11:16:37 -05003560 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003561
3562 return 0;
3563}
3564
David Teigland597d0ca2006-07-12 16:44:04 -05003565int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
3566 int mode, uint32_t flags, void *name, unsigned int namelen,
3567 uint32_t parent_lkid)
3568{
3569 struct dlm_lkb *lkb;
3570 struct dlm_args args;
3571 int error;
3572
3573 lock_recovery(ls);
3574
3575 error = create_lkb(ls, &lkb);
3576 if (error) {
3577 kfree(ua);
3578 goto out;
3579 }
3580
3581 if (flags & DLM_LKF_VALBLK) {
3582 ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3583 if (!ua->lksb.sb_lvbptr) {
3584 kfree(ua);
3585 __put_lkb(ls, lkb);
3586 error = -ENOMEM;
3587 goto out;
3588 }
3589 }
3590
3591 /* After ua is attached to lkb it will be freed by free_lkb().
3592 When DLM_IFL_USER is set, the dlm knows that this is a userspace
3593 lock and that lkb_astparam is the dlm_user_args structure. */
3594
3595 error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
3596 FAKE_USER_AST, ua, FAKE_USER_AST, &args);
3597 lkb->lkb_flags |= DLM_IFL_USER;
3598 ua->old_mode = DLM_LOCK_IV;
3599
3600 if (error) {
3601 __put_lkb(ls, lkb);
3602 goto out;
3603 }
3604
3605 error = request_lock(ls, lkb, name, namelen, &args);
3606
3607 switch (error) {
3608 case 0:
3609 break;
3610 case -EINPROGRESS:
3611 error = 0;
3612 break;
3613 case -EAGAIN:
3614 error = 0;
3615 /* fall through */
3616 default:
3617 __put_lkb(ls, lkb);
3618 goto out;
3619 }
3620
3621 /* add this new lkb to the per-process list of locks */
3622 spin_lock(&ua->proc->locks_spin);
3623 kref_get(&lkb->lkb_ref);
3624 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
3625 spin_unlock(&ua->proc->locks_spin);
3626 out:
3627 unlock_recovery(ls);
3628 return error;
3629}
3630
3631int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3632 int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
3633{
3634 struct dlm_lkb *lkb;
3635 struct dlm_args args;
3636 struct dlm_user_args *ua;
3637 int error;
3638
3639 lock_recovery(ls);
3640
3641 error = find_lkb(ls, lkid, &lkb);
3642 if (error)
3643 goto out;
3644
3645 /* user can change the params on its lock when it converts it, or
3646 add an lvb that didn't exist before */
3647
3648 ua = (struct dlm_user_args *)lkb->lkb_astparam;
3649
3650 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
3651 ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3652 if (!ua->lksb.sb_lvbptr) {
3653 error = -ENOMEM;
3654 goto out_put;
3655 }
3656 }
3657 if (lvb_in && ua->lksb.sb_lvbptr)
3658 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3659
3660 ua->castparam = ua_tmp->castparam;
3661 ua->castaddr = ua_tmp->castaddr;
3662 ua->bastparam = ua_tmp->bastparam;
3663 ua->bastaddr = ua_tmp->bastaddr;
3664 ua->old_mode = lkb->lkb_grmode;
3665
3666 error = set_lock_args(mode, &ua->lksb, flags, 0, 0, FAKE_USER_AST, ua,
3667 FAKE_USER_AST, &args);
3668 if (error)
3669 goto out_put;
3670
3671 error = convert_lock(ls, lkb, &args);
3672
3673 if (error == -EINPROGRESS || error == -EAGAIN)
3674 error = 0;
3675 out_put:
3676 dlm_put_lkb(lkb);
3677 out:
3678 unlock_recovery(ls);
3679 kfree(ua_tmp);
3680 return error;
3681}
3682
3683int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3684 uint32_t flags, uint32_t lkid, char *lvb_in)
3685{
3686 struct dlm_lkb *lkb;
3687 struct dlm_args args;
3688 struct dlm_user_args *ua;
3689 int error;
3690
3691 lock_recovery(ls);
3692
3693 error = find_lkb(ls, lkid, &lkb);
3694 if (error)
3695 goto out;
3696
3697 ua = (struct dlm_user_args *)lkb->lkb_astparam;
3698
3699 if (lvb_in && ua->lksb.sb_lvbptr)
3700 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3701 ua->castparam = ua_tmp->castparam;
Patrick Caulfieldcc346d52006-08-08 10:34:40 -04003702 ua->user_lksb = ua_tmp->user_lksb;
David Teigland597d0ca2006-07-12 16:44:04 -05003703
3704 error = set_unlock_args(flags, ua, &args);
3705 if (error)
3706 goto out_put;
3707
3708 error = unlock_lock(ls, lkb, &args);
3709
3710 if (error == -DLM_EUNLOCK)
3711 error = 0;
3712 if (error)
3713 goto out_put;
3714
3715 spin_lock(&ua->proc->locks_spin);
David Teigland34e22be2006-07-18 11:24:04 -05003716 list_del_init(&lkb->lkb_ownqueue);
David Teigland597d0ca2006-07-12 16:44:04 -05003717 spin_unlock(&ua->proc->locks_spin);
3718
3719 /* this removes the reference for the proc->locks list added by
3720 dlm_user_request */
3721 unhold_lkb(lkb);
3722 out_put:
3723 dlm_put_lkb(lkb);
3724 out:
3725 unlock_recovery(ls);
3726 return error;
3727}
3728
3729int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3730 uint32_t flags, uint32_t lkid)
3731{
3732 struct dlm_lkb *lkb;
3733 struct dlm_args args;
3734 struct dlm_user_args *ua;
3735 int error;
3736
3737 lock_recovery(ls);
3738
3739 error = find_lkb(ls, lkid, &lkb);
3740 if (error)
3741 goto out;
3742
3743 ua = (struct dlm_user_args *)lkb->lkb_astparam;
3744 ua->castparam = ua_tmp->castparam;
3745
3746 error = set_unlock_args(flags, ua, &args);
3747 if (error)
3748 goto out_put;
3749
3750 error = cancel_lock(ls, lkb, &args);
3751
3752 if (error == -DLM_ECANCEL)
3753 error = 0;
3754 if (error)
3755 goto out_put;
3756
3757 /* this lkb was removed from the WAITING queue */
3758 if (lkb->lkb_grmode == DLM_LOCK_IV) {
3759 spin_lock(&ua->proc->locks_spin);
David Teigland34e22be2006-07-18 11:24:04 -05003760 list_del_init(&lkb->lkb_ownqueue);
David Teigland597d0ca2006-07-12 16:44:04 -05003761 spin_unlock(&ua->proc->locks_spin);
3762 unhold_lkb(lkb);
3763 }
3764 out_put:
3765 dlm_put_lkb(lkb);
3766 out:
3767 unlock_recovery(ls);
3768 return error;
3769}
3770
3771static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3772{
3773 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3774
3775 if (ua->lksb.sb_lvbptr)
3776 kfree(ua->lksb.sb_lvbptr);
3777 kfree(ua);
3778 lkb->lkb_astparam = (long)NULL;
3779
3780 /* TODO: propogate to master if needed */
3781 return 0;
3782}
3783
3784/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
3785 Regardless of what rsb queue the lock is on, it's removed and freed. */
3786
3787static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3788{
3789 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3790 struct dlm_args args;
3791 int error;
3792
3793 /* FIXME: we need to handle the case where the lkb is in limbo
3794 while the rsb is being looked up, currently we assert in
3795 _unlock_lock/is_remote because rsb nodeid is -1. */
3796
3797 set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
3798
3799 error = unlock_lock(ls, lkb, &args);
3800 if (error == -DLM_EUNLOCK)
3801 error = 0;
3802 return error;
3803}
3804
3805/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
3806 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
3807 which we clear here. */
3808
3809/* proc CLOSING flag is set so no more device_reads should look at proc->asts
3810 list, and no more device_writes should add lkb's to proc->locks list; so we
3811 shouldn't need to take asts_spin or locks_spin here. this assumes that
3812 device reads/writes/closes are serialized -- FIXME: we may need to serialize
3813 them ourself. */
3814
3815void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
3816{
3817 struct dlm_lkb *lkb, *safe;
3818
3819 lock_recovery(ls);
3820 mutex_lock(&ls->ls_clear_proc_locks);
3821
3822 list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
3823 if (lkb->lkb_ast_type) {
3824 list_del(&lkb->lkb_astqueue);
3825 unhold_lkb(lkb);
3826 }
3827
David Teigland34e22be2006-07-18 11:24:04 -05003828 list_del_init(&lkb->lkb_ownqueue);
David Teigland597d0ca2006-07-12 16:44:04 -05003829
3830 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
3831 lkb->lkb_flags |= DLM_IFL_ORPHAN;
3832 orphan_proc_lock(ls, lkb);
3833 } else {
3834 lkb->lkb_flags |= DLM_IFL_DEAD;
3835 unlock_proc_lock(ls, lkb);
3836 }
3837
3838 /* this removes the reference for the proc->locks list
3839 added by dlm_user_request, it may result in the lkb
3840 being freed */
3841
3842 dlm_put_lkb(lkb);
3843 }
3844 mutex_unlock(&ls->ls_clear_proc_locks);
3845 unlock_recovery(ls);
3846}