blob: 09668ec2e27988a246af20d11fd05374f2969988 [file] [log] [blame]
David Teiglande7fd4172006-01-18 09:30:29 +00001/******************************************************************************
2*******************************************************************************
3**
David Teiglandef0c2bb2007-03-28 09:56:46 -05004** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
David Teiglande7fd4172006-01-18 09:30:29 +00005**
6** This copyrighted material is made available to anyone wishing to use,
7** modify, copy, or redistribute it subject to the terms and conditions
8** of the GNU General Public License v.2.
9**
10*******************************************************************************
11******************************************************************************/
12
13/* Central locking logic has four stages:
14
15 dlm_lock()
16 dlm_unlock()
17
18 request_lock(ls, lkb)
19 convert_lock(ls, lkb)
20 unlock_lock(ls, lkb)
21 cancel_lock(ls, lkb)
22
23 _request_lock(r, lkb)
24 _convert_lock(r, lkb)
25 _unlock_lock(r, lkb)
26 _cancel_lock(r, lkb)
27
28 do_request(r, lkb)
29 do_convert(r, lkb)
30 do_unlock(r, lkb)
31 do_cancel(r, lkb)
32
33 Stage 1 (lock, unlock) is mainly about checking input args and
34 splitting into one of the four main operations:
35
36 dlm_lock = request_lock
37 dlm_lock+CONVERT = convert_lock
38 dlm_unlock = unlock_lock
39 dlm_unlock+CANCEL = cancel_lock
40
41 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42 provided to the next stage.
43
44 Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45 When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
48 given rsb and lkb and queues callbacks.
49
50 For remote operations, send_xxxx() results in the corresponding do_xxxx()
51 function being executed on the remote node. The connecting send/receive
52 calls on local (L) and remote (R) nodes:
53
54 L: send_xxxx() -> R: receive_xxxx()
55 R: do_xxxx()
56 L: receive_xxxx_reply() <- R: send_xxxx_reply()
57*/
David Teigland597d0ca2006-07-12 16:44:04 -050058#include <linux/types.h>
David Teiglande7fd4172006-01-18 09:30:29 +000059#include "dlm_internal.h"
David Teigland597d0ca2006-07-12 16:44:04 -050060#include <linux/dlm_device.h>
David Teiglande7fd4172006-01-18 09:30:29 +000061#include "memory.h"
62#include "lowcomms.h"
63#include "requestqueue.h"
64#include "util.h"
65#include "dir.h"
66#include "member.h"
67#include "lockspace.h"
68#include "ast.h"
69#include "lock.h"
70#include "rcom.h"
71#include "recover.h"
72#include "lvb_table.h"
David Teigland597d0ca2006-07-12 16:44:04 -050073#include "user.h"
David Teiglande7fd4172006-01-18 09:30:29 +000074#include "config.h"
75
76static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83static int send_remove(struct dlm_rsb *r);
84static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
86 struct dlm_message *ms);
87static int receive_extralen(struct dlm_message *ms);
David Teigland84991372007-03-30 15:02:40 -050088static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
David Teiglande7fd4172006-01-18 09:30:29 +000089
90/*
 * Lock compatibility matrix - thanks Steve
92 * UN = Unlocked state. Not really a state, used as a flag
93 * PD = Padding. Used to make the matrix a nice power of two in size
94 * Other states are the same as the VMS DLM.
95 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
96 */
97
/* 1 = modes are compatible (both may be granted), 0 = they conflict */
static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
	{1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
	{1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
	{1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
	{1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
	{1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
	{1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
109
110/*
111 * This defines the direction of transfer of LVB data.
112 * Granted mode is the row; requested mode is the column.
113 * Usage: matrix[grmode+1][rqmode+1]
114 * 1 = LVB is returned to the caller
115 * 0 = LVB is written to the resource
116 * -1 = nothing happens to the LVB
117 */
118
/* See the table comment above: 1 = LVB copied out to caller,
   0 = LVB written into the rsb, -1 = LVB untouched. */
const int dlm_lvb_operations[8][8] = {
	/* UN   NL  CR  CW  PR  PW  EX  PD*/
	{  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
	{  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
	{  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
	{  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
	{  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
	{  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
David Teiglande7fd4172006-01-18 09:30:29 +0000130
/* Compatibility of two lkb's: granted mode of gr vs requested mode of rq.
   The +1 offset maps DLM_LOCK_IV (-1, "unlocked") onto matrix row/col 0. */
#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

/* Return nonzero if the two lock modes can coexist on a resource. */
int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
138
139/*
140 * Compatibility matrix for conversions with QUECVT set.
141 * Granted mode is the row; requested mode is the column.
142 * Usage: matrix[grmode+1][rqmode+1]
143 */
144
/* 1 = a QUECVT conversion from row (granted) to column (requested)
   is permitted, 0 = it is rejected */
static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
	{0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
	{0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
	{0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
	{0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
	{0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
	{0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
156
/* Dump the interesting fields of an lkb to the kernel log (debugging,
   also used as the action of several DLM_ASSERTs). */
void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
}
165
/* Dump the interesting fields of an rsb to the kernel log.
   NOTE(review): res_name is a length-counted buffer (res_length); printing
   it with %s assumes trailing NUL padding — confirm allocate_rsb zeroes it. */
void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}
172
/* Dump an rsb plus every lkb on its lookup, grant, convert and wait
   queues; used as the action of DLM_ASSERTs on rsb consistency. */
void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}
194
David Teiglande7fd4172006-01-18 09:30:29 +0000195/* Threads cannot use the lockspace while it's being recovered */
196
/* Take the recovery rwsem as a reader: blocks while recovery (the writer)
   is in progress, and excludes recovery while locking ops run. */
static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}
201
/* Release the read side of the recovery rwsem taken by
   dlm_lock_recovery()/dlm_lock_recovery_try(). */
void dlm_unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}
206
/* Non-blocking variant of dlm_lock_recovery(); returns nonzero on
   success, 0 if recovery currently holds the lockspace. */
int dlm_lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}
211
212static inline int can_be_queued(struct dlm_lkb *lkb)
213{
214 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
215}
216
/* Nonzero (the raw flag bit) if blocking asts should be sent even for a
   NOQUEUE request (DLM_LKF_NOQUEUEBAST). */
static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}
221
/* Nonzero (raw status bit) if the lock was demoted to grant a conversion. */
static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}
226
/* Nonzero (raw status bit) if the lock was granted in an alternate mode. */
static inline int is_altmode(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}
231
232static inline int is_granted(struct dlm_lkb *lkb)
233{
234 return (lkb->lkb_status == DLM_LKSTS_GRANTED);
235}
236
David Teiglande7fd4172006-01-18 09:30:29 +0000237static inline int is_remote(struct dlm_rsb *r)
238{
239 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
240 return !!r->res_nodeid;
241}
242
/* Nonzero for a local copy of a lock mastered elsewhere: it has a remote
   master nodeid but is not itself the master's copy (MSTCPY). */
static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}
247
248static inline int is_master_copy(struct dlm_lkb *lkb)
249{
250 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
251 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
David Teigland90135922006-01-20 08:47:07 +0000252 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
David Teiglande7fd4172006-01-18 09:30:29 +0000253}
254
255static inline int middle_conversion(struct dlm_lkb *lkb)
256{
257 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
258 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
David Teigland90135922006-01-20 08:47:07 +0000259 return 1;
260 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +0000261}
262
263static inline int down_conversion(struct dlm_lkb *lkb)
264{
265 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
266}
267
/* Nonzero (raw flag bit) if an unlock overlapped an in-progress op. */
static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}
272
/* Nonzero (raw flag bit) if a cancel overlapped an in-progress op. */
static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}
277
/* Nonzero if either an overlapping unlock or cancel is pending. */
static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}
283
/* Queue a completion ast for the lock's owner with result rv.  A master
   copy represents a remote process, so the cast is delivered by the reply
   message instead and nothing is queued here. */
static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	/* publish the result in the caller-visible lksb before queueing */
	lkb->lkb_lksb->sb_status = rv;
	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;

	dlm_add_ast(lkb, AST_COMP);
}
296
David Teiglandef0c2bb2007-03-28 09:56:46 -0500297static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
298{
299 queue_cast(r, lkb,
300 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
301}
302
/* Queue a blocking ast for the holder of lkb, telling it that mode rqmode
   is being requested.  For a master copy the owner is remote, so the bast
   is sent as a message instead of queued locally. */
static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb))
		send_bast(r, lkb, rqmode);
	else {
		lkb->lkb_bastmode = rqmode;
		dlm_add_ast(lkb, AST_BAST);
	}
}
312
313/*
314 * Basic operations on rsb's and lkb's
315 */
316
/* Allocate and initialize a new rsb for 'name' (len bytes, not
   necessarily NUL-terminated).  Returns NULL on allocation failure.
   The rsb is not added to any hash table here; see find_rsb(). */
static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r;

	r = allocate_rsb(ls, len);
	if (!r)
		return NULL;

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	return r;
}
339
/* Scan one hash-chain list for an rsb with the given name.  Returns 0 and
   sets *r_ret on a match, -EBADR if not found, or -ENOTBLK when the match
   is not locally mastered but the caller required a master (R_MASTER). */
static int search_rsb_list(struct list_head *head, char *name, int len,
			   unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error = 0;

	list_for_each_entry(r, head, res_hashchain) {
		if (len == r->res_length && !memcmp(name, r->res_name, len))
			goto found;
	}
	return -EBADR;

 found:
	if (r->res_nodeid && (flags & R_MASTER))
		error = -ENOTBLK;
	*r_ret = r;
	return error;
}
358
/* Look up an rsb in bucket b, first on the active list, then on the toss
   list.  Caller must hold ls_rsbtbl[b].lock.  A hit on the active list
   takes a new reference; a hit on the toss list revives the rsb (moves it
   back to the active list, keeping the kref of 1 set by toss_rsb). */
static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

	if (dlm_no_directory(ls))
		goto out;

	/* a revived rsb's cached master may be stale; mark it uncertain
	   unless we are the master ourselves */
	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}
393
/* Locked wrapper around _search_rsb() for hash bucket b. */
static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		      unsigned int flags, struct dlm_rsb **r_ret)
{
	int error;
	write_lock(&ls->ls_rsbtbl[b].lock);
	error = _search_rsb(ls, name, len, b, flags, r_ret);
	write_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}
403
404/*
405 * Find rsb in rsbtbl and potentially create/add one
406 *
407 * Delaying the release of rsb's has a similar benefit to applications keeping
408 * NL locks on an rsb, but without the guarantee that the cached master value
409 * will still be valid when the rsb is reused. Apps aren't always smart enough
410 * to keep NL locks on an rsb that they may lock again shortly; this can lead
411 * to excessive master lookups and removals if we don't delay the release.
412 *
413 * Searching for an rsb means looking through both the normal list and toss
414 * list. When found on the toss list the rsb is moved to the normal list with
415 * ref count of 1; when found on normal list the ref count is incremented.
416 */
417
/* Find the rsb for 'name', creating one if allowed (R_CREATE, implied
   when there is no resource directory).  On success *r_ret holds a
   referenced rsb.  The unlocked create-then-recheck pattern below handles
   the race where another thread inserts the same rsb while we allocate. */
static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r, *tmp;
	uint32_t hash, bucket;
	int error = 0;

	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

	error = search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out;

	/* not found and caller didn't ask us to create it */
	if (error == -EBADR && !(flags & R_CREATE))
		goto out;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out;

	error = -ENOMEM;
	r = create_rsb(ls, name, namelen);
	if (!r)
		goto out;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}

	/* recheck under the lock: someone may have added it meanwhile */
	write_lock(&ls->ls_rsbtbl[bucket].lock);
	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
	if (!error) {
		write_unlock(&ls->ls_rsbtbl[bucket].lock);
		free_rsb(r);
		r = tmp;
		goto out;
	}
	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
	error = 0;
 out:
	*r_ret = r;
	return error;
}
475
/* Exported wrapper around find_rsb() for use outside this file. */
int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
		 unsigned int flags, struct dlm_rsb **r_ret)
{
	return find_rsb(ls, name, namelen, flags, r_ret);
}
481
482/* This is only called to add a reference when the code already holds
483 a valid reference to the rsb, so there's no need for locking. */
484
/* Take an extra reference on an rsb the caller already validly holds
   (see the comment above: no locking needed in that case). */
static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}
489
/* Exported wrapper around hold_rsb(). */
void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}
494
/* kref release function: the last reference is gone, so park the rsb on
   the bucket's toss list (re-initializing the kref to 1 for the list's
   ownership), timestamp it for shrink_bucket(), and drop its LVB. */
static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}
509
/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */
512
/* Drop a reference under the bucket lock; toss_rsb() runs here if this
   was the final reference. */
static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
}
522
/* Exported wrapper around put_rsb(). */
void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}
527
528/* See comment for unhold_lkb */
529
/* Drop a reference that is known not to be the last one, so no bucket
   lock is needed; asserts that the rsb was not actually released. */
static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}
536
/* kref release function used when an rsb is being freed for good (from
   shrink_bucket); only sanity-checks that all queues are empty. */
static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}
551
552/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
553 The rsb must exist as long as any lkb's for it do. */
554
/* Bind an lkb to its rsb, taking an rsb reference on the lkb's behalf
   (the rsb must outlive every lkb attached to it). */
static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}
560
/* Undo attach_lkb(): drop the lkb's rsb reference, if it has one. */
static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}
568
/* Allocate a new lkb and assign it a unique lock id.  The lkid encodes a
   randomly chosen id-table bucket in the top 16 bits and a per-bucket
   counter in the low 16.  Returns 0 with *lkb_ret set, or -ENOMEM. */
static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb, *tmp;
	uint32_t lkid = 0;
	uint16_t bucket;

	lkb = allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);

	get_random_bytes(&bucket, sizeof(bucket));
	bucket &= (ls->ls_lkbtbl_size - 1);

	write_lock(&ls->ls_lkbtbl[bucket].lock);

	/* counter can roll over so we must verify lkid is not in use */

	while (lkid == 0) {
		lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;

		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
				    lkb_idtbl_list) {
			if (tmp->lkb_id != lkid)
				continue;
			/* id already taken; retry with the next counter */
			lkid = 0;
			break;
		}
	}

	lkb->lkb_id = lkid;
	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
	write_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return 0;
}
611
/* Find an lkb by id in its bucket's list; caller holds the bucket lock.
   Returns NULL if not present.  The bucket is the lkid's top 16 bits. */
static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
		if (lkb->lkb_id == lkid)
			return lkb;
	}
	return NULL;
}
623
/* Look up an lkb by id, taking a reference on it.  Returns 0 with
   *lkb_ret set, -EBADSLT for an out-of-range bucket (corrupt/foreign id),
   or -ENOENT when no lkb has this id. */
static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	if (bucket >= ls->ls_lkbtbl_size)
		return -EBADSLT;

	read_lock(&ls->ls_lkbtbl[bucket].lock);
	lkb = __find_lkb(ls, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	read_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}
641
/* kref release function for an lkb; only asserts that the lkb is no
   longer on any rsb status queue. */
static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}
651
David Teiglandb3f58d82006-02-28 11:16:37 -0500652/* __put_lkb() is used when an lkb may not have an rsb attached to
653 it so we need to provide the lockspace explicitly */
654
/* Drop a reference on lkb; on the final put, unhash it, detach it from
   its rsb and free it (plus its LVB when this is a master copy).
   Returns 1 if the lkb was freed, 0 otherwise.  Takes the lockspace
   explicitly because the lkb may have no rsb attached (see above). */
static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint16_t bucket = (lkb->lkb_id >> 16);

	write_lock(&ls->ls_lkbtbl[bucket].lock);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		list_del(&lkb->lkb_idtbl_list);
		write_unlock(&ls->ls_lkbtbl[bucket].lock);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			free_lvb(lkb->lkb_lvbptr);
		free_lkb(lkb);
		return 1;
	} else {
		write_unlock(&ls->ls_lkbtbl[bucket].lock);
		return 0;
	}
}
676
/* Exported put: derives the lockspace from the attached rsb, so the lkb
   must still be attached (asserted) — otherwise use __put_lkb(). */
int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}
687
688/* This is only called to add a reference when the code already holds
689 a valid reference to the lkb, so there's no need for locking. */
690
/* Take an extra reference on an lkb the caller already validly holds
   (see the comment above: no locking needed in that case). */
static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}
695
696/* This is called when we need to remove a reference and are certain
697 it's not the last ref. e.g. del_lkb is always called between a
698 find_lkb/put_lkb and is always the inverse of a previous add_lkb.
699 put_lkb would work fine, but would involve unnecessary locking */
700
/* Drop a reference known not to be the last one (see comment above);
   asserts that the lkb was not actually released. */
static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}
707
708static void lkb_add_ordered(struct list_head *new, struct list_head *head,
709 int mode)
710{
711 struct dlm_lkb *lkb = NULL;
712
713 list_for_each_entry(lkb, head, lkb_statequeue)
714 if (lkb->lkb_rqmode < mode)
715 break;
716
717 if (!lkb)
718 list_add_tail(new, head);
719 else
720 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
721}
722
723/* add/remove lkb to rsb's grant/convert/wait queue */
724
/* Put lkb on one of its rsb's three status queues (wait, grant, convert),
   taking a queue reference.  HEADQUE requests go to the head of the wait
   or convert queue; granted locks are kept ordered by grant mode. */
static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	/* an lkb may only be on one status queue at a time */
	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}
756
/* Take lkb off its current status queue and drop the queue's reference. */
static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}
763
/* Move lkb between status queues; the surrounding hold/unhold pair keeps
   the lkb alive across the moment when del_lkb() has dropped the old
   queue's reference but add_lkb() hasn't taken the new one yet. */
static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}
771
/* Map a request message type to its reply type, or -1 for message types
   that have no reply. */
static int msg_reply_type(int mstype)
{
	switch (mstype) {
	case DLM_MSG_REQUEST:
		return DLM_MSG_REQUEST_REPLY;
	case DLM_MSG_CONVERT:
		return DLM_MSG_CONVERT_REPLY;
	case DLM_MSG_UNLOCK:
		return DLM_MSG_UNLOCK_REPLY;
	case DLM_MSG_CANCEL:
		return DLM_MSG_CANCEL_REPLY;
	case DLM_MSG_LOOKUP:
		return DLM_MSG_LOOKUP_REPLY;
	}
	return -1;
}
788
David Teiglande7fd4172006-01-18 09:30:29 +0000789/* add/remove lkb from global waiters list of lkb's waiting for
790 a reply from a remote node */
791
/* Record that lkb awaits a reply of type 'mstype' from a remote node,
   adding it to the lockspace waiters list (first wait only).  An unlock
   or cancel arriving while another op is outstanding is recorded as an
   "overlap" via lkb_flags instead of a second list entry.  Returns 0,
   -EINVAL when the op cannot follow the pending overlap, or -EBUSY when
   a non-unlock/cancel op is attempted while one is already outstanding. */
static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;

	mutex_lock(&ls->ls_waiters_mutex);

	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			break;
		case DLM_MSG_CANCEL:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		/* each outstanding op holds its own lkb reference */
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, lkb->lkb_flags);
		goto out;
	}

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	if (error)
		log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
842
David Teiglandb790c3b2007-01-24 10:21:33 -0600843/* We clear the RESEND flag because we might be taking an lkb off the waiters
844 list as part of process_requestqueue (e.g. a lookup that has an optimized
845 request reply on the requestqueue) between dlm_recover_waiters_pre() which
846 set RESEND and dlm_recover_waiters_post() */
847
/* Match an incoming reply of type 'mstype' against lkb's outstanding
   op(s) and drop the corresponding wait state/reference; the lkb leaves
   the waiters list when its wait count reaches zero.  Caller must hold
   ls_waiters_mutex (see remove_from_waiters/remove_from_waiters_ms).
   Returns 0 on a match, -1 when no op was waiting for this reply. */
static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
	}

	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
		  lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't recvd a reply
	   to the op that was in progress prior to the unlock/cancel; we
	   give up on any reply to the earlier op.  FIXME: not sure when/how
	   this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remove_from_waiters %x reply %d give up on %d",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	/* see comment above this function about clearing RESEND */
	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}
899
/* Locked wrapper around _remove_from_waiters() for normal replies. */
static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, mstype);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
910
David Teiglandef0c2bb2007-03-28 09:56:46 -0500911/* Handles situations where we might be processing a "fake" or "stub" reply in
912 which we can't try to take waiters_mutex again. */
913
/* Variant used when processing a reply message: a "stub" reply
   (ls_stub_ms) is generated locally by code that already holds
   ls_waiters_mutex, so only take the mutex for genuine remote replies
   (see the comment above this function). */
static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	if (ms != &ls->ls_stub_ms)
		mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, ms->m_type);
	if (ms != &ls->ls_stub_ms)
		mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
926
/* Remove the rsb's entry from the resource directory: locally when we
   are the directory node, otherwise by sending a REMOVE message.
   No-op in a lockspace configured without a directory. */
static void dir_remove(struct dlm_rsb *r)
{
	int to_nodeid;

	if (dlm_no_directory(r->res_ls))
		return;

	to_nodeid = dlm_dir_nodeid(r);
	if (to_nodeid != dlm_our_nodeid())
		send_remove(r);
	else
		dlm_dir_remove_entry(r->res_ls, to_nodeid,
				     r->res_name, r->res_length);
}
941
942/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
943 found since they are in order of newest to oldest? */
944
/* Free rsb's from bucket b's toss list whose toss time is older than the
   configured toss_secs.  One candidate is found and released per pass so
   the bucket lock can be dropped around dir_remove()/free_rsb().
   Returns the number of rsb's freed. */
static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		write_lock(&ls->ls_rsbtbl[b].lock);
		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
					    res_hashchain) {
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			list_del(&r->res_hashchain);
			write_unlock(&ls->ls_rsbtbl[b].lock);

			/* a master rsb also owns a directory entry */
			if (is_master(r))
				dir_remove(r);
			free_rsb(r);
			count++;
		} else {
			/* someone revived it between our scan and the put */
			write_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}
983
/* Periodic scan over all hash buckets, shrinking each toss list.
   Stops early if locking has been stopped (recovery in progress). */

void dlm_scan_rsbs(struct dlm_ls *ls)
{
	int i;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		shrink_bucket(ls, i);
		if (dlm_locking_stopped(ls))
			break;
		cond_resched();
	}
}
995
996/* lkb is master or local copy */
997
static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		/* copy the rsb's lvb out to the caller's buffer */
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		/* write the caller's lvb into the rsb, or invalidate it */
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		/* rsb lvb is allocated lazily on first write */
		if (!r->res_lvbptr)
			r->res_lvbptr = allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}
1048
/* On unlock, write the lock's lvb back to the rsb if the lock held a
   mode (PW or EX) allowed to modify the lvb; IVVALBLK invalidates it. */

static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode < DLM_LOCK_PW)
		return;

	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	/* rsb lvb is allocated lazily on first write */
	if (!r->res_lvbptr)
		r->res_lvbptr = allocate_lvb(r->res_ls);

	if (!r->res_lvbptr)
		return;

	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
	r->res_lvbseq++;
	rsb_clear_flag(r, RSB_VALNOTVALID);
}
1075
1076/* lkb is process copy (pc) */
1077
1078static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1079 struct dlm_message *ms)
1080{
1081 int b;
1082
1083 if (!lkb->lkb_lvbptr)
1084 return;
1085
1086 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1087 return;
1088
David Teigland597d0ca2006-07-12 16:44:04 -05001089 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
David Teiglande7fd4172006-01-18 09:30:29 +00001090 if (b == 1) {
1091 int len = receive_extralen(ms);
1092 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1093 lkb->lkb_lvbseq = ms->m_lvbseq;
1094 }
1095}
1096
1097/* Manipulate lkb's on rsb's convert/granted/waiting queues
1098 remove_lock -- used for unlock, removes lkb from granted
1099 revert_lock -- used for cancel, moves lkb from convert to granted
1100 grant_lock -- used for request and convert, adds lkb to granted or
1101 moves lkb from convert or waiting to granted
1102
1103 Each of these is used for master or local copy lkb's. There is
1104 also a _pc() variation used to make the corresponding change on
1105 a process copy (pc) lkb. */
1106
/* Take lkb off the granted queue and drop its original reference. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}
1115
/* Unlock on the master/local copy: save the lvb, then remove the lkb. */

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}
1121
/* Unlock on a process copy: no lvb handling, just remove the lkb. */

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}
1126
David Teiglandef0c2bb2007-03-28 09:56:46 -05001127/* returns: 0 did nothing
1128 1 moved lock to granted
1129 -1 removed lock */
1130
static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv = 0;

	lkb->lkb_rqmode = DLM_LOCK_IV;

	switch (lkb->lkb_status) {
	case DLM_LKSTS_GRANTED:
		/* nothing to undo */
		break;
	case DLM_LKSTS_CONVERT:
		/* cancel a conversion: lock keeps its granted mode */
		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		rv = 1;
		break;
	case DLM_LKSTS_WAITING:
		/* cancel a request: lock goes away entirely */
		del_lkb(r, lkb);
		lkb->lkb_grmode = DLM_LOCK_IV;
		/* this unhold undoes the original ref from create_lkb()
		   so this leads to the lkb being freed */
		unhold_lkb(lkb);
		rv = -1;
		break;
	default:
		log_print("invalid status for revert %d", lkb->lkb_status);
	}
	return rv;
}
1157
/* Process-copy variant of revert_lock; same semantics. */

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return revert_lock(r, lkb);
}
1162
static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
		lkb->lkb_grmode = lkb->lkb_rqmode;
		/* lkb_status is zero for a new lkb not yet on any queue;
		   otherwise move it from its current queue to granted */
		if (lkb->lkb_status)
			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		else
			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
	}

	lkb->lkb_rqmode = DLM_LOCK_IV;
}
1175
/* Grant on the master/local copy: handle the lvb, then grant. */

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
	lkb->lkb_highbast = 0;
}
1182
/* Grant on a process copy: lvb comes from the reply message. */

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}
1189
1190/* called by grant_pending_locks() which means an async grant message must
1191 be sent to the requesting node in addition to granting the lock if the
1192 lkb belongs to a remote node. */
1193
static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	/* a master copy's owner is remote and needs a grant message;
	   a local lock gets its completion ast queued directly */
	if (is_master_copy(lkb))
		send_grant(r, lkb);
	else
		queue_cast(r, lkb, 0);
}
1202
David Teigland7d3c1fe2007-04-19 10:30:41 -05001203/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1204 change the granted/requested modes. We're munging things accordingly in
1205 the process copy.
1206 CONVDEADLK: our grmode may have been forced down to NL to resolve a
1207 conversion deadlock
1208 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1209 compatible with other granted locks */
1210
/* Apply CONVDEADLK demotion to the process copy: master forced our
   granted mode down to NL to break a conversion deadlock. */

static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
		log_print("munge_demoted %x invalid reply type %d",
			  lkb->lkb_id, ms->m_type);
		return;
	}

	/* a demotion only makes sense mid-conversion, where both the
	   granted and requested modes are valid (not IV) */
	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
		log_print("munge_demoted %x invalid modes gr %d rq %d",
			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
		return;
	}

	lkb->lkb_grmode = DLM_LOCK_NL;
}
1227
/* Apply ALTPR/ALTCW munging to the process copy: master granted us the
   alternate mode, so mirror the rqmode change locally. */

static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
	    ms->m_type != DLM_MSG_GRANT) {
		log_print("munge_altmode %x invalid reply type %d",
			  lkb->lkb_id, ms->m_type);
		return;
	}

	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
		lkb->lkb_rqmode = DLM_LOCK_PR;
	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
		lkb->lkb_rqmode = DLM_LOCK_CW;
	else {
		/* ALTMODE sbflag implies one of the ALT exflags was set */
		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
		dlm_print_lkb(lkb);
	}
}
1246
David Teiglande7fd4172006-01-18 09:30:29 +00001247static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1248{
1249 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1250 lkb_statequeue);
1251 if (lkb->lkb_id == first->lkb_id)
David Teigland90135922006-01-20 08:47:07 +00001252 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001253
David Teigland90135922006-01-20 08:47:07 +00001254 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +00001255}
1256
David Teiglande7fd4172006-01-18 09:30:29 +00001257/* Check if the given lkb conflicts with another lkb on the queue. */
1258
1259static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1260{
1261 struct dlm_lkb *this;
1262
1263 list_for_each_entry(this, head, lkb_statequeue) {
1264 if (this == lkb)
1265 continue;
David Teigland3bcd3682006-02-23 09:56:38 +00001266 if (!modes_compat(this, lkb))
David Teigland90135922006-01-20 08:47:07 +00001267 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001268 }
David Teigland90135922006-01-20 08:47:07 +00001269 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +00001270}
1271
1272/*
1273 * "A conversion deadlock arises with a pair of lock requests in the converting
1274 * queue for one resource. The granted mode of each lock blocks the requested
1275 * mode of the other lock."
1276 *
1277 * Part 2: if the granted mode of lkb is preventing the first lkb in the
1278 * convert queue from being granted, then demote lkb (set grmode to NL).
1279 * This second form requires that we check for conv-deadlk even when
1280 * now == 0 in _can_be_granted().
1281 *
1282 * Example:
1283 * Granted Queue: empty
1284 * Convert Queue: NL->EX (first lock)
1285 * PR->EX (second lock)
1286 *
1287 * The first lock can't be granted because of the granted mode of the second
1288 * lock and the second lock can't be granted because it's not first in the
1289 * list. We demote the granted mode of the second lock (the lkb passed to this
1290 * function).
1291 *
1292 * After the resolution, the "grant pending" function needs to go back and try
1293 * to grant locks on the convert queue again since the first lock can now be
1294 * granted.
1295 */
1296
1297static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1298{
1299 struct dlm_lkb *this, *first = NULL, *self = NULL;
1300
1301 list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1302 if (!first)
1303 first = this;
1304 if (this == lkb) {
1305 self = lkb;
1306 continue;
1307 }
1308
David Teiglande7fd4172006-01-18 09:30:29 +00001309 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
David Teigland90135922006-01-20 08:47:07 +00001310 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001311 }
1312
1313 /* if lkb is on the convert queue and is preventing the first
1314 from being granted, then there's deadlock and we demote lkb.
1315 multiple converting locks may need to do this before the first
1316 converting lock can be granted. */
1317
1318 if (self && self != first) {
1319 if (!modes_compat(lkb, first) &&
1320 !queue_conflict(&rsb->res_grantqueue, first))
David Teigland90135922006-01-20 08:47:07 +00001321 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001322 }
1323
David Teigland90135922006-01-20 08:47:07 +00001324 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +00001325}
1326
1327/*
1328 * Return 1 if the lock can be granted, 0 otherwise.
1329 * Also detect and resolve conversion deadlocks.
1330 *
1331 * lkb is the lock to be granted
1332 *
1333 * now is 1 if the function is being called in the context of the
1334 * immediate request, it is 0 if called later, after the lock has been
1335 * queued.
1336 *
1337 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1338 */
1339
static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	/* conv is nonzero when this is a conversion (lock already holds
	   a granted mode) rather than a new request */
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted. In essence, the use of this flag
	 * tells the Lock Manager to expedite this request by not considering
	 * what may be in the CONVERTING or WAITING queues... As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks. This flag is not valid for conversion requests.
	 *
	 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode. We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1. The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis. This, in
	 * turn, can lead to a phenomenon known as "indefinite postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion. This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource. Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first serve" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order. The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;

 out:
	/*
	 * The following, enabled by CONVDEADLK, departs from VMS.
	 * Resolving the deadlock demotes this lock's granted mode to NL
	 * as a side effect; the caller may need to rescan the queue.
	 */

	if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
	    conversion_deadlock_detect(r, lkb)) {
		lkb->lkb_grmode = DLM_LOCK_NL;
		lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
	}

	return 0;
}
1466
1467/*
1468 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1469 * simple way to provide a big optimization to applications that can use them.
1470 */
1471
1472static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1473{
1474 uint32_t flags = lkb->lkb_exflags;
1475 int rv;
1476 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1477
1478 rv = _can_be_granted(r, lkb, now);
1479 if (rv)
1480 goto out;
1481
1482 if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1483 goto out;
1484
1485 if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1486 alt = DLM_LOCK_PR;
1487 else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1488 alt = DLM_LOCK_CW;
1489
1490 if (alt) {
1491 lkb->lkb_rqmode = alt;
1492 rv = _can_be_granted(r, lkb, now);
1493 if (rv)
1494 lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1495 else
1496 lkb->lkb_rqmode = rqmode;
1497 }
1498 out:
1499 return rv;
1500}
1501
/* Grant whatever can be granted on the convert queue, restarting the
   scan after any grant (or, once, after a deadlock demotion) since
   either can unblock earlier entries.  Returns the highest ungranted
   rqmode seen, folded together with the caller's "high". */

static int grant_pending_convert(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;
	int hi, demoted, quit, grant_restart, demote_restart;

	quit = 0;
 restart:
	grant_restart = 0;
	demote_restart = 0;
	hi = DLM_LOCK_IV;

	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
		demoted = is_demoted(lkb);
		if (can_be_granted(r, lkb, 0)) {
			grant_lock_pending(r, lkb);
			grant_restart = 1;
		} else {
			hi = max_t(int, lkb->lkb_rqmode, hi);
			/* can_be_granted() may have demoted this lock to
			   resolve a conversion deadlock */
			if (!demoted && is_demoted(lkb))
				demote_restart = 1;
		}
	}

	if (grant_restart)
		goto restart;
	/* only one extra pass for demotions, to avoid looping forever */
	if (demote_restart && !quit) {
		quit = 1;
		goto restart;
	}

	return max_t(int, high, hi);
}
1534
/* Grant whatever can be granted on the wait queue.  Returns the
   highest ungranted rqmode seen, folded with the caller's "high". */

static int grant_pending_wait(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;

	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
		if (can_be_granted(r, lkb, 0))
			grant_lock_pending(r, lkb);
		else
			high = max_t(int, lkb->lkb_rqmode, high);
	}

	return high;
}
1548
/* Master-only: grant all grantable convert/wait locks, then send
   blocking ASTs to granted locks that block what is still waiting. */

static void grant_pending_locks(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *s;
	int high = DLM_LOCK_IV;

	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););

	high = grant_pending_convert(r, high);
	high = grant_pending_wait(r, high);

	if (high == DLM_LOCK_IV)
		return;

	/*
	 * If there are locks left on the wait/convert queue then send blocking
	 * ASTs to granted locks based on the largest requested mode (high)
	 * found above. FIXME: highbast < high comparison not valid for PR/CW.
	 */

	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
		if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
		    !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
			queue_bast(r, lkb, high);
			lkb->lkb_highbast = high;
		}
	}
}
1576
/* Queue blocking ASTs for locks on "head" that conflict with lkb's
   requested mode, skipping locks already notified at this or a higher
   mode (lkb_highbast). */

static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
			    struct dlm_lkb *lkb)
{
	struct dlm_lkb *gr;

	list_for_each_entry(gr, head, lkb_statequeue) {
		if (gr->lkb_bastaddr &&
		    gr->lkb_highbast < lkb->lkb_rqmode &&
		    !modes_compat(gr, lkb)) {
			queue_bast(r, gr, lkb->lkb_rqmode);
			gr->lkb_highbast = lkb->lkb_rqmode;
		}
	}
}
1591
/* Blocking ASTs for granted locks that conflict with lkb. */

static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
}
1596
/* Blocking ASTs for both granted and converting locks that conflict
   with lkb. */

static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
	send_bast_queue(r, &r->res_convertqueue, lkb);
}
1602
1603/* set_master(r, lkb) -- set the master nodeid of a resource
1604
1605 The purpose of this function is to set the nodeid field in the given
1606 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
1607 known, it can just be copied to the lkb and the function will return
1608 0. If the rsb's nodeid is _not_ known, it needs to be looked up
1609 before it can be copied to the lkb.
1610
1611 When the rsb nodeid is being looked up remotely, the initial lkb
1612 causing the lookup is kept on the ls_waiters list waiting for the
1613 lookup reply. Other lkb's waiting for the same rsb lookup are kept
1614 on the rsb's res_lookup list until the master is verified.
1615
1616 Return values:
1617 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1618 1: the rsb master is not available and the lkb has been placed on
1619 a wait queue
1620*/
1621
static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = r->res_ls;
	int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

	/* after recovery the master is known but unconfirmed; use it and
	   make this lkb the one that confirms it (first_lkid) */
	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = lkb->lkb_id;
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	/* another lkb already has a lookup in flight; wait on res_lookup
	   until the master is verified */
	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
		return 1;
	}

	/* res_nodeid == 0: we are the master */
	if (r->res_nodeid == 0) {
		lkb->lkb_nodeid = 0;
		return 0;
	}

	/* res_nodeid > 0: master already known */
	if (r->res_nodeid > 0) {
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););

	dir_nodeid = dlm_dir_nodeid(r);

	/* master unknown: ask the remote directory node, or look it up
	   locally if we are the directory node */
	if (dir_nodeid != our_nodeid) {
		r->res_first_lkid = lkb->lkb_id;
		send_lookup(r, lkb);
		return 1;
	}

	for (;;) {
		/* It's possible for dlm_scand to remove an old rsb for
		   this same resource from the toss list, us to create
		   a new one, look up the master locally, and find it
		   already exists just before dlm_scand does the
		   dir_remove() on the previous rsb. */

		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
				       r->res_length, &ret_nodeid);
		if (!error)
			break;
		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
		schedule();
	}

	if (ret_nodeid == our_nodeid) {
		r->res_first_lkid = 0;
		r->res_nodeid = 0;
		lkb->lkb_nodeid = 0;
	} else {
		r->res_first_lkid = lkb->lkb_id;
		r->res_nodeid = ret_nodeid;
		lkb->lkb_nodeid = ret_nodeid;
	}
	return 0;
}
1685
/* Restart the request for each lkb that was queued waiting for this
   rsb's master lookup to complete. */

static void process_lookup_list(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
		list_del_init(&lkb->lkb_rsb_lookup);
		_request_lock(r, lkb);
		schedule();
	}
}
1696
1697/* confirm_master -- confirm (or deny) an rsb's master nodeid */
1698
/* Confirm (or deny) an rsb's master nodeid based on the result of the
   first request sent to it; only relevant while first_lkid is set. */

static void confirm_master(struct dlm_rsb *r, int error)
{
	struct dlm_lkb *lkb;

	if (!r->res_first_lkid)
		return;

	switch (error) {
	case 0:
	case -EINPROGRESS:
		/* master accepted the request: it is confirmed; retry
		   everything that queued up behind the lookup */
		r->res_first_lkid = 0;
		process_lookup_list(r);
		break;

	case -EAGAIN:
		/* the remote master didn't queue our NOQUEUE request;
		   make a waiting lkb the first_lkid */

		r->res_first_lkid = 0;

		if (!list_empty(&r->res_lookup)) {
			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
					 lkb_rsb_lookup);
			list_del_init(&lkb->lkb_rsb_lookup);
			r->res_first_lkid = lkb->lkb_id;
			_request_lock(r, lkb);
		} else
			/* nothing waiting: master is unknown again */
			r->res_nodeid = -1;
		break;

	default:
		log_error(r->res_ls, "confirm_master unknown error %d", error);
	}
}
1733
1734static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1735 int namelen, uint32_t parent_lkid, void *ast,
David Teigland3bcd3682006-02-23 09:56:38 +00001736 void *astarg, void *bast, struct dlm_args *args)
David Teiglande7fd4172006-01-18 09:30:29 +00001737{
1738 int rv = -EINVAL;
1739
1740 /* check for invalid arg usage */
1741
1742 if (mode < 0 || mode > DLM_LOCK_EX)
1743 goto out;
1744
1745 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1746 goto out;
1747
1748 if (flags & DLM_LKF_CANCEL)
1749 goto out;
1750
1751 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1752 goto out;
1753
1754 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1755 goto out;
1756
1757 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1758 goto out;
1759
1760 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1761 goto out;
1762
1763 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1764 goto out;
1765
1766 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1767 goto out;
1768
1769 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1770 goto out;
1771
1772 if (!ast || !lksb)
1773 goto out;
1774
1775 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1776 goto out;
1777
1778 /* parent/child locks not yet supported */
1779 if (parent_lkid)
1780 goto out;
1781
1782 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1783 goto out;
1784
1785 /* these args will be copied to the lkb in validate_lock_args,
1786 it cannot be done now because when converting locks, fields in
1787 an active lkb cannot be modified before locking the rsb */
1788
1789 args->flags = flags;
1790 args->astaddr = ast;
1791 args->astparam = (long) astarg;
1792 args->bastaddr = bast;
1793 args->mode = mode;
1794 args->lksb = lksb;
David Teiglande7fd4172006-01-18 09:30:29 +00001795 rv = 0;
1796 out:
1797 return rv;
1798}
1799
1800static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1801{
1802 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1803 DLM_LKF_FORCEUNLOCK))
1804 return -EINVAL;
1805
David Teiglandef0c2bb2007-03-28 09:56:46 -05001806 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
1807 return -EINVAL;
1808
David Teiglande7fd4172006-01-18 09:30:29 +00001809 args->flags = flags;
1810 args->astparam = (long) astarg;
1811 return 0;
1812}
1813
/* Check that the validated args can be applied to this particular lkb
   (mainly conversion-state checks), then copy them in.
   Returns 0, -EINVAL, or -EBUSY. */

static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			      struct dlm_args *args)
{
	int rv = -EINVAL;

	if (args->flags & DLM_LKF_CONVERT) {
		/* can't convert a master copy */
		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
			goto out;

		/* QUECVT is only valid for mode transitions allowed by
		   the quecvt compatibility matrix */
		if (args->flags & DLM_LKF_QUECVT &&
		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
			goto out;

		/* conversion only valid on a granted lock with no other
		   operation (reply or overlap) outstanding */
		rv = -EBUSY;
		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
			goto out;

		if (lkb->lkb_wait_type)
			goto out;

		if (is_overlap(lkb))
			goto out;
	}

	lkb->lkb_exflags = args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astaddr = args->astaddr;
	lkb->lkb_astparam = args->astparam;
	lkb->lkb_bastaddr = args->bastaddr;
	lkb->lkb_rqmode = args->mode;
	lkb->lkb_lksb = args->lksb;
	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
	lkb->lkb_ownpid = (int) current->pid;
	rv = 0;
 out:
	return rv;
}
1851
David Teiglandef0c2bb2007-03-28 09:56:46 -05001852/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
1853 for success */
1854
1855/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
1856 because there may be a lookup in progress and it's valid to do
1857 cancel/unlockf on it */
1858
/* Validate an unlock or cancel against the lkb's current state.
 * Returns 0 to proceed, -EINVAL (bad target), -ENOENT (lock is
 * end-of-life), or -EBUSY.  For CANCEL/FORCEUNLOCK, -EBUSY means the
 * op was recorded as "overlapping" an in-flight request and will be
 * resolved later (dlm_unlock() maps that -EBUSY to success, see the
 * comment above). */

static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int rv = -EINVAL;

	/* unlocks are driven at the lock's home node; a master copy is
	   never unlocked directly by an application */
	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
		dlm_print_lkb(lkb);
		goto out;
	}

	/* an lkb may still exist even though the lock is EOL'ed due to a
	   cancel, unlock or failed noqueue request; an app can't use these
	   locks; return same error as if the lkid had not been found at all */

	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
		rv = -ENOENT;
		goto out;
	}

	/* an lkb may be waiting for an rsb lookup to complete where the
	   lookup was initiated by another lock */

	if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
		if (!list_empty(&lkb->lkb_rsb_lookup)) {
			/* nothing was sent yet; complete the op locally */
			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
			list_del_init(&lkb->lkb_rsb_lookup);
			queue_cast(lkb->lkb_resource, lkb,
				   args->flags & DLM_LKF_CANCEL ?
				   -DLM_ECANCEL : -DLM_EUNLOCK);
			unhold_lkb(lkb); /* undoes create_lkb() */
			rv = -EBUSY;
			goto out;
		}
	}

	/* cancel not allowed with another cancel/unlock in progress */

	if (args->flags & DLM_LKF_CANCEL) {
		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
			goto out;

		if (is_overlap(lkb))
			goto out;

		/* the outstanding message will be resent by recovery;
		   record the cancel as overlapping it */
		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
		case DLM_MSG_CANCEL:
			/* nothing left to cancel */
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_CANCEL */
		goto out_ok;
	}

	/* do we need to allow a force-unlock if there's a normal unlock
	   already in progress? in what conditions could the normal unlock
	   fail such that we'd want to send a force-unlock to be sure? */

	if (args->flags & DLM_LKF_FORCEUNLOCK) {
		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
			goto out;

		if (is_overlap_unlock(lkb))
			goto out;

		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_UNLOCK */
		goto out_ok;
	}

	/* normal unlock not allowed if there's any op in progress */
	rv = -EBUSY;
	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
		goto out;

 out_ok:
	/* an overlapping op shouldn't blow away exflags from other op */
	lkb->lkb_exflags |= args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astparam = args->astparam;
	rv = 0;
 out:
	if (rv)
		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
			  args->flags, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
	return rv;
}
1974
1975/*
1976 * Four stage 4 varieties:
1977 * do_request(), do_convert(), do_unlock(), do_cancel()
1978 * These are called on the master node for the given lock and
1979 * from the central locking logic.
1980 */
1981
1982static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1983{
1984 int error = 0;
1985
David Teigland90135922006-01-20 08:47:07 +00001986 if (can_be_granted(r, lkb, 1)) {
David Teiglande7fd4172006-01-18 09:30:29 +00001987 grant_lock(r, lkb);
1988 queue_cast(r, lkb, 0);
1989 goto out;
1990 }
1991
1992 if (can_be_queued(lkb)) {
1993 error = -EINPROGRESS;
1994 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1995 send_blocking_asts(r, lkb);
1996 goto out;
1997 }
1998
1999 error = -EAGAIN;
2000 if (force_blocking_asts(lkb))
2001 send_blocking_asts_all(r, lkb);
2002 queue_cast(r, lkb, -EAGAIN);
2003
2004 out:
2005 return error;
2006}
2007
/* Master-side handling of a mode conversion.  Returns 0, -EINPROGRESS
 * or -EAGAIN, mirrored to the holder via queue_cast(); granting or
 * demoting here may make other queued locks grantable. */

static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;

	/* changing an existing lock may allow others to be granted */

	if (can_be_granted(r, lkb, 1)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		grant_pending_locks(r);
		goto out;
	}

	/* is_demoted() means the can_be_granted() above set the grmode
	   to NL, and left us on the granted queue.  This auto-demotion
	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
	   now grantable.  We have to try to grant other converting locks
	   before we try again to grant this one. */

	if (is_demoted(lkb)) {
		grant_pending_convert(r, DLM_LOCK_IV);
		if (_can_be_granted(r, lkb, 1)) {
			grant_lock(r, lkb);
			queue_cast(r, lkb, 0);
			grant_pending_locks(r);
			goto out;
		}
		/* else fall through and move to convert queue */
	}

	if (can_be_queued(lkb)) {
		error = -EINPROGRESS;
		/* take the lkb off its current status queue and park it
		   on the convert queue */
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		send_blocking_asts(r, lkb);
		goto out;
	}

	/* noqueue conversion that can't be granted now */
	error = -EAGAIN;
	if (force_blocking_asts(lkb))
		send_blocking_asts_all(r, lkb);
	queue_cast(r, lkb, -EAGAIN);

 out:
	return error;
}
2054
/* Master-side unlock: drop the lock from its queue, queue the
 * -DLM_EUNLOCK completion ast, then see whether any pending locks
 * have become grantable.  Always returns -DLM_EUNLOCK. */

static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	remove_lock(r, lkb);
	queue_cast(r, lkb, -DLM_EUNLOCK);
	grant_pending_locks(r);
	return -DLM_EUNLOCK;
}
2062
David Teiglandef0c2bb2007-03-28 09:56:46 -05002063/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
Steven Whitehouse907b9bc2006-09-25 09:26:04 -04002064
David Teiglande7fd4172006-01-18 09:30:29 +00002065static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2066{
David Teiglandef0c2bb2007-03-28 09:56:46 -05002067 int error;
2068
2069 error = revert_lock(r, lkb);
2070 if (error) {
2071 queue_cast(r, lkb, -DLM_ECANCEL);
2072 grant_pending_locks(r);
2073 return -DLM_ECANCEL;
2074 }
2075 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +00002076}
2077
2078/*
2079 * Four stage 3 varieties:
2080 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2081 */
2082
2083/* add a new lkb to a possibly new rsb, called by requesting process */
2084
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* set_master() sets lkb nodeid from r; negative is a hard error,
	   positive apparently means a master lookup is in flight and the
	   request will be resumed later, so report success for now */

	rv = set_master(r, lkb);
	if (rv < 0)
		return rv;
	if (rv > 0)
		return 0;

	if (is_remote(r))
		/* receive_request() calls do_request() on remote node */
		return send_request(r, lkb);

	return do_request(r, lkb);
}
2107
David Teigland3bcd3682006-02-23 09:56:38 +00002108/* change some property of an existing lkb, e.g. mode */
David Teiglande7fd4172006-01-18 09:30:29 +00002109
static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_convert() calls do_convert() on the remote node */
	if (is_remote(r))
		return send_convert(r, lkb);

	return do_convert(r, lkb);
}
2122
2123/* remove an existing lkb from the granted queue */
2124
static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_unlock() calls do_unlock() on the remote node */
	if (is_remote(r))
		return send_unlock(r, lkb);

	return do_unlock(r, lkb);
}
2137
2138/* remove an existing lkb from the convert or wait queue */
2139
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_cancel() calls do_cancel() on the remote node */
	if (is_remote(r))
		return send_cancel(r, lkb);

	return do_cancel(r, lkb);
}
2152
2153/*
2154 * Four stage 2 varieties:
2155 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2156 */
2157
/* Stage 2 for a new lock: validate args, find (or create) the rsb,
 * attach the lkb to it and hand off to stage 3.  The rsb is locked
 * around the operation and its find_rsb() reference dropped after. */

static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
			int len, struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = find_rsb(ls, name, len, R_CREATE, &r);
	if (error)
		goto out;

	lock_rsb(r);

	attach_lkb(r, lkb);
	/* publish the kernel-chosen lock id to the caller's lksb */
	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;

	error = _request_lock(r, lkb);

	unlock_rsb(r);
	put_rsb(r);

 out:
	return error;
}
2185
/* Stage 2 for a conversion: the lkb is already attached to an rsb;
 * pin and lock it, validate, then hand off to stage 3. */

static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
			struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = _convert_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}
2207
/* Stage 2 for a normal unlock: pin and lock the lkb's rsb, validate,
 * then hand off to stage 3. */

static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _unlock_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}
2229
/* Stage 2 for a cancel: same shape as unlock_lock(), routed to
 * _cancel_lock() instead. */

static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _cancel_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}
2251
2252/*
2253 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
2254 */
2255
/* dlm_lock() - stage 1 entry point: acquire a new lock, or convert an
 * existing one when DLM_LKF_CONVERT is set (the lock is then named by
 * lksb->sb_lkid).  -EINPROGRESS (grant pending, delivered via ast) and
 * -EAGAIN (noqueue failure, also delivered via ast) are mapped to 0.
 * dlm_lock_recovery() is held across the op to exclude lockspace
 * recovery. */

int dlm_lock(dlm_lockspace_t *lockspace,
	     int mode,
	     struct dlm_lksb *lksb,
	     uint32_t flags,
	     void *name,
	     unsigned int namelen,
	     uint32_t parent_lkid,
	     void (*ast) (void *astarg),
	     void *astarg,
	     void (*bast) (void *astarg, int mode))
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error, convert = flags & DLM_LKF_CONVERT;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	dlm_lock_recovery(ls);

	if (convert)
		error = find_lkb(ls, lksb->sb_lkid, &lkb);
	else
		error = create_lkb(ls, &lkb);

	if (error)
		goto out;

	error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
			      astarg, bast, &args);
	if (error)
		goto out_put;

	if (convert)
		error = convert_lock(ls, lkb, &args);
	else
		error = request_lock(ls, lkb, name, namelen, &args);

	if (error == -EINPROGRESS)
		error = 0;
 out_put:
	/* drop the find_lkb() ref for converts; for a new request the
	   create_lkb() ref is only dropped if the request failed */
	if (convert || error)
		__put_lkb(ls, lkb);
	if (error == -EAGAIN)
		error = 0;
 out:
	dlm_unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
2308
/* dlm_unlock() - stage 1 entry point: unlock, or cancel when
 * DLM_LKF_CANCEL is set.  -DLM_EUNLOCK/-DLM_ECANCEL are the success
 * results of those ops and are mapped to 0; -EBUSY with
 * CANCEL/FORCEUNLOCK is also mapped to 0 (see the comment above
 * validate_unlock_args). */

int dlm_unlock(dlm_lockspace_t *lockspace,
	       uint32_t lkid,
	       uint32_t flags,
	       struct dlm_lksb *lksb,
	       void *astarg)
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	error = set_unlock_args(flags, astarg, &args);
	if (error)
		goto out_put;

	if (flags & DLM_LKF_CANCEL)
		error = cancel_lock(ls, lkb, &args);
	else
		error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
		error = 0;
	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
		error = 0;
 out_put:
	/* drop the find_lkb() reference */
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
2350
2351/*
2352 * send/receive routines for remote operations and replies
2353 *
2354 * send_args
2355 * send_common
2356 * send_request receive_request
2357 * send_convert receive_convert
2358 * send_unlock receive_unlock
2359 * send_cancel receive_cancel
2360 * send_grant receive_grant
2361 * send_bast receive_bast
2362 * send_lookup receive_lookup
2363 * send_remove receive_remove
2364 *
2365 * send_common_reply
2366 * receive_request_reply send_request_reply
2367 * receive_convert_reply send_convert_reply
2368 * receive_unlock_reply send_unlock_reply
2369 * receive_cancel_reply send_cancel_reply
2370 * receive_lookup_reply send_lookup_reply
2371 */
2372
/* Allocate a zeroed outgoing message buffer of mb_len bytes destined
 * for to_nodeid and fill in the common dlm header plus message type.
 * On success the caller finishes the body and commits via
 * send_message(). */

static int _create_message(struct dlm_ls *ls, int mb_len,
			   int to_nodeid, int mstype,
			   struct dlm_message **ms_ret,
			   struct dlm_mhandle **mh_ret)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	char *mb;

	/* get_buffer gives us a message handle (mh) that we need to
	   pass into lowcomms_commit and a message buffer (mb) that we
	   write our data into */

	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
	if (!mh)
		return -ENOBUFS;

	memset(mb, 0, mb_len);

	ms = (struct dlm_message *) mb;

	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
	ms->m_header.h_lockspace = ls->ls_global_id;
	ms->m_header.h_nodeid = dlm_our_nodeid();
	ms->m_header.h_length = mb_len;
	ms->m_header.h_cmd = DLM_MSG;

	ms->m_type = mstype;

	*mh_ret = mh;
	*ms_ret = ms;
	return 0;
}
2406
/* Size and allocate a message for the given type: some types append
 * the resource name in m_extra, others may append an lvb.  Must stay
 * in sync with the switch in send_args(). */

static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  int to_nodeid, int mstype,
			  struct dlm_message **ms_ret,
			  struct dlm_mhandle **mh_ret)
{
	int mb_len = sizeof(struct dlm_message);

	switch (mstype) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
	case DLM_MSG_REMOVE:
		/* these carry the resource name */
		mb_len += r->res_length;
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		/* these may carry an lvb */
		if (lkb && lkb->lkb_lvbptr)
			mb_len += r->res_ls->ls_lvblen;
		break;
	}

	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
			       ms_ret, mh_ret);
}
2433
2434/* further lowcomms enhancements or alternate implementations may make
2435 the return value from this function useful at some point */
2436
static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	/* dlm_message_out() puts the message into wire format before the
	   buffer is committed to lowcomms for transmission */
	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);
	return 0;
}
2443
/* Copy everything a receiver might need from the lkb/rsb into the
 * outgoing message; what actually lands in m_extra depends on the
 * message type (must match the sizing switch in create_message()). */

static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
		      struct dlm_message *ms)
{
	ms->m_nodeid   = lkb->lkb_nodeid;
	ms->m_pid      = lkb->lkb_ownpid;
	ms->m_lkid     = lkb->lkb_id;
	ms->m_remid    = lkb->lkb_remid;
	ms->m_exflags  = lkb->lkb_exflags;
	ms->m_sbflags  = lkb->lkb_sbflags;
	ms->m_flags    = lkb->lkb_flags;
	ms->m_lvbseq   = lkb->lkb_lvbseq;
	ms->m_status   = lkb->lkb_status;
	ms->m_grmode   = lkb->lkb_grmode;
	ms->m_rqmode   = lkb->lkb_rqmode;
	ms->m_hash     = r->res_hash;

	/* m_result and m_bastmode are set from function args,
	   not from lkb fields */

	/* only flag whether callbacks exist; addresses are meaningless
	   on another node */
	if (lkb->lkb_bastaddr)
		ms->m_asts |= AST_BAST;
	if (lkb->lkb_astaddr)
		ms->m_asts |= AST_COMP;

	/* compare with switch in create_message; send_remove() doesn't
	   use send_args() */

	switch (ms->m_type) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
		memcpy(ms->m_extra, r->res_name, r->res_length);
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		if (!lkb->lkb_lvbptr)
			break;
		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
		break;
	}
}
2487
/* Send a request/convert/unlock/cancel to the master (r->res_nodeid).
 * The lkb goes on the waiters list before anything is sent, and is
 * taken back off if the send fails, so a reply can never find it
 * missing. */

static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	error = add_to_waiters(lkb, mstype);
	if (error)
		return error;

	to_nodeid = r->res_nodeid;

	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	/* undo add_to_waiters(); the waiters entry is keyed by the
	   expected reply type */
	remove_from_waiters(lkb, msg_reply_type(mstype));
	return error;
}
2515
/* ask the master node to take a new lock for us */
static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_REQUEST);
}
2520
/* ask the master node to convert our lock's mode */
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	error = send_common(r, lkb, DLM_MSG_CONVERT);

	/* down conversions go without a reply from the master */
	if (!error && down_conversion(lkb)) {
		/* fabricate the reply locally via the lockspace's stub
		   message and process it immediately */
		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		r->res_ls->ls_stub_ms.m_result = 0;
		r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
	}

	return error;
}
2538
2539/* FIXME: if this lkb is the only lock we hold on the rsb, then set
2540 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2541 that the master is still correct. */
2542
/* ask the master node to unlock our lock */
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_UNLOCK);
}
2547
/* ask the master node to cancel our in-progress request/convert */
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_CANCEL);
}
2552
2553static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2554{
2555 struct dlm_message *ms;
2556 struct dlm_mhandle *mh;
2557 int to_nodeid, error;
2558
2559 to_nodeid = lkb->lkb_nodeid;
2560
2561 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2562 if (error)
2563 goto out;
2564
2565 send_args(r, lkb, ms);
2566
2567 ms->m_result = 0;
2568
2569 error = send_message(mh, ms);
2570 out:
2571 return error;
2572}
2573
2574static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2575{
2576 struct dlm_message *ms;
2577 struct dlm_mhandle *mh;
2578 int to_nodeid, error;
2579
2580 to_nodeid = lkb->lkb_nodeid;
2581
2582 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2583 if (error)
2584 goto out;
2585
2586 send_args(r, lkb, ms);
2587
2588 ms->m_bastmode = mode;
2589
2590 error = send_message(mh, ms);
2591 out:
2592 return error;
2593}
2594
/* Ask the directory node for this resource which node is master.
 * Same waiters-list protocol as send_common(): added before sending,
 * backed out (keyed by the expected reply type) if the send fails. */

static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
	if (error)
		return error;

	to_nodeid = dlm_dir_nodeid(r);

	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	return error;
}
2622
2623static int send_remove(struct dlm_rsb *r)
2624{
2625 struct dlm_message *ms;
2626 struct dlm_mhandle *mh;
2627 int to_nodeid, error;
2628
2629 to_nodeid = dlm_dir_nodeid(r);
2630
2631 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2632 if (error)
2633 goto out;
2634
2635 memcpy(ms->m_extra, r->res_name, r->res_length);
2636 ms->m_hash = r->res_hash;
2637
2638 error = send_message(mh, ms);
2639 out:
2640 return error;
2641}
2642
2643static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2644 int mstype, int rv)
2645{
2646 struct dlm_message *ms;
2647 struct dlm_mhandle *mh;
2648 int to_nodeid, error;
2649
2650 to_nodeid = lkb->lkb_nodeid;
2651
2652 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2653 if (error)
2654 goto out;
2655
2656 send_args(r, lkb, ms);
2657
2658 ms->m_result = rv;
2659
2660 error = send_message(mh, ms);
2661 out:
2662 return error;
2663}
2664
/* return the do_request() result to the requesting node */
static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
}
2669
/* return the do_convert() result to the converting node */
static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
}
2674
/* return the do_unlock() result to the unlocking node */
static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
}
2679
/* return the do_cancel() result to the cancelling node */
static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
}
2684
2685static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2686 int ret_nodeid, int rv)
2687{
2688 struct dlm_rsb *r = &ls->ls_stub_rsb;
2689 struct dlm_message *ms;
2690 struct dlm_mhandle *mh;
2691 int error, nodeid = ms_in->m_header.h_nodeid;
2692
2693 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2694 if (error)
2695 goto out;
2696
2697 ms->m_lkid = ms_in->m_lkid;
2698 ms->m_result = rv;
2699 ms->m_nodeid = ret_nodeid;
2700
2701 error = send_message(mh, ms);
2702 out:
2703 return error;
2704}
2705
2706/* which args we save from a received message depends heavily on the type
2707 of message, unlike the send side where we can safely send everything about
2708 the lkb for any type of message */
2709
static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	lkb->lkb_exflags = ms->m_exflags;
	lkb->lkb_sbflags = ms->m_sbflags;
	/* only the low 16 bits of lkb_flags travel on the wire; the high
	   16 bits are node-local and must be preserved */
	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
		         (ms->m_flags & 0x0000FFFF);
}
2717
static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	/* like receive_flags() but a reply doesn't overwrite exflags */
	lkb->lkb_sbflags = ms->m_sbflags;
	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
			 (ms->m_flags & 0x0000FFFF);
}
2724
2725static int receive_extralen(struct dlm_message *ms)
2726{
2727 return (ms->m_header.h_length - sizeof(struct dlm_message));
2728}
2729
David Teiglande7fd4172006-01-18 09:30:29 +00002730static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2731 struct dlm_message *ms)
2732{
2733 int len;
2734
2735 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2736 if (!lkb->lkb_lvbptr)
2737 lkb->lkb_lvbptr = allocate_lvb(ls);
2738 if (!lkb->lkb_lvbptr)
2739 return -ENOMEM;
2740 len = receive_extralen(ms);
2741 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2742 }
2743 return 0;
2744}
2745
/* Initialize a freshly created master-copy lkb from an incoming
 * request message. */

static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				struct dlm_message *ms)
{
	lkb->lkb_nodeid = ms->m_header.h_nodeid;
	lkb->lkb_ownpid = ms->m_pid;
	lkb->lkb_remid = ms->m_lkid;
	lkb->lkb_grmode = DLM_LOCK_IV;
	lkb->lkb_rqmode = ms->m_rqmode;
	/* remote callback addresses are meaningless here; only record
	   whether each callback exists */
	lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
	lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);

	DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		/* lkb was just created so there won't be an lvb yet */
		lkb->lkb_lvbptr = allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
	}

	return 0;
}
2768
/* Validate an incoming convert against the master-copy lkb and pull
 * in the new rqmode/lvb. */

static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				struct dlm_message *ms)
{
	/* a convert must come from the node that owns the lock */
	if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
		log_error(ls, "convert_args nodeid %d %d lkid %x %x",
			  lkb->lkb_nodeid, ms->m_header.h_nodeid,
			  lkb->lkb_id, lkb->lkb_remid);
		return -EINVAL;
	}

	if (!is_master_copy(lkb))
		return -EINVAL;

	/* only a granted lock can be converted */
	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
		return -EBUSY;

	if (receive_lvb(ls, lkb, ms))
		return -ENOMEM;

	lkb->lkb_rqmode = ms->m_rqmode;
	lkb->lkb_lvbseq = ms->m_lvbseq;

	return 0;
}
2793
2794static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2795 struct dlm_message *ms)
2796{
2797 if (!is_master_copy(lkb))
2798 return -EINVAL;
2799 if (receive_lvb(ls, lkb, ms))
2800 return -ENOMEM;
2801 return 0;
2802}
2803
2804/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2805 uses to send a reply and that the remote end uses to process the reply. */
2806
static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
	/* the sender's nodeid and lock id are all a reply needs */
	lkb->lkb_nodeid = ms->m_header.h_nodeid;
	lkb->lkb_remid = ms->m_lkid;
}
2813
/* A remote node asked us (as master) for a new lock: build a
 * master-copy lkb, attach it to the rsb (creating it if needed), run
 * do_request() and reply.  On setup failure a reply is still sent via
 * the stub lkb/rsb so the requester isn't left hanging. */

static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, namelen;

	error = create_lkb(ls, &lkb);
	if (error)
		goto fail;

	receive_flags(lkb, ms);
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	error = receive_request_args(ls, lkb, ms);
	if (error) {
		__put_lkb(ls, lkb);
		goto fail;
	}

	namelen = receive_extralen(ms);

	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
	if (error) {
		__put_lkb(ls, lkb);
		goto fail;
	}

	lock_rsb(r);

	attach_lkb(r, lkb);
	error = do_request(r, lkb);
	send_request_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);

	if (error == -EINPROGRESS)
		error = 0;
	/* the lock wasn't granted or queued (e.g. -EAGAIN): drop our ref */
	if (error)
		dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
2859
/* A remote node asked us (as master) to convert its lock. */

static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, reply = 1;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	receive_flags(lkb, ms);
	error = receive_convert_args(ls, lkb, ms);
	if (error)
		goto out;
	/* the sender completes down-conversions locally and expects no
	   reply (see send_convert()) */
	reply = !down_conversion(lkb);

	error = do_convert(r, lkb);
 out:
	if (reply)
		send_convert_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
2895
/* A remote node asked us (as master) to unlock its lock. */

static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	receive_flags(lkb, ms);
	error = receive_unlock_args(ls, lkb, ms);
	if (error)
		goto out;

	error = do_unlock(r, lkb);
 out:
	send_unlock_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
2929
/* A remote node asked us (as master) to cancel its in-progress op. */

static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	receive_flags(lkb, ms);

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = do_cancel(r, lkb);
	send_cancel_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
2959
/* The master granted our queued/converting lock asynchronously. */

static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_grant no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	receive_flags_reply(lkb, ms);
	/* lock was apparently granted in an alternate mode; adjust the
	   local copy accordingly (see munge_altmode) */
	if (is_altmode(lkb))
		munge_altmode(lkb, ms);
	grant_lock_pc(r, lkb, ms);
	queue_cast(r, lkb, 0);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
2988
/* The master tells us one of our locks is blocking another request;
 * deliver the blocking ast to the application. */

static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_bast no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	queue_bast(r, lkb, ms->m_bastmode);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
3013
/* Another node asked us (as directory node for this hash) which node
 * masters the resource named in m_extra. */

static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
{
	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;

	from_nodeid = ms->m_header.h_nodeid;
	our_nodeid = dlm_our_nodeid();

	len = receive_extralen(ms);

	/* sanity check that the message really belongs to us */
	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
	if (dir_nodeid != our_nodeid) {
		log_error(ls, "lookup dir_nodeid %d from %d",
			  dir_nodeid, from_nodeid);
		error = -EINVAL;
		ret_nodeid = -1;
		goto out;
	}

	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);

	/* Optimization: we're master so treat lookup as a request */
	if (!error && ret_nodeid == our_nodeid) {
		receive_request(ls, ms);
		return;
	}
 out:
	send_lookup_reply(ls, ms, ret_nodeid, error);
}
3042
3043static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3044{
3045 int len, dir_nodeid, from_nodeid;
3046
3047 from_nodeid = ms->m_header.h_nodeid;
3048
3049 len = receive_extralen(ms);
3050
3051 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3052 if (dir_nodeid != dlm_our_nodeid()) {
3053 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3054 dir_nodeid, from_nodeid);
3055 return;
3056 }
3057
3058 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3059}
3060
/* Purge request from another node: drop the locks we hold on behalf of
   the given nodeid/pid pair (see do_purge). */

static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
{
	do_purge(ls, ms->m_nodeid, ms->m_pid);
}
3065
/* Reply to our earlier request (or to a lookup that the dir node, being
   master too, treated as a request).  The result from do_request() on the
   master decides whether the lock was granted, queued, refused, or must be
   retried against a new master.  An unlock/cancel that overlapped the
   outstanding request is sent afterwards. */

static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, mstype, result;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_request_reply no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	mstype = lkb->lkb_wait_type;
	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
	if (error)
		goto out;

	/* Optimization: the dir node was also the master, so it took our
	   lookup as a request and sent request reply instead of lookup reply */
	if (mstype == DLM_MSG_LOOKUP) {
		r->res_nodeid = ms->m_header.h_nodeid;
		lkb->lkb_nodeid = r->res_nodeid;
	}

	/* this is the value returned from do_request() on the master */
	result = ms->m_result;

	switch (result) {
	case -EAGAIN:
		/* request would block (be queued) on remote master */
		queue_cast(r, lkb, -EAGAIN);
		confirm_master(r, -EAGAIN);
		unhold_lkb(lkb); /* undoes create_lkb() */
		break;

	case -EINPROGRESS:
	case 0:
		/* request was queued or granted on remote master */
		receive_flags_reply(lkb, ms);
		lkb->lkb_remid = ms->m_lkid;
		if (is_altmode(lkb))
			munge_altmode(lkb, ms);
		if (result)
			add_lkb(r, lkb, DLM_LKSTS_WAITING);
		else {
			grant_lock_pc(r, lkb, ms);
			queue_cast(r, lkb, 0);
		}
		confirm_master(r, result);
		break;

	case -EBADR:
	case -ENOTBLK:
		/* find_rsb failed to find rsb or rsb wasn't master */
		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
		r->res_nodeid = -1;
		lkb->lkb_nodeid = -1;

		if (is_overlap(lkb)) {
			/* we'll ignore error in cancel/unlock reply */
			queue_cast_overlap(r, lkb);
			unhold_lkb(lkb); /* undoes create_lkb() */
		} else
			_request_lock(r, lkb);
		break;

	default:
		log_error(ls, "receive_request_reply %x error %d",
			  lkb->lkb_id, result);
	}

	/* an overlapping unlock/cancel was requested while the request was
	   outstanding; issue it now that the request has been resolved */
	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
		log_debug(ls, "receive_request_reply %x result %d unlock",
			  lkb->lkb_id, result);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		send_unlock(r, lkb);
	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		send_cancel(r, lkb);
	} else {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
3163
/* Apply the master's do_convert() result to our process-copy lkb:
   -EAGAIN means the convert was refused, -EINPROGRESS means it was moved
   to the convert queue, 0 means it was granted.  Caller holds the rsb
   lock. */

static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms)
{
	/* this is the value returned from do_convert() on the master */
	switch (ms->m_result) {
	case -EAGAIN:
		/* convert would block (be queued) on remote master */
		queue_cast(r, lkb, -EAGAIN);
		break;

	case -EINPROGRESS:
		/* convert was queued on remote master */
		receive_flags_reply(lkb, ms);
		if (is_demoted(lkb))
			munge_demoted(lkb, ms);
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		break;

	case 0:
		/* convert was granted on remote master */
		receive_flags_reply(lkb, ms);
		if (is_demoted(lkb))
			munge_demoted(lkb, ms);
		grant_lock_pc(r, lkb, ms);
		queue_cast(r, lkb, 0);
		break;

	default:
		log_error(r->res_ls, "receive_convert_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
}
3197
/* Lock the rsb, remove the lkb from the waiters list, then apply the
   convert reply.  The message may be a stub reply faked during recovery,
   which arrives with waiters_mutex already held (hence the _ms variant). */

static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	__receive_convert_reply(r, lkb, ms);
 out:
	unlock_rsb(r);
	put_rsb(r);
}
3216
3217static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3218{
3219 struct dlm_lkb *lkb;
3220 int error;
3221
3222 error = find_lkb(ls, ms->m_remid, &lkb);
3223 if (error) {
3224 log_error(ls, "receive_convert_reply no lkb");
3225 return;
3226 }
3227 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3228
David Teiglande7fd4172006-01-18 09:30:29 +00003229 _receive_convert_reply(lkb, ms);
David Teiglandb3f58d82006-02-28 11:16:37 -05003230 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003231}
3232
/* Apply the master's do_unlock() result: -DLM_EUNLOCK completes the
   unlock and queues the cast; -ENOENT (master had no such lock) is
   silently ignored.  May be a stub reply faked during recovery, which
   arrives with waiters_mutex already held. */

static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	/* this is the value returned from do_unlock() on the master */

	switch (ms->m_result) {
	case -DLM_EUNLOCK:
		receive_flags_reply(lkb, ms);
		remove_lock_pc(r, lkb);
		queue_cast(r, lkb, -DLM_EUNLOCK);
		break;
	case -ENOENT:
		break;
	default:
		log_error(r->res_ls, "receive_unlock_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
}
3264
3265static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3266{
3267 struct dlm_lkb *lkb;
3268 int error;
3269
3270 error = find_lkb(ls, ms->m_remid, &lkb);
3271 if (error) {
3272 log_error(ls, "receive_unlock_reply no lkb");
3273 return;
3274 }
3275 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3276
David Teiglande7fd4172006-01-18 09:30:29 +00003277 _receive_unlock_reply(lkb, ms);
David Teiglandb3f58d82006-02-28 11:16:37 -05003278 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003279}
3280
/* Apply the master's do_cancel() result: -DLM_ECANCEL reverts the lock
   and queues the cast; 0 (nothing to cancel) is ignored.  May be a stub
   reply faked during recovery, which arrives with waiters_mutex held. */

static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	/* this is the value returned from do_cancel() on the master */

	switch (ms->m_result) {
	case -DLM_ECANCEL:
		receive_flags_reply(lkb, ms);
		revert_lock_pc(r, lkb);
		/* m_result is -DLM_ECANCEL in this case, so always true */
		if (ms->m_result)
			queue_cast(r, lkb, -DLM_ECANCEL);
		break;
	case 0:
		break;
	default:
		log_error(r->res_ls, "receive_cancel_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
}
3313
3314static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3315{
3316 struct dlm_lkb *lkb;
3317 int error;
3318
3319 error = find_lkb(ls, ms->m_remid, &lkb);
3320 if (error) {
3321 log_error(ls, "receive_cancel_reply no lkb");
3322 return;
3323 }
3324 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3325
David Teiglande7fd4172006-01-18 09:30:29 +00003326 _receive_cancel_reply(lkb, ms);
David Teiglandb3f58d82006-02-28 11:16:37 -05003327 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003328}
3329
/* Reply from the dir node telling us which node masters the resource.
   Record the master in the rsb and resend the original request -- unless
   an overlapping unlock/cancel arrived while the lookup was outstanding,
   in which case the op completes here instead of being sent. */

static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, ret_nodeid;

	error = find_lkb(ls, ms->m_lkid, &lkb);
	if (error) {
		log_error(ls, "receive_lookup_reply no lkb");
		return;
	}

	/* ms->m_result is the value returned by dlm_dir_lookup on dir node
	   FIXME: will a non-zero error ever be returned? */

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	if (error)
		goto out;

	ret_nodeid = ms->m_nodeid;
	if (ret_nodeid == dlm_our_nodeid()) {
		/* we are the master ourselves */
		r->res_nodeid = 0;
		ret_nodeid = 0;
		r->res_first_lkid = 0;
	} else {
		/* set_master() will copy res_nodeid to lkb_nodeid */
		r->res_nodeid = ret_nodeid;
	}

	if (is_overlap(lkb)) {
		/* an unlock/cancel overlapped the lookup; finish it here */
		log_debug(ls, "receive_lookup_reply %x unlock %x",
			  lkb->lkb_id, lkb->lkb_flags);
		queue_cast_overlap(r, lkb);
		unhold_lkb(lkb); /* undoes create_lkb() */
		goto out_list;
	}

	_request_lock(r, lkb);

 out_list:
	if (!ret_nodeid)
		process_lookup_list(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
3381
/* Entry point for every incoming dlm message.  Finds the lockspace,
   coordinates with recovery (messages arriving while locking is stopped
   are saved on the requestqueue, or cause -EINTR when dlm_recoverd is the
   caller), then dispatches on message type under the recovery lock.
   Returns 0 on normal dispatch, -EINVAL for an unknown lockspace, or
   -EINTR when the message was deferred/dropped due to recovery. */

int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
{
	struct dlm_message *ms = (struct dlm_message *) hd;
	struct dlm_ls *ls;
	int error = 0;

	if (!recovery)
		dlm_message_in(ms);

	ls = dlm_find_lockspace_global(hd->h_lockspace);
	if (!ls) {
		log_print("drop message %d from %d for unknown lockspace %d",
			  ms->m_type, nodeid, hd->h_lockspace);
		return -EINVAL;
	}

	/* recovery may have just ended leaving a bunch of backed-up requests
	   in the requestqueue; wait while dlm_recoverd clears them */

	if (!recovery)
		dlm_wait_requestqueue(ls);

	/* recovery may have just started while there were a bunch of
	   in-flight requests -- save them in requestqueue to be processed
	   after recovery.  we can't let dlm_recvd block on the recovery
	   lock.  if dlm_recoverd is calling this function to clear the
	   requestqueue, it needs to be interrupted (-EINTR) if another
	   recovery operation is starting. */

	while (1) {
		if (dlm_locking_stopped(ls)) {
			if (recovery) {
				error = -EINTR;
				goto out;
			}
			error = dlm_add_requestqueue(ls, nodeid, hd);
			if (error == -EAGAIN)
				continue;
			else {
				/* message saved for after recovery */
				error = -EINTR;
				goto out;
			}
		}

		if (dlm_lock_recovery_try(ls))
			break;
		schedule();
	}

	switch (ms->m_type) {

	/* messages sent to a master node */

	case DLM_MSG_REQUEST:
		receive_request(ls, ms);
		break;

	case DLM_MSG_CONVERT:
		receive_convert(ls, ms);
		break;

	case DLM_MSG_UNLOCK:
		receive_unlock(ls, ms);
		break;

	case DLM_MSG_CANCEL:
		receive_cancel(ls, ms);
		break;

	/* messages sent from a master node (replies to above) */

	case DLM_MSG_REQUEST_REPLY:
		receive_request_reply(ls, ms);
		break;

	case DLM_MSG_CONVERT_REPLY:
		receive_convert_reply(ls, ms);
		break;

	case DLM_MSG_UNLOCK_REPLY:
		receive_unlock_reply(ls, ms);
		break;

	case DLM_MSG_CANCEL_REPLY:
		receive_cancel_reply(ls, ms);
		break;

	/* messages sent from a master node (only two types of async msg) */

	case DLM_MSG_GRANT:
		receive_grant(ls, ms);
		break;

	case DLM_MSG_BAST:
		receive_bast(ls, ms);
		break;

	/* messages sent to a dir node */

	case DLM_MSG_LOOKUP:
		receive_lookup(ls, ms);
		break;

	case DLM_MSG_REMOVE:
		receive_remove(ls, ms);
		break;

	/* messages sent from a dir node (remove has no reply) */

	case DLM_MSG_LOOKUP_REPLY:
		receive_lookup_reply(ls, ms);
		break;

	/* other messages */

	case DLM_MSG_PURGE:
		receive_purge(ls, ms);
		break;

	default:
		log_error(ls, "unknown message type %d", ms->m_type);
	}

	dlm_unlock_recovery(ls);
 out:
	dlm_put_lockspace(ls);
	dlm_astd_wake();
	return error;
}
3511
3512
3513/*
3514 * Recovery related
3515 */
3516
/* Recover an outstanding convert: a PR<->CW ("middle") conversion is
   completed locally via a faked -EINPROGRESS stub reply -- its real
   granted mode can't be known until all locks are rebuilt -- while an
   up-conversion is simply flagged for resend. */

static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	if (middle_conversion(lkb)) {
		hold_lkb(lkb);
		ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		ls->ls_stub_ms.m_result = -EINPROGRESS;
		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
		_receive_convert_reply(lkb, &ls->ls_stub_ms);

		/* Same special case as in receive_rcom_lock_args() */
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
		unhold_lkb(lkb);

	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
		lkb->lkb_flags |= DLM_IFL_RESEND;
	}

	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
	   conversions are async; there's no reply from the remote master */
}
3538
3539/* A waiting lkb needs recovery if the master node has failed, or
3540 the master node is changing (only when no directory is used) */
3541
3542static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3543{
3544 if (dlm_is_removed(ls, lkb->lkb_nodeid))
3545 return 1;
3546
3547 if (!dlm_no_directory(ls))
3548 return 0;
3549
3550 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3551 return 1;
3552
3553 return 0;
3554}
3555
3556/* Recovery for locks that are waiting for replies from nodes that are now
3557 gone. We can just complete unlocks and cancels by faking a reply from the
3558 dead node. Requests and up-conversions we flag to be resent after
3559 recovery. Down-conversions can just be completed with a fake reply like
3560 unlocks. Conversions between PR and CW need special attention. */
3561
/* Walk the waiters list and fix up each outstanding op whose target node
   needs recovery: lookups are always flagged for resend; unlocks and
   cancels are completed with a faked stub reply; requests are flagged
   RESEND; converts go through recover_convert_waiter(). */

void dlm_recover_waiters_pre(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb, *safe;

	mutex_lock(&ls->ls_waiters_mutex);

	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);

		/* all outstanding lookups, regardless of destination  will be
		   resent after recovery is done */

		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
			lkb->lkb_flags |= DLM_IFL_RESEND;
			continue;
		}

		if (!waiter_needs_recovery(ls, lkb))
			continue;

		switch (lkb->lkb_wait_type) {

		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_RESEND;
			break;

		case DLM_MSG_CONVERT:
			recover_convert_waiter(ls, lkb);
			break;

		case DLM_MSG_UNLOCK:
			/* complete the unlock locally with a stub reply */
			hold_lkb(lkb);
			ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
			ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
			dlm_put_lkb(lkb);
			break;

		case DLM_MSG_CANCEL:
			/* complete the cancel locally with a stub reply */
			hold_lkb(lkb);
			ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
			ls->ls_stub_ms.m_result = -DLM_ECANCEL;
			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
			dlm_put_lkb(lkb);
			break;

		default:
			log_error(ls, "invalid lkb wait_type %d",
				  lkb->lkb_wait_type);
		}
		schedule();
	}
	mutex_unlock(&ls->ls_waiters_mutex);
}
3619
David Teiglandef0c2bb2007-03-28 09:56:46 -05003620static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
David Teiglande7fd4172006-01-18 09:30:29 +00003621{
3622 struct dlm_lkb *lkb;
David Teiglandef0c2bb2007-03-28 09:56:46 -05003623 int found = 0;
David Teiglande7fd4172006-01-18 09:30:29 +00003624
David Teigland90135922006-01-20 08:47:07 +00003625 mutex_lock(&ls->ls_waiters_mutex);
David Teiglande7fd4172006-01-18 09:30:29 +00003626 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3627 if (lkb->lkb_flags & DLM_IFL_RESEND) {
David Teiglandef0c2bb2007-03-28 09:56:46 -05003628 hold_lkb(lkb);
3629 found = 1;
David Teiglande7fd4172006-01-18 09:30:29 +00003630 break;
3631 }
3632 }
David Teigland90135922006-01-20 08:47:07 +00003633 mutex_unlock(&ls->ls_waiters_mutex);
David Teiglande7fd4172006-01-18 09:30:29 +00003634
David Teiglandef0c2bb2007-03-28 09:56:46 -05003635 if (!found)
David Teiglande7fd4172006-01-18 09:30:29 +00003636 lkb = NULL;
David Teiglandef0c2bb2007-03-28 09:56:46 -05003637 return lkb;
David Teiglande7fd4172006-01-18 09:30:29 +00003638}
3639
3640/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
3641 master or dir-node for r. Processing the lkb may result in it being placed
3642 back on waiters. */
3643
David Teiglandef0c2bb2007-03-28 09:56:46 -05003644/* We do this after normal locking has been enabled and any saved messages
3645 (in requestqueue) have been processed. We should be confident that at
3646 this point we won't get or process a reply to any of these waiting
3647 operations. But, new ops may be coming in on the rsbs/locks here from
3648 userspace or remotely. */
3649
3650/* there may have been an overlap unlock/cancel prior to recovery or after
3651 recovery. if before, the lkb may still have a pos wait_count; if after, the
3652 overlap flag would just have been set and nothing new sent. we can be
   confident here that any replies to either the initial op or overlap ops
3654 prior to recovery have been received. */
3655
/* For each lkb flagged RESEND by _pre: clear all waiter state, then either
   resend the op to the (possibly new) master/dir node, or -- if an overlap
   unlock/cancel superseded it -- complete that instead.  Returns -EINTR if
   another recovery starts before the list is drained. */

int dlm_recover_waiters_post(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error = 0, mstype, err, oc, ou;

	while (1) {
		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "recover_waiters_post aborted");
			error = -EINTR;
			break;
		}

		lkb = find_resend_waiter(ls);
		if (!lkb)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		mstype = lkb->lkb_wait_type;
		oc = is_overlap_cancel(lkb);
		ou = is_overlap_unlock(lkb);
		err = 0;

		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);

		/* At this point we assume that we won't get a reply to any
		   previous op or overlap op on this lock.  First, do a big
		   remove_from_waiters() for all previous ops. */

		lkb->lkb_flags &= ~DLM_IFL_RESEND;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_type = 0;
		lkb->lkb_wait_count = 0;
		mutex_lock(&ls->ls_waiters_mutex);
		list_del_init(&lkb->lkb_wait_reply);
		mutex_unlock(&ls->ls_waiters_mutex);
		unhold_lkb(lkb); /* for waiters list */

		if (oc || ou) {
			/* do an unlock or cancel instead of resending */
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
							-DLM_ECANCEL);
				unhold_lkb(lkb); /* undoes create_lkb() */
				break;
			case DLM_MSG_CONVERT:
				if (oc) {
					queue_cast(r, lkb, -DLM_ECANCEL);
				} else {
					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
					_unlock_lock(r, lkb);
				}
				break;
			default:
				err = 1;
			}
		} else {
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				_request_lock(r, lkb);
				if (is_master(r))
					confirm_master(r, 0);
				break;
			case DLM_MSG_CONVERT:
				_convert_lock(r, lkb);
				break;
			default:
				err = 1;
			}
		}

		if (err)
			log_error(ls, "recover_waiters_post %x %d %x %d %d",
				  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
		unlock_rsb(r);
		put_rsb(r);
		dlm_put_lkb(lkb);
	}

	return error;
}
3745
/* Delete from one rsb queue every lkb for which test() returns true,
   dropping the reference that should free it.  Flags the rsb LOCKS_PURGED
   so granting can be re-evaluated later (see dlm_grant_after_purge). */

static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
{
	struct dlm_ls *ls = r->res_ls;
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
		if (test(ls, lkb)) {
			rsb_set_flag(r, RSB_LOCKS_PURGED);
			del_lkb(r, lkb);
			/* this put should free the lkb */
			if (!dlm_put_lkb(lkb))
				log_error(ls, "purged lkb not released");
		}
	}
}
3762
3763static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3764{
3765 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3766}
3767
/* purge_queue() predicate: every master copy, regardless of node. */

static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	return is_master_copy(lkb);
}
3772
3773static void purge_dead_locks(struct dlm_rsb *r)
3774{
3775 purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3776 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3777 purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3778}
3779
3780void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3781{
3782 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3783 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3784 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3785}
3786
3787/* Get rid of locks held by nodes that are gone. */
3788
/* Walk every root rsb and, where we are master, drop the locks held by
   nodes that are gone.  schedule() yields the cpu between rsbs.  Always
   returns 0. */

int dlm_purge_locks(struct dlm_ls *ls)
{
	struct dlm_rsb *r;

	log_debug(ls, "dlm_purge_locks");

	down_write(&ls->ls_root_sem);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		hold_rsb(r);
		lock_rsb(r);
		if (is_master(r))
			purge_dead_locks(r);
		unlock_rsb(r);
		unhold_rsb(r);

		schedule();
	}
	up_write(&ls->ls_root_sem);

	return 0;
}
3810
/* Return (with a hold taken) the next rsb in the given hash bucket that
   is flagged LOCKS_PURGED, clearing the flag; NULL when the bucket has
   no more such rsbs. */

static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
{
	struct dlm_rsb *r, *r_ret = NULL;

	read_lock(&ls->ls_rsbtbl[bucket].lock);
	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
		if (!rsb_flag(r, RSB_LOCKS_PURGED))
			continue;
		hold_rsb(r);
		rsb_clear_flag(r, RSB_LOCKS_PURGED);
		r_ret = r;
		break;
	}
	read_unlock(&ls->ls_rsbtbl[bucket].lock);
	return r_ret;
}
3827
/* After purging dead-node locks, sweep every hash bucket for rsbs flagged
   LOCKS_PURGED and, where we are master, grant whatever locks can now be
   granted.  schedule() yields the cpu between rsbs. */

void dlm_grant_after_purge(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	int bucket = 0;

	while (1) {
		r = find_purged_rsb(ls, bucket);
		if (!r) {
			/* bucket exhausted; move to the next one */
			if (bucket == ls->ls_rsbtbl_size - 1)
				break;
			bucket++;
			continue;
		}
		lock_rsb(r);
		if (is_master(r)) {
			grant_pending_locks(r);
			confirm_master(r, 0);
		}
		unlock_rsb(r);
		put_rsb(r);
		schedule();
	}
}
3851
3852static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3853 uint32_t remid)
3854{
3855 struct dlm_lkb *lkb;
3856
3857 list_for_each_entry(lkb, head, lkb_statequeue) {
3858 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3859 return lkb;
3860 }
3861 return NULL;
3862}
3863
3864static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3865 uint32_t remid)
3866{
3867 struct dlm_lkb *lkb;
3868
3869 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3870 if (lkb)
3871 return lkb;
3872 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3873 if (lkb)
3874 return lkb;
3875 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3876 if (lkb)
3877 return lkb;
3878 return NULL;
3879}
3880
/* Populate a freshly created master-copy lkb from the rcom_lock data the
   lock holder sent during recovery, including the lvb when VALBLK is set.
   Returns 0, or -ENOMEM if the lvb can't be allocated. */

static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				  struct dlm_rsb *r, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	int lvblen;

	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
	lkb->lkb_ownpid = rl->rl_ownpid;
	lkb->lkb_remid = rl->rl_lkid;
	lkb->lkb_exflags = rl->rl_exflags;
	lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	lkb->lkb_lvbseq = rl->rl_lvbseq;
	lkb->lkb_rqmode = rl->rl_rqmode;
	lkb->lkb_grmode = rl->rl_grmode;
	/* don't set lkb_status because add_lkb wants to itself */

	/* only the presence of the ast/bast callbacks is transferred */
	lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
	lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		lkb->lkb_lvbptr = allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
		/* lvb bytes follow the rcom_lock struct in the message */
		lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
			 sizeof(struct rcom_lock);
		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
	}

	/* Conversions between PR and CW (middle modes) need special handling.
	   The real granted mode of these converting locks cannot be determined
	   until all locks have been rebuilt on the rsb (recover_conversion) */

	if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
		rl->rl_status = DLM_LKSTS_CONVERT;
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(r, RSB_RECOVER_CONVERT);
	}

	return 0;
}
3922
3923/* This lkb may have been recovered in a previous aborted recovery so we need
3924 to check if the rsb already has an lkb with the given remote nodeid/lkid.
3925 If so we just send back a standard reply. If not, we create a new lkb with
3926 the given values and send back our lkid. We send back our lkid by sending
3927 back the rcom_lock struct we got but with the remid field filled in. */
3928
/* Rebuild the master copy of a lock that a recovering node sent us in a
   rcom_lock message.  Our lkid for the (new or pre-existing) lkb is
   returned to the lock holder in rl_remid, and the outcome is always
   stored in rl_result for the reply. */
3929int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3930{
3931	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3932	struct dlm_rsb *r;
3933	struct dlm_lkb *lkb;
3934	int error;
3935
	/* parent (hierarchical) locks are not supported over recovery */
3936	if (rl->rl_parent_lkid) {
3937		error = -EOPNOTSUPP;
3938		goto out;
3939	}
3940
3941	error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3942	if (error)
3943		goto out;
3944
3945	lock_rsb(r);
3946
	/* a previous, aborted recovery may already have recreated this lkb;
	   if so, just report -EEXIST and hand back the existing lkid */
3947	lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3948	if (lkb) {
3949		error = -EEXIST;
3950		goto out_remid;
3951	}
3952
3953	error = create_lkb(ls, &lkb);
3954	if (error)
3955		goto out_unlock;
3956
3957	error = receive_rcom_lock_args(ls, lkb, r, rc);
3958	if (error) {
David Teiglandb3f58d82006-02-28 11:16:37 -05003959		__put_lkb(ls, lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003960		goto out_unlock;
3961	}
3962
3963	attach_lkb(r, lkb);
3964	add_lkb(r, lkb, rl->rl_status);
3965	error = 0;
3966
3967 out_remid:
3968	/* this is the new value returned to the lock holder for
3969	   saving in its process-copy lkb */
3970	rl->rl_remid = lkb->lkb_id;
3971
3972 out_unlock:
3973	unlock_rsb(r);
3974	put_rsb(r);
3975 out:
3976	if (error)
3977		log_print("recover_master_copy %d %x", error, rl->rl_lkid);
	/* the reply carries the error code back to the lock holder */
3978	rl->rl_result = error;
3979	return error;
3980}
3981
/* Handle the new master's reply (rl_result) to a rcom_lock we sent for one
   of our process-copy lkbs during recovery: save the master's lkid
   (rl_remid) on success or -EEXIST, resend the lock on -EBADR, and ack the
   lock as recovered in every case except the resend. */
3982int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3983{
3984	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3985	struct dlm_rsb *r;
3986	struct dlm_lkb *lkb;
3987	int error;
3988
3989	error = find_lkb(ls, rl->rl_lkid, &lkb);
3990	if (error) {
3991		log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3992		return error;
3993	}
3994
3995	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3996
	/* error code produced by dlm_recover_master_copy() on the master */
3997	error = rl->rl_result;
3998
3999	r = lkb->lkb_resource;
4000	hold_rsb(r);
4001	lock_rsb(r);
4002
4003	switch (error) {
David Teiglanddc200a82006-12-13 10:36:37 -06004004	case -EBADR:
4005		/* There's a chance the new master received our lock before
4006		   dlm_recover_master_reply(), this wouldn't happen if we did
4007		   a barrier between recover_masters and recover_locks. */
4008		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4009			  (unsigned long)r, r->res_name);
4010		dlm_send_rcom_lock(r, lkb);
		/* skip the ack; the resend's reply will provide it */
4011		goto out;
David Teiglande7fd4172006-01-18 09:30:29 +00004012	case -EEXIST:
4013		log_debug(ls, "master copy exists %x", lkb->lkb_id);
4014		/* fall through */
4015	case 0:
4016		lkb->lkb_remid = rl->rl_remid;
4017		break;
4018	default:
4019		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4020			  error, lkb->lkb_id);
4021	}
4022
4023	/* an ack for dlm_recover_locks() which waits for replies from
4024	   all the locks it sends to new masters */
4025	dlm_recovered_lock(r);
David Teiglanddc200a82006-12-13 10:36:37 -06004026 out:
David Teiglande7fd4172006-01-18 09:30:29 +00004027	unlock_rsb(r);
4028	put_rsb(r);
David Teiglandb3f58d82006-02-28 11:16:37 -05004029	dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00004030
4031	return 0;
4032}
4033
David Teigland597d0ca2006-07-12 16:44:04 -05004034int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4035 int mode, uint32_t flags, void *name, unsigned int namelen,
4036 uint32_t parent_lkid)
4037{
4038 struct dlm_lkb *lkb;
4039 struct dlm_args args;
4040 int error;
4041
David Teigland85e86ed2007-05-18 08:58:15 -05004042 dlm_lock_recovery(ls);
David Teigland597d0ca2006-07-12 16:44:04 -05004043
4044 error = create_lkb(ls, &lkb);
4045 if (error) {
4046 kfree(ua);
4047 goto out;
4048 }
4049
4050 if (flags & DLM_LKF_VALBLK) {
David Teigland62a0f622007-01-31 13:25:00 -06004051 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
David Teigland597d0ca2006-07-12 16:44:04 -05004052 if (!ua->lksb.sb_lvbptr) {
4053 kfree(ua);
4054 __put_lkb(ls, lkb);
4055 error = -ENOMEM;
4056 goto out;
4057 }
4058 }
4059
4060 /* After ua is attached to lkb it will be freed by free_lkb().
4061 When DLM_IFL_USER is set, the dlm knows that this is a userspace
4062 lock and that lkb_astparam is the dlm_user_args structure. */
4063
4064 error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
David Teigland32f105a2006-08-23 16:07:31 -04004065 DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
David Teigland597d0ca2006-07-12 16:44:04 -05004066 lkb->lkb_flags |= DLM_IFL_USER;
4067 ua->old_mode = DLM_LOCK_IV;
4068
4069 if (error) {
4070 __put_lkb(ls, lkb);
4071 goto out;
4072 }
4073
4074 error = request_lock(ls, lkb, name, namelen, &args);
4075
4076 switch (error) {
4077 case 0:
4078 break;
4079 case -EINPROGRESS:
4080 error = 0;
4081 break;
4082 case -EAGAIN:
4083 error = 0;
4084 /* fall through */
4085 default:
4086 __put_lkb(ls, lkb);
4087 goto out;
4088 }
4089
4090 /* add this new lkb to the per-process list of locks */
4091 spin_lock(&ua->proc->locks_spin);
David Teiglandef0c2bb2007-03-28 09:56:46 -05004092 hold_lkb(lkb);
David Teigland597d0ca2006-07-12 16:44:04 -05004093 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4094 spin_unlock(&ua->proc->locks_spin);
4095 out:
David Teigland85e86ed2007-05-18 08:58:15 -05004096 dlm_unlock_recovery(ls);
David Teigland597d0ca2006-07-12 16:44:04 -05004097 return error;
4098}
4099
4100int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4101 int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
4102{
4103 struct dlm_lkb *lkb;
4104 struct dlm_args args;
4105 struct dlm_user_args *ua;
4106 int error;
4107
David Teigland85e86ed2007-05-18 08:58:15 -05004108 dlm_lock_recovery(ls);
David Teigland597d0ca2006-07-12 16:44:04 -05004109
4110 error = find_lkb(ls, lkid, &lkb);
4111 if (error)
4112 goto out;
4113
4114 /* user can change the params on its lock when it converts it, or
4115 add an lvb that didn't exist before */
4116
4117 ua = (struct dlm_user_args *)lkb->lkb_astparam;
4118
4119 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
David Teigland62a0f622007-01-31 13:25:00 -06004120 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
David Teigland597d0ca2006-07-12 16:44:04 -05004121 if (!ua->lksb.sb_lvbptr) {
4122 error = -ENOMEM;
4123 goto out_put;
4124 }
4125 }
4126 if (lvb_in && ua->lksb.sb_lvbptr)
4127 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4128
4129 ua->castparam = ua_tmp->castparam;
4130 ua->castaddr = ua_tmp->castaddr;
4131 ua->bastparam = ua_tmp->bastparam;
4132 ua->bastaddr = ua_tmp->bastaddr;
Patrick Caulfield10948eb2006-08-23 09:49:31 +01004133 ua->user_lksb = ua_tmp->user_lksb;
David Teigland597d0ca2006-07-12 16:44:04 -05004134 ua->old_mode = lkb->lkb_grmode;
4135
David Teigland32f105a2006-08-23 16:07:31 -04004136 error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST,
4137 ua, DLM_FAKE_USER_AST, &args);
David Teigland597d0ca2006-07-12 16:44:04 -05004138 if (error)
4139 goto out_put;
4140
4141 error = convert_lock(ls, lkb, &args);
4142
4143 if (error == -EINPROGRESS || error == -EAGAIN)
4144 error = 0;
4145 out_put:
4146 dlm_put_lkb(lkb);
4147 out:
David Teigland85e86ed2007-05-18 08:58:15 -05004148 dlm_unlock_recovery(ls);
David Teigland597d0ca2006-07-12 16:44:04 -05004149 kfree(ua_tmp);
4150 return error;
4151}
4152
4153int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4154 uint32_t flags, uint32_t lkid, char *lvb_in)
4155{
4156 struct dlm_lkb *lkb;
4157 struct dlm_args args;
4158 struct dlm_user_args *ua;
4159 int error;
4160
David Teigland85e86ed2007-05-18 08:58:15 -05004161 dlm_lock_recovery(ls);
David Teigland597d0ca2006-07-12 16:44:04 -05004162
4163 error = find_lkb(ls, lkid, &lkb);
4164 if (error)
4165 goto out;
4166
4167 ua = (struct dlm_user_args *)lkb->lkb_astparam;
4168
4169 if (lvb_in && ua->lksb.sb_lvbptr)
4170 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4171 ua->castparam = ua_tmp->castparam;
Patrick Caulfieldcc346d52006-08-08 10:34:40 -04004172 ua->user_lksb = ua_tmp->user_lksb;
David Teigland597d0ca2006-07-12 16:44:04 -05004173
4174 error = set_unlock_args(flags, ua, &args);
4175 if (error)
4176 goto out_put;
4177
4178 error = unlock_lock(ls, lkb, &args);
4179
4180 if (error == -DLM_EUNLOCK)
4181 error = 0;
David Teiglandef0c2bb2007-03-28 09:56:46 -05004182 /* from validate_unlock_args() */
4183 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4184 error = 0;
David Teigland597d0ca2006-07-12 16:44:04 -05004185 if (error)
4186 goto out_put;
4187
4188 spin_lock(&ua->proc->locks_spin);
David Teiglanda1bc86e2007-01-15 10:34:52 -06004189 /* dlm_user_add_ast() may have already taken lkb off the proc list */
4190 if (!list_empty(&lkb->lkb_ownqueue))
4191 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
David Teigland597d0ca2006-07-12 16:44:04 -05004192 spin_unlock(&ua->proc->locks_spin);
David Teigland597d0ca2006-07-12 16:44:04 -05004193 out_put:
4194 dlm_put_lkb(lkb);
4195 out:
David Teigland85e86ed2007-05-18 08:58:15 -05004196 dlm_unlock_recovery(ls);
David Teiglandef0c2bb2007-03-28 09:56:46 -05004197 kfree(ua_tmp);
David Teigland597d0ca2006-07-12 16:44:04 -05004198 return error;
4199}
4200
4201int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4202 uint32_t flags, uint32_t lkid)
4203{
4204 struct dlm_lkb *lkb;
4205 struct dlm_args args;
4206 struct dlm_user_args *ua;
4207 int error;
4208
David Teigland85e86ed2007-05-18 08:58:15 -05004209 dlm_lock_recovery(ls);
David Teigland597d0ca2006-07-12 16:44:04 -05004210
4211 error = find_lkb(ls, lkid, &lkb);
4212 if (error)
4213 goto out;
4214
4215 ua = (struct dlm_user_args *)lkb->lkb_astparam;
4216 ua->castparam = ua_tmp->castparam;
Patrick Caulfieldc059f702006-08-23 10:24:03 +01004217 ua->user_lksb = ua_tmp->user_lksb;
David Teigland597d0ca2006-07-12 16:44:04 -05004218
4219 error = set_unlock_args(flags, ua, &args);
4220 if (error)
4221 goto out_put;
4222
4223 error = cancel_lock(ls, lkb, &args);
4224
4225 if (error == -DLM_ECANCEL)
4226 error = 0;
David Teiglandef0c2bb2007-03-28 09:56:46 -05004227 /* from validate_unlock_args() */
4228 if (error == -EBUSY)
4229 error = 0;
David Teigland597d0ca2006-07-12 16:44:04 -05004230 out_put:
4231 dlm_put_lkb(lkb);
4232 out:
David Teigland85e86ed2007-05-18 08:58:15 -05004233 dlm_unlock_recovery(ls);
David Teiglandef0c2bb2007-03-28 09:56:46 -05004234 kfree(ua_tmp);
David Teigland597d0ca2006-07-12 16:44:04 -05004235 return error;
4236}
4237
David Teiglandef0c2bb2007-03-28 09:56:46 -05004238/* lkb's that are removed from the waiters list by revert are just left on the
4239 orphans list with the granted orphan locks, to be freed by purge */
4240
David Teigland597d0ca2006-07-12 16:44:04 -05004241static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4242{
4243 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
David Teiglandef0c2bb2007-03-28 09:56:46 -05004244 struct dlm_args args;
4245 int error;
David Teigland597d0ca2006-07-12 16:44:04 -05004246
David Teiglandef0c2bb2007-03-28 09:56:46 -05004247 hold_lkb(lkb);
4248 mutex_lock(&ls->ls_orphans_mutex);
4249 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4250 mutex_unlock(&ls->ls_orphans_mutex);
David Teigland597d0ca2006-07-12 16:44:04 -05004251
David Teiglandef0c2bb2007-03-28 09:56:46 -05004252 set_unlock_args(0, ua, &args);
4253
4254 error = cancel_lock(ls, lkb, &args);
4255 if (error == -DLM_ECANCEL)
4256 error = 0;
4257 return error;
David Teigland597d0ca2006-07-12 16:44:04 -05004258}
4259
4260/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4261 Regardless of what rsb queue the lock is on, it's removed and freed. */
4262
4263static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4264{
4265 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4266 struct dlm_args args;
4267 int error;
4268
David Teigland597d0ca2006-07-12 16:44:04 -05004269 set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
4270
4271 error = unlock_lock(ls, lkb, &args);
4272 if (error == -DLM_EUNLOCK)
4273 error = 0;
4274 return error;
4275}
4276
David Teiglandef0c2bb2007-03-28 09:56:46 -05004277/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4278 (which does lock_rsb) due to deadlock with receiving a message that does
4279 lock_rsb followed by dlm_user_add_ast() */
4280
4281static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4282 struct dlm_user_proc *proc)
4283{
4284 struct dlm_lkb *lkb = NULL;
4285
4286 mutex_lock(&ls->ls_clear_proc_locks);
4287 if (list_empty(&proc->locks))
4288 goto out;
4289
4290 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4291 list_del_init(&lkb->lkb_ownqueue);
4292
4293 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4294 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4295 else
4296 lkb->lkb_flags |= DLM_IFL_DEAD;
4297 out:
4298 mutex_unlock(&ls->ls_clear_proc_locks);
4299 return lkb;
4300}
4301
David Teigland597d0ca2006-07-12 16:44:04 -05004302/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4303 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4304 which we clear here. */
4305
4306/* proc CLOSING flag is set so no more device_reads should look at proc->asts
4307 list, and no more device_writes should add lkb's to proc->locks list; so we
4308 shouldn't need to take asts_spin or locks_spin here. this assumes that
4309 device reads/writes/closes are serialized -- FIXME: we may need to serialize
4310 them ourself. */
4311
/* Clean up all locks owned by a closing process: persistent locks are
   moved to the lockspace orphans list, everything else is force-unlocked.
   orphan/unlock_proc_lock() must run without ls_clear_proc_locks held
   (see the deadlock note above del_proc_lock), so the mutex is taken and
   dropped inside del_proc_lock for the main loop and only held directly
   for the unlocking/asts lists afterwards. */
4312void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4313{
4314	struct dlm_lkb *lkb, *safe;
4315
David Teigland85e86ed2007-05-18 08:58:15 -05004316	dlm_lock_recovery(ls);
David Teigland597d0ca2006-07-12 16:44:04 -05004317
David Teiglandef0c2bb2007-03-28 09:56:46 -05004318	while (1) {
4319		lkb = del_proc_lock(ls, proc);
4320		if (!lkb)
4321			break;
4322		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
David Teigland597d0ca2006-07-12 16:44:04 -05004323			orphan_proc_lock(ls, lkb);
David Teiglandef0c2bb2007-03-28 09:56:46 -05004324		else
David Teigland597d0ca2006-07-12 16:44:04 -05004325			unlock_proc_lock(ls, lkb);
David Teigland597d0ca2006-07-12 16:44:04 -05004326
4327		/* this removes the reference for the proc->locks list
4328		   added by dlm_user_request, it may result in the lkb
4329		   being freed */
4330
4331		dlm_put_lkb(lkb);
4332	}
David Teiglanda1bc86e2007-01-15 10:34:52 -06004333
David Teiglandef0c2bb2007-03-28 09:56:46 -05004334	mutex_lock(&ls->ls_clear_proc_locks);
4335
David Teiglanda1bc86e2007-01-15 10:34:52 -06004336	/* in-progress unlocks */
4337	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4338		list_del_init(&lkb->lkb_ownqueue);
4339		lkb->lkb_flags |= DLM_IFL_DEAD;
4340		dlm_put_lkb(lkb);
4341	}
4342
	/* drop the references held by asts queued to this proc but never
	   delivered */
4343	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4344		list_del(&lkb->lkb_astqueue);
4345		dlm_put_lkb(lkb);
4346	}
4347
David Teigland597d0ca2006-07-12 16:44:04 -05004348	mutex_unlock(&ls->ls_clear_proc_locks);
David Teigland85e86ed2007-05-18 08:58:15 -05004349	dlm_unlock_recovery(ls);
David Teigland597d0ca2006-07-12 16:44:04 -05004350}
David Teiglanda1bc86e2007-01-15 10:34:52 -06004351
/* Called when a process purges its own locks: force-unlock everything on
   its proc->locks list, then clear any in-progress unlocks and pending
   asts, dropping the list references so the lkbs can be freed.  The
   locks_spin lock is taken per-iteration and dropped before
   unlock_proc_lock() runs (which takes rsb locks). */
David Teigland84991372007-03-30 15:02:40 -05004352static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4353{
4354	struct dlm_lkb *lkb, *safe;
4355
4356	while (1) {
4357		lkb = NULL;
4358		spin_lock(&proc->locks_spin);
4359		if (!list_empty(&proc->locks)) {
4360			lkb = list_entry(proc->locks.next, struct dlm_lkb,
4361					 lkb_ownqueue);
4362			list_del_init(&lkb->lkb_ownqueue);
4363		}
4364		spin_unlock(&proc->locks_spin);
4365
4366		if (!lkb)
4367			break;
4368
		/* NOTE(review): DEAD appears to make later callbacks on this
		   lkb be ignored -- confirm against dlm_user_add_ast() */
4369		lkb->lkb_flags |= DLM_IFL_DEAD;
4370		unlock_proc_lock(ls, lkb);
4371		dlm_put_lkb(lkb); /* ref from proc->locks list */
4372	}
4373
4374	spin_lock(&proc->locks_spin);
4375	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4376		list_del_init(&lkb->lkb_ownqueue);
4377		lkb->lkb_flags |= DLM_IFL_DEAD;
4378		dlm_put_lkb(lkb);
4379	}
4380	spin_unlock(&proc->locks_spin);
4381
	/* drop references held by undelivered asts */
4382	spin_lock(&proc->asts_spin);
4383	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4384		list_del(&lkb->lkb_astqueue);
4385		dlm_put_lkb(lkb);
4386	}
4387	spin_unlock(&proc->asts_spin);
4388}
4389
4390/* pid of 0 means purge all orphans */
4391
4392static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4393{
4394 struct dlm_lkb *lkb, *safe;
4395
4396 mutex_lock(&ls->ls_orphans_mutex);
4397 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4398 if (pid && lkb->lkb_ownpid != pid)
4399 continue;
4400 unlock_proc_lock(ls, lkb);
4401 list_del_init(&lkb->lkb_ownqueue);
4402 dlm_put_lkb(lkb);
4403 }
4404 mutex_unlock(&ls->ls_orphans_mutex);
4405}
4406
4407static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4408{
4409 struct dlm_message *ms;
4410 struct dlm_mhandle *mh;
4411 int error;
4412
4413 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4414 DLM_MSG_PURGE, &ms, &mh);
4415 if (error)
4416 return error;
4417 ms->m_nodeid = nodeid;
4418 ms->m_pid = pid;
4419
4420 return send_message(mh, ms);
4421}
4422
4423int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4424 int nodeid, int pid)
4425{
4426 int error = 0;
4427
4428 if (nodeid != dlm_our_nodeid()) {
4429 error = send_purge(ls, nodeid, pid);
4430 } else {
David Teigland85e86ed2007-05-18 08:58:15 -05004431 dlm_lock_recovery(ls);
David Teigland84991372007-03-30 15:02:40 -05004432 if (pid == current->pid)
4433 purge_proc_locks(ls, proc);
4434 else
4435 do_purge(ls, nodeid, pid);
David Teigland85e86ed2007-05-18 08:58:15 -05004436 dlm_unlock_recovery(ls);
David Teigland84991372007-03-30 15:02:40 -05004437 }
4438 return error;
4439}
4440