--------- Co-authored-by: Amit Sharma <48654350+buddhhu@users.noreply.github.com> Co-authored-by: Aditya <me@xditya.me> Co-authored-by: Dark <darkbeamer.official@gmail.com> Co-authored-by: buddhhu <amitsharma123234@gmail.com> Co-authored-by: Kaif <88398455+kaif-00z@users.noreply.github.com> Co-authored-by: 1Danish-00 <danish@ultroid.tech> Co-authored-by: TechiError <techierror@gmail.com> Co-authored-by: Dark <59723913+DarkBeamerYT@users.noreply.github.com>
77 lines
2.1 KiB
Python
77 lines
2.1 KiB
Python
from asyncio import create_subprocess_exec, subprocess
|
|
|
|
|
|
class Terminal:
    """
    Class for running terminal commands asynchronously.

    Every process started through `run` is kept in an internal
    ``pid -> process`` mapping until it is removed by `terminate` or by
    the `_auto_remove_processes` sweep.

    Methods:

        run(*args)
            args: Program followed by its arguments, passed unchanged to
                  `asyncio.create_subprocess_exec`.
            Returns Process id (int)

        terminate(pid: int)
            pid: Process id returned in `run` method.
            Returns True if terminated else False (bool)

        output(pid: int)
            pid: Process id returned in `run` method.
            Returns Output of process (str)

        error(pid: int)
            pid: Process id returned in `run` method.
            Returns Error of process (str)
    """

    def __init__(self) -> None:
        # Maps pid (int) -> process object returned by create_subprocess_exec.
        self._processes = {}

    @staticmethod
    def _to_str(data: bytes) -> str:
        """Decode *data* as UTF-8 and strip surrounding whitespace.

        Undecodable bytes are replaced instead of raising, so a command
        that emits non-UTF-8 output cannot crash `output` / `error`.
        """
        return data.decode("utf-8", errors="replace").strip()

    async def _read_stream(self, stream) -> str:
        """Read *stream* line by line until EOF and return the joined text."""
        lines = []
        while True:
            raw = await stream.readline()
            # EOF is detected on the raw bytes: a blank line arrives as
            # b"\n", which is non-empty, so it no longer ends the read early.
            if not raw:
                break
            lines.append(self._to_str(raw))
        # Each line is already stripped; the outer strip only drops leading
        # or trailing blank lines, so "no visible output" is still "".
        return "\n".join(lines).strip()

    async def run(self, *args) -> int:
        """Start a subprocess with piped stdout/stderr and register it.

        args: Program followed by its arguments.
        Returns Process id (int)
        """
        process = await create_subprocess_exec(
            *args, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        pid = process.pid
        self._processes[pid] = process
        return pid

    def terminate(self, pid: int) -> bool:
        """Kill the process registered under *pid* and forget it.

        pid: Process id returned in `run` method.
        Returns True if a kill was issued, False if *pid* is unknown or
        the process had already exited.
        """
        try:
            # Pop and keep the process object; indexing the mapping after
            # the pop would always raise KeyError.
            process = self._processes.pop(pid)
        except KeyError:
            return False
        try:
            process.kill()
        except ProcessLookupError:
            # The process finished on its own; there is nothing to kill.
            return False
        return True

    async def output(self, pid: int) -> str:
        """Return everything the process wrote to stdout, read until EOF.

        pid: Process id returned in `run` method.
        Raises KeyError if *pid* is not a tracked process.
        """
        return await self._read_stream(self._processes[pid].stdout)

    async def error(self, pid: int) -> str:
        """Return everything the process wrote to stderr, read until EOF.

        pid: Process id returned in `run` method.
        Raises KeyError if *pid* is not a tracked process.
        """
        return await self._read_stream(self._processes[pid].stderr)

    @property
    def _auto_remove_processes(self) -> None:
        """Drop every tracked process that has already exited.

        Performs one non-blocking sweep per access. A process counts as
        finished once its ``returncode`` is no longer None.
        """
        # Build the pid list first so the mapping is not mutated while it
        # is being iterated.
        finished = [
            pid
            for pid, process in self._processes.items()
            if process.returncode is not None
        ]
        for pid in finished:
            self._processes.pop(pid, None)