blob: ad3797a37942fbf01cd6efefaaf4437aad91aa99 [file] [log] [blame]
David Teiglande7fd4172006-01-18 09:30:29 +00001/******************************************************************************
2*******************************************************************************
3**
David Teiglandef0c2bb2007-03-28 09:56:46 -05004** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
David Teiglande7fd4172006-01-18 09:30:29 +00005**
6** This copyrighted material is made available to anyone wishing to use,
7** modify, copy, or redistribute it subject to the terms and conditions
8** of the GNU General Public License v.2.
9**
10*******************************************************************************
11******************************************************************************/
12
13/* Central locking logic has four stages:
14
15 dlm_lock()
16 dlm_unlock()
17
18 request_lock(ls, lkb)
19 convert_lock(ls, lkb)
20 unlock_lock(ls, lkb)
21 cancel_lock(ls, lkb)
22
23 _request_lock(r, lkb)
24 _convert_lock(r, lkb)
25 _unlock_lock(r, lkb)
26 _cancel_lock(r, lkb)
27
28 do_request(r, lkb)
29 do_convert(r, lkb)
30 do_unlock(r, lkb)
31 do_cancel(r, lkb)
32
33 Stage 1 (lock, unlock) is mainly about checking input args and
34 splitting into one of the four main operations:
35
36 dlm_lock = request_lock
37 dlm_lock+CONVERT = convert_lock
38 dlm_unlock = unlock_lock
39 dlm_unlock+CANCEL = cancel_lock
40
41 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42 provided to the next stage.
43
44 Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45 When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
48 given rsb and lkb and queues callbacks.
49
50 For remote operations, send_xxxx() results in the corresponding do_xxxx()
51 function being executed on the remote node. The connecting send/receive
52 calls on local (L) and remote (R) nodes:
53
54 L: send_xxxx() -> R: receive_xxxx()
55 R: do_xxxx()
56 L: receive_xxxx_reply() <- R: send_xxxx_reply()
57*/
David Teigland597d0ca2006-07-12 16:44:04 -050058#include <linux/types.h>
David Teiglande7fd4172006-01-18 09:30:29 +000059#include "dlm_internal.h"
David Teigland597d0ca2006-07-12 16:44:04 -050060#include <linux/dlm_device.h>
David Teiglande7fd4172006-01-18 09:30:29 +000061#include "memory.h"
62#include "lowcomms.h"
63#include "requestqueue.h"
64#include "util.h"
65#include "dir.h"
66#include "member.h"
67#include "lockspace.h"
68#include "ast.h"
69#include "lock.h"
70#include "rcom.h"
71#include "recover.h"
72#include "lvb_table.h"
David Teigland597d0ca2006-07-12 16:44:04 -050073#include "user.h"
David Teiglande7fd4172006-01-18 09:30:29 +000074#include "config.h"
75
76static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83static int send_remove(struct dlm_rsb *r);
84static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
David Teigland3ae1acf2007-05-18 08:59:31 -050085static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
David Teiglande7fd4172006-01-18 09:30:29 +000086static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
87 struct dlm_message *ms);
88static int receive_extralen(struct dlm_message *ms);
David Teigland84991372007-03-30 15:02:40 -050089static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
David Teigland3ae1acf2007-05-18 08:59:31 -050090static void del_timeout(struct dlm_lkb *lkb);
91void dlm_timeout_warn(struct dlm_lkb *lkb);
David Teiglande7fd4172006-01-18 09:30:29 +000092
93/*
94 * Lock compatibilty matrix - thanks Steve
95 * UN = Unlocked state. Not really a state, used as a flag
96 * PD = Padding. Used to make the matrix a nice power of two in size
97 * Other states are the same as the VMS DLM.
98 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
99 */
100
101static const int __dlm_compat_matrix[8][8] = {
102 /* UN NL CR CW PR PW EX PD */
103 {1, 1, 1, 1, 1, 1, 1, 0}, /* UN */
104 {1, 1, 1, 1, 1, 1, 1, 0}, /* NL */
105 {1, 1, 1, 1, 1, 1, 0, 0}, /* CR */
106 {1, 1, 1, 1, 0, 0, 0, 0}, /* CW */
107 {1, 1, 1, 0, 1, 0, 0, 0}, /* PR */
108 {1, 1, 1, 0, 0, 0, 0, 0}, /* PW */
109 {1, 1, 0, 0, 0, 0, 0, 0}, /* EX */
110 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
111};
112
113/*
114 * This defines the direction of transfer of LVB data.
115 * Granted mode is the row; requested mode is the column.
116 * Usage: matrix[grmode+1][rqmode+1]
117 * 1 = LVB is returned to the caller
118 * 0 = LVB is written to the resource
119 * -1 = nothing happens to the LVB
120 */
121
122const int dlm_lvb_operations[8][8] = {
123 /* UN NL CR CW PR PW EX PD*/
124 { -1, 1, 1, 1, 1, 1, 1, -1 }, /* UN */
125 { -1, 1, 1, 1, 1, 1, 1, 0 }, /* NL */
126 { -1, -1, 1, 1, 1, 1, 1, 0 }, /* CR */
127 { -1, -1, -1, 1, 1, 1, 1, 0 }, /* CW */
128 { -1, -1, -1, -1, 1, 1, 1, 0 }, /* PR */
129 { -1, 0, 0, 0, 0, 0, 1, 0 }, /* PW */
130 { -1, 0, 0, 0, 0, 0, 0, 0 }, /* EX */
131 { -1, 0, 0, 0, 0, 0, 0, 0 } /* PD */
132};
David Teiglande7fd4172006-01-18 09:30:29 +0000133
134#define modes_compat(gr, rq) \
135 __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
136
137int dlm_modes_compat(int mode1, int mode2)
138{
139 return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
140}
141
142/*
143 * Compatibility matrix for conversions with QUECVT set.
144 * Granted mode is the row; requested mode is the column.
145 * Usage: matrix[grmode+1][rqmode+1]
146 */
147
148static const int __quecvt_compat_matrix[8][8] = {
149 /* UN NL CR CW PR PW EX PD */
150 {0, 0, 0, 0, 0, 0, 0, 0}, /* UN */
151 {0, 0, 1, 1, 1, 1, 1, 0}, /* NL */
152 {0, 0, 0, 1, 1, 1, 1, 0}, /* CR */
153 {0, 0, 0, 0, 1, 1, 1, 0}, /* CW */
154 {0, 0, 0, 1, 0, 1, 1, 0}, /* PR */
155 {0, 0, 0, 0, 0, 0, 1, 0}, /* PW */
156 {0, 0, 0, 0, 0, 0, 0, 0}, /* EX */
157 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
158};
159
/* Dump an lkb's identifying fields and state to the kernel log; used
   from DLM_ASSERT() and other debug paths. */

void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
}

/* Dump an rsb's summary fields to the kernel log. */

void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}

/* Dump an rsb plus every lkb on its lookup, grant, convert and wait
   queues; heavier-weight companion to dlm_print_rsb() used in asserts. */

void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}
197
David Teiglande7fd4172006-01-18 09:30:29 +0000198/* Threads cannot use the lockspace while it's being recovered */
199
/* Take the recovery rwsem shared: normal lock processing may proceed
   as long as recovery (which presumably takes it exclusive) isn't
   running. */

static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

/* Release the shared hold taken by dlm_lock_recovery(). */

void dlm_unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

/* Non-blocking variant: returns nonzero if the shared hold was
   acquired, 0 if recovery is in progress. */

int dlm_lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}
214
/* True if the request may wait on a queue; DLM_LKF_NOQUEUE asks for an
   immediate failure instead of queueing. */

static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

/* Nonzero if blocking asts should be sent on behalf of this request
   even though it will not wait (DLM_LKF_NOQUEUEBAST). */

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

/* Status-block flag: the conversion was granted at a demoted mode. */

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

/* Status-block flag: granted at an alternate mode (presumably set for
   the ALTPR/ALTCW request flags -- semantics not visible here). */

static inline int is_altmode(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

static inline int is_granted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

/* An rsb is remote when another node masters it (res_nodeid > 0).
   res_nodeid must already be resolved (>= 0) when this is asked. */

static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

/* Process copy: the requesting node's lkb for a lock mastered on
   another node (has a nodeid but no MSTCPY flag). */

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

/* Master copy: the master node's lkb shadowing a lock requested from
   another node; such an lkb must carry the requester's nodeid. */

static inline int is_master_copy(struct dlm_lkb *lkb)
{
	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

/* PR<->CW is a "middle" conversion: the two modes are mutually
   incompatible, so it is neither a pure up- nor down-conversion. */

static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
		return 1;
	return 0;
}

/* Down-conversion: requesting a strictly weaker mode (PR<->CW
   excluded, handled above as a middle conversion). */

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}

/* The overlap flags record an unlock/cancel issued while an earlier
   operation on the same lkb is still waiting for a remote reply. */

static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}
286
/* Queue a completion ast reporting status rv to the lock holder.
   Master copies have no local process to notify, so nothing is queued
   for them.  Any pending lock timeout is cancelled first. */

static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	del_timeout(lkb);

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	/* if the operation was a cancel, then return -DLM_ECANCEL, if a
	   timeout caused the cancel then return -ETIMEDOUT */
	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
		rv = -ETIMEDOUT;
	}

	/* results are delivered through the caller's lksb */
	lkb->lkb_lksb->sb_status = rv;
	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;

	dlm_add_ast(lkb, AST_COMP);
}
308
/* Complete an overlapped operation: report -DLM_EUNLOCK if an overlap
   unlock is pending, otherwise -DLM_ECANCEL for the overlap cancel. */

static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	queue_cast(r, lkb,
		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}
314
/* Deliver a blocking ast (a lock at rqmode is waiting on this one).
   For a master copy the ast is sent to the node that owns the lock;
   otherwise it is queued for local delivery. */

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb))
		send_bast(r, lkb, rqmode);
	else {
		lkb->lkb_bastmode = rqmode;
		dlm_add_ast(lkb, AST_BAST);
	}
}
324
325/*
326 * Basic operations on rsb's and lkb's
327 */
328
/* Allocate and minimally initialize an rsb for the given resource
   name.  The caller fills in hash/bucket/nodeid and links it into the
   rsb table.  Returns NULL on allocation failure. */

static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r;

	r = allocate_rsb(ls, len);
	if (!r)
		return NULL;

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	return r;
}
351
352static int search_rsb_list(struct list_head *head, char *name, int len,
353 unsigned int flags, struct dlm_rsb **r_ret)
354{
355 struct dlm_rsb *r;
356 int error = 0;
357
358 list_for_each_entry(r, head, res_hashchain) {
359 if (len == r->res_length && !memcmp(name, r->res_name, len))
360 goto found;
361 }
David Teigland597d0ca2006-07-12 16:44:04 -0500362 return -EBADR;
David Teiglande7fd4172006-01-18 09:30:29 +0000363
364 found:
365 if (r->res_nodeid && (flags & R_MASTER))
366 error = -ENOTBLK;
367 *r_ret = r;
368 return error;
369}
370
/* Look up an rsb in bucket b with the rsbtbl lock already held.  A hit
   on the active list takes a new reference; a hit on the toss list
   revives the rsb (it keeps the refcount of 1 that toss_rsb's
   kref_init gave it) and re-validates its cached master. */

static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

	if (dlm_no_directory(ls))
		goto out;

	/* a cached remote master may have changed while the rsb sat on
	   the toss list, so mark it uncertain and force a fresh lookup */
	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	/* NOTE(review): on the not-found path r is never assigned, so
	   *r_ret receives an indeterminate value; callers appear to
	   check error first -- worth confirming */
	*r_ret = r;
	return error;
}
405
/* Locked wrapper around _search_rsb(): takes the bucket's rsbtbl
   write lock for the duration of the search. */

static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		      unsigned int flags, struct dlm_rsb **r_ret)
{
	int error;
	write_lock(&ls->ls_rsbtbl[b].lock);
	error = _search_rsb(ls, name, len, b, flags, r_ret);
	write_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}
415
416/*
417 * Find rsb in rsbtbl and potentially create/add one
418 *
419 * Delaying the release of rsb's has a similar benefit to applications keeping
420 * NL locks on an rsb, but without the guarantee that the cached master value
421 * will still be valid when the rsb is reused. Apps aren't always smart enough
422 * to keep NL locks on an rsb that they may lock again shortly; this can lead
423 * to excessive master lookups and removals if we don't delay the release.
424 *
425 * Searching for an rsb means looking through both the normal list and toss
426 * list. When found on the toss list the rsb is moved to the normal list with
427 * ref count of 1; when found on normal list the ref count is incremented.
428 */
429
/* Find the rsb for name, optionally (R_CREATE) creating it if absent.
   On success *r_ret holds a referenced rsb.  The create path searches
   again under the bucket lock to resolve the race with a concurrent
   creator, discarding our copy if someone else won. */

static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r, *tmp;
	uint32_t hash, bucket;
	int error = 0;

	/* without a directory every node can resolve masters itself,
	   so an rsb can always be created on demand */
	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

	error = search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out;

	/* not found and caller didn't ask for creation
	   (NOTE(review): r may be indeterminate on this path; callers
	   must check error before touching *r_ret) */
	if (error == -EBADR && !(flags & R_CREATE))
		goto out;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out;

	error = -ENOMEM;
	r = create_rsb(ls, name, namelen);
	if (!r)
		goto out;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}

	/* re-search under the lock: another thread may have added the
	   same rsb while we were allocating ours */
	write_lock(&ls->ls_rsbtbl[bucket].lock);
	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
	if (!error) {
		write_unlock(&ls->ls_rsbtbl[bucket].lock);
		free_rsb(r);
		r = tmp;
		goto out;
	}
	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
	error = 0;
 out:
	*r_ret = r;
	return error;
}
487
/* Public wrapper around find_rsb() for other dlm modules. */

int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
		 unsigned int flags, struct dlm_rsb **r_ret)
{
	return find_rsb(ls, name, namelen, flags, r_ret);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

/* Public wrapper around hold_rsb(); same no-locking precondition. */

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}
506
/* kref release function for an rsb that may be reused: move it to the
   bucket's toss list (timestamped so shrink_bucket() can expire it
   later) and reset its refcount to 1 for its stay there.  The lvb is
   dropped now; a revived rsb reallocates one as needed. */

static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}

/* When all references to the rsb are gone it's transfered to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	/* the bucket lock serializes the release with searches */
	write_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}
539
540/* See comment for unhold_lkb */
541
/* Drop a reference that is known not to be the last one (mirrors
   unhold_lkb); asserts if it would actually release the rsb. */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}

/* kref release function used when an rsb is being destroyed for good
   (see shrink_bucket): verifies every queue is empty.  The actual
   unlink and free happen in the caller after kref_put() returns. */

static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}
563
564/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
565 The rsb must exist as long as any lkb's for it do. */
566
/* Bind an lkb to its rsb, taking an rsb reference on its behalf; the
   rsb must outlive every lkb attached to it. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

/* Undo attach_lkb(): drop the lkb's rsb reference and clear the link.
   Safe to call on an lkb that was never attached. */

static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}
580
/* Allocate a new lkb and assign it a unique lock id.  The lkid encodes
   a randomly chosen hash bucket in the upper 16 bits and a per-bucket
   counter in the lower 16.  Returns 0 with *lkb_ret set, or -ENOMEM. */

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb, *tmp;
	uint32_t lkid = 0;
	uint16_t bucket;

	lkb = allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
	INIT_LIST_HEAD(&lkb->lkb_time_list);

	/* random bucket spreads lkbs across the id table */
	get_random_bytes(&bucket, sizeof(bucket));
	bucket &= (ls->ls_lkbtbl_size - 1);

	write_lock(&ls->ls_lkbtbl[bucket].lock);

	/* counter can roll over so we must verify lkid is not in use */

	while (lkid == 0) {
		lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;

		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
				    lkb_idtbl_list) {
			if (tmp->lkb_id != lkid)
				continue;
			/* collision: zero lkid to retry with the next
			   counter value */
			lkid = 0;
			break;
		}
	}

	lkb->lkb_id = lkid;
	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
	write_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return 0;
}
624
/* Locate an lkb by id within its bucket's list; caller must hold the
   bucket lock.  Returns NULL if not present. */

static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);	/* bucket lives in the high bits */

	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
		if (lkb->lkb_id == lkid)
			return lkb;
	}
	return NULL;
}

/* Look up an lkb by id and take a reference.  Returns 0 with *lkb_ret
   set, -EBADSLT for an id whose bucket is out of range, or -ENOENT. */

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	if (bucket >= ls->ls_lkbtbl_size)
		return -EBADSLT;

	read_lock(&ls->ls_lkbtbl[bucket].lock);
	lkb = __find_lkb(ls, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	read_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}
654
/* kref release function for an lkb: just sanity-checks that the lkb is
   no longer on any rsb queue (lkb_status cleared). */

static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}
664
David Teiglandb3f58d82006-02-28 11:16:37 -0500665/* __put_lkb() is used when an lkb may not have an rsb attached to
666 it so we need to provide the lockspace explicitly */
667
/* Drop a reference on lkb, freeing it on the last put.  The lockspace
   is passed explicitly because the lkb may not have an rsb attached.
   Returns 1 if the lkb was freed, 0 otherwise. */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint16_t bucket = (lkb->lkb_id >> 16);

	write_lock(&ls->ls_lkbtbl[bucket].lock);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		list_del(&lkb->lkb_idtbl_list);
		/* unlink done; the remaining teardown needs no lock */
		write_unlock(&ls->ls_lkbtbl[bucket].lock);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			free_lvb(lkb->lkb_lvbptr);
		free_lkb(lkb);
		return 1;
	} else {
		write_unlock(&ls->ls_lkbtbl[bucket].lock);
		return 0;
	}
}

/* Public put for an lkb known to be attached to an rsb (asserts so);
   derives the lockspace from the rsb and defers to __put_lkb(). */

int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}
700
701/* This is only called to add a reference when the code already holds
702 a valid reference to the lkb, so there's no need for locking. */
703
/* Add a reference; caller must already hold a valid reference, so no
   locking is needed. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* Drop a reference known not to be the last one (e.g. between a
   find_lkb/put_lkb pair); cheaper than __put_lkb because no table
   locking is required.  Asserts if it would actually free the lkb. */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}
720
721static void lkb_add_ordered(struct list_head *new, struct list_head *head,
722 int mode)
723{
724 struct dlm_lkb *lkb = NULL;
725
726 list_for_each_entry(lkb, head, lkb_statequeue)
727 if (lkb->lkb_rqmode < mode)
728 break;
729
730 if (!lkb)
731 list_add_tail(new, head);
732 else
733 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
734}
735
736/* add/remove lkb to rsb's grant/convert/wait queue */
737
/* Put lkb on one of its rsb's queues, taking a reference for the
   queue's membership.  The lkb must not already be on a queue
   (lkb_status == 0).  DLM_LKF_HEADQUE requests add-at-head for the
   wait and convert queues; the grant queue is always mode-ordered. */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}

/* Take lkb off its current rsb queue and drop the queue's reference. */

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}

/* Move lkb between rsb queues; the temporary hold keeps the refcount
   from hitting zero between the del and the add. */

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}
784
David Teiglandef0c2bb2007-03-28 09:56:46 -0500785static int msg_reply_type(int mstype)
786{
787 switch (mstype) {
788 case DLM_MSG_REQUEST:
789 return DLM_MSG_REQUEST_REPLY;
790 case DLM_MSG_CONVERT:
791 return DLM_MSG_CONVERT_REPLY;
792 case DLM_MSG_UNLOCK:
793 return DLM_MSG_UNLOCK_REPLY;
794 case DLM_MSG_CANCEL:
795 return DLM_MSG_CANCEL_REPLY;
796 case DLM_MSG_LOOKUP:
797 return DLM_MSG_LOOKUP_REPLY;
798 }
799 return -1;
800}
801
David Teiglande7fd4172006-01-18 09:30:29 +0000802/* add/remove lkb from global waiters list of lkb's waiting for
803 a reply from a remote node */
804
/* Register lkb as waiting for a remote reply of type mstype.  If an
   earlier operation is still outstanding, an unlock/cancel is recorded
   as an "overlap" on the existing wait instead of a second wait entry;
   other message types get -EBUSY.  A duplicate overlapped unlock (or
   duplicate cancel) returns -EINVAL.  Each registration takes a
   reference and bumps lkb_wait_count. */

static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;

	mutex_lock(&ls->ls_waiters_mutex);

	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			break;
		case DLM_MSG_CANCEL:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, lkb->lkb_flags);
		goto out;
	}

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	if (error)
		log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
855
David Teiglandb790c3b2007-01-24 10:21:33 -0600856/* We clear the RESEND flag because we might be taking an lkb off the waiters
857 list as part of process_requestqueue (e.g. a lookup that has an optimized
858 request reply on the requestqueue) between dlm_recover_waiters_pre() which
859 set RESEND and dlm_recover_waiters_post() */
860
/* Process a reply of type mstype against lkb's waiters state; caller
   holds (or is exempt from) ls_waiters_mutex.  Overlapped unlock/
   cancel replies clear their overlap flag; otherwise the reply
   satisfies the recorded wait_type.  The lkb leaves the waiters list
   only when its wait_count drops to zero.  Returns 0 if the reply
   matched some outstanding wait, -1 if nothing matched. */

static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
	}

	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
		  lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't recvd a reply
	   to the op that was in progress prior to the unlock/cancel; we
	   give up on any reply to the earlier op.  FIXME: not sure when/how
	   this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remove_from_waiters %x reply %d give up on %d",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	/* clear RESEND: see the comment above this function */
	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}
912
/* Locked wrapper: take ls_waiters_mutex around _remove_from_waiters(). */

static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, mstype);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* Handles situations where we might be processing a "fake" or "stub" reply in
   which we can't try to take waiters_mutex again.  (Presumably the
   generator of a stub reply already holds the mutex -- the stub is
   recognized by its address being &ls->ls_stub_ms.) */

static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	if (ms != &ls->ls_stub_ms)
		mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, ms->m_type);
	if (ms != &ls->ls_stub_ms)
		mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
939
David Teiglande7fd4172006-01-18 09:30:29 +0000940static void dir_remove(struct dlm_rsb *r)
941{
942 int to_nodeid;
943
944 if (dlm_no_directory(r->res_ls))
945 return;
946
947 to_nodeid = dlm_dir_nodeid(r);
948 if (to_nodeid != dlm_our_nodeid())
949 send_remove(r);
950 else
951 dlm_dir_remove_entry(r->res_ls, to_nodeid,
952 r->res_name, r->res_length);
953}
954
955/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
956 found since they are in order of newest to oldest? */
957
/* Free expired rsb's from the toss list of hash bucket b.
   Returns the number of rsb's freed.  Each iteration takes the bucket
   lock, finds one rsb whose toss time has expired, drops its final
   reference and frees it; iteration stops when no expired rsb is found. */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		write_lock(&ls->ls_rsbtbl[b].lock);
		/* reverse scan: oldest entries are at the tail */
		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
					    res_hashchain) {
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			/* last ref dropped: unhash while still holding the
			   bucket lock, then free outside it */
			list_del(&r->res_hashchain);
			write_unlock(&ls->ls_rsbtbl[b].lock);

			if (is_master(r))
				dir_remove(r);
			free_rsb(r);
			count++;
		} else {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			/* NOTE(review): r is dereferenced here after the
			   bucket lock is dropped; presumably safe because a
			   remaining ref holder keeps it alive — confirm */
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}
996
997void dlm_scan_rsbs(struct dlm_ls *ls)
998{
999 int i;
1000
David Teiglande7fd4172006-01-18 09:30:29 +00001001 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1002 shrink_bucket(ls, i);
David Teigland85e86ed2007-05-18 08:58:15 -05001003 if (dlm_locking_stopped(ls))
1004 break;
David Teiglande7fd4172006-01-18 09:30:29 +00001005 cond_resched();
1006 }
1007}
1008
David Teigland3ae1acf2007-05-18 08:59:31 -05001009static void add_timeout(struct dlm_lkb *lkb)
1010{
1011 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1012
1013 if (is_master_copy(lkb))
1014 return;
1015
1016 if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1017 goto add_it;
1018
1019 if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1020 !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1021 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1022 goto add_it;
1023 }
1024 return;
1025
1026 add_it:
1027 DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1028 mutex_lock(&ls->ls_timeout_mutex);
1029 hold_lkb(lkb);
1030 lkb->lkb_timestamp = jiffies;
1031 list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1032 mutex_unlock(&ls->ls_timeout_mutex);
1033}
1034
1035static void del_timeout(struct dlm_lkb *lkb)
1036{
1037 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1038
1039 mutex_lock(&ls->ls_timeout_mutex);
1040 if (!list_empty(&lkb->lkb_time_list)) {
1041 list_del_init(&lkb->lkb_time_list);
1042 unhold_lkb(lkb);
1043 }
1044 mutex_unlock(&ls->ls_timeout_mutex);
1045}
1046
1047/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1048 lkb_lksb_timeout without lock_rsb? Note: we can't lock timeout_mutex
1049 and then lock rsb because of lock ordering in add_timeout. We may need
1050 to specify some special timeout-related bits in the lkb that are just to
1051 be accessed under the timeout_mutex. */
1052
1053void dlm_scan_timeout(struct dlm_ls *ls)
1054{
1055 struct dlm_rsb *r;
1056 struct dlm_lkb *lkb;
1057 int do_cancel, do_warn;
1058
1059 for (;;) {
1060 if (dlm_locking_stopped(ls))
1061 break;
1062
1063 do_cancel = 0;
1064 do_warn = 0;
1065 mutex_lock(&ls->ls_timeout_mutex);
1066 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1067
1068 if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1069 time_after_eq(jiffies, lkb->lkb_timestamp +
1070 lkb->lkb_timeout_cs * HZ/100))
1071 do_cancel = 1;
1072
1073 if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1074 time_after_eq(jiffies, lkb->lkb_timestamp +
1075 dlm_config.ci_timewarn_cs * HZ/100))
1076 do_warn = 1;
1077
1078 if (!do_cancel && !do_warn)
1079 continue;
1080 hold_lkb(lkb);
1081 break;
1082 }
1083 mutex_unlock(&ls->ls_timeout_mutex);
1084
1085 if (!do_cancel && !do_warn)
1086 break;
1087
1088 r = lkb->lkb_resource;
1089 hold_rsb(r);
1090 lock_rsb(r);
1091
1092 if (do_warn) {
1093 /* clear flag so we only warn once */
1094 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1095 if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1096 del_timeout(lkb);
1097 dlm_timeout_warn(lkb);
1098 }
1099
1100 if (do_cancel) {
David Teiglandd7db9232007-05-18 09:00:32 -05001101 log_debug("timeout cancel %x node %d %s", lkb->lkb_id,
1102 lkb->lkb_nodeid, r->res_name);
David Teigland3ae1acf2007-05-18 08:59:31 -05001103 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1104 lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1105 del_timeout(lkb);
1106 _cancel_lock(r, lkb);
1107 }
1108
1109 unlock_rsb(r);
1110 unhold_rsb(r);
1111 dlm_put_lkb(lkb);
1112 }
1113}
1114
1115/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1116 dlm_recoverd before checking/setting ls_recover_begin. */
1117
1118void dlm_adjust_timeouts(struct dlm_ls *ls)
1119{
1120 struct dlm_lkb *lkb;
1121 long adj = jiffies - ls->ls_recover_begin;
1122
1123 ls->ls_recover_begin = 0;
1124 mutex_lock(&ls->ls_timeout_mutex);
1125 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1126 lkb->lkb_timestamp += adj;
1127 mutex_unlock(&ls->ls_timeout_mutex);
1128}
1129
David Teiglande7fd4172006-01-18 09:30:29 +00001130/* lkb is master or local copy */
1131
1132static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1133{
1134 int b, len = r->res_ls->ls_lvblen;
1135
1136 /* b=1 lvb returned to caller
1137 b=0 lvb written to rsb or invalidated
1138 b=-1 do nothing */
1139
1140 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1141
1142 if (b == 1) {
1143 if (!lkb->lkb_lvbptr)
1144 return;
1145
1146 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1147 return;
1148
1149 if (!r->res_lvbptr)
1150 return;
1151
1152 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1153 lkb->lkb_lvbseq = r->res_lvbseq;
1154
1155 } else if (b == 0) {
1156 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1157 rsb_set_flag(r, RSB_VALNOTVALID);
1158 return;
1159 }
1160
1161 if (!lkb->lkb_lvbptr)
1162 return;
1163
1164 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1165 return;
1166
1167 if (!r->res_lvbptr)
1168 r->res_lvbptr = allocate_lvb(r->res_ls);
1169
1170 if (!r->res_lvbptr)
1171 return;
1172
1173 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1174 r->res_lvbseq++;
1175 lkb->lkb_lvbseq = r->res_lvbseq;
1176 rsb_clear_flag(r, RSB_VALNOTVALID);
1177 }
1178
1179 if (rsb_flag(r, RSB_VALNOTVALID))
1180 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1181}
1182
1183static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1184{
1185 if (lkb->lkb_grmode < DLM_LOCK_PW)
1186 return;
1187
1188 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1189 rsb_set_flag(r, RSB_VALNOTVALID);
1190 return;
1191 }
1192
1193 if (!lkb->lkb_lvbptr)
1194 return;
1195
1196 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1197 return;
1198
1199 if (!r->res_lvbptr)
1200 r->res_lvbptr = allocate_lvb(r->res_ls);
1201
1202 if (!r->res_lvbptr)
1203 return;
1204
1205 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1206 r->res_lvbseq++;
1207 rsb_clear_flag(r, RSB_VALNOTVALID);
1208}
1209
1210/* lkb is process copy (pc) */
1211
1212static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1213 struct dlm_message *ms)
1214{
1215 int b;
1216
1217 if (!lkb->lkb_lvbptr)
1218 return;
1219
1220 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1221 return;
1222
David Teigland597d0ca2006-07-12 16:44:04 -05001223 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
David Teiglande7fd4172006-01-18 09:30:29 +00001224 if (b == 1) {
1225 int len = receive_extralen(ms);
1226 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1227 lkb->lkb_lvbseq = ms->m_lvbseq;
1228 }
1229}
1230
1231/* Manipulate lkb's on rsb's convert/granted/waiting queues
1232 remove_lock -- used for unlock, removes lkb from granted
1233 revert_lock -- used for cancel, moves lkb from convert to granted
1234 grant_lock -- used for request and convert, adds lkb to granted or
1235 moves lkb from convert or waiting to granted
1236
1237 Each of these is used for master or local copy lkb's. There is
1238 also a _pc() variation used to make the corresponding change on
1239 a process copy (pc) lkb. */
1240
1241static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1242{
1243 del_lkb(r, lkb);
1244 lkb->lkb_grmode = DLM_LOCK_IV;
1245 /* this unhold undoes the original ref from create_lkb()
1246 so this leads to the lkb being freed */
1247 unhold_lkb(lkb);
1248}
1249
/* unlock path: save the lvb before taking the lkb off the queues */

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}
1255
/* process-copy variant: the master handles the lvb, not us */

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}
1260
David Teiglandef0c2bb2007-03-28 09:56:46 -05001261/* returns: 0 did nothing
1262 1 moved lock to granted
1263 -1 removed lock */
1264
1265static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
David Teiglande7fd4172006-01-18 09:30:29 +00001266{
David Teiglandef0c2bb2007-03-28 09:56:46 -05001267 int rv = 0;
1268
David Teiglande7fd4172006-01-18 09:30:29 +00001269 lkb->lkb_rqmode = DLM_LOCK_IV;
1270
1271 switch (lkb->lkb_status) {
David Teigland597d0ca2006-07-12 16:44:04 -05001272 case DLM_LKSTS_GRANTED:
1273 break;
David Teiglande7fd4172006-01-18 09:30:29 +00001274 case DLM_LKSTS_CONVERT:
1275 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
David Teiglandef0c2bb2007-03-28 09:56:46 -05001276 rv = 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001277 break;
1278 case DLM_LKSTS_WAITING:
1279 del_lkb(r, lkb);
1280 lkb->lkb_grmode = DLM_LOCK_IV;
1281 /* this unhold undoes the original ref from create_lkb()
1282 so this leads to the lkb being freed */
1283 unhold_lkb(lkb);
David Teiglandef0c2bb2007-03-28 09:56:46 -05001284 rv = -1;
David Teiglande7fd4172006-01-18 09:30:29 +00001285 break;
1286 default:
1287 log_print("invalid status for revert %d", lkb->lkb_status);
1288 }
David Teiglandef0c2bb2007-03-28 09:56:46 -05001289 return rv;
David Teiglande7fd4172006-01-18 09:30:29 +00001290}
1291
/* process-copy variant of revert_lock; no extra work needed */

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return revert_lock(r, lkb);
}
1296
1297static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1298{
1299 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1300 lkb->lkb_grmode = lkb->lkb_rqmode;
1301 if (lkb->lkb_status)
1302 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1303 else
1304 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1305 }
1306
1307 lkb->lkb_rqmode = DLM_LOCK_IV;
David Teiglande7fd4172006-01-18 09:30:29 +00001308}
1309
1310static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1311{
1312 set_lvb_lock(r, lkb);
1313 _grant_lock(r, lkb);
1314 lkb->lkb_highbast = 0;
1315}
1316
/* grant on a process copy: lvb content comes from the reply message */

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}
1323
1324/* called by grant_pending_locks() which means an async grant message must
1325 be sent to the requesting node in addition to granting the lock if the
1326 lkb belongs to a remote node. */
1327
static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	/* local lkb: queue the completion ast ourselves;
	   master copy: notify the requesting node */
	if (!is_master_copy(lkb))
		queue_cast(r, lkb, 0);
	else
		send_grant(r, lkb);
}
1336
David Teigland7d3c1fe2007-04-19 10:30:41 -05001337/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1338 change the granted/requested modes. We're munging things accordingly in
1339 the process copy.
1340 CONVDEADLK: our grmode may have been forced down to NL to resolve a
1341 conversion deadlock
1342 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1343 compatible with other granted locks */
1344
1345static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1346{
1347 if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1348 log_print("munge_demoted %x invalid reply type %d",
1349 lkb->lkb_id, ms->m_type);
1350 return;
1351 }
1352
1353 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1354 log_print("munge_demoted %x invalid modes gr %d rq %d",
1355 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1356 return;
1357 }
1358
1359 lkb->lkb_grmode = DLM_LOCK_NL;
1360}
1361
1362static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1363{
1364 if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1365 ms->m_type != DLM_MSG_GRANT) {
1366 log_print("munge_altmode %x invalid reply type %d",
1367 lkb->lkb_id, ms->m_type);
1368 return;
1369 }
1370
1371 if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1372 lkb->lkb_rqmode = DLM_LOCK_PR;
1373 else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1374 lkb->lkb_rqmode = DLM_LOCK_CW;
1375 else {
1376 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1377 dlm_print_lkb(lkb);
1378 }
1379}
1380
David Teiglande7fd4172006-01-18 09:30:29 +00001381static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1382{
1383 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1384 lkb_statequeue);
1385 if (lkb->lkb_id == first->lkb_id)
David Teigland90135922006-01-20 08:47:07 +00001386 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001387
David Teigland90135922006-01-20 08:47:07 +00001388 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +00001389}
1390
David Teiglande7fd4172006-01-18 09:30:29 +00001391/* Check if the given lkb conflicts with another lkb on the queue. */
1392
1393static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1394{
1395 struct dlm_lkb *this;
1396
1397 list_for_each_entry(this, head, lkb_statequeue) {
1398 if (this == lkb)
1399 continue;
David Teigland3bcd3682006-02-23 09:56:38 +00001400 if (!modes_compat(this, lkb))
David Teigland90135922006-01-20 08:47:07 +00001401 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001402 }
David Teigland90135922006-01-20 08:47:07 +00001403 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +00001404}
1405
1406/*
1407 * "A conversion deadlock arises with a pair of lock requests in the converting
1408 * queue for one resource. The granted mode of each lock blocks the requested
1409 * mode of the other lock."
1410 *
1411 * Part 2: if the granted mode of lkb is preventing the first lkb in the
1412 * convert queue from being granted, then demote lkb (set grmode to NL).
1413 * This second form requires that we check for conv-deadlk even when
1414 * now == 0 in _can_be_granted().
1415 *
1416 * Example:
1417 * Granted Queue: empty
1418 * Convert Queue: NL->EX (first lock)
1419 * PR->EX (second lock)
1420 *
1421 * The first lock can't be granted because of the granted mode of the second
1422 * lock and the second lock can't be granted because it's not first in the
1423 * list. We demote the granted mode of the second lock (the lkb passed to this
1424 * function).
1425 *
1426 * After the resolution, the "grant pending" function needs to go back and try
1427 * to grant locks on the convert queue again since the first lock can now be
1428 * granted.
1429 */
1430
1431static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1432{
1433 struct dlm_lkb *this, *first = NULL, *self = NULL;
1434
1435 list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1436 if (!first)
1437 first = this;
1438 if (this == lkb) {
1439 self = lkb;
1440 continue;
1441 }
1442
David Teiglande7fd4172006-01-18 09:30:29 +00001443 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
David Teigland90135922006-01-20 08:47:07 +00001444 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001445 }
1446
1447 /* if lkb is on the convert queue and is preventing the first
1448 from being granted, then there's deadlock and we demote lkb.
1449 multiple converting locks may need to do this before the first
1450 converting lock can be granted. */
1451
1452 if (self && self != first) {
1453 if (!modes_compat(lkb, first) &&
1454 !queue_conflict(&rsb->res_grantqueue, first))
David Teigland90135922006-01-20 08:47:07 +00001455 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001456 }
1457
David Teigland90135922006-01-20 08:47:07 +00001458 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +00001459}
1460
1461/*
1462 * Return 1 if the lock can be granted, 0 otherwise.
1463 * Also detect and resolve conversion deadlocks.
1464 *
1465 * lkb is the lock to be granted
1466 *
1467 * now is 1 if the function is being called in the context of the
1468 * immediate request, it is 0 if called later, after the lock has been
1469 * queued.
1470 *
1471 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1472 */
1473
static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	/* conv is nonzero when this is a conversion (a granted mode exists) */
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted. In essence, the use of this flag
	 * tells the Lock Manager to expedite this request by not considering
	 * what may be in the CONVERTING or WAITING queues... As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks. This flag is not valid for conversion requests.
	 *
	 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode. We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1. The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis. This, in
	 * turn, can lead to a phenomenon known as "indefinite postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion. This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource. Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first service" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order. The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;

 out:
	/*
	 * The following, enabled by CONVDEADLK, departs from VMS.
	 * Resolve a conversion deadlock by demoting our granted mode to NL;
	 * the caller learns of the demotion via DLM_SBF_DEMOTED.
	 */

	if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
	    conversion_deadlock_detect(r, lkb)) {
		lkb->lkb_grmode = DLM_LOCK_NL;
		lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
	}

	return 0;
}
1600
1601/*
1602 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1603 * simple way to provide a big optimization to applications that can use them.
1604 */
1605
1606static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1607{
1608 uint32_t flags = lkb->lkb_exflags;
1609 int rv;
1610 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1611
1612 rv = _can_be_granted(r, lkb, now);
1613 if (rv)
1614 goto out;
1615
1616 if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1617 goto out;
1618
1619 if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1620 alt = DLM_LOCK_PR;
1621 else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1622 alt = DLM_LOCK_CW;
1623
1624 if (alt) {
1625 lkb->lkb_rqmode = alt;
1626 rv = _can_be_granted(r, lkb, now);
1627 if (rv)
1628 lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1629 else
1630 lkb->lkb_rqmode = rqmode;
1631 }
1632 out:
1633 return rv;
1634}
1635
/* Grant whatever can be granted on the convert queue.  Returns the
   highest requested mode of the locks left ungranted (at least high).
   Restarts the scan whenever a grant or a conv-deadlock demotion may
   have unblocked earlier entries; "quit" limits the demote restart to
   one extra pass. */

static int grant_pending_convert(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;
	int hi, demoted, quit, grant_restart, demote_restart;

	quit = 0;
 restart:
	grant_restart = 0;
	demote_restart = 0;
	hi = DLM_LOCK_IV;

	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
		/* remember whether lkb was already demoted so we can tell
		   if can_be_granted() demoted it on this pass */
		demoted = is_demoted(lkb);
		if (can_be_granted(r, lkb, 0)) {
			grant_lock_pending(r, lkb);
			grant_restart = 1;
		} else {
			hi = max_t(int, lkb->lkb_rqmode, hi);
			if (!demoted && is_demoted(lkb))
				demote_restart = 1;
		}
	}

	if (grant_restart)
		goto restart;
	if (demote_restart && !quit) {
		quit = 1;
		goto restart;
	}

	return max_t(int, high, hi);
}
1668
1669static int grant_pending_wait(struct dlm_rsb *r, int high)
1670{
1671 struct dlm_lkb *lkb, *s;
1672
1673 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
David Teigland90135922006-01-20 08:47:07 +00001674 if (can_be_granted(r, lkb, 0))
David Teiglande7fd4172006-01-18 09:30:29 +00001675 grant_lock_pending(r, lkb);
1676 else
1677 high = max_t(int, lkb->lkb_rqmode, high);
1678 }
1679
1680 return high;
1681}
1682
1683static void grant_pending_locks(struct dlm_rsb *r)
1684{
1685 struct dlm_lkb *lkb, *s;
1686 int high = DLM_LOCK_IV;
1687
David Teiglanda345da32006-08-18 11:54:25 -05001688 DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
David Teiglande7fd4172006-01-18 09:30:29 +00001689
1690 high = grant_pending_convert(r, high);
1691 high = grant_pending_wait(r, high);
1692
1693 if (high == DLM_LOCK_IV)
1694 return;
1695
1696 /*
1697 * If there are locks left on the wait/convert queue then send blocking
1698 * ASTs to granted locks based on the largest requested mode (high)
David Teigland3bcd3682006-02-23 09:56:38 +00001699 * found above. FIXME: highbast < high comparison not valid for PR/CW.
David Teiglande7fd4172006-01-18 09:30:29 +00001700 */
1701
1702 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1703 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1704 !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1705 queue_bast(r, lkb, high);
1706 lkb->lkb_highbast = high;
1707 }
1708 }
1709}
1710
1711static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1712 struct dlm_lkb *lkb)
1713{
1714 struct dlm_lkb *gr;
1715
1716 list_for_each_entry(gr, head, lkb_statequeue) {
1717 if (gr->lkb_bastaddr &&
1718 gr->lkb_highbast < lkb->lkb_rqmode &&
David Teigland3bcd3682006-02-23 09:56:38 +00001719 !modes_compat(gr, lkb)) {
David Teiglande7fd4172006-01-18 09:30:29 +00001720 queue_bast(r, gr, lkb->lkb_rqmode);
1721 gr->lkb_highbast = lkb->lkb_rqmode;
1722 }
1723 }
1724}
1725
1726static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1727{
1728 send_bast_queue(r, &r->res_grantqueue, lkb);
1729}
1730
1731static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1732{
1733 send_bast_queue(r, &r->res_grantqueue, lkb);
1734 send_bast_queue(r, &r->res_convertqueue, lkb);
1735}
1736
1737/* set_master(r, lkb) -- set the master nodeid of a resource
1738
1739 The purpose of this function is to set the nodeid field in the given
1740 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
1741 known, it can just be copied to the lkb and the function will return
1742 0. If the rsb's nodeid is _not_ known, it needs to be looked up
1743 before it can be copied to the lkb.
1744
1745 When the rsb nodeid is being looked up remotely, the initial lkb
1746 causing the lookup is kept on the ls_waiters list waiting for the
1747 lookup reply. Other lkb's waiting for the same rsb lookup are kept
1748 on the rsb's res_lookup list until the master is verified.
1749
1750 Return values:
1751 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1752 1: the rsb master is not available and the lkb has been placed on
1753 a wait queue
1754*/
1755
static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = r->res_ls;
	int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

	/* after recovery the saved master needs confirming; this lkb
	   becomes first_lkid and proceeds with the remembered nodeid */
	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = lkb->lkb_id;
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	/* another lkb already has a lookup in flight for this rsb; park
	   this one on res_lookup until the master is confirmed */
	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
		return 1;
	}

	/* we are the master */
	if (r->res_nodeid == 0) {
		lkb->lkb_nodeid = 0;
		return 0;
	}

	/* master is known and remote */
	if (r->res_nodeid > 0) {
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););

	dir_nodeid = dlm_dir_nodeid(r);

	/* master unknown: ask the remote directory node; this lkb waits
	   for the lookup reply */
	if (dir_nodeid != our_nodeid) {
		r->res_first_lkid = lkb->lkb_id;
		send_lookup(r, lkb);
		return 1;
	}

	for (;;) {
		/* It's possible for dlm_scand to remove an old rsb for
		   this same resource from the toss list, us to create
		   a new one, look up the master locally, and find it
		   already exists just before dlm_scand does the
		   dir_remove() on the previous rsb. */

		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
				       r->res_length, &ret_nodeid);
		if (!error)
			break;
		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
		schedule();
	}

	/* record the answer in both rsb and lkb */
	if (ret_nodeid == our_nodeid) {
		r->res_first_lkid = 0;
		r->res_nodeid = 0;
		lkb->lkb_nodeid = 0;
	} else {
		r->res_first_lkid = lkb->lkb_id;
		r->res_nodeid = ret_nodeid;
		lkb->lkb_nodeid = ret_nodeid;
	}
	return 0;
}
1819
1820static void process_lookup_list(struct dlm_rsb *r)
1821{
1822 struct dlm_lkb *lkb, *safe;
1823
1824 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
David Teiglandef0c2bb2007-03-28 09:56:46 -05001825 list_del_init(&lkb->lkb_rsb_lookup);
David Teiglande7fd4172006-01-18 09:30:29 +00001826 _request_lock(r, lkb);
1827 schedule();
1828 }
1829}
1830
1831/* confirm_master -- confirm (or deny) an rsb's master nodeid */
1832
1833static void confirm_master(struct dlm_rsb *r, int error)
1834{
1835 struct dlm_lkb *lkb;
1836
1837 if (!r->res_first_lkid)
1838 return;
1839
1840 switch (error) {
1841 case 0:
1842 case -EINPROGRESS:
1843 r->res_first_lkid = 0;
1844 process_lookup_list(r);
1845 break;
1846
1847 case -EAGAIN:
1848 /* the remote master didn't queue our NOQUEUE request;
1849 make a waiting lkb the first_lkid */
1850
1851 r->res_first_lkid = 0;
1852
1853 if (!list_empty(&r->res_lookup)) {
1854 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1855 lkb_rsb_lookup);
David Teiglandef0c2bb2007-03-28 09:56:46 -05001856 list_del_init(&lkb->lkb_rsb_lookup);
David Teiglande7fd4172006-01-18 09:30:29 +00001857 r->res_first_lkid = lkb->lkb_id;
1858 _request_lock(r, lkb);
1859 } else
1860 r->res_nodeid = -1;
1861 break;
1862
1863 default:
1864 log_error(r->res_ls, "confirm_master unknown error %d", error);
1865 }
1866}
1867
1868static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
David Teiglandd7db9232007-05-18 09:00:32 -05001869 int namelen, unsigned long timeout_cs, void *ast,
David Teigland3bcd3682006-02-23 09:56:38 +00001870 void *astarg, void *bast, struct dlm_args *args)
David Teiglande7fd4172006-01-18 09:30:29 +00001871{
1872 int rv = -EINVAL;
1873
1874 /* check for invalid arg usage */
1875
1876 if (mode < 0 || mode > DLM_LOCK_EX)
1877 goto out;
1878
1879 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1880 goto out;
1881
1882 if (flags & DLM_LKF_CANCEL)
1883 goto out;
1884
1885 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1886 goto out;
1887
1888 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1889 goto out;
1890
1891 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1892 goto out;
1893
1894 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1895 goto out;
1896
1897 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1898 goto out;
1899
1900 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1901 goto out;
1902
1903 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1904 goto out;
1905
1906 if (!ast || !lksb)
1907 goto out;
1908
1909 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1910 goto out;
1911
David Teiglande7fd4172006-01-18 09:30:29 +00001912 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1913 goto out;
1914
1915 /* these args will be copied to the lkb in validate_lock_args,
1916 it cannot be done now because when converting locks, fields in
1917 an active lkb cannot be modified before locking the rsb */
1918
1919 args->flags = flags;
1920 args->astaddr = ast;
1921 args->astparam = (long) astarg;
1922 args->bastaddr = bast;
David Teiglandd7db9232007-05-18 09:00:32 -05001923 args->timeout = timeout_cs;
David Teiglande7fd4172006-01-18 09:30:29 +00001924 args->mode = mode;
1925 args->lksb = lksb;
David Teiglande7fd4172006-01-18 09:30:29 +00001926 rv = 0;
1927 out:
1928 return rv;
1929}
1930
1931static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1932{
1933 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1934 DLM_LKF_FORCEUNLOCK))
1935 return -EINVAL;
1936
David Teiglandef0c2bb2007-03-28 09:56:46 -05001937 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
1938 return -EINVAL;
1939
David Teiglande7fd4172006-01-18 09:30:29 +00001940 args->flags = flags;
1941 args->astparam = (long) astarg;
1942 return 0;
1943}
1944
/* Check a (possibly converting) lkb against the staged args, then copy
   the args into it.  Returns 0, -EINVAL for bad usage, or -EBUSY when a
   prior operation on the lkb is still in progress. */

static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			      struct dlm_args *args)
{
	int rv = -EINVAL;

	if (args->flags & DLM_LKF_CONVERT) {
		/* a master-copy lkb is managed on behalf of a remote node
		   and can't be converted locally */
		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
			goto out;

		if (args->flags & DLM_LKF_QUECVT &&
		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
			goto out;

		/* from here on failures are "busy", not "invalid" */
		rv = -EBUSY;
		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
			goto out;

		/* still waiting for a reply to a previous message */
		if (lkb->lkb_wait_type)
			goto out;

		if (is_overlap(lkb))
			goto out;
	}

	lkb->lkb_exflags = args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astaddr = args->astaddr;
	lkb->lkb_astparam = args->astparam;
	lkb->lkb_bastaddr = args->bastaddr;
	lkb->lkb_rqmode = args->mode;
	lkb->lkb_lksb = args->lksb;
	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
	lkb->lkb_ownpid = (int) current->pid;
	lkb->lkb_timeout_cs = args->timeout;
	rv = 0;
 out:
	return rv;
}
1983
/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
   for success */

/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
   because there may be a lookup in progress and it's valid to do
   cancel/unlockf on it */

static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int rv = -EINVAL;

	/* a master-copy lkb is managed for a remote node; unlocking it
	   locally is a usage error */
	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
		dlm_print_lkb(lkb);
		goto out;
	}

	/* an lkb may still exist even though the lock is EOL'ed due to a
	   cancel, unlock or failed noqueue request; an app can't use these
	   locks; return same error as if the lkid had not been found at all */

	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
		rv = -ENOENT;
		goto out;
	}

	/* an lkb may be waiting for an rsb lookup to complete where the
	   lookup was initiated by another lock */

	if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
		if (!list_empty(&lkb->lkb_rsb_lookup)) {
			/* pull it off the lookup list and complete the
			   cancel/unlock right here with a cast */
			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
			list_del_init(&lkb->lkb_rsb_lookup);
			queue_cast(lkb->lkb_resource, lkb,
				   args->flags & DLM_LKF_CANCEL ?
				   -DLM_ECANCEL : -DLM_EUNLOCK);
			unhold_lkb(lkb); /* undoes create_lkb() */
			/* -EBUSY here is converted to success by
			   dlm_unlock() (see comment above) */
			rv = -EBUSY;
			goto out;
		}
	}

	/* cancel not allowed with another cancel/unlock in progress */

	if (args->flags & DLM_LKF_CANCEL) {
		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
			goto out;

		if (is_overlap(lkb))
			goto out;

		/* don't let scand try to do a cancel */
		del_timeout(lkb);

		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			/* flag the in-flight op so its reply handler
			   performs the cancel */
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
		case DLM_MSG_CANCEL:
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_CANCEL */
		goto out_ok;
	}

	/* do we need to allow a force-unlock if there's a normal unlock
	   already in progress? in what conditions could the normal unlock
	   fail such that we'd want to send a force-unlock to be sure? */

	if (args->flags & DLM_LKF_FORCEUNLOCK) {
		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
			goto out;

		if (is_overlap_unlock(lkb))
			goto out;

		/* don't let scand try to do a cancel */
		del_timeout(lkb);

		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_UNLOCK */
		goto out_ok;
	}

	/* normal unlock not allowed if there's any op in progress */
	rv = -EBUSY;
	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
		goto out;

 out_ok:
	/* an overlapping op shouldn't blow away exflags from other op */
	lkb->lkb_exflags |= args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astparam = args->astparam;
	rv = 0;
 out:
	if (rv)
		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
			  args->flags, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
	return rv;
}
2112
2113/*
2114 * Four stage 4 varieties:
2115 * do_request(), do_convert(), do_unlock(), do_cancel()
2116 * These are called on the master node for the given lock and
2117 * from the central locking logic.
2118 */
2119
2120static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2121{
2122 int error = 0;
2123
David Teigland90135922006-01-20 08:47:07 +00002124 if (can_be_granted(r, lkb, 1)) {
David Teiglande7fd4172006-01-18 09:30:29 +00002125 grant_lock(r, lkb);
2126 queue_cast(r, lkb, 0);
2127 goto out;
2128 }
2129
2130 if (can_be_queued(lkb)) {
2131 error = -EINPROGRESS;
2132 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2133 send_blocking_asts(r, lkb);
David Teigland3ae1acf2007-05-18 08:59:31 -05002134 add_timeout(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00002135 goto out;
2136 }
2137
2138 error = -EAGAIN;
2139 if (force_blocking_asts(lkb))
2140 send_blocking_asts_all(r, lkb);
2141 queue_cast(r, lkb, -EAGAIN);
2142
2143 out:
2144 return error;
2145}
2146
/* Master-side handling of a conversion request: grant it, handle a
   CONVDEADLK auto-demotion, queue it on the convert queue, or fail it
   with -EAGAIN (NOQUEUE). */

static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;

	/* changing an existing lock may allow others to be granted */

	if (can_be_granted(r, lkb, 1)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		grant_pending_locks(r);
		goto out;
	}

	/* is_demoted() means the can_be_granted() above set the grmode
	   to NL, and left us on the granted queue. This auto-demotion
	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
	   now grantable. We have to try to grant other converting locks
	   before we try again to grant this one. */

	if (is_demoted(lkb)) {
		grant_pending_convert(r, DLM_LOCK_IV);
		if (_can_be_granted(r, lkb, 1)) {
			grant_lock(r, lkb);
			queue_cast(r, lkb, 0);
			grant_pending_locks(r);
			goto out;
		}
		/* else fall through and move to convert queue */
	}

	if (can_be_queued(lkb)) {
		/* move from the granted queue to the convert queue */
		error = -EINPROGRESS;
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		send_blocking_asts(r, lkb);
		add_timeout(lkb);
		goto out;
	}

	/* NOQUEUE conversion that can't be granted fails immediately */
	error = -EAGAIN;
	if (force_blocking_asts(lkb))
		send_blocking_asts_all(r, lkb);
	queue_cast(r, lkb, -EAGAIN);

 out:
	return error;
}
2194
2195static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2196{
2197 remove_lock(r, lkb);
2198 queue_cast(r, lkb, -DLM_EUNLOCK);
2199 grant_pending_locks(r);
2200 return -DLM_EUNLOCK;
2201}
2202
David Teiglandef0c2bb2007-03-28 09:56:46 -05002203/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
Steven Whitehouse907b9bc2006-09-25 09:26:04 -04002204
David Teiglande7fd4172006-01-18 09:30:29 +00002205static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2206{
David Teiglandef0c2bb2007-03-28 09:56:46 -05002207 int error;
2208
2209 error = revert_lock(r, lkb);
2210 if (error) {
2211 queue_cast(r, lkb, -DLM_ECANCEL);
2212 grant_pending_locks(r);
2213 return -DLM_ECANCEL;
2214 }
2215 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +00002216}
2217
2218/*
2219 * Four stage 3 varieties:
2220 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2221 */
2222
/* add a new lkb to a possibly new rsb, called by requesting process */

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = set_master(r, lkb);	/* sets lkb nodeid from r */

	if (error < 0)
		return error;
	if (error)
		return 0;	/* master lookup in progress; deferred */

	if (is_remote(r))
		/* receive_request() calls do_request() on remote node */
		return send_request(r, lkb);

	return do_request(r, lkb);
}
2247
/* change some property of an existing lkb, e.g. mode */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (is_remote(r))
		/* receive_convert() calls do_convert() on remote node */
		return send_convert(r, lkb);

	return do_convert(r, lkb);
}
2262
/* remove an existing lkb from the granted queue */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (is_remote(r))
		/* receive_unlock() calls do_unlock() on remote node */
		return send_unlock(r, lkb);

	return do_unlock(r, lkb);
}
2277
/* remove an existing lkb from the convert or wait queue */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (is_remote(r))
		/* receive_cancel() calls do_cancel() on remote node */
		return send_cancel(r, lkb);

	return do_cancel(r, lkb);
}
2292
2293/*
2294 * Four stage 2 varieties:
2295 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2296 */
2297
2298static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2299 int len, struct dlm_args *args)
2300{
2301 struct dlm_rsb *r;
2302 int error;
2303
2304 error = validate_lock_args(ls, lkb, args);
2305 if (error)
2306 goto out;
2307
2308 error = find_rsb(ls, name, len, R_CREATE, &r);
2309 if (error)
2310 goto out;
2311
2312 lock_rsb(r);
2313
2314 attach_lkb(r, lkb);
2315 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2316
2317 error = _request_lock(r, lkb);
2318
2319 unlock_rsb(r);
2320 put_rsb(r);
2321
2322 out:
2323 return error;
2324}
2325
2326static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2327 struct dlm_args *args)
2328{
2329 struct dlm_rsb *r;
2330 int error;
2331
2332 r = lkb->lkb_resource;
2333
2334 hold_rsb(r);
2335 lock_rsb(r);
2336
2337 error = validate_lock_args(ls, lkb, args);
2338 if (error)
2339 goto out;
2340
2341 error = _convert_lock(r, lkb);
2342 out:
2343 unlock_rsb(r);
2344 put_rsb(r);
2345 return error;
2346}
2347
2348static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2349 struct dlm_args *args)
2350{
2351 struct dlm_rsb *r;
2352 int error;
2353
2354 r = lkb->lkb_resource;
2355
2356 hold_rsb(r);
2357 lock_rsb(r);
2358
2359 error = validate_unlock_args(lkb, args);
2360 if (error)
2361 goto out;
2362
2363 error = _unlock_lock(r, lkb);
2364 out:
2365 unlock_rsb(r);
2366 put_rsb(r);
2367 return error;
2368}
2369
2370static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2371 struct dlm_args *args)
2372{
2373 struct dlm_rsb *r;
2374 int error;
2375
2376 r = lkb->lkb_resource;
2377
2378 hold_rsb(r);
2379 lock_rsb(r);
2380
2381 error = validate_unlock_args(lkb, args);
2382 if (error)
2383 goto out;
2384
2385 error = _cancel_lock(r, lkb);
2386 out:
2387 unlock_rsb(r);
2388 put_rsb(r);
2389 return error;
2390}
2391
2392/*
2393 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
2394 */
2395
/* Public entry point for lock requests and conversions.  Returns 0 on
   success (or queued: completion arrives via the ast), or a negative
   errno for argument/setup failures. */

int dlm_lock(dlm_lockspace_t *lockspace,
	     int mode,
	     struct dlm_lksb *lksb,
	     uint32_t flags,
	     void *name,
	     unsigned int namelen,
	     uint32_t parent_lkid,
	     void (*ast) (void *astarg),
	     void *astarg,
	     void (*bast) (void *astarg, int mode))
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error, convert = flags & DLM_LKF_CONVERT;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	/* hold off recovery while the operation runs */
	dlm_lock_recovery(ls);

	/* a convert reuses the existing lkb found by its lkid; a new
	   request allocates a fresh one */
	if (convert)
		error = find_lkb(ls, lksb->sb_lkid, &lkb);
	else
		error = create_lkb(ls, &lkb);

	if (error)
		goto out;

	error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
			      astarg, bast, &args);
	if (error)
		goto out_put;

	if (convert)
		error = convert_lock(ls, lkb, &args);
	else
		error = request_lock(ls, lkb, name, namelen, &args);

	/* queued is success; the result is delivered through the ast */
	if (error == -EINPROGRESS)
		error = 0;
 out_put:
	/* drop the reference from find_lkb(), or release a new lkb on
	   failure */
	if (convert || error)
		__put_lkb(ls, lkb);
	/* -EAGAIN (NOQUEUE refusal) is also reported through the ast
	   (see do_request), so it's not an error return here */
	if (error == -EAGAIN)
		error = 0;
 out:
	dlm_unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
2448
/* Public entry point for unlock and cancel.  Returns 0 on success, or a
   negative errno for argument/lookup failures. */

int dlm_unlock(dlm_lockspace_t *lockspace,
	       uint32_t lkid,
	       uint32_t flags,
	       struct dlm_lksb *lksb,
	       void *astarg)
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	/* hold off recovery while the operation runs */
	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	error = set_unlock_args(flags, astarg, &args);
	if (error)
		goto out_put;

	if (flags & DLM_LKF_CANCEL)
		error = cancel_lock(ls, lkb, &args);
	else
		error = unlock_lock(ls, lkb, &args);

	/* the -DLM_* "errors" are the normal success indications */
	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
		error = 0;
	/* see comment above validate_unlock_args(): -EBUSY with
	   cancel/force-unlock counts as success */
	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
		error = 0;
 out_put:
	/* drop the reference taken by find_lkb() */
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
2490
2491/*
2492 * send/receive routines for remote operations and replies
2493 *
2494 * send_args
2495 * send_common
2496 * send_request receive_request
2497 * send_convert receive_convert
2498 * send_unlock receive_unlock
2499 * send_cancel receive_cancel
2500 * send_grant receive_grant
2501 * send_bast receive_bast
2502 * send_lookup receive_lookup
2503 * send_remove receive_remove
2504 *
2505 * send_common_reply
2506 * receive_request_reply send_request_reply
2507 * receive_convert_reply send_convert_reply
2508 * receive_unlock_reply send_unlock_reply
2509 * receive_cancel_reply send_cancel_reply
2510 * receive_lookup_reply send_lookup_reply
2511 */
2512
David Teigland7e4dac32007-04-02 09:06:41 -05002513static int _create_message(struct dlm_ls *ls, int mb_len,
2514 int to_nodeid, int mstype,
2515 struct dlm_message **ms_ret,
2516 struct dlm_mhandle **mh_ret)
2517{
2518 struct dlm_message *ms;
2519 struct dlm_mhandle *mh;
2520 char *mb;
2521
2522 /* get_buffer gives us a message handle (mh) that we need to
2523 pass into lowcomms_commit and a message buffer (mb) that we
2524 write our data into */
2525
2526 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2527 if (!mh)
2528 return -ENOBUFS;
2529
2530 memset(mb, 0, mb_len);
2531
2532 ms = (struct dlm_message *) mb;
2533
2534 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2535 ms->m_header.h_lockspace = ls->ls_global_id;
2536 ms->m_header.h_nodeid = dlm_our_nodeid();
2537 ms->m_header.h_length = mb_len;
2538 ms->m_header.h_cmd = DLM_MSG;
2539
2540 ms->m_type = mstype;
2541
2542 *mh_ret = mh;
2543 *ms_ret = ms;
2544 return 0;
2545}
2546
David Teiglande7fd4172006-01-18 09:30:29 +00002547static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2548 int to_nodeid, int mstype,
2549 struct dlm_message **ms_ret,
2550 struct dlm_mhandle **mh_ret)
2551{
David Teiglande7fd4172006-01-18 09:30:29 +00002552 int mb_len = sizeof(struct dlm_message);
2553
2554 switch (mstype) {
2555 case DLM_MSG_REQUEST:
2556 case DLM_MSG_LOOKUP:
2557 case DLM_MSG_REMOVE:
2558 mb_len += r->res_length;
2559 break;
2560 case DLM_MSG_CONVERT:
2561 case DLM_MSG_UNLOCK:
2562 case DLM_MSG_REQUEST_REPLY:
2563 case DLM_MSG_CONVERT_REPLY:
2564 case DLM_MSG_GRANT:
2565 if (lkb && lkb->lkb_lvbptr)
2566 mb_len += r->res_ls->ls_lvblen;
2567 break;
2568 }
2569
David Teigland7e4dac32007-04-02 09:06:41 -05002570 return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2571 ms_ret, mh_ret);
David Teiglande7fd4172006-01-18 09:30:29 +00002572}
2573
/* further lowcomms enhancements or alternate implementations may make
   the return value from this function useful at some point */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	/* convert to wire byte order, then hand off to lowcomms */
	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);
	return 0;
}
2583
/* Copy the lkb's state into an outgoing message.  The send side sends
   everything; the receive side picks what it needs per message type. */

static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
		      struct dlm_message *ms)
{
	ms->m_nodeid = lkb->lkb_nodeid;
	ms->m_pid = lkb->lkb_ownpid;
	ms->m_lkid = lkb->lkb_id;
	ms->m_remid = lkb->lkb_remid;
	ms->m_exflags = lkb->lkb_exflags;
	ms->m_sbflags = lkb->lkb_sbflags;
	ms->m_flags = lkb->lkb_flags;
	ms->m_lvbseq = lkb->lkb_lvbseq;
	ms->m_status = lkb->lkb_status;
	ms->m_grmode = lkb->lkb_grmode;
	ms->m_rqmode = lkb->lkb_rqmode;
	ms->m_hash = r->res_hash;

	/* m_result and m_bastmode are set from function args,
	   not from lkb fields */

	/* only the existence of the ast callbacks is transmitted */
	if (lkb->lkb_bastaddr)
		ms->m_asts |= AST_BAST;
	if (lkb->lkb_astaddr)
		ms->m_asts |= AST_COMP;

	/* compare with switch in create_message; send_remove() doesn't
	   use send_args() */

	switch (ms->m_type) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
		/* tail carries the resource name */
		memcpy(ms->m_extra, r->res_name, r->res_length);
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		if (!lkb->lkb_lvbptr)
			break;
		/* tail carries the lock value block */
		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
		break;
	}
}
2627
/* Build and send a request/convert/unlock/cancel message to the rsb's
   master, registering the lkb on the waiters list first so the reply
   (or recovery) can always find it. */

static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	/* must happen before the send: the reply may arrive at any time
	   once the message is committed */
	error = add_to_waiters(lkb, mstype);
	if (error)
		return error;

	to_nodeid = r->res_nodeid;

	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	/* undo the waiters registration; pass the reply type that would
	   have cleared it */
	remove_from_waiters(lkb, msg_reply_type(mstype));
	return error;
}
2655
2656static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2657{
2658 return send_common(r, lkb, DLM_MSG_REQUEST);
2659}
2660
/* Send a convert to the master.  A down-conversion gets no reply, so a
   successful reply is fabricated locally from the stub message. */

static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	error = send_common(r, lkb, DLM_MSG_CONVERT);

	/* down conversions go without a reply from the master */
	if (!error && down_conversion(lkb)) {
		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
		/* fake a successful convert reply using the lockspace's
		   stub message, then process it as if it had arrived */
		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		r->res_ls->ls_stub_ms.m_result = 0;
		r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
	}

	return error;
}
2678
2679/* FIXME: if this lkb is the only lock we hold on the rsb, then set
2680 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2681 that the master is still correct. */
2682
2683static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2684{
2685 return send_common(r, lkb, DLM_MSG_UNLOCK);
2686}
2687
2688static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2689{
2690 return send_common(r, lkb, DLM_MSG_CANCEL);
2691}
2692
2693static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2694{
2695 struct dlm_message *ms;
2696 struct dlm_mhandle *mh;
2697 int to_nodeid, error;
2698
2699 to_nodeid = lkb->lkb_nodeid;
2700
2701 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2702 if (error)
2703 goto out;
2704
2705 send_args(r, lkb, ms);
2706
2707 ms->m_result = 0;
2708
2709 error = send_message(mh, ms);
2710 out:
2711 return error;
2712}
2713
2714static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2715{
2716 struct dlm_message *ms;
2717 struct dlm_mhandle *mh;
2718 int to_nodeid, error;
2719
2720 to_nodeid = lkb->lkb_nodeid;
2721
2722 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2723 if (error)
2724 goto out;
2725
2726 send_args(r, lkb, ms);
2727
2728 ms->m_bastmode = mode;
2729
2730 error = send_message(mh, ms);
2731 out:
2732 return error;
2733}
2734
/* Ask the directory node which node masters this resource; the lkb is
   put on the waiters list until the lookup reply arrives. */

static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	/* must happen before the send: the reply may arrive at any time
	   once the message is committed */
	error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
	if (error)
		return error;

	to_nodeid = dlm_dir_nodeid(r);

	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	/* undo the waiters registration */
	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	return error;
}
2762
2763static int send_remove(struct dlm_rsb *r)
2764{
2765 struct dlm_message *ms;
2766 struct dlm_mhandle *mh;
2767 int to_nodeid, error;
2768
2769 to_nodeid = dlm_dir_nodeid(r);
2770
2771 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2772 if (error)
2773 goto out;
2774
2775 memcpy(ms->m_extra, r->res_name, r->res_length);
2776 ms->m_hash = r->res_hash;
2777
2778 error = send_message(mh, ms);
2779 out:
2780 return error;
2781}
2782
2783static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2784 int mstype, int rv)
2785{
2786 struct dlm_message *ms;
2787 struct dlm_mhandle *mh;
2788 int to_nodeid, error;
2789
2790 to_nodeid = lkb->lkb_nodeid;
2791
2792 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2793 if (error)
2794 goto out;
2795
2796 send_args(r, lkb, ms);
2797
2798 ms->m_result = rv;
2799
2800 error = send_message(mh, ms);
2801 out:
2802 return error;
2803}
2804
2805static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2806{
2807 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2808}
2809
2810static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2811{
2812 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2813}
2814
2815static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2816{
2817 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2818}
2819
2820static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2821{
2822 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2823}
2824
2825static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2826 int ret_nodeid, int rv)
2827{
2828 struct dlm_rsb *r = &ls->ls_stub_rsb;
2829 struct dlm_message *ms;
2830 struct dlm_mhandle *mh;
2831 int error, nodeid = ms_in->m_header.h_nodeid;
2832
2833 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2834 if (error)
2835 goto out;
2836
2837 ms->m_lkid = ms_in->m_lkid;
2838 ms->m_result = rv;
2839 ms->m_nodeid = ret_nodeid;
2840
2841 error = send_message(mh, ms);
2842 out:
2843 return error;
2844}
2845
2846/* which args we save from a received message depends heavily on the type
2847 of message, unlike the send side where we can safely send everything about
2848 the lkb for any type of message */
2849
2850static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2851{
2852 lkb->lkb_exflags = ms->m_exflags;
David Teigland6f90a8b12006-11-10 14:16:27 -06002853 lkb->lkb_sbflags = ms->m_sbflags;
David Teiglande7fd4172006-01-18 09:30:29 +00002854 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2855 (ms->m_flags & 0x0000FFFF);
2856}
2857
2858static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2859{
2860 lkb->lkb_sbflags = ms->m_sbflags;
2861 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2862 (ms->m_flags & 0x0000FFFF);
2863}
2864
2865static int receive_extralen(struct dlm_message *ms)
2866{
2867 return (ms->m_header.h_length - sizeof(struct dlm_message));
2868}
2869
David Teiglande7fd4172006-01-18 09:30:29 +00002870static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2871 struct dlm_message *ms)
2872{
2873 int len;
2874
2875 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2876 if (!lkb->lkb_lvbptr)
2877 lkb->lkb_lvbptr = allocate_lvb(ls);
2878 if (!lkb->lkb_lvbptr)
2879 return -ENOMEM;
2880 len = receive_extralen(ms);
2881 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2882 }
2883 return 0;
2884}
2885
/* Initialize a freshly created master-copy lkb from a remote request
   message.  Returns 0 or -ENOMEM. */

static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				struct dlm_message *ms)
{
	lkb->lkb_nodeid = ms->m_header.h_nodeid;
	lkb->lkb_ownpid = ms->m_pid;
	lkb->lkb_remid = ms->m_lkid;
	lkb->lkb_grmode = DLM_LOCK_IV;
	lkb->lkb_rqmode = ms->m_rqmode;
	/* on the master copy only the existence of the owner's ast
	   callbacks matters, not their addresses */
	lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
	lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);

	DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		/* lkb was just created so there won't be an lvb yet */
		lkb->lkb_lvbptr = allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
	}

	return 0;
}
2908
/* Validate a remote convert against the master-copy lkb and pull in the
   new rqmode/lvb.  Returns 0, -EINVAL, -EBUSY or -ENOMEM. */

static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				struct dlm_message *ms)
{
	/* the convert must come from the node that owns the lock */
	if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
		log_error(ls, "convert_args nodeid %d %d lkid %x %x",
			  lkb->lkb_nodeid, ms->m_header.h_nodeid,
			  lkb->lkb_id, lkb->lkb_remid);
		return -EINVAL;
	}

	if (!is_master_copy(lkb))
		return -EINVAL;

	/* only a granted lock can be converted */
	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
		return -EBUSY;

	if (receive_lvb(ls, lkb, ms))
		return -ENOMEM;

	lkb->lkb_rqmode = ms->m_rqmode;
	lkb->lkb_lvbseq = ms->m_lvbseq;

	return 0;
}
2933
2934static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2935 struct dlm_message *ms)
2936{
2937 if (!is_master_copy(lkb))
2938 return -EINVAL;
2939 if (receive_lvb(ls, lkb, ms))
2940 return -ENOMEM;
2941 return 0;
2942}
2943
2944/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2945 uses to send a reply and that the remote end uses to process the reply. */
2946
2947static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2948{
2949 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2950 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2951 lkb->lkb_remid = ms->m_lkid;
2952}
2953
/* Handle an incoming lock request on the master node: build a
   master-copy lkb, find/confirm the rsb, run do_request() and send the
   result back.  On setup failure an error reply goes out via the stub
   lkb. */

static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, namelen;

	error = create_lkb(ls, &lkb);
	if (error)
		goto fail;

	receive_flags(lkb, ms);
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	error = receive_request_args(ls, lkb, ms);
	if (error) {
		/* releases the lkb created above */
		__put_lkb(ls, lkb);
		goto fail;
	}

	namelen = receive_extralen(ms);

	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
	if (error) {
		__put_lkb(ls, lkb);
		goto fail;
	}

	lock_rsb(r);

	attach_lkb(r, lkb);
	error = do_request(r, lkb);
	send_request_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);

	/* -EINPROGRESS means the lkb stays queued on the rsb; any other
	   nonzero result means it's finished and our reference goes */
	if (error == -EINPROGRESS)
		error = 0;
	if (error)
		dlm_put_lkb(lkb);
	return;

 fail:
	/* reply with the error using the lockspace's stub lkb/rsb */
	setup_stub_lkb(ls, ms);
	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
2999
/* Handle a DLM_MSG_CONVERT on the master node.  A down-conversion is
   granted immediately and asynchronously, so no reply is sent for it
   (reply = !down_conversion). */

static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, reply = 1;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	receive_flags(lkb, ms);
	error = receive_convert_args(ls, lkb, ms);
	if (error)
		goto out;
	reply = !down_conversion(lkb);

	error = do_convert(r, lkb);
 out:
	if (reply)
		send_convert_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	/* couldn't even find the lkb; reply through the stub lkb */
	setup_stub_lkb(ls, ms);
	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
3035
/* Handle a DLM_MSG_UNLOCK on the master node: validate args, run
   do_unlock() and always send a reply (success or the validation error). */

static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	receive_flags(lkb, ms);
	error = receive_unlock_args(ls, lkb, ms);
	if (error)
		goto out;

	error = do_unlock(r, lkb);
 out:
	send_unlock_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	/* unknown lkb; reply through the stub lkb so sender isn't stuck */
	setup_stub_lkb(ls, ms);
	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
3069
/* Handle a DLM_MSG_CANCEL on the master node.  Unlike convert/unlock there
   is no args-validation step; do_cancel() is attempted directly and its
   result is returned in the reply. */

static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	receive_flags(lkb, ms);

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = do_cancel(r, lkb);
	send_cancel_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
3099
/* Handle an async DLM_MSG_GRANT from the master: our queued process-copy
   lock has been granted.  Update local state and queue the completion ast. */

static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_grant no lkb");
		return;
	}
	/* grants are only ever sent to the process-copy side */
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	receive_flags_reply(lkb, ms);
	if (is_altmode(lkb))
		munge_altmode(lkb, ms);
	grant_lock_pc(r, lkb, ms);
	queue_cast(r, lkb, 0);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
3128
/* Handle an async DLM_MSG_BAST from the master: another lock is blocked on
   our process-copy lock; queue the blocking ast with the requested mode. */

static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_bast no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	queue_bast(r, lkb, ms->m_bastmode);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
3153
/* Handle a DLM_MSG_LOOKUP on the directory node: resolve the resource name
   (carried in m_extra) to its master nodeid and reply.  Rejects lookups
   that were misrouted (we're not the dir node for this hash). */

static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
{
	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;

	from_nodeid = ms->m_header.h_nodeid;
	our_nodeid = dlm_our_nodeid();

	len = receive_extralen(ms);

	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
	if (dir_nodeid != our_nodeid) {
		log_error(ls, "lookup dir_nodeid %d from %d",
			  dir_nodeid, from_nodeid);
		error = -EINVAL;
		ret_nodeid = -1;
		goto out;
	}

	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);

	/* Optimization: we're master so treat lookup as a request
	   (receive_request sends a request reply instead of a lookup reply;
	   receive_request_reply handles the DLM_MSG_LOOKUP wait type) */
	if (!error && ret_nodeid == our_nodeid) {
		receive_request(ls, ms);
		return;
	}
 out:
	send_lookup_reply(ls, ms, ret_nodeid, error);
}
3182
/* Handle a DLM_MSG_REMOVE on the directory node: drop the directory entry
   for the named resource.  No reply is sent for remove. */

static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
{
	int len, dir_nodeid, from_nodeid;

	from_nodeid = ms->m_header.h_nodeid;

	len = receive_extralen(ms);

	/* ignore removes that were routed to the wrong directory node */
	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
	if (dir_nodeid != dlm_our_nodeid()) {
		log_error(ls, "remove dir entry dir_nodeid %d from %d",
			  dir_nodeid, from_nodeid);
		return;
	}

	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
}
3200
/* Handle DLM_MSG_PURGE: purge locks owned by the given nodeid/pid pair. */

static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
{
	do_purge(ls, ms->m_nodeid, ms->m_pid);
}
3205
/* Handle the master's reply to our request (or to a lookup that the dir
   node promoted to a request).  Dispatches on the master's do_request()
   result, then resolves any unlock/cancel that overlapped the in-flight
   request by sending the deferred op or clearing the overlap flags. */

static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, mstype, result;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_request_reply no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	/* save the wait type before remove_from_waiters() clears it */
	mstype = lkb->lkb_wait_type;
	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
	if (error)
		goto out;

	/* Optimization: the dir node was also the master, so it took our
	   lookup as a request and sent request reply instead of lookup reply */
	if (mstype == DLM_MSG_LOOKUP) {
		r->res_nodeid = ms->m_header.h_nodeid;
		lkb->lkb_nodeid = r->res_nodeid;
	}

	/* this is the value returned from do_request() on the master */
	result = ms->m_result;

	switch (result) {
	case -EAGAIN:
		/* request would block (be queued) on remote master */
		queue_cast(r, lkb, -EAGAIN);
		confirm_master(r, -EAGAIN);
		unhold_lkb(lkb); /* undoes create_lkb() */
		break;

	case -EINPROGRESS:
	case 0:
		/* request was queued or granted on remote master */
		receive_flags_reply(lkb, ms);
		lkb->lkb_remid = ms->m_lkid;
		if (is_altmode(lkb))
			munge_altmode(lkb, ms);
		if (result) {
			/* -EINPROGRESS: still waiting on the master */
			add_lkb(r, lkb, DLM_LKSTS_WAITING);
			add_timeout(lkb);
		} else {
			grant_lock_pc(r, lkb, ms);
			queue_cast(r, lkb, 0);
		}
		confirm_master(r, result);
		break;

	case -EBADR:
	case -ENOTBLK:
		/* find_rsb failed to find rsb or rsb wasn't master */
		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
		/* forget the stale master; the request will be redone */
		r->res_nodeid = -1;
		lkb->lkb_nodeid = -1;

		if (is_overlap(lkb)) {
			/* we'll ignore error in cancel/unlock reply */
			queue_cast_overlap(r, lkb);
			unhold_lkb(lkb); /* undoes create_lkb() */
		} else
			_request_lock(r, lkb);
		break;

	default:
		log_error(ls, "receive_request_reply %x error %d",
			  lkb->lkb_id, result);
	}

	/* resolve ops that overlapped the request while it was in flight */
	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
		log_debug(ls, "receive_request_reply %x result %d unlock",
			  lkb->lkb_id, result);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		send_unlock(r, lkb);
	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		send_cancel(r, lkb);
	} else {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
3304
/* Apply the master's convert result to the local lkb/rsb.  Caller holds
   the rsb lock and has already removed the lkb from the waiters list. */

static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms)
{
	/* this is the value returned from do_convert() on the master */
	switch (ms->m_result) {
	case -EAGAIN:
		/* convert would block (be queued) on remote master */
		queue_cast(r, lkb, -EAGAIN);
		break;

	case -EINPROGRESS:
		/* convert was queued on remote master */
		receive_flags_reply(lkb, ms);
		if (is_demoted(lkb))
			munge_demoted(lkb, ms);
		/* move the lkb from granted to convert queue */
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		add_timeout(lkb);
		break;

	case 0:
		/* convert was granted on remote master */
		receive_flags_reply(lkb, ms);
		if (is_demoted(lkb))
			munge_demoted(lkb, ms);
		grant_lock_pc(r, lkb, ms);
		queue_cast(r, lkb, 0);
		break;

	default:
		log_error(r->res_ls, "receive_convert_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
}
3339
/* Lock the rsb, clear the waiter state for this reply, and apply it.
   Also used with a stub message during recovery (see
   recover_convert_waiter), in which case waiters_mutex may be held. */

static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	__receive_convert_reply(r, lkb, ms);
 out:
	unlock_rsb(r);
	put_rsb(r);
}
3358
/* Handle DLM_MSG_CONVERT_REPLY: look up our process-copy lkb and hand the
   message to _receive_convert_reply(); drop the find_lkb() reference. */

static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_convert_reply no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	_receive_convert_reply(lkb, ms);
	dlm_put_lkb(lkb);
}
3374
/* Apply the master's unlock result.  Also invoked with a stub reply during
   recovery (dlm_recover_waiters_pre), possibly with waiters_mutex held. */

static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	/* this is the value returned from do_unlock() on the master */

	switch (ms->m_result) {
	case -DLM_EUNLOCK:
		receive_flags_reply(lkb, ms);
		remove_lock_pc(r, lkb);
		queue_cast(r, lkb, -DLM_EUNLOCK);
		break;
	case -ENOENT:
		/* master no longer knows the lock; nothing to undo here */
		break;
	default:
		log_error(r->res_ls, "receive_unlock_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
}
3406
/* Handle DLM_MSG_UNLOCK_REPLY: locate the process-copy lkb, apply the
   reply, drop the find_lkb() reference. */

static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_unlock_reply no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	_receive_unlock_reply(lkb, ms);
	dlm_put_lkb(lkb);
}
3422
/* Apply the master's cancel result.  Also invoked with a stub reply during
   recovery (dlm_recover_waiters_pre), possibly with waiters_mutex held. */

static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	/* this is the value returned from do_cancel() on the master */

	switch (ms->m_result) {
	case -DLM_ECANCEL:
		receive_flags_reply(lkb, ms);
		revert_lock_pc(r, lkb);
		/* NOTE(review): m_result is -DLM_ECANCEL in this arm, so this
		   test is always true; kept as-is to preserve behavior */
		if (ms->m_result)
			queue_cast(r, lkb, -DLM_ECANCEL);
		break;
	case 0:
		/* cancel had no effect (lock no longer cancelable) */
		break;
	default:
		log_error(r->res_ls, "receive_cancel_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
}
3455
/* Handle DLM_MSG_CANCEL_REPLY: locate the process-copy lkb, apply the
   reply, drop the find_lkb() reference. */

static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_cancel_reply no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	_receive_cancel_reply(lkb, ms);
	dlm_put_lkb(lkb);
}
3471
/* Handle DLM_MSG_LOOKUP_REPLY from the directory node: record the master
   nodeid on the rsb and continue the original request via _request_lock(),
   unless an unlock/cancel overlapped the lookup, in which case the lock is
   completed locally instead. */

static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, ret_nodeid;

	/* lookup replies carry our lkid in m_lkid (no remid yet) */
	error = find_lkb(ls, ms->m_lkid, &lkb);
	if (error) {
		log_error(ls, "receive_lookup_reply no lkb");
		return;
	}

	/* ms->m_result is the value returned by dlm_dir_lookup on dir node
	   FIXME: will a non-zero error ever be returned? */

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	if (error)
		goto out;

	ret_nodeid = ms->m_nodeid;
	if (ret_nodeid == dlm_our_nodeid()) {
		/* we are the master; nodeid 0 means "local" */
		r->res_nodeid = 0;
		ret_nodeid = 0;
		r->res_first_lkid = 0;
	} else {
		/* set_master() will copy res_nodeid to lkb_nodeid */
		r->res_nodeid = ret_nodeid;
	}

	if (is_overlap(lkb)) {
		/* an unlock/cancel arrived while the lookup was in flight;
		   complete it locally instead of sending the request */
		log_debug(ls, "receive_lookup_reply %x unlock %x",
			  lkb->lkb_id, lkb->lkb_flags);
		queue_cast_overlap(r, lkb);
		unhold_lkb(lkb); /* undoes create_lkb() */
		goto out_list;
	}

	_request_lock(r, lkb);

 out_list:
	if (!ret_nodeid)
		process_lookup_list(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
3523
/* Entry point for all incoming DLM messages.  Finds the lockspace, deals
   with recovery interactions (queueing messages while locking is stopped,
   taking the recovery lock), then dispatches on message type.  'recovery'
   is set when called by dlm_recoverd to drain the requestqueue.  Returns 0
   or a negative errno (-EINVAL unknown lockspace, -EINTR recovery
   started). */

int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
{
	struct dlm_message *ms = (struct dlm_message *) hd;
	struct dlm_ls *ls;
	int error = 0;

	/* messages replayed from the requestqueue were already converted */
	if (!recovery)
		dlm_message_in(ms);

	ls = dlm_find_lockspace_global(hd->h_lockspace);
	if (!ls) {
		log_print("drop message %d from %d for unknown lockspace %d",
			  ms->m_type, nodeid, hd->h_lockspace);
		return -EINVAL;
	}

	/* recovery may have just ended leaving a bunch of backed-up requests
	   in the requestqueue; wait while dlm_recoverd clears them */

	if (!recovery)
		dlm_wait_requestqueue(ls);

	/* recovery may have just started while there were a bunch of
	   in-flight requests -- save them in requestqueue to be processed
	   after recovery.  we can't let dlm_recvd block on the recovery
	   lock.  if dlm_recoverd is calling this function to clear the
	   requestqueue, it needs to be interrupted (-EINTR) if another
	   recovery operation is starting. */

	while (1) {
		if (dlm_locking_stopped(ls)) {
			if (recovery) {
				error = -EINTR;
				goto out;
			}
			error = dlm_add_requestqueue(ls, nodeid, hd);
			if (error == -EAGAIN)
				continue;
			else {
				/* message saved for post-recovery replay */
				error = -EINTR;
				goto out;
			}
		}

		if (dlm_lock_recovery_try(ls))
			break;
		schedule();
	}

	switch (ms->m_type) {

	/* messages sent to a master node */

	case DLM_MSG_REQUEST:
		receive_request(ls, ms);
		break;

	case DLM_MSG_CONVERT:
		receive_convert(ls, ms);
		break;

	case DLM_MSG_UNLOCK:
		receive_unlock(ls, ms);
		break;

	case DLM_MSG_CANCEL:
		receive_cancel(ls, ms);
		break;

	/* messages sent from a master node (replies to above) */

	case DLM_MSG_REQUEST_REPLY:
		receive_request_reply(ls, ms);
		break;

	case DLM_MSG_CONVERT_REPLY:
		receive_convert_reply(ls, ms);
		break;

	case DLM_MSG_UNLOCK_REPLY:
		receive_unlock_reply(ls, ms);
		break;

	case DLM_MSG_CANCEL_REPLY:
		receive_cancel_reply(ls, ms);
		break;

	/* messages sent from a master node (only two types of async msg) */

	case DLM_MSG_GRANT:
		receive_grant(ls, ms);
		break;

	case DLM_MSG_BAST:
		receive_bast(ls, ms);
		break;

	/* messages sent to a dir node */

	case DLM_MSG_LOOKUP:
		receive_lookup(ls, ms);
		break;

	case DLM_MSG_REMOVE:
		receive_remove(ls, ms);
		break;

	/* messages sent from a dir node (remove has no reply) */

	case DLM_MSG_LOOKUP_REPLY:
		receive_lookup_reply(ls, ms);
		break;

	/* other messages */

	case DLM_MSG_PURGE:
		receive_purge(ls, ms);
		break;

	default:
		log_error(ls, "unknown message type %d", ms->m_type);
	}

	dlm_unlock_recovery(ls);
 out:
	dlm_put_lockspace(ls);
	dlm_astd_wake();
	return error;
}
3653
3654
3655/*
3656 * Recovery related
3657 */
3658
/* Recover an lkb waiting on a convert reply from a dead master.  A
   mid-conversion lock (PR<->CW) is completed with a faked -EINPROGRESS
   reply; an up-conversion is flagged for resend after recovery. */

static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	if (middle_conversion(lkb)) {
		hold_lkb(lkb);
		/* fake a convert reply from the (dead) master */
		ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		ls->ls_stub_ms.m_result = -EINPROGRESS;
		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
		_receive_convert_reply(lkb, &ls->ls_stub_ms);

		/* Same special case as in receive_rcom_lock_args() */
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
		unhold_lkb(lkb);

	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
		lkb->lkb_flags |= DLM_IFL_RESEND;
	}

	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
	   conversions are async; there's no reply from the remote master */
}
3680
3681/* A waiting lkb needs recovery if the master node has failed, or
3682 the master node is changing (only when no directory is used) */
3683
3684static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3685{
3686 if (dlm_is_removed(ls, lkb->lkb_nodeid))
3687 return 1;
3688
3689 if (!dlm_no_directory(ls))
3690 return 0;
3691
3692 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3693 return 1;
3694
3695 return 0;
3696}
3697
3698/* Recovery for locks that are waiting for replies from nodes that are now
3699 gone. We can just complete unlocks and cancels by faking a reply from the
3700 dead node. Requests and up-conversions we flag to be resent after
3701 recovery. Down-conversions can just be completed with a fake reply like
3702 unlocks. Conversions between PR and CW need special attention. */
3703
/* First phase of waiter recovery, run with locking stopped: walk the
   waiters list and either fake a reply from the dead node (unlock, cancel,
   down-conversion) or flag the lkb RESEND so dlm_recover_waiters_post()
   reissues it after recovery.  All lookups are resent unconditionally. */

void dlm_recover_waiters_pre(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb, *safe;

	mutex_lock(&ls->ls_waiters_mutex);

	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);

		/* all outstanding lookups, regardless of destination  will be
		   resent after recovery is done */

		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
			lkb->lkb_flags |= DLM_IFL_RESEND;
			continue;
		}

		if (!waiter_needs_recovery(ls, lkb))
			continue;

		switch (lkb->lkb_wait_type) {

		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_RESEND;
			break;

		case DLM_MSG_CONVERT:
			recover_convert_waiter(ls, lkb);
			break;

		case DLM_MSG_UNLOCK:
			/* complete the unlock with a faked success reply */
			hold_lkb(lkb);
			ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
			ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
			dlm_put_lkb(lkb);
			break;

		case DLM_MSG_CANCEL:
			/* complete the cancel with a faked success reply */
			hold_lkb(lkb);
			ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
			ls->ls_stub_ms.m_result = -DLM_ECANCEL;
			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
			dlm_put_lkb(lkb);
			break;

		default:
			log_error(ls, "invalid lkb wait_type %d",
				  lkb->lkb_wait_type);
		}
		/* yield between entries; the list can be long */
		schedule();
	}
	mutex_unlock(&ls->ls_waiters_mutex);
}
3761
David Teiglandef0c2bb2007-03-28 09:56:46 -05003762static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
David Teiglande7fd4172006-01-18 09:30:29 +00003763{
3764 struct dlm_lkb *lkb;
David Teiglandef0c2bb2007-03-28 09:56:46 -05003765 int found = 0;
David Teiglande7fd4172006-01-18 09:30:29 +00003766
David Teigland90135922006-01-20 08:47:07 +00003767 mutex_lock(&ls->ls_waiters_mutex);
David Teiglande7fd4172006-01-18 09:30:29 +00003768 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3769 if (lkb->lkb_flags & DLM_IFL_RESEND) {
David Teiglandef0c2bb2007-03-28 09:56:46 -05003770 hold_lkb(lkb);
3771 found = 1;
David Teiglande7fd4172006-01-18 09:30:29 +00003772 break;
3773 }
3774 }
David Teigland90135922006-01-20 08:47:07 +00003775 mutex_unlock(&ls->ls_waiters_mutex);
David Teiglande7fd4172006-01-18 09:30:29 +00003776
David Teiglandef0c2bb2007-03-28 09:56:46 -05003777 if (!found)
David Teiglande7fd4172006-01-18 09:30:29 +00003778 lkb = NULL;
David Teiglandef0c2bb2007-03-28 09:56:46 -05003779 return lkb;
David Teiglande7fd4172006-01-18 09:30:29 +00003780}
3781
3782/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
3783 master or dir-node for r. Processing the lkb may result in it being placed
3784 back on waiters. */
3785
David Teiglandef0c2bb2007-03-28 09:56:46 -05003786/* We do this after normal locking has been enabled and any saved messages
3787 (in requestqueue) have been processed. We should be confident that at
3788 this point we won't get or process a reply to any of these waiting
3789 operations. But, new ops may be coming in on the rsbs/locks here from
3790 userspace or remotely. */
3791
3792/* there may have been an overlap unlock/cancel prior to recovery or after
3793 recovery. if before, the lkb may still have a pos wait_count; if after, the
3794 overlap flag would just have been set and nothing new sent. we can be
3795 confident here than any replies to either the initial op or overlap ops
3796 prior to recovery have been received. */
3797
/* Second phase of waiter recovery, run after locking is re-enabled:
   repeatedly pull RESEND-flagged lkbs off the waiters list, wipe their
   waiter/overlap state, then either reissue the original op or — if an
   unlock/cancel overlapped it — complete that instead.  Returns 0, or
   -EINTR if another recovery cycle begins. */

int dlm_recover_waiters_post(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error = 0, mstype, err, oc, ou;

	while (1) {
		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "recover_waiters_post aborted");
			error = -EINTR;
			break;
		}

		lkb = find_resend_waiter(ls);
		if (!lkb)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		/* capture state before it's cleared below */
		mstype = lkb->lkb_wait_type;
		oc = is_overlap_cancel(lkb);
		ou = is_overlap_unlock(lkb);
		err = 0;

		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);

		/* At this point we assume that we won't get a reply to any
		   previous op or overlap op on this lock.  First, do a big
		   remove_from_waiters() for all previous ops. */

		lkb->lkb_flags &= ~DLM_IFL_RESEND;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_type = 0;
		lkb->lkb_wait_count = 0;
		mutex_lock(&ls->ls_waiters_mutex);
		list_del_init(&lkb->lkb_wait_reply);
		mutex_unlock(&ls->ls_waiters_mutex);
		unhold_lkb(lkb); /* for waiters list */

		if (oc || ou) {
			/* do an unlock or cancel instead of resending */
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
							-DLM_ECANCEL);
				unhold_lkb(lkb); /* undoes create_lkb() */
				break;
			case DLM_MSG_CONVERT:
				if (oc) {
					queue_cast(r, lkb, -DLM_ECANCEL);
				} else {
					/* overlap unlock of a convert: force
					   the unlock through locally */
					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
					_unlock_lock(r, lkb);
				}
				break;
			default:
				err = 1;
			}
		} else {
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				_request_lock(r, lkb);
				if (is_master(r))
					confirm_master(r, 0);
				break;
			case DLM_MSG_CONVERT:
				_convert_lock(r, lkb);
				break;
			default:
				err = 1;
			}
		}

		if (err)
			log_error(ls, "recover_waiters_post %x %d %x %d %d",
				  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
		unlock_rsb(r);
		put_rsb(r);
		dlm_put_lkb(lkb);
	}

	return error;
}
3887
3888static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3889 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3890{
3891 struct dlm_ls *ls = r->res_ls;
3892 struct dlm_lkb *lkb, *safe;
3893
3894 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3895 if (test(ls, lkb)) {
David Teigland97a35d12006-05-02 13:34:03 -04003896 rsb_set_flag(r, RSB_LOCKS_PURGED);
David Teiglande7fd4172006-01-18 09:30:29 +00003897 del_lkb(r, lkb);
3898 /* this put should free the lkb */
David Teiglandb3f58d82006-02-28 11:16:37 -05003899 if (!dlm_put_lkb(lkb))
David Teiglande7fd4172006-01-18 09:30:29 +00003900 log_error(ls, "purged lkb not released");
3901 }
3902 }
3903}
3904
3905static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3906{
3907 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3908}
3909
/* purge_queue() callback: select all master copies (ls unused; kept to
   match the callback signature) */

static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	return is_master_copy(lkb) != 0;
}
3914
3915static void purge_dead_locks(struct dlm_rsb *r)
3916{
3917 purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3918 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3919 purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3920}
3921
3922void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3923{
3924 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3925 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3926 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3927}
3928
3929/* Get rid of locks held by nodes that are gone. */
3930
int dlm_purge_locks(struct dlm_ls *ls)
{
	struct dlm_rsb *r;

	log_debug(ls, "dlm_purge_locks");

	/* write-lock the root list so no rsb is added or removed while we
	   walk every resource in the lockspace */
	down_write(&ls->ls_root_sem);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		hold_rsb(r);
		lock_rsb(r);
		/* only the master holds lkbs for other (possibly dead) nodes */
		if (is_master(r))
			purge_dead_locks(r);
		unlock_rsb(r);
		unhold_rsb(r);

		/* long walk; yield the cpu between rsbs */
		schedule();
	}
	up_write(&ls->ls_root_sem);

	return 0;
}
3952
David Teigland97a35d12006-05-02 13:34:03 -04003953static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3954{
3955 struct dlm_rsb *r, *r_ret = NULL;
3956
3957 read_lock(&ls->ls_rsbtbl[bucket].lock);
3958 list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3959 if (!rsb_flag(r, RSB_LOCKS_PURGED))
3960 continue;
3961 hold_rsb(r);
3962 rsb_clear_flag(r, RSB_LOCKS_PURGED);
3963 r_ret = r;
3964 break;
3965 }
3966 read_unlock(&ls->ls_rsbtbl[bucket].lock);
3967 return r_ret;
3968}
3969
3970void dlm_grant_after_purge(struct dlm_ls *ls)
David Teiglande7fd4172006-01-18 09:30:29 +00003971{
3972 struct dlm_rsb *r;
David Teigland2b4e9262006-07-25 13:59:48 -05003973 int bucket = 0;
David Teiglande7fd4172006-01-18 09:30:29 +00003974
David Teigland2b4e9262006-07-25 13:59:48 -05003975 while (1) {
3976 r = find_purged_rsb(ls, bucket);
3977 if (!r) {
3978 if (bucket == ls->ls_rsbtbl_size - 1)
3979 break;
3980 bucket++;
David Teigland97a35d12006-05-02 13:34:03 -04003981 continue;
David Teigland2b4e9262006-07-25 13:59:48 -05003982 }
David Teigland97a35d12006-05-02 13:34:03 -04003983 lock_rsb(r);
3984 if (is_master(r)) {
3985 grant_pending_locks(r);
3986 confirm_master(r, 0);
David Teiglande7fd4172006-01-18 09:30:29 +00003987 }
David Teigland97a35d12006-05-02 13:34:03 -04003988 unlock_rsb(r);
3989 put_rsb(r);
David Teigland2b4e9262006-07-25 13:59:48 -05003990 schedule();
David Teiglande7fd4172006-01-18 09:30:29 +00003991 }
David Teiglande7fd4172006-01-18 09:30:29 +00003992}
3993
3994static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3995 uint32_t remid)
3996{
3997 struct dlm_lkb *lkb;
3998
3999 list_for_each_entry(lkb, head, lkb_statequeue) {
4000 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4001 return lkb;
4002 }
4003 return NULL;
4004}
4005
4006static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4007 uint32_t remid)
4008{
4009 struct dlm_lkb *lkb;
4010
4011 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4012 if (lkb)
4013 return lkb;
4014 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4015 if (lkb)
4016 return lkb;
4017 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4018 if (lkb)
4019 return lkb;
4020 return NULL;
4021}
4022
/* Populate a new master-copy lkb from the rcom_lock struct sent by the
   node holding the process copy.  Returns 0, or -ENOMEM if an lvb
   cannot be allocated. */

static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				  struct dlm_rsb *r, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	int lvblen;

	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
	lkb->lkb_ownpid = rl->rl_ownpid;
	lkb->lkb_remid = rl->rl_lkid;
	lkb->lkb_exflags = rl->rl_exflags;
	/* keep only the low 16 bits of the remote internal flags, and mark
	   this lkb as a master copy */
	lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	lkb->lkb_lvbseq = rl->rl_lvbseq;
	lkb->lkb_rqmode = rl->rl_rqmode;
	lkb->lkb_grmode = rl->rl_grmode;
	/* don't set lkb_status because add_lkb wants to itself */

	/* record only whether the remote lock had bast/completion asts;
	   the actual callback addresses are meaningless on this node */
	lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
	lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		lkb->lkb_lvbptr = allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
		/* the lvb rides at the end of the rcom buffer, after the
		   fixed rcom and rcom_lock headers */
		lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
			 sizeof(struct rcom_lock);
		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
	}

	/* Conversions between PR and CW (middle modes) need special handling.
	   The real granted mode of these converting locks cannot be determined
	   until all locks have been rebuilt on the rsb (recover_conversion) */

	if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
		rl->rl_status = DLM_LKSTS_CONVERT;
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(r, RSB_RECOVER_CONVERT);
	}

	return 0;
}
4064
4065/* This lkb may have been recovered in a previous aborted recovery so we need
4066 to check if the rsb already has an lkb with the given remote nodeid/lkid.
4067 If so we just send back a standard reply. If not, we create a new lkb with
4068 the given values and send back our lkid. We send back our lkid by sending
4069 back the rcom_lock struct we got but with the remid field filled in. */
4070
int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	/* parent/child locks are not supported across recovery */
	if (rl->rl_parent_lkid) {
		error = -EOPNOTSUPP;
		goto out;
	}

	error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
	if (error)
		goto out;

	lock_rsb(r);

	/* an aborted previous recovery may have already created this lkb;
	   if so just report -EEXIST and return our existing lkid */
	lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
	if (lkb) {
		error = -EEXIST;
		goto out_remid;
	}

	error = create_lkb(ls, &lkb);
	if (error)
		goto out_unlock;

	error = receive_rcom_lock_args(ls, lkb, r, rc);
	if (error) {
		/* undoes create_lkb(); may free the lkb */
		__put_lkb(ls, lkb);
		goto out_unlock;
	}

	attach_lkb(r, lkb);
	add_lkb(r, lkb, rl->rl_status);
	error = 0;

 out_remid:
	/* this is the new value returned to the lock holder for
	   saving in its process-copy lkb */
	rl->rl_remid = lkb->lkb_id;

 out_unlock:
	unlock_rsb(r);
	put_rsb(r);
 out:
	if (error)
		log_print("recover_master_copy %d %x", error, rl->rl_lkid);
	/* result travels back to the sender in the same rcom buffer */
	rl->rl_result = error;
	return error;
}
4123
/* Handle the reply to the rcom lock we sent to the new master: save the
   master's lkid in our process copy, or resend if the master was not
   ready.  Always returns 0; the per-lock result is in rl->rl_result. */

int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, rl->rl_lkid, &lkb);
	if (error) {
		log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
		return error;
	}

	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	/* the remote node's per-lock result */
	error = rl->rl_result;

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	switch (error) {
	case -EBADR:
		/* There's a chance the new master received our lock before
		   dlm_recover_master_reply(), this wouldn't happen if we did
		   a barrier between recover_masters and recover_locks. */
		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
			  (unsigned long)r, r->res_name);
		/* retry; skip the ack below so recovery keeps waiting */
		dlm_send_rcom_lock(r, lkb);
		goto out;
	case -EEXIST:
		log_debug(ls, "master copy exists %x", lkb->lkb_id);
		/* fall through */
	case 0:
		lkb->lkb_remid = rl->rl_remid;
		break;
	default:
		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
			  error, lkb->lkb_id);
	}

	/* an ack for dlm_recover_locks() which waits for replies from
	   all the locks it sends to new masters */
	dlm_recovered_lock(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);

	return 0;
}
4175
/* Acquire a new lock on behalf of a userspace caller.  On success the
   lkb is added to the owning process's lock list; ua ownership passes
   to the lkb.  On early failures ua is freed here. */

int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
		     int mode, uint32_t flags, void *name, unsigned int namelen,
		     unsigned long timeout_cs)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	dlm_lock_recovery(ls);

	error = create_lkb(ls, &lkb);
	if (error) {
		kfree(ua);
		goto out;
	}

	if (flags & DLM_LKF_VALBLK) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
		if (!ua->lksb.sb_lvbptr) {
			kfree(ua);
			__put_lkb(ls, lkb);
			error = -ENOMEM;
			goto out;
		}
	}

	/* After ua is attached to lkb it will be freed by free_lkb().
	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
	   lock and that lkb_astparam is the dlm_user_args structure. */

	error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
			      DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
	/* DLM_IFL_USER is set before checking error, presumably so that
	   __put_lkb() below frees ua along with the lkb; NOTE(review):
	   confirm free_lkb() handles lkb_astparam on this path */
	lkb->lkb_flags |= DLM_IFL_USER;
	ua->old_mode = DLM_LOCK_IV;

	if (error) {
		__put_lkb(ls, lkb);
		goto out;
	}

	error = request_lock(ls, lkb, name, namelen, &args);

	switch (error) {
	case 0:
		break;
	case -EINPROGRESS:
		/* async grant; completion delivered via the user ast */
		error = 0;
		break;
	case -EAGAIN:
		error = 0;
		/* fall through */
	default:
		__put_lkb(ls, lkb);
		goto out;
	}

	/* add this new lkb to the per-process list of locks */
	spin_lock(&ua->proc->locks_spin);
	hold_lkb(lkb);
	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
	spin_unlock(&ua->proc->locks_spin);
 out:
	dlm_unlock_recovery(ls);
	return error;
}
4241
/* Convert an existing userspace lock to a new mode.  ua_tmp carries the
   caller's updated parameters and is always freed before returning. */

int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
		     unsigned long timeout_cs)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	/* user can change the params on its lock when it converts it, or
	   add an lvb that didn't exist before */

	ua = (struct dlm_user_args *)lkb->lkb_astparam;

	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
		if (!ua->lksb.sb_lvbptr) {
			error = -ENOMEM;
			goto out_put;
		}
	}
	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);

	/* refresh the attached ua with the caller's latest values */
	ua->xid = ua_tmp->xid;
	ua->castparam = ua_tmp->castparam;
	ua->castaddr = ua_tmp->castaddr;
	ua->bastparam = ua_tmp->bastparam;
	ua->bastaddr = ua_tmp->bastaddr;
	ua->user_lksb = ua_tmp->user_lksb;
	/* remember the pre-conversion mode for reporting to the user */
	ua->old_mode = lkb->lkb_grmode;

	error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
			      DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
	if (error)
		goto out_put;

	error = convert_lock(ls, lkb, &args);

	/* async completion arrives via the user ast */
	if (error == -EINPROGRESS || error == -EAGAIN)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
4296
/* Unlock a userspace lock; on success the lkb moves from the process's
   locks list to its unlocking list.  ua_tmp is always freed. */

int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid, char *lvb_in)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = (struct dlm_user_args *)lkb->lkb_astparam;

	/* the user may supply a new lvb to write back on unlock */
	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
	ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
		error = 0;
	if (error)
		goto out_put;

	spin_lock(&ua->proc->locks_spin);
	/* dlm_user_add_ast() may have already taken lkb off the proc list */
	if (!list_empty(&lkb->lkb_ownqueue))
		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
	spin_unlock(&ua->proc->locks_spin);
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
4344
/* Cancel an in-progress userspace lock request.  ua_tmp is always
   freed before returning. */

int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	/* refresh the attached ua with the caller's latest values */
	ua = (struct dlm_user_args *)lkb->lkb_astparam;
	ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = cancel_lock(ls, lkb, &args);

	if (error == -DLM_ECANCEL)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
4381
David Teiglandef0c2bb2007-03-28 09:56:46 -05004382/* lkb's that are removed from the waiters list by revert are just left on the
4383 orphans list with the granted orphan locks, to be freed by purge */
4384
David Teigland597d0ca2006-07-12 16:44:04 -05004385static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4386{
4387 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
David Teiglandef0c2bb2007-03-28 09:56:46 -05004388 struct dlm_args args;
4389 int error;
David Teigland597d0ca2006-07-12 16:44:04 -05004390
David Teiglandef0c2bb2007-03-28 09:56:46 -05004391 hold_lkb(lkb);
4392 mutex_lock(&ls->ls_orphans_mutex);
4393 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4394 mutex_unlock(&ls->ls_orphans_mutex);
David Teigland597d0ca2006-07-12 16:44:04 -05004395
David Teiglandef0c2bb2007-03-28 09:56:46 -05004396 set_unlock_args(0, ua, &args);
4397
4398 error = cancel_lock(ls, lkb, &args);
4399 if (error == -DLM_ECANCEL)
4400 error = 0;
4401 return error;
David Teigland597d0ca2006-07-12 16:44:04 -05004402}
4403
4404/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4405 Regardless of what rsb queue the lock is on, it's removed and freed. */
4406
4407static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4408{
4409 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4410 struct dlm_args args;
4411 int error;
4412
David Teigland597d0ca2006-07-12 16:44:04 -05004413 set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
4414
4415 error = unlock_lock(ls, lkb, &args);
4416 if (error == -DLM_EUNLOCK)
4417 error = 0;
4418 return error;
4419}
4420
David Teiglandef0c2bb2007-03-28 09:56:46 -05004421/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4422 (which does lock_rsb) due to deadlock with receiving a message that does
4423 lock_rsb followed by dlm_user_add_ast() */
4424
4425static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4426 struct dlm_user_proc *proc)
4427{
4428 struct dlm_lkb *lkb = NULL;
4429
4430 mutex_lock(&ls->ls_clear_proc_locks);
4431 if (list_empty(&proc->locks))
4432 goto out;
4433
4434 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4435 list_del_init(&lkb->lkb_ownqueue);
4436
4437 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4438 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4439 else
4440 lkb->lkb_flags |= DLM_IFL_DEAD;
4441 out:
4442 mutex_unlock(&ls->ls_clear_proc_locks);
4443 return lkb;
4444}
4445
David Teigland597d0ca2006-07-12 16:44:04 -05004446/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4447 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4448 which we clear here. */
4449
4450/* proc CLOSING flag is set so no more device_reads should look at proc->asts
4451 list, and no more device_writes should add lkb's to proc->locks list; so we
4452 shouldn't need to take asts_spin or locks_spin here. this assumes that
4453 device reads/writes/closes are serialized -- FIXME: we may need to serialize
4454 them ourself. */
4455
/* Release every lock a closing userspace process still holds:
   orphan the persistent ones, force-unlock the rest, then drop
   anything left on the in-progress-unlock and ast lists. */

void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	dlm_lock_recovery(ls);

	/* take locks off proc->locks one at a time so the clear_proc_locks
	   mutex is not held across unlock/cancel (see comment above
	   del_proc_lock about the lock_rsb deadlock) */
	while (1) {
		lkb = del_proc_lock(ls, proc);
		if (!lkb)
			break;
		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
			orphan_proc_lock(ls, lkb);
		else
			unlock_proc_lock(ls, lkb);

		/* this removes the reference for the proc->locks list
		   added by dlm_user_request, it may result in the lkb
		   being freed */

		dlm_put_lkb(lkb);
	}

	mutex_lock(&ls->ls_clear_proc_locks);

	/* in-progress unlocks */
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}

	/* pending ast deliveries that will never reach the process */
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		list_del(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}

	mutex_unlock(&ls->ls_clear_proc_locks);
	dlm_unlock_recovery(ls);
}
David Teiglanda1bc86e2007-01-15 10:34:52 -06004495
/* Like dlm_clear_proc_locks() but invoked by the process itself via a
   purge request: force-unlock all its locks and drain its unlocking
   and ast lists. */

static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	/* detach one lkb at a time so locks_spin is not held across
	   unlock_proc_lock() */
	while (1) {
		lkb = NULL;
		spin_lock(&proc->locks_spin);
		if (!list_empty(&proc->locks)) {
			lkb = list_entry(proc->locks.next, struct dlm_lkb,
					 lkb_ownqueue);
			list_del_init(&lkb->lkb_ownqueue);
		}
		spin_unlock(&proc->locks_spin);

		if (!lkb)
			break;

		lkb->lkb_flags |= DLM_IFL_DEAD;
		unlock_proc_lock(ls, lkb);
		dlm_put_lkb(lkb); /* ref from proc->locks list */
	}

	/* in-progress unlocks */
	spin_lock(&proc->locks_spin);
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->locks_spin);

	/* undelivered ast notifications */
	spin_lock(&proc->asts_spin);
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		list_del(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->asts_spin);
}
4533
4534/* pid of 0 means purge all orphans */
4535
4536static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4537{
4538 struct dlm_lkb *lkb, *safe;
4539
4540 mutex_lock(&ls->ls_orphans_mutex);
4541 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4542 if (pid && lkb->lkb_ownpid != pid)
4543 continue;
4544 unlock_proc_lock(ls, lkb);
4545 list_del_init(&lkb->lkb_ownqueue);
4546 dlm_put_lkb(lkb);
4547 }
4548 mutex_unlock(&ls->ls_orphans_mutex);
4549}
4550
4551static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4552{
4553 struct dlm_message *ms;
4554 struct dlm_mhandle *mh;
4555 int error;
4556
4557 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4558 DLM_MSG_PURGE, &ms, &mh);
4559 if (error)
4560 return error;
4561 ms->m_nodeid = nodeid;
4562 ms->m_pid = pid;
4563
4564 return send_message(mh, ms);
4565}
4566
4567int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4568 int nodeid, int pid)
4569{
4570 int error = 0;
4571
4572 if (nodeid != dlm_our_nodeid()) {
4573 error = send_purge(ls, nodeid, pid);
4574 } else {
David Teigland85e86ed2007-05-18 08:58:15 -05004575 dlm_lock_recovery(ls);
David Teigland84991372007-03-30 15:02:40 -05004576 if (pid == current->pid)
4577 purge_proc_locks(ls, proc);
4578 else
4579 do_purge(ls, nodeid, pid);
David Teigland85e86ed2007-05-18 08:58:15 -05004580 dlm_unlock_recovery(ls);
David Teigland84991372007-03-30 15:02:40 -05004581 }
4582 return error;
4583}
4584