1/*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
39#include <linux/smp_lock.h>
40#include <linux/pkt_sched.h>
41#define __KERNEL_SYSCALLS__
42#include <linux/unistd.h>
43#include <linux/vmalloc.h>
44#include <linux/random.h>
45#include <linux/mm.h>
46#include <linux/string.h>
47#include <linux/scatterlist.h>
48#include "drbd_int.h"
49#include "drbd_req.h"
50
51#include "drbd_vli.h"
52
53struct flush_work {
54 struct drbd_work w;
55 struct drbd_epoch *epoch;
56};
57
58enum finish_epoch {
59 FE_STILL_LIVE,
60 FE_DESTROYED,
61 FE_RECYCLED,
62};
63
64static int drbd_do_handshake(struct drbd_conf *mdev);
65static int drbd_do_auth(struct drbd_conf *mdev);
66
67static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
68static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
69
70static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
71{
72 struct drbd_epoch *prev;
73 spin_lock(&mdev->epoch_lock);
74 prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
75 if (prev == epoch || prev == mdev->current_epoch)
76 prev = NULL;
77 spin_unlock(&mdev->epoch_lock);
78 return prev;
79}
80
81#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
82
83static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev)
84{
85 struct page *page = NULL;
86
87 /* Yes, testing drbd_pp_vacant outside the lock is racy.
88 * So what. It saves a spin_lock. */
89 if (drbd_pp_vacant > 0) {
90 spin_lock(&drbd_pp_lock);
91 page = drbd_pp_pool;
92 if (page) {
93 drbd_pp_pool = (struct page *)page_private(page);
94 set_page_private(page, 0); /* just to be polite */
95 drbd_pp_vacant--;
96 }
97 spin_unlock(&drbd_pp_lock);
98 }
99 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
100 * "criss-cross" setup, that might cause write-out on some other DRBD,
101 * which in turn might block on the other node at this very place. */
102 if (!page)
103 page = alloc_page(GFP_TRY);
104 if (page)
105 atomic_inc(&mdev->pp_in_use);
106 return page;
107}
108
109/* kick lower level device, if we have more than (arbitrary number)
110 * reference counts on it, which typically are locally submitted io
111 * requests. don't use unacked_cnt, so we speed up proto A and B, too. */
112static void maybe_kick_lo(struct drbd_conf *mdev)
113{
114 if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
115 drbd_kick_lo(mdev);
116}
117
118static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
119{
120 struct drbd_epoch_entry *e;
121 struct list_head *le, *tle;
122
123 /* The EEs are always appended to the end of the list. Since
124 they are sent in order over the wire, they have to finish
125 in order. As soon as we see the first one that has not finished, we can
126 stop examining the list... */
127
128 list_for_each_safe(le, tle, &mdev->net_ee) {
129 e = list_entry(le, struct drbd_epoch_entry, w.list);
130 if (drbd_bio_has_active_page(e->private_bio))
131 break;
132 list_move(le, to_be_freed);
133 }
134}
135
136static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
137{
138 LIST_HEAD(reclaimed);
139 struct drbd_epoch_entry *e, *t;
140
141 maybe_kick_lo(mdev);
142 spin_lock_irq(&mdev->req_lock);
143 reclaim_net_ee(mdev, &reclaimed);
144 spin_unlock_irq(&mdev->req_lock);
145
146 list_for_each_entry_safe(e, t, &reclaimed, w.list)
147 drbd_free_ee(mdev, e);
148}
149
150/**
151 * drbd_pp_alloc() - Returns a page, fails only if a signal comes in
152 * @mdev: DRBD device.
153 * @retry: whether or not to retry allocation forever (or until signalled)
154 *
155 * Tries to allocate a page, first from our own page pool, then from the
156 * kernel, unless this allocation would exceed the max_buffers setting.
157 * If @retry is non-zero, retry until DRBD frees a page somewhere else.
158 */
159static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
160{
161 struct page *page = NULL;
162 DEFINE_WAIT(wait);
163
164 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
165 page = drbd_pp_first_page_or_try_alloc(mdev);
166 if (page)
167 return page;
168 }
169
170 for (;;) {
171 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
172
173 drbd_kick_lo_and_reclaim_net(mdev);
174
175 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
176 page = drbd_pp_first_page_or_try_alloc(mdev);
177 if (page)
178 break;
179 }
180
181 if (!retry)
182 break;
183
184 if (signal_pending(current)) {
185 dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
186 break;
187 }
188
189 schedule();
190 }
191 finish_wait(&drbd_pp_wait, &wait);
192
193 return page;
194}
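/*
 * Usage sketch (illustrative only): a receive path that needs a scratch
 * page pairs this with drbd_pp_free(), and still checks for NULL because
 * even a retrying allocation gives up when a signal is pending:
 *
 *	struct page *page = drbd_pp_alloc(mdev, 1);
 *	if (!page)
 *		return 0;
 *	// ... kmap(page), fill or inspect it, kunmap(page) ...
 *	drbd_pp_free(mdev, page);
 */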
195
196/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
197 * Is also used from inside another spin_lock_irq(&mdev->req_lock) */
198static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
199{
200 int free_it;
201
202 spin_lock(&drbd_pp_lock);
203 if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
204 free_it = 1;
205 } else {
206 set_page_private(page, (unsigned long)drbd_pp_pool);
207 drbd_pp_pool = page;
208 drbd_pp_vacant++;
209 free_it = 0;
210 }
211 spin_unlock(&drbd_pp_lock);
212
213 atomic_dec(&mdev->pp_in_use);
214
215 if (free_it)
216 __free_page(page);
217
218 wake_up(&drbd_pp_wait);
219}
220
221static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio)
222{
223 struct page *p_to_be_freed = NULL;
224 struct page *page;
225 struct bio_vec *bvec;
226 int i;
227
228 spin_lock(&drbd_pp_lock);
229 __bio_for_each_segment(bvec, bio, i, 0) {
230 if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
231 set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed);
232 p_to_be_freed = bvec->bv_page;
233 } else {
234 set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool);
235 drbd_pp_pool = bvec->bv_page;
236 drbd_pp_vacant++;
237 }
238 }
239 spin_unlock(&drbd_pp_lock);
240 atomic_sub(bio->bi_vcnt, &mdev->pp_in_use);
241
242 while (p_to_be_freed) {
243 page = p_to_be_freed;
244 p_to_be_freed = (struct page *)page_private(page);
245 set_page_private(page, 0); /* just to be polite */
246 put_page(page);
247 }
248
249 wake_up(&drbd_pp_wait);
250}
251
252/*
253You need to hold the req_lock:
254 _drbd_wait_ee_list_empty()
255
256You must not have the req_lock:
257 drbd_free_ee()
258 drbd_alloc_ee()
259 drbd_init_ee()
260 drbd_release_ee()
261 drbd_ee_fix_bhs()
262 drbd_process_done_ee()
263 drbd_clear_done_ee()
264 drbd_wait_ee_list_empty()
265*/
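/*
 * For example, the "must hold req_lock" variant is meant to be used the
 * way drbd_wait_ee_list_empty() below does it, roughly:
 *
 *	spin_lock_irq(&mdev->req_lock);
 *	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
 *	spin_unlock_irq(&mdev->req_lock);
 *
 * while all the other helpers take (and release) the lock themselves.
 */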
266
267struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
268 u64 id,
269 sector_t sector,
270 unsigned int data_size,
271 gfp_t gfp_mask) __must_hold(local)
272{
273 struct request_queue *q;
274 struct drbd_epoch_entry *e;
275 struct page *page;
276 struct bio *bio;
277 unsigned int ds;
278
279 if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
280 return NULL;
281
282 e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
283 if (!e) {
284 if (!(gfp_mask & __GFP_NOWARN))
285 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
286 return NULL;
287 }
288
289 bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE));
290 if (!bio) {
291 if (!(gfp_mask & __GFP_NOWARN))
292 dev_err(DEV, "alloc_ee: Allocation of a bio failed\n");
293 goto fail1;
294 }
295
296 bio->bi_bdev = mdev->ldev->backing_bdev;
297 bio->bi_sector = sector;
298
299 ds = data_size;
300 while (ds) {
301 page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT));
302 if (!page) {
303 if (!(gfp_mask & __GFP_NOWARN))
304 dev_err(DEV, "alloc_ee: Allocation of a page failed\n");
305 goto fail2;
306 }
307 if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) {
308 drbd_pp_free(mdev, page);
309 dev_err(DEV, "alloc_ee: bio_add_page(s=%llu,"
310 "data_size=%u,ds=%u) failed\n",
311 (unsigned long long)sector, data_size, ds);
312
313 q = bdev_get_queue(bio->bi_bdev);
314 if (q->merge_bvec_fn) {
315 struct bvec_merge_data bvm = {
316 .bi_bdev = bio->bi_bdev,
317 .bi_sector = bio->bi_sector,
318 .bi_size = bio->bi_size,
319 .bi_rw = bio->bi_rw,
320 };
321 int l = q->merge_bvec_fn(q, &bvm,
322 &bio->bi_io_vec[bio->bi_vcnt]);
323 dev_err(DEV, "merge_bvec_fn() = %d\n", l);
324 }
325
326 /* dump more of the bio. */
327 dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs);
328 dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt);
329 dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size);
330 dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments);
331
332 goto fail2;
333 break;
334 }
335 ds -= min_t(int, ds, PAGE_SIZE);
336 }
337
338 D_ASSERT(data_size == bio->bi_size);
339
340 bio->bi_private = e;
341 e->mdev = mdev;
342 e->sector = sector;
343 e->size = bio->bi_size;
344
345 e->private_bio = bio;
346 e->block_id = id;
347 INIT_HLIST_NODE(&e->colision);
348 e->epoch = NULL;
349 e->flags = 0;
350
351 return e;
352
353 fail2:
354 drbd_pp_free_bio_pages(mdev, bio);
355 bio_put(bio);
356 fail1:
357 mempool_free(e, drbd_ee_mempool);
358
359 return NULL;
360}
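/*
 * Allocation sketch (see read_in_block() below for the real thing): every
 * successfully allocated entry is either handed on to
 * drbd_generic_make_request() or released again with drbd_free_ee() on the
 * error path:
 *
 *	e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
 *	if (!e)
 *		return NULL;
 *	// ... receive the payload into e->private_bio ...
 *	// on any error: drbd_free_ee(mdev, e); return NULL;
 */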
361
362void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
363{
364 struct bio *bio = e->private_bio;
365 drbd_pp_free_bio_pages(mdev, bio);
366 bio_put(bio);
367 D_ASSERT(hlist_unhashed(&e->colision));
368 mempool_free(e, drbd_ee_mempool);
369}
370
371int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
372{
373 LIST_HEAD(work_list);
374 struct drbd_epoch_entry *e, *t;
375 int count = 0;
376
377 spin_lock_irq(&mdev->req_lock);
378 list_splice_init(list, &work_list);
379 spin_unlock_irq(&mdev->req_lock);
380
381 list_for_each_entry_safe(e, t, &work_list, w.list) {
382 drbd_free_ee(mdev, e);
383 count++;
384 }
385 return count;
386}
387
388
389/*
390 * This function is called from _asender only_
391 * but see also comments in _req_mod(,barrier_acked)
392 * and receive_Barrier.
393 *
394 * Move entries from net_ee to done_ee, if ready.
395 * Grab done_ee, call all callbacks, free the entries.
396 * The callbacks typically send out ACKs.
397 */
398static int drbd_process_done_ee(struct drbd_conf *mdev)
399{
400 LIST_HEAD(work_list);
401 LIST_HEAD(reclaimed);
402 struct drbd_epoch_entry *e, *t;
403 int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
404
405 spin_lock_irq(&mdev->req_lock);
406 reclaim_net_ee(mdev, &reclaimed);
407 list_splice_init(&mdev->done_ee, &work_list);
408 spin_unlock_irq(&mdev->req_lock);
409
410 list_for_each_entry_safe(e, t, &reclaimed, w.list)
411 drbd_free_ee(mdev, e);
412
413 /* possible callbacks here:
414 * e_end_block, and e_end_resync_block, e_send_discard_ack.
415 * all ignore the last argument.
416 */
417 list_for_each_entry_safe(e, t, &work_list, w.list) {
418 /* list_del not necessary, next/prev members not touched */
419 ok = e->w.cb(mdev, &e->w, !ok) && ok;
420 drbd_free_ee(mdev, e);
421 }
422 wake_up(&mdev->ee_wait);
423
424 return ok;
425}
426
427void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
428{
429 DEFINE_WAIT(wait);
430
431 /* avoids spin_lock/unlock
432 * and calling prepare_to_wait in the fast path */
433 while (!list_empty(head)) {
434 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
435 spin_unlock_irq(&mdev->req_lock);
436 drbd_kick_lo(mdev);
437 schedule();
438 finish_wait(&mdev->ee_wait, &wait);
439 spin_lock_irq(&mdev->req_lock);
440 }
441}
442
443void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
444{
445 spin_lock_irq(&mdev->req_lock);
446 _drbd_wait_ee_list_empty(mdev, head);
447 spin_unlock_irq(&mdev->req_lock);
448}
449
450/* see also kernel_accept; which is only present since 2.6.18.
451 * also we want to log which part of it failed, exactly */
452static int drbd_accept(struct drbd_conf *mdev, const char **what,
453 struct socket *sock, struct socket **newsock)
454{
455 struct sock *sk = sock->sk;
456 int err = 0;
457
458 *what = "listen";
459 err = sock->ops->listen(sock, 5);
460 if (err < 0)
461 goto out;
462
463 *what = "sock_create_lite";
464 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
465 newsock);
466 if (err < 0)
467 goto out;
468
469 *what = "accept";
470 err = sock->ops->accept(sock, *newsock, 0);
471 if (err < 0) {
472 sock_release(*newsock);
473 *newsock = NULL;
474 goto out;
475 }
476 (*newsock)->ops = sock->ops;
477
478out:
479 return err;
480}
481
482static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
483 void *buf, size_t size, int flags)
484{
485 mm_segment_t oldfs;
486 struct kvec iov = {
487 .iov_base = buf,
488 .iov_len = size,
489 };
490 struct msghdr msg = {
491 .msg_iovlen = 1,
492 .msg_iov = (struct iovec *)&iov,
493 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
494 };
495 int rv;
496
497 oldfs = get_fs();
498 set_fs(KERNEL_DS);
499 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
500 set_fs(oldfs);
501
502 return rv;
503}
504
505static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
506{
507 mm_segment_t oldfs;
508 struct kvec iov = {
509 .iov_base = buf,
510 .iov_len = size,
511 };
512 struct msghdr msg = {
513 .msg_iovlen = 1,
514 .msg_iov = (struct iovec *)&iov,
515 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
516 };
517 int rv;
518
519 oldfs = get_fs();
520 set_fs(KERNEL_DS);
521
522 for (;;) {
523 rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
524 if (rv == size)
525 break;
526
527 /* Note:
528 * ECONNRESET other side closed the connection
529 * ERESTARTSYS (on sock) we got a signal
530 */
531
532 if (rv < 0) {
533 if (rv == -ECONNRESET)
534 dev_info(DEV, "sock was reset by peer\n");
535 else if (rv != -ERESTARTSYS)
536 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
537 break;
538 } else if (rv == 0) {
539 dev_info(DEV, "sock was shut down by peer\n");
540 break;
541 } else {
542 /* signal came in, or peer/link went down,
543 * after we read a partial message
544 */
545 /* D_ASSERT(signal_pending(current)); */
546 break;
547 }
548 };
549
550 set_fs(oldfs);
551
552 if (rv != size)
553 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
554
555 return rv;
556}
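/*
 * Callers treat anything short of a full read as failure; the typical
 * pattern (see drbd_recv_header() below) is simply:
 *
 *	if (drbd_recv(mdev, h, sizeof(*h)) != sizeof(*h))
 *		return FALSE;
 */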
557
558static struct socket *drbd_try_connect(struct drbd_conf *mdev)
559{
560 const char *what;
561 struct socket *sock;
562 struct sockaddr_in6 src_in6;
563 int err;
564 int disconnect_on_error = 1;
565
566 if (!get_net_conf(mdev))
567 return NULL;
568
569 what = "sock_create_kern";
570 err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
571 SOCK_STREAM, IPPROTO_TCP, &sock);
572 if (err < 0) {
573 sock = NULL;
574 goto out;
575 }
576
577 sock->sk->sk_rcvtimeo =
578 sock->sk->sk_sndtimeo = mdev->net_conf->try_connect_int*HZ;
579
580 /* explicitly bind to the configured IP as source IP
581 * for the outgoing connections.
582 * This is needed for multihomed hosts and to be
583 * able to use lo: interfaces for drbd.
584 * Make sure to use 0 as port number, so linux selects
585 * a free one dynamically.
586 */
587 memcpy(&src_in6, mdev->net_conf->my_addr,
588 min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
589 if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
590 src_in6.sin6_port = 0;
591 else
592 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
593
594 what = "bind before connect";
595 err = sock->ops->bind(sock,
596 (struct sockaddr *) &src_in6,
597 mdev->net_conf->my_addr_len);
598 if (err < 0)
599 goto out;
600
601 /* connect may fail, peer not yet available.
602 * stay C_WF_CONNECTION, don't go Disconnecting! */
603 disconnect_on_error = 0;
604 what = "connect";
605 err = sock->ops->connect(sock,
606 (struct sockaddr *)mdev->net_conf->peer_addr,
607 mdev->net_conf->peer_addr_len, 0);
608
609out:
610 if (err < 0) {
611 if (sock) {
612 sock_release(sock);
613 sock = NULL;
614 }
615 switch (-err) {
616 /* timeout, busy, signal pending */
617 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
618 case EINTR: case ERESTARTSYS:
619 /* peer not (yet) available, network problem */
620 case ECONNREFUSED: case ENETUNREACH:
621 case EHOSTDOWN: case EHOSTUNREACH:
622 disconnect_on_error = 0;
623 break;
624 default:
625 dev_err(DEV, "%s failed, err = %d\n", what, err);
626 }
627 if (disconnect_on_error)
628 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
629 }
630 put_net_conf(mdev);
631 return sock;
632}
633
634static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
635{
636 int timeo, err;
637 struct socket *s_estab = NULL, *s_listen;
638 const char *what;
639
640 if (!get_net_conf(mdev))
641 return NULL;
642
643 what = "sock_create_kern";
644 err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
645 SOCK_STREAM, IPPROTO_TCP, &s_listen);
646 if (err) {
647 s_listen = NULL;
648 goto out;
649 }
650
651 timeo = mdev->net_conf->try_connect_int * HZ;
652 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
653
654 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
655 s_listen->sk->sk_rcvtimeo = timeo;
656 s_listen->sk->sk_sndtimeo = timeo;
657
658 what = "bind before listen";
659 err = s_listen->ops->bind(s_listen,
660 (struct sockaddr *) mdev->net_conf->my_addr,
661 mdev->net_conf->my_addr_len);
662 if (err < 0)
663 goto out;
664
665 err = drbd_accept(mdev, &what, s_listen, &s_estab);
666
667out:
668 if (s_listen)
669 sock_release(s_listen);
670 if (err < 0) {
671 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
672 dev_err(DEV, "%s failed, err = %d\n", what, err);
673 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
674 }
675 }
676 put_net_conf(mdev);
677
678 return s_estab;
679}
680
681static int drbd_send_fp(struct drbd_conf *mdev,
682 struct socket *sock, enum drbd_packets cmd)
683{
684 struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
685
686 return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
687}
688
689static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
690{
691 struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
692 int rr;
693
694 rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
695
696 if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
697 return be16_to_cpu(h->command);
698
699 return 0xffff;
700}
701
702/**
703 * drbd_socket_okay() - Free the socket if its connection is not okay
704 * @mdev: DRBD device.
705 * @sock: pointer to the pointer to the socket.
706 */
707static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
708{
709 int rr;
710 char tb[4];
711
712 if (!*sock)
713 return FALSE;
714
715 rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
716
717 if (rr > 0 || rr == -EAGAIN) {
718 return TRUE;
719 } else {
720 sock_release(*sock);
721 *sock = NULL;
722 return FALSE;
723 }
724}
725
726/*
727 * return values:
728 * 1 yes, we have a valid connection
729 * 0 oops, did not work out, please try again
730 * -1 peer talks different language,
731 * no point in trying again, please go standalone.
732 * -2 We do not have a network config...
733 */
734static int drbd_connect(struct drbd_conf *mdev)
735{
736 struct socket *s, *sock, *msock;
737 int try, h, ok;
738
739 D_ASSERT(!mdev->data.socket);
740
741 if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
742 dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
743
744 if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
745 return -2;
746
747 clear_bit(DISCARD_CONCURRENT, &mdev->flags);
748
749 sock = NULL;
750 msock = NULL;
751
752 do {
753 for (try = 0;;) {
754 /* 3 tries, this should take less than a second! */
755 s = drbd_try_connect(mdev);
756 if (s || ++try >= 3)
757 break;
758 /* give the other side time to call bind() & listen() */
759 __set_current_state(TASK_INTERRUPTIBLE);
760 schedule_timeout(HZ / 10);
761 }
762
763 if (s) {
764 if (!sock) {
765 drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
766 sock = s;
767 s = NULL;
768 } else if (!msock) {
769 drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
770 msock = s;
771 s = NULL;
772 } else {
773 dev_err(DEV, "Logic error in drbd_connect()\n");
774 goto out_release_sockets;
775 }
776 }
777
778 if (sock && msock) {
779 __set_current_state(TASK_INTERRUPTIBLE);
780 schedule_timeout(HZ / 10);
781 ok = drbd_socket_okay(mdev, &sock);
782 ok = drbd_socket_okay(mdev, &msock) && ok;
783 if (ok)
784 break;
785 }
786
787retry:
788 s = drbd_wait_for_connect(mdev);
789 if (s) {
790 try = drbd_recv_fp(mdev, s);
791 drbd_socket_okay(mdev, &sock);
792 drbd_socket_okay(mdev, &msock);
793 switch (try) {
794 case P_HAND_SHAKE_S:
795 if (sock) {
796 dev_warn(DEV, "initial packet S crossed\n");
797 sock_release(sock);
798 }
799 sock = s;
800 break;
801 case P_HAND_SHAKE_M:
802 if (msock) {
803 dev_warn(DEV, "initial packet M crossed\n");
804 sock_release(msock);
805 }
806 msock = s;
807 set_bit(DISCARD_CONCURRENT, &mdev->flags);
808 break;
809 default:
810 dev_warn(DEV, "Error receiving initial packet\n");
811 sock_release(s);
812 if (random32() & 1)
813 goto retry;
814 }
815 }
816
817 if (mdev->state.conn <= C_DISCONNECTING)
818 goto out_release_sockets;
819 if (signal_pending(current)) {
820 flush_signals(current);
821 smp_rmb();
822 if (get_t_state(&mdev->receiver) == Exiting)
823 goto out_release_sockets;
824 }
825
826 if (sock && msock) {
827 ok = drbd_socket_okay(mdev, &sock);
828 ok = drbd_socket_okay(mdev, &msock) && ok;
829 if (ok)
830 break;
831 }
832 } while (1);
833
834 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
835 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
836
837 sock->sk->sk_allocation = GFP_NOIO;
838 msock->sk->sk_allocation = GFP_NOIO;
839
840 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
841 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
842
843 if (mdev->net_conf->sndbuf_size) {
844 sock->sk->sk_sndbuf = mdev->net_conf->sndbuf_size;
845 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
846 }
847
848 if (mdev->net_conf->rcvbuf_size) {
849 sock->sk->sk_rcvbuf = mdev->net_conf->rcvbuf_size;
850 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
851 }
852
853 /* NOT YET ...
854 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
855 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
856 * first set it to the P_HAND_SHAKE timeout,
857 * which we set to 4x the configured ping_timeout. */
858 sock->sk->sk_sndtimeo =
859 sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
860
861 msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
862 msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
863
864 /* we don't want delays.
865 * we use TCP_CORK where appropriate, though */
866 drbd_tcp_nodelay(sock);
867 drbd_tcp_nodelay(msock);
868
869 mdev->data.socket = sock;
870 mdev->meta.socket = msock;
871 mdev->last_received = jiffies;
872
873 D_ASSERT(mdev->asender.task == NULL);
874
875 h = drbd_do_handshake(mdev);
876 if (h <= 0)
877 return h;
878
879 if (mdev->cram_hmac_tfm) {
880 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
881 switch (drbd_do_auth(mdev)) {
882 case -1:
883 dev_err(DEV, "Authentication of peer failed\n");
884 return -1;
885 case 0:
886 dev_err(DEV, "Authentication of peer failed, trying again.\n");
887 return 0;
888 }
889 }
890
891 if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
892 return 0;
893
894 sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
895 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
896
897 atomic_set(&mdev->packet_seq, 0);
898 mdev->peer_seq = 0;
899
900 drbd_thread_start(&mdev->asender);
901
902 if (!drbd_send_protocol(mdev))
903 return -1;
904 drbd_send_sync_param(mdev, &mdev->sync_conf);
905 drbd_send_sizes(mdev, 0, 0);
906 drbd_send_uuids(mdev);
907 drbd_send_state(mdev);
908 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
909 clear_bit(RESIZE_PENDING, &mdev->flags);
910
911 return 1;
912
913out_release_sockets:
914 if (sock)
915 sock_release(sock);
916 if (msock)
917 sock_release(msock);
918 return -1;
919}
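/*
 * Return-value handling sketch for an assumed caller (presumably the
 * receiver thread's main loop):
 *
 *	int h = drbd_connect(mdev);
 *	if (h > 0) {
 *		// connected: enter the packet receive loop
 *	} else if (h == 0) {
 *		// handshake did not work out: tear down, retry drbd_connect()
 *	} else {
 *		// peer speaks a different language, or no net config:
 *		// give up and go StandAlone
 *	}
 */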
920
921static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
922{
923 int r;
924
925 r = drbd_recv(mdev, h, sizeof(*h));
926
927 if (unlikely(r != sizeof(*h))) {
928 dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
929 return FALSE;
930 };
931 h->command = be16_to_cpu(h->command);
932 h->length = be16_to_cpu(h->length);
933 if (unlikely(h->magic != BE_DRBD_MAGIC)) {
934 dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
935 (long)be32_to_cpu(h->magic),
936 h->command, h->length);
937 return FALSE;
938 }
939 mdev->last_received = jiffies;
940
941 return TRUE;
942}
943
944static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
945{
946 int rv;
947
948 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
949 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
950 NULL, BLKDEV_IFL_WAIT);
951 if (rv) {
952 dev_err(DEV, "local disk flush failed with status %d\n", rv);
953 /* would rather check on EOPNOTSUPP, but that is not reliable.
954 * don't try again for ANY return value != 0
955 * if (rv == -EOPNOTSUPP) */
956 drbd_bump_write_ordering(mdev, WO_drain_io);
957 }
958 put_ldev(mdev);
959 }
960
961 return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
962}
963
964static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
965{
966 struct flush_work *fw = (struct flush_work *)w;
967 struct drbd_epoch *epoch = fw->epoch;
968
969 kfree(w);
970
971 if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
972 drbd_flush_after_epoch(mdev, epoch);
973
974 drbd_may_finish_epoch(mdev, epoch, EV_PUT |
975 (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
976
977 return 1;
978}
979
980/**
981 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
982 * @mdev: DRBD device.
983 * @epoch: Epoch object.
984 * @ev: Epoch event.
985 */
986static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
987 struct drbd_epoch *epoch,
988 enum epoch_event ev)
989{
990 int finish, epoch_size;
991 struct drbd_epoch *next_epoch;
992 int schedule_flush = 0;
993 enum finish_epoch rv = FE_STILL_LIVE;
994
995 spin_lock(&mdev->epoch_lock);
996 do {
997 next_epoch = NULL;
998 finish = 0;
999
1000 epoch_size = atomic_read(&epoch->epoch_size);
1001
1002 switch (ev & ~EV_CLEANUP) {
1003 case EV_PUT:
1004 atomic_dec(&epoch->active);
1005 break;
1006 case EV_GOT_BARRIER_NR:
1007 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1008
1009 /* Special case: If we just switched from WO_bio_barrier to
1010 WO_bdev_flush we should not finish the current epoch */
1011 if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
1012 mdev->write_ordering != WO_bio_barrier &&
1013 epoch == mdev->current_epoch)
1014 clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
1015 break;
1016 case EV_BARRIER_DONE:
1017 set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
1018 break;
1019 case EV_BECAME_LAST:
1020 /* nothing to do*/
1021 break;
1022 }
1023
1024 if (epoch_size != 0 &&
1025 atomic_read(&epoch->active) == 0 &&
1026 test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
1027 epoch->list.prev == &mdev->current_epoch->list &&
1028 !test_bit(DE_IS_FINISHING, &epoch->flags)) {
1029 /* Nearly all conditions are met to finish that epoch... */
1030 if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
1031 mdev->write_ordering == WO_none ||
1032 (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
1033 ev & EV_CLEANUP) {
1034 finish = 1;
1035 set_bit(DE_IS_FINISHING, &epoch->flags);
1036 } else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
1037 mdev->write_ordering == WO_bio_barrier) {
1038 atomic_inc(&epoch->active);
1039 schedule_flush = 1;
1040 }
1041 }
1042 if (finish) {
1043 if (!(ev & EV_CLEANUP)) {
1044 spin_unlock(&mdev->epoch_lock);
1045 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1046 spin_lock(&mdev->epoch_lock);
1047 }
1048 dec_unacked(mdev);
1049
1050 if (mdev->current_epoch != epoch) {
1051 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1052 list_del(&epoch->list);
1053 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1054 mdev->epochs--;
1055 kfree(epoch);
1056
1057 if (rv == FE_STILL_LIVE)
1058 rv = FE_DESTROYED;
1059 } else {
1060 epoch->flags = 0;
1061 atomic_set(&epoch->epoch_size, 0);
1062 /* atomic_set(&epoch->active, 0); is already zero */
1063 if (rv == FE_STILL_LIVE)
1064 rv = FE_RECYCLED;
1065 }
1066 }
1067
1068 if (!next_epoch)
1069 break;
1070
1071 epoch = next_epoch;
1072 } while (1);
1073
1074 spin_unlock(&mdev->epoch_lock);
1075
1076 if (schedule_flush) {
1077 struct flush_work *fw;
1078 fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
1079 if (fw) {
1080 fw->w.cb = w_flush;
1081 fw->epoch = epoch;
1082 drbd_queue_work(&mdev->data.work, &fw->w);
1083 } else {
1084 dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
1085 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1086 /* That is not a recursion, only one level */
1087 drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
1088 drbd_may_finish_epoch(mdev, epoch, EV_PUT);
1089 }
1090 }
1091
1092 return rv;
1093}
1094
1095/**
1096 * drbd_bump_write_ordering() - Fall back to another write ordering method
1097 * @mdev: DRBD device.
1098 * @wo: Write ordering method to try.
1099 */
1100void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1101{
1102 enum write_ordering_e pwo;
1103 static char *write_ordering_str[] = {
1104 [WO_none] = "none",
1105 [WO_drain_io] = "drain",
1106 [WO_bdev_flush] = "flush",
1107 [WO_bio_barrier] = "barrier",
1108 };
1109
1110 pwo = mdev->write_ordering;
1111 wo = min(pwo, wo);
1112 if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
1113 wo = WO_bdev_flush;
1114 if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1115 wo = WO_drain_io;
1116 if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1117 wo = WO_none;
1118 mdev->write_ordering = wo;
1119 if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
1120 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1121}
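/*
 * Fallback chain sketch, based on the checks above: a request for the
 * strongest method degrades step by step according to what the backing
 * device (claims to) support:
 *
 *	drbd_bump_write_ordering(mdev, WO_bio_barrier);
 *	// no_disk_barrier set -> WO_bdev_flush
 *	// no_disk_flush set   -> WO_drain_io
 *	// no_disk_drain set   -> WO_none
 */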
1122
1123/**
1124 * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
1125 * @mdev: DRBD device.
1126 * @w: work object.
1127 * @cancel: The connection will be closed anyways (unused in this callback)
1128 */
1129int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1130{
1131 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1132 struct bio *bio = e->private_bio;
1133
1134 /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1135 (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1136 so that we can finish that epoch in drbd_may_finish_epoch().
1137 That is necessary if we already have a long chain of Epochs, before
1138 we realize that BIO_RW_BARRIER is actually not supported */
1139
1140 /* As long as the -ENOTSUPP on the barrier is reported immediately
1141 that will never trigger. If it is reported late, we will just
1142 print that warning and continue correctly for all future requests
1143 with WO_bdev_flush */
1144 if (previous_epoch(mdev, e->epoch))
1145 dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1146
1147 /* prepare bio for re-submit,
1148 * re-init volatile members */
1149 /* we still have a local reference,
1150 * get_ldev was done in receive_Data. */
1151 bio->bi_bdev = mdev->ldev->backing_bdev;
1152 bio->bi_sector = e->sector;
1153 bio->bi_size = e->size;
1154 bio->bi_idx = 0;
1155
1156 bio->bi_flags &= ~(BIO_POOL_MASK - 1);
1157 bio->bi_flags |= 1 << BIO_UPTODATE;
1158
1159 /* don't know whether this is necessary: */
1160 bio->bi_phys_segments = 0;
1161 bio->bi_next = NULL;
1162
1163 /* these should be unchanged: */
1164 /* bio->bi_end_io = drbd_endio_write_sec; */
1165 /* bio->bi_vcnt = whatever; */
1166
1167 e->w.cb = e_end_block;
1168
1169 /* This is no longer a barrier request. */
1170 bio->bi_rw &= ~(1UL << BIO_RW_BARRIER);
1171
1172 drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio);
1173
1174 return 1;
1175}
1176
1177static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
1178{
1179 int rv, issue_flush;
1180 struct p_barrier *p = (struct p_barrier *)h;
1181 struct drbd_epoch *epoch;
1182
1183 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
1184
1185 rv = drbd_recv(mdev, h->payload, h->length);
1186 ERR_IF(rv != h->length) return FALSE;
1187
1188 inc_unacked(mdev);
1189
1190 if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1191 drbd_kick_lo(mdev);
1192
1193 mdev->current_epoch->barrier_nr = p->barrier;
1194 rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1195
1196 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1197 * the activity log, which means it would not be resynced in case the
1198 * R_PRIMARY crashes now.
1199 * Therefore we must send the barrier_ack after the barrier request was
1200 * completed. */
1201 switch (mdev->write_ordering) {
1202 case WO_bio_barrier:
1203 case WO_none:
1204 if (rv == FE_RECYCLED)
1205 return TRUE;
1206 break;
1207
1208 case WO_bdev_flush:
1209 case WO_drain_io:
1210 if (rv == FE_STILL_LIVE) {
1211 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1212 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1213 rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1214 }
1215 if (rv == FE_RECYCLED)
1216 return TRUE;
1217
1218 /* The asender will send all the ACKs and barrier ACKs out, since
1219 all EEs moved from the active_ee to the done_ee. We need to
1220 provide a new epoch object for the EEs that come in soon */
1221 break;
1222 }
1223
1224 /* receiver context, in the writeout path of the other node.
1225 * avoid potential distributed deadlock */
1226 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1227 if (!epoch) {
1228 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1229 issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1230 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1231 if (issue_flush) {
1232 rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1233 if (rv == FE_RECYCLED)
1234 return TRUE;
1235 }
1236
1237 drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
1238
1239 return TRUE;
1240 }
1241
1242 epoch->flags = 0;
1243 atomic_set(&epoch->epoch_size, 0);
1244 atomic_set(&epoch->active, 0);
1245
1246 spin_lock(&mdev->epoch_lock);
1247 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1248 list_add(&epoch->list, &mdev->current_epoch->list);
1249 mdev->current_epoch = epoch;
1250 mdev->epochs++;
1251 } else {
1252 /* The current_epoch got recycled while we allocated this one... */
1253 kfree(epoch);
1254 }
1255 spin_unlock(&mdev->epoch_lock);
1256
1257 return TRUE;
1258}
1259
1260/* used from receive_RSDataReply (recv_resync_read)
1261 * and from receive_Data */
1262static struct drbd_epoch_entry *
1263read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1264{
1265 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1266 struct drbd_epoch_entry *e;
1267 struct bio_vec *bvec;
1268 struct page *page;
1269 struct bio *bio;
1270 int dgs, ds, i, rr;
1271 void *dig_in = mdev->int_dig_in;
1272 void *dig_vv = mdev->int_dig_vv;
1273 unsigned long *data;
1274
1275 dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1276 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1277
1278 if (dgs) {
1279 rr = drbd_recv(mdev, dig_in, dgs);
1280 if (rr != dgs) {
1281 dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1282 rr, dgs);
1283 return NULL;
1284 }
1285 }
1286
1287 data_size -= dgs;
1288
1289 ERR_IF(data_size & 0x1ff) return NULL;
1290 ERR_IF(data_size > DRBD_MAX_SEGMENT_SIZE) return NULL;
1291
1292 /* even though we trust our peer,
1293 * we sometimes have to double check. */
1294 if (sector + (data_size>>9) > capacity) {
1295 dev_err(DEV, "capacity: %llus < sector: %llus + size: %u\n",
1296 (unsigned long long)capacity,
1297 (unsigned long long)sector, data_size);
1298 return NULL;
1299 }
1300
1301 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1302 * "criss-cross" setup, that might cause write-out on some other DRBD,
1303 * which in turn might block on the other node at this very place. */
1304 e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1305 if (!e)
1306 return NULL;
1307 bio = e->private_bio;
1308 ds = data_size;
1309 bio_for_each_segment(bvec, bio, i) {
1310 page = bvec->bv_page;
1311 data = kmap(page);
1312 rr = drbd_recv(mdev, data, min_t(int, ds, PAGE_SIZE));
1313 if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
1314 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1315 data[0] = data[0] ^ (unsigned long)-1;
1316 }
1317 kunmap(page);
1318 if (rr != min_t(int, ds, PAGE_SIZE)) {
1319 drbd_free_ee(mdev, e);
1320 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1321 rr, min_t(int, ds, PAGE_SIZE));
1322 return NULL;
1323 }
1324 ds -= rr;
1325 }
1326
1327 if (dgs) {
1328 drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1329 if (memcmp(dig_in, dig_vv, dgs)) {
1330 dev_err(DEV, "Digest integrity check FAILED.\n");
1331 drbd_bcast_ee(mdev, "digest failed",
1332 dgs, dig_in, dig_vv, e);
1333 drbd_free_ee(mdev, e);
1334 return NULL;
1335 }
1336 }
1337 mdev->recv_cnt += data_size>>9;
1338 return e;
1339}
1340
1341/* drbd_drain_block() just takes a data block
1342 * out of the socket input buffer, and discards it.
1343 */
1344static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1345{
1346 struct page *page;
1347 int rr, rv = 1;
1348 void *data;
1349
1350 if (!data_size)
1351 return TRUE;
1352
1353 page = drbd_pp_alloc(mdev, 1);
1354
1355 data = kmap(page);
1356 while (data_size) {
1357 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1358 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1359 rv = 0;
1360 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1361 rr, min_t(int, data_size, PAGE_SIZE));
1362 break;
1363 }
1364 data_size -= rr;
1365 }
1366 kunmap(page);
1367 drbd_pp_free(mdev, page);
1368 return rv;
1369}
1370
1371static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1372 sector_t sector, int data_size)
1373{
1374 struct bio_vec *bvec;
1375 struct bio *bio;
1376 int dgs, rr, i, expect;
1377 void *dig_in = mdev->int_dig_in;
1378 void *dig_vv = mdev->int_dig_vv;
1379
1380 dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1381 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1382
1383 if (dgs) {
1384 rr = drbd_recv(mdev, dig_in, dgs);
1385 if (rr != dgs) {
1386 dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1387 rr, dgs);
1388 return 0;
1389 }
1390 }
1391
1392 data_size -= dgs;
1393
1394 /* optimistically update recv_cnt. if receiving fails below,
1395 * we disconnect anyways, and counters will be reset. */
1396 mdev->recv_cnt += data_size>>9;
1397
1398 bio = req->master_bio;
1399 D_ASSERT(sector == bio->bi_sector);
1400
1401 bio_for_each_segment(bvec, bio, i) {
1402 expect = min_t(int, data_size, bvec->bv_len);
1403 rr = drbd_recv(mdev,
1404 kmap(bvec->bv_page)+bvec->bv_offset,
1405 expect);
1406 kunmap(bvec->bv_page);
1407 if (rr != expect) {
1408 dev_warn(DEV, "short read receiving data reply: "
1409 "read %d expected %d\n",
1410 rr, expect);
1411 return 0;
1412 }
1413 data_size -= rr;
1414 }
1415
1416 if (dgs) {
1417 drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1418 if (memcmp(dig_in, dig_vv, dgs)) {
1419 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1420 return 0;
1421 }
1422 }
1423
1424 D_ASSERT(data_size == 0);
1425 return 1;
1426}
1427
1428/* e_end_resync_block() is called via
1429 * drbd_process_done_ee() by asender only */
1430static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1431{
1432 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1433 sector_t sector = e->sector;
1434 int ok;
1435
1436 D_ASSERT(hlist_unhashed(&e->colision));
1437
1438 if (likely(drbd_bio_uptodate(e->private_bio))) {
1439 drbd_set_in_sync(mdev, sector, e->size);
1440 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1441 } else {
1442 /* Record failure to sync */
1443 drbd_rs_failed_io(mdev, sector, e->size);
1444
1445 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1446 }
1447 dec_unacked(mdev);
1448
1449 return ok;
1450}
1451
1452static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1453{
1454 struct drbd_epoch_entry *e;
1455
1456 e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1457 if (!e) {
1458 put_ldev(mdev);
1459 return FALSE;
1460 }
1461
1462 dec_rs_pending(mdev);
1463
1464 e->private_bio->bi_end_io = drbd_endio_write_sec;
1465 e->private_bio->bi_rw = WRITE;
1466 e->w.cb = e_end_resync_block;
1467
1468 inc_unacked(mdev);
1469 /* corresponding dec_unacked() in e_end_resync_block()
1470 * respective _drbd_clear_done_ee */
1471
1472 spin_lock_irq(&mdev->req_lock);
1473 list_add(&e->w.list, &mdev->sync_ee);
1474 spin_unlock_irq(&mdev->req_lock);
1475
1476 drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio);
1477 /* accounting done in endio */
1478
1479 maybe_kick_lo(mdev);
1480 return TRUE;
1481}
1482
1483static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1484{
1485 struct drbd_request *req;
1486 sector_t sector;
1487 unsigned int header_size, data_size;
1488 int ok;
1489 struct p_data *p = (struct p_data *)h;
1490
1491 header_size = sizeof(*p) - sizeof(*h);
1492 data_size = h->length - header_size;
1493
1494 ERR_IF(data_size == 0) return FALSE;
1495
1496 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1497 return FALSE;
1498
1499 sector = be64_to_cpu(p->sector);
1500
1501 spin_lock_irq(&mdev->req_lock);
1502 req = _ar_id_to_req(mdev, p->block_id, sector);
1503 spin_unlock_irq(&mdev->req_lock);
1504 if (unlikely(!req)) {
1505 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1506 return FALSE;
1507 }
1508
1509 /* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1510 * special casing it there for the various failure cases.
1511 * still no race with drbd_fail_pending_reads */
1512 ok = recv_dless_read(mdev, req, sector, data_size);
1513
1514 if (ok)
1515 req_mod(req, data_received);
1516 /* else: nothing. handled from drbd_disconnect...
1517 * I don't think we may complete this just yet
1518 * in case we are "on-disconnect: freeze" */
1519
1520 return ok;
1521}
1522
1523static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1524{
1525 sector_t sector;
1526 unsigned int header_size, data_size;
1527 int ok;
1528 struct p_data *p = (struct p_data *)h;
1529
1530 header_size = sizeof(*p) - sizeof(*h);
1531 data_size = h->length - header_size;
1532
1533 ERR_IF(data_size == 0) return FALSE;
1534
1535 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1536 return FALSE;
1537
1538 sector = be64_to_cpu(p->sector);
1539 D_ASSERT(p->block_id == ID_SYNCER);
1540
1541 if (get_ldev(mdev)) {
1542 /* data is submitted to disk within recv_resync_read.
1543 * corresponding put_ldev done below on error,
1544 * or in drbd_endio_write_sec. */
1545 ok = recv_resync_read(mdev, sector, data_size);
1546 } else {
1547 if (__ratelimit(&drbd_ratelimit_state))
1548 dev_err(DEV, "Can not write resync data to local disk.\n");
1549
1550 ok = drbd_drain_block(mdev, data_size);
1551
1552 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1553 }
1554
1555 return ok;
1556}
1557
1558/* e_end_block() is called via drbd_process_done_ee().
1559 * this means this function only runs in the asender thread
1560 */
1561static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1562{
1563 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1564 sector_t sector = e->sector;
1565 struct drbd_epoch *epoch;
1566 int ok = 1, pcmd;
1567
1568 if (e->flags & EE_IS_BARRIER) {
1569 epoch = previous_epoch(mdev, e->epoch);
1570 if (epoch)
1571 drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1572 }
1573
1574 if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1575 if (likely(drbd_bio_uptodate(e->private_bio))) {
1576 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1577 mdev->state.conn <= C_PAUSED_SYNC_T &&
1578 e->flags & EE_MAY_SET_IN_SYNC) ?
1579 P_RS_WRITE_ACK : P_WRITE_ACK;
1580 ok &= drbd_send_ack(mdev, pcmd, e);
1581 if (pcmd == P_RS_WRITE_ACK)
1582 drbd_set_in_sync(mdev, sector, e->size);
1583 } else {
1584 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1585 /* we expect it to be marked out of sync anyways...
1586 * maybe assert this? */
1587 }
1588 dec_unacked(mdev);
1589 }
1590 /* we delete from the conflict detection hash _after_ we sent out the
1591 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1592 if (mdev->net_conf->two_primaries) {
1593 spin_lock_irq(&mdev->req_lock);
1594 D_ASSERT(!hlist_unhashed(&e->colision));
1595 hlist_del_init(&e->colision);
1596 spin_unlock_irq(&mdev->req_lock);
1597 } else {
1598 D_ASSERT(hlist_unhashed(&e->colision));
1599 }
1600
1601 drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1602
1603 return ok;
1604}
1605
1606static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1607{
1608 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1609 int ok = 1;
1610
1611 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1612 ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1613
1614 spin_lock_irq(&mdev->req_lock);
1615 D_ASSERT(!hlist_unhashed(&e->colision));
1616 hlist_del_init(&e->colision);
1617 spin_unlock_irq(&mdev->req_lock);
1618
1619 dec_unacked(mdev);
1620
1621 return ok;
1622}
1623
1624/* Called from receive_Data.
1625 * Synchronize packets on sock with packets on msock.
1626 *
1627 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1628 * packet traveling on msock, they are still processed in the order they have
1629 * been sent.
1630 *
1631 * Note: we don't care for Ack packets overtaking P_DATA packets.
1632 *
1633 * In case packet_seq is larger than mdev->peer_seq number, there are
1634 * outstanding packets on the msock. We wait for them to arrive.
1635 * In case we are the logically next packet, we update mdev->peer_seq
1636 * ourselves. Correctly handles 32bit wrap around.
1637 *
1638 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1639 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1640 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1641 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1642 *
1643 * returns 0 if we may process the packet,
1644 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1645static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1646{
1647 DEFINE_WAIT(wait);
1648 unsigned int p_seq;
1649 long timeout;
1650 int ret = 0;
1651 spin_lock(&mdev->peer_seq_lock);
1652 for (;;) {
1653 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1654 if (seq_le(packet_seq, mdev->peer_seq+1))
1655 break;
1656 if (signal_pending(current)) {
1657 ret = -ERESTARTSYS;
1658 break;
1659 }
1660 p_seq = mdev->peer_seq;
1661 spin_unlock(&mdev->peer_seq_lock);
1662 timeout = schedule_timeout(30*HZ);
1663 spin_lock(&mdev->peer_seq_lock);
1664 if (timeout == 0 && p_seq == mdev->peer_seq) {
1665 ret = -ETIMEDOUT;
1666 dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1667 break;
1668 }
1669 }
1670 finish_wait(&mdev->seq_wait, &wait);
1671 if (mdev->peer_seq+1 == packet_seq)
1672 mdev->peer_seq++;
1673 spin_unlock(&mdev->peer_seq_lock);
1674 return ret;
1675}
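/*
 * seq_le() above is assumed to be a wrap-safe comparison of 32bit
 * sequence numbers, i.e. something along the lines of
 *
 *	static inline int seq_le(u32 a, u32 b)
 *	{
 *		return (s32)(a - b) <= 0;
 *	}
 *
 * so a sequence number taken just after the 32bit wrap still compares
 * as newer than one taken just before it.
 */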
1676
1677/* mirrored write */
1678static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1679{
1680 sector_t sector;
1681 struct drbd_epoch_entry *e;
1682 struct p_data *p = (struct p_data *)h;
1683 int header_size, data_size;
1684 int rw = WRITE;
1685 u32 dp_flags;
1686
1687 header_size = sizeof(*p) - sizeof(*h);
1688 data_size = h->length - header_size;
1689
1690 ERR_IF(data_size == 0) return FALSE;
1691
1692 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1693 return FALSE;
1694
1695 if (!get_ldev(mdev)) {
1696 if (__ratelimit(&drbd_ratelimit_state))
1697 dev_err(DEV, "Can not write mirrored data block "
1698 "to local disk.\n");
1699 spin_lock(&mdev->peer_seq_lock);
1700 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1701 mdev->peer_seq++;
1702 spin_unlock(&mdev->peer_seq_lock);
1703
1704 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1705 atomic_inc(&mdev->current_epoch->epoch_size);
1706 return drbd_drain_block(mdev, data_size);
1707 }
1708
1709 /* get_ldev(mdev) successful.
1710 * Corresponding put_ldev done either below (on various errors),
1711 * or in drbd_endio_write_sec, if we successfully submit the data at
1712 * the end of this function. */
1713
1714 sector = be64_to_cpu(p->sector);
1715 e = read_in_block(mdev, p->block_id, sector, data_size);
1716 if (!e) {
1717 put_ldev(mdev);
1718 return FALSE;
1719 }
1720
1721 e->private_bio->bi_end_io = drbd_endio_write_sec;
1722 e->w.cb = e_end_block;
1723
1724 spin_lock(&mdev->epoch_lock);
1725 e->epoch = mdev->current_epoch;
1726 atomic_inc(&e->epoch->epoch_size);
1727 atomic_inc(&e->epoch->active);
1728
1729 if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1730 struct drbd_epoch *epoch;
1731 /* Issue a barrier if we start a new epoch, and the previous epoch
1732 was not an epoch containing a single request which already was
1733 a Barrier. */
1734 epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1735 if (epoch == e->epoch) {
1736 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1737 rw |= (1<<BIO_RW_BARRIER);
1738 e->flags |= EE_IS_BARRIER;
1739 } else {
1740 if (atomic_read(&epoch->epoch_size) > 1 ||
1741 !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1742 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1743 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1744 rw |= (1<<BIO_RW_BARRIER);
1745 e->flags |= EE_IS_BARRIER;
1746 }
1747 }
1748 }
1749 spin_unlock(&mdev->epoch_lock);
1750
1751 dp_flags = be32_to_cpu(p->dp_flags);
1752 if (dp_flags & DP_HARDBARRIER) {
1753 dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1754 /* rw |= (1<<BIO_RW_BARRIER); */
1755 }
1756 if (dp_flags & DP_RW_SYNC)
1757 rw |= (1<<BIO_RW_SYNCIO) | (1<<BIO_RW_UNPLUG);
1758 if (dp_flags & DP_MAY_SET_IN_SYNC)
1759 e->flags |= EE_MAY_SET_IN_SYNC;
1760
1761 /* I'm the receiver, I do hold a net_cnt reference. */
1762 if (!mdev->net_conf->two_primaries) {
1763 spin_lock_irq(&mdev->req_lock);
1764 } else {
1765 /* don't get the req_lock yet,
1766 * we may sleep in drbd_wait_peer_seq */
1767 const int size = e->size;
1768 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1769 DEFINE_WAIT(wait);
1770 struct drbd_request *i;
1771 struct hlist_node *n;
1772 struct hlist_head *slot;
1773 int first;
1774
1775 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1776 BUG_ON(mdev->ee_hash == NULL);
1777 BUG_ON(mdev->tl_hash == NULL);
1778
1779 /* conflict detection and handling:
1780 * 1. wait on the sequence number,
1781 * in case this data packet overtook ACK packets.
1782 * 2. check our hash tables for conflicting requests.
1783 * we only need to walk the tl_hash, since an ee can not
1784 * have a conflict with an other ee: on the submitting
1785 * node, the corresponding req had already been conflicting,
1786 * and a conflicting req is never sent.
1787 *
1788 * Note: for two_primaries, we are protocol C,
1789 * so there cannot be any request that is DONE
1790 * but still on the transfer log.
1791 *
1792 * unconditionally add to the ee_hash.
1793 *
1794 * if no conflicting request is found:
1795 * submit.
1796 *
1797 * if any conflicting request is found
1798 * that has not yet been acked,
1799 * AND I have the "discard concurrent writes" flag:
1800 * queue (via done_ee) the P_DISCARD_ACK; OUT.
1801 *
1802 * if any conflicting request is found:
1803 * block the receiver, waiting on misc_wait
1804 * until no more conflicting requests are there,
1805 * or we get interrupted (disconnect).
1806 *
1807 * we do not just write after local io completion of those
1808 * requests, but only after req is done completely, i.e.
1809 * we wait for the P_DISCARD_ACK to arrive!
1810 *
1811 * then proceed normally, i.e. submit.
1812 */
1813 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1814 goto out_interrupted;
1815
1816 spin_lock_irq(&mdev->req_lock);
1817
1818 hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1819
1820#define OVERLAPS overlaps(i->sector, i->size, sector, size)
1821 slot = tl_hash_slot(mdev, sector);
1822 first = 1;
1823 for (;;) {
1824 int have_unacked = 0;
1825 int have_conflict = 0;
1826 prepare_to_wait(&mdev->misc_wait, &wait,
1827 TASK_INTERRUPTIBLE);
1828 hlist_for_each_entry(i, n, slot, colision) {
1829 if (OVERLAPS) {
1830 /* only ALERT on first iteration,
1831 * we may be woken up early... */
1832 if (first)
1833 dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1834 " new: %llus +%u; pending: %llus +%u\n",
1835 current->comm, current->pid,
1836 (unsigned long long)sector, size,
1837 (unsigned long long)i->sector, i->size);
1838 if (i->rq_state & RQ_NET_PENDING)
1839 ++have_unacked;
1840 ++have_conflict;
1841 }
1842 }
1843#undef OVERLAPS
1844 if (!have_conflict)
1845 break;
1846
1847 /* Discard Ack only for the _first_ iteration */
1848 if (first && discard && have_unacked) {
1849 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1850 (unsigned long long)sector);
1851 inc_unacked(mdev);
1852 e->w.cb = e_send_discard_ack;
1853 list_add_tail(&e->w.list, &mdev->done_ee);
1854
1855 spin_unlock_irq(&mdev->req_lock);
1856
1857 /* we could probably send that P_DISCARD_ACK ourselves,
1858 * but I don't like the receiver using the msock */
1859
1860 put_ldev(mdev);
1861 wake_asender(mdev);
1862 finish_wait(&mdev->misc_wait, &wait);
1863 return TRUE;
1864 }
1865
1866 if (signal_pending(current)) {
1867 hlist_del_init(&e->colision);
1868
1869 spin_unlock_irq(&mdev->req_lock);
1870
1871 finish_wait(&mdev->misc_wait, &wait);
1872 goto out_interrupted;
1873 }
1874
1875 spin_unlock_irq(&mdev->req_lock);
1876 if (first) {
1877 first = 0;
1878 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1879 "sec=%llus\n", (unsigned long long)sector);
1880 } else if (discard) {
1881 /* we had none on the first iteration.
1882 * there must be none now. */
1883 D_ASSERT(have_unacked == 0);
1884 }
1885 schedule();
1886 spin_lock_irq(&mdev->req_lock);
1887 }
1888 finish_wait(&mdev->misc_wait, &wait);
1889 }
1890
1891 list_add(&e->w.list, &mdev->active_ee);
1892 spin_unlock_irq(&mdev->req_lock);
1893
1894 switch (mdev->net_conf->wire_protocol) {
1895 case DRBD_PROT_C:
1896 inc_unacked(mdev);
1897 /* corresponding dec_unacked() in e_end_block()
1898 * respective _drbd_clear_done_ee */
1899 break;
1900 case DRBD_PROT_B:
1901 /* I really don't like it that the receiver thread
1902 * sends on the msock, but anyways */
1903 drbd_send_ack(mdev, P_RECV_ACK, e);
1904 break;
1905 case DRBD_PROT_A:
1906 /* nothing to do */
1907 break;
1908 }
1909
1910 if (mdev->state.pdsk == D_DISKLESS) {
1911 /* In case we have the only disk of the cluster, */
1912 drbd_set_out_of_sync(mdev, e->sector, e->size);
1913 e->flags |= EE_CALL_AL_COMPLETE_IO;
1914 drbd_al_begin_io(mdev, e->sector);
1915 }
1916
1917 e->private_bio->bi_rw = rw;
1918	drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio);
1919 /* accounting done in endio */
1920
1921 maybe_kick_lo(mdev);
1922 return TRUE;
1923
1924out_interrupted:
1925 /* yes, the epoch_size now is imbalanced.
1926 * but we drop the connection anyways, so we don't have a chance to
1927 * receive a barrier... atomic_inc(&mdev->epoch_size); */
1928 put_ldev(mdev);
1929 drbd_free_ee(mdev, e);
1930 return FALSE;
1931}
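/*
 * Illustrative sketch (kept under #if 0, not built): the conflict loop in
 * receive_Data() above is an instance of the generic "sleep until a condition
 * that is only stable under a spinlock holds" pattern: prepare_to_wait()
 * while still holding the lock, drop the lock, schedule(), retake the lock,
 * re-check.  The helper below and its parameters are hypothetical and exist
 * only to show the bare pattern.
 */
#if 0
static void wait_for_cond_locked(spinlock_t *lock, wait_queue_head_t *wq,
				 int (*cond)(void *), void *arg)
{
	DEFINE_WAIT(wait);

	spin_lock_irq(lock);
	while (!cond(arg) && !signal_pending(current)) {
		prepare_to_wait(wq, &wait, TASK_INTERRUPTIBLE);
		spin_unlock_irq(lock);
		schedule();		/* woken by whoever makes cond true */
		spin_lock_irq(lock);
	}
	finish_wait(wq, &wait);
	spin_unlock_irq(lock);
}
#endif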
1932
1933static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
1934{
1935 sector_t sector;
1936 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1937 struct drbd_epoch_entry *e;
1938 struct digest_info *di = NULL;
1939 int size, digest_size;
1940 unsigned int fault_type;
1941 struct p_block_req *p =
1942 (struct p_block_req *)h;
1943 const int brps = sizeof(*p)-sizeof(*h);
1944
1945 if (drbd_recv(mdev, h->payload, brps) != brps)
1946 return FALSE;
1947
1948 sector = be64_to_cpu(p->sector);
1949 size = be32_to_cpu(p->blksize);
1950
1951 if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
1952 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1953 (unsigned long long)sector, size);
1954 return FALSE;
1955 }
1956 if (sector + (size>>9) > capacity) {
1957 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1958 (unsigned long long)sector, size);
1959 return FALSE;
1960 }
1961
1962 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1963 if (__ratelimit(&drbd_ratelimit_state))
1964 dev_err(DEV, "Can not satisfy peer's read request, "
1965 "no local data.\n");
1966 drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
1967 P_NEG_RS_DREPLY , p);
1968		return drbd_drain_block(mdev, h->length - brps);
1969	}
1970
1971 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1972 * "criss-cross" setup, that might cause write-out on some other DRBD,
1973 * which in turn might block on the other node at this very place. */
1974 e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
1975 if (!e) {
1976 put_ldev(mdev);
1977 return FALSE;
1978 }
1979
1980 e->private_bio->bi_rw = READ;
1981 e->private_bio->bi_end_io = drbd_endio_read_sec;
1982
1983 switch (h->command) {
1984 case P_DATA_REQUEST:
1985 e->w.cb = w_e_end_data_req;
1986 fault_type = DRBD_FAULT_DT_RD;
1987 break;
1988 case P_RS_DATA_REQUEST:
1989 e->w.cb = w_e_end_rsdata_req;
1990 fault_type = DRBD_FAULT_RS_RD;
1991		/* Eventually this should become asynchronous. Currently it
1992 * blocks the whole receiver just to delay the reading of a
1993 * resync data block.
1994 * the drbd_work_queue mechanism is made for this...
1995 */
1996 if (!drbd_rs_begin_io(mdev, sector)) {
1997 /* we have been interrupted,
1998 * probably connection lost! */
1999 D_ASSERT(signal_pending(current));
2000 goto out_free_e;
2001 }
2002 break;
2003
2004 case P_OV_REPLY:
2005 case P_CSUM_RS_REQUEST:
2006 fault_type = DRBD_FAULT_RS_RD;
2007		digest_size = h->length - brps;
2008 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2009 if (!di)
2010 goto out_free_e;
2011
2012 di->digest_size = digest_size;
2013 di->digest = (((char *)di)+sizeof(struct digest_info));
2014
2015 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2016 goto out_free_e;
2017
2018 e->block_id = (u64)(unsigned long)di;
2019 if (h->command == P_CSUM_RS_REQUEST) {
2020 D_ASSERT(mdev->agreed_pro_version >= 89);
2021 e->w.cb = w_e_end_csum_rs_req;
2022 } else if (h->command == P_OV_REPLY) {
2023 e->w.cb = w_e_end_ov_reply;
2024 dec_rs_pending(mdev);
2025 break;
2026 }
2027
2028 if (!drbd_rs_begin_io(mdev, sector)) {
2029 /* we have been interrupted, probably connection lost! */
2030 D_ASSERT(signal_pending(current));
2031 goto out_free_e;
2032 }
2033 break;
2034
2035 case P_OV_REQUEST:
2036 if (mdev->state.conn >= C_CONNECTED &&
2037 mdev->state.conn != C_VERIFY_T)
2038 dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2039 drbd_conn_str(mdev->state.conn));
2040 if (mdev->ov_start_sector == ~(sector_t)0 &&
2041 mdev->agreed_pro_version >= 90) {
2042 mdev->ov_start_sector = sector;
2043 mdev->ov_position = sector;
2044 mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2045 dev_info(DEV, "Online Verify start sector: %llu\n",
2046 (unsigned long long)sector);
2047 }
2048 e->w.cb = w_e_end_ov_req;
2049 fault_type = DRBD_FAULT_RS_RD;
2050 /* Eventually this should become asynchronous. Currently it
2051 * blocks the whole receiver just to delay the reading of a
2052 * resync data block.
2053 * the drbd_work_queue mechanism is made for this...
2054 */
2055 if (!drbd_rs_begin_io(mdev, sector)) {
2056 /* we have been interrupted,
2057 * probably connection lost! */
2058 D_ASSERT(signal_pending(current));
2059 goto out_free_e;
2060 }
2061 break;
2062
2063
2064 default:
2065 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2066 cmdname(h->command));
2067 fault_type = DRBD_FAULT_MAX;
2068 }
2069
2070 spin_lock_irq(&mdev->req_lock);
2071 list_add(&e->w.list, &mdev->read_ee);
2072 spin_unlock_irq(&mdev->req_lock);
2073
2074 inc_unacked(mdev);
2075
2076	drbd_generic_make_request(mdev, fault_type, e->private_bio);
2077 maybe_kick_lo(mdev);
2078
2079 return TRUE;
2080
2081out_free_e:
2082 kfree(di);
2083 put_ldev(mdev);
2084 drbd_free_ee(mdev, e);
2085 return FALSE;
2086}
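/*
 * Illustrative sketch (under #if 0, not built): the digest_info handling
 * above packs the struct and the digest bytes into one allocation, so a
 * single kfree() in the completion path releases both.  A stand-alone
 * version of that pattern, with a made-up struct name, looks like this:
 */
#if 0
struct blob {
	unsigned int len;
	char *data;			/* points into the same allocation */
};

static struct blob *blob_alloc(unsigned int len, gfp_t gfp)
{
	struct blob *b = kmalloc(sizeof(*b) + len, gfp);

	if (!b)
		return NULL;
	b->len = len;
	b->data = (char *)b + sizeof(*b);	/* trailing payload */
	return b;			/* released with one kfree(b) */
}
#endif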
2087
2088static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2089{
2090 int self, peer, rv = -100;
2091 unsigned long ch_self, ch_peer;
2092
2093 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2094 peer = mdev->p_uuid[UI_BITMAP] & 1;
2095
2096 ch_peer = mdev->p_uuid[UI_SIZE];
2097 ch_self = mdev->comm_bm_set;
2098
2099 switch (mdev->net_conf->after_sb_0p) {
2100 case ASB_CONSENSUS:
2101 case ASB_DISCARD_SECONDARY:
2102 case ASB_CALL_HELPER:
2103 dev_err(DEV, "Configuration error.\n");
2104 break;
2105 case ASB_DISCONNECT:
2106 break;
2107 case ASB_DISCARD_YOUNGER_PRI:
2108 if (self == 0 && peer == 1) {
2109 rv = -1;
2110 break;
2111 }
2112 if (self == 1 && peer == 0) {
2113 rv = 1;
2114 break;
2115 }
2116 /* Else fall through to one of the other strategies... */
2117 case ASB_DISCARD_OLDER_PRI:
2118 if (self == 0 && peer == 1) {
2119 rv = 1;
2120 break;
2121 }
2122 if (self == 1 && peer == 0) {
2123 rv = -1;
2124 break;
2125 }
2126 /* Else fall through to one of the other strategies... */
2127		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2128			  "Using discard-least-changes instead\n");
2129 case ASB_DISCARD_ZERO_CHG:
2130 if (ch_peer == 0 && ch_self == 0) {
2131 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2132 ? -1 : 1;
2133 break;
2134 } else {
2135 if (ch_peer == 0) { rv = 1; break; }
2136 if (ch_self == 0) { rv = -1; break; }
2137 }
2138 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2139 break;
2140 case ASB_DISCARD_LEAST_CHG:
2141 if (ch_self < ch_peer)
2142 rv = -1;
2143 else if (ch_self > ch_peer)
2144 rv = 1;
2145 else /* ( ch_self == ch_peer ) */
2146 /* Well, then use something else. */
2147 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2148 ? -1 : 1;
2149 break;
2150 case ASB_DISCARD_LOCAL:
2151 rv = -1;
2152 break;
2153 case ASB_DISCARD_REMOTE:
2154 rv = 1;
2155 }
2156
2157 return rv;
2158}
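/*
 * Illustrative sketch (under #if 0, not built): the drbd_asb_recover_*p()
 * helpers share a small return convention -- negative means "discard the
 * local data" (this node becomes SyncTarget), positive means "discard the
 * peer's data" (this node becomes SyncSource), and -100 means the policy
 * could not decide.  A hypothetical caller would read it roughly like this:
 */
#if 0
static const char *asb_verdict(int rv)
{
	if (rv == -100)
		return "still split-brain, disconnect";
	return rv < 0 ? "discard local data, become SyncTarget"
		      : "discard peer's data, become SyncSource";
}
#endif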
2159
2160static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2161{
2162 int self, peer, hg, rv = -100;
2163
2164 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2165 peer = mdev->p_uuid[UI_BITMAP] & 1;
2166
2167 switch (mdev->net_conf->after_sb_1p) {
2168 case ASB_DISCARD_YOUNGER_PRI:
2169 case ASB_DISCARD_OLDER_PRI:
2170 case ASB_DISCARD_LEAST_CHG:
2171 case ASB_DISCARD_LOCAL:
2172 case ASB_DISCARD_REMOTE:
2173 dev_err(DEV, "Configuration error.\n");
2174 break;
2175 case ASB_DISCONNECT:
2176 break;
2177 case ASB_CONSENSUS:
2178 hg = drbd_asb_recover_0p(mdev);
2179 if (hg == -1 && mdev->state.role == R_SECONDARY)
2180 rv = hg;
2181 if (hg == 1 && mdev->state.role == R_PRIMARY)
2182 rv = hg;
2183 break;
2184 case ASB_VIOLENTLY:
2185 rv = drbd_asb_recover_0p(mdev);
2186 break;
2187 case ASB_DISCARD_SECONDARY:
2188 return mdev->state.role == R_PRIMARY ? 1 : -1;
2189 case ASB_CALL_HELPER:
2190 hg = drbd_asb_recover_0p(mdev);
2191 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2192 self = drbd_set_role(mdev, R_SECONDARY, 0);
2193 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2194 * we might be here in C_WF_REPORT_PARAMS which is transient.
2195 * we do not need to wait for the after state change work either. */
2196 self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2197 if (self != SS_SUCCESS) {
2198 drbd_khelper(mdev, "pri-lost-after-sb");
2199 } else {
2200 dev_warn(DEV, "Successfully gave up primary role.\n");
2201 rv = hg;
2202 }
2203 } else
2204 rv = hg;
2205 }
2206
2207 return rv;
2208}
2209
2210static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2211{
2212 int self, peer, hg, rv = -100;
2213
2214 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2215 peer = mdev->p_uuid[UI_BITMAP] & 1;
2216
2217 switch (mdev->net_conf->after_sb_2p) {
2218 case ASB_DISCARD_YOUNGER_PRI:
2219 case ASB_DISCARD_OLDER_PRI:
2220 case ASB_DISCARD_LEAST_CHG:
2221 case ASB_DISCARD_LOCAL:
2222 case ASB_DISCARD_REMOTE:
2223 case ASB_CONSENSUS:
2224 case ASB_DISCARD_SECONDARY:
2225 dev_err(DEV, "Configuration error.\n");
2226 break;
2227 case ASB_VIOLENTLY:
2228 rv = drbd_asb_recover_0p(mdev);
2229 break;
2230 case ASB_DISCONNECT:
2231 break;
2232 case ASB_CALL_HELPER:
2233 hg = drbd_asb_recover_0p(mdev);
2234 if (hg == -1) {
2235 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2236 * we might be here in C_WF_REPORT_PARAMS which is transient.
2237 * we do not need to wait for the after state change work either. */
2238 self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2239 if (self != SS_SUCCESS) {
2240 drbd_khelper(mdev, "pri-lost-after-sb");
2241 } else {
2242 dev_warn(DEV, "Successfully gave up primary role.\n");
2243 rv = hg;
2244 }
2245 } else
2246 rv = hg;
2247 }
2248
2249 return rv;
2250}
2251
2252static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2253 u64 bits, u64 flags)
2254{
2255 if (!uuid) {
2256 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2257 return;
2258 }
2259 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2260 text,
2261 (unsigned long long)uuid[UI_CURRENT],
2262 (unsigned long long)uuid[UI_BITMAP],
2263 (unsigned long long)uuid[UI_HISTORY_START],
2264 (unsigned long long)uuid[UI_HISTORY_END],
2265 (unsigned long long)bits,
2266 (unsigned long long)flags);
2267}
2268
2269/*
2270 100 after split brain try auto recover
2271 2 C_SYNC_SOURCE set BitMap
2272 1 C_SYNC_SOURCE use BitMap
2273 0 no Sync
2274 -1 C_SYNC_TARGET use BitMap
2275 -2 C_SYNC_TARGET set BitMap
2276 -100 after split brain, disconnect
2277-1000 unrelated data
2278 */
2279static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2280{
2281 u64 self, peer;
2282 int i, j;
2283
2284 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2285 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2286
2287 *rule_nr = 10;
2288 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2289 return 0;
2290
2291 *rule_nr = 20;
2292 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2293 peer != UUID_JUST_CREATED)
2294 return -2;
2295
2296 *rule_nr = 30;
2297 if (self != UUID_JUST_CREATED &&
2298 (peer == UUID_JUST_CREATED || peer == (u64)0))
2299 return 2;
2300
2301 if (self == peer) {
2302 int rct, dc; /* roles at crash time */
2303
2304 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2305
2306 if (mdev->agreed_pro_version < 91)
2307 return -1001;
2308
2309 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2310 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2311 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2312 drbd_uuid_set_bm(mdev, 0UL);
2313
2314 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2315 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2316 *rule_nr = 34;
2317 } else {
2318 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2319 *rule_nr = 36;
2320 }
2321
2322 return 1;
2323 }
2324
2325 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2326
2327 if (mdev->agreed_pro_version < 91)
2328 return -1001;
2329
2330 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2331 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2332 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2333
2334 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2335 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2336 mdev->p_uuid[UI_BITMAP] = 0UL;
2337
2338 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2339 *rule_nr = 35;
2340 } else {
2341 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2342 *rule_nr = 37;
2343 }
2344
2345 return -1;
2346 }
2347
2348 /* Common power [off|failure] */
2349 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2350 (mdev->p_uuid[UI_FLAGS] & 2);
2351 /* lowest bit is set when we were primary,
2352 * next bit (weight 2) is set when peer was primary */
2353 *rule_nr = 40;
2354
2355 switch (rct) {
2356 case 0: /* !self_pri && !peer_pri */ return 0;
2357 case 1: /* self_pri && !peer_pri */ return 1;
2358 case 2: /* !self_pri && peer_pri */ return -1;
2359 case 3: /* self_pri && peer_pri */
2360 dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2361 return dc ? -1 : 1;
2362 }
2363 }
2364
2365 *rule_nr = 50;
2366 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2367 if (self == peer)
2368 return -1;
2369
2370 *rule_nr = 51;
2371 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2372 if (self == peer) {
2373 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2374 peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2375 if (self == peer) {
2376			/* The last P_SYNC_UUID did not get through. Undo the last
2377			   start-of-resync-as-sync-source modifications of the peer's UUIDs. */
2378
2379 if (mdev->agreed_pro_version < 91)
2380 return -1001;
2381
2382 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2383 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2384 return -1;
2385 }
2386 }
2387
2388 *rule_nr = 60;
2389 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2390 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2391 peer = mdev->p_uuid[i] & ~((u64)1);
2392 if (self == peer)
2393 return -2;
2394 }
2395
2396 *rule_nr = 70;
2397 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2398 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2399 if (self == peer)
2400 return 1;
2401
2402 *rule_nr = 71;
2403 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2404 if (self == peer) {
2405 self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2406 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2407 if (self == peer) {
2408			/* The last P_SYNC_UUID did not get through. Undo the last
2409			   start-of-resync-as-sync-source modifications of our UUIDs. */
2410
2411 if (mdev->agreed_pro_version < 91)
2412 return -1001;
2413
2414 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2415 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2416
2417 dev_info(DEV, "Undid last start of resync:\n");
2418
2419 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2420 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2421
2422 return 1;
2423 }
2424 }
2425
2426
2427 *rule_nr = 80;
2428	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2429	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2430 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2431 if (self == peer)
2432 return 2;
2433 }
2434
2435 *rule_nr = 90;
2436 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2437 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2438 if (self == peer && self != ((u64)0))
2439 return 100;
2440
2441 *rule_nr = 100;
2442 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2443 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2444 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2445 peer = mdev->p_uuid[j] & ~((u64)1);
2446 if (self == peer)
2447 return -100;
2448 }
2449 }
2450
2451 return -1000;
2452}
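/*
 * Illustrative sketch (under #if 0, not built): how drbd_sync_handshake()
 * consumes the table above.  abs(hg) == 2 additionally forces a full sync
 * (the whole bitmap is set first), abs(hg) == 1 reuses the bitmap, 0 means
 * no resync; the +/-100, -1000 and -1001 values are handled as split-brain
 * or error cases before this mapping is ever reached.
 */
#if 0
static enum drbd_conns hg_to_conn(int hg)
{
	if (hg > 0)
		return C_WF_BITMAP_S;	/* we become sync source */
	if (hg < 0)
		return C_WF_BITMAP_T;	/* we become sync target */
	return C_CONNECTED;		/* already in sync */
}
#endif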
2453
2454/* drbd_sync_handshake() returns the new conn state on success, or
2455 CONN_MASK (-1) on failure.
2456 */
2457static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2458 enum drbd_disk_state peer_disk) __must_hold(local)
2459{
2460 int hg, rule_nr;
2461 enum drbd_conns rv = C_MASK;
2462 enum drbd_disk_state mydisk;
2463
2464 mydisk = mdev->state.disk;
2465 if (mydisk == D_NEGOTIATING)
2466 mydisk = mdev->new_state_tmp.disk;
2467
2468 dev_info(DEV, "drbd_sync_handshake:\n");
2469 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2470 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2471 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2472
2473 hg = drbd_uuid_compare(mdev, &rule_nr);
2474
2475 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2476
2477 if (hg == -1000) {
2478 dev_alert(DEV, "Unrelated data, aborting!\n");
2479 return C_MASK;
2480 }
2481 if (hg == -1001) {
2482		dev_alert(DEV, "To resolve this both sides have to support at least protocol 91\n");
2483 return C_MASK;
2484 }
2485
2486 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2487 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2488 int f = (hg == -100) || abs(hg) == 2;
2489 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2490 if (f)
2491 hg = hg*2;
2492 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2493 hg > 0 ? "source" : "target");
2494 }
2495
2496	if (abs(hg) == 100)
2497 drbd_khelper(mdev, "initial-split-brain");
2498
2499	if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2500 int pcount = (mdev->state.role == R_PRIMARY)
2501 + (peer_role == R_PRIMARY);
2502 int forced = (hg == -100);
2503
2504 switch (pcount) {
2505 case 0:
2506 hg = drbd_asb_recover_0p(mdev);
2507 break;
2508 case 1:
2509 hg = drbd_asb_recover_1p(mdev);
2510 break;
2511 case 2:
2512 hg = drbd_asb_recover_2p(mdev);
2513 break;
2514 }
2515 if (abs(hg) < 100) {
2516 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2517 "automatically solved. Sync from %s node\n",
2518 pcount, (hg < 0) ? "peer" : "this");
2519 if (forced) {
2520 dev_warn(DEV, "Doing a full sync, since"
2521				     " UUIDs were ambiguous.\n");
2522 hg = hg*2;
2523 }
2524 }
2525 }
2526
2527 if (hg == -100) {
2528 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2529 hg = -1;
2530 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2531 hg = 1;
2532
2533 if (abs(hg) < 100)
2534 dev_warn(DEV, "Split-Brain detected, manually solved. "
2535 "Sync from %s node\n",
2536 (hg < 0) ? "peer" : "this");
2537 }
2538
2539 if (hg == -100) {
2540		/* FIXME this log message is not correct if we end up here
2541 * after an attempted attach on a diskless node.
2542 * We just refuse to attach -- well, we drop the "connection"
2543 * to that disk, in a way... */
2544		dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2545		drbd_khelper(mdev, "split-brain");
2546 return C_MASK;
2547 }
2548
2549 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2550 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2551 return C_MASK;
2552 }
2553
2554 if (hg < 0 && /* by intention we do not use mydisk here. */
2555 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2556 switch (mdev->net_conf->rr_conflict) {
2557 case ASB_CALL_HELPER:
2558 drbd_khelper(mdev, "pri-lost");
2559 /* fall through */
2560 case ASB_DISCONNECT:
2561 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2562 return C_MASK;
2563 case ASB_VIOLENTLY:
2564 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2565			     " assumption\n");
2566 }
2567 }
2568
2569	if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2570 if (hg == 0)
2571 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2572 else
2573 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2574 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2575 abs(hg) >= 2 ? "full" : "bit-map based");
2576 return C_MASK;
2577 }
2578
2579	if (abs(hg) >= 2) {
2580 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2581 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2582 return C_MASK;
2583 }
2584
2585 if (hg > 0) { /* become sync source. */
2586 rv = C_WF_BITMAP_S;
2587 } else if (hg < 0) { /* become sync target */
2588 rv = C_WF_BITMAP_T;
2589 } else {
2590 rv = C_CONNECTED;
2591 if (drbd_bm_total_weight(mdev)) {
2592 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2593 drbd_bm_total_weight(mdev));
2594 }
2595 }
2596
2597 return rv;
2598}
2599
2600/* returns 1 if invalid */
2601static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2602{
2603 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2604 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2605 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2606 return 0;
2607
2608 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2609 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2610 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2611 return 1;
2612
2613 /* everything else is valid if they are equal on both sides. */
2614 if (peer == self)
2615 return 0;
2616
2617	/* everything else is invalid. */
2618 return 1;
2619}
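/*
 * Illustrative sketch (under #if 0, not built): a few concrete inputs for
 * cmp_after_sb() and the expected verdicts (0 = compatible, 1 = invalid).
 */
#if 0
static void cmp_after_sb_examples(void)
{
	/* mirrored local/remote discard on the two nodes is fine */
	BUG_ON(cmp_after_sb(ASB_DISCARD_REMOTE, ASB_DISCARD_LOCAL) != 0);
	/* both nodes discarding "local" would throw away both copies */
	BUG_ON(cmp_after_sb(ASB_DISCARD_LOCAL, ASB_DISCARD_LOCAL) != 1);
	/* everything else only has to match on both sides */
	BUG_ON(cmp_after_sb(ASB_DISCONNECT, ASB_DISCONNECT) != 0);
	BUG_ON(cmp_after_sb(ASB_DISCONNECT, ASB_VIOLENTLY) != 1);
}
#endif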
2620
2621static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2622{
2623 struct p_protocol *p = (struct p_protocol *)h;
2624 int header_size, data_size;
2625 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2626	int p_want_lose, p_two_primaries, cf;
2627	char p_integrity_alg[SHARED_SECRET_MAX] = "";
2628
2629 header_size = sizeof(*p) - sizeof(*h);
2630 data_size = h->length - header_size;
2631
2632 if (drbd_recv(mdev, h->payload, header_size) != header_size)
2633 return FALSE;
2634
2635 p_proto = be32_to_cpu(p->protocol);
2636 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2637 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2638 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
2639	p_two_primaries = be32_to_cpu(p->two_primaries);
2640	cf = be32_to_cpu(p->conn_flags);
2641 p_want_lose = cf & CF_WANT_LOSE;
2642
2643 clear_bit(CONN_DRY_RUN, &mdev->flags);
2644
2645 if (cf & CF_DRY_RUN)
2646 set_bit(CONN_DRY_RUN, &mdev->flags);
2647
2648 if (p_proto != mdev->net_conf->wire_protocol) {
2649 dev_err(DEV, "incompatible communication protocols\n");
2650 goto disconnect;
2651 }
2652
2653 if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2654 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2655 goto disconnect;
2656 }
2657
2658 if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2659 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2660 goto disconnect;
2661 }
2662
2663 if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2664 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2665 goto disconnect;
2666 }
2667
2668 if (p_want_lose && mdev->net_conf->want_lose) {
2669 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2670 goto disconnect;
2671 }
2672
2673 if (p_two_primaries != mdev->net_conf->two_primaries) {
2674 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2675 goto disconnect;
2676 }
2677
2678 if (mdev->agreed_pro_version >= 87) {
2679 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2680
2681 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2682 return FALSE;
2683
2684 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2685 if (strcmp(p_integrity_alg, my_alg)) {
2686 dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2687 goto disconnect;
2688 }
2689 dev_info(DEV, "data-integrity-alg: %s\n",
2690 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2691 }
2692
2693 return TRUE;
2694
2695disconnect:
2696 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2697 return FALSE;
2698}
2699
2700/* helper function
2701 * input: alg name, feature name
2702 * return: NULL (alg name was "")
2703 * ERR_PTR(error) if something goes wrong
2704 * or the crypto hash ptr, if it worked out ok. */
2705struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2706 const char *alg, const char *name)
2707{
2708 struct crypto_hash *tfm;
2709
2710 if (!alg[0])
2711 return NULL;
2712
2713 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2714 if (IS_ERR(tfm)) {
2715 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2716 alg, name, PTR_ERR(tfm));
2717 return tfm;
2718 }
2719 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2720 crypto_free_hash(tfm);
2721 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2722 return ERR_PTR(-EINVAL);
2723 }
2724 return tfm;
2725}
2726
2727static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2728{
2729 int ok = TRUE;
2730 struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
2731 unsigned int header_size, data_size, exp_max_sz;
2732 struct crypto_hash *verify_tfm = NULL;
2733 struct crypto_hash *csums_tfm = NULL;
2734 const int apv = mdev->agreed_pro_version;
2735
2736 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2737 : apv == 88 ? sizeof(struct p_rs_param)
2738 + SHARED_SECRET_MAX
2739 : /* 89 */ sizeof(struct p_rs_param_89);
2740
2741 if (h->length > exp_max_sz) {
2742 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2743 h->length, exp_max_sz);
2744 return FALSE;
2745 }
2746
2747 if (apv <= 88) {
2748 header_size = sizeof(struct p_rs_param) - sizeof(*h);
2749 data_size = h->length - header_size;
2750 } else /* apv >= 89 */ {
2751 header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2752 data_size = h->length - header_size;
2753 D_ASSERT(data_size == 0);
2754 }
2755
2756 /* initialize verify_alg and csums_alg */
2757 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2758
2759 if (drbd_recv(mdev, h->payload, header_size) != header_size)
2760 return FALSE;
2761
2762 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2763
2764 if (apv >= 88) {
2765 if (apv == 88) {
2766 if (data_size > SHARED_SECRET_MAX) {
2767 dev_err(DEV, "verify-alg too long, "
2768 "peer wants %u, accepting only %u byte\n",
2769 data_size, SHARED_SECRET_MAX);
2770 return FALSE;
2771 }
2772
2773 if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2774 return FALSE;
2775
2776 /* we expect NUL terminated string */
2777 /* but just in case someone tries to be evil */
2778 D_ASSERT(p->verify_alg[data_size-1] == 0);
2779 p->verify_alg[data_size-1] = 0;
2780
2781 } else /* apv >= 89 */ {
2782 /* we still expect NUL terminated strings */
2783 /* but just in case someone tries to be evil */
2784 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2785 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2786 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2787 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2788 }
2789
2790 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2791 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2792 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2793 mdev->sync_conf.verify_alg, p->verify_alg);
2794 goto disconnect;
2795 }
2796 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2797 p->verify_alg, "verify-alg");
2798 if (IS_ERR(verify_tfm)) {
2799 verify_tfm = NULL;
2800 goto disconnect;
2801 }
2802 }
2803
2804 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2805 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2806 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2807 mdev->sync_conf.csums_alg, p->csums_alg);
2808 goto disconnect;
2809 }
2810 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2811 p->csums_alg, "csums-alg");
2812 if (IS_ERR(csums_tfm)) {
2813 csums_tfm = NULL;
2814 goto disconnect;
2815 }
2816 }
2817
2818
2819 spin_lock(&mdev->peer_seq_lock);
2820 /* lock against drbd_nl_syncer_conf() */
2821 if (verify_tfm) {
2822 strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2823 mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2824 crypto_free_hash(mdev->verify_tfm);
2825 mdev->verify_tfm = verify_tfm;
2826 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2827 }
2828 if (csums_tfm) {
2829 strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2830 mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2831 crypto_free_hash(mdev->csums_tfm);
2832 mdev->csums_tfm = csums_tfm;
2833 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2834 }
2835 spin_unlock(&mdev->peer_seq_lock);
2836 }
2837
2838 return ok;
2839disconnect:
2840 /* just for completeness: actually not needed,
2841 * as this is not reached if csums_tfm was ok. */
2842 crypto_free_hash(csums_tfm);
2843 /* but free the verify_tfm again, if csums_tfm did not work out */
2844 crypto_free_hash(verify_tfm);
2845 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2846 return FALSE;
2847}
2848
2849static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2850{
2851 /* sorry, we currently have no working implementation
2852 * of distributed TCQ */
2853}
2854
2855/* warn if the arguments differ by more than 12.5% */
2856static void warn_if_differ_considerably(struct drbd_conf *mdev,
2857 const char *s, sector_t a, sector_t b)
2858{
2859 sector_t d;
2860 if (a == 0 || b == 0)
2861 return;
2862 d = (a > b) ? (a - b) : (b - a);
2863 if (d > (a>>3) || d > (b>>3))
2864 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2865 (unsigned long long)a, (unsigned long long)b);
2866}
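/* Worked example for the check above (illustration only): with a = 1000 and
 * b = 800 sectors, d = 200 exceeds a>>3 = 125, so the warning fires; with
 * b = 900, d = 100 exceeds neither 1000>>3 = 125 nor 900>>3 = 112 and the
 * function stays silent.  The >>3 is a cheap integer stand-in for 12.5%. */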
2867
2868static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2869{
2870 struct p_sizes *p = (struct p_sizes *)h;
2871 enum determine_dev_size dd = unchanged;
2872 unsigned int max_seg_s;
2873 sector_t p_size, p_usize, my_usize;
2874 int ldsc = 0; /* local disk size changed */
2875	enum dds_flags ddsf;
2876
2877 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2878 if (drbd_recv(mdev, h->payload, h->length) != h->length)
2879 return FALSE;
2880
2881 p_size = be64_to_cpu(p->d_size);
2882 p_usize = be64_to_cpu(p->u_size);
2883
2884 if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2885 dev_err(DEV, "some backing storage is needed\n");
2886 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2887 return FALSE;
2888 }
2889
2890 /* just store the peer's disk size for now.
2891 * we still need to figure out whether we accept that. */
2892 mdev->p_size = p_size;
2893
2894#define min_not_zero(l, r) ((l) == 0 ? (r) : ((r) == 0 ? (l) : min((l), (r))))
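	/* Illustration only: min_not_zero(0, 5) == 5, min_not_zero(3, 0) == 3,
	 * min_not_zero(3, 5) == 3 -- a zero on either side means "no limit set"
	 * and is ignored. */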
2895 if (get_ldev(mdev)) {
2896 warn_if_differ_considerably(mdev, "lower level device sizes",
2897 p_size, drbd_get_max_capacity(mdev->ldev));
2898 warn_if_differ_considerably(mdev, "user requested size",
2899 p_usize, mdev->ldev->dc.disk_size);
2900
2901 /* if this is the first connect, or an otherwise expected
2902 * param exchange, choose the minimum */
2903 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2904 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2905 p_usize);
2906
2907 my_usize = mdev->ldev->dc.disk_size;
2908
2909 if (mdev->ldev->dc.disk_size != p_usize) {
2910 mdev->ldev->dc.disk_size = p_usize;
2911 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2912 (unsigned long)mdev->ldev->dc.disk_size);
2913 }
2914
2915 /* Never shrink a device with usable data during connect.
2916 But allow online shrinking if we are connected. */
2917		if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
2918		    drbd_get_capacity(mdev->this_bdev) &&
2919 mdev->state.disk >= D_OUTDATED &&
2920 mdev->state.conn < C_CONNECTED) {
2921 dev_err(DEV, "The peer's disk size is too small!\n");
2922 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2923 mdev->ldev->dc.disk_size = my_usize;
2924 put_ldev(mdev);
2925 return FALSE;
2926 }
2927 put_ldev(mdev);
2928 }
2929#undef min_not_zero
2930
2931	ddsf = be16_to_cpu(p->dds_flags);
2932	if (get_ldev(mdev)) {
2933		dd = drbd_determin_dev_size(mdev, ddsf);
2934		put_ldev(mdev);
2935 if (dd == dev_size_error)
2936 return FALSE;
2937 drbd_md_sync(mdev);
2938 } else {
2939 /* I am diskless, need to accept the peer's size. */
2940 drbd_set_my_capacity(mdev, p_size);
2941 }
2942
2943	if (get_ldev(mdev)) {
2944 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
2945 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
2946 ldsc = 1;
2947 }
2948
2949 max_seg_s = be32_to_cpu(p->max_segment_size);
2950 if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
2951 drbd_setup_queue_param(mdev, max_seg_s);
2952
2953		drbd_setup_order_type(mdev, be16_to_cpu(p->queue_order_type));
2954		put_ldev(mdev);
2955 }
2956
2957 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
2958 if (be64_to_cpu(p->c_size) !=
2959 drbd_get_capacity(mdev->this_bdev) || ldsc) {
2960 /* we have different sizes, probably peer
2961 * needs to know my new size... */
2962			drbd_send_sizes(mdev, 0, ddsf);
2963		}
2964 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
2965 (dd == grew && mdev->state.conn == C_CONNECTED)) {
2966 if (mdev->state.pdsk >= D_INCONSISTENT &&
2967			    mdev->state.disk >= D_INCONSISTENT) {
2968 if (ddsf & DDSF_NO_RESYNC)
2969 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
2970 else
2971 resync_after_online_grow(mdev);
2972 } else
2973				set_bit(RESYNC_AFTER_NEG, &mdev->flags);
2974 }
2975 }
2976
2977 return TRUE;
2978}
2979
2980static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
2981{
2982 struct p_uuids *p = (struct p_uuids *)h;
2983 u64 *p_uuid;
2984 int i;
2985
2986 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2987 if (drbd_recv(mdev, h->payload, h->length) != h->length)
2988 return FALSE;
2989
2990 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
2991
2992 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
2993 p_uuid[i] = be64_to_cpu(p->uuid[i]);
2994
2995 kfree(mdev->p_uuid);
2996 mdev->p_uuid = p_uuid;
2997
2998 if (mdev->state.conn < C_CONNECTED &&
2999 mdev->state.disk < D_INCONSISTENT &&
3000 mdev->state.role == R_PRIMARY &&
3001 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3002 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3003 (unsigned long long)mdev->ed_uuid);
3004 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3005 return FALSE;
3006 }
3007
3008 if (get_ldev(mdev)) {
3009 int skip_initial_sync =
3010 mdev->state.conn == C_CONNECTED &&
3011 mdev->agreed_pro_version >= 90 &&
3012 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3013 (p_uuid[UI_FLAGS] & 8);
3014 if (skip_initial_sync) {
3015 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3016 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3017 "clear_n_write from receive_uuids");
3018 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3019 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3020 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3021 CS_VERBOSE, NULL);
3022 drbd_md_sync(mdev);
3023 }
3024 put_ldev(mdev);
3025 }
3026
3027	/* Before we test for the disk state, we should wait until any possibly
3028	   ongoing cluster wide state change has finished. That is important if
3029 we are primary and are detaching from our disk. We need to see the
3030 new disk state... */
3031 wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3032 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3033 drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3034
3035 return TRUE;
3036}
3037
3038/**
3039 * convert_state() - Converts the peer's view of the cluster state to our point of view
3040 * @ps: The state as seen by the peer.
3041 */
3042static union drbd_state convert_state(union drbd_state ps)
3043{
3044 union drbd_state ms;
3045
3046 static enum drbd_conns c_tab[] = {
3047 [C_CONNECTED] = C_CONNECTED,
3048
3049 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3050 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3051 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3052 [C_VERIFY_S] = C_VERIFY_T,
3053 [C_MASK] = C_MASK,
3054 };
3055
3056 ms.i = ps.i;
3057
3058 ms.conn = c_tab[ps.conn];
3059 ms.peer = ps.role;
3060 ms.role = ps.peer;
3061 ms.pdsk = ps.disk;
3062 ms.disk = ps.pdsk;
3063 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3064
3065 return ms;
3066}
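/*
 * Illustrative sketch (under #if 0, not built): convert_state() mirrors the
 * peer's packet into our point of view.  If the peer reports "I am Primary
 * with an UpToDate disk, my peer is Secondary and Inconsistent", we store it
 * as "the peer is Primary/UpToDate, we are Secondary/Inconsistent":
 */
#if 0
static void convert_state_example(void)
{
	union drbd_state ps, ms;

	ps.i = 0;
	ps.conn = C_CONNECTED;
	ps.role = R_PRIMARY;		/* peer: "I am Primary" */
	ps.peer = R_SECONDARY;		/* peer: "my peer is Secondary" */
	ps.disk = D_UP_TO_DATE;
	ps.pdsk = D_INCONSISTENT;

	ms = convert_state(ps);
	/* now ms.peer == R_PRIMARY, ms.role == R_SECONDARY,
	 *     ms.pdsk == D_UP_TO_DATE, ms.disk == D_INCONSISTENT */
}
#endif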
3067
3068static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
3069{
3070 struct p_req_state *p = (struct p_req_state *)h;
3071 union drbd_state mask, val;
3072 int rv;
3073
3074 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3075 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3076 return FALSE;
3077
3078 mask.i = be32_to_cpu(p->mask);
3079 val.i = be32_to_cpu(p->val);
3080
3081 if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3082 test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3083 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3084 return TRUE;
3085 }
3086
3087 mask = convert_state(mask);
3088 val = convert_state(val);
3089
3090 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3091
3092 drbd_send_sr_reply(mdev, rv);
3093 drbd_md_sync(mdev);
3094
3095 return TRUE;
3096}
3097
3098static int receive_state(struct drbd_conf *mdev, struct p_header *h)
3099{
3100 struct p_state *p = (struct p_state *)h;
3101 enum drbd_conns nconn, oconn;
3102 union drbd_state ns, peer_state;
3103 enum drbd_disk_state real_peer_disk;
3104 int rv;
3105
3106 ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
3107 return FALSE;
3108
3109 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3110 return FALSE;
3111
3112 peer_state.i = be32_to_cpu(p->state);
3113
3114 real_peer_disk = peer_state.disk;
3115 if (peer_state.disk == D_NEGOTIATING) {
3116 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3117 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3118 }
3119
3120 spin_lock_irq(&mdev->req_lock);
3121 retry:
3122 oconn = nconn = mdev->state.conn;
3123 spin_unlock_irq(&mdev->req_lock);
3124
3125 if (nconn == C_WF_REPORT_PARAMS)
3126 nconn = C_CONNECTED;
3127
3128 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3129 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3130 int cr; /* consider resync */
3131
3132 /* if we established a new connection */
3133 cr = (oconn < C_CONNECTED);
3134 /* if we had an established connection
3135 * and one of the nodes newly attaches a disk */
3136 cr |= (oconn == C_CONNECTED &&
3137 (peer_state.disk == D_NEGOTIATING ||
3138 mdev->state.disk == D_NEGOTIATING));
3139 /* if we have both been inconsistent, and the peer has been
3140 * forced to be UpToDate with --overwrite-data */
3141 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3142 /* if we had been plain connected, and the admin requested to
3143 * start a sync by "invalidate" or "invalidate-remote" */
3144 cr |= (oconn == C_CONNECTED &&
3145 (peer_state.conn >= C_STARTING_SYNC_S &&
3146 peer_state.conn <= C_WF_BITMAP_T));
3147
3148 if (cr)
3149 nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3150
3151 put_ldev(mdev);
3152 if (nconn == C_MASK) {
3153			nconn = C_CONNECTED;
3154			if (mdev->state.disk == D_NEGOTIATING) {
3155				drbd_force_state(mdev, NS(disk, D_DISKLESS));
3156			} else if (peer_state.disk == D_NEGOTIATING) {
3157				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3158				peer_state.disk = D_DISKLESS;
3159				real_peer_disk = D_DISKLESS;
3160			} else {
3161				if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3162					return FALSE;
3163				D_ASSERT(oconn == C_WF_REPORT_PARAMS);
3164 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3165 return FALSE;
3166 }
3167 }
3168 }
3169
3170 spin_lock_irq(&mdev->req_lock);
3171 if (mdev->state.conn != oconn)
3172 goto retry;
3173 clear_bit(CONSIDER_RESYNC, &mdev->flags);
3174 ns.i = mdev->state.i;
3175 ns.conn = nconn;
3176 ns.peer = peer_state.role;
3177 ns.pdsk = real_peer_disk;
3178 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3179 if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3180 ns.disk = mdev->new_state_tmp.disk;
3181
3182 rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
3183 ns = mdev->state;
3184 spin_unlock_irq(&mdev->req_lock);
3185
3186 if (rv < SS_SUCCESS) {
3187 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3188 return FALSE;
3189 }
3190
3191 if (oconn > C_WF_REPORT_PARAMS) {
3192 if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3193 peer_state.disk != D_NEGOTIATING ) {
3194 /* we want resync, peer has not yet decided to sync... */
3195 /* Nowadays only used when forcing a node into primary role and
3196 setting its disk to UpToDate with that */
3197 drbd_send_uuids(mdev);
3198 drbd_send_state(mdev);
3199 }
3200 }
3201
3202 mdev->net_conf->want_lose = 0;
3203
3204 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3205
3206 return TRUE;
3207}
3208
3209static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
3210{
3211 struct p_rs_uuid *p = (struct p_rs_uuid *)h;
3212
3213 wait_event(mdev->misc_wait,
3214 mdev->state.conn == C_WF_SYNC_UUID ||
3215 mdev->state.conn < C_CONNECTED ||
3216 mdev->state.disk < D_NEGOTIATING);
3217
3218 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3219
3220 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3221 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3222 return FALSE;
3223
3224 /* Here the _drbd_uuid_ functions are right, current should
3225 _not_ be rotated into the history */
3226 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3227 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3228 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3229
3230 drbd_start_resync(mdev, C_SYNC_TARGET);
3231
3232 put_ldev(mdev);
3233 } else
3234 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3235
3236 return TRUE;
3237}
3238
3239enum receive_bitmap_ret { OK, DONE, FAILED };
3240
3241static enum receive_bitmap_ret
3242receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
3243 unsigned long *buffer, struct bm_xfer_ctx *c)
3244{
3245 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3246 unsigned want = num_words * sizeof(long);
3247
3248 if (want != h->length) {
3249 dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
3250 return FAILED;
3251 }
3252 if (want == 0)
3253 return DONE;
3254 if (drbd_recv(mdev, buffer, want) != want)
3255 return FAILED;
3256
3257 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3258
3259 c->word_offset += num_words;
3260 c->bit_offset = c->word_offset * BITS_PER_LONG;
3261 if (c->bit_offset > c->bm_bits)
3262 c->bit_offset = c->bm_bits;
3263
3264 return OK;
3265}
3266
3267static enum receive_bitmap_ret
3268recv_bm_rle_bits(struct drbd_conf *mdev,
3269 struct p_compressed_bm *p,
3270 struct bm_xfer_ctx *c)
3271{
3272 struct bitstream bs;
3273 u64 look_ahead;
3274 u64 rl;
3275 u64 tmp;
3276 unsigned long s = c->bit_offset;
3277 unsigned long e;
3278 int len = p->head.length - (sizeof(*p) - sizeof(p->head));
3279 int toggle = DCBP_get_start(p);
3280 int have;
3281 int bits;
3282
3283 bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3284
3285 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3286 if (bits < 0)
3287 return FAILED;
3288
3289 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3290 bits = vli_decode_bits(&rl, look_ahead);
3291 if (bits <= 0)
3292 return FAILED;
3293
3294 if (toggle) {
3295 e = s + rl -1;
3296 if (e >= c->bm_bits) {
3297 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3298 return FAILED;
3299 }
3300 _drbd_bm_set_bits(mdev, s, e);
3301 }
3302
3303 if (have < bits) {
3304 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3305 have, bits, look_ahead,
3306 (unsigned int)(bs.cur.b - p->code),
3307 (unsigned int)bs.buf_len);
3308 return FAILED;
3309 }
3310 look_ahead >>= bits;
3311 have -= bits;
3312
3313 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3314 if (bits < 0)
3315 return FAILED;
3316 look_ahead |= tmp << have;
3317 have += bits;
3318 }
3319
3320 c->bit_offset = s;
3321 bm_xfer_ctx_bit_to_word_offset(c);
3322
3323 return (s == c->bm_bits) ? DONE : OK;
3324}
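/*
 * Illustrative sketch (under #if 0, not built): stripped of the VLI bit
 * packing, the RLE scheme above is just a list of run lengths whose meaning
 * alternates between "clear bits" and "set bits", with the start value
 * telling which kind the first run is.  The helper below is hypothetical
 * and decodes such runs into a plain byte-per-bit array:
 */
#if 0
static void rle_runs_decode(const unsigned long *rl, int nruns,
			    int first_run_is_set,
			    unsigned char *bit, unsigned long nbits)
{
	unsigned long s = 0;
	int toggle = first_run_is_set;
	int i;

	for (i = 0; i < nruns && s < nbits; i++, toggle = !toggle) {
		unsigned long e = min(s + rl[i], nbits);

		if (toggle)		/* this run describes set bits */
			while (s < e)
				bit[s++] = 1;	/* cf. _drbd_bm_set_bits() */
		else
			s = e;		/* clear runs are implicit, just skip */
	}
}
#endif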
3325
3326static enum receive_bitmap_ret
3327decode_bitmap_c(struct drbd_conf *mdev,
3328 struct p_compressed_bm *p,
3329 struct bm_xfer_ctx *c)
3330{
3331 if (DCBP_get_code(p) == RLE_VLI_Bits)
3332 return recv_bm_rle_bits(mdev, p, c);
3333
3334 /* other variants had been implemented for evaluation,
3335 * but have been dropped as this one turned out to be "best"
3336 * during all our tests. */
3337
3338 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3339 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3340 return FAILED;
3341}
3342
3343void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3344 const char *direction, struct bm_xfer_ctx *c)
3345{
3346 /* what would it take to transfer it "plaintext" */
3347 unsigned plain = sizeof(struct p_header) *
3348 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3349 + c->bm_words * sizeof(long);
3350 unsigned total = c->bytes[0] + c->bytes[1];
3351 unsigned r;
3352
3353 /* total can not be zero. but just in case: */
3354 if (total == 0)
3355 return;
3356
3357 /* don't report if not compressed */
3358 if (total >= plain)
3359 return;
3360
3361 /* total < plain. check for overflow, still */
3362 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3363 : (1000 * total / plain);
3364
3365 if (r > 1000)
3366 r = 1000;
3367
3368 r = 1000 - r;
3369 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3370 "total %u; compression: %u.%u%%\n",
3371 direction,
3372 c->bytes[1], c->packets[1],
3373 c->bytes[0], c->packets[0],
3374 total, r/10, r % 10);
3375}
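/* Worked example for the arithmetic above (illustration only): if "plain"
 * would have been 32768 bytes and the RLE transfer used total = 2048 bytes,
 * then r = 1000 * 2048 / 32768 = 62, so 1000 - r = 938 and the log line
 * reports "compression: 93.8%". */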
3376
3377/* Since we are processing the bitfield from lower addresses to higher,
3378   it does not matter whether we process it in 32 bit or 64 bit
3379   chunks, as long as it is little endian. (Understand it as a byte stream,
3380   beginning with the lowest byte...) If we used big endian
3381 we would need to process it from the highest address to the lowest,
3382 in order to be agnostic to the 32 vs 64 bits issue.
3383
3384 returns 0 on failure, 1 if we successfully received it. */
3385static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
3386{
3387 struct bm_xfer_ctx c;
3388 void *buffer;
3389 enum receive_bitmap_ret ret;
3390 int ok = FALSE;
3391
3392 wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3393
3394 drbd_bm_lock(mdev, "receive bitmap");
3395
3396 /* maybe we should use some per thread scratch page,
3397 * and allocate that during initial device creation? */
3398 buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3399 if (!buffer) {
3400 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3401 goto out;
3402 }
3403
3404 c = (struct bm_xfer_ctx) {
3405 .bm_bits = drbd_bm_bits(mdev),
3406 .bm_words = drbd_bm_words(mdev),
3407 };
3408
3409 do {
3410 if (h->command == P_BITMAP) {
3411 ret = receive_bitmap_plain(mdev, h, buffer, &c);
3412 } else if (h->command == P_COMPRESSED_BITMAP) {
3413 /* MAYBE: sanity check that we speak proto >= 90,
3414 * and the feature is enabled! */
3415 struct p_compressed_bm *p;
3416
3417 if (h->length > BM_PACKET_PAYLOAD_BYTES) {
3418 dev_err(DEV, "ReportCBitmap packet too large\n");
3419 goto out;
3420 }
3421			/* use the page buffer */
3422 p = buffer;
3423 memcpy(p, h, sizeof(*h));
3424 if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
3425 goto out;
3426 if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
3427 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
3428 return FAILED;
3429 }
3430 ret = decode_bitmap_c(mdev, p, &c);
3431 } else {
3432 dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)", h->command);
3433 goto out;
3434 }
3435
3436 c.packets[h->command == P_BITMAP]++;
3437 c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
3438
3439 if (ret != OK)
3440 break;
3441
3442 if (!drbd_recv_header(mdev, h))
3443 goto out;
3444 } while (ret == OK);
3445 if (ret == FAILED)
3446 goto out;
3447
3448 INFO_bm_xfer_stats(mdev, "receive", &c);
3449
3450 if (mdev->state.conn == C_WF_BITMAP_T) {
3451 ok = !drbd_send_bitmap(mdev);
3452 if (!ok)
3453 goto out;
3454 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3455 ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3456 D_ASSERT(ok == SS_SUCCESS);
3457 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3458 /* admin may have requested C_DISCONNECTING,
3459 * other threads may have noticed network errors */
3460 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3461 drbd_conn_str(mdev->state.conn));
3462 }
3463
3464 ok = TRUE;
3465 out:
3466 drbd_bm_unlock(mdev);
3467 if (ok && mdev->state.conn == C_WF_BITMAP_S)
3468 drbd_start_resync(mdev, C_SYNC_SOURCE);
3469 free_page((unsigned long) buffer);
3470 return ok;
3471}
3472
3473static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
3474{
3475 /* TODO zero copy sink :) */
3476 static char sink[128];
3477 int size, want, r;
3478
3479 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3480 h->command, h->length);
3481
3482 size = h->length;
3483 while (size > 0) {
3484 want = min_t(int, size, sizeof(sink));
3485 r = drbd_recv(mdev, sink, want);
3486 ERR_IF(r <= 0) break;
3487 size -= r;
3488 }
3489 return size == 0;
3490}
3491
3492static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
3493{
3494 if (mdev->state.disk >= D_INCONSISTENT)
3495 drbd_kick_lo(mdev);
3496
3497 /* Make sure we've acked all the TCP data associated
3498 * with the data requests being unplugged */
3499 drbd_tcp_quickack(mdev->data.socket);
3500
3501 return TRUE;
3502}
3503
3504typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
3505
3506static drbd_cmd_handler_f drbd_default_handler[] = {
3507 [P_DATA] = receive_Data,
3508 [P_DATA_REPLY] = receive_DataReply,
3509 [P_RS_DATA_REPLY] = receive_RSDataReply,
3510 [P_BARRIER] = receive_Barrier,
3511 [P_BITMAP] = receive_bitmap,
3512 [P_COMPRESSED_BITMAP] = receive_bitmap,
3513 [P_UNPLUG_REMOTE] = receive_UnplugRemote,
3514 [P_DATA_REQUEST] = receive_DataRequest,
3515 [P_RS_DATA_REQUEST] = receive_DataRequest,
3516 [P_SYNC_PARAM] = receive_SyncParam,
3517 [P_SYNC_PARAM89] = receive_SyncParam,
3518 [P_PROTOCOL] = receive_protocol,
3519 [P_UUIDS] = receive_uuids,
3520 [P_SIZES] = receive_sizes,
3521 [P_STATE] = receive_state,
3522 [P_STATE_CHG_REQ] = receive_req_state,
3523 [P_SYNC_UUID] = receive_sync_uuid,
3524 [P_OV_REQUEST] = receive_DataRequest,
3525 [P_OV_REPLY] = receive_DataRequest,
3526 [P_CSUM_RS_REQUEST] = receive_DataRequest,
3527 /* anything missing from this table is in
3528 * the asender_tbl, see get_asender_cmd */
3529 [P_MAX_CMD] = NULL,
3530};
3531
3532static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
3533static drbd_cmd_handler_f *drbd_opt_cmd_handler;
3534
3535static void drbdd(struct drbd_conf *mdev)
3536{
3537 drbd_cmd_handler_f handler;
3538 struct p_header *header = &mdev->data.rbuf.header;
3539
3540 while (get_t_state(&mdev->receiver) == Running) {
3541 drbd_thread_current_set_cpu(mdev);
3542		if (!drbd_recv_header(mdev, header)) {
3543			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3544			break;
3545		}
3546
3547 if (header->command < P_MAX_CMD)
3548 handler = drbd_cmd_handler[header->command];
3549 else if (P_MAY_IGNORE < header->command
3550 && header->command < P_MAX_OPT_CMD)
3551 handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE];
3552 else if (header->command > P_MAX_OPT_CMD)
3553 handler = receive_skip;
3554 else
3555 handler = NULL;
3556
3557 if (unlikely(!handler)) {
3558 dev_err(DEV, "unknown packet type %d, l: %d!\n",
3559 header->command, header->length);
3560 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3561 break;
3562 }
3563 if (unlikely(!handler(mdev, header))) {
3564 dev_err(DEV, "error receiving %s, l: %d!\n",
3565 cmdname(header->command), header->length);
3566 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3567 break;
3568 }
3569	}
3570}
3571
3572static void drbd_fail_pending_reads(struct drbd_conf *mdev)
3573{
3574 struct hlist_head *slot;
3575 struct hlist_node *pos;
3576 struct hlist_node *tmp;
3577 struct drbd_request *req;
3578 int i;
3579
3580 /*
3581 * Application READ requests
3582 */
3583 spin_lock_irq(&mdev->req_lock);
3584 for (i = 0; i < APP_R_HSIZE; i++) {
3585 slot = mdev->app_reads_hash+i;
3586 hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
3587 /* it may (but should not any longer!)
3588 * be on the work queue; if that assert triggers,
3589 * we need to also grab the
3590 * spin_lock_irq(&mdev->data.work.q_lock);
3591 * and list_del_init here. */
3592 D_ASSERT(list_empty(&req->w.list));
3593 /* It would be nice to complete outside of spinlock.
3594 * But this is easier for now. */
3595 _req_mod(req, connection_lost_while_pending);
3596 }
3597 }
3598 for (i = 0; i < APP_R_HSIZE; i++)
3599 if (!hlist_empty(mdev->app_reads_hash+i))
3600 dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
3601 "%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
3602
3603 memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
3604 spin_unlock_irq(&mdev->req_lock);
3605}
3606
3607void drbd_flush_workqueue(struct drbd_conf *mdev)
3608{
3609 struct drbd_wq_barrier barr;
3610
3611 barr.w.cb = w_prev_work_done;
3612 init_completion(&barr.done);
3613 drbd_queue_work(&mdev->data.work, &barr.w);
3614 wait_for_completion(&barr.done);
3615}
3616
3617static void drbd_disconnect(struct drbd_conf *mdev)
3618{
3619 enum drbd_fencing_p fp;
3620 union drbd_state os, ns;
3621 int rv = SS_UNKNOWN_ERROR;
3622 unsigned int i;
3623
3624 if (mdev->state.conn == C_STANDALONE)
3625 return;
3626 if (mdev->state.conn >= C_WF_CONNECTION)
3627 dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3628 drbd_conn_str(mdev->state.conn));
3629
3630 /* asender does not clean up anything. it must not interfere, either */
3631 drbd_thread_stop(&mdev->asender);
3632 	drbd_free_sock(mdev);
3633
3634 spin_lock_irq(&mdev->req_lock);
3635 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3636 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3637 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3638 spin_unlock_irq(&mdev->req_lock);
3639
3640 /* We do not have data structures that would allow us to
3641 * get the rs_pending_cnt down to 0 again.
3642 * * On C_SYNC_TARGET we do not have any data structures describing
3643 	 * the pending RSDataRequests we have sent.
3644 * * On C_SYNC_SOURCE there is no data structure that tracks
3645 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3646 * And no, it is not the sum of the reference counts in the
3647 * resync_LRU. The resync_LRU tracks the whole operation including
3648 * the disk-IO, while the rs_pending_cnt only tracks the blocks
3649 * on the fly. */
3650 drbd_rs_cancel_all(mdev);
3651 mdev->rs_total = 0;
3652 mdev->rs_failed = 0;
3653 atomic_set(&mdev->rs_pending_cnt, 0);
3654 wake_up(&mdev->misc_wait);
3655
3656 /* make sure syncer is stopped and w_resume_next_sg queued */
3657 del_timer_sync(&mdev->resync_timer);
3658 set_bit(STOP_SYNC_TIMER, &mdev->flags);
3659 resync_timer_fn((unsigned long)mdev);
3660
3661 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3662 * w_make_resync_request etc. which may still be on the worker queue
3663 * to be "canceled" */
3664 drbd_flush_workqueue(mdev);
3665
3666 /* This also does reclaim_net_ee(). If we do this too early, we might
3667 	 * miss some resync ee and pages. */
3668 drbd_process_done_ee(mdev);
3669
3670 kfree(mdev->p_uuid);
3671 mdev->p_uuid = NULL;
3672
3673 if (!mdev->state.susp)
3674 tl_clear(mdev);
3675
3676 drbd_fail_pending_reads(mdev);
3677
3678 dev_info(DEV, "Connection closed\n");
3679
3680 drbd_md_sync(mdev);
3681
3682 fp = FP_DONT_CARE;
3683 if (get_ldev(mdev)) {
3684 fp = mdev->ldev->dc.fencing;
3685 put_ldev(mdev);
3686 }
3687
3688 if (mdev->state.role == R_PRIMARY) {
3689 if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
3690 enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
3691 drbd_request_state(mdev, NS(pdsk, nps));
3692 }
3693 }
3694
3695 spin_lock_irq(&mdev->req_lock);
3696 os = mdev->state;
3697 if (os.conn >= C_UNCONNECTED) {
3698 /* Do not restart in case we are C_DISCONNECTING */
3699 ns = os;
3700 ns.conn = C_UNCONNECTED;
3701 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3702 }
3703 spin_unlock_irq(&mdev->req_lock);
3704
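	/* Only on a requested disconnect (C_DISCONNECTING), as opposed to a
	 * lost connection, do we tear down the network configuration
	 * completely: wait until nobody references net_conf and no
	 * application I/O is in flight, then free the ee/tl hash tables and
	 * the HMAC transform and go StandAlone. */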
3705 if (os.conn == C_DISCONNECTING) {
3706 struct hlist_head *h;
3707 wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);
3708
3709 /* we must not free the tl_hash
3710 * while application io is still on the fly */
3711 wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);
3712
3713 spin_lock_irq(&mdev->req_lock);
3714 /* paranoia code */
3715 for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3716 if (h->first)
3717 dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3718 (int)(h - mdev->ee_hash), h->first);
3719 kfree(mdev->ee_hash);
3720 mdev->ee_hash = NULL;
3721 mdev->ee_hash_s = 0;
3722
3723 /* paranoia code */
3724 for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3725 if (h->first)
3726 dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3727 (int)(h - mdev->tl_hash), h->first);
3728 kfree(mdev->tl_hash);
3729 mdev->tl_hash = NULL;
3730 mdev->tl_hash_s = 0;
3731 spin_unlock_irq(&mdev->req_lock);
3732
3733 crypto_free_hash(mdev->cram_hmac_tfm);
3734 mdev->cram_hmac_tfm = NULL;
3735
3736 kfree(mdev->net_conf);
3737 mdev->net_conf = NULL;
3738 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3739 }
3740
3741 /* tcp_close and release of sendpage pages can be deferred. I don't
3742 * want to use SO_LINGER, because apparently it can be deferred for
3743 * more than 20 seconds (longest time I checked).
3744 *
3745 * Actually we don't care for exactly when the network stack does its
3746 * put_page(), but release our reference on these pages right here.
3747 */
3748 i = drbd_release_ee(mdev, &mdev->net_ee);
3749 if (i)
3750 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3751 i = atomic_read(&mdev->pp_in_use);
3752 if (i)
3753 dev_info(DEV, "pp_in_use = %u, expected 0\n", i);
3754
3755 D_ASSERT(list_empty(&mdev->read_ee));
3756 D_ASSERT(list_empty(&mdev->active_ee));
3757 D_ASSERT(list_empty(&mdev->sync_ee));
3758 D_ASSERT(list_empty(&mdev->done_ee));
3759
3760 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3761 atomic_set(&mdev->current_epoch->epoch_size, 0);
3762 D_ASSERT(list_empty(&mdev->current_epoch->list));
3763}
3764
3765/*
3766 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3767 * we can agree on is stored in agreed_pro_version.
3768 *
3769 * feature flags and the reserved array should be enough room for future
3770 * enhancements of the handshake protocol, and possible plugins...
3771 *
3772 * for now, they are expected to be zero, but ignored.
3773 */
3774static int drbd_send_handshake(struct drbd_conf *mdev)
3775{
3776 /* ASSERT current == mdev->receiver ... */
3777 struct p_handshake *p = &mdev->data.sbuf.handshake;
3778 int ok;
3779
3780 if (mutex_lock_interruptible(&mdev->data.mutex)) {
3781 dev_err(DEV, "interrupted during initial handshake\n");
3782 return 0; /* interrupted. not ok. */
3783 }
3784
3785 if (mdev->data.socket == NULL) {
3786 mutex_unlock(&mdev->data.mutex);
3787 return 0;
3788 }
3789
3790 memset(p, 0, sizeof(*p));
3791 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3792 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3793 	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_HAND_SHAKE,
3794 			    (struct p_header *)p, sizeof(*p), 0);
3795 mutex_unlock(&mdev->data.mutex);
3796 return ok;
3797}
3798
3799/*
3800 * return values:
3801 * 1 yes, we have a valid connection
3802 * 0 oops, did not work out, please try again
3803 * -1 peer talks different language,
3804 * no point in trying again, please go standalone.
3805 */
3806static int drbd_do_handshake(struct drbd_conf *mdev)
3807{
3808 /* ASSERT current == mdev->receiver ... */
3809 struct p_handshake *p = &mdev->data.rbuf.handshake;
3810 const int expect = sizeof(struct p_handshake)
3811 -sizeof(struct p_header);
3812 int rv;
3813
3814 rv = drbd_send_handshake(mdev);
3815 if (!rv)
3816 return 0;
3817
3818 rv = drbd_recv_header(mdev, &p->head);
3819 if (!rv)
3820 return 0;
3821
3822 if (p->head.command != P_HAND_SHAKE) {
3823 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3824 cmdname(p->head.command), p->head.command);
3825 return -1;
3826 }
3827
3828 if (p->head.length != expect) {
3829 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3830 expect, p->head.length);
3831 return -1;
3832 }
3833
3834 rv = drbd_recv(mdev, &p->head.payload, expect);
3835
3836 if (rv != expect) {
3837 dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
3838 return 0;
3839 }
3840
3841 	p->protocol_min = be32_to_cpu(p->protocol_min);
3842 p->protocol_max = be32_to_cpu(p->protocol_max);
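	/* Treat protocol_max == 0 as "this peer speaks exactly protocol_min";
	 * presumably a peer whose handshake does not fill in protocol_max. */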
3843 if (p->protocol_max == 0)
3844 p->protocol_max = p->protocol_min;
3845
3846 if (PRO_VERSION_MAX < p->protocol_min ||
3847 PRO_VERSION_MIN > p->protocol_max)
3848 goto incompat;
3849
3850 mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
3851
3852 dev_info(DEV, "Handshake successful: "
3853 "Agreed network protocol version %d\n", mdev->agreed_pro_version);
3854
3855 return 1;
3856
3857 incompat:
3858 dev_err(DEV, "incompatible DRBD dialects: "
3859 "I support %d-%d, peer supports %d-%d\n",
3860 PRO_VERSION_MIN, PRO_VERSION_MAX,
3861 p->protocol_min, p->protocol_max);
3862 return -1;
3863}
3864
3865#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
3866static int drbd_do_auth(struct drbd_conf *mdev)
3867{
3868 	dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
3869 	dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
3870 	return -1;
3871}
3872#else
3873#define CHALLENGE_LEN 64
3874
3875/* Return value:
3876 1 - auth succeeded,
3877 0 - failed, try again (network error),
3878 -1 - auth failed, don't try again.
3879*/
3880
3881static int drbd_do_auth(struct drbd_conf *mdev)
3882{
3883 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
3884 struct scatterlist sg;
3885 char *response = NULL;
3886 char *right_response = NULL;
3887 char *peers_ch = NULL;
3888 struct p_header p;
3889 unsigned int key_len = strlen(mdev->net_conf->shared_secret);
3890 unsigned int resp_size;
3891 struct hash_desc desc;
3892 int rv;
3893
3894 desc.tfm = mdev->cram_hmac_tfm;
3895 desc.flags = 0;
3896
3897 rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
3898 (u8 *)mdev->net_conf->shared_secret, key_len);
3899 if (rv) {
3900 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
3901 		rv = -1;
3902 		goto fail;
3903 }
3904
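	/* Challenge-response in both directions, with the shared secret as
	 * the HMAC key: send our random challenge, receive the peer's
	 * challenge, answer with HMAC(secret, peer's challenge), then
	 * receive the peer's answer and compare it against
	 * HMAC(secret, our challenge). */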
3905 get_random_bytes(my_challenge, CHALLENGE_LEN);
3906
3907 rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
3908 if (!rv)
3909 goto fail;
3910
3911 rv = drbd_recv_header(mdev, &p);
3912 if (!rv)
3913 goto fail;
3914
3915 if (p.command != P_AUTH_CHALLENGE) {
3916 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
3917 cmdname(p.command), p.command);
3918 rv = 0;
3919 goto fail;
3920 }
3921
3922 if (p.length > CHALLENGE_LEN*2) {
3923 		dev_err(DEV, "AuthChallenge payload too big.\n");
3924 		rv = -1;
3925 		goto fail;
3926 }
3927
3928 peers_ch = kmalloc(p.length, GFP_NOIO);
3929 if (peers_ch == NULL) {
3930 dev_err(DEV, "kmalloc of peers_ch failed\n");
3931 		rv = -1;
3932 		goto fail;
3933 }
3934
3935 rv = drbd_recv(mdev, peers_ch, p.length);
3936
3937 if (rv != p.length) {
3938 dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
3939 rv = 0;
3940 goto fail;
3941 }
3942
3943 resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
3944 response = kmalloc(resp_size, GFP_NOIO);
3945 if (response == NULL) {
3946 dev_err(DEV, "kmalloc of response failed\n");
3947 		rv = -1;
3948 		goto fail;
3949 }
3950
3951 sg_init_table(&sg, 1);
3952 sg_set_buf(&sg, peers_ch, p.length);
3953
3954 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
3955 if (rv) {
3956 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
3957 		rv = -1;
3958 		goto fail;
3959 }
3960
3961 rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
3962 if (!rv)
3963 goto fail;
3964
3965 rv = drbd_recv_header(mdev, &p);
3966 if (!rv)
3967 goto fail;
3968
3969 if (p.command != P_AUTH_RESPONSE) {
3970 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
3971 cmdname(p.command), p.command);
3972 rv = 0;
3973 goto fail;
3974 }
3975
3976 if (p.length != resp_size) {
3977 		dev_err(DEV, "AuthResponse payload of wrong size\n");
3978 rv = 0;
3979 goto fail;
3980 }
3981
3982 	rv = drbd_recv(mdev, response, resp_size);
3983
3984 if (rv != resp_size) {
3985 dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
3986 rv = 0;
3987 goto fail;
3988 }
3989
3990 right_response = kmalloc(resp_size, GFP_NOIO);
3991 	if (right_response == NULL) {
3992 		dev_err(DEV, "kmalloc of right_response failed\n");
3993 		rv = -1;
3994 		goto fail;
3995 }
3996
3997 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
3998
3999 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4000 if (rv) {
4001 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4002 		rv = -1;
4003 		goto fail;
4004 }
4005
4006 rv = !memcmp(response, right_response, resp_size);
4007
4008 if (rv)
4009 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4010 resp_size, mdev->net_conf->cram_hmac_alg);
4011 	else
4012 rv = -1;
4013
4014 fail:
4015 kfree(peers_ch);
4016 kfree(response);
4017 kfree(right_response);
4018
4019 return rv;
4020}
4021#endif
4022
4023int drbdd_init(struct drbd_thread *thi)
4024{
4025 struct drbd_conf *mdev = thi->mdev;
4026 unsigned int minor = mdev_to_minor(mdev);
4027 int h;
4028
4029 sprintf(current->comm, "drbd%d_receiver", minor);
4030
4031 dev_info(DEV, "receiver (re)started\n");
4032
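	/* Keep trying to connect: drbd_connect() returning 0 means a
	 * transient failure, so disconnect and retry after a short sleep;
	 * -1 means the peer is incompatible (or authentication failed), so
	 * drop the network configuration; > 0 means we are connected and
	 * enter the main receive loop. */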
4033 do {
4034 h = drbd_connect(mdev);
4035 if (h == 0) {
4036 drbd_disconnect(mdev);
4037 __set_current_state(TASK_INTERRUPTIBLE);
4038 schedule_timeout(HZ);
4039 }
4040 if (h == -1) {
4041 dev_warn(DEV, "Discarding network configuration.\n");
4042 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4043 }
4044 } while (h == 0);
4045
4046 if (h > 0) {
4047 if (get_net_conf(mdev)) {
4048 drbdd(mdev);
4049 put_net_conf(mdev);
4050 }
4051 }
4052
4053 drbd_disconnect(mdev);
4054
4055 dev_info(DEV, "receiver terminated\n");
4056 return 0;
4057}
4058
4059/* ********* acknowledge sender ******** */
4060
4061static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
4062{
4063 struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4064
4065 int retcode = be32_to_cpu(p->retcode);
4066
4067 if (retcode >= SS_SUCCESS) {
4068 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4069 } else {
4070 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4071 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4072 drbd_set_st_err_str(retcode), retcode);
4073 }
4074 wake_up(&mdev->state_wait);
4075
4076 return TRUE;
4077}
4078
4079static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
4080{
4081 return drbd_send_ping_ack(mdev);
4082
4083}
4084
4085static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
4086{
4087 /* restore idle timeout */
4088 mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4089 	if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4090 		wake_up(&mdev->misc_wait);
4091
4092 return TRUE;
4093}
4094
4095static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
4096{
4097 struct p_block_ack *p = (struct p_block_ack *)h;
4098 sector_t sector = be64_to_cpu(p->sector);
4099 int blksize = be32_to_cpu(p->blksize);
4100
4101 D_ASSERT(mdev->agreed_pro_version >= 89);
4102
4103 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4104
4105 drbd_rs_complete_io(mdev, sector);
4106 drbd_set_in_sync(mdev, sector, blksize);
4107 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4108 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4109 dec_rs_pending(mdev);
4110
4111 return TRUE;
4112}
4113
4114/* when we receive the ACK for a write request,
4115 * verify that we actually know about it */
4116static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4117 u64 id, sector_t sector)
4118{
4119 struct hlist_head *slot = tl_hash_slot(mdev, sector);
4120 struct hlist_node *n;
4121 struct drbd_request *req;
4122
4123 hlist_for_each_entry(req, n, slot, colision) {
4124 if ((unsigned long)req == (unsigned long)id) {
4125 if (req->sector != sector) {
4126 dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4127 "wrong sector (%llus versus %llus)\n", req,
4128 (unsigned long long)req->sector,
4129 (unsigned long long)sector);
4130 break;
4131 }
4132 return req;
4133 }
4134 }
4135 dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4136 (void *)(unsigned long)id, (unsigned long long)sector);
4137 return NULL;
4138}
4139
4140typedef struct drbd_request *(req_validator_fn)
4141 (struct drbd_conf *mdev, u64 id, sector_t sector);
4142
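/* Look up the request a peer ack refers to (under req_lock), feed the
 * corresponding event into the request state machine, and complete the
 * master bio outside the lock if that transition finished it. */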
4143static int validate_req_change_req_state(struct drbd_conf *mdev,
4144 u64 id, sector_t sector, req_validator_fn validator,
4145 const char *func, enum drbd_req_event what)
4146{
4147 struct drbd_request *req;
4148 struct bio_and_error m;
4149
4150 spin_lock_irq(&mdev->req_lock);
4151 req = validator(mdev, id, sector);
4152 if (unlikely(!req)) {
4153 spin_unlock_irq(&mdev->req_lock);
4154 dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4155 return FALSE;
4156 }
4157 __req_mod(req, what, &m);
4158 spin_unlock_irq(&mdev->req_lock);
4159
4160 if (m.bio)
4161 complete_master_bio(mdev, &m);
4162 return TRUE;
4163}
4164
4165static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
4166{
4167 struct p_block_ack *p = (struct p_block_ack *)h;
4168 sector_t sector = be64_to_cpu(p->sector);
4169 int blksize = be32_to_cpu(p->blksize);
4170 enum drbd_req_event what;
4171
4172 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4173
4174 if (is_syncer_block_id(p->block_id)) {
4175 drbd_set_in_sync(mdev, sector, blksize);
4176 dec_rs_pending(mdev);
4177 return TRUE;
4178 }
4179 switch (be16_to_cpu(h->command)) {
4180 case P_RS_WRITE_ACK:
4181 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4182 what = write_acked_by_peer_and_sis;
4183 break;
4184 case P_WRITE_ACK:
4185 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4186 what = write_acked_by_peer;
4187 break;
4188 case P_RECV_ACK:
4189 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4190 what = recv_acked_by_peer;
4191 break;
4192 case P_DISCARD_ACK:
4193 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4194 what = conflict_discarded_by_peer;
4195 break;
4196 default:
4197 D_ASSERT(0);
4198 return FALSE;
4199 }
4200
4201 return validate_req_change_req_state(mdev, p->block_id, sector,
4202 _ack_id_to_req, __func__ , what);
4203}
4204
4205static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
4206{
4207 struct p_block_ack *p = (struct p_block_ack *)h;
4208 sector_t sector = be64_to_cpu(p->sector);
4209
4210 if (__ratelimit(&drbd_ratelimit_state))
4211 		dev_warn(DEV, "Got NegAck packet. Peer is in trouble?\n");
4212
4213 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4214
4215 if (is_syncer_block_id(p->block_id)) {
4216 int size = be32_to_cpu(p->blksize);
4217 dec_rs_pending(mdev);
4218 drbd_rs_failed_io(mdev, sector, size);
4219 return TRUE;
4220 }
4221 return validate_req_change_req_state(mdev, p->block_id, sector,
4222 _ack_id_to_req, __func__ , neg_acked);
4223}
4224
4225static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
4226{
4227 struct p_block_ack *p = (struct p_block_ack *)h;
4228 sector_t sector = be64_to_cpu(p->sector);
4229
4230 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4231 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4232 (unsigned long long)sector, be32_to_cpu(p->blksize));
4233
4234 return validate_req_change_req_state(mdev, p->block_id, sector,
4235 _ar_id_to_req, __func__ , neg_acked);
4236}
4237
4238static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
4239{
4240 sector_t sector;
4241 int size;
4242 struct p_block_ack *p = (struct p_block_ack *)h;
4243
4244 sector = be64_to_cpu(p->sector);
4245 size = be32_to_cpu(p->blksize);
4246
4247 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4248
4249 dec_rs_pending(mdev);
4250
4251 if (get_ldev_if_state(mdev, D_FAILED)) {
4252 drbd_rs_complete_io(mdev, sector);
4253 drbd_rs_failed_io(mdev, sector, size);
4254 put_ldev(mdev);
4255 }
4256
4257 return TRUE;
4258}
4259
4260static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
4261{
4262 struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4263
4264 tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4265
4266 return TRUE;
4267}
4268
4269static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
4270{
4271 struct p_block_ack *p = (struct p_block_ack *)h;
4272 struct drbd_work *w;
4273 sector_t sector;
4274 int size;
4275
4276 sector = be64_to_cpu(p->sector);
4277 size = be32_to_cpu(p->blksize);
4278
4279 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4280
4281 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4282 drbd_ov_oos_found(mdev, sector, size);
4283 else
4284 ov_oos_print(mdev);
4285
4286 drbd_rs_complete_io(mdev, sector);
4287 dec_rs_pending(mdev);
4288
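	/* That was the last outstanding verify reply: hand the wrap-up of the
	 * online-verify run over to the worker, or finish it right here if we
	 * cannot even allocate the work item. */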
4289 if (--mdev->ov_left == 0) {
4290 w = kmalloc(sizeof(*w), GFP_NOIO);
4291 if (w) {
4292 w->cb = w_ov_finished;
4293 drbd_queue_work_front(&mdev->data.work, w);
4294 } else {
4295 dev_err(DEV, "kmalloc(w) failed.");
4296 ov_oos_print(mdev);
4297 drbd_resync_finished(mdev);
4298 }
4299 }
4300 return TRUE;
4301}
4302
4303struct asender_cmd {
4304 size_t pkt_size;
4305 int (*process)(struct drbd_conf *mdev, struct p_header *h);
4306};
4307
4308static struct asender_cmd *get_asender_cmd(int cmd)
4309{
4310 static struct asender_cmd asender_tbl[] = {
4311 /* anything missing from this table is in
4312 * the drbd_cmd_handler (drbd_default_handler) table,
4313 * see the beginning of drbdd() */
4314 [P_PING] = { sizeof(struct p_header), got_Ping },
4315 [P_PING_ACK] = { sizeof(struct p_header), got_PingAck },
4316 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4317 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4318 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4319 [P_DISCARD_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4320 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
4321 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
4322 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply},
4323 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
4324 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
4325 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4326 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
4327 [P_MAX_CMD] = { 0, NULL },
4328 };
4329 if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4330 return NULL;
4331 return &asender_tbl[cmd];
4332}
4333
4334int drbd_asender(struct drbd_thread *thi)
4335{
4336 struct drbd_conf *mdev = thi->mdev;
4337 struct p_header *h = &mdev->meta.rbuf.header;
4338 struct asender_cmd *cmd = NULL;
4339
4340 int rv, len;
4341 void *buf = h;
4342 int received = 0;
4343 int expect = sizeof(struct p_header);
4344 int empty;
4345
4346 sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4347
4348 current->policy = SCHED_RR; /* Make this a realtime task! */
4349 current->rt_priority = 2; /* more important than all other tasks */
4350
4351 while (get_t_state(thi) == Running) {
4352 drbd_thread_current_set_cpu(mdev);
4353 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4354 ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4355 mdev->meta.socket->sk->sk_rcvtimeo =
4356 mdev->net_conf->ping_timeo*HZ/10;
4357 }
4358
4359 /* conditionally cork;
4360 * it may hurt latency if we cork without much to send */
4361 if (!mdev->net_conf->no_cork &&
4362 3 < atomic_read(&mdev->unacked_cnt))
4363 drbd_tcp_cork(mdev->meta.socket);
4364 while (1) {
4365 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4366 flush_signals(current);
4367 if (!drbd_process_done_ee(mdev)) {
4368 dev_err(DEV, "process_done_ee() = NOT_OK\n");
4369 goto reconnect;
4370 }
4371 /* to avoid race with newly queued ACKs */
4372 set_bit(SIGNAL_ASENDER, &mdev->flags);
4373 spin_lock_irq(&mdev->req_lock);
4374 empty = list_empty(&mdev->done_ee);
4375 spin_unlock_irq(&mdev->req_lock);
4376 /* new ack may have been queued right here,
4377 * but then there is also a signal pending,
4378 * and we start over... */
4379 if (empty)
4380 break;
4381 }
4382 /* but unconditionally uncork unless disabled */
4383 if (!mdev->net_conf->no_cork)
4384 drbd_tcp_uncork(mdev->meta.socket);
4385
4386 		/* short circuit, recv_msg would return EINTR anyway. */
4387 if (signal_pending(current))
4388 continue;
4389
4390 rv = drbd_recv_short(mdev, mdev->meta.socket,
4391 buf, expect-received, 0);
4392 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4393
4394 flush_signals(current);
4395
4396 /* Note:
4397 * -EINTR (on meta) we got a signal
4398 * -EAGAIN (on meta) rcvtimeo expired
4399 * -ECONNRESET other side closed the connection
4400 * -ERESTARTSYS (on data) we got a signal
4401 * rv < 0 other than above: unexpected error!
4402 * rv == expected: full header or command
4403 * rv < expected: "woken" by signal during receive
4404 * rv == 0 : "connection shut down by peer"
4405 */
4406 if (likely(rv > 0)) {
4407 received += rv;
4408 buf += rv;
4409 } else if (rv == 0) {
4410 dev_err(DEV, "meta connection shut down by peer.\n");
4411 goto reconnect;
4412 } else if (rv == -EAGAIN) {
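			/* rcvtimeo is shortened to ping_timeo while a ping is
			 * outstanding; timing out with the short value still
			 * in effect means the PingAck is overdue. */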
4413 if (mdev->meta.socket->sk->sk_rcvtimeo ==
4414 mdev->net_conf->ping_timeo*HZ/10) {
4415 dev_err(DEV, "PingAck did not arrive in time.\n");
4416 goto reconnect;
4417 }
4418 set_bit(SEND_PING, &mdev->flags);
4419 continue;
4420 } else if (rv == -EINTR) {
4421 continue;
4422 } else {
4423 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4424 goto reconnect;
4425 }
4426
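		/* Two-step parse: once the plain header is complete, look up
		 * the asender command and widen "expect" to the full packet
		 * size; once the full packet is in, process it and reset the
		 * buffer for the next header. */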
4427 if (received == expect && cmd == NULL) {
4428 if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4429 dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
4430 (long)be32_to_cpu(h->magic),
4431 h->command, h->length);
4432 goto reconnect;
4433 }
4434 cmd = get_asender_cmd(be16_to_cpu(h->command));
4435 len = be16_to_cpu(h->length);
4436 if (unlikely(cmd == NULL)) {
4437 dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
4438 (long)be32_to_cpu(h->magic),
4439 h->command, h->length);
4440 goto disconnect;
4441 }
4442 expect = cmd->pkt_size;
4443 			ERR_IF(len != expect-sizeof(struct p_header))
4444 				goto reconnect;
4445 		}
4446 if (received == expect) {
4447 D_ASSERT(cmd != NULL);
4448 			if (!cmd->process(mdev, h))
4449 goto reconnect;
4450
4451 buf = h;
4452 received = 0;
4453 expect = sizeof(struct p_header);
4454 cmd = NULL;
4455 }
4456 }
4457
4458 if (0) {
4459reconnect:
4460 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4461 }
4462 if (0) {
4463disconnect:
4464 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4465 }
4466 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4467
4468 D_ASSERT(mdev->state.conn < C_CONNECTED);
4469 dev_info(DEV, "asender terminated\n");
4470
4471 return 0;
4472}