You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
7.2 KiB
7.2 KiB
原始ADB客户端参考代码
说明: 这是项目重构前的原始ADB客户端实现代码,作为参考文档保存。 重构后的代码已迁移到
app/core/adb/目录下。
coding: utf-8
Refs adb SERVICES.TXT
https://github.com/aosp-mirror/platform_system_core/blob/master/adb/SERVICES.TXT
import os import subprocess from collections import namedtuple
import tornado.iostream from logzero import logger from tornado import gen from tornado.tcpclient import TCPClient
OKAY = "OKAY" FAIL = "FAIL"
DeviceItem = namedtuple("Device", ['serial', 'status']) DeviceEvent = namedtuple('DeviceEvent', ['present', 'serial', 'status']) ForwardItem = namedtuple("ForwardItem", ['serial', 'local', 'remote'])
class AdbError(Exception): """ adb error """
class AdbStreamConnection(tornado.iostream.IOStream): """ Example usgae: async with AdbStreamConnection(host, port) as c: c.send_cmd("host:kill") """
def __init__(self, host, port):
self.__host = host
self.__port = port
self.__stream = None
@property
def stream(self):
return self.__stream
async def send_cmd(self, cmd: str):
await self.stream.write("{:04x}{}".format(len(cmd),
cmd).encode('utf-8'))
async def read_bytes(self, num: int):
return (await self.stream.read_bytes(num)).decode()
async def read_string(self):
lenstr = await self.read_bytes(4)
msgsize = int(lenstr, 16)
return await self.read_string(msgsize)
async def check_okay(self):
data = await self.read_bytes(4)
if data == FAIL:
raise AdbError(await self.read_string())
elif data == OKAY:
return
else:
raise AdbError("Unknown data: %s" % data)
async def connect(self):
adb_host = self.__host or os.environ.get(
"ANDROID_ADB_SERVER_HOST", "127.0.0.1")
adb_port = self.__port or int(os.environ.get(
"ANDROID_ADB_SERVER_PORT", 5037))
stream = await TCPClient().connect(adb_host, adb_port)
self.__stream = stream
return self
async def __aenter__(self):
return await self.connect()
async def __aexit__(self, exc_type, exc, tb):
self.stream.close()
class AdbClient(object): def init(self): self._stream = None
def connect(self, host=None, port=None) -> AdbStreamConnection:
return AdbStreamConnection(host, port)
async def server_version(self) -> int:
async with self.connect() as c:
await c.send_cmd("host:version")
await c.check_okay()
return int(await c.read_string(), 16)
async def track_devices(self):
"""
yield DeviceEvent according to track-devices
Example:
async for event in track_devices():
print(event)
# output: DeviceEvent(present=True, serial='xxxx', status='device')
"""
orig_devices = []
while True:
try:
async for content in self._unsafe_track_devices():
curr_devices = self.output2devices(
content, limit_status=['device'])
for evt in self._diff_devices(orig_devices, curr_devices):
yield evt
orig_devices = curr_devices
except tornado.iostream.StreamClosedError:
# adb server maybe killed
for evt in self._diff_devices(orig_devices, []):
yield evt
orig_devices = []
sleep = 1.0
logger.info(
"adb connection is down, retry after %.1fs" % sleep)
await gen.sleep(sleep)
subprocess.run(['adb', 'start-server'])
version = await self.server_version()
logger.info("adb-server started, version: %d", version)
async def _unsafe_track_devices(self):
async with self.connect() as conn:
await conn.send_cmd("host:track-devices")
await conn.check_okay()
while True:
yield await conn.read_string()
def _diff_devices(self, orig_devices: list, curr_devices: list):
""" Return iter(DeviceEvent) """
for d in set(orig_devices).difference(curr_devices):
yield DeviceEvent(False, d.serial, d.status)
for d in set(curr_devices).difference(orig_devices):
yield DeviceEvent(True, d.serial, d.status)
def output2devices(self, output: str, limit_status=[]):
"""
Args:
outptu: str of adb devices output
Returns:
list of DeviceItem
"""
results = []
for line in output.splitlines():
fields = line.strip().split("\t", maxsplit=1)
if len(fields) != 2:
continue
serial, status = fields[0], fields[1]
if limit_status:
if status in limit_status:
results.append(DeviceItem(serial, status))
else:
results.append(DeviceItem(serial, status))
return results
async def shell(self, serial: str, command: str):
async with self.connect() as conn:
await conn.send_cmd("host:transport:"+serial)
await conn.check_okay()
await conn.send_cmd("shell:"+command)
await conn.check_okay()
output = await conn.stream.read_until_close()
return output.decode('utf-8')
async def forward_list(self):
async with self.connect() as conn:
# adb 1.0.40 not support host-local
await conn.send_cmd("host:list-forward")
await conn.check_okay()
content = await conn.read_string()
for line in content.splitlines():
parts = line.split()
if len(parts) != 3:
continue
yield ForwardItem(*parts)
async def forward_remove(self, local=None):
async with self.connect() as conn:
if local:
await conn.send_cmd("host:killforward:"+local)
else:
await conn.send_cmd("host:killforward-all")
await conn.check_okay()
async def forward(self, serial: str, local: str, remote: str, norebind=False):
"""
Args:
serial: device serial
local, remote (str): tcp:<port> | localabstract:<name>
norebind(bool): set to true will fail it when
there is already a forward connection from <local>
"""
async with self.connect() as conn:
cmds = ["host-serial", serial, "forward"]
if norebind:
cmds.append('norebind')
cmds.append(local+";"+remote)
await conn.send_cmd(":".join(cmds))
await conn.check_okay()
async def devices(self):
"""
Return:
list of devices
"""
async with self.connect() as conn:
await conn.send_cmd("host:devices")
await conn.check_okay()
content = await conn.read_string()
return self.output2devices(content)
adb = AdbClient()