| #!/usr/bin/env python3 |
| # |
| # Copyright (C) 2015 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| """Tests for the adb program itself. |
| |
| This differs from things in test_device.py in that there is no API for these |
| things. Most of these tests involve specific error messages or the help text. |
| """ |
| |
| import contextlib |
| import os |
| import random |
| import select |
| import socket |
| import struct |
| import subprocess |
| import sys |
| import threading |
| import time |
| import unittest |
| import warnings |
| |
| def find_open_port(): |
| # Find an open port. |
| with socket.socket() as s: |
| s.bind(("localhost", 0)) |
| return s.getsockname()[1] |
| |
| @contextlib.contextmanager |
| def fake_adbd(protocol=socket.AF_INET, port=0): |
| """Creates a fake ADB daemon that just replies with a CNXN packet.""" |
| |
| serversock = socket.socket(protocol, socket.SOCK_STREAM) |
| serversock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| if protocol == socket.AF_INET: |
| serversock.bind(("127.0.0.1", port)) |
| else: |
| serversock.bind(("::1", port)) |
| serversock.listen(1) |
| |
| # A pipe that is used to signal the thread that it should terminate. |
| readsock, writesock = socket.socketpair() |
| |
| def _adb_packet(command: bytes, arg0: int, arg1: int, data: bytes) -> bytes: |
| bin_command = struct.unpack("I", command)[0] |
| buf = struct.pack("IIIIII", bin_command, arg0, arg1, len(data), 0, |
| bin_command ^ 0xffffffff) |
| buf += data |
| return buf |
| |
| def _handle(sock): |
| with contextlib.closing(sock) as serversock: |
| rlist = [readsock, serversock] |
| cnxn_sent = {} |
| while True: |
| read_ready, _, _ = select.select(rlist, [], []) |
| for ready in read_ready: |
| if ready == readsock: |
| # Closure pipe |
| for f in rlist: |
| f.close() |
| return |
| elif ready == serversock: |
| # Server socket |
| conn, _ = ready.accept() |
| rlist.append(conn) |
| else: |
| # Client socket |
| data = ready.recv(1024) |
| if not data or data.startswith(b"OPEN"): |
| if ready in cnxn_sent: |
| del cnxn_sent[ready] |
| ready.shutdown(socket.SHUT_RDWR) |
| ready.close() |
| rlist.remove(ready) |
| continue |
| if ready in cnxn_sent: |
| continue |
| cnxn_sent[ready] = True |
| ready.sendall(_adb_packet(b"CNXN", 0x01000001, 1024 * 1024, |
| b"device::ro.product.name=fakeadb")) |
| |
| port = serversock.getsockname()[1] |
| server_thread = threading.Thread(target=_handle, args=(serversock,)) |
| server_thread.start() |
| |
| try: |
| yield port, writesock |
| finally: |
| writesock.close() |
| server_thread.join() |
| |
| |
| @contextlib.contextmanager |
| def adb_connect(unittest, serial): |
| """Context manager for an ADB connection. |
| |
| This automatically disconnects when done with the connection. |
| """ |
| |
| output = subprocess.check_output(["adb", "connect", serial]) |
| unittest.assertEqual(output.strip(), |
| "connected to {}".format(serial).encode("utf8")) |
| |
| try: |
| yield |
| finally: |
| # Perform best-effort disconnection. Discard the output. |
| subprocess.Popen(["adb", "disconnect", serial], |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE).communicate() |
| |
| |
| @contextlib.contextmanager |
| def adb_server(): |
| """Context manager for an ADB server. |
| |
| This creates an ADB server and returns the port it's listening on. |
| """ |
| |
| port = find_open_port() |
| read_pipe, write_pipe = os.pipe() |
| |
| if sys.platform == "win32": |
| import msvcrt |
| write_handle = msvcrt.get_osfhandle(write_pipe) |
| os.set_handle_inheritable(write_handle, True) |
| reply_fd = str(write_handle) |
| else: |
| os.set_inheritable(write_pipe, True) |
| reply_fd = str(write_pipe) |
| |
| proc = subprocess.Popen(["adb", "-L", "tcp:localhost:{}".format(port), |
| "fork-server", "server", |
| "--reply-fd", reply_fd], close_fds=False) |
| try: |
| os.close(write_pipe) |
| greeting = os.read(read_pipe, 1024) |
| assert greeting == b"OK\n", repr(greeting) |
| yield port |
| finally: |
| proc.terminate() |
| proc.wait() |
| |
| |
| class CommandlineTest(unittest.TestCase): |
| """Tests for the ADB commandline.""" |
| |
| def test_help(self): |
| """Make sure we get _something_ out of help.""" |
| out = subprocess.check_output( |
| ["adb", "help"], stderr=subprocess.STDOUT) |
| self.assertGreater(len(out), 0) |
| |
| def test_version(self): |
| """Get a version number out of the output of adb.""" |
| lines = subprocess.check_output(["adb", "version"]).splitlines() |
| version_line = lines[0] |
| self.assertRegex( |
| version_line, rb"^Android Debug Bridge version \d+\.\d+\.\d+$") |
| if len(lines) == 2: |
| # Newer versions of ADB have a second line of output for the |
| # version that includes a specific revision (git SHA). |
| revision_line = lines[1] |
| self.assertRegex( |
| revision_line, rb"^Revision [0-9a-f]{12}-android$") |
| |
| def test_tcpip_error_messages(self): |
| """Make sure 'adb tcpip' parsing is sane.""" |
| proc = subprocess.Popen(["adb", "tcpip"], |
| stdout=subprocess.PIPE, |
| stderr=subprocess.STDOUT) |
| out, _ = proc.communicate() |
| self.assertEqual(1, proc.returncode) |
| self.assertIn(b"requires an argument", out) |
| |
| proc = subprocess.Popen(["adb", "tcpip", "foo"], |
| stdout=subprocess.PIPE, |
| stderr=subprocess.STDOUT) |
| out, _ = proc.communicate() |
| self.assertEqual(1, proc.returncode) |
| self.assertIn(b"invalid port", out) |
| |
| |
| class ServerTest(unittest.TestCase): |
| """Tests for the ADB server.""" |
| |
| @staticmethod |
| def _read_pipe_and_set_event(pipe, event): |
| """Reads a pipe until it is closed, then sets the event.""" |
| pipe.read() |
| event.set() |
| |
| def test_handle_inheritance(self): |
| """Test that launch_server() does not inherit handles. |
| |
| launch_server() should not let the adb server inherit |
| stdin/stdout/stderr handles, which can cause callers of adb.exe to hang. |
| This test also runs fine on unix even though the impetus is an issue |
| unique to Windows. |
| """ |
| # This test takes 5 seconds to run on Windows: if there is no adb server |
| # running on the the port used below, adb kill-server tries to make a |
| # TCP connection to a closed port and that takes 1 second on Windows; |
| # adb start-server does the same TCP connection which takes another |
| # second, and it waits 3 seconds after starting the server. |
| |
| # Start adb client with redirected stdin/stdout/stderr to check if it |
| # passes those redirections to the adb server that it starts. To do |
| # this, run an instance of the adb server on a non-default port so we |
| # don't conflict with a pre-existing adb server that may already be |
| # setup with adb TCP/emulator connections. If there is a pre-existing |
| # adb server, this also tests whether multiple instances of the adb |
| # server conflict on adb.log. |
| |
| port = find_open_port() |
| |
| try: |
| # We get warnings for unclosed files for the subprocess's pipes, |
| # and it's somewhat cumbersome to close them, so just ignore this. |
| warnings.simplefilter("ignore", ResourceWarning) |
| |
| # Run the adb client and have it start the adb server. |
| proc = subprocess.Popen(["adb", "-P", str(port), "start-server"], |
| stdin=subprocess.PIPE, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE) |
| |
| # Start threads that set events when stdout/stderr are closed. |
| stdout_event = threading.Event() |
| stdout_thread = threading.Thread( |
| target=ServerTest._read_pipe_and_set_event, |
| args=(proc.stdout, stdout_event)) |
| stdout_thread.start() |
| |
| stderr_event = threading.Event() |
| stderr_thread = threading.Thread( |
| target=ServerTest._read_pipe_and_set_event, |
| args=(proc.stderr, stderr_event)) |
| stderr_thread.start() |
| |
| # Wait for the adb client to finish. Once that has occurred, if |
| # stdin/stderr/stdout are still open, it must be open in the adb |
| # server. |
| proc.wait() |
| |
| # Try to write to stdin which we expect is closed. If it isn't |
| # closed, we should get an IOError. If we don't get an IOError, |
| # stdin must still be open in the adb server. The adb client is |
| # probably letting the adb server inherit stdin which would be |
| # wrong. |
| with self.assertRaises(IOError): |
| proc.stdin.write(b"x") |
| proc.stdin.flush() |
| |
| # Wait a few seconds for stdout/stderr to be closed (in the success |
| # case, this won't wait at all). If there is a timeout, that means |
| # stdout/stderr were not closed and and they must be open in the adb |
| # server, suggesting that the adb client is letting the adb server |
| # inherit stdout/stderr which would be wrong. |
| self.assertTrue(stdout_event.wait(5), "adb stdout not closed") |
| self.assertTrue(stderr_event.wait(5), "adb stderr not closed") |
| stdout_thread.join() |
| stderr_thread.join() |
| finally: |
| # If we started a server, kill it. |
| subprocess.check_output(["adb", "-P", str(port), "kill-server"], |
| stderr=subprocess.STDOUT) |
| |
| @unittest.skipUnless( |
| os.name == "posix", |
| "adb doesn't yet support IPv6 on Windows", |
| ) |
| def test_starts_on_ipv6_localhost(self): |
| """ |
| Tests that the server can start up on ::1 and that it's accessible |
| """ |
| |
| server_port = find_open_port() |
| try: |
| subprocess.check_output( |
| ["adb", "-L", "tcp:[::1]:{}".format(server_port), "server"], |
| stderr=subprocess.STDOUT, |
| ) |
| with fake_adbd() as (port, _): |
| with adb_connect(self, serial="localhost:{}".format(port)): |
| pass |
| finally: |
| # If we started a server, kill it. |
| subprocess.check_output( |
| ["adb", "-P", str(server_port), "kill-server"], |
| stderr=subprocess.STDOUT, |
| ) |
| |
| |
| |
| |
| class EmulatorTest(unittest.TestCase): |
| """Tests for the emulator connection.""" |
| |
| def _reset_socket_on_close(self, sock): |
| """Use SO_LINGER to cause TCP RST segment to be sent on socket close.""" |
| # The linger structure is two shorts on Windows, but two ints on Unix. |
| linger_format = "hh" if os.name == "nt" else "ii" |
| l_onoff = 1 |
| l_linger = 0 |
| |
| sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, |
| struct.pack(linger_format, l_onoff, l_linger)) |
| # Verify that we set the linger structure properly by retrieving it. |
| linger = sock.getsockopt(socket.SOL_SOCKET, socket.SO_LINGER, 16) |
| self.assertEqual((l_onoff, l_linger), |
| struct.unpack_from(linger_format, linger)) |
| |
| def test_emu_kill(self): |
| """Ensure that adb emu kill works. |
| |
| Bug: https://code.google.com/p/android/issues/detail?id=21021 |
| """ |
| with contextlib.closing( |
| socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as listener: |
| # Use SO_REUSEADDR so subsequent runs of the test can grab the port |
| # even if it is in TIME_WAIT. |
| listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| listener.bind(("127.0.0.1", 0)) |
| listener.listen(4) |
| port = listener.getsockname()[1] |
| |
| # Now that listening has started, start adb emu kill, telling it to |
| # connect to our mock emulator. |
| proc = subprocess.Popen( |
| ["adb", "-s", "emulator-" + str(port), "emu", "kill"], |
| stderr=subprocess.STDOUT) |
| |
| accepted_connection, addr = listener.accept() |
| with contextlib.closing(accepted_connection) as conn: |
| # If WSAECONNABORTED (10053) is raised by any socket calls, |
| # then adb probably isn't reading the data that we sent it. |
| conn.sendall(("Android Console: type 'help' for a list " |
| "of commands\r\n").encode("utf8")) |
| conn.sendall(b"OK\r\n") |
| |
| with contextlib.closing(conn.makefile()) as connf: |
| line = connf.readline() |
| if line.startswith("auth"): |
| # Ignore the first auth line. |
| line = connf.readline() |
| self.assertEqual("kill\n", line) |
| self.assertEqual("quit\n", connf.readline()) |
| |
| conn.sendall(b"OK: killing emulator, bye bye\r\n") |
| |
| # Use SO_LINGER to send TCP RST segment to test whether adb |
| # ignores WSAECONNRESET on Windows. This happens with the |
| # real emulator because it just calls exit() without closing |
| # the socket or calling shutdown(SD_SEND). At process |
| # termination, Windows sends a TCP RST segment for every |
| # open socket that shutdown(SD_SEND) wasn't used on. |
| self._reset_socket_on_close(conn) |
| |
| # Wait for adb to finish, so we can check return code. |
| proc.communicate() |
| |
| # If this fails, adb probably isn't ignoring WSAECONNRESET when |
| # reading the response from the adb emu kill command (on Windows). |
| self.assertEqual(0, proc.returncode) |
| |
| def test_emulator_connect(self): |
| """Ensure that the emulator can connect. |
| |
| Bug: http://b/78991667 |
| """ |
| with adb_server() as server_port: |
| with fake_adbd() as (port, _): |
| serial = "emulator-{}".format(port - 1) |
| # Ensure that the emulator is not there. |
| try: |
| subprocess.check_output(["adb", "-P", str(server_port), |
| "-s", serial, "get-state"], |
| stderr=subprocess.STDOUT) |
| self.fail("Device should not be available") |
| except subprocess.CalledProcessError as err: |
| self.assertEqual( |
| err.output.strip(), |
| "error: device '{}' not found".format(serial).encode("utf8")) |
| |
| # Let the ADB server know that the emulator has started. |
| with contextlib.closing( |
| socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: |
| sock.connect(("localhost", server_port)) |
| command = "host:emulator:{}".format(port).encode("utf8") |
| sock.sendall(b"%04x%s" % (len(command), command)) |
| |
| # Ensure the emulator is there. |
| subprocess.check_call(["adb", "-P", str(server_port), |
| "-s", serial, "wait-for-device"]) |
| output = subprocess.check_output(["adb", "-P", str(server_port), |
| "-s", serial, "get-state"]) |
| self.assertEqual(output.strip(), b"device") |
| |
| |
| class ConnectionTest(unittest.TestCase): |
| """Tests for adb connect.""" |
| |
| def test_connect_ipv4_ipv6(self): |
| """Ensure that `adb connect localhost:1234` will try both IPv4 and IPv6. |
| |
| Bug: http://b/30313466 |
| """ |
| for protocol in (socket.AF_INET, socket.AF_INET6): |
| try: |
| with fake_adbd(protocol=protocol) as (port, _): |
| serial = "localhost:{}".format(port) |
| with adb_connect(self, serial): |
| pass |
| except socket.error: |
| print("IPv6 not available, skipping") |
| continue |
| |
| def test_already_connected(self): |
| """Ensure that an already-connected device stays connected.""" |
| |
| with fake_adbd() as (port, _): |
| serial = "localhost:{}".format(port) |
| with adb_connect(self, serial): |
| # b/31250450: this always returns 0 but probably shouldn't. |
| output = subprocess.check_output(["adb", "connect", serial]) |
| self.assertEqual( |
| output.strip(), |
| "already connected to {}".format(serial).encode("utf8")) |
| |
| @unittest.skip("Currently failing b/123247844") |
| def test_reconnect(self): |
| """Ensure that a disconnected device reconnects.""" |
| |
| with fake_adbd() as (port, _): |
| serial = "localhost:{}".format(port) |
| with adb_connect(self, serial): |
| # Wait a bit to give adb some time to connect. |
| time.sleep(0.25) |
| |
| output = subprocess.check_output(["adb", "-s", serial, |
| "get-state"]) |
| self.assertEqual(output.strip(), b"device") |
| |
| # This will fail. |
| proc = subprocess.Popen(["adb", "-s", serial, "shell", "true"], |
| stdout=subprocess.PIPE, |
| stderr=subprocess.STDOUT) |
| output, _ = proc.communicate() |
| self.assertEqual(output.strip(), b"error: closed") |
| |
| subprocess.check_call(["adb", "-s", serial, "wait-for-device"]) |
| |
| output = subprocess.check_output(["adb", "-s", serial, |
| "get-state"]) |
| self.assertEqual(output.strip(), b"device") |
| |
| # Once we explicitly kick a device, it won't attempt to |
| # reconnect. |
| output = subprocess.check_output(["adb", "disconnect", serial]) |
| self.assertEqual( |
| output.strip(), |
| "disconnected {}".format(serial).encode("utf8")) |
| try: |
| subprocess.check_output(["adb", "-s", serial, "get-state"], |
| stderr=subprocess.STDOUT) |
| self.fail("Device should not be available") |
| except subprocess.CalledProcessError as err: |
| self.assertEqual( |
| err.output.strip(), |
| "error: device '{}' not found".format(serial).encode("utf8")) |
| |
| |
| class DisconnectionTest(unittest.TestCase): |
| """Tests for adb disconnect.""" |
| |
| def test_disconnect(self): |
| """Ensure that `adb disconnect` takes effect immediately.""" |
| |
| def _devices(port): |
| output = subprocess.check_output(["adb", "-P", str(port), "devices"]) |
| return [x.split("\t") for x in output.decode("utf8").strip().splitlines()[1:]] |
| |
| with adb_server() as server_port: |
| with fake_adbd() as (port, sock): |
| device_name = "localhost:{}".format(port) |
| output = subprocess.check_output(["adb", "-P", str(server_port), |
| "connect", device_name]) |
| self.assertEqual(output.strip(), |
| "connected to {}".format(device_name).encode("utf8")) |
| |
| |
| self.assertEqual(_devices(server_port), [[device_name, "device"]]) |
| |
| # Send a deliberately malformed packet to make the device go offline. |
| packet = struct.pack("IIIIII", 0, 0, 0, 0, 0, 0) |
| sock.sendall(packet) |
| |
| # Wait a bit. |
| time.sleep(0.1) |
| |
| self.assertEqual(_devices(server_port), [[device_name, "offline"]]) |
| |
| # Disconnect the device. |
| output = subprocess.check_output(["adb", "-P", str(server_port), |
| "disconnect", device_name]) |
| |
| # Wait a bit. |
| time.sleep(0.1) |
| |
| self.assertEqual(_devices(server_port), []) |
| |
| |
| @unittest.skipUnless(sys.platform == "win32", "requires Windows") |
| class PowerTest(unittest.TestCase): |
| def test_resume_usb_kick(self): |
| """Resuming from sleep/hibernate should kick USB devices.""" |
| try: |
| usb_serial = subprocess.check_output(["adb", "-d", "get-serialno"]).strip() |
| except subprocess.CalledProcessError: |
| # If there are multiple USB devices, we don't have a way to check whether the selected |
| # device is USB. |
| raise unittest.SkipTest('requires single USB device') |
| |
| try: |
| serial = subprocess.check_output(["adb", "get-serialno"]).strip() |
| except subprocess.CalledProcessError: |
| # Did you forget to select a device with $ANDROID_SERIAL? |
| raise unittest.SkipTest('requires $ANDROID_SERIAL set to a USB device') |
| |
| # Test only works with USB devices because adb _power_notification_thread does not kick |
| # non-USB devices on resume event. |
| if serial != usb_serial: |
| raise unittest.SkipTest('requires USB device') |
| |
| # Run an adb shell command in the background that takes a while to complete. |
| proc = subprocess.Popen(['adb', 'shell', 'sleep', '5']) |
| |
| # Wait for startup of adb server's _power_notification_thread. |
| time.sleep(0.1) |
| |
| # Simulate resuming from sleep/hibernation by sending Windows message. |
| import ctypes |
| from ctypes import wintypes |
| HWND_BROADCAST = 0xffff |
| WM_POWERBROADCAST = 0x218 |
| PBT_APMRESUMEAUTOMATIC = 0x12 |
| |
| PostMessageW = ctypes.windll.user32.PostMessageW |
| PostMessageW.restype = wintypes.BOOL |
| PostMessageW.argtypes = (wintypes.HWND, wintypes.UINT, wintypes.WPARAM, wintypes.LPARAM) |
| result = PostMessageW(HWND_BROADCAST, WM_POWERBROADCAST, PBT_APMRESUMEAUTOMATIC, 0) |
| if not result: |
| raise ctypes.WinError() |
| |
| # Wait for connection to adb shell to be broken by _power_notification_thread detecting the |
| # Windows message. |
| start = time.time() |
| proc.wait() |
| end = time.time() |
| |
| # If the power event was detected, the adb shell command should be broken very quickly. |
| self.assertLess(end - start, 2) |
| |
| |
| def main(): |
| """Main entrypoint.""" |
| random.seed(0) |
| unittest.main(verbosity=3) |
| |
| |
| if __name__ == "__main__": |
| main() |