UI自动化偷懒必备:AirTest封装好ADB命令
作者:互联网
文章目录
前言
在UI自动化测试的过程中免不了要跟ADB打交道,原生的ADB命令用是没问题,问题是结合实际业务和效率来说,基本都要封装一下,如果用Python语言的同学就可以偷懒一下,偷懒是为了避免重复造轮子,网易开源的AirTest里有ADB的封装源码,我们拿出来,结合自己需求灵活使用,这里要感谢网易的测开团队。
安装依赖
上面有提到ADB是基于AirTest的,所以要安装AirTest库
pip -U install airtest
源码
注意:如果有安装有多个adb版本,出现路径冲突的话,需指定对应的adb路径
使用也比较简单,初始化时传入设备号:serialno就可以,其他参数按需传入
大部分的ADB命令已封装,喜欢的朋友复制拿走
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import os
import re
import sys
import time
import random
import platform
import warnings
import subprocess
import threading
from six import PY3, text_type, binary_type, raise_from
from six.moves import reduce
from airtest.core.android.constant import (DEFAULT_ADB_PATH, IP_PATTERN,
SDK_VERISON_ANDROID7)
from airtest.core.error import (AdbError, AdbShellError, AirtestError,
DeviceConnectionError)
from airtest.utils.compat import decode_path, raisefrom, proc_communicate_timeout, SUBPROCESS_FLAG
from airtest.utils.logger import get_logger
from airtest.utils.nbsp import NonBlockingStreamReader
from airtest.utils.retry import retries
from airtest.utils.snippet import get_std_encoding, reg_cleanup, split_cmd
LOGGING = get_logger(__name__)
class ADB(object):
"""adb client object class"""
_instances = []
status_device = "device"
status_offline = "offline"
SHELL_ENCODING = "utf-8"
def __init__(self, serialno=None, adb_path=None, server_addr=None, display_id=None, input_event=None):
self.serialno = serialno
self.adb_path = adb_path or self.builtin_adb_path()
self.display_id = display_id
self.input_event = input_event
self._set_cmd_options(server_addr)
self.connect()
self._sdk_version = None
self._line_breaker = None
self._display_info = {}
self._display_info_lock = threading.Lock()
self._forward_local_using = []
self.__class__._instances.append(self)
@staticmethod
def builtin_adb_path():
"""
Return built-in adb executable path
Returns:
adb executable path
"""
system = platform.system()
machine = platform.machine()
adb_path = DEFAULT_ADB_PATH.get('{}-{}'.format(system, machine))
if not adb_path:
adb_path = DEFAULT_ADB_PATH.get(system)
if not adb_path:
raise RuntimeError("No adb executable supports this platform({}-{}).".format(system, machine))
# overwrite uiautomator adb
if "ANDROID_HOME" in os.environ:
del os.environ["ANDROID_HOME"]
return adb_path
def _set_cmd_options(self, server_addr=None):
"""
Set communication parameters (host and port) between adb server and adb client
Args:
server_addr: adb server address, default is 127.0.0.1:5037
Returns:
None
"""
self.host = server_addr[0] if server_addr else "127.0.0.1"
self.port = server_addr[1] if server_addr else 5037
self.cmd_options = [self.adb_path]
if self.host not in ("localhost", "127.0.0.1"):
self.cmd_options += ['-H', self.host]
if self.port != 5037:
self.cmd_options += ['-P', str(self.port)]
def start_server(self):
"""
Perform `adb start-server` command to start the adb server
Returns:
None
"""
return self.cmd("start-server", device=False)
def kill_server(self):
"""
Perform `adb kill-server` command to kill the adb server
Returns:
None
"""
return self.cmd("kill-server", device=False)
def version(self):
"""
Perform `adb version` command and return the command output
Returns:
command output
"""
return self.cmd("version", device=False).strip()
def start_cmd(self, cmds, device=True):
"""
Start a subprocess with adb command(s)
Args:
cmds: command(s) to be run
device: if True, the device serial number must be specified by `-s serialno` argument
Raises:
RuntimeError: if `device` is True and serialno is not specified
Returns:
a subprocess
"""
if device:
if not self.serialno:
raise RuntimeError("please set serialno first")
cmd_options = self.cmd_options + ['-s', self.serialno]
else:
cmd_options = self.cmd_options
cmds = cmd_options + split_cmd(cmds)
LOGGING.debug(" ".join(cmds))
if not PY3:
cmds = [c.encode(get_std_encoding(sys.stdin)) for c in cmds]
proc = subprocess.Popen(
cmds,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
creationflags=SUBPROCESS_FLAG
)
return proc
def cmd(self, cmds, device=True, ensure_unicode=True, timeout=None):
"""
Run the adb command(s) in subprocess and return the standard output
Args:
cmds: command(s) to be run
device: if True, the device serial number must be specified by -s serialno argument
ensure_unicode: encode/decode unicode of standard outputs (stdout, stderr)
timeout: timeout in seconds
Raises:
DeviceConnectionError: if any error occurs when connecting the device
AdbError: if any other adb error occurs
Returns:
command(s) standard output (stdout)
"""
proc = self.start_cmd(cmds, device)
if timeout:
stdout, stderr = proc_communicate_timeout(proc, timeout)
else:
stdout, stderr = proc.communicate()
if ensure_unicode:
stdout = stdout.decode(get_std_encoding(sys.stdout))
stderr = stderr.decode(get_std_encoding(sys.stderr))
if proc.returncode > 0:
# adb connection error
pattern = DeviceConnectionError.DEVICE_CONNECTION_ERROR
if isinstance(stderr, binary_type):
pattern = pattern.encode("utf-8")
if re.search(pattern, stderr):
raise DeviceConnectionError(stderr)
else:
raise AdbError(stdout, stderr)
return stdout
def close_proc_pipe(self, proc):
"""close stdin/stdout/stderr of subprocess.Popen."""
def close_pipe(pipe):
if pipe:
pipe.close()
close_pipe(proc.stdin)
close_pipe(proc.stdout)
close_pipe(proc.stderr)
def devices(self, state=None):
"""
Perform `adb devices` command and return the list of adb devices
Args:
state: optional parameter to filter devices in specific state
Returns:
list od adb devices
"""
patten = re.compile(r'^[\w\d.:-]+\t[\w]+$')
device_list = []
# self.start_server()
output = self.cmd("devices", device=False)
for line in output.splitlines():
line = line.strip()
if not line or not patten.match(line):
continue
serialno, cstate = line.split('\t')
if state and cstate != state:
continue
device_list.append((serialno, cstate))
return device_list
def connect(self, force=False):
"""
Perform `adb connect` command, remote devices are preferred to connect first
Args:
force: force connection, default is False
Returns:
None
"""
if self.serialno and ":" in self.serialno and (force or self.get_status() != "device"):
connect_result = self.cmd("connect %s" % self.serialno)
LOGGING.info(connect_result)
def disconnect(self):
"""
Perform `adb disconnect` command
Returns:
None
"""
if ":" in self.serialno:
self.cmd("disconnect %s" % self.serialno)
def get_status(self):
"""
Perform `adb get-state` and return the device status
Raises:
AdbError: if status cannot be obtained from the device
Returns:
None if status is `not found`, otherwise return the standard output from `adb get-state` command
"""
proc = self.start_cmd("get-state")
stdout, stderr = proc.communicate()
stdout = stdout.decode(get_std_encoding(sys.stdout))
stderr = stderr.decode(get_std_encoding(sys.stdout))
if proc.returncode == 0:
return stdout.strip()
elif "not found" in stderr:
return None
else:
raise AdbError(stdout, stderr)
def wait_for_device(self, timeout=5):
"""
Perform `adb wait-for-device` command
Args:
timeout: time interval in seconds to wait for device
Raises:
DeviceConnectionError: if device is not available after timeout
Returns:
None
"""
try:
self.cmd("wait-for-device", timeout=timeout)
except RuntimeError as e:
raisefrom(DeviceConnectionError, "device not ready", e)
def start_shell(self, cmds):
"""
Handle `adb shell` command(s)
Args:
cmds: adb shell command(s)
Returns:
None
"""
cmds = ['shell'] + split_cmd(cmds)
return self.start_cmd(cmds)
def raw_shell(self, cmds, ensure_unicode=True):
"""
Handle `adb shell` command(s) with unicode support
Args:
cmds: adb shell command(s)
ensure_unicode: decode/encode unicode True or False, default is True
Returns:
command(s) output
"""
cmds = ['shell'] + split_cmd(cmds)
out = self.cmd(cmds, ensure_unicode=False)
if not ensure_unicode:
return out
# use shell encoding to decode output
try:
return out.decode(self.SHELL_ENCODING)
except UnicodeDecodeError:
warnings.warn("shell output decode {} fail. repr={}".format(self.SHELL_ENCODING, repr(out)))
return text_type(repr(out))
def shell(self, cmd):
"""
Run the `adb shell` command on the device
Args:
cmd: a command to be run
Raises:
AdbShellError: if command return value is non-zero or if any other `AdbError` occurred
Returns:
command output
"""
if self.sdk_version < SDK_VERISON_ANDROID7:
# for sdk_version < 25, adb shell do not raise error
# https://stackoverflow.com/questions/9379400/adb-error-codes
cmd = split_cmd(cmd) + [";", "echo", "---$?---"]
out = self.raw_shell(cmd).rstrip()
m = re.match("(.*)---(\d+)---$", out, re.DOTALL)
if not m:
warnings.warn("return code not matched")
stdout = out
returncode = 0
else:
stdout = m.group(1)
returncode = int(m.group(2))
if returncode > 0:
raise AdbShellError("", stdout)
return stdout
else:
try:
out = self.raw_shell(cmd)
except AdbError as err:
raise AdbShellError(err.stdout, err.stderr)
else:
return out
def keyevent(self, keyname):
"""
Perform `adb shell input keyevent` command on the device
Args:
keyname: key event name
Returns:
None
"""
self.shell(["input", "keyevent", keyname.upper()])
def getprop(self, key, strip=True):
"""
Perform `adb shell getprop` on the device
Args:
key: key value for property
strip: True or False to strip the return carriage and line break from returned string
Returns:
propery value
"""
prop = self.raw_shell(['getprop', key])
if strip:
if "\r\r\n" in prop:
# Some mobile phones will output multiple lines of extra log
prop = prop.split("\r\r\n")
if len(prop) > 1:
prop = prop[-2]
else:
prop = prop[-1]
else:
prop = prop.strip("\r\n")
return prop
@property
@retries(max_tries=3)
def sdk_version(self):
"""
Get the SDK version from the device
Returns:
SDK version
"""
if self._sdk_version is None:
keyname = 'ro.build.version.sdk'
self._sdk_version = int(self.getprop(keyname))
return self._sdk_version
def push(self, local, remote):
"""
Perform `adb push` command
Args:
local: local file to be copied to the device
remote: destination on the device where the file will be copied
Returns:
None
"""
self.cmd(["push", local, remote], ensure_unicode=False)
def pull(self, remote, local):
"""
Perform `adb pull` command
Args:
remote: remote file to be downloaded from the device
local: local destination where the file will be downloaded from the device
Returns:
None
"""
self.cmd(["pull", remote, local], ensure_unicode=False)
def forward(self, local, remote, no_rebind=True):
"""
Perform `adb forward` command
Args:
local: local tcp port to be forwarded
remote: tcp port of the device where the local tcp port will be forwarded
no_rebind: True or False
Returns:
None
"""
cmds = ['forward']
if no_rebind:
cmds += ['--no-rebind']
self.cmd(cmds + [local, remote])
# register for cleanup atexit
if local not in self._forward_local_using:
self._forward_local_using.append(local)
def get_forwards(self):
"""
Perform `adb forward --list`command
Yields:
serial number, local tcp port, remote tcp port
Returns:
None
"""
out = self.cmd(['forward', '--list'])
for line in out.splitlines():
line = line.strip()
if not line:
continue
cols = line.split()
if len(cols) != 3:
continue
serialno, local, remote = cols
yield serialno, local, remote
@classmethod
def get_available_forward_local(cls):
"""
Generate a pseudo random number between 11111 and 20000 that will be used as local forward port
Returns:
integer between 11111 and 20000
Note:
use `forward --no-rebind` to check if port is available
"""
return random.randint(11111, 20000)
@retries(3)
def setup_forward(self, device_port):
"""
Generate pseudo random local port and check if the port is available.
Args:
device_port: it can be string or the value of the `function(localport)`,
e.g. `"tcp:5001"` or `"localabstract:{}".format`
Returns:
local port and device port
"""
localport = self.get_available_forward_local()
if callable(device_port):
device_port = device_port(localport)
self.forward("tcp:%s" % localport, device_port)
return localport, device_port
def remove_forward(self, local=None):
"""
Perform `adb forward --remove` command
Args:
local: local tcp port
Returns:
None
"""
if local:
cmds = ["forward", "--remove", local]
else:
cmds = ["forward", "--remove-all"]
self.cmd(cmds)
# unregister for cleanup
if local in self._forward_local_using:
self._forward_local_using.remove(local)
def install_app(self, filepath, replace=False, install_options=None):
"""
Perform `adb install` command
Args:
filepath: full path to file to be installed on the device
replace: force to replace existing application, default is False
e.g.["-t", # allow test packages
"-l", # forward lock application,
"-s", # install application on sdcard,
"-d", # allow version code downgrade (debuggable packages only)
"-g", # grant all runtime permissions
]
Returns:
command output
"""
if isinstance(filepath, str):
filepath = decode_path(filepath)
if not os.path.isfile(filepath):
raise RuntimeError("file: %s does not exists" % (repr(filepath)))
if not install_options or type(install_options) != list:
install_options = []
if replace:
install_options.append("-r")
cmds = ["install", ] + install_options + [filepath, ]
out = self.cmd(cmds)
if re.search(r"Failure \[.*?\]", out):
raise AdbShellError("Installation Failure", repr(out))
return out
def install_multiple_app(self, filepath, replace=False, install_options=None):
"""
Perform `adb install-multiple` command
Args:
filepath: full path to file to be installed on the device
replace: force to replace existing application, default is False
install_options: list of options
e.g.["-t", # allow test packages
"-l", # forward lock application,
"-s", # install application on sdcard,
"-d", # allow version code downgrade (debuggable packages only)
"-g", # grant all runtime permissions
"-p", # partial application install (install-multiple only)
]
Returns:
command output
"""
if isinstance(filepath, str):
filepath = decode_path(filepath)
if not os.path.isfile(filepath):
raise RuntimeError("file: %s does not exists" % (repr(filepath)))
if not install_options or type(install_options) != list:
install_options = []
if replace:
install_options.append("-r")
cmds = ["install-multiple", ] + install_options + [filepath, ]
try:
out = self.cmd(cmds)
except AdbError as err:
if "Failed to finalize session".lower() in err.stderr.lower():
return "Success"
else:
return self.install_app(filepath, replace)
if re.search(r"Failure \[.*?\]", out):
raise AdbShellError("Installation Failure", repr(out))
return out
def pm_install(self, filepath, replace=False):
"""
Perform `adb push` and `adb install` commands
Note:
This is more reliable and recommended way of installing `.apk` files
Args:
filepath: full path to file to be installed on the device
replace: force to replace existing application, default is False
Returns:
None
"""
filename = os.path.basename(filepath)
device_dir = "/data/local/tmp"
# if the apk file path contains spaces, the path must be escaped
device_path = '\"%s/%s\"' % (device_dir, filename)
out = self.cmd(["push", filepath, device_dir])
print(out)
if not replace:
self.shell(['pm', 'install', device_path])
else:
self.shell(['pm', 'install', '-r', device_path])
def uninstall_app(self, package):
"""
Perform `adb uninstall` command
Args:
package: package name to be uninstalled from the device
Returns:
command output
"""
return self.cmd(['uninstall', package])
def pm_uninstall(self, package, keepdata=False):
"""
Perform `adb uninstall` command and delete all related application data
Args:
package: package name to be uninstalled from the device
keepdata: True or False, keep application data after removing the app from the device
Returns:
command output
"""
cmd = ['pm', 'uninstall', package]
if keepdata:
cmd.append('-k')
self.shell(cmd)
def snapshot(self):
"""
Take the screenshot of the device display
Returns:
command output (stdout)
"""
if self.display_id:
raw = self.cmd('shell screencap -d {0} -p'.format(self.display_id), ensure_unicode=False)
else:
raw = self.cmd('shell screencap -p', ensure_unicode=False)
return raw.replace(self.line_breaker, b"\n")
# PEP 3113 -- Removal of Tuple Parameter Unpacking
# https://www.python.org/dev/peps/pep-3113/
def touch(self, tuple_xy):
"""
Perform user input (touchscreen) on given coordinates
Args:
tuple_xy: coordinates (x, y)
Returns:
None
"""
x, y = tuple_xy
self.shell('input tap %d %d' % (x, y))
time.sleep(0.1)
def swipe(self, tuple_x0y0, tuple_x1y1, duration=500):
"""
Perform user input (swipe screen) from start point (x,y) to end point (x,y)
Args:
tuple_x0y0: start point coordinates (x, y)
tuple_x1y1: end point coordinates (x, y)
duration: time interval for action, default 500
Raises:
AirtestError: if SDK version is not supported
Returns:
None
"""
# prot python 3
x0, y0 = tuple_x0y0
x1, y1 = tuple_x1y1
version = self.sdk_version
if version <= 15:
raise AirtestError('swipe: API <= 15 not supported (version=%d)' % version)
elif version <= 17:
self.shell('input swipe %d %d %d %d' % (x0, y0, x1, y1))
else:
self.shell('input touchscreen swipe %d %d %d %d %d' % (x0, y0, x1, y1, duration))
def logcat(self, grep_str="", extra_args="", read_timeout=10):
"""
Perform `adb shell logcat` command and search for given patterns
Args:
grep_str: pattern to filter from the logcat output
extra_args: additional logcat arguments
read_timeout: time interval to read the logcat, default is 10
Yields:
logcat lines containing filtered patterns
Returns:
None
"""
cmds = "shell logcat"
if extra_args:
cmds += " " + extra_args
if grep_str:
cmds += " | grep " + grep_str
logcat_proc = self.start_cmd(cmds)
nbsp = NonBlockingStreamReader(logcat_proc.stdout, print_output=False)
while True:
line = nbsp.readline(read_timeout)
if line is None:
break
else:
yield line
nbsp.kill()
logcat_proc.kill()
return
def exists_file(self, filepath):
"""
Check if the file exits on the device
Args:
filepath: path to the file
Returns:
True or False if file found or not
"""
try:
out = self.shell(["ls", filepath])
except AdbShellError:
return False
else:
return not ("No such file or directory" in out)
def file_size(self, filepath):
"""
Get the file size
Args:
filepath: path to the file
Returns:
The file size
Raises:
AdbShellError if no such file
"""
out = self.shell(["ls", "-l", filepath])
file_size = int(out.split()[4])
return file_size
def _cleanup_forwards(self):
"""
Remove the local forward ports
Returns:
None
"""
for local in self._forward_local_using:
self.start_cmd(["forward", "--remove", local])
self._forward_local_using = []
@property
def line_breaker(self):
"""
Set carriage return and line break property for various platforms and SDK versions
Returns:
carriage return and line break string
"""
if not self._line_breaker:
if self.sdk_version >= SDK_VERISON_ANDROID7:
line_breaker = os.linesep
else:
line_breaker = '\r' + os.linesep
self._line_breaker = line_breaker.encode("ascii")
return self._line_breaker
@property
def display_info(self):
"""
Set device display properties (orientation, rotation and max values for x and y coordinates)
Notes:
if there is a lock screen detected, the function tries to unlock the device first
Returns:
device screen properties
"""
self._display_info_lock.acquire()
if not self._display_info:
self._display_info = self.get_display_info()
self._display_info_lock.release()
return self._display_info
def get_display_info(self):
"""
Get information about device physical display (orientation, rotation and max values for x and y coordinates)
Returns:
device screen properties
"""
display_info = self.getPhysicalDisplayInfo()
orientation = self.getDisplayOrientation()
max_x, max_y = self.getMaxXY()
display_info.update({
"orientation": orientation,
"rotation": orientation * 90,
"max_x": max_x,
"max_y": max_y,
})
return display_info
def getMaxXY(self):
"""
Get device display maximum values for x and y coordinates
Returns:
max x and max y coordinates
"""
ret = self.shell('getevent -p').split('\n')
max_x, max_y = None, None
for i in ret:
if i.find("0035") != -1:
patten = re.compile(r'max [0-9]+')
ret = patten.search(i)
if ret:
max_x = int(ret.group(0).split()[1])
if i.find("0036") != -1:
patten = re.compile(r'max [0-9]+')
ret = patten.search(i)
if ret:
max_y = int(ret.group(0).split()[1])
return max_x, max_y
def getRestrictedScreen(self):
"""
Get value for mRestrictedScreen (without black border / virtual keyboard)`
Returns:
screen resolution mRestrictedScreen value as tuple (x, y)
"""
# get the effective screen resolution of the device
result = None
# get the corresponding mRestrictedScreen parameters according to the device serial number
dumpsys_info = self.shell("dumpsys window")
match = re.search(r'mRestrictedScreen=.+', dumpsys_info)
if match:
infoline = match.group(0).strip() # like 'mRestrictedScreen=(0,0) 720x1184'
resolution = infoline.split(" ")[1].split("x")
if isinstance(resolution, list) and len(resolution) == 2:
result = int(str(resolution[0])), int(str(resolution[1]))
return result
def getPhysicalDisplayInfo(self):
"""
Get value for display dimension and density from `mPhysicalDisplayInfo` value obtained from `dumpsys` command.
Returns:
physical display info for dimension and density
"""
phyDispRE = re.compile('.*PhysicalDisplayInfo{(?P<width>\d+) x (?P<height>\d+), .*, density (?P<density>[\d.]+).*')
out = self.raw_shell('dumpsys display')
m = phyDispRE.search(out)
if m:
displayInfo = {}
for prop in ['width', 'height']:
displayInfo[prop] = int(m.group(prop))
for prop in ['density']:
# In mPhysicalDisplayInfo density is already a factor, no need to calculate
displayInfo[prop] = float(m.group(prop))
return displayInfo
# This could also be mSystem or mOverscanScreen
phyDispRE = re.compile('\s*mUnrestrictedScreen=\((?P<x>\d+),(?P<y>\d+)\) (?P<width>\d+)x(?P<height>\d+)')
# This is known to work on older versions (i.e. API 10) where mrestrictedScreen is not available
dispWHRE = re.compile('\s*DisplayWidth=(?P<width>\d+) *DisplayHeight=(?P<height>\d+)')
out = self.raw_shell('dumpsys window')
m = phyDispRE.search(out, 0)
if not m:
m = dispWHRE.search(out, 0)
if m:
displayInfo = {}
for prop in ['width', 'height']:
displayInfo[prop] = int(m.group(prop))
for prop in ['density']:
d = self._getDisplayDensity(None, strip=True)
if d:
displayInfo[prop] = d
else:
# No available density information
displayInfo[prop] = -1.0
return displayInfo
# gets C{mPhysicalDisplayInfo} values from dumpsys. This is a method to obtain display dimensions and density
phyDispRE = re.compile('Physical size: (?P<width>\d+)x(?P<height>\d+).*Physical density: (?P<density>\d+)', re.S)
m = phyDispRE.search(self.raw_shell('wm size; wm density'))
if m:
displayInfo = {}
for prop in ['width', 'height']:
displayInfo[prop] = int(m.group(prop))
for prop in ['density']:
displayInfo[prop] = float(m.group(prop))
return displayInfo
return {}
def _getDisplayDensity(self, key, strip=True):
"""
Get display density
Args:
key:
strip: strip the output
Returns:
display density
"""
BASE_DPI = 160.0
d = self.getprop('ro.sf.lcd_density', strip)
if d:
return float(d) / BASE_DPI
d = self.getprop('qemu.sf.lcd_density', strip)
if d:
return float(d) / BASE_DPI
return -1.0
def getDisplayOrientation(self):
"""
Another way to get the display orientation, this works well for older devices (SDK version 15)
Returns:
display orientation information
"""
# another way to get orientation, for old sumsung device(sdk version 15) from xiaoma
SurfaceFlingerRE = re.compile('orientation=(\d+)')
output = self.shell('dumpsys SurfaceFlinger')
m = SurfaceFlingerRE.search(output)
if m:
return int(m.group(1))
# Fallback method to obtain the orientation
# See https://github.com/dtmilano/AndroidViewClient/issues/128
surfaceOrientationRE = re.compile('SurfaceOrientation:\s+(\d+)')
output = self.shell('dumpsys input')
m = surfaceOrientationRE.search(output)
if m:
return int(m.group(1))
# We couldn't obtain the orientation
warnings.warn("Could not obtain the orientation, return 0")
return 0
def get_top_activity(self):
"""
Perform `adb shell dumpsys activity top` command search for the top activity
Raises:
AirtestError: if top activity cannot be obtained
Returns:
top activity as a tuple: (package_name, activity_name, pid)
"""
dat = self.shell('dumpsys activity top')
activityRE = re.compile(r'\s*ACTIVITY ([A-Za-z0-9_.$]+)/([A-Za-z0-9_.$]+) \w+ pid=(\d+)')
# in Android8.0 or higher, the result may be more than one
m = activityRE.findall(dat)
if m:
return m[-1]
else:
raise AirtestError("Can not get top activity, output:%s" % dat)
def is_keyboard_shown(self):
"""
Perform `adb shell dumpsys input_method` command and search for information if keyboard is shown
Returns:
True or False whether the keyboard is shown or not
"""
dim = self.shell('dumpsys input_method')
if dim:
return "mInputShown=true" in dim
return False
def is_screenon(self):
"""
Perform `adb shell dumpsys window policy` command and search for information if screen is turned on or off
Raises:
AirtestError: if screen state can't be detected
Returns:
True or False whether the screen is turned on or off
"""
screenOnRE = re.compile('mScreenOnFully=(true|false)')
m = screenOnRE.search(self.shell('dumpsys window policy'))
if m:
return m.group(1) == 'true'
else:
# MIUI11
screenOnRE = re.compile('screenState=(SCREEN_STATE_ON|SCREEN_STATE_OFF)')
m = screenOnRE.search(self.shell('dumpsys window policy'))
if m:
return m.group(1) == 'SCREEN_STATE_ON'
raise AirtestError("Couldn't determine screen ON state")
def is_locked(self):
"""
Perform `adb shell dumpsys window policy` command and search for information if screen is locked or not
Raises:
AirtestError: if lock screen can't be detected
Returns:
True or False whether the screen is locked or not
"""
lockScreenRE = re.compile('(?:mShowingLockscreen|isStatusBarKeyguard|showing)=(true|false)')
m = lockScreenRE.search(self.shell('dumpsys window policy'))
if not m:
raise AirtestError("Couldn't determine screen lock state")
return (m.group(1) == 'true')
def unlock(self):
"""
Perform `adb shell input keyevent MENU` and `adb shell input keyevent BACK` commands to attempt
to unlock the screen
Returns:
None
Warnings:
Might not work on all devices
"""
self.shell('input keyevent MENU')
self.shell('input keyevent BACK')
def get_package_version(self, package):
"""
Perform `adb shell dumpsys package` and search for information about given package version
Args:
package: package name
Returns:
None if no info has been found, otherwise package version
"""
package_info = self.shell(['dumpsys', 'package', package])
matcher = re.search(r'versionCode=(\d+)', package_info)
if matcher:
return int(matcher.group(1))
return None
def list_app(self, third_only=False):
"""
Perform `adb shell pm list packages` to print all packages, optionally only
those whose package name contains the text in FILTER.
Options
-f: see their associated file
-d: filter to only show disabled packages
-e: filter to only show enabled packages
-s: filter to only show system packages
-3: filter to only show third party packages
-i: see the installer for the packages
-u: also include uninstalled packages
Args:
third_only: print only third party packages
Returns:
list of packages
"""
cmd = ["pm", "list", "packages"]
if third_only:
cmd.append("-3")
output = self.shell(cmd)
packages = output.splitlines()
# remove all empty string; "package:xxx" -> "xxx"
packages = [p.split(":")[1] for p in packages if p]
return packages
def path_app(self, package):
"""
Perform `adb shell pm path` command to print the path to the package
Args:
package: package name
Raises:
AdbShellError: if any adb error occurs
AirtestError: if package is not found on the device
Returns:
path to the package
"""
try:
output = self.shell(['pm', 'path', package])
except AdbShellError:
output = ""
if 'package:' not in output:
raise AirtestError('package not found, output:[%s]' % output)
return output.split("package:")[1].strip()
def check_app(self, package):
"""
Perform `adb shell dumpsys package` command and check if package exists on the device
Args:
package: package name
Raises:
AirtestError: if package is not found
Returns:
True if package has been found
"""
output = self.shell(['dumpsys', 'package', package])
pattern = r'Package\s+\[' + str(package) + '\]'
match = re.search(pattern, output)
if match is None:
raise AirtestError('package "{}" not found'.format(package))
return True
def start_app(self, package, activity=None):
"""
Perform `adb shell monkey` commands to start the application, if `activity` argument is `None`, then
`adb shell am start` command is used.
Args:
package: package name
activity: activity name
Returns:
None
"""
if not activity:
self.shell(['monkey', '-p', package, '-c', 'android.intent.category.LAUNCHER', '1'])
else:
self.shell(['am', 'start', '-n', '%s/%s.%s' % (package, package, activity)])
def start_app_timing(self, package, activity):
"""
Start the application and activity, and measure time
Args:
package: package name
activity: activity name
Returns:
app launch time
"""
out = self.shell(['am', 'start', '-S', '-W', '%s/%s' % (package, activity),
'-c', 'android.intent.category.LAUNCHER', '-a', 'android.intent.action.MAIN'])
if not re.search(r"Status:\s*ok", out):
raise AirtestError("Starting App: %s/%s Failed!" % (package, activity))
# matcher = re.search(r"TotalTime:\s*(\d+)", out)
matcher = re.search(r"TotalTime:\s*(\d+)", out)
if matcher:
return int(matcher.group(1))
else:
return 0
def stop_app(self, package):
"""
Perform `adb shell am force-stop` command to force stop the application
Args:
package: package name
Returns:
None
"""
self.shell(['am', 'force-stop', package])
def clear_app(self, package):
"""
Perform `adb shell pm clear` command to clear all application data
Args:
package: package name
Returns:
None
"""
self.shell(['pm', 'clear', package])
def get_ip_address(self):
"""
Perform several set of commands to obtain the IP address.
* `adb shell netcfg | grep wlan0`
* `adb shell ifconfig`
* `adb getprop dhcp.wlan0.ipaddress`
Returns:
None if no IP address has been found, otherwise return the IP address
"""
def get_ip_address_from_interface(interface):
"""Get device ip from target network interface."""
# android >= 6.0: ip -f inet addr show {interface}
try:
res = self.shell('ip -f inet addr show {}'.format(interface))
except AdbShellError:
res = ''
matcher = re.search(r"inet (\d+\.){3}\d+", res)
if matcher:
return matcher.group().split(" ")[-1]
# android >= 6.0 backup method: ifconfig
try:
res = self.shell('ifconfig')
except AdbShellError:
res = ''
matcher = re.search(interface + r'.*?inet addr:((\d+\.){3}\d+)', res, re.DOTALL)
if matcher:
return matcher.group(1)
# android <= 6.0: netcfg
try:
res = self.shell('netcfg')
except AdbShellError:
res = ''
matcher = re.search(interface + r'.* ((\d+\.){3}\d+)/\d+', res)
if matcher:
return matcher.group(1)
# android <= 6.0 backup method: getprop dhcp.{}.ipaddress
try:
res = self.shell('getprop dhcp.{}.ipaddress'.format(interface))
except AdbShellError:
res = ''
matcher = IP_PATTERN.search(res)
if matcher:
return matcher.group(0)
# sorry, no more methods...
return None
interfaces = ('eth0', 'eth1', 'wlan0')
for i in interfaces:
ip = get_ip_address_from_interface(i)
if ip and not ip.startswith('172.') and not ip.startswith('127.') and not ip.startswith('169.'):
return ip
return None
def get_gateway_address(self):
"""
Perform several set of commands to obtain the gateway address.
* `adb getprop dhcp.wlan0.gateway`
* `adb shell netcfg | grep wlan0`
Returns:
None if no gateway address has been found, otherwise return the gateway address
"""
ip2int = lambda ip: reduce(lambda a, b: (a << 8) + b, map(int, ip.split('.')), 0)
int2ip = lambda n: '.'.join([str(n >> (i << 3) & 0xFF) for i in range(0, 4)[::-1]])
try:
res = self.shell('getprop dhcp.wlan0.gateway')
except AdbShellError:
res = ''
matcher = IP_PATTERN.search(res)
if matcher:
return matcher.group(0)
ip = self.get_ip_address()
if not ip:
return None
mask_len = self._get_subnet_mask_len()
gateway = (ip2int(ip) & (((1 << mask_len) - 1) << (32 - mask_len))) + 1
return int2ip(gateway)
def _get_subnet_mask_len(self):
"""
Perform `adb shell netcfg | grep wlan0` command to obtain mask length
Returns:
17 if mask length could not be detected, otherwise the mask length
"""
try:
res = self.shell('netcfg')
except AdbShellError:
pass
else:
matcher = re.search(r'wlan0.* (\d+\.){3}\d+/(\d+) ', res)
if matcher:
return int(matcher.group(2))
# 获取不到网段长度就默认取17
print('[iputils WARNING] fail to get subnet mask len. use 17 as default.')
return 17
def get_memory(self):
res = self.shell("dumpsys meminfo")
pat = re.compile(r".*Total RAM:\s+(\S+)\s+", re.DOTALL)
_str = pat.match(res).group(1)
if ',' in _str:
_list = _str.split(',')
_num = int(_list[0])
_num = round(_num + (float(_list[1]) / 1000.0))
else:
_num = round(float(_str) / 1000.0 / 1000.0)
res = str(_num) + 'G'
return res
def get_storage(self):
res = self.shell("df /data")
pat = re.compile(r".*\/data\s+(\S+)", re.DOTALL)
if pat.match(res):
_str = pat.match(res).group(1)
else:
pat = re.compile(r".*\s+(\S+)\s+\S+\s+\S+\s+\S+\s+\/data", re.DOTALL)
_str = pat.match(res).group(1)
if 'G' in _str:
_num = round(float(_str[:-1]))
elif 'M' in _str:
_num = round(float(_str[:-1]) / 1000.0)
else:
_num = round(float(_str) / 1000.0 / 1000.0)
if _num > 64:
res = '128G'
elif _num > 32:
res = '64G'
elif _num > 16:
res = '32G'
elif _num > 8:
res = '16G'
else:
res = '8G'
return res
def get_cpuinfo(self):
res = self.shell("cat /proc/cpuinfo").strip()
cpuNum = res.count("processor")
pat = re.compile(r'Hardware\s+:\s+(\w+.*)')
m = pat.match(res)
if not m:
pat = re.compile(r'Processor\s+:\s+(\w+.*)')
m = pat.match(res)
cpuName = m.group(1).replace('\r', '')
return dict(cpuNum=cpuNum, cpuName=cpuName)
def get_cpufreq(self):
res = self.shell("cat /sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq")
num = round(float(res) / 1000 / 1000, 1)
res = str(num) + 'GHz'
return res.strip()
def get_cpuabi(self):
res = self.shell("getprop ro.product.cpu.abi")
return res.strip()
def get_gpu(self):
res = self.shell("dumpsys SurfaceFlinger")
pat = re.compile(r'GLES:\s+(.*)')
m = pat.search(res)
if not m:
return None
_list = m.group(1).split(',')
gpuModel = ""
opengl = ""
if len(_list) > 0:
gpuModel = _list[1].strip()
if len(_list) > 1:
m2 = re.search(r'(\S+\s+\S+\s+\S+).*', _list[2])
if m2:
opengl = m2.group(1)
return dict(gpuModel=gpuModel, opengl=opengl)
def get_model(self):
return self.getprop("ro.product.model")
def get_manufacturer(self):
return self.getprop("ro.product.manufacturer")
def get_device_info(self):
"""
Get android device information, including: memory/storage/display/cpu/gpu/model/manufacturer...
Returns:
Dict of info
"""
handlers = {
"platform": "Android",
"serialno": self.serialno,
"memory": self.get_memory,
"storage": self.get_storage,
"display": self.getPhysicalDisplayInfo,
"cpuinfo": self.get_cpuinfo,
"cpufreq": self.get_cpufreq,
"cpuabi": self.get_cpuabi,
"sdkversion": self.sdk_version,
"gpu": self.get_gpu,
"model": self.get_model,
"manufacturer": self.get_manufacturer,
# "battery": getBatteryCapacity
}
ret = {}
for k, v in handlers.items():
if callable(v):
try:
value = v()
except Exception:
value = None
ret[k] = value
else:
ret[k] = v
return ret
def get_display_of_all_screen(self, info):
"""
Perform `adb shell dumpsys window windows` commands to get window display of application.
Args:
info: device screen properties
Returns:
None if adb command failed to run, otherwise return device screen properties
"""
output = self.shell("dumpsys window windows")
windows = output.split("Window #")
offsetx, offsety, x, y = info['width'], info['height'], 0, 0
package = self._search_for_current_package(output)
if package:
for w in windows:
if "package=%s" % package in w:
arr = re.findall(r'Frames: containing=\[(\d+\.?\d*),(\d+\.?\d*)]\[(\d+\.?\d*),(\d+\.?\d*)]', w)
if len(arr) >= 1 and len(arr[0]) == 4:
offsetx, offsety, x, y = float(arr[0][0]), float(arr[0][1]), float(arr[0][2]), float(arr[0][3])
if info["orientation"] in [1, 3]:
offsetx, offsety, x, y = offsety, offsetx, y, x
x, y = x - offsetx, y - offsety
return {
"offset_x": offsetx,
"offset_y": offsety,
"offset_width": x,
"offset_height": y
}
def _search_for_current_package(self, ret):
"""
Search for current app package name from the output of command "adb shell dumpsys window windows"
Returns:
package name if exists else ""
"""
try:
packageRE = re.compile('\s*mCurrentFocus=Window{.* ([A-Za-z0-9_.]+)/[A-Za-z0-9_.]+}')
m = packageRE.findall(ret)
if m:
return m[-1]
else:
return self.get_top_activity()[0]
except Exception as e:
print("[Error] Cannot get current top activity")
return ""
def cleanup_adb_forward():
for adb in ADB._instances:
adb._cleanup_forwards()
reg_cleanup(cleanup_adb_forward)
# if __name__ == "__main__":
欢迎小伙伴关注微信公众号ID:gameTesterGz
或关注我的CSDN:https://blog.csdn.net/qq_32557025
谢谢各位的关注、点赞!
标签:shell,return,package,self,ADB,UI,AirTest,device,adb 来源: https://blog.csdn.net/qq_32557025/article/details/121838284